1#![cfg_attr(target_os = "windows", allow(unused))]
2
3use crate::{
4 call_settings::CallSettings,
5 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
6};
7use anyhow::{anyhow, Result};
8use audio::{Audio, Sound};
9use client::{
10 proto::{self, PeerId},
11 ChannelId, Client, ParticipantIndex, TypedEnvelope, User, UserStore,
12};
13use collections::{BTreeMap, HashMap, HashSet};
14use fs::Fs;
15use futures::{FutureExt, StreamExt};
16use gpui::{
17 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
18};
19use language::LanguageRegistry;
20#[cfg(not(target_os = "windows"))]
21use livekit::{
22 capture_local_audio_track, capture_local_video_track,
23 id::ParticipantIdentity,
24 options::{TrackPublishOptions, VideoCodec},
25 play_remote_audio_track,
26 publication::LocalTrackPublication,
27 track::{TrackKind, TrackSource},
28 RoomEvent, RoomOptions,
29};
30#[cfg(target_os = "windows")]
31use livekit::{publication::LocalTrackPublication, RoomEvent};
32use livekit_client as livekit;
33use postage::{sink::Sink, stream::Stream, watch};
34use project::Project;
35use settings::Settings as _;
36use std::{any::Any, future::Future, mem, sync::Arc, time::Duration};
37use util::{post_inc, ResultExt, TryFutureExt};
38
39pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
40
41#[derive(Clone, Debug, PartialEq, Eq)]
42pub enum Event {
43 RoomJoined {
44 channel_id: Option<ChannelId>,
45 },
46 ParticipantLocationChanged {
47 participant_id: proto::PeerId,
48 },
49 RemoteVideoTracksChanged {
50 participant_id: proto::PeerId,
51 },
52 RemoteAudioTracksChanged {
53 participant_id: proto::PeerId,
54 },
55 RemoteProjectShared {
56 owner: Arc<User>,
57 project_id: u64,
58 worktree_root_names: Vec<String>,
59 },
60 RemoteProjectUnshared {
61 project_id: u64,
62 },
63 RemoteProjectJoined {
64 project_id: u64,
65 },
66 RemoteProjectInvitationDiscarded {
67 project_id: u64,
68 },
69 RoomLeft {
70 channel_id: Option<ChannelId>,
71 },
72}
73
74pub struct Room {
75 id: u64,
76 channel_id: Option<ChannelId>,
77 live_kit: Option<LiveKitRoom>,
78 status: RoomStatus,
79 shared_projects: HashSet<WeakModel<Project>>,
80 joined_projects: HashSet<WeakModel<Project>>,
81 local_participant: LocalParticipant,
82 remote_participants: BTreeMap<u64, RemoteParticipant>,
83 pending_participants: Vec<Arc<User>>,
84 participant_user_ids: HashSet<u64>,
85 pending_call_count: usize,
86 leave_when_empty: bool,
87 client: Arc<Client>,
88 user_store: Model<UserStore>,
89 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
90 client_subscriptions: Vec<client::Subscription>,
91 _subscriptions: Vec<gpui::Subscription>,
92 room_update_completed_tx: watch::Sender<Option<()>>,
93 room_update_completed_rx: watch::Receiver<Option<()>>,
94 pending_room_update: Option<Task<()>>,
95 maintain_connection: Option<Task<Option<()>>>,
96}
97
98impl EventEmitter<Event> for Room {}
99
100impl Room {
101 pub fn channel_id(&self) -> Option<ChannelId> {
102 self.channel_id
103 }
104
105 pub fn is_sharing_project(&self) -> bool {
106 !self.shared_projects.is_empty()
107 }
108
109 #[cfg(all(any(test, feature = "test-support"), not(target_os = "windows")))]
110 pub fn is_connected(&self) -> bool {
111 if let Some(live_kit) = self.live_kit.as_ref() {
112 live_kit.room.connection_state() == livekit::ConnectionState::Connected
113 } else {
114 false
115 }
116 }
117
118 fn new(
119 id: u64,
120 channel_id: Option<ChannelId>,
121 livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
122 client: Arc<Client>,
123 user_store: Model<UserStore>,
124 cx: &mut ModelContext<Self>,
125 ) -> Self {
126 spawn_room_connection(livekit_connection_info, cx);
127
128 let maintain_connection = cx.spawn({
129 let client = client.clone();
130 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
131 });
132
133 Audio::play_sound(Sound::Joined, cx);
134
135 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
136
137 Self {
138 id,
139 channel_id,
140 live_kit: None,
141 status: RoomStatus::Online,
142 shared_projects: Default::default(),
143 joined_projects: Default::default(),
144 participant_user_ids: Default::default(),
145 local_participant: Default::default(),
146 remote_participants: Default::default(),
147 pending_participants: Default::default(),
148 pending_call_count: 0,
149 client_subscriptions: vec![
150 client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
151 ],
152 _subscriptions: vec![
153 cx.on_release(Self::released),
154 cx.on_app_quit(Self::app_will_quit),
155 ],
156 leave_when_empty: false,
157 pending_room_update: None,
158 client,
159 user_store,
160 follows_by_leader_id_project_id: Default::default(),
161 maintain_connection: Some(maintain_connection),
162 room_update_completed_tx,
163 room_update_completed_rx,
164 }
165 }
166
167 pub(crate) fn create(
168 called_user_id: u64,
169 initial_project: Option<Model<Project>>,
170 client: Arc<Client>,
171 user_store: Model<UserStore>,
172 cx: &mut AppContext,
173 ) -> Task<Result<Model<Self>>> {
174 cx.spawn(move |mut cx| async move {
175 let response = client.request(proto::CreateRoom {}).await?;
176 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
177 let room = cx.new_model(|cx| {
178 let mut room = Self::new(
179 room_proto.id,
180 None,
181 response.live_kit_connection_info,
182 client,
183 user_store,
184 cx,
185 );
186 if let Some(participant) = room_proto.participants.first() {
187 room.local_participant.role = participant.role()
188 }
189 room
190 })?;
191
192 let initial_project_id = if let Some(initial_project) = initial_project {
193 let initial_project_id = room
194 .update(&mut cx, |room, cx| {
195 room.share_project(initial_project.clone(), cx)
196 })?
197 .await?;
198 Some(initial_project_id)
199 } else {
200 None
201 };
202
203 let did_join = room
204 .update(&mut cx, |room, cx| {
205 room.leave_when_empty = true;
206 room.call(called_user_id, initial_project_id, cx)
207 })?
208 .await;
209 match did_join {
210 Ok(()) => Ok(room),
211 Err(error) => Err(error.context("room creation failed")),
212 }
213 })
214 }
215
216 pub(crate) async fn join_channel(
217 channel_id: ChannelId,
218 client: Arc<Client>,
219 user_store: Model<UserStore>,
220 cx: AsyncAppContext,
221 ) -> Result<Model<Self>> {
222 Self::from_join_response(
223 client
224 .request(proto::JoinChannel {
225 channel_id: channel_id.0,
226 })
227 .await?,
228 client,
229 user_store,
230 cx,
231 )
232 }
233
234 pub(crate) async fn join(
235 room_id: u64,
236 client: Arc<Client>,
237 user_store: Model<UserStore>,
238 cx: AsyncAppContext,
239 ) -> Result<Model<Self>> {
240 Self::from_join_response(
241 client.request(proto::JoinRoom { id: room_id }).await?,
242 client,
243 user_store,
244 cx,
245 )
246 }
247
248 fn released(&mut self, cx: &mut AppContext) {
249 if self.status.is_online() {
250 self.leave_internal(cx).detach_and_log_err(cx);
251 }
252 }
253
254 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
255 let task = if self.status.is_online() {
256 let leave = self.leave_internal(cx);
257 Some(cx.background_executor().spawn(async move {
258 leave.await.log_err();
259 }))
260 } else {
261 None
262 };
263
264 async move {
265 if let Some(task) = task {
266 task.await;
267 }
268 }
269 }
270
271 pub fn mute_on_join(cx: &AppContext) -> bool {
272 CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
273 }
274
275 fn from_join_response(
276 response: proto::JoinRoomResponse,
277 client: Arc<Client>,
278 user_store: Model<UserStore>,
279 mut cx: AsyncAppContext,
280 ) -> Result<Model<Self>> {
281 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
282 let room = cx.new_model(|cx| {
283 Self::new(
284 room_proto.id,
285 response.channel_id.map(ChannelId),
286 response.live_kit_connection_info,
287 client,
288 user_store,
289 cx,
290 )
291 })?;
292 room.update(&mut cx, |room, cx| {
293 room.leave_when_empty = room.channel_id.is_none();
294 room.apply_room_update(room_proto, cx)?;
295 anyhow::Ok(())
296 })??;
297 Ok(room)
298 }
299
300 fn should_leave(&self) -> bool {
301 self.leave_when_empty
302 && self.pending_room_update.is_none()
303 && self.pending_participants.is_empty()
304 && self.remote_participants.is_empty()
305 && self.pending_call_count == 0
306 }
307
308 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
309 cx.notify();
310 self.leave_internal(cx)
311 }
312
313 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
314 if self.status.is_offline() {
315 return Task::ready(Err(anyhow!("room is offline")));
316 }
317
318 log::info!("leaving room");
319 Audio::play_sound(Sound::Leave, cx);
320
321 self.clear_state(cx);
322
323 let leave_room = self.client.request(proto::LeaveRoom {});
324 cx.background_executor().spawn(async move {
325 leave_room.await?;
326 anyhow::Ok(())
327 })
328 }
329
330 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
331 for project in self.shared_projects.drain() {
332 if let Some(project) = project.upgrade() {
333 project.update(cx, |project, cx| {
334 project.unshare(cx).log_err();
335 });
336 }
337 }
338 for project in self.joined_projects.drain() {
339 if let Some(project) = project.upgrade() {
340 project.update(cx, |project, cx| {
341 project.disconnected_from_host(cx);
342 project.close(cx);
343 });
344 }
345 }
346
347 self.status = RoomStatus::Offline;
348 self.remote_participants.clear();
349 self.pending_participants.clear();
350 self.participant_user_ids.clear();
351 self.client_subscriptions.clear();
352 self.live_kit.take();
353 self.pending_room_update.take();
354 self.maintain_connection.take();
355 }
356
357 async fn maintain_connection(
358 this: WeakModel<Self>,
359 client: Arc<Client>,
360 mut cx: AsyncAppContext,
361 ) -> Result<()> {
362 let mut client_status = client.status();
363 loop {
364 let _ = client_status.try_recv();
365 let is_connected = client_status.borrow().is_connected();
366 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
367 if !is_connected || client_status.next().await.is_some() {
368 log::info!("detected client disconnection");
369
370 this.upgrade()
371 .ok_or_else(|| anyhow!("room was dropped"))?
372 .update(&mut cx, |this, cx| {
373 this.status = RoomStatus::Rejoining;
374 cx.notify();
375 })?;
376
377 // Wait for client to re-establish a connection to the server.
378 {
379 let mut reconnection_timeout =
380 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
381 let client_reconnection = async {
382 let mut remaining_attempts = 3;
383 while remaining_attempts > 0 {
384 if client_status.borrow().is_connected() {
385 log::info!("client reconnected, attempting to rejoin room");
386
387 let Some(this) = this.upgrade() else { break };
388 match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
389 Ok(task) => {
390 if task.await.log_err().is_some() {
391 return true;
392 } else {
393 remaining_attempts -= 1;
394 }
395 }
396 Err(_app_dropped) => return false,
397 }
398 } else if client_status.borrow().is_signed_out() {
399 return false;
400 }
401
402 log::info!(
403 "waiting for client status change, remaining attempts {}",
404 remaining_attempts
405 );
406 client_status.next().await;
407 }
408 false
409 }
410 .fuse();
411 futures::pin_mut!(client_reconnection);
412
413 futures::select_biased! {
414 reconnected = client_reconnection => {
415 if reconnected {
416 log::info!("successfully reconnected to room");
417 // If we successfully joined the room, go back around the loop
418 // waiting for future connection status changes.
419 continue;
420 }
421 }
422 _ = reconnection_timeout => {
423 log::info!("room reconnection timeout expired");
424 }
425 }
426 }
427
428 break;
429 }
430 }
431
432 // The client failed to re-establish a connection to the server
433 // or an error occurred while trying to re-join the room. Either way
434 // we leave the room and return an error.
435 if let Some(this) = this.upgrade() {
436 log::info!("reconnection failed, leaving room");
437 this.update(&mut cx, |this, cx| this.leave(cx))?.await?;
438 }
439 Err(anyhow!(
440 "can't reconnect to room: client failed to re-establish connection"
441 ))
442 }
443
444 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
445 let mut projects = HashMap::default();
446 let mut reshared_projects = Vec::new();
447 let mut rejoined_projects = Vec::new();
448 self.shared_projects.retain(|project| {
449 if let Some(handle) = project.upgrade() {
450 let project = handle.read(cx);
451 if let Some(project_id) = project.remote_id() {
452 projects.insert(project_id, handle.clone());
453 reshared_projects.push(proto::UpdateProject {
454 project_id,
455 worktrees: project.worktree_metadata_protos(cx),
456 });
457 return true;
458 }
459 }
460 false
461 });
462 self.joined_projects.retain(|project| {
463 if let Some(handle) = project.upgrade() {
464 let project = handle.read(cx);
465 if let Some(project_id) = project.remote_id() {
466 projects.insert(project_id, handle.clone());
467 rejoined_projects.push(proto::RejoinProject {
468 id: project_id,
469 worktrees: project
470 .worktrees(cx)
471 .map(|worktree| {
472 let worktree = worktree.read(cx);
473 proto::RejoinWorktree {
474 id: worktree.id().to_proto(),
475 scan_id: worktree.completed_scan_id() as u64,
476 }
477 })
478 .collect(),
479 });
480 }
481 return true;
482 }
483 false
484 });
485
486 let response = self.client.request_envelope(proto::RejoinRoom {
487 id: self.id,
488 reshared_projects,
489 rejoined_projects,
490 });
491
492 cx.spawn(|this, mut cx| async move {
493 let response = response.await?;
494 let message_id = response.message_id;
495 let response = response.payload;
496 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
497 this.update(&mut cx, |this, cx| {
498 this.status = RoomStatus::Online;
499 this.apply_room_update(room_proto, cx)?;
500
501 for reshared_project in response.reshared_projects {
502 if let Some(project) = projects.get(&reshared_project.id) {
503 project.update(cx, |project, cx| {
504 project.reshared(reshared_project, cx).log_err();
505 });
506 }
507 }
508
509 for rejoined_project in response.rejoined_projects {
510 if let Some(project) = projects.get(&rejoined_project.id) {
511 project.update(cx, |project, cx| {
512 project.rejoined(rejoined_project, message_id, cx).log_err();
513 });
514 }
515 }
516
517 anyhow::Ok(())
518 })?
519 })
520 }
521
522 pub fn id(&self) -> u64 {
523 self.id
524 }
525
526 pub fn status(&self) -> RoomStatus {
527 self.status
528 }
529
530 pub fn local_participant(&self) -> &LocalParticipant {
531 &self.local_participant
532 }
533
534 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
535 &self.remote_participants
536 }
537
538 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
539 self.remote_participants
540 .values()
541 .find(|p| p.peer_id == peer_id)
542 }
543
544 pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
545 self.remote_participants
546 .get(&user_id)
547 .map(|participant| participant.role)
548 }
549
550 pub fn contains_guests(&self) -> bool {
551 self.local_participant.role == proto::ChannelRole::Guest
552 || self
553 .remote_participants
554 .values()
555 .any(|p| p.role == proto::ChannelRole::Guest)
556 }
557
558 pub fn local_participant_is_admin(&self) -> bool {
559 self.local_participant.role == proto::ChannelRole::Admin
560 }
561
562 pub fn local_participant_is_guest(&self) -> bool {
563 self.local_participant.role == proto::ChannelRole::Guest
564 }
565
566 pub fn set_participant_role(
567 &mut self,
568 user_id: u64,
569 role: proto::ChannelRole,
570 cx: &ModelContext<Self>,
571 ) -> Task<Result<()>> {
572 let client = self.client.clone();
573 let room_id = self.id;
574 let role = role.into();
575 cx.spawn(|_, _| async move {
576 client
577 .request(proto::SetRoomParticipantRole {
578 room_id,
579 user_id,
580 role,
581 })
582 .await
583 .map(|_| ())
584 })
585 }
586
587 pub fn pending_participants(&self) -> &[Arc<User>] {
588 &self.pending_participants
589 }
590
591 pub fn contains_participant(&self, user_id: u64) -> bool {
592 self.participant_user_ids.contains(&user_id)
593 }
594
595 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
596 self.follows_by_leader_id_project_id
597 .get(&(leader_id, project_id))
598 .map_or(&[], |v| v.as_slice())
599 }
600
601 /// Returns the most 'active' projects, defined as most people in the project
602 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
603 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
604 for participant in self.remote_participants.values() {
605 match participant.location {
606 ParticipantLocation::SharedProject { project_id } => {
607 project_hosts_and_guest_counts
608 .entry(project_id)
609 .or_default()
610 .1 += 1;
611 }
612 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
613 }
614 for project in &participant.projects {
615 project_hosts_and_guest_counts
616 .entry(project.id)
617 .or_default()
618 .0 = Some(participant.user.id);
619 }
620 }
621
622 if let Some(user) = self.user_store.read(cx).current_user() {
623 for project in &self.local_participant.projects {
624 project_hosts_and_guest_counts
625 .entry(project.id)
626 .or_default()
627 .0 = Some(user.id);
628 }
629 }
630
631 project_hosts_and_guest_counts
632 .into_iter()
633 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
634 .max_by_key(|(_, _, guest_count)| *guest_count)
635 .map(|(id, host, _)| (id, host))
636 }
637
638 async fn handle_room_updated(
639 this: Model<Self>,
640 envelope: TypedEnvelope<proto::RoomUpdated>,
641 mut cx: AsyncAppContext,
642 ) -> Result<()> {
643 let room = envelope
644 .payload
645 .room
646 .ok_or_else(|| anyhow!("invalid room"))?;
647 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
648 }
649
650 fn apply_room_update(&mut self, room: proto::Room, cx: &mut ModelContext<Self>) -> Result<()> {
651 log::trace!(
652 "client {:?}. room update: {:?}",
653 self.client.user_id(),
654 &room
655 );
656
657 self.pending_room_update = Some(self.start_room_connection(room, cx));
658
659 cx.notify();
660 Ok(())
661 }
662
663 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
664 let mut done_rx = self.room_update_completed_rx.clone();
665 async move {
666 while let Some(result) = done_rx.next().await {
667 if result.is_some() {
668 break;
669 }
670 }
671 }
672 }
673
674 #[cfg(target_os = "windows")]
675 fn start_room_connection(
676 &self,
677 mut room: proto::Room,
678 cx: &mut ModelContext<Self>,
679 ) -> Task<()> {
680 Task::ready(())
681 }
682
683 #[cfg(not(target_os = "windows"))]
684 fn start_room_connection(
685 &self,
686 mut room: proto::Room,
687 cx: &mut ModelContext<Self>,
688 ) -> Task<()> {
689 // Filter ourselves out from the room's participants.
690 let local_participant_ix = room
691 .participants
692 .iter()
693 .position(|participant| Some(participant.user_id) == self.client.user_id());
694 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
695
696 let pending_participant_user_ids = room
697 .pending_participants
698 .iter()
699 .map(|p| p.user_id)
700 .collect::<Vec<_>>();
701
702 let remote_participant_user_ids = room
703 .participants
704 .iter()
705 .map(|p| p.user_id)
706 .collect::<Vec<_>>();
707
708 let (remote_participants, pending_participants) =
709 self.user_store.update(cx, move |user_store, cx| {
710 (
711 user_store.get_users(remote_participant_user_ids, cx),
712 user_store.get_users(pending_participant_user_ids, cx),
713 )
714 });
715 cx.spawn(|this, mut cx| async move {
716 let (remote_participants, pending_participants) =
717 futures::join!(remote_participants, pending_participants);
718
719 this.update(&mut cx, |this, cx| {
720 this.participant_user_ids.clear();
721
722 if let Some(participant) = local_participant {
723 let role = participant.role();
724 this.local_participant.projects = participant.projects;
725 if this.local_participant.role != role {
726 this.local_participant.role = role;
727
728 if role == proto::ChannelRole::Guest {
729 for project in mem::take(&mut this.shared_projects) {
730 if let Some(project) = project.upgrade() {
731 this.unshare_project(project, cx).log_err();
732 }
733 }
734 this.local_participant.projects.clear();
735 if let Some(livekit_room) = &mut this.live_kit {
736 livekit_room.stop_publishing(cx);
737 }
738 }
739
740 this.joined_projects.retain(|project| {
741 if let Some(project) = project.upgrade() {
742 project.update(cx, |project, cx| project.set_role(role, cx));
743 true
744 } else {
745 false
746 }
747 });
748 }
749 } else {
750 this.local_participant.projects.clear();
751 }
752
753 let livekit_participants = this
754 .live_kit
755 .as_ref()
756 .map(|live_kit| live_kit.room.remote_participants());
757
758 if let Some(participants) = remote_participants.log_err() {
759 for (participant, user) in room.participants.into_iter().zip(participants) {
760 let Some(peer_id) = participant.peer_id else {
761 continue;
762 };
763 let participant_index = ParticipantIndex(participant.participant_index);
764 this.participant_user_ids.insert(participant.user_id);
765
766 let old_projects = this
767 .remote_participants
768 .get(&participant.user_id)
769 .into_iter()
770 .flat_map(|existing| &existing.projects)
771 .map(|project| project.id)
772 .collect::<HashSet<_>>();
773 let new_projects = participant
774 .projects
775 .iter()
776 .map(|project| project.id)
777 .collect::<HashSet<_>>();
778
779 for project in &participant.projects {
780 if !old_projects.contains(&project.id) {
781 cx.emit(Event::RemoteProjectShared {
782 owner: user.clone(),
783 project_id: project.id,
784 worktree_root_names: project.worktree_root_names.clone(),
785 });
786 }
787 }
788
789 for unshared_project_id in old_projects.difference(&new_projects) {
790 this.joined_projects.retain(|project| {
791 if let Some(project) = project.upgrade() {
792 project.update(cx, |project, cx| {
793 if project.remote_id() == Some(*unshared_project_id) {
794 project.disconnected_from_host(cx);
795 false
796 } else {
797 true
798 }
799 })
800 } else {
801 false
802 }
803 });
804 cx.emit(Event::RemoteProjectUnshared {
805 project_id: *unshared_project_id,
806 });
807 }
808
809 let role = participant.role();
810 let location = ParticipantLocation::from_proto(participant.location)
811 .unwrap_or(ParticipantLocation::External);
812 if let Some(remote_participant) =
813 this.remote_participants.get_mut(&participant.user_id)
814 {
815 remote_participant.peer_id = peer_id;
816 remote_participant.projects = participant.projects;
817 remote_participant.participant_index = participant_index;
818 if location != remote_participant.location
819 || role != remote_participant.role
820 {
821 remote_participant.location = location;
822 remote_participant.role = role;
823 cx.emit(Event::ParticipantLocationChanged {
824 participant_id: peer_id,
825 });
826 }
827 } else {
828 this.remote_participants.insert(
829 participant.user_id,
830 RemoteParticipant {
831 user: user.clone(),
832 participant_index,
833 peer_id,
834 projects: participant.projects,
835 location,
836 role,
837 muted: true,
838 speaking: false,
839 video_tracks: Default::default(),
840 #[cfg(not(target_os = "windows"))]
841 audio_tracks: Default::default(),
842 },
843 );
844
845 Audio::play_sound(Sound::Joined, cx);
846 if let Some(livekit_participants) = &livekit_participants {
847 if let Some(livekit_participant) = livekit_participants
848 .get(&ParticipantIdentity(user.id.to_string()))
849 {
850 for publication in
851 livekit_participant.track_publications().into_values()
852 {
853 if let Some(track) = publication.track() {
854 this.livekit_room_updated(
855 RoomEvent::TrackSubscribed {
856 track,
857 publication,
858 participant: livekit_participant.clone(),
859 },
860 cx,
861 )
862 .warn_on_err();
863 }
864 }
865 }
866 }
867 }
868 }
869
870 this.remote_participants.retain(|user_id, participant| {
871 if this.participant_user_ids.contains(user_id) {
872 true
873 } else {
874 for project in &participant.projects {
875 cx.emit(Event::RemoteProjectUnshared {
876 project_id: project.id,
877 });
878 }
879 false
880 }
881 });
882 }
883
884 if let Some(pending_participants) = pending_participants.log_err() {
885 this.pending_participants = pending_participants;
886 for participant in &this.pending_participants {
887 this.participant_user_ids.insert(participant.id);
888 }
889 }
890
891 this.follows_by_leader_id_project_id.clear();
892 for follower in room.followers {
893 let project_id = follower.project_id;
894 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
895 (Some(leader), Some(follower)) => (leader, follower),
896
897 _ => {
898 log::error!("Follower message {follower:?} missing some state");
899 continue;
900 }
901 };
902
903 let list = this
904 .follows_by_leader_id_project_id
905 .entry((leader, project_id))
906 .or_default();
907 if !list.contains(&follower) {
908 list.push(follower);
909 }
910 }
911
912 this.pending_room_update.take();
913 if this.should_leave() {
914 log::info!("room is empty, leaving");
915 this.leave(cx).detach();
916 }
917
918 this.user_store.update(cx, |user_store, cx| {
919 let participant_indices_by_user_id = this
920 .remote_participants
921 .iter()
922 .map(|(user_id, participant)| (*user_id, participant.participant_index))
923 .collect();
924 user_store.set_participant_indices(participant_indices_by_user_id, cx);
925 });
926
927 this.check_invariants();
928 this.room_update_completed_tx.try_send(Some(())).ok();
929 cx.notify();
930 })
931 .ok();
932 })
933 }
934
935 fn livekit_room_updated(
936 &mut self,
937 event: RoomEvent,
938 cx: &mut ModelContext<Self>,
939 ) -> Result<()> {
940 log::trace!(
941 "client {:?}. livekit event: {:?}",
942 self.client.user_id(),
943 &event
944 );
945
946 match event {
947 #[cfg(not(target_os = "windows"))]
948 RoomEvent::TrackSubscribed {
949 track,
950 participant,
951 publication,
952 } => {
953 let user_id = participant.identity().0.parse()?;
954 let track_id = track.sid();
955 let participant = self.remote_participants.get_mut(&user_id).ok_or_else(|| {
956 anyhow!(
957 "{:?} subscribed to track by unknown participant {user_id}",
958 self.client.user_id()
959 )
960 })?;
961 if self.live_kit.as_ref().map_or(true, |kit| kit.deafened) {
962 track.rtc_track().set_enabled(false);
963 }
964 match track {
965 livekit::track::RemoteTrack::Audio(track) => {
966 cx.emit(Event::RemoteAudioTracksChanged {
967 participant_id: participant.peer_id,
968 });
969 let stream = play_remote_audio_track(&track, cx.background_executor())?;
970 participant.audio_tracks.insert(track_id, (track, stream));
971 participant.muted = publication.is_muted();
972 }
973 livekit::track::RemoteTrack::Video(track) => {
974 cx.emit(Event::RemoteVideoTracksChanged {
975 participant_id: participant.peer_id,
976 });
977 participant.video_tracks.insert(track_id, track);
978 }
979 }
980 }
981
982 #[cfg(not(target_os = "windows"))]
983 RoomEvent::TrackUnsubscribed {
984 track, participant, ..
985 } => {
986 let user_id = participant.identity().0.parse()?;
987 let participant = self.remote_participants.get_mut(&user_id).ok_or_else(|| {
988 anyhow!(
989 "{:?}, unsubscribed from track by unknown participant {user_id}",
990 self.client.user_id()
991 )
992 })?;
993 match track {
994 livekit::track::RemoteTrack::Audio(track) => {
995 participant.audio_tracks.remove(&track.sid());
996 participant.muted = true;
997 cx.emit(Event::RemoteAudioTracksChanged {
998 participant_id: participant.peer_id,
999 });
1000 }
1001 livekit::track::RemoteTrack::Video(track) => {
1002 participant.video_tracks.remove(&track.sid());
1003 cx.emit(Event::RemoteVideoTracksChanged {
1004 participant_id: participant.peer_id,
1005 });
1006 }
1007 }
1008 }
1009
1010 #[cfg(not(target_os = "windows"))]
1011 RoomEvent::ActiveSpeakersChanged { speakers } => {
1012 let mut speaker_ids = speakers
1013 .into_iter()
1014 .filter_map(|speaker| speaker.identity().0.parse().ok())
1015 .collect::<Vec<u64>>();
1016 speaker_ids.sort_unstable();
1017 for (sid, participant) in &mut self.remote_participants {
1018 participant.speaking = speaker_ids.binary_search(sid).is_ok();
1019 }
1020 if let Some(id) = self.client.user_id() {
1021 if let Some(room) = &mut self.live_kit {
1022 room.speaking = speaker_ids.binary_search(&id).is_ok();
1023 }
1024 }
1025 }
1026
1027 #[cfg(not(target_os = "windows"))]
1028 RoomEvent::TrackMuted {
1029 participant,
1030 publication,
1031 }
1032 | RoomEvent::TrackUnmuted {
1033 participant,
1034 publication,
1035 } => {
1036 let mut found = false;
1037 let user_id = participant.identity().0.parse()?;
1038 let track_id = publication.sid();
1039 if let Some(participant) = self.remote_participants.get_mut(&user_id) {
1040 for (track, _) in participant.audio_tracks.values() {
1041 if track.sid() == track_id {
1042 found = true;
1043 break;
1044 }
1045 }
1046 if found {
1047 participant.muted = publication.is_muted();
1048 }
1049 }
1050 }
1051
1052 #[cfg(not(target_os = "windows"))]
1053 RoomEvent::LocalTrackUnpublished { publication, .. } => {
1054 log::info!("unpublished track {}", publication.sid());
1055 if let Some(room) = &mut self.live_kit {
1056 if let LocalTrack::Published {
1057 track_publication, ..
1058 } = &room.microphone_track
1059 {
1060 if track_publication.sid() == publication.sid() {
1061 room.microphone_track = LocalTrack::None;
1062 }
1063 }
1064 if let LocalTrack::Published {
1065 track_publication, ..
1066 } = &room.screen_track
1067 {
1068 if track_publication.sid() == publication.sid() {
1069 room.screen_track = LocalTrack::None;
1070 }
1071 }
1072 }
1073 }
1074
1075 #[cfg(not(target_os = "windows"))]
1076 RoomEvent::LocalTrackPublished { publication, .. } => {
1077 log::info!("published track {:?}", publication.sid());
1078 }
1079
1080 #[cfg(not(target_os = "windows"))]
1081 RoomEvent::Disconnected { reason } => {
1082 log::info!("disconnected from room: {reason:?}");
1083 self.leave(cx).detach_and_log_err(cx);
1084 }
1085 _ => {}
1086 }
1087
1088 cx.notify();
1089 Ok(())
1090 }
1091
1092 fn check_invariants(&self) {
1093 #[cfg(any(test, feature = "test-support"))]
1094 {
1095 for participant in self.remote_participants.values() {
1096 assert!(self.participant_user_ids.contains(&participant.user.id));
1097 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1098 }
1099
1100 for participant in &self.pending_participants {
1101 assert!(self.participant_user_ids.contains(&participant.id));
1102 assert_ne!(participant.id, self.client.user_id().unwrap());
1103 }
1104
1105 assert_eq!(
1106 self.participant_user_ids.len(),
1107 self.remote_participants.len() + self.pending_participants.len()
1108 );
1109 }
1110 }
1111
1112 pub(crate) fn call(
1113 &mut self,
1114 called_user_id: u64,
1115 initial_project_id: Option<u64>,
1116 cx: &mut ModelContext<Self>,
1117 ) -> Task<Result<()>> {
1118 if self.status.is_offline() {
1119 return Task::ready(Err(anyhow!("room is offline")));
1120 }
1121
1122 cx.notify();
1123 let client = self.client.clone();
1124 let room_id = self.id;
1125 self.pending_call_count += 1;
1126 cx.spawn(move |this, mut cx| async move {
1127 let result = client
1128 .request(proto::Call {
1129 room_id,
1130 called_user_id,
1131 initial_project_id,
1132 })
1133 .await;
1134 this.update(&mut cx, |this, cx| {
1135 this.pending_call_count -= 1;
1136 if this.should_leave() {
1137 this.leave(cx).detach_and_log_err(cx);
1138 }
1139 })?;
1140 result?;
1141 Ok(())
1142 })
1143 }
1144
1145 pub fn join_project(
1146 &mut self,
1147 id: u64,
1148 language_registry: Arc<LanguageRegistry>,
1149 fs: Arc<dyn Fs>,
1150 cx: &mut ModelContext<Self>,
1151 ) -> Task<Result<Model<Project>>> {
1152 let client = self.client.clone();
1153 let user_store = self.user_store.clone();
1154 cx.emit(Event::RemoteProjectJoined { project_id: id });
1155 cx.spawn(move |this, mut cx| async move {
1156 let project =
1157 Project::in_room(id, client, user_store, language_registry, fs, cx.clone()).await?;
1158
1159 this.update(&mut cx, |this, cx| {
1160 this.joined_projects.retain(|project| {
1161 if let Some(project) = project.upgrade() {
1162 !project.read(cx).is_disconnected(cx)
1163 } else {
1164 false
1165 }
1166 });
1167 this.joined_projects.insert(project.downgrade());
1168 })?;
1169 Ok(project)
1170 })
1171 }
1172
1173 pub fn share_project(
1174 &mut self,
1175 project: Model<Project>,
1176 cx: &mut ModelContext<Self>,
1177 ) -> Task<Result<u64>> {
1178 if let Some(project_id) = project.read(cx).remote_id() {
1179 return Task::ready(Ok(project_id));
1180 }
1181
1182 let request = self.client.request(proto::ShareProject {
1183 room_id: self.id(),
1184 worktrees: project.read(cx).worktree_metadata_protos(cx),
1185 is_ssh_project: project.read(cx).is_via_ssh(),
1186 });
1187
1188 cx.spawn(|this, mut cx| async move {
1189 let response = request.await?;
1190
1191 project.update(&mut cx, |project, cx| {
1192 project.shared(response.project_id, cx)
1193 })??;
1194
1195 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1196 this.update(&mut cx, |this, cx| {
1197 this.shared_projects.insert(project.downgrade());
1198 let active_project = this.local_participant.active_project.as_ref();
1199 if active_project.map_or(false, |location| *location == project) {
1200 this.set_location(Some(&project), cx)
1201 } else {
1202 Task::ready(Ok(()))
1203 }
1204 })?
1205 .await?;
1206
1207 Ok(response.project_id)
1208 })
1209 }
1210
1211 pub(crate) fn unshare_project(
1212 &mut self,
1213 project: Model<Project>,
1214 cx: &mut ModelContext<Self>,
1215 ) -> Result<()> {
1216 let project_id = match project.read(cx).remote_id() {
1217 Some(project_id) => project_id,
1218 None => return Ok(()),
1219 };
1220
1221 self.client.send(proto::UnshareProject { project_id })?;
1222 project.update(cx, |this, cx| this.unshare(cx))?;
1223
1224 if self.local_participant.active_project == Some(project.downgrade()) {
1225 self.set_location(Some(&project), cx).detach_and_log_err(cx);
1226 }
1227 Ok(())
1228 }
1229
1230 pub(crate) fn set_location(
1231 &mut self,
1232 project: Option<&Model<Project>>,
1233 cx: &mut ModelContext<Self>,
1234 ) -> Task<Result<()>> {
1235 if self.status.is_offline() {
1236 return Task::ready(Err(anyhow!("room is offline")));
1237 }
1238
1239 let client = self.client.clone();
1240 let room_id = self.id;
1241 let location = if let Some(project) = project {
1242 self.local_participant.active_project = Some(project.downgrade());
1243 if let Some(project_id) = project.read(cx).remote_id() {
1244 proto::participant_location::Variant::SharedProject(
1245 proto::participant_location::SharedProject { id: project_id },
1246 )
1247 } else {
1248 proto::participant_location::Variant::UnsharedProject(
1249 proto::participant_location::UnsharedProject {},
1250 )
1251 }
1252 } else {
1253 self.local_participant.active_project = None;
1254 proto::participant_location::Variant::External(proto::participant_location::External {})
1255 };
1256
1257 cx.notify();
1258 cx.background_executor().spawn(async move {
1259 client
1260 .request(proto::UpdateParticipantLocation {
1261 room_id,
1262 location: Some(proto::ParticipantLocation {
1263 variant: Some(location),
1264 }),
1265 })
1266 .await?;
1267 Ok(())
1268 })
1269 }
1270
1271 pub fn is_screen_sharing(&self) -> bool {
1272 self.live_kit.as_ref().map_or(false, |live_kit| {
1273 !matches!(live_kit.screen_track, LocalTrack::None)
1274 })
1275 }
1276
1277 pub fn is_sharing_mic(&self) -> bool {
1278 self.live_kit.as_ref().map_or(false, |live_kit| {
1279 !matches!(live_kit.microphone_track, LocalTrack::None)
1280 })
1281 }
1282
1283 pub fn is_muted(&self) -> bool {
1284 self.live_kit.as_ref().map_or(false, |live_kit| {
1285 matches!(live_kit.microphone_track, LocalTrack::None)
1286 || live_kit.muted_by_user
1287 || live_kit.deafened
1288 })
1289 }
1290
1291 pub fn is_speaking(&self) -> bool {
1292 self.live_kit
1293 .as_ref()
1294 .map_or(false, |live_kit| live_kit.speaking)
1295 }
1296
1297 pub fn is_deafened(&self) -> Option<bool> {
1298 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1299 }
1300
1301 pub fn can_use_microphone(&self, _cx: &AppContext) -> bool {
1302 use proto::ChannelRole::*;
1303
1304 #[cfg(not(any(test, feature = "test-support")))]
1305 {
1306 use feature_flags::FeatureFlagAppExt as _;
1307 if cfg!(target_os = "windows") || (cfg!(target_os = "linux") && !_cx.is_staff()) {
1308 return false;
1309 }
1310 }
1311
1312 match self.local_participant.role {
1313 Admin | Member | Talker => true,
1314 Guest | Banned => false,
1315 }
1316 }
1317
1318 pub fn can_share_projects(&self) -> bool {
1319 use proto::ChannelRole::*;
1320 match self.local_participant.role {
1321 Admin | Member => true,
1322 Guest | Banned | Talker => false,
1323 }
1324 }
1325
1326 #[cfg(target_os = "windows")]
1327 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1328 Task::ready(Err(anyhow!("Windows is not supported yet")))
1329 }
1330
1331 #[cfg(not(target_os = "windows"))]
1332 #[track_caller]
1333 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1334 if self.status.is_offline() {
1335 return Task::ready(Err(anyhow!("room is offline")));
1336 }
1337
1338 let (participant, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1339 let publish_id = post_inc(&mut live_kit.next_publish_id);
1340 live_kit.microphone_track = LocalTrack::Pending { publish_id };
1341 cx.notify();
1342 (live_kit.room.local_participant(), publish_id)
1343 } else {
1344 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1345 };
1346
1347 cx.spawn(move |this, mut cx| async move {
1348 let (track, stream) = capture_local_audio_track(cx.background_executor())?.await;
1349
1350 let publication = participant
1351 .publish_track(
1352 livekit::track::LocalTrack::Audio(track),
1353 TrackPublishOptions {
1354 source: TrackSource::Microphone,
1355 ..Default::default()
1356 },
1357 )
1358 .await
1359 .map_err(|error| anyhow!("failed to publish track: {error}"));
1360 this.update(&mut cx, |this, cx| {
1361 let live_kit = this
1362 .live_kit
1363 .as_mut()
1364 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1365
1366 let canceled = if let LocalTrack::Pending {
1367 publish_id: cur_publish_id,
1368 } = &live_kit.microphone_track
1369 {
1370 *cur_publish_id != publish_id
1371 } else {
1372 true
1373 };
1374
1375 match publication {
1376 Ok(publication) => {
1377 if canceled {
1378 cx.background_executor()
1379 .spawn(async move {
1380 participant.unpublish_track(&publication.sid()).await
1381 })
1382 .detach_and_log_err(cx)
1383 } else {
1384 if live_kit.muted_by_user || live_kit.deafened {
1385 publication.mute();
1386 }
1387 live_kit.microphone_track = LocalTrack::Published {
1388 track_publication: publication,
1389 _stream: Box::new(stream),
1390 };
1391 cx.notify();
1392 }
1393 Ok(())
1394 }
1395 Err(error) => {
1396 if canceled {
1397 Ok(())
1398 } else {
1399 live_kit.microphone_track = LocalTrack::None;
1400 cx.notify();
1401 Err(error)
1402 }
1403 }
1404 }
1405 })?
1406 })
1407 }
1408
1409 #[cfg(target_os = "windows")]
1410 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1411 Task::ready(Err(anyhow!("Windows is not supported yet")))
1412 }
1413
1414 #[cfg(not(target_os = "windows"))]
1415 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1416 if self.status.is_offline() {
1417 return Task::ready(Err(anyhow!("room is offline")));
1418 }
1419 if self.is_screen_sharing() {
1420 return Task::ready(Err(anyhow!("screen was already shared")));
1421 }
1422
1423 let (participant, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1424 let publish_id = post_inc(&mut live_kit.next_publish_id);
1425 live_kit.screen_track = LocalTrack::Pending { publish_id };
1426 cx.notify();
1427 (live_kit.room.local_participant(), publish_id)
1428 } else {
1429 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1430 };
1431
1432 let sources = cx.screen_capture_sources();
1433
1434 cx.spawn(move |this, mut cx| async move {
1435 let sources = sources.await??;
1436 let source = sources.first().ok_or_else(|| anyhow!("no display found"))?;
1437
1438 let (track, stream) = capture_local_video_track(&**source).await?;
1439
1440 let publication = participant
1441 .publish_track(
1442 livekit::track::LocalTrack::Video(track),
1443 TrackPublishOptions {
1444 source: TrackSource::Screenshare,
1445 video_codec: VideoCodec::H264,
1446 ..Default::default()
1447 },
1448 )
1449 .await
1450 .map_err(|error| anyhow!("error publishing screen track {error:?}"));
1451
1452 this.update(&mut cx, |this, cx| {
1453 let live_kit = this
1454 .live_kit
1455 .as_mut()
1456 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1457
1458 let canceled = if let LocalTrack::Pending {
1459 publish_id: cur_publish_id,
1460 } = &live_kit.screen_track
1461 {
1462 *cur_publish_id != publish_id
1463 } else {
1464 true
1465 };
1466
1467 match publication {
1468 Ok(publication) => {
1469 if canceled {
1470 cx.background_executor()
1471 .spawn(async move {
1472 participant.unpublish_track(&publication.sid()).await
1473 })
1474 .detach()
1475 } else {
1476 live_kit.screen_track = LocalTrack::Published {
1477 track_publication: publication,
1478 _stream: Box::new(stream),
1479 };
1480 cx.notify();
1481 }
1482
1483 Audio::play_sound(Sound::StartScreenshare, cx);
1484 Ok(())
1485 }
1486 Err(error) => {
1487 if canceled {
1488 Ok(())
1489 } else {
1490 live_kit.screen_track = LocalTrack::None;
1491 cx.notify();
1492 Err(error)
1493 }
1494 }
1495 }
1496 })?
1497 })
1498 }
1499
1500 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) {
1501 if let Some(live_kit) = self.live_kit.as_mut() {
1502 // When unmuting, undeafen if the user was deafened before.
1503 let was_deafened = live_kit.deafened;
1504 if live_kit.muted_by_user
1505 || live_kit.deafened
1506 || matches!(live_kit.microphone_track, LocalTrack::None)
1507 {
1508 live_kit.muted_by_user = false;
1509 live_kit.deafened = false;
1510 } else {
1511 live_kit.muted_by_user = true;
1512 }
1513 let muted = live_kit.muted_by_user;
1514 let should_undeafen = was_deafened && !live_kit.deafened;
1515
1516 if let Some(task) = self.set_mute(muted, cx) {
1517 task.detach_and_log_err(cx);
1518 }
1519
1520 if should_undeafen {
1521 self.set_deafened(false, cx);
1522 }
1523 }
1524 }
1525
1526 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) {
1527 if let Some(live_kit) = self.live_kit.as_mut() {
1528 // When deafening, mute the microphone if it was not already muted.
1529 // When un-deafening, unmute the microphone, unless it was explicitly muted.
1530 let deafened = !live_kit.deafened;
1531 live_kit.deafened = deafened;
1532 let should_change_mute = !live_kit.muted_by_user;
1533
1534 self.set_deafened(deafened, cx);
1535
1536 if should_change_mute {
1537 if let Some(task) = self.set_mute(deafened, cx) {
1538 task.detach_and_log_err(cx);
1539 }
1540 }
1541 }
1542 }
1543
1544 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1545 if self.status.is_offline() {
1546 return Err(anyhow!("room is offline"));
1547 }
1548
1549 let live_kit = self
1550 .live_kit
1551 .as_mut()
1552 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1553 match mem::take(&mut live_kit.screen_track) {
1554 LocalTrack::None => Err(anyhow!("screen was not shared")),
1555 LocalTrack::Pending { .. } => {
1556 cx.notify();
1557 Ok(())
1558 }
1559 LocalTrack::Published {
1560 track_publication, ..
1561 } => {
1562 #[cfg(not(target_os = "windows"))]
1563 {
1564 let local_participant = live_kit.room.local_participant();
1565 let sid = track_publication.sid();
1566 cx.background_executor()
1567 .spawn(async move { local_participant.unpublish_track(&sid).await })
1568 .detach_and_log_err(cx);
1569 cx.notify();
1570 }
1571 Audio::play_sound(Sound::StopScreenshare, cx);
1572 Ok(())
1573 }
1574 }
1575 }
1576
1577 fn set_deafened(&mut self, deafened: bool, cx: &mut ModelContext<Self>) -> Option<()> {
1578 #[cfg(not(target_os = "windows"))]
1579 {
1580 let live_kit = self.live_kit.as_mut()?;
1581 cx.notify();
1582 for (_, participant) in live_kit.room.remote_participants() {
1583 for (_, publication) in participant.track_publications() {
1584 if publication.kind() == TrackKind::Audio {
1585 publication.set_enabled(!deafened);
1586 }
1587 }
1588 }
1589 }
1590
1591 None
1592 }
1593
1594 fn set_mute(
1595 &mut self,
1596 should_mute: bool,
1597 cx: &mut ModelContext<Room>,
1598 ) -> Option<Task<Result<()>>> {
1599 let live_kit = self.live_kit.as_mut()?;
1600 cx.notify();
1601
1602 if should_mute {
1603 Audio::play_sound(Sound::Mute, cx);
1604 } else {
1605 Audio::play_sound(Sound::Unmute, cx);
1606 }
1607
1608 match &mut live_kit.microphone_track {
1609 LocalTrack::None => {
1610 if should_mute {
1611 None
1612 } else {
1613 Some(self.share_microphone(cx))
1614 }
1615 }
1616 LocalTrack::Pending { .. } => None,
1617 LocalTrack::Published {
1618 track_publication, ..
1619 } => {
1620 #[cfg(not(target_os = "windows"))]
1621 {
1622 if should_mute {
1623 track_publication.mute()
1624 } else {
1625 track_publication.unmute()
1626 }
1627 }
1628 None
1629 }
1630 }
1631 }
1632}
1633
1634#[cfg(target_os = "windows")]
1635fn spawn_room_connection(
1636 livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
1637 cx: &mut ModelContext<'_, Room>,
1638) {
1639}
1640
1641#[cfg(not(target_os = "windows"))]
1642fn spawn_room_connection(
1643 livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
1644 cx: &mut ModelContext<'_, Room>,
1645) {
1646 if let Some(connection_info) = livekit_connection_info {
1647 cx.spawn(|this, mut cx| async move {
1648 let (room, mut events) = livekit::Room::connect(
1649 &connection_info.server_url,
1650 &connection_info.token,
1651 RoomOptions::default(),
1652 )
1653 .await?;
1654
1655 this.update(&mut cx, |this, cx| {
1656 let _handle_updates = cx.spawn(|this, mut cx| async move {
1657 while let Some(event) = events.recv().await {
1658 if this
1659 .update(&mut cx, |this, cx| {
1660 this.livekit_room_updated(event, cx).warn_on_err();
1661 })
1662 .is_err()
1663 {
1664 break;
1665 }
1666 }
1667 });
1668
1669 let muted_by_user = Room::mute_on_join(cx);
1670 this.live_kit = Some(LiveKitRoom {
1671 room: Arc::new(room),
1672 screen_track: LocalTrack::None,
1673 microphone_track: LocalTrack::None,
1674 next_publish_id: 0,
1675 muted_by_user,
1676 deafened: false,
1677 speaking: false,
1678 _handle_updates,
1679 });
1680
1681 if !muted_by_user && this.can_use_microphone(cx) {
1682 this.share_microphone(cx)
1683 } else {
1684 Task::ready(Ok(()))
1685 }
1686 })?
1687 .await
1688 })
1689 .detach_and_log_err(cx);
1690 }
1691}
1692
1693struct LiveKitRoom {
1694 room: Arc<livekit::Room>,
1695 screen_track: LocalTrack,
1696 microphone_track: LocalTrack,
1697 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1698 muted_by_user: bool,
1699 deafened: bool,
1700 speaking: bool,
1701 next_publish_id: usize,
1702 _handle_updates: Task<()>,
1703}
1704
1705impl LiveKitRoom {
1706 #[cfg(target_os = "windows")]
1707 fn stop_publishing(&mut self, _cx: &mut ModelContext<Room>) {}
1708
1709 #[cfg(not(target_os = "windows"))]
1710 fn stop_publishing(&mut self, cx: &mut ModelContext<Room>) {
1711 let mut tracks_to_unpublish = Vec::new();
1712 if let LocalTrack::Published {
1713 track_publication, ..
1714 } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1715 {
1716 tracks_to_unpublish.push(track_publication.sid());
1717 cx.notify();
1718 }
1719
1720 if let LocalTrack::Published {
1721 track_publication, ..
1722 } = mem::replace(&mut self.screen_track, LocalTrack::None)
1723 {
1724 tracks_to_unpublish.push(track_publication.sid());
1725 cx.notify();
1726 }
1727
1728 let participant = self.room.local_participant();
1729 cx.background_executor()
1730 .spawn(async move {
1731 for sid in tracks_to_unpublish {
1732 participant.unpublish_track(&sid).await.log_err();
1733 }
1734 })
1735 .detach();
1736 }
1737}
1738
1739enum LocalTrack {
1740 None,
1741 Pending {
1742 publish_id: usize,
1743 },
1744 Published {
1745 track_publication: LocalTrackPublication,
1746 _stream: Box<dyn Any>,
1747 },
1748}
1749
1750impl Default for LocalTrack {
1751 fn default() -> Self {
1752 Self::None
1753 }
1754}
1755
1756#[derive(Copy, Clone, PartialEq, Eq)]
1757pub enum RoomStatus {
1758 Online,
1759 Rejoining,
1760 Offline,
1761}
1762
1763impl RoomStatus {
1764 pub fn is_offline(&self) -> bool {
1765 matches!(self, RoomStatus::Offline)
1766 }
1767
1768 pub fn is_online(&self) -> bool {
1769 matches!(self, RoomStatus::Online)
1770 }
1771}