1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
4use nanoid::nanoid;
5use rpc::{proto, ConnectionId};
6use serde::Serialize;
7use std::{borrow::Cow, mem, path::PathBuf, str, time::Duration};
8use time::OffsetDateTime;
9use tracing::instrument;
10use util::post_inc;
11
/// Identifier of a room; `Store::create_room` derives new ids from `Store::next_room_id`.
pub type RoomId = u64;
13
/// In-memory bookkeeping for connections, connected users, rooms, shared projects and channels.
///
/// The struct is serializable, with the exception of `channels`, which is skipped.
#[derive(Default, Serialize)]
pub struct Store {
    /// State of every registered connection, keyed by connection id.
    connections: BTreeMap<ConnectionId, ConnectionState>,
    /// Per-user state (connection ids and active call); an entry is dropped once the user's
    /// last connection is removed.
    connected_users: BTreeMap<UserId, ConnectedUser>,
    /// Counter from which `create_room` derives the id of the next room.
    next_room_id: RoomId,
    /// Rooms keyed by room id.
    rooms: BTreeMap<RoomId, proto::Room>,
    /// Shared projects keyed by project id.
    projects: BTreeMap<ProjectId, Project>,
    /// Channel membership keyed by channel id; excluded from serialization.
    #[serde(skip)]
    channels: BTreeMap<ChannelId, Channel>,
}
24
/// State shared by all connections that belong to one user.
#[derive(Default, Serialize)]
struct ConnectedUser {
    /// Ids of the connections currently registered for this user.
    connection_ids: HashSet<ConnectionId>,
    /// The call the user is ringing on or has joined, if any.
    active_call: Option<Call>,
}
30
/// State tracked for a single connection.
#[derive(Serialize)]
struct ConnectionState {
    /// User that owns the connection.
    user_id: UserId,
    /// Whether the connection was registered as an admin connection; admin connections are
    /// excluded from `Store::metrics`.
    admin: bool,
    /// Projects this connection hosts or has joined as a guest.
    projects: BTreeSet<ProjectId>,
    /// Channels this connection has joined.
    channels: HashSet<ChannelId>,
}
38
/// A call a user is part of, either still ringing or already joined.
#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
pub struct Call {
    /// User who placed the call (the room creator for the call created by `create_room`).
    pub caller_user_id: UserId,
    /// Room the call belongs to.
    pub room_id: RoomId,
    /// Connection on which the user joined the room; `None` while the call is still ringing.
    pub connection_id: Option<ConnectionId>,
    /// Project offered together with the call, if any.
    pub initial_project_id: Option<ProjectId>,
}
46
/// A project shared in a room by a host connection.
#[derive(Serialize)]
pub struct Project {
    /// Id of the project.
    pub id: ProjectId,
    /// Room in which the project is shared.
    pub room_id: RoomId,
    /// Connection that shared the project.
    pub host_connection_id: ConnectionId,
    /// Collaborator record of the host (created with replica id 0 by `share_project`).
    pub host: Collaborator,
    /// Guests that joined the project, keyed by their connection id.
    pub guests: HashMap<ConnectionId, Collaborator>,
    /// Replica ids currently assigned to guests.
    pub active_replica_ids: HashSet<ReplicaId>,
    /// Worktrees of the project keyed by worktree id.
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Language servers announced by the host via `start_language_server`.
    pub language_servers: Vec<proto::LanguageServer>,
}
58
/// A participant (host or guest) of a shared project.
#[derive(Serialize)]
pub struct Collaborator {
    /// Replica id assigned to the collaborator within the project.
    pub replica_id: ReplicaId,
    /// User behind the collaborator.
    pub user_id: UserId,
    /// Time of the most recently registered activity; excluded from serialization.
    #[serde(skip)]
    pub last_activity: Option<OffsetDateTime>,
    /// Copied from the `admin` flag of the collaborator's connection.
    pub admin: bool,
}
67
/// Server-side copy of one worktree of a shared project.
#[derive(Default, Serialize)]
pub struct Worktree {
    /// Root name of the worktree as reported by the host.
    pub root_name: String,
    /// Only visible worktrees contribute their root name to a participant's project listing.
    pub visible: bool,
    /// Entries keyed by entry id; excluded from serialization.
    #[serde(skip)]
    pub entries: BTreeMap<u64, proto::Entry>,
    /// Diagnostic summaries keyed by path; excluded from serialization.
    #[serde(skip)]
    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
    /// Scan id supplied with the most recent `update_worktree` call.
    pub scan_id: u64,
    /// Whether the most recent `update_worktree` call was flagged as the last update.
    pub is_complete: bool,
}
79
/// Membership of a single channel.
#[derive(Default)]
pub struct Channel {
    /// Connections that have joined the channel.
    pub connection_ids: HashSet<ConnectionId>,
}
84
/// Identifier of a collaborator's replica within a project: the host is created with 0 and
/// each guest receives the smallest unused id starting at 1.
pub type ReplicaId = u16;
86
/// Summary returned by `Store::remove_connection`.
#[derive(Default)]
pub struct RemovedConnectionState<'a> {
    /// User that owned the removed connection.
    pub user_id: UserId,
    /// Projects the connection hosted that were unshared as a result.
    pub hosted_projects: Vec<Project>,
    /// Projects the connection left as a guest.
    pub guest_projects: Vec<LeftProject>,
    /// NOTE(review): not populated by `remove_connection` in the visible code — presumably
    /// filled in by the caller; confirm.
    pub contact_ids: HashSet<UserId>,
    /// State of the room affected by the removal, if any.
    pub room: Option<Cow<'a, proto::Room>>,
    /// Connections whose ringing call was canceled because the caller went away.
    pub canceled_call_connection_ids: Vec<ConnectionId>,
}
96
/// Summary returned by `Store::leave_project`.
pub struct LeftProject {
    /// Id of the project that was left.
    pub id: ProjectId,
    /// User hosting the project.
    pub host_user_id: UserId,
    /// Connection hosting the project.
    pub host_connection_id: ConnectionId,
    /// Result of `Project::connection_ids` taken after the guest was removed.
    pub connection_ids: Vec<ConnectionId>,
    /// `true` when the leaving connection was registered as a guest of the project.
    pub remove_collaborator: bool,
}
104
/// Summary returned by `Store::leave_room`.
pub struct LeftRoom<'a> {
    /// The room after the participant left; owned when the room was deleted, borrowed otherwise.
    pub room: Cow<'a, proto::Room>,
    /// Projects hosted by the leaving connection that were unshared.
    pub unshared_projects: Vec<Project>,
    /// Projects the leaving connection had joined as a guest.
    pub left_projects: Vec<LeftProject>,
    /// Connections of users whose pending call from the leaving user was canceled.
    pub canceled_call_connection_ids: Vec<ConnectionId>,
}
111
/// Counters produced by `Store::metrics`.
#[derive(Copy, Clone)]
pub struct Metrics {
    /// Number of non-admin connections.
    pub connections: usize,
    /// Number of projects whose host connection exists and is not an admin connection.
    pub registered_projects: usize,
    /// Registered projects that report activity within the last 60 seconds.
    pub active_projects: usize,
    /// Active projects that have at least one guest.
    pub shared_projects: usize,
}
119
120impl Store {
121 pub fn metrics(&self) -> Metrics {
122 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
123 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
124
125 let connections = self.connections.values().filter(|c| !c.admin).count();
126 let mut registered_projects = 0;
127 let mut active_projects = 0;
128 let mut shared_projects = 0;
129 for project in self.projects.values() {
130 if let Some(connection) = self.connections.get(&project.host_connection_id) {
131 if !connection.admin {
132 registered_projects += 1;
133 if project.is_active_since(active_window_start) {
134 active_projects += 1;
135 if !project.guests.is_empty() {
136 shared_projects += 1;
137 }
138 }
139 }
140 }
141 }
142
143 Metrics {
144 connections,
145 registered_projects,
146 active_projects,
147 shared_projects,
148 }
149 }
150
151 #[instrument(skip(self))]
152 pub fn add_connection(
153 &mut self,
154 connection_id: ConnectionId,
155 user_id: UserId,
156 admin: bool,
157 ) -> Option<proto::IncomingCall> {
158 self.connections.insert(
159 connection_id,
160 ConnectionState {
161 user_id,
162 admin,
163 projects: Default::default(),
164 channels: Default::default(),
165 },
166 );
167 let connected_user = self.connected_users.entry(user_id).or_default();
168 connected_user.connection_ids.insert(connection_id);
169 if let Some(active_call) = connected_user.active_call {
170 if active_call.connection_id.is_some() {
171 None
172 } else {
173 let room = self.room(active_call.room_id)?;
174 Some(proto::IncomingCall {
175 room_id: active_call.room_id,
176 caller_user_id: active_call.caller_user_id.to_proto(),
177 participant_user_ids: room
178 .participants
179 .iter()
180 .map(|participant| participant.user_id)
181 .collect(),
182 initial_project: active_call
183 .initial_project_id
184 .and_then(|id| Self::build_participant_project(id, &self.projects)),
185 })
186 }
187 } else {
188 None
189 }
190 }
191
192 #[instrument(skip(self))]
193 pub fn remove_connection(
194 &mut self,
195 connection_id: ConnectionId,
196 ) -> Result<RemovedConnectionState> {
197 let connection = self
198 .connections
199 .get_mut(&connection_id)
200 .ok_or_else(|| anyhow!("no such connection"))?;
201
202 let user_id = connection.user_id;
203 let connection_channels = mem::take(&mut connection.channels);
204
205 let mut result = RemovedConnectionState {
206 user_id,
207 ..Default::default()
208 };
209
210 // Leave all channels.
211 for channel_id in connection_channels {
212 self.leave_channel(connection_id, channel_id);
213 }
214
215 let connected_user = self.connected_users.get(&user_id).unwrap();
216 if let Some(active_call) = connected_user.active_call.as_ref() {
217 let room_id = active_call.room_id;
218 let left_room = self.leave_room(room_id, connection_id)?;
219 result.hosted_projects = left_room.unshared_projects;
220 result.guest_projects = left_room.left_projects;
221 result.room = Some(Cow::Owned(left_room.room.into_owned()));
222 result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
223 }
224
225 let connected_user = self.connected_users.get_mut(&user_id).unwrap();
226 connected_user.connection_ids.remove(&connection_id);
227 if connected_user.connection_ids.is_empty() {
228 self.connected_users.remove(&user_id);
229 }
230 self.connections.remove(&connection_id).unwrap();
231
232 Ok(result)
233 }
234
    /// Test-only accessor returning the channel with the given id, if it exists.
    #[cfg(test)]
    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
        self.channels.get(&id)
    }
239
240 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
241 if let Some(connection) = self.connections.get_mut(&connection_id) {
242 connection.channels.insert(channel_id);
243 self.channels
244 .entry(channel_id)
245 .or_default()
246 .connection_ids
247 .insert(connection_id);
248 }
249 }
250
251 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
252 if let Some(connection) = self.connections.get_mut(&connection_id) {
253 connection.channels.remove(&channel_id);
254 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
255 entry.get_mut().connection_ids.remove(&connection_id);
256 if entry.get_mut().connection_ids.is_empty() {
257 entry.remove();
258 }
259 }
260 }
261 }
262
263 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
264 Ok(self
265 .connections
266 .get(&connection_id)
267 .ok_or_else(|| anyhow!("unknown connection"))?
268 .user_id)
269 }
270
271 pub fn connection_ids_for_user(
272 &self,
273 user_id: UserId,
274 ) -> impl Iterator<Item = ConnectionId> + '_ {
275 self.connected_users
276 .get(&user_id)
277 .into_iter()
278 .map(|state| &state.connection_ids)
279 .flatten()
280 .copied()
281 }
282
283 pub fn is_user_online(&self, user_id: UserId) -> bool {
284 !self
285 .connected_users
286 .get(&user_id)
287 .unwrap_or(&Default::default())
288 .connection_ids
289 .is_empty()
290 }
291
292 fn is_user_busy(&self, user_id: UserId) -> bool {
293 self.connected_users
294 .get(&user_id)
295 .unwrap_or(&Default::default())
296 .active_call
297 .is_some()
298 }
299
300 pub fn build_initial_contacts_update(
301 &self,
302 contacts: Vec<db::Contact>,
303 ) -> proto::UpdateContacts {
304 let mut update = proto::UpdateContacts::default();
305
306 for contact in contacts {
307 match contact {
308 db::Contact::Accepted {
309 user_id,
310 should_notify,
311 } => {
312 update
313 .contacts
314 .push(self.contact_for_user(user_id, should_notify));
315 }
316 db::Contact::Outgoing { user_id } => {
317 update.outgoing_requests.push(user_id.to_proto())
318 }
319 db::Contact::Incoming {
320 user_id,
321 should_notify,
322 } => update
323 .incoming_requests
324 .push(proto::IncomingContactRequest {
325 requester_id: user_id.to_proto(),
326 should_notify,
327 }),
328 }
329 }
330
331 update
332 }
333
334 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
335 proto::Contact {
336 user_id: user_id.to_proto(),
337 online: self.is_user_online(user_id),
338 busy: self.is_user_busy(user_id),
339 should_notify,
340 }
341 }
342
343 pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
344 let connection = self
345 .connections
346 .get_mut(&creator_connection_id)
347 .ok_or_else(|| anyhow!("no such connection"))?;
348 let connected_user = self
349 .connected_users
350 .get_mut(&connection.user_id)
351 .ok_or_else(|| anyhow!("no such connection"))?;
352 anyhow::ensure!(
353 connected_user.active_call.is_none(),
354 "can't create a room with an active call"
355 );
356
357 let room_id = post_inc(&mut self.next_room_id);
358 let room = proto::Room {
359 id: room_id,
360 participants: vec![proto::Participant {
361 user_id: connection.user_id.to_proto(),
362 peer_id: creator_connection_id.0,
363 projects: Default::default(),
364 location: Some(proto::ParticipantLocation {
365 variant: Some(proto::participant_location::Variant::External(
366 proto::participant_location::External {},
367 )),
368 }),
369 }],
370 pending_participant_user_ids: Default::default(),
371 live_kit_room: nanoid!(30),
372 };
373
374 self.rooms.insert(room_id, room);
375 connected_user.active_call = Some(Call {
376 caller_user_id: connection.user_id,
377 room_id,
378 connection_id: Some(creator_connection_id),
379 initial_project_id: None,
380 });
381 Ok(self.rooms.get(&room_id).unwrap())
382 }
383
384 pub fn join_room(
385 &mut self,
386 room_id: RoomId,
387 connection_id: ConnectionId,
388 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
389 let connection = self
390 .connections
391 .get_mut(&connection_id)
392 .ok_or_else(|| anyhow!("no such connection"))?;
393 let user_id = connection.user_id;
394 let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
395
396 let connected_user = self
397 .connected_users
398 .get_mut(&user_id)
399 .ok_or_else(|| anyhow!("no such connection"))?;
400 let active_call = connected_user
401 .active_call
402 .as_mut()
403 .ok_or_else(|| anyhow!("not being called"))?;
404 anyhow::ensure!(
405 active_call.room_id == room_id && active_call.connection_id.is_none(),
406 "not being called on this room"
407 );
408
409 let room = self
410 .rooms
411 .get_mut(&room_id)
412 .ok_or_else(|| anyhow!("no such room"))?;
413 anyhow::ensure!(
414 room.pending_participant_user_ids
415 .contains(&user_id.to_proto()),
416 anyhow!("no such room")
417 );
418 room.pending_participant_user_ids
419 .retain(|pending| *pending != user_id.to_proto());
420 room.participants.push(proto::Participant {
421 user_id: user_id.to_proto(),
422 peer_id: connection_id.0,
423 projects: Default::default(),
424 location: Some(proto::ParticipantLocation {
425 variant: Some(proto::participant_location::Variant::External(
426 proto::participant_location::External {},
427 )),
428 }),
429 });
430 active_call.connection_id = Some(connection_id);
431
432 Ok((room, recipient_connection_ids))
433 }
434
    /// Removes `connection_id` from room `room_id`.
    ///
    /// Every project registered on the connection is unshared (when the connection hosts it) or
    /// left (otherwise), the user's active call is cleared, the participant entry is dropped
    /// from the room, and pending calls placed by this user are canceled. The room is deleted
    /// once it has neither participants nor pending participants.
    ///
    /// # Errors
    /// Fails when the connection or user is unknown, when the user's active call is not joined
    /// on this connection for this room, or when the room does not exist.
    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
        let connection = self
            .connections
            .get_mut(&connection_id)
            .ok_or_else(|| anyhow!("no such connection"))?;
        let user_id = connection.user_id;

        let connected_user = self
            .connected_users
            .get(&user_id)
            .ok_or_else(|| anyhow!("no such connection"))?;
        anyhow::ensure!(
            connected_user
                .active_call
                .map_or(false, |call| call.room_id == room_id
                    && call.connection_id == Some(connection_id)),
            "cannot leave a room before joining it"
        );

        // Given that users can only join one room at a time, we can safely unshare
        // and leave all projects associated with the connection.
        let mut unshared_projects = Vec::new();
        let mut left_projects = Vec::new();
        // Iterate over a copy: unsharing/leaving mutates the connection's project set.
        for project_id in connection.projects.clone() {
            // Unsharing only succeeds for the host; otherwise fall back to leaving as a guest.
            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
                unshared_projects.push(project);
            } else if let Ok(project) = self.leave_project(project_id, connection_id) {
                left_projects.push(project);
            }
        }
        self.connected_users.get_mut(&user_id).unwrap().active_call = None;

        let room = self
            .rooms
            .get_mut(&room_id)
            .ok_or_else(|| anyhow!("no such room"))?;
        room.participants
            .retain(|participant| participant.peer_id != connection_id.0);

        // Cancel the calls this user placed that nobody has answered yet; pending users called
        // by someone else stay in the list.
        let mut canceled_call_connection_ids = Vec::new();
        room.pending_participant_user_ids
            .retain(|pending_participant_user_id| {
                if let Some(connected_user) = self
                    .connected_users
                    .get_mut(&UserId::from_proto(*pending_participant_user_id))
                {
                    if let Some(call) = connected_user.active_call.as_ref() {
                        if call.caller_user_id == user_id {
                            connected_user.active_call.take();
                            canceled_call_connection_ids
                                .extend(connected_user.connection_ids.iter().copied());
                            false
                        } else {
                            true
                        }
                    } else {
                        true
                    }
                } else {
                    true
                }
            });

        // An empty room is removed and handed back by value; otherwise it is borrowed.
        let room = if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
            Cow::Owned(self.rooms.remove(&room_id).unwrap())
        } else {
            Cow::Borrowed(self.rooms.get(&room_id).unwrap())
        };

        Ok(LeftRoom {
            room,
            unshared_projects,
            left_projects,
            canceled_call_connection_ids,
        })
    }
511
    /// Returns the room with the given id, if it exists.
    pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
        self.rooms.get(&room_id)
    }
515
516 pub fn call(
517 &mut self,
518 room_id: RoomId,
519 recipient_user_id: UserId,
520 initial_project_id: Option<ProjectId>,
521 from_connection_id: ConnectionId,
522 ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
523 let caller_user_id = self.user_id_for_connection(from_connection_id)?;
524
525 let recipient_connection_ids = self
526 .connection_ids_for_user(recipient_user_id)
527 .collect::<Vec<_>>();
528 let mut recipient = self
529 .connected_users
530 .get_mut(&recipient_user_id)
531 .ok_or_else(|| anyhow!("no such connection"))?;
532 anyhow::ensure!(
533 recipient.active_call.is_none(),
534 "recipient is already on another call"
535 );
536
537 let room = self
538 .rooms
539 .get_mut(&room_id)
540 .ok_or_else(|| anyhow!("no such room"))?;
541 anyhow::ensure!(
542 room.participants
543 .iter()
544 .any(|participant| participant.peer_id == from_connection_id.0),
545 "no such room"
546 );
547 anyhow::ensure!(
548 room.pending_participant_user_ids
549 .iter()
550 .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
551 "cannot call the same user more than once"
552 );
553 room.pending_participant_user_ids
554 .push(recipient_user_id.to_proto());
555
556 if let Some(initial_project_id) = initial_project_id {
557 let project = self
558 .projects
559 .get(&initial_project_id)
560 .ok_or_else(|| anyhow!("no such project"))?;
561 anyhow::ensure!(project.room_id == room_id, "no such project");
562 }
563
564 recipient.active_call = Some(Call {
565 caller_user_id,
566 room_id,
567 connection_id: None,
568 initial_project_id,
569 });
570
571 Ok((
572 room,
573 recipient_connection_ids,
574 proto::IncomingCall {
575 room_id,
576 caller_user_id: caller_user_id.to_proto(),
577 participant_user_ids: room
578 .participants
579 .iter()
580 .map(|participant| participant.user_id)
581 .collect(),
582 initial_project: initial_project_id
583 .and_then(|id| Self::build_participant_project(id, &self.projects)),
584 },
585 ))
586 }
587
588 pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
589 let mut recipient = self
590 .connected_users
591 .get_mut(&to_user_id)
592 .ok_or_else(|| anyhow!("no such connection"))?;
593 anyhow::ensure!(recipient
594 .active_call
595 .map_or(false, |call| call.room_id == room_id
596 && call.connection_id.is_none()));
597 recipient.active_call = None;
598 let room = self
599 .rooms
600 .get_mut(&room_id)
601 .ok_or_else(|| anyhow!("no such room"))?;
602 room.pending_participant_user_ids
603 .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
604 Ok(room)
605 }
606
607 pub fn cancel_call(
608 &mut self,
609 room_id: RoomId,
610 recipient_user_id: UserId,
611 canceller_connection_id: ConnectionId,
612 ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
613 let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
614 let canceller = self
615 .connected_users
616 .get(&canceller_user_id)
617 .ok_or_else(|| anyhow!("no such connection"))?;
618 let recipient = self
619 .connected_users
620 .get(&recipient_user_id)
621 .ok_or_else(|| anyhow!("no such connection"))?;
622 let canceller_active_call = canceller
623 .active_call
624 .as_ref()
625 .ok_or_else(|| anyhow!("no active call"))?;
626 let recipient_active_call = recipient
627 .active_call
628 .as_ref()
629 .ok_or_else(|| anyhow!("no active call for recipient"))?;
630
631 anyhow::ensure!(
632 canceller_active_call.room_id == room_id,
633 "users are on different calls"
634 );
635 anyhow::ensure!(
636 recipient_active_call.room_id == room_id,
637 "users are on different calls"
638 );
639 anyhow::ensure!(
640 recipient_active_call.connection_id.is_none(),
641 "recipient has already answered"
642 );
643 let room_id = recipient_active_call.room_id;
644 let room = self
645 .rooms
646 .get_mut(&room_id)
647 .ok_or_else(|| anyhow!("no such room"))?;
648 room.pending_participant_user_ids
649 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
650
651 let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
652 recipient.active_call.take();
653
654 Ok((room, recipient.connection_ids.clone()))
655 }
656
657 pub fn decline_call(
658 &mut self,
659 room_id: RoomId,
660 recipient_connection_id: ConnectionId,
661 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
662 let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
663 let recipient = self
664 .connected_users
665 .get_mut(&recipient_user_id)
666 .ok_or_else(|| anyhow!("no such connection"))?;
667 if let Some(active_call) = recipient.active_call.take() {
668 anyhow::ensure!(active_call.room_id == room_id, "no such room");
669 let recipient_connection_ids = self
670 .connection_ids_for_user(recipient_user_id)
671 .collect::<Vec<_>>();
672 let room = self
673 .rooms
674 .get_mut(&active_call.room_id)
675 .ok_or_else(|| anyhow!("no such room"))?;
676 room.pending_participant_user_ids
677 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
678 Ok((room, recipient_connection_ids))
679 } else {
680 Err(anyhow!("user is not being called"))
681 }
682 }
683
684 pub fn update_participant_location(
685 &mut self,
686 room_id: RoomId,
687 location: proto::ParticipantLocation,
688 connection_id: ConnectionId,
689 ) -> Result<&proto::Room> {
690 let room = self
691 .rooms
692 .get_mut(&room_id)
693 .ok_or_else(|| anyhow!("no such room"))?;
694 if let Some(proto::participant_location::Variant::SharedProject(project)) =
695 location.variant.as_ref()
696 {
697 anyhow::ensure!(
698 room.participants
699 .iter()
700 .flat_map(|participant| &participant.projects)
701 .any(|participant_project| participant_project.id == project.id),
702 "no such project"
703 );
704 }
705
706 let participant = room
707 .participants
708 .iter_mut()
709 .find(|participant| participant.peer_id == connection_id.0)
710 .ok_or_else(|| anyhow!("no such room"))?;
711 participant.location = Some(location);
712
713 Ok(room)
714 }
715
    /// Registers `project_id` as a project hosted by `host_connection_id` in room `room_id`
    /// and lists it under the host's participant entry.
    ///
    /// The project starts without guests or language servers, and the host is recorded with
    /// replica id 0. Each worktree is created from its metadata (`root_name`, `visible`) with
    /// all other fields defaulted. An existing project stored under the same id is replaced.
    ///
    /// # Errors
    /// Fails when the connection is unknown, when the room does not exist, or when the
    /// connection is not a participant of the room.
    pub fn share_project(
        &mut self,
        room_id: RoomId,
        project_id: ProjectId,
        worktrees: Vec<proto::WorktreeMetadata>,
        host_connection_id: ConnectionId,
    ) -> Result<&proto::Room> {
        let connection = self
            .connections
            .get_mut(&host_connection_id)
            .ok_or_else(|| anyhow!("no such connection"))?;

        let room = self
            .rooms
            .get_mut(&room_id)
            .ok_or_else(|| anyhow!("no such room"))?;
        let participant = room
            .participants
            .iter_mut()
            .find(|participant| participant.peer_id == host_connection_id.0)
            .ok_or_else(|| anyhow!("no such room"))?;

        connection.projects.insert(project_id);
        self.projects.insert(
            project_id,
            Project {
                id: project_id,
                room_id,
                host_connection_id,
                host: Collaborator {
                    user_id: connection.user_id,
                    replica_id: 0,
                    last_activity: None,
                    admin: connection.admin,
                },
                guests: Default::default(),
                active_replica_ids: Default::default(),
                worktrees: worktrees
                    .into_iter()
                    .map(|worktree| {
                        (
                            worktree.id,
                            Worktree {
                                root_name: worktree.root_name,
                                visible: worktree.visible,
                                ..Default::default()
                            },
                        )
                    })
                    .collect(),
                language_servers: Default::default(),
            },
        );

        // Publish the freshly stored project in the host's participant entry.
        participant
            .projects
            .extend(Self::build_participant_project(project_id, &self.projects));

        Ok(room)
    }
776
    /// Stops sharing `project_id`, which must be hosted by `connection_id`.
    ///
    /// The project is removed from the store, from the project sets of the host and guest
    /// connections, and from the host's participant entry in the room. Returns the updated room
    /// and the removed project.
    ///
    /// # Errors
    /// Fails with "no such project" when the project is missing or hosted by another
    /// connection, and with "no such room" when the project's room is missing or the host is
    /// not one of its participants. Note that in the latter case the project has already been
    /// removed from the store.
    pub fn unshare_project(
        &mut self,
        project_id: ProjectId,
        connection_id: ConnectionId,
    ) -> Result<(&proto::Room, Project)> {
        match self.projects.entry(project_id) {
            btree_map::Entry::Occupied(e) => {
                if e.get().host_connection_id == connection_id {
                    let project = e.remove();

                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
                        host_connection.projects.remove(&project_id);
                    }

                    for guest_connection in project.guests.keys() {
                        if let Some(connection) = self.connections.get_mut(guest_connection) {
                            connection.projects.remove(&project_id);
                        }
                    }

                    let room = self
                        .rooms
                        .get_mut(&project.room_id)
                        .ok_or_else(|| anyhow!("no such room"))?;
                    let participant = room
                        .participants
                        .iter_mut()
                        .find(|participant| participant.peer_id == connection_id.0)
                        .ok_or_else(|| anyhow!("no such room"))?;
                    participant
                        .projects
                        .retain(|project| project.id != project_id.to_proto());

                    Ok((room, project))
                } else {
                    Err(anyhow!("no such project"))?
                }
            }
            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
        }
    }
818
    /// Replaces the worktree list of `project_id`, which must be hosted by `connection_id`.
    ///
    /// Worktrees whose id was already known keep their stored state, ids that are new get a
    /// fresh `Worktree` built from the metadata, and ids absent from `worktrees` are dropped.
    /// The project's entry in the room is then updated with the root names of the visible
    /// worktrees in `worktrees`.
    ///
    /// # Errors
    /// Fails with "no such project" when the project is missing, hosted by another connection,
    /// or not listed by any participant of its room, and with "no such room" when the room is
    /// missing.
    pub fn update_project(
        &mut self,
        project_id: ProjectId,
        worktrees: &[proto::WorktreeMetadata],
        connection_id: ConnectionId,
    ) -> Result<&proto::Room> {
        let project = self
            .projects
            .get_mut(&project_id)
            .ok_or_else(|| anyhow!("no such project"))?;
        if project.host_connection_id == connection_id {
            // Rebuild the map from scratch so that worktrees missing from the update disappear.
            let mut old_worktrees = mem::take(&mut project.worktrees);
            for worktree in worktrees {
                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
                    project.worktrees.insert(worktree.id, old_worktree);
                } else {
                    project.worktrees.insert(
                        worktree.id,
                        Worktree {
                            root_name: worktree.root_name.clone(),
                            visible: worktree.visible,
                            ..Default::default()
                        },
                    );
                }
            }

            let room = self
                .rooms
                .get_mut(&project.room_id)
                .ok_or_else(|| anyhow!("no such room"))?;
            let participant_project = room
                .participants
                .iter_mut()
                .flat_map(|participant| &mut participant.projects)
                .find(|project| project.id == project_id.to_proto())
                .ok_or_else(|| anyhow!("no such project"))?;
            participant_project.worktree_root_names = worktrees
                .iter()
                .filter(|worktree| worktree.visible)
                .map(|worktree| worktree.root_name.clone())
                .collect();

            Ok(room)
        } else {
            Err(anyhow!("no such project"))?
        }
    }
867
868 pub fn update_diagnostic_summary(
869 &mut self,
870 project_id: ProjectId,
871 worktree_id: u64,
872 connection_id: ConnectionId,
873 summary: proto::DiagnosticSummary,
874 ) -> Result<Vec<ConnectionId>> {
875 let project = self
876 .projects
877 .get_mut(&project_id)
878 .ok_or_else(|| anyhow!("no such project"))?;
879 if project.host_connection_id == connection_id {
880 let worktree = project
881 .worktrees
882 .get_mut(&worktree_id)
883 .ok_or_else(|| anyhow!("no such worktree"))?;
884 worktree
885 .diagnostic_summaries
886 .insert(summary.path.clone().into(), summary);
887 return Ok(project.connection_ids());
888 }
889
890 Err(anyhow!("no such worktree"))?
891 }
892
893 pub fn start_language_server(
894 &mut self,
895 project_id: ProjectId,
896 connection_id: ConnectionId,
897 language_server: proto::LanguageServer,
898 ) -> Result<Vec<ConnectionId>> {
899 let project = self
900 .projects
901 .get_mut(&project_id)
902 .ok_or_else(|| anyhow!("no such project"))?;
903 if project.host_connection_id == connection_id {
904 project.language_servers.push(language_server);
905 return Ok(project.connection_ids());
906 }
907
908 Err(anyhow!("no such project"))?
909 }
910
911 pub fn join_project(
912 &mut self,
913 requester_connection_id: ConnectionId,
914 project_id: ProjectId,
915 ) -> Result<(&Project, ReplicaId)> {
916 let connection = self
917 .connections
918 .get_mut(&requester_connection_id)
919 .ok_or_else(|| anyhow!("no such connection"))?;
920 let user = self
921 .connected_users
922 .get(&connection.user_id)
923 .ok_or_else(|| anyhow!("no such connection"))?;
924 let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
925 anyhow::ensure!(
926 active_call.connection_id == Some(requester_connection_id),
927 "no such project"
928 );
929
930 let project = self
931 .projects
932 .get_mut(&project_id)
933 .ok_or_else(|| anyhow!("no such project"))?;
934 anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
935
936 connection.projects.insert(project_id);
937 let mut replica_id = 1;
938 while project.active_replica_ids.contains(&replica_id) {
939 replica_id += 1;
940 }
941 project.active_replica_ids.insert(replica_id);
942 project.guests.insert(
943 requester_connection_id,
944 Collaborator {
945 replica_id,
946 user_id: connection.user_id,
947 last_activity: Some(OffsetDateTime::now_utc()),
948 admin: connection.admin,
949 },
950 );
951
952 project.host.last_activity = Some(OffsetDateTime::now_utc());
953 Ok((project, replica_id))
954 }
955
956 pub fn leave_project(
957 &mut self,
958 project_id: ProjectId,
959 connection_id: ConnectionId,
960 ) -> Result<LeftProject> {
961 let project = self
962 .projects
963 .get_mut(&project_id)
964 .ok_or_else(|| anyhow!("no such project"))?;
965
966 // If the connection leaving the project is a collaborator, remove it.
967 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
968 project.active_replica_ids.remove(&guest.replica_id);
969 true
970 } else {
971 false
972 };
973
974 if let Some(connection) = self.connections.get_mut(&connection_id) {
975 connection.projects.remove(&project_id);
976 }
977
978 Ok(LeftProject {
979 id: project.id,
980 host_connection_id: project.host_connection_id,
981 host_user_id: project.host.user_id,
982 connection_ids: project.connection_ids(),
983 remove_collaborator,
984 })
985 }
986
987 #[allow(clippy::too_many_arguments)]
988 pub fn update_worktree(
989 &mut self,
990 connection_id: ConnectionId,
991 project_id: ProjectId,
992 worktree_id: u64,
993 worktree_root_name: &str,
994 removed_entries: &[u64],
995 updated_entries: &[proto::Entry],
996 scan_id: u64,
997 is_last_update: bool,
998 ) -> Result<Vec<ConnectionId>> {
999 let project = self.write_project(project_id, connection_id)?;
1000
1001 let connection_ids = project.connection_ids();
1002 let mut worktree = project.worktrees.entry(worktree_id).or_default();
1003 worktree.root_name = worktree_root_name.to_string();
1004
1005 for entry_id in removed_entries {
1006 worktree.entries.remove(entry_id);
1007 }
1008
1009 for entry in updated_entries {
1010 worktree.entries.insert(entry.id, entry.clone());
1011 }
1012
1013 worktree.scan_id = scan_id;
1014 worktree.is_complete = is_last_update;
1015 Ok(connection_ids)
1016 }
1017
1018 fn build_participant_project(
1019 project_id: ProjectId,
1020 projects: &BTreeMap<ProjectId, Project>,
1021 ) -> Option<proto::ParticipantProject> {
1022 Some(proto::ParticipantProject {
1023 id: project_id.to_proto(),
1024 worktree_root_names: projects
1025 .get(&project_id)?
1026 .worktrees
1027 .values()
1028 .filter(|worktree| worktree.visible)
1029 .map(|worktree| worktree.root_name.clone())
1030 .collect(),
1031 })
1032 }
1033
1034 pub fn project_connection_ids(
1035 &self,
1036 project_id: ProjectId,
1037 acting_connection_id: ConnectionId,
1038 ) -> Result<Vec<ConnectionId>> {
1039 Ok(self
1040 .read_project(project_id, acting_connection_id)?
1041 .connection_ids())
1042 }
1043
1044 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1045 Ok(self
1046 .channels
1047 .get(&channel_id)
1048 .ok_or_else(|| anyhow!("no such channel"))?
1049 .connection_ids())
1050 }
1051
1052 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1053 self.projects
1054 .get(&project_id)
1055 .ok_or_else(|| anyhow!("no such project"))
1056 }
1057
1058 pub fn register_project_activity(
1059 &mut self,
1060 project_id: ProjectId,
1061 connection_id: ConnectionId,
1062 ) -> Result<()> {
1063 let project = self
1064 .projects
1065 .get_mut(&project_id)
1066 .ok_or_else(|| anyhow!("no such project"))?;
1067 let collaborator = if connection_id == project.host_connection_id {
1068 &mut project.host
1069 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1070 guest
1071 } else {
1072 return Err(anyhow!("no such project"))?;
1073 };
1074 collaborator.last_activity = Some(OffsetDateTime::now_utc());
1075 Ok(())
1076 }
1077
    /// Iterates over all shared projects together with their ids.
    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
        self.projects.iter()
    }
1081
1082 pub fn read_project(
1083 &self,
1084 project_id: ProjectId,
1085 connection_id: ConnectionId,
1086 ) -> Result<&Project> {
1087 let project = self
1088 .projects
1089 .get(&project_id)
1090 .ok_or_else(|| anyhow!("no such project"))?;
1091 if project.host_connection_id == connection_id
1092 || project.guests.contains_key(&connection_id)
1093 {
1094 Ok(project)
1095 } else {
1096 Err(anyhow!("no such project"))?
1097 }
1098 }
1099
1100 fn write_project(
1101 &mut self,
1102 project_id: ProjectId,
1103 connection_id: ConnectionId,
1104 ) -> Result<&mut Project> {
1105 let project = self
1106 .projects
1107 .get_mut(&project_id)
1108 .ok_or_else(|| anyhow!("no such project"))?;
1109 if project.host_connection_id == connection_id
1110 || project.guests.contains_key(&connection_id)
1111 {
1112 Ok(project)
1113 } else {
1114 Err(anyhow!("no such project"))?
1115 }
1116 }
1117
1118 #[cfg(test)]
1119 pub fn check_invariants(&self) {
1120 for (connection_id, connection) in &self.connections {
1121 for project_id in &connection.projects {
1122 let project = &self.projects.get(project_id).unwrap();
1123 if project.host_connection_id != *connection_id {
1124 assert!(project.guests.contains_key(connection_id));
1125 }
1126
1127 for (worktree_id, worktree) in project.worktrees.iter() {
1128 let mut paths = HashMap::default();
1129 for entry in worktree.entries.values() {
1130 let prev_entry = paths.insert(&entry.path, entry);
1131 assert_eq!(
1132 prev_entry,
1133 None,
1134 "worktree {:?}, duplicate path for entries {:?} and {:?}",
1135 worktree_id,
1136 prev_entry.unwrap(),
1137 entry
1138 );
1139 }
1140 }
1141 }
1142 for channel_id in &connection.channels {
1143 let channel = self.channels.get(channel_id).unwrap();
1144 assert!(channel.connection_ids.contains(connection_id));
1145 }
1146 assert!(self
1147 .connected_users
1148 .get(&connection.user_id)
1149 .unwrap()
1150 .connection_ids
1151 .contains(connection_id));
1152 }
1153
1154 for (user_id, state) in &self.connected_users {
1155 for connection_id in &state.connection_ids {
1156 assert_eq!(
1157 self.connections.get(connection_id).unwrap().user_id,
1158 *user_id
1159 );
1160 }
1161
1162 if let Some(active_call) = state.active_call.as_ref() {
1163 if let Some(active_call_connection_id) = active_call.connection_id {
1164 assert!(
1165 state.connection_ids.contains(&active_call_connection_id),
1166 "call is active on a dead connection"
1167 );
1168 assert!(
1169 state.connection_ids.contains(&active_call_connection_id),
1170 "call is active on a dead connection"
1171 );
1172 }
1173 }
1174 }
1175
1176 for (room_id, room) in &self.rooms {
1177 for pending_user_id in &room.pending_participant_user_ids {
1178 assert!(
1179 self.connected_users
1180 .contains_key(&UserId::from_proto(*pending_user_id)),
1181 "call is active on a user that has disconnected"
1182 );
1183 }
1184
1185 for participant in &room.participants {
1186 assert!(
1187 self.connections
1188 .contains_key(&ConnectionId(participant.peer_id)),
1189 "room contains participant that has disconnected"
1190 );
1191
1192 for participant_project in &participant.projects {
1193 let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1194 assert_eq!(
1195 project.room_id, *room_id,
1196 "project was shared on a different room"
1197 );
1198 }
1199 }
1200
1201 assert!(
1202 !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1203 "room can't be empty"
1204 );
1205 }
1206
1207 for (project_id, project) in &self.projects {
1208 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1209 assert!(host_connection.projects.contains(project_id));
1210
1211 for guest_connection_id in project.guests.keys() {
1212 let guest_connection = self.connections.get(guest_connection_id).unwrap();
1213 assert!(guest_connection.projects.contains(project_id));
1214 }
1215 assert_eq!(project.active_replica_ids.len(), project.guests.len());
1216 assert_eq!(
1217 project.active_replica_ids,
1218 project
1219 .guests
1220 .values()
1221 .map(|guest| guest.replica_id)
1222 .collect::<HashSet<_>>(),
1223 );
1224
1225 let room = &self.rooms[&project.room_id];
1226 let room_participant = room
1227 .participants
1228 .iter()
1229 .find(|participant| participant.peer_id == project.host_connection_id.0)
1230 .unwrap();
1231 assert!(
1232 room_participant
1233 .projects
1234 .iter()
1235 .any(|project| project.id == project_id.to_proto()),
1236 "project was not shared in room"
1237 );
1238 }
1239
1240 for (channel_id, channel) in &self.channels {
1241 for connection_id in &channel.connection_ids {
1242 let connection = self.connections.get(connection_id).unwrap();
1243 assert!(connection.channels.contains(channel_id));
1244 }
1245 }
1246 }
1247}
1248
1249impl Project {
1250 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1251 self.guests
1252 .values()
1253 .chain([&self.host])
1254 .any(|collaborator| {
1255 collaborator
1256 .last_activity
1257 .map_or(false, |active_time| active_time > start_time)
1258 })
1259 }
1260
1261 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1262 self.guests.keys().copied().collect()
1263 }
1264
1265 pub fn connection_ids(&self) -> Vec<ConnectionId> {
1266 self.guests
1267 .keys()
1268 .copied()
1269 .chain(Some(self.host_connection_id))
1270 .collect()
1271 }
1272}
1273
1274impl Channel {
1275 fn connection_ids(&self) -> Vec<ConnectionId> {
1276 self.connection_ids.iter().copied().collect()
1277 }
1278}