1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4};
5use anyhow::{anyhow, Result};
6use audio::{Audio, Sound};
7use client::{
8 proto::{self, PeerId},
9 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use fs::Fs;
13use futures::{FutureExt, StreamExt};
14use gpui::{
15 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
16};
17use language::LanguageRegistry;
18use live_kit_client::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
19use postage::{sink::Sink, stream::Stream, watch};
20use project::Project;
21use settings::Settings as _;
22use std::{future::Future, mem, sync::Arc, time::Duration};
23use util::{post_inc, ResultExt, TryFutureExt};
24
25pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
26
27#[derive(Clone, Debug, PartialEq, Eq)]
28pub enum Event {
29 RoomJoined {
30 channel_id: Option<u64>,
31 },
32 ParticipantLocationChanged {
33 participant_id: proto::PeerId,
34 },
35 RemoteVideoTracksChanged {
36 participant_id: proto::PeerId,
37 },
38 RemoteAudioTracksChanged {
39 participant_id: proto::PeerId,
40 },
41 RemoteProjectShared {
42 owner: Arc<User>,
43 project_id: u64,
44 worktree_root_names: Vec<String>,
45 },
46 RemoteProjectUnshared {
47 project_id: u64,
48 },
49 RemoteProjectJoined {
50 project_id: u64,
51 },
52 RemoteProjectInvitationDiscarded {
53 project_id: u64,
54 },
55 Left {
56 channel_id: Option<u64>,
57 },
58}
59
60pub struct Room {
61 id: u64,
62 channel_id: Option<u64>,
63 live_kit: Option<LiveKitRoom>,
64 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
65 status: RoomStatus,
66 shared_projects: HashSet<WeakModel<Project>>,
67 joined_projects: HashSet<WeakModel<Project>>,
68 local_participant: LocalParticipant,
69 remote_participants: BTreeMap<u64, RemoteParticipant>,
70 pending_participants: Vec<Arc<User>>,
71 participant_user_ids: HashSet<u64>,
72 pending_call_count: usize,
73 leave_when_empty: bool,
74 client: Arc<Client>,
75 user_store: Model<UserStore>,
76 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
77 client_subscriptions: Vec<client::Subscription>,
78 _subscriptions: Vec<gpui::Subscription>,
79 room_update_completed_tx: watch::Sender<Option<()>>,
80 room_update_completed_rx: watch::Receiver<Option<()>>,
81 pending_room_update: Option<Task<()>>,
82 maintain_connection: Option<Task<Option<()>>>,
83}
84
85impl EventEmitter<Event> for Room {}
86
87impl Room {
88 pub fn channel_id(&self) -> Option<u64> {
89 self.channel_id
90 }
91
92 pub fn is_sharing_project(&self) -> bool {
93 !self.shared_projects.is_empty()
94 }
95
96 #[cfg(any(test, feature = "test-support"))]
97 pub fn is_connected(&self) -> bool {
98 if let Some(live_kit) = self.live_kit.as_ref() {
99 matches!(
100 *live_kit.room.status().borrow(),
101 live_kit_client::ConnectionState::Connected { .. }
102 )
103 } else {
104 false
105 }
106 }
107
108 fn new(
109 id: u64,
110 channel_id: Option<u64>,
111 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
112 client: Arc<Client>,
113 user_store: Model<UserStore>,
114 cx: &mut ModelContext<Self>,
115 ) -> Self {
116 let maintain_connection = cx.spawn({
117 let client = client.clone();
118 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
119 });
120
121 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
122
123 let mut this = Self {
124 id,
125 channel_id,
126 live_kit: None,
127 live_kit_connection_info,
128 status: RoomStatus::Online,
129 shared_projects: Default::default(),
130 joined_projects: Default::default(),
131 participant_user_ids: Default::default(),
132 local_participant: Default::default(),
133 remote_participants: Default::default(),
134 pending_participants: Default::default(),
135 pending_call_count: 0,
136 client_subscriptions: vec![
137 client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
138 ],
139 _subscriptions: vec![
140 cx.on_release(Self::released),
141 cx.on_app_quit(Self::app_will_quit),
142 ],
143 leave_when_empty: false,
144 pending_room_update: None,
145 client,
146 user_store,
147 follows_by_leader_id_project_id: Default::default(),
148 maintain_connection: Some(maintain_connection),
149 room_update_completed_tx,
150 room_update_completed_rx,
151 };
152 if this.live_kit_connection_info.is_some() {
153 this.join_call(cx).detach_and_log_err(cx);
154 }
155 this
156 }
157
158 pub(crate) fn create(
159 called_user_id: u64,
160 initial_project: Option<Model<Project>>,
161 client: Arc<Client>,
162 user_store: Model<UserStore>,
163 cx: &mut AppContext,
164 ) -> Task<Result<Model<Self>>> {
165 cx.spawn(move |mut cx| async move {
166 let response = client.request(proto::CreateRoom {}).await?;
167 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
168 let room = cx.new_model(|cx| {
169 let mut room = Self::new(
170 room_proto.id,
171 None,
172 response.live_kit_connection_info,
173 client,
174 user_store,
175 cx,
176 );
177 if let Some(participant) = room_proto.participants.first() {
178 room.local_participant.role = participant.role()
179 }
180 room
181 })?;
182
183 let initial_project_id = if let Some(initial_project) = initial_project {
184 let initial_project_id = room
185 .update(&mut cx, |room, cx| {
186 room.share_project(initial_project.clone(), cx)
187 })?
188 .await?;
189 Some(initial_project_id)
190 } else {
191 None
192 };
193
194 match room
195 .update(&mut cx, |room, cx| {
196 room.leave_when_empty = true;
197 room.call(called_user_id, initial_project_id, cx)
198 })?
199 .await
200 {
201 Ok(()) => Ok(room),
202 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
203 }
204 })
205 }
206
207 pub(crate) async fn join_channel(
208 channel_id: u64,
209 client: Arc<Client>,
210 user_store: Model<UserStore>,
211 cx: AsyncAppContext,
212 ) -> Result<Model<Self>> {
213 Self::from_join_response(
214 client.request(proto::JoinChannel2 { channel_id }).await?,
215 client,
216 user_store,
217 cx,
218 )
219 }
220
221 pub(crate) async fn join(
222 room_id: u64,
223 client: Arc<Client>,
224 user_store: Model<UserStore>,
225 cx: AsyncAppContext,
226 ) -> Result<Model<Self>> {
227 Self::from_join_response(
228 client.request(proto::JoinRoom { id: room_id }).await?,
229 client,
230 user_store,
231 cx,
232 )
233 }
234
235 fn released(&mut self, cx: &mut AppContext) {
236 if self.status.is_online() {
237 self.leave_internal(cx).detach_and_log_err(cx);
238 }
239 }
240
241 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
242 let task = if self.status.is_online() {
243 let leave = self.leave_internal(cx);
244 Some(cx.background_executor().spawn(async move {
245 leave.await.log_err();
246 }))
247 } else {
248 None
249 };
250
251 async move {
252 if let Some(task) = task {
253 task.await;
254 }
255 }
256 }
257
258 pub fn mute_on_join(cx: &AppContext) -> bool {
259 CallSettings::get_global(cx).mute_on_join
260 }
261
262 fn from_join_response(
263 response: proto::JoinRoomResponse,
264 client: Arc<Client>,
265 user_store: Model<UserStore>,
266 mut cx: AsyncAppContext,
267 ) -> Result<Model<Self>> {
268 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
269 let room = cx.new_model(|cx| {
270 Self::new(
271 room_proto.id,
272 response.channel_id,
273 response.live_kit_connection_info,
274 client,
275 user_store,
276 cx,
277 )
278 })?;
279 room.update(&mut cx, |room, cx| {
280 room.leave_when_empty = room.channel_id.is_none();
281 room.apply_room_update(room_proto, cx)?;
282 anyhow::Ok(())
283 })??;
284 Ok(room)
285 }
286
287 fn should_leave(&self) -> bool {
288 self.leave_when_empty
289 && self.pending_room_update.is_none()
290 && self.pending_participants.is_empty()
291 && self.remote_participants.is_empty()
292 && self.pending_call_count == 0
293 }
294
295 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
296 cx.notify();
297 cx.emit(Event::Left {
298 channel_id: self.channel_id(),
299 });
300 self.leave_internal(cx)
301 }
302
303 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
304 if self.status.is_offline() {
305 return Task::ready(Err(anyhow!("room is offline")));
306 }
307
308 log::info!("leaving room");
309 if self.live_kit.is_some() {
310 Audio::play_sound(Sound::Leave, cx);
311 }
312
313 self.clear_state(cx);
314
315 let leave_room = self.client.request(proto::LeaveRoom {});
316 cx.background_executor().spawn(async move {
317 leave_room.await?;
318 anyhow::Ok(())
319 })
320 }
321
322 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
323 for project in self.shared_projects.drain() {
324 if let Some(project) = project.upgrade() {
325 project.update(cx, |project, cx| {
326 project.unshare(cx).log_err();
327 });
328 }
329 }
330 for project in self.joined_projects.drain() {
331 if let Some(project) = project.upgrade() {
332 project.update(cx, |project, cx| {
333 project.disconnected_from_host(cx);
334 project.close(cx);
335 });
336 }
337 }
338
339 self.status = RoomStatus::Offline;
340 self.remote_participants.clear();
341 self.pending_participants.clear();
342 self.participant_user_ids.clear();
343 self.client_subscriptions.clear();
344 self.live_kit.take();
345 self.pending_room_update.take();
346 self.maintain_connection.take();
347 }
348
349 async fn maintain_connection(
350 this: WeakModel<Self>,
351 client: Arc<Client>,
352 mut cx: AsyncAppContext,
353 ) -> Result<()> {
354 let mut client_status = client.status();
355 loop {
356 let _ = client_status.try_recv();
357 let is_connected = client_status.borrow().is_connected();
358 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
359 if !is_connected || client_status.next().await.is_some() {
360 log::info!("detected client disconnection");
361
362 this.upgrade()
363 .ok_or_else(|| anyhow!("room was dropped"))?
364 .update(&mut cx, |this, cx| {
365 this.status = RoomStatus::Rejoining;
366 cx.notify();
367 })?;
368
369 // Wait for client to re-establish a connection to the server.
370 {
371 let mut reconnection_timeout =
372 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
373 let client_reconnection = async {
374 let mut remaining_attempts = 3;
375 while remaining_attempts > 0 {
376 if client_status.borrow().is_connected() {
377 log::info!("client reconnected, attempting to rejoin room");
378
379 let Some(this) = this.upgrade() else { break };
380 match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
381 Ok(task) => {
382 if task.await.log_err().is_some() {
383 return true;
384 } else {
385 remaining_attempts -= 1;
386 }
387 }
388 Err(_app_dropped) => return false,
389 }
390 } else if client_status.borrow().is_signed_out() {
391 return false;
392 }
393
394 log::info!(
395 "waiting for client status change, remaining attempts {}",
396 remaining_attempts
397 );
398 client_status.next().await;
399 }
400 false
401 }
402 .fuse();
403 futures::pin_mut!(client_reconnection);
404
405 futures::select_biased! {
406 reconnected = client_reconnection => {
407 if reconnected {
408 log::info!("successfully reconnected to room");
409 // If we successfully joined the room, go back around the loop
410 // waiting for future connection status changes.
411 continue;
412 }
413 }
414 _ = reconnection_timeout => {
415 log::info!("room reconnection timeout expired");
416 }
417 }
418 }
419
420 break;
421 }
422 }
423
424 // The client failed to re-establish a connection to the server
425 // or an error occurred while trying to re-join the room. Either way
426 // we leave the room and return an error.
427 if let Some(this) = this.upgrade() {
428 log::info!("reconnection failed, leaving room");
429 let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
430 }
431 Err(anyhow!(
432 "can't reconnect to room: client failed to re-establish connection"
433 ))
434 }
435
436 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
437 let mut projects = HashMap::default();
438 let mut reshared_projects = Vec::new();
439 let mut rejoined_projects = Vec::new();
440 self.shared_projects.retain(|project| {
441 if let Some(handle) = project.upgrade() {
442 let project = handle.read(cx);
443 if let Some(project_id) = project.remote_id() {
444 projects.insert(project_id, handle.clone());
445 reshared_projects.push(proto::UpdateProject {
446 project_id,
447 worktrees: project.worktree_metadata_protos(cx),
448 });
449 return true;
450 }
451 }
452 false
453 });
454 self.joined_projects.retain(|project| {
455 if let Some(handle) = project.upgrade() {
456 let project = handle.read(cx);
457 if let Some(project_id) = project.remote_id() {
458 projects.insert(project_id, handle.clone());
459 rejoined_projects.push(proto::RejoinProject {
460 id: project_id,
461 worktrees: project
462 .worktrees()
463 .map(|worktree| {
464 let worktree = worktree.read(cx);
465 proto::RejoinWorktree {
466 id: worktree.id().to_proto(),
467 scan_id: worktree.completed_scan_id() as u64,
468 }
469 })
470 .collect(),
471 });
472 }
473 return true;
474 }
475 false
476 });
477
478 let response = self.client.request_envelope(proto::RejoinRoom {
479 id: self.id,
480 reshared_projects,
481 rejoined_projects,
482 });
483
484 cx.spawn(|this, mut cx| async move {
485 let response = response.await?;
486 let message_id = response.message_id;
487 let response = response.payload;
488 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
489 this.update(&mut cx, |this, cx| {
490 this.status = RoomStatus::Online;
491 this.apply_room_update(room_proto, cx)?;
492
493 for reshared_project in response.reshared_projects {
494 if let Some(project) = projects.get(&reshared_project.id) {
495 project.update(cx, |project, cx| {
496 project.reshared(reshared_project, cx).log_err();
497 });
498 }
499 }
500
501 for rejoined_project in response.rejoined_projects {
502 if let Some(project) = projects.get(&rejoined_project.id) {
503 project.update(cx, |project, cx| {
504 project.rejoined(rejoined_project, message_id, cx).log_err();
505 });
506 }
507 }
508
509 anyhow::Ok(())
510 })?
511 })
512 }
513
514 pub fn id(&self) -> u64 {
515 self.id
516 }
517
518 pub fn status(&self) -> RoomStatus {
519 self.status
520 }
521
522 pub fn local_participant(&self) -> &LocalParticipant {
523 &self.local_participant
524 }
525
526 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
527 &self.remote_participants
528 }
529
530 pub fn call_participants(&self, cx: &AppContext) -> Vec<Arc<User>> {
531 self.remote_participants()
532 .values()
533 .filter_map(|participant| {
534 if participant.in_call {
535 Some(participant.user.clone())
536 } else {
537 None
538 }
539 })
540 .chain(if self.in_call() {
541 self.user_store.read(cx).current_user()
542 } else {
543 None
544 })
545 .collect()
546 }
547
548 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
549 self.remote_participants
550 .values()
551 .find(|p| p.peer_id == peer_id)
552 }
553
554 pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
555 self.remote_participants
556 .get(&user_id)
557 .map(|participant| participant.role)
558 }
559
560 pub fn contains_guests(&self) -> bool {
561 self.local_participant.role == proto::ChannelRole::Guest
562 || self
563 .remote_participants
564 .values()
565 .any(|p| p.role == proto::ChannelRole::Guest)
566 }
567
568 pub fn local_participant_is_admin(&self) -> bool {
569 self.local_participant.role == proto::ChannelRole::Admin
570 }
571
572 pub fn set_participant_role(
573 &mut self,
574 user_id: u64,
575 role: proto::ChannelRole,
576 cx: &ModelContext<Self>,
577 ) -> Task<Result<()>> {
578 let client = self.client.clone();
579 let room_id = self.id;
580 let role = role.into();
581 cx.spawn(|_, _| async move {
582 client
583 .request(proto::SetRoomParticipantRole {
584 room_id,
585 user_id,
586 role,
587 })
588 .await
589 .map(|_| ())
590 })
591 }
592
593 pub fn pending_participants(&self) -> &[Arc<User>] {
594 &self.pending_participants
595 }
596
597 pub fn contains_participant(&self, user_id: u64) -> bool {
598 self.participant_user_ids.contains(&user_id)
599 }
600
601 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
602 self.follows_by_leader_id_project_id
603 .get(&(leader_id, project_id))
604 .map_or(&[], |v| v.as_slice())
605 }
606
607 /// Returns the most 'active' projects, defined as most people in the project
608 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
609 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
610 for participant in self.remote_participants.values() {
611 match participant.location {
612 ParticipantLocation::SharedProject { project_id } => {
613 project_hosts_and_guest_counts
614 .entry(project_id)
615 .or_default()
616 .1 += 1;
617 }
618 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
619 }
620 for project in &participant.projects {
621 project_hosts_and_guest_counts
622 .entry(project.id)
623 .or_default()
624 .0 = Some(participant.user.id);
625 }
626 }
627
628 if let Some(user) = self.user_store.read(cx).current_user() {
629 for project in &self.local_participant.projects {
630 project_hosts_and_guest_counts
631 .entry(project.id)
632 .or_default()
633 .0 = Some(user.id);
634 }
635 }
636
637 project_hosts_and_guest_counts
638 .into_iter()
639 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
640 .max_by_key(|(_, _, guest_count)| *guest_count)
641 .map(|(id, host, _)| (id, host))
642 }
643
644 async fn handle_room_updated(
645 this: Model<Self>,
646 envelope: TypedEnvelope<proto::RoomUpdated>,
647 _: Arc<Client>,
648 mut cx: AsyncAppContext,
649 ) -> Result<()> {
650 let room = envelope
651 .payload
652 .room
653 .ok_or_else(|| anyhow!("invalid room"))?;
654 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
655 }
656
657 fn apply_room_update(
658 &mut self,
659 mut room: proto::Room,
660 cx: &mut ModelContext<Self>,
661 ) -> Result<()> {
662 // Filter ourselves out from the room's participants.
663 let local_participant_ix = room
664 .participants
665 .iter()
666 .position(|participant| Some(participant.user_id) == self.client.user_id());
667 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
668
669 let pending_participant_user_ids = room
670 .pending_participants
671 .iter()
672 .map(|p| p.user_id)
673 .collect::<Vec<_>>();
674
675 let remote_participant_user_ids = room
676 .participants
677 .iter()
678 .map(|p| p.user_id)
679 .collect::<Vec<_>>();
680
681 let (remote_participants, pending_participants) =
682 self.user_store.update(cx, move |user_store, cx| {
683 (
684 user_store.get_users(remote_participant_user_ids, cx),
685 user_store.get_users(pending_participant_user_ids, cx),
686 )
687 });
688
689 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
690 let (remote_participants, pending_participants) =
691 futures::join!(remote_participants, pending_participants);
692
693 this.update(&mut cx, |this, cx| {
694 this.participant_user_ids.clear();
695
696 if let Some(participant) = local_participant {
697 let role = participant.role();
698 this.local_participant.projects = participant.projects;
699 if this.local_participant.role != role {
700 this.local_participant.role = role;
701
702 if role == proto::ChannelRole::Guest {
703 for project in mem::take(&mut this.shared_projects) {
704 if let Some(project) = project.upgrade() {
705 this.unshare_project(project, cx).log_err();
706 }
707 }
708 this.local_participant.projects.clear();
709 if let Some(live_kit_room) = &mut this.live_kit {
710 live_kit_room.stop_publishing(cx);
711 }
712 }
713
714 this.joined_projects.retain(|project| {
715 if let Some(project) = project.upgrade() {
716 project.update(cx, |project, cx| project.set_role(role, cx));
717 true
718 } else {
719 false
720 }
721 });
722 }
723 } else {
724 this.local_participant.projects.clear();
725 }
726
727 if let Some(participants) = remote_participants.log_err() {
728 for (participant, user) in room.participants.into_iter().zip(participants) {
729 let Some(peer_id) = participant.peer_id else {
730 continue;
731 };
732 let participant_index = ParticipantIndex(participant.participant_index);
733 this.participant_user_ids.insert(participant.user_id);
734
735 let old_projects = this
736 .remote_participants
737 .get(&participant.user_id)
738 .into_iter()
739 .flat_map(|existing| &existing.projects)
740 .map(|project| project.id)
741 .collect::<HashSet<_>>();
742 let new_projects = participant
743 .projects
744 .iter()
745 .map(|project| project.id)
746 .collect::<HashSet<_>>();
747
748 for project in &participant.projects {
749 if !old_projects.contains(&project.id) {
750 cx.emit(Event::RemoteProjectShared {
751 owner: user.clone(),
752 project_id: project.id,
753 worktree_root_names: project.worktree_root_names.clone(),
754 });
755 }
756 }
757
758 for unshared_project_id in old_projects.difference(&new_projects) {
759 this.joined_projects.retain(|project| {
760 if let Some(project) = project.upgrade() {
761 project.update(cx, |project, cx| {
762 if project.remote_id() == Some(*unshared_project_id) {
763 project.disconnected_from_host(cx);
764 false
765 } else {
766 true
767 }
768 })
769 } else {
770 false
771 }
772 });
773 cx.emit(Event::RemoteProjectUnshared {
774 project_id: *unshared_project_id,
775 });
776 }
777
778 let role = participant.role();
779 let in_call = participant.in_call;
780 let location = ParticipantLocation::from_proto(participant.location)
781 .unwrap_or(ParticipantLocation::External);
782 if let Some(remote_participant) =
783 this.remote_participants.get_mut(&participant.user_id)
784 {
785 remote_participant.peer_id = peer_id;
786 remote_participant.projects = participant.projects;
787 remote_participant.participant_index = participant_index;
788 if location != remote_participant.location
789 || role != remote_participant.role
790 || in_call != remote_participant.in_call
791 {
792 if in_call && !remote_participant.in_call {
793 Audio::play_sound(Sound::Joined, cx);
794 }
795 remote_participant.location = location;
796 remote_participant.role = role;
797 remote_participant.in_call = participant.in_call;
798
799 cx.emit(Event::ParticipantLocationChanged {
800 participant_id: peer_id,
801 });
802 }
803 } else {
804 this.remote_participants.insert(
805 participant.user_id,
806 RemoteParticipant {
807 user: user.clone(),
808 participant_index,
809 peer_id,
810 projects: participant.projects,
811 location,
812 role,
813 muted: true,
814 speaking: false,
815 in_call: participant.in_call,
816 video_tracks: Default::default(),
817 audio_tracks: Default::default(),
818 },
819 );
820
821 if participant.in_call {
822 Audio::play_sound(Sound::Joined, cx);
823 }
824
825 if let Some(live_kit) = this.live_kit.as_ref() {
826 let video_tracks =
827 live_kit.room.remote_video_tracks(&user.id.to_string());
828 let audio_tracks =
829 live_kit.room.remote_audio_tracks(&user.id.to_string());
830 let publications = live_kit
831 .room
832 .remote_audio_track_publications(&user.id.to_string());
833
834 for track in video_tracks {
835 this.live_kit_room_updated(
836 RoomUpdate::SubscribedToRemoteVideoTrack(track),
837 cx,
838 )
839 .log_err();
840 }
841
842 for (track, publication) in
843 audio_tracks.iter().zip(publications.iter())
844 {
845 this.live_kit_room_updated(
846 RoomUpdate::SubscribedToRemoteAudioTrack(
847 track.clone(),
848 publication.clone(),
849 ),
850 cx,
851 )
852 .log_err();
853 }
854 }
855 }
856 }
857
858 this.remote_participants.retain(|user_id, participant| {
859 if this.participant_user_ids.contains(user_id) {
860 true
861 } else {
862 for project in &participant.projects {
863 cx.emit(Event::RemoteProjectUnshared {
864 project_id: project.id,
865 });
866 }
867 false
868 }
869 });
870 }
871
872 if let Some(pending_participants) = pending_participants.log_err() {
873 this.pending_participants = pending_participants;
874 for participant in &this.pending_participants {
875 this.participant_user_ids.insert(participant.id);
876 }
877 }
878
879 this.follows_by_leader_id_project_id.clear();
880 for follower in room.followers {
881 let project_id = follower.project_id;
882 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
883 (Some(leader), Some(follower)) => (leader, follower),
884
885 _ => {
886 log::error!("Follower message {follower:?} missing some state");
887 continue;
888 }
889 };
890
891 let list = this
892 .follows_by_leader_id_project_id
893 .entry((leader, project_id))
894 .or_insert(Vec::new());
895 if !list.contains(&follower) {
896 list.push(follower);
897 }
898 }
899
900 this.pending_room_update.take();
901 if this.should_leave() {
902 log::info!("room is empty, leaving");
903 let _ = this.leave(cx);
904 }
905
906 this.user_store.update(cx, |user_store, cx| {
907 let participant_indices_by_user_id = this
908 .remote_participants
909 .iter()
910 .map(|(user_id, participant)| (*user_id, participant.participant_index))
911 .collect();
912 user_store.set_participant_indices(participant_indices_by_user_id, cx);
913 });
914
915 this.check_invariants();
916 this.room_update_completed_tx.try_send(Some(())).ok();
917 cx.notify();
918 })
919 .ok();
920 }));
921
922 cx.notify();
923 Ok(())
924 }
925
926 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
927 let mut done_rx = self.room_update_completed_rx.clone();
928 async move {
929 while let Some(result) = done_rx.next().await {
930 if result.is_some() {
931 break;
932 }
933 }
934 }
935 }
936
937 fn live_kit_room_updated(
938 &mut self,
939 update: RoomUpdate,
940 cx: &mut ModelContext<Self>,
941 ) -> Result<()> {
942 match update {
943 RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
944 let user_id = track.publisher_id().parse()?;
945 let track_id = track.sid().to_string();
946 let participant = self
947 .remote_participants
948 .get_mut(&user_id)
949 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
950 participant.video_tracks.insert(track_id.clone(), track);
951 cx.emit(Event::RemoteVideoTracksChanged {
952 participant_id: participant.peer_id,
953 });
954 }
955
956 RoomUpdate::UnsubscribedFromRemoteVideoTrack {
957 publisher_id,
958 track_id,
959 } => {
960 let user_id = publisher_id.parse()?;
961 let participant = self
962 .remote_participants
963 .get_mut(&user_id)
964 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
965 participant.video_tracks.remove(&track_id);
966 cx.emit(Event::RemoteVideoTracksChanged {
967 participant_id: participant.peer_id,
968 });
969 }
970
971 RoomUpdate::ActiveSpeakersChanged { speakers } => {
972 let mut speaker_ids = speakers
973 .into_iter()
974 .filter_map(|speaker_sid| speaker_sid.parse().ok())
975 .collect::<Vec<u64>>();
976 speaker_ids.sort_unstable();
977 for (sid, participant) in &mut self.remote_participants {
978 if let Ok(_) = speaker_ids.binary_search(sid) {
979 participant.speaking = true;
980 } else {
981 participant.speaking = false;
982 }
983 }
984 if let Some(id) = self.client.user_id() {
985 if let Some(room) = &mut self.live_kit {
986 if let Ok(_) = speaker_ids.binary_search(&id) {
987 room.speaking = true;
988 } else {
989 room.speaking = false;
990 }
991 }
992 }
993 }
994
995 RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
996 let mut found = false;
997 for participant in &mut self.remote_participants.values_mut() {
998 for track in participant.audio_tracks.values() {
999 if track.sid() == track_id {
1000 found = true;
1001 break;
1002 }
1003 }
1004 if found {
1005 participant.muted = muted;
1006 break;
1007 }
1008 }
1009 }
1010
1011 RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
1012 let user_id = track.publisher_id().parse()?;
1013 let track_id = track.sid().to_string();
1014 let participant = self
1015 .remote_participants
1016 .get_mut(&user_id)
1017 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1018 participant.audio_tracks.insert(track_id.clone(), track);
1019 participant.muted = publication.is_muted();
1020
1021 cx.emit(Event::RemoteAudioTracksChanged {
1022 participant_id: participant.peer_id,
1023 });
1024 }
1025
1026 RoomUpdate::UnsubscribedFromRemoteAudioTrack {
1027 publisher_id,
1028 track_id,
1029 } => {
1030 let user_id = publisher_id.parse()?;
1031 let participant = self
1032 .remote_participants
1033 .get_mut(&user_id)
1034 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1035 participant.audio_tracks.remove(&track_id);
1036 cx.emit(Event::RemoteAudioTracksChanged {
1037 participant_id: participant.peer_id,
1038 });
1039 }
1040
1041 RoomUpdate::LocalAudioTrackUnpublished { publication } => {
1042 log::info!("unpublished audio track {}", publication.sid());
1043 if let Some(room) = &mut self.live_kit {
1044 room.microphone_track = LocalTrack::None;
1045 }
1046 }
1047
1048 RoomUpdate::LocalVideoTrackUnpublished { publication } => {
1049 log::info!("unpublished video track {}", publication.sid());
1050 if let Some(room) = &mut self.live_kit {
1051 room.screen_track = LocalTrack::None;
1052 }
1053 }
1054
1055 RoomUpdate::LocalAudioTrackPublished { publication } => {
1056 log::info!("published audio track {}", publication.sid());
1057 }
1058
1059 RoomUpdate::LocalVideoTrackPublished { publication } => {
1060 log::info!("published video track {}", publication.sid());
1061 }
1062 }
1063
1064 cx.notify();
1065 Ok(())
1066 }
1067
1068 fn check_invariants(&self) {
1069 #[cfg(any(test, feature = "test-support"))]
1070 {
1071 for participant in self.remote_participants.values() {
1072 assert!(self.participant_user_ids.contains(&participant.user.id));
1073 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1074 }
1075
1076 for participant in &self.pending_participants {
1077 assert!(self.participant_user_ids.contains(&participant.id));
1078 assert_ne!(participant.id, self.client.user_id().unwrap());
1079 }
1080
1081 assert_eq!(
1082 self.participant_user_ids.len(),
1083 self.remote_participants.len() + self.pending_participants.len()
1084 );
1085 }
1086 }
1087
1088 pub(crate) fn call(
1089 &mut self,
1090 called_user_id: u64,
1091 initial_project_id: Option<u64>,
1092 cx: &mut ModelContext<Self>,
1093 ) -> Task<Result<()>> {
1094 if self.status.is_offline() {
1095 return Task::ready(Err(anyhow!("room is offline")));
1096 }
1097
1098 cx.notify();
1099 let client = self.client.clone();
1100 let room_id = self.id;
1101 self.pending_call_count += 1;
1102 cx.spawn(move |this, mut cx| async move {
1103 let result = client
1104 .request(proto::Call {
1105 room_id,
1106 called_user_id,
1107 initial_project_id,
1108 })
1109 .await;
1110 this.update(&mut cx, |this, cx| {
1111 this.pending_call_count -= 1;
1112 if this.should_leave() {
1113 this.leave(cx).detach_and_log_err(cx);
1114 }
1115 })?;
1116 result?;
1117 Ok(())
1118 })
1119 }
1120
1121 pub fn join_project(
1122 &mut self,
1123 id: u64,
1124 language_registry: Arc<LanguageRegistry>,
1125 fs: Arc<dyn Fs>,
1126 cx: &mut ModelContext<Self>,
1127 ) -> Task<Result<Model<Project>>> {
1128 let client = self.client.clone();
1129 let user_store = self.user_store.clone();
1130 let role = self.local_participant.role;
1131 cx.emit(Event::RemoteProjectJoined { project_id: id });
1132 cx.spawn(move |this, mut cx| async move {
1133 let project = Project::remote(
1134 id,
1135 client,
1136 user_store,
1137 language_registry,
1138 fs,
1139 role,
1140 cx.clone(),
1141 )
1142 .await?;
1143
1144 this.update(&mut cx, |this, cx| {
1145 this.joined_projects.retain(|project| {
1146 if let Some(project) = project.upgrade() {
1147 !project.read(cx).is_disconnected()
1148 } else {
1149 false
1150 }
1151 });
1152 this.joined_projects.insert(project.downgrade());
1153 })?;
1154 Ok(project)
1155 })
1156 }
1157
1158 pub(crate) fn share_project(
1159 &mut self,
1160 project: Model<Project>,
1161 cx: &mut ModelContext<Self>,
1162 ) -> Task<Result<u64>> {
1163 if let Some(project_id) = project.read(cx).remote_id() {
1164 return Task::ready(Ok(project_id));
1165 }
1166
1167 let request = self.client.request(proto::ShareProject {
1168 room_id: self.id(),
1169 worktrees: project.read(cx).worktree_metadata_protos(cx),
1170 });
1171 cx.spawn(|this, mut cx| async move {
1172 let response = request.await?;
1173
1174 project.update(&mut cx, |project, cx| {
1175 project.shared(response.project_id, cx)
1176 })??;
1177
1178 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1179 this.update(&mut cx, |this, cx| {
1180 this.shared_projects.insert(project.downgrade());
1181 let active_project = this.local_participant.active_project.as_ref();
1182 if active_project.map_or(false, |location| *location == project) {
1183 this.set_location(Some(&project), cx)
1184 } else {
1185 Task::ready(Ok(()))
1186 }
1187 })?
1188 .await?;
1189
1190 Ok(response.project_id)
1191 })
1192 }
1193
1194 pub(crate) fn unshare_project(
1195 &mut self,
1196 project: Model<Project>,
1197 cx: &mut ModelContext<Self>,
1198 ) -> Result<()> {
1199 let project_id = match project.read(cx).remote_id() {
1200 Some(project_id) => project_id,
1201 None => return Ok(()),
1202 };
1203
1204 self.client.send(proto::UnshareProject { project_id })?;
1205 project.update(cx, |this, cx| this.unshare(cx))?;
1206
1207 if self.local_participant.active_project == Some(project.downgrade()) {
1208 self.set_location(Some(&project), cx).detach_and_log_err(cx);
1209 }
1210 Ok(())
1211 }
1212
1213 pub(crate) fn set_location(
1214 &mut self,
1215 project: Option<&Model<Project>>,
1216 cx: &mut ModelContext<Self>,
1217 ) -> Task<Result<()>> {
1218 if self.status.is_offline() {
1219 return Task::ready(Err(anyhow!("room is offline")));
1220 }
1221
1222 let client = self.client.clone();
1223 let room_id = self.id;
1224 let location = if let Some(project) = project {
1225 self.local_participant.active_project = Some(project.downgrade());
1226 if let Some(project_id) = project.read(cx).remote_id() {
1227 proto::participant_location::Variant::SharedProject(
1228 proto::participant_location::SharedProject { id: project_id },
1229 )
1230 } else {
1231 proto::participant_location::Variant::UnsharedProject(
1232 proto::participant_location::UnsharedProject {},
1233 )
1234 }
1235 } else {
1236 self.local_participant.active_project = None;
1237 proto::participant_location::Variant::External(proto::participant_location::External {})
1238 };
1239
1240 cx.notify();
1241 cx.background_executor().spawn(async move {
1242 client
1243 .request(proto::UpdateParticipantLocation {
1244 room_id,
1245 location: Some(proto::ParticipantLocation {
1246 variant: Some(location),
1247 }),
1248 })
1249 .await?;
1250 Ok(())
1251 })
1252 }
1253
1254 pub fn is_screen_sharing(&self) -> bool {
1255 self.live_kit.as_ref().map_or(false, |live_kit| {
1256 !matches!(live_kit.screen_track, LocalTrack::None)
1257 })
1258 }
1259
1260 pub fn is_muted(&self) -> bool {
1261 self.live_kit
1262 .as_ref()
1263 .map_or(true, |live_kit| match &live_kit.microphone_track {
1264 LocalTrack::None => true,
1265 LocalTrack::Pending { .. } => true,
1266 LocalTrack::Published { track_publication } => track_publication.is_muted(),
1267 })
1268 }
1269
1270 pub fn read_only(&self) -> bool {
1271 !(self.local_participant().role == proto::ChannelRole::Member
1272 || self.local_participant().role == proto::ChannelRole::Admin)
1273 }
1274
1275 pub fn is_speaking(&self) -> bool {
1276 self.live_kit
1277 .as_ref()
1278 .map_or(false, |live_kit| live_kit.speaking)
1279 }
1280
1281 pub fn in_call(&self) -> bool {
1282 self.live_kit.is_some()
1283 }
1284
1285 #[track_caller]
1286 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1287 if self.status.is_offline() {
1288 return Task::ready(Err(anyhow!("room is offline")));
1289 }
1290
1291 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1292 let publish_id = post_inc(&mut live_kit.next_publish_id);
1293 live_kit.microphone_track = LocalTrack::Pending { publish_id };
1294 cx.notify();
1295 publish_id
1296 } else {
1297 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1298 };
1299
1300 cx.spawn(move |this, mut cx| async move {
1301 let publish_track = async {
1302 let track = LocalAudioTrack::create();
1303 this.upgrade()
1304 .ok_or_else(|| anyhow!("room was dropped"))?
1305 .update(&mut cx, |this, _| {
1306 this.live_kit
1307 .as_ref()
1308 .map(|live_kit| live_kit.room.publish_audio_track(track))
1309 })?
1310 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1311 .await
1312 };
1313 let publication = publish_track.await;
1314 this.upgrade()
1315 .ok_or_else(|| anyhow!("room was dropped"))?
1316 .update(&mut cx, |this, cx| {
1317 let live_kit = this
1318 .live_kit
1319 .as_mut()
1320 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1321
1322 let canceled = if let LocalTrack::Pending {
1323 publish_id: cur_publish_id,
1324 } = &live_kit.microphone_track
1325 {
1326 *cur_publish_id != publish_id
1327 } else {
1328 true
1329 };
1330
1331 match publication {
1332 Ok(publication) => {
1333 if canceled {
1334 live_kit.room.unpublish_track(publication);
1335 live_kit.microphone_track = LocalTrack::None;
1336 } else {
1337 live_kit.microphone_track = LocalTrack::Published {
1338 track_publication: publication,
1339 };
1340 cx.notify();
1341 }
1342 Ok(())
1343 }
1344 Err(error) => {
1345 if canceled {
1346 Ok(())
1347 } else {
1348 live_kit.microphone_track = LocalTrack::None;
1349 cx.notify();
1350 Err(error)
1351 }
1352 }
1353 }
1354 })?
1355 })
1356 }
1357
1358 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1359 if self.status.is_offline() {
1360 return Task::ready(Err(anyhow!("room is offline")));
1361 } else if self.is_screen_sharing() {
1362 return Task::ready(Err(anyhow!("screen was already shared")));
1363 }
1364
1365 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1366 let publish_id = post_inc(&mut live_kit.next_publish_id);
1367 live_kit.screen_track = LocalTrack::Pending { publish_id };
1368 cx.notify();
1369 (live_kit.room.display_sources(), publish_id)
1370 } else {
1371 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1372 };
1373
1374 cx.spawn(move |this, mut cx| async move {
1375 let publish_track = async {
1376 let displays = displays.await?;
1377 let display = displays
1378 .first()
1379 .ok_or_else(|| anyhow!("no display found"))?;
1380 let track = LocalVideoTrack::screen_share_for_display(display);
1381 this.upgrade()
1382 .ok_or_else(|| anyhow!("room was dropped"))?
1383 .update(&mut cx, |this, _| {
1384 this.live_kit
1385 .as_ref()
1386 .map(|live_kit| live_kit.room.publish_video_track(track))
1387 })?
1388 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1389 .await
1390 };
1391
1392 let publication = publish_track.await;
1393 this.upgrade()
1394 .ok_or_else(|| anyhow!("room was dropped"))?
1395 .update(&mut cx, |this, cx| {
1396 let live_kit = this
1397 .live_kit
1398 .as_mut()
1399 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1400
1401 let canceled = if let LocalTrack::Pending {
1402 publish_id: cur_publish_id,
1403 } = &live_kit.screen_track
1404 {
1405 *cur_publish_id != publish_id
1406 } else {
1407 true
1408 };
1409
1410 match publication {
1411 Ok(publication) => {
1412 if canceled {
1413 live_kit.room.unpublish_track(publication);
1414 } else {
1415 live_kit.screen_track = LocalTrack::Published {
1416 track_publication: publication,
1417 };
1418 cx.notify();
1419 }
1420
1421 Audio::play_sound(Sound::StartScreenshare, cx);
1422
1423 Ok(())
1424 }
1425 Err(error) => {
1426 if canceled {
1427 Ok(())
1428 } else {
1429 live_kit.screen_track = LocalTrack::None;
1430 cx.notify();
1431 Err(error)
1432 }
1433 }
1434 }
1435 })?
1436 })
1437 }
1438
1439 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) {
1440 let muted = !self.is_muted();
1441 if let Some(task) = self.set_mute(muted, cx) {
1442 task.detach_and_log_err(cx);
1443 }
1444 }
1445
1446 pub fn join_call(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1447 if self.live_kit.is_some() {
1448 return Task::ready(Ok(()));
1449 }
1450
1451 let room = live_kit_client::Room::new();
1452 let mut status = room.status();
1453 // Consume the initial status of the room.
1454 let _ = status.try_recv();
1455 let _maintain_room = cx.spawn(|this, mut cx| async move {
1456 while let Some(status) = status.next().await {
1457 let this = if let Some(this) = this.upgrade() {
1458 this
1459 } else {
1460 break;
1461 };
1462
1463 if status == live_kit_client::ConnectionState::Disconnected {
1464 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
1465 .ok();
1466 break;
1467 }
1468 }
1469 });
1470
1471 let _handle_updates = cx.spawn({
1472 let room = room.clone();
1473 move |this, mut cx| async move {
1474 let mut updates = room.updates();
1475 while let Some(update) = updates.next().await {
1476 let this = if let Some(this) = this.upgrade() {
1477 this
1478 } else {
1479 break;
1480 };
1481
1482 this.update(&mut cx, |this, cx| {
1483 this.live_kit_room_updated(update, cx).log_err()
1484 })
1485 .ok();
1486 }
1487 }
1488 });
1489
1490 self.live_kit = Some(LiveKitRoom {
1491 room: room.clone(),
1492 screen_track: LocalTrack::None,
1493 microphone_track: LocalTrack::None,
1494 next_publish_id: 0,
1495 speaking: false,
1496 _maintain_room,
1497 _handle_updates,
1498 });
1499
1500 cx.spawn({
1501 let client = self.client.clone();
1502 let share_microphone = !self.read_only() && !Self::mute_on_join(cx);
1503 let connection_info = self.live_kit_connection_info.clone();
1504 let channel_id = self.channel_id;
1505
1506 move |this, mut cx| async move {
1507 let connection_info = if let Some(connection_info) = connection_info {
1508 connection_info.clone()
1509 } else if let Some(channel_id) = channel_id {
1510 if let Some(connection_info) = client
1511 .request(proto::JoinChannelCall { channel_id })
1512 .await?
1513 .live_kit_connection_info
1514 {
1515 connection_info
1516 } else {
1517 return Err(anyhow!("failed to get connection info from server"));
1518 }
1519 } else {
1520 return Err(anyhow!(
1521 "tried to connect to livekit without connection info"
1522 ));
1523 };
1524 room.connect(&connection_info.server_url, &connection_info.token)
1525 .await?;
1526
1527 let track_updates = this.update(&mut cx, |this, cx| {
1528 Audio::play_sound(Sound::Joined, cx);
1529 let Some(live_kit) = this.live_kit.as_mut() else {
1530 return vec![];
1531 };
1532
1533 let mut track_updates = Vec::new();
1534 for participant in this.remote_participants.values() {
1535 for publication in live_kit
1536 .room
1537 .remote_audio_track_publications(&participant.user.id.to_string())
1538 {
1539 track_updates.push(publication.set_enabled(true));
1540 }
1541
1542 for track in participant.audio_tracks.values() {
1543 track.start();
1544 }
1545 }
1546 track_updates
1547 })?;
1548
1549 if share_microphone {
1550 this.update(&mut cx, |this, cx| this.share_microphone(cx))?
1551 .await?
1552 };
1553
1554 for result in futures::future::join_all(track_updates).await {
1555 result?;
1556 }
1557 anyhow::Ok(())
1558 }
1559 })
1560 }
1561
1562 pub fn leave_call(&mut self, cx: &mut ModelContext<Self>) {
1563 Audio::play_sound(Sound::Leave, cx);
1564 if let Some(channel_id) = self.channel_id() {
1565 let client = self.client.clone();
1566 cx.background_executor()
1567 .spawn(client.request(proto::LeaveChannelCall { channel_id }))
1568 .detach_and_log_err(cx);
1569 self.live_kit.take();
1570 self.live_kit_connection_info.take();
1571 cx.notify();
1572 } else {
1573 self.leave(cx).detach_and_log_err(cx)
1574 }
1575 }
1576
1577 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1578 if self.status.is_offline() {
1579 return Err(anyhow!("room is offline"));
1580 }
1581
1582 let live_kit = self
1583 .live_kit
1584 .as_mut()
1585 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1586 match mem::take(&mut live_kit.screen_track) {
1587 LocalTrack::None => Err(anyhow!("screen was not shared")),
1588 LocalTrack::Pending { .. } => {
1589 cx.notify();
1590 Ok(())
1591 }
1592 LocalTrack::Published {
1593 track_publication, ..
1594 } => {
1595 live_kit.room.unpublish_track(track_publication);
1596 cx.notify();
1597
1598 Audio::play_sound(Sound::StopScreenshare, cx);
1599 Ok(())
1600 }
1601 }
1602 }
1603
1604 fn set_mute(
1605 &mut self,
1606 should_mute: bool,
1607 cx: &mut ModelContext<Room>,
1608 ) -> Option<Task<Result<()>>> {
1609 let live_kit = self.live_kit.as_mut()?;
1610 cx.notify();
1611
1612 if should_mute {
1613 Audio::play_sound(Sound::Mute, cx);
1614 } else {
1615 Audio::play_sound(Sound::Unmute, cx);
1616 }
1617
1618 match &mut live_kit.microphone_track {
1619 LocalTrack::None => {
1620 if should_mute {
1621 None
1622 } else {
1623 Some(self.share_microphone(cx))
1624 }
1625 }
1626 LocalTrack::Pending { .. } => None,
1627 LocalTrack::Published { track_publication } => Some(
1628 cx.foreground_executor()
1629 .spawn(track_publication.set_mute(should_mute)),
1630 ),
1631 }
1632 }
1633
1634 #[cfg(any(test, feature = "test-support"))]
1635 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1636 self.live_kit
1637 .as_ref()
1638 .unwrap()
1639 .room
1640 .set_display_sources(sources);
1641 }
1642}
1643
1644struct LiveKitRoom {
1645 room: Arc<live_kit_client::Room>,
1646 screen_track: LocalTrack,
1647 microphone_track: LocalTrack,
1648 speaking: bool,
1649 next_publish_id: usize,
1650 _maintain_room: Task<()>,
1651 _handle_updates: Task<()>,
1652}
1653
1654impl LiveKitRoom {
1655 fn stop_publishing(&mut self, cx: &mut ModelContext<Room>) {
1656 if let LocalTrack::Published {
1657 track_publication, ..
1658 } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1659 {
1660 self.room.unpublish_track(track_publication);
1661 cx.notify();
1662 }
1663
1664 if let LocalTrack::Published {
1665 track_publication, ..
1666 } = mem::replace(&mut self.screen_track, LocalTrack::None)
1667 {
1668 self.room.unpublish_track(track_publication);
1669 cx.notify();
1670 }
1671 }
1672}
1673
1674enum LocalTrack {
1675 None,
1676 Pending {
1677 publish_id: usize,
1678 },
1679 Published {
1680 track_publication: LocalTrackPublication,
1681 },
1682}
1683
1684impl Default for LocalTrack {
1685 fn default() -> Self {
1686 Self::None
1687 }
1688}
1689
1690#[derive(Copy, Clone, PartialEq, Eq)]
1691pub enum RoomStatus {
1692 Online,
1693 Rejoining,
1694 Offline,
1695}
1696
1697impl RoomStatus {
1698 pub fn is_offline(&self) -> bool {
1699 matches!(self, RoomStatus::Offline)
1700 }
1701
1702 pub fn is_online(&self) -> bool {
1703 matches!(self, RoomStatus::Online)
1704 }
1705}