1use crate::db::{self, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
4use nanoid::nanoid;
5use rpc::{proto, ConnectionId};
6use serde::Serialize;
7use std::{borrow::Cow, mem, path::PathBuf, str};
8use tracing::instrument;
9use util::post_inc;
10
/// Identifier of a room. `Store::create_room` allocates these from
/// `Store::next_room_id`.
pub type RoomId = u64;
12
/// In-memory bookkeeping of connections, the users behind them, rooms and the
/// projects shared in those rooms.
#[derive(Default, Serialize)]
pub struct Store {
    /// State of every connection registered through `add_connection`.
    connections: BTreeMap<ConnectionId, ConnectionState>,
    /// Per-user state; an entry is dropped by `remove_connection` once the
    /// user's last connection is removed.
    connected_users: BTreeMap<UserId, ConnectedUser>,
    /// Id that `create_room` hands to the next room.
    next_room_id: RoomId,
    /// Rooms keyed by their id.
    rooms: BTreeMap<RoomId, proto::Room>,
    /// Shared projects keyed by their id.
    projects: BTreeMap<ProjectId, Project>,
}
21
/// State shared by all connections that belong to one user.
#[derive(Default, Serialize)]
struct ConnectedUser {
    /// Connections currently registered for this user.
    connection_ids: HashSet<ConnectionId>,
    /// The call this user is ringing for or has joined, if any.
    active_call: Option<Call>,
}
27
/// State tracked for a single connection.
#[derive(Serialize)]
struct ConnectionState {
    /// User that owns the connection.
    user_id: UserId,
    /// Admin flag passed to `add_connection`; admin connections are left out
    /// of `Store::metrics`.
    admin: bool,
    /// Projects this connection hosts (`share_project`) or has joined
    /// (`join_project`).
    projects: BTreeSet<ProjectId>,
}
34
/// A user's participation in a room, either still ringing or already joined.
#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
pub struct Call {
    /// User that started the call (the room creator for their own call).
    pub caller_user_id: UserId,
    /// Room the call belongs to.
    pub room_id: RoomId,
    /// Connection that joined the room; `None` while the call has not been
    /// answered (`Store::call` sets `None`, `Store::join_room` fills it in).
    pub connection_id: Option<ConnectionId>,
    /// Project passed to `Store::call`, if any.
    pub initial_project_id: Option<ProjectId>,
}
42
/// A project shared in a room by a host connection.
#[derive(Serialize)]
pub struct Project {
    /// Id of the project (also its key in `Store::projects`).
    pub id: ProjectId,
    /// Room in which the project was shared.
    pub room_id: RoomId,
    /// Connection that shared the project.
    pub host_connection_id: ConnectionId,
    /// Collaborator record of the host; `share_project` gives it replica id 0.
    pub host: Collaborator,
    /// Guests that joined the project, keyed by their connection.
    pub guests: HashMap<ConnectionId, Collaborator>,
    /// Replica ids currently held by guests (the host's 0 is not stored here).
    pub active_replica_ids: HashSet<ReplicaId>,
    /// Worktrees keyed by worktree id.
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Language servers reported through `start_language_server`.
    pub language_servers: Vec<proto::LanguageServer>,
}
54
/// A participant (host or guest) of a shared project.
#[derive(Serialize)]
pub struct Collaborator {
    /// Replica id assigned within the project.
    pub replica_id: ReplicaId,
    /// User behind the collaborating connection.
    pub user_id: UserId,
    /// Admin flag copied from the collaborator's connection.
    pub admin: bool,
}
61
/// Server-side copy of a worktree belonging to a shared project.
#[derive(Default, Serialize)]
pub struct Worktree {
    /// NOTE(review): never assigned in this file, so it stays at its default
    /// here; presumably an absolute path on the host. Confirm against callers.
    pub abs_path: PathBuf,
    /// Root name reported by the host.
    pub root_name: String,
    /// Only visible worktrees contribute to `worktree_root_names` in the room.
    pub visible: bool,
    /// Entries keyed by entry id; skipped when serializing.
    #[serde(skip)]
    pub entries: BTreeMap<u64, proto::Entry>,
    /// Diagnostic summaries keyed by path; skipped when serializing.
    #[serde(skip)]
    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
    /// Scan id from the most recent `update_worktree` call.
    pub scan_id: u64,
    /// Whether the most recent `update_worktree` call was flagged as the last
    /// update.
    pub is_complete: bool,
}
74
/// Replica id of a collaborator within a project: `share_project` gives the
/// host 0 and `join_project` gives each guest the lowest free id starting at 1.
pub type ReplicaId = u16;
76
/// Everything that changed as a result of `Store::remove_connection`.
#[derive(Default)]
pub struct RemovedConnectionState<'a> {
    /// User that owned the removed connection.
    pub user_id: UserId,
    /// Projects the connection was hosting, now unshared.
    pub hosted_projects: Vec<Project>,
    /// Projects the connection had joined and has now left.
    pub guest_projects: Vec<LeftProject>,
    /// NOTE(review): not populated by `remove_connection` in this file;
    /// presumably filled in by the caller. Confirm.
    pub contact_ids: HashSet<UserId>,
    /// Room affected by the removal (left, or call declined), if any.
    pub room: Option<Cow<'a, proto::Room>>,
    /// Connections whose pending call was canceled because the caller left.
    pub canceled_call_connection_ids: Vec<ConnectionId>,
}
86
/// Result of `Store::leave_project`.
pub struct LeftProject {
    /// Project that was left.
    pub id: ProjectId,
    /// User hosting the project.
    pub host_user_id: UserId,
    /// Connection hosting the project.
    pub host_connection_id: ConnectionId,
    /// Connections still attached to the project after leaving (guests and
    /// host).
    pub connection_ids: Vec<ConnectionId>,
    /// `true` when the leaving connection was registered as a guest and has
    /// been removed.
    pub remove_collaborator: bool,
}
94
/// Result of `Store::leave_room`.
pub struct LeftRoom<'a> {
    /// Room state after leaving; owned when the room was removed from the
    /// store because no participants remained, borrowed otherwise.
    pub room: Cow<'a, proto::Room>,
    /// Projects the leaving connection hosted, now unshared.
    pub unshared_projects: Vec<Project>,
    /// Projects the leaving connection had joined as a guest.
    pub left_projects: Vec<LeftProject>,
    /// Connections of pending users whose call was canceled because their
    /// caller left.
    pub canceled_call_connection_ids: Vec<ConnectionId>,
}
101
/// Counters produced by `Store::metrics`.
#[derive(Copy, Clone)]
pub struct Metrics {
    /// Number of non-admin connections.
    pub connections: usize,
    /// Number of projects whose host connection is registered and not an
    /// admin.
    pub shared_projects: usize,
}
107
108impl Store {
109 pub fn metrics(&self) -> Metrics {
110 let connections = self.connections.values().filter(|c| !c.admin).count();
111 let mut shared_projects = 0;
112 for project in self.projects.values() {
113 if let Some(connection) = self.connections.get(&project.host_connection_id) {
114 if !connection.admin {
115 shared_projects += 1;
116 }
117 }
118 }
119
120 Metrics {
121 connections,
122 shared_projects,
123 }
124 }
125
126 #[instrument(skip(self))]
127 pub fn add_connection(
128 &mut self,
129 connection_id: ConnectionId,
130 user_id: UserId,
131 admin: bool,
132 ) -> Option<proto::IncomingCall> {
133 self.connections.insert(
134 connection_id,
135 ConnectionState {
136 user_id,
137 admin,
138 projects: Default::default(),
139 },
140 );
141 let connected_user = self.connected_users.entry(user_id).or_default();
142 connected_user.connection_ids.insert(connection_id);
143 if let Some(active_call) = connected_user.active_call {
144 if active_call.connection_id.is_some() {
145 None
146 } else {
147 let room = self.room(active_call.room_id)?;
148 Some(proto::IncomingCall {
149 room_id: active_call.room_id,
150 caller_user_id: active_call.caller_user_id.to_proto(),
151 participant_user_ids: room
152 .participants
153 .iter()
154 .map(|participant| participant.user_id)
155 .collect(),
156 initial_project: active_call
157 .initial_project_id
158 .and_then(|id| Self::build_participant_project(id, &self.projects)),
159 })
160 }
161 } else {
162 None
163 }
164 }
165
166 #[instrument(skip(self))]
167 pub fn remove_connection(
168 &mut self,
169 connection_id: ConnectionId,
170 ) -> Result<RemovedConnectionState> {
171 let connection = self
172 .connections
173 .get_mut(&connection_id)
174 .ok_or_else(|| anyhow!("no such connection"))?;
175
176 let user_id = connection.user_id;
177
178 let mut result = RemovedConnectionState {
179 user_id,
180 ..Default::default()
181 };
182
183 let connected_user = self.connected_users.get(&user_id).unwrap();
184 if let Some(active_call) = connected_user.active_call.as_ref() {
185 let room_id = active_call.room_id;
186 if active_call.connection_id == Some(connection_id) {
187 let left_room = self.leave_room(room_id, connection_id)?;
188 result.hosted_projects = left_room.unshared_projects;
189 result.guest_projects = left_room.left_projects;
190 result.room = Some(Cow::Owned(left_room.room.into_owned()));
191 result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
192 } else if connected_user.connection_ids.len() == 1 {
193 let (room, _) = self.decline_call(room_id, connection_id)?;
194 result.room = Some(Cow::Owned(room.clone()));
195 }
196 }
197
198 let connected_user = self.connected_users.get_mut(&user_id).unwrap();
199 connected_user.connection_ids.remove(&connection_id);
200 if connected_user.connection_ids.is_empty() {
201 self.connected_users.remove(&user_id);
202 }
203 self.connections.remove(&connection_id).unwrap();
204
205 Ok(result)
206 }
207
208 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
209 Ok(self
210 .connections
211 .get(&connection_id)
212 .ok_or_else(|| anyhow!("unknown connection"))?
213 .user_id)
214 }
215
216 pub fn connection_ids_for_user(
217 &self,
218 user_id: UserId,
219 ) -> impl Iterator<Item = ConnectionId> + '_ {
220 self.connected_users
221 .get(&user_id)
222 .into_iter()
223 .map(|state| &state.connection_ids)
224 .flatten()
225 .copied()
226 }
227
228 pub fn is_user_online(&self, user_id: UserId) -> bool {
229 !self
230 .connected_users
231 .get(&user_id)
232 .unwrap_or(&Default::default())
233 .connection_ids
234 .is_empty()
235 }
236
237 fn is_user_busy(&self, user_id: UserId) -> bool {
238 self.connected_users
239 .get(&user_id)
240 .unwrap_or(&Default::default())
241 .active_call
242 .is_some()
243 }
244
245 pub fn build_initial_contacts_update(
246 &self,
247 contacts: Vec<db::Contact>,
248 ) -> proto::UpdateContacts {
249 let mut update = proto::UpdateContacts::default();
250
251 for contact in contacts {
252 match contact {
253 db::Contact::Accepted {
254 user_id,
255 should_notify,
256 } => {
257 update
258 .contacts
259 .push(self.contact_for_user(user_id, should_notify));
260 }
261 db::Contact::Outgoing { user_id } => {
262 update.outgoing_requests.push(user_id.to_proto())
263 }
264 db::Contact::Incoming {
265 user_id,
266 should_notify,
267 } => update
268 .incoming_requests
269 .push(proto::IncomingContactRequest {
270 requester_id: user_id.to_proto(),
271 should_notify,
272 }),
273 }
274 }
275
276 update
277 }
278
279 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
280 proto::Contact {
281 user_id: user_id.to_proto(),
282 online: self.is_user_online(user_id),
283 busy: self.is_user_busy(user_id),
284 should_notify,
285 }
286 }
287
288 pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
289 let connection = self
290 .connections
291 .get_mut(&creator_connection_id)
292 .ok_or_else(|| anyhow!("no such connection"))?;
293 let connected_user = self
294 .connected_users
295 .get_mut(&connection.user_id)
296 .ok_or_else(|| anyhow!("no such connection"))?;
297 anyhow::ensure!(
298 connected_user.active_call.is_none(),
299 "can't create a room with an active call"
300 );
301
302 let room_id = post_inc(&mut self.next_room_id);
303 let room = proto::Room {
304 id: room_id,
305 participants: vec![proto::Participant {
306 user_id: connection.user_id.to_proto(),
307 peer_id: creator_connection_id.0,
308 projects: Default::default(),
309 location: Some(proto::ParticipantLocation {
310 variant: Some(proto::participant_location::Variant::External(
311 proto::participant_location::External {},
312 )),
313 }),
314 }],
315 pending_participant_user_ids: Default::default(),
316 live_kit_room: nanoid!(30),
317 };
318
319 self.rooms.insert(room_id, room);
320 connected_user.active_call = Some(Call {
321 caller_user_id: connection.user_id,
322 room_id,
323 connection_id: Some(creator_connection_id),
324 initial_project_id: None,
325 });
326 Ok(self.rooms.get(&room_id).unwrap())
327 }
328
329 pub fn join_room(
330 &mut self,
331 room_id: RoomId,
332 connection_id: ConnectionId,
333 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
334 let connection = self
335 .connections
336 .get_mut(&connection_id)
337 .ok_or_else(|| anyhow!("no such connection"))?;
338 let user_id = connection.user_id;
339 let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
340
341 let connected_user = self
342 .connected_users
343 .get_mut(&user_id)
344 .ok_or_else(|| anyhow!("no such connection"))?;
345 let active_call = connected_user
346 .active_call
347 .as_mut()
348 .ok_or_else(|| anyhow!("not being called"))?;
349 anyhow::ensure!(
350 active_call.room_id == room_id && active_call.connection_id.is_none(),
351 "not being called on this room"
352 );
353
354 let room = self
355 .rooms
356 .get_mut(&room_id)
357 .ok_or_else(|| anyhow!("no such room"))?;
358 anyhow::ensure!(
359 room.pending_participant_user_ids
360 .contains(&user_id.to_proto()),
361 anyhow!("no such room")
362 );
363 room.pending_participant_user_ids
364 .retain(|pending| *pending != user_id.to_proto());
365 room.participants.push(proto::Participant {
366 user_id: user_id.to_proto(),
367 peer_id: connection_id.0,
368 projects: Default::default(),
369 location: Some(proto::ParticipantLocation {
370 variant: Some(proto::participant_location::Variant::External(
371 proto::participant_location::External {},
372 )),
373 }),
374 });
375 active_call.connection_id = Some(connection_id);
376
377 Ok((room, recipient_connection_ids))
378 }
379
380 pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
381 let connection = self
382 .connections
383 .get_mut(&connection_id)
384 .ok_or_else(|| anyhow!("no such connection"))?;
385 let user_id = connection.user_id;
386
387 let connected_user = self
388 .connected_users
389 .get(&user_id)
390 .ok_or_else(|| anyhow!("no such connection"))?;
391 anyhow::ensure!(
392 connected_user
393 .active_call
394 .map_or(false, |call| call.room_id == room_id
395 && call.connection_id == Some(connection_id)),
396 "cannot leave a room before joining it"
397 );
398
399 // Given that users can only join one room at a time, we can safely unshare
400 // and leave all projects associated with the connection.
401 let mut unshared_projects = Vec::new();
402 let mut left_projects = Vec::new();
403 for project_id in connection.projects.clone() {
404 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
405 unshared_projects.push(project);
406 } else if let Ok(project) = self.leave_project(project_id, connection_id) {
407 left_projects.push(project);
408 }
409 }
410 self.connected_users.get_mut(&user_id).unwrap().active_call = None;
411
412 let room = self
413 .rooms
414 .get_mut(&room_id)
415 .ok_or_else(|| anyhow!("no such room"))?;
416 room.participants
417 .retain(|participant| participant.peer_id != connection_id.0);
418
419 let mut canceled_call_connection_ids = Vec::new();
420 room.pending_participant_user_ids
421 .retain(|pending_participant_user_id| {
422 if let Some(connected_user) = self
423 .connected_users
424 .get_mut(&UserId::from_proto(*pending_participant_user_id))
425 {
426 if let Some(call) = connected_user.active_call.as_ref() {
427 if call.caller_user_id == user_id {
428 connected_user.active_call.take();
429 canceled_call_connection_ids
430 .extend(connected_user.connection_ids.iter().copied());
431 false
432 } else {
433 true
434 }
435 } else {
436 true
437 }
438 } else {
439 true
440 }
441 });
442
443 let room = if room.participants.is_empty() {
444 Cow::Owned(self.rooms.remove(&room_id).unwrap())
445 } else {
446 Cow::Borrowed(self.rooms.get(&room_id).unwrap())
447 };
448
449 Ok(LeftRoom {
450 room,
451 unshared_projects,
452 left_projects,
453 canceled_call_connection_ids,
454 })
455 }
456
457 pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
458 self.rooms.get(&room_id)
459 }
460
461 pub fn rooms(&self) -> &BTreeMap<RoomId, proto::Room> {
462 &self.rooms
463 }
464
465 pub fn call(
466 &mut self,
467 room_id: RoomId,
468 recipient_user_id: UserId,
469 initial_project_id: Option<ProjectId>,
470 from_connection_id: ConnectionId,
471 ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
472 let caller_user_id = self.user_id_for_connection(from_connection_id)?;
473
474 let recipient_connection_ids = self
475 .connection_ids_for_user(recipient_user_id)
476 .collect::<Vec<_>>();
477 let mut recipient = self
478 .connected_users
479 .get_mut(&recipient_user_id)
480 .ok_or_else(|| anyhow!("no such connection"))?;
481 anyhow::ensure!(
482 recipient.active_call.is_none(),
483 "recipient is already on another call"
484 );
485
486 let room = self
487 .rooms
488 .get_mut(&room_id)
489 .ok_or_else(|| anyhow!("no such room"))?;
490 anyhow::ensure!(
491 room.participants
492 .iter()
493 .any(|participant| participant.peer_id == from_connection_id.0),
494 "no such room"
495 );
496 anyhow::ensure!(
497 room.pending_participant_user_ids
498 .iter()
499 .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
500 "cannot call the same user more than once"
501 );
502 room.pending_participant_user_ids
503 .push(recipient_user_id.to_proto());
504
505 if let Some(initial_project_id) = initial_project_id {
506 let project = self
507 .projects
508 .get(&initial_project_id)
509 .ok_or_else(|| anyhow!("no such project"))?;
510 anyhow::ensure!(project.room_id == room_id, "no such project");
511 }
512
513 recipient.active_call = Some(Call {
514 caller_user_id,
515 room_id,
516 connection_id: None,
517 initial_project_id,
518 });
519
520 Ok((
521 room,
522 recipient_connection_ids,
523 proto::IncomingCall {
524 room_id,
525 caller_user_id: caller_user_id.to_proto(),
526 participant_user_ids: room
527 .participants
528 .iter()
529 .map(|participant| participant.user_id)
530 .collect(),
531 initial_project: initial_project_id
532 .and_then(|id| Self::build_participant_project(id, &self.projects)),
533 },
534 ))
535 }
536
537 pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
538 let mut recipient = self
539 .connected_users
540 .get_mut(&to_user_id)
541 .ok_or_else(|| anyhow!("no such connection"))?;
542 anyhow::ensure!(recipient
543 .active_call
544 .map_or(false, |call| call.room_id == room_id
545 && call.connection_id.is_none()));
546 recipient.active_call = None;
547 let room = self
548 .rooms
549 .get_mut(&room_id)
550 .ok_or_else(|| anyhow!("no such room"))?;
551 room.pending_participant_user_ids
552 .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
553 Ok(room)
554 }
555
556 pub fn cancel_call(
557 &mut self,
558 room_id: RoomId,
559 recipient_user_id: UserId,
560 canceller_connection_id: ConnectionId,
561 ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
562 let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
563 let canceller = self
564 .connected_users
565 .get(&canceller_user_id)
566 .ok_or_else(|| anyhow!("no such connection"))?;
567 let recipient = self
568 .connected_users
569 .get(&recipient_user_id)
570 .ok_or_else(|| anyhow!("no such connection"))?;
571 let canceller_active_call = canceller
572 .active_call
573 .as_ref()
574 .ok_or_else(|| anyhow!("no active call"))?;
575 let recipient_active_call = recipient
576 .active_call
577 .as_ref()
578 .ok_or_else(|| anyhow!("no active call for recipient"))?;
579
580 anyhow::ensure!(
581 canceller_active_call.room_id == room_id,
582 "users are on different calls"
583 );
584 anyhow::ensure!(
585 recipient_active_call.room_id == room_id,
586 "users are on different calls"
587 );
588 anyhow::ensure!(
589 recipient_active_call.connection_id.is_none(),
590 "recipient has already answered"
591 );
592 let room_id = recipient_active_call.room_id;
593 let room = self
594 .rooms
595 .get_mut(&room_id)
596 .ok_or_else(|| anyhow!("no such room"))?;
597 room.pending_participant_user_ids
598 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
599
600 let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
601 recipient.active_call.take();
602
603 Ok((room, recipient.connection_ids.clone()))
604 }
605
606 pub fn decline_call(
607 &mut self,
608 room_id: RoomId,
609 recipient_connection_id: ConnectionId,
610 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
611 let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
612 let recipient = self
613 .connected_users
614 .get_mut(&recipient_user_id)
615 .ok_or_else(|| anyhow!("no such connection"))?;
616 if let Some(active_call) = recipient.active_call {
617 anyhow::ensure!(active_call.room_id == room_id, "no such room");
618 anyhow::ensure!(
619 active_call.connection_id.is_none(),
620 "cannot decline a call after joining room"
621 );
622 recipient.active_call.take();
623 let recipient_connection_ids = self
624 .connection_ids_for_user(recipient_user_id)
625 .collect::<Vec<_>>();
626 let room = self
627 .rooms
628 .get_mut(&active_call.room_id)
629 .ok_or_else(|| anyhow!("no such room"))?;
630 room.pending_participant_user_ids
631 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
632 Ok((room, recipient_connection_ids))
633 } else {
634 Err(anyhow!("user is not being called"))
635 }
636 }
637
638 pub fn update_participant_location(
639 &mut self,
640 room_id: RoomId,
641 location: proto::ParticipantLocation,
642 connection_id: ConnectionId,
643 ) -> Result<&proto::Room> {
644 let room = self
645 .rooms
646 .get_mut(&room_id)
647 .ok_or_else(|| anyhow!("no such room"))?;
648 if let Some(proto::participant_location::Variant::SharedProject(project)) =
649 location.variant.as_ref()
650 {
651 anyhow::ensure!(
652 room.participants
653 .iter()
654 .flat_map(|participant| &participant.projects)
655 .any(|participant_project| participant_project.id == project.id),
656 "no such project"
657 );
658 }
659
660 let participant = room
661 .participants
662 .iter_mut()
663 .find(|participant| participant.peer_id == connection_id.0)
664 .ok_or_else(|| anyhow!("no such room"))?;
665 participant.location = Some(location);
666
667 Ok(room)
668 }
669
670 pub fn share_project(
671 &mut self,
672 room_id: RoomId,
673 project_id: ProjectId,
674 worktrees: Vec<proto::WorktreeMetadata>,
675 host_connection_id: ConnectionId,
676 ) -> Result<&proto::Room> {
677 let connection = self
678 .connections
679 .get_mut(&host_connection_id)
680 .ok_or_else(|| anyhow!("no such connection"))?;
681
682 let room = self
683 .rooms
684 .get_mut(&room_id)
685 .ok_or_else(|| anyhow!("no such room"))?;
686 let participant = room
687 .participants
688 .iter_mut()
689 .find(|participant| participant.peer_id == host_connection_id.0)
690 .ok_or_else(|| anyhow!("no such room"))?;
691
692 connection.projects.insert(project_id);
693 self.projects.insert(
694 project_id,
695 Project {
696 id: project_id,
697 room_id,
698 host_connection_id,
699 host: Collaborator {
700 user_id: connection.user_id,
701 replica_id: 0,
702 admin: connection.admin,
703 },
704 guests: Default::default(),
705 active_replica_ids: Default::default(),
706 worktrees: worktrees
707 .into_iter()
708 .map(|worktree| {
709 (
710 worktree.id,
711 Worktree {
712 root_name: worktree.root_name,
713 visible: worktree.visible,
714 ..Default::default()
715 },
716 )
717 })
718 .collect(),
719 language_servers: Default::default(),
720 },
721 );
722
723 participant
724 .projects
725 .extend(Self::build_participant_project(project_id, &self.projects));
726
727 Ok(room)
728 }
729
730 pub fn unshare_project(
731 &mut self,
732 project_id: ProjectId,
733 connection_id: ConnectionId,
734 ) -> Result<(&proto::Room, Project)> {
735 match self.projects.entry(project_id) {
736 btree_map::Entry::Occupied(e) => {
737 if e.get().host_connection_id == connection_id {
738 let project = e.remove();
739
740 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
741 host_connection.projects.remove(&project_id);
742 }
743
744 for guest_connection in project.guests.keys() {
745 if let Some(connection) = self.connections.get_mut(guest_connection) {
746 connection.projects.remove(&project_id);
747 }
748 }
749
750 let room = self
751 .rooms
752 .get_mut(&project.room_id)
753 .ok_or_else(|| anyhow!("no such room"))?;
754 let participant = room
755 .participants
756 .iter_mut()
757 .find(|participant| participant.peer_id == connection_id.0)
758 .ok_or_else(|| anyhow!("no such room"))?;
759 participant
760 .projects
761 .retain(|project| project.id != project_id.to_proto());
762
763 Ok((room, project))
764 } else {
765 Err(anyhow!("no such project"))?
766 }
767 }
768 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
769 }
770 }
771
772 pub fn update_project(
773 &mut self,
774 project_id: ProjectId,
775 worktrees: &[proto::WorktreeMetadata],
776 connection_id: ConnectionId,
777 ) -> Result<&proto::Room> {
778 let project = self
779 .projects
780 .get_mut(&project_id)
781 .ok_or_else(|| anyhow!("no such project"))?;
782 if project.host_connection_id == connection_id {
783 let mut old_worktrees = mem::take(&mut project.worktrees);
784 for worktree in worktrees {
785 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
786 project.worktrees.insert(worktree.id, old_worktree);
787 } else {
788 project.worktrees.insert(
789 worktree.id,
790 Worktree {
791 root_name: worktree.root_name.clone(),
792 visible: worktree.visible,
793 ..Default::default()
794 },
795 );
796 }
797 }
798
799 let room = self
800 .rooms
801 .get_mut(&project.room_id)
802 .ok_or_else(|| anyhow!("no such room"))?;
803 let participant_project = room
804 .participants
805 .iter_mut()
806 .flat_map(|participant| &mut participant.projects)
807 .find(|project| project.id == project_id.to_proto())
808 .ok_or_else(|| anyhow!("no such project"))?;
809 participant_project.worktree_root_names = worktrees
810 .iter()
811 .filter(|worktree| worktree.visible)
812 .map(|worktree| worktree.root_name.clone())
813 .collect();
814
815 Ok(room)
816 } else {
817 Err(anyhow!("no such project"))?
818 }
819 }
820
821 pub fn update_diagnostic_summary(
822 &mut self,
823 project_id: ProjectId,
824 worktree_id: u64,
825 connection_id: ConnectionId,
826 summary: proto::DiagnosticSummary,
827 ) -> Result<Vec<ConnectionId>> {
828 let project = self
829 .projects
830 .get_mut(&project_id)
831 .ok_or_else(|| anyhow!("no such project"))?;
832 if project.host_connection_id == connection_id {
833 let worktree = project
834 .worktrees
835 .get_mut(&worktree_id)
836 .ok_or_else(|| anyhow!("no such worktree"))?;
837 worktree
838 .diagnostic_summaries
839 .insert(summary.path.clone().into(), summary);
840 return Ok(project.connection_ids());
841 }
842
843 Err(anyhow!("no such worktree"))?
844 }
845
846 pub fn start_language_server(
847 &mut self,
848 project_id: ProjectId,
849 connection_id: ConnectionId,
850 language_server: proto::LanguageServer,
851 ) -> Result<Vec<ConnectionId>> {
852 let project = self
853 .projects
854 .get_mut(&project_id)
855 .ok_or_else(|| anyhow!("no such project"))?;
856 if project.host_connection_id == connection_id {
857 project.language_servers.push(language_server);
858 return Ok(project.connection_ids());
859 }
860
861 Err(anyhow!("no such project"))?
862 }
863
864 pub fn join_project(
865 &mut self,
866 requester_connection_id: ConnectionId,
867 project_id: ProjectId,
868 ) -> Result<(&Project, ReplicaId)> {
869 let connection = self
870 .connections
871 .get_mut(&requester_connection_id)
872 .ok_or_else(|| anyhow!("no such connection"))?;
873 let user = self
874 .connected_users
875 .get(&connection.user_id)
876 .ok_or_else(|| anyhow!("no such connection"))?;
877 let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
878 anyhow::ensure!(
879 active_call.connection_id == Some(requester_connection_id),
880 "no such project"
881 );
882
883 let project = self
884 .projects
885 .get_mut(&project_id)
886 .ok_or_else(|| anyhow!("no such project"))?;
887 anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
888
889 connection.projects.insert(project_id);
890 let mut replica_id = 1;
891 while project.active_replica_ids.contains(&replica_id) {
892 replica_id += 1;
893 }
894 project.active_replica_ids.insert(replica_id);
895 project.guests.insert(
896 requester_connection_id,
897 Collaborator {
898 replica_id,
899 user_id: connection.user_id,
900 admin: connection.admin,
901 },
902 );
903
904 Ok((project, replica_id))
905 }
906
907 pub fn leave_project(
908 &mut self,
909 project_id: ProjectId,
910 connection_id: ConnectionId,
911 ) -> Result<LeftProject> {
912 let project = self
913 .projects
914 .get_mut(&project_id)
915 .ok_or_else(|| anyhow!("no such project"))?;
916
917 // If the connection leaving the project is a collaborator, remove it.
918 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
919 project.active_replica_ids.remove(&guest.replica_id);
920 true
921 } else {
922 false
923 };
924
925 if let Some(connection) = self.connections.get_mut(&connection_id) {
926 connection.projects.remove(&project_id);
927 }
928
929 Ok(LeftProject {
930 id: project.id,
931 host_connection_id: project.host_connection_id,
932 host_user_id: project.host.user_id,
933 connection_ids: project.connection_ids(),
934 remove_collaborator,
935 })
936 }
937
938 #[allow(clippy::too_many_arguments)]
939 pub fn update_worktree(
940 &mut self,
941 connection_id: ConnectionId,
942 project_id: ProjectId,
943 worktree_id: u64,
944 worktree_root_name: &str,
945 removed_entries: &[u64],
946 updated_entries: &[proto::Entry],
947 scan_id: u64,
948 is_last_update: bool,
949 ) -> Result<Vec<ConnectionId>> {
950 let project = self.write_project(project_id, connection_id)?;
951
952 let connection_ids = project.connection_ids();
953 let mut worktree = project.worktrees.entry(worktree_id).or_default();
954 worktree.root_name = worktree_root_name.to_string();
955
956 for entry_id in removed_entries {
957 worktree.entries.remove(entry_id);
958 }
959
960 for entry in updated_entries {
961 worktree.entries.insert(entry.id, entry.clone());
962 }
963
964 worktree.scan_id = scan_id;
965 worktree.is_complete = is_last_update;
966 Ok(connection_ids)
967 }
968
969 fn build_participant_project(
970 project_id: ProjectId,
971 projects: &BTreeMap<ProjectId, Project>,
972 ) -> Option<proto::ParticipantProject> {
973 Some(proto::ParticipantProject {
974 id: project_id.to_proto(),
975 worktree_root_names: projects
976 .get(&project_id)?
977 .worktrees
978 .values()
979 .filter(|worktree| worktree.visible)
980 .map(|worktree| worktree.root_name.clone())
981 .collect(),
982 })
983 }
984
985 pub fn project_connection_ids(
986 &self,
987 project_id: ProjectId,
988 acting_connection_id: ConnectionId,
989 ) -> Result<Vec<ConnectionId>> {
990 Ok(self
991 .read_project(project_id, acting_connection_id)?
992 .connection_ids())
993 }
994
995 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
996 self.projects
997 .get(&project_id)
998 .ok_or_else(|| anyhow!("no such project"))
999 }
1000
1001 pub fn read_project(
1002 &self,
1003 project_id: ProjectId,
1004 connection_id: ConnectionId,
1005 ) -> Result<&Project> {
1006 let project = self
1007 .projects
1008 .get(&project_id)
1009 .ok_or_else(|| anyhow!("no such project"))?;
1010 if project.host_connection_id == connection_id
1011 || project.guests.contains_key(&connection_id)
1012 {
1013 Ok(project)
1014 } else {
1015 Err(anyhow!("no such project"))?
1016 }
1017 }
1018
1019 fn write_project(
1020 &mut self,
1021 project_id: ProjectId,
1022 connection_id: ConnectionId,
1023 ) -> Result<&mut Project> {
1024 let project = self
1025 .projects
1026 .get_mut(&project_id)
1027 .ok_or_else(|| anyhow!("no such project"))?;
1028 if project.host_connection_id == connection_id
1029 || project.guests.contains_key(&connection_id)
1030 {
1031 Ok(project)
1032 } else {
1033 Err(anyhow!("no such project"))?
1034 }
1035 }
1036
1037 #[cfg(test)]
1038 pub fn check_invariants(&self) {
1039 for (connection_id, connection) in &self.connections {
1040 for project_id in &connection.projects {
1041 let project = &self.projects.get(project_id).unwrap();
1042 if project.host_connection_id != *connection_id {
1043 assert!(project.guests.contains_key(connection_id));
1044 }
1045
1046 for (worktree_id, worktree) in project.worktrees.iter() {
1047 let mut paths = HashMap::default();
1048 for entry in worktree.entries.values() {
1049 let prev_entry = paths.insert(&entry.path, entry);
1050 assert_eq!(
1051 prev_entry,
1052 None,
1053 "worktree {:?}, duplicate path for entries {:?} and {:?}",
1054 worktree_id,
1055 prev_entry.unwrap(),
1056 entry
1057 );
1058 }
1059 }
1060 }
1061
1062 assert!(self
1063 .connected_users
1064 .get(&connection.user_id)
1065 .unwrap()
1066 .connection_ids
1067 .contains(connection_id));
1068 }
1069
1070 for (user_id, state) in &self.connected_users {
1071 for connection_id in &state.connection_ids {
1072 assert_eq!(
1073 self.connections.get(connection_id).unwrap().user_id,
1074 *user_id
1075 );
1076 }
1077
1078 if let Some(active_call) = state.active_call.as_ref() {
1079 if let Some(active_call_connection_id) = active_call.connection_id {
1080 assert!(
1081 state.connection_ids.contains(&active_call_connection_id),
1082 "call is active on a dead connection"
1083 );
1084 assert!(
1085 state.connection_ids.contains(&active_call_connection_id),
1086 "call is active on a dead connection"
1087 );
1088 }
1089 }
1090 }
1091
1092 for (room_id, room) in &self.rooms {
1093 for pending_user_id in &room.pending_participant_user_ids {
1094 assert!(
1095 self.connected_users
1096 .contains_key(&UserId::from_proto(*pending_user_id)),
1097 "call is active on a user that has disconnected"
1098 );
1099 }
1100
1101 for participant in &room.participants {
1102 assert!(
1103 self.connections
1104 .contains_key(&ConnectionId(participant.peer_id)),
1105 "room {} contains participant {:?} that has disconnected",
1106 room_id,
1107 participant
1108 );
1109
1110 for participant_project in &participant.projects {
1111 let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1112 assert_eq!(
1113 project.room_id, *room_id,
1114 "project was shared on a different room"
1115 );
1116 }
1117 }
1118
1119 assert!(
1120 !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1121 "room can't be empty"
1122 );
1123 }
1124
1125 for (project_id, project) in &self.projects {
1126 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1127 assert!(host_connection.projects.contains(project_id));
1128
1129 for guest_connection_id in project.guests.keys() {
1130 let guest_connection = self.connections.get(guest_connection_id).unwrap();
1131 assert!(guest_connection.projects.contains(project_id));
1132 }
1133 assert_eq!(project.active_replica_ids.len(), project.guests.len());
1134 assert_eq!(
1135 project.active_replica_ids,
1136 project
1137 .guests
1138 .values()
1139 .map(|guest| guest.replica_id)
1140 .collect::<HashSet<_>>(),
1141 );
1142
1143 let room = &self.rooms[&project.room_id];
1144 let room_participant = room
1145 .participants
1146 .iter()
1147 .find(|participant| participant.peer_id == project.host_connection_id.0)
1148 .unwrap();
1149 assert!(
1150 room_participant
1151 .projects
1152 .iter()
1153 .any(|project| project.id == project_id.to_proto()),
1154 "project was not shared in room"
1155 );
1156 }
1157 }
1158}
1159
1160impl Project {
1161 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1162 self.guests.keys().copied().collect()
1163 }
1164
1165 pub fn connection_ids(&self) -> Vec<ConnectionId> {
1166 self.guests
1167 .keys()
1168 .copied()
1169 .chain(Some(self.host_connection_id))
1170 .collect()
1171 }
1172}