1use crate::db::{self, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
4use nanoid::nanoid;
5use rpc::{proto, ConnectionId};
6use serde::Serialize;
7use std::{borrow::Cow, mem, path::PathBuf, str};
8use tracing::instrument;
9use util::post_inc;
10
/// Identifier of a room; used as the key of `Store::rooms`.
pub type RoomId = u64;
12
/// In-memory bookkeeping for connections, connected users, rooms and shared projects.
#[derive(Default, Serialize)]
pub struct Store {
    /// Per-connection state, keyed by connection id.
    connections: BTreeMap<ConnectionId, ConnectionState>,
    /// Per-user state (open connections and active call), keyed by user id.
    connected_users: BTreeMap<UserId, ConnectedUser>,
    /// Id that the next call to `create_room` will assign.
    next_room_id: RoomId,
    /// Rooms currently tracked, keyed by room id.
    rooms: BTreeMap<RoomId, proto::Room>,
    /// Shared projects, keyed by project id.
    projects: BTreeMap<ProjectId, Project>,
}
21
/// State tracked for a user that has at least one registered connection.
#[derive(Default, Serialize)]
struct ConnectedUser {
    /// Every connection currently registered for this user.
    connection_ids: HashSet<ConnectionId>,
    /// The call this user is ringing for or participating in, if any.
    active_call: Option<Call>,
}
27
/// State tracked for a single connection.
#[derive(Serialize)]
struct ConnectionState {
    /// User that owns the connection.
    user_id: UserId,
    /// Whether the connection was registered as an admin; admin connections
    /// are excluded from `Store::metrics`.
    admin: bool,
    /// Projects this connection hosts (`share_project`) or has joined as a
    /// guest (`join_project`).
    projects: BTreeSet<ProjectId>,
}
34
/// A call a user is either being invited to or currently taking part in.
#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
pub struct Call {
    /// User that initiated the call.
    pub caller_user_id: UserId,
    /// Room the call belongs to.
    pub room_id: RoomId,
    /// `None` while the call is still ringing; set to the connection that
    /// created or joined the room once the user is a participant.
    pub connection_id: Option<ConnectionId>,
    /// Project, if any, that was offered along with the call.
    pub initial_project_id: Option<ProjectId>,
}
42
/// A project shared into a room by a host connection.
#[derive(Serialize)]
pub struct Project {
    /// Id of the project.
    pub id: ProjectId,
    /// Room the project was shared in.
    pub room_id: RoomId,
    /// Connection that shared the project.
    pub host_connection_id: ConnectionId,
    /// Collaborator record of the host (`share_project` gives it replica id 0).
    pub host: Collaborator,
    /// Guests that joined the project, keyed by their connection id.
    pub guests: HashMap<ConnectionId, Collaborator>,
    /// Replica ids currently handed out to guests.
    pub active_replica_ids: HashSet<ReplicaId>,
    /// Worktrees of the project, keyed by worktree id.
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Language servers reported via `start_language_server`.
    pub language_servers: Vec<proto::LanguageServer>,
}
54
/// A host or guest taking part in a shared project.
#[derive(Serialize)]
pub struct Collaborator {
    /// Replica id assigned to this collaborator within the project.
    pub replica_id: ReplicaId,
    /// User behind the collaborating connection.
    pub user_id: UserId,
    /// Copied from the `admin` flag of the collaborator's connection.
    pub admin: bool,
}
61
/// Server-side copy of a worktree belonging to a shared project.
#[derive(Default, Serialize)]
pub struct Worktree {
    /// Absolute path of the worktree, stored as raw bytes.
    pub abs_path: Vec<u8>,
    /// Name of the worktree root.
    pub root_name: String,
    /// Only visible worktrees are listed in a participant's project summary.
    pub visible: bool,
    /// Entries keyed by entry id; excluded from serialization.
    #[serde(skip)]
    pub entries: BTreeMap<u64, proto::Entry>,
    /// Diagnostic summaries keyed by path; excluded from serialization.
    #[serde(skip)]
    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
    /// Scan id recorded by the most recent `update_worktree` call.
    pub scan_id: u64,
    /// Whether the most recent `update_worktree` call was flagged as the last update.
    pub is_complete: bool,
}
74
/// Identifier of a collaborator's replica within a project.
pub type ReplicaId = u16;
76
/// Everything a caller needs to know after a connection has been removed.
#[derive(Default)]
pub struct RemovedConnectionState<'a> {
    /// User that owned the removed connection.
    pub user_id: UserId,
    /// Projects the connection was hosting, now unshared.
    pub hosted_projects: Vec<Project>,
    /// Projects the connection had joined as a guest and has now left.
    pub guest_projects: Vec<LeftProject>,
    /// NOTE(review): not populated by `Store::remove_connection` in this file;
    /// presumably filled in by the caller — confirm.
    pub contact_ids: HashSet<UserId>,
    /// Room affected by the removal (left or declined), if any.
    pub room: Option<Cow<'a, proto::Room>>,
    /// Connections of users whose pending calls were canceled as a result.
    pub canceled_call_connection_ids: Vec<ConnectionId>,
}
86
/// Result of a connection leaving a project via `Store::leave_project`.
pub struct LeftProject {
    /// Id of the project that was left.
    pub id: ProjectId,
    /// User hosting the project.
    pub host_user_id: UserId,
    /// Connection hosting the project.
    pub host_connection_id: ConnectionId,
    /// Connections (guests plus host) still attached to the project after the departure.
    pub connection_ids: Vec<ConnectionId>,
    /// `true` when the leaving connection was registered as a guest of the project.
    pub remove_collaborator: bool,
}
94
/// Result of a connection leaving a room via `Store::leave_room`.
pub struct LeftRoom<'a> {
    /// The room after the departure; owned when the room was removed because
    /// it had no participants left, borrowed otherwise.
    pub room: Cow<'a, proto::Room>,
    /// Projects the leaving connection was hosting, now unshared.
    pub unshared_projects: Vec<Project>,
    /// Projects the leaving connection had joined as a guest.
    pub left_projects: Vec<LeftProject>,
    /// Connections of pending participants whose calls were canceled.
    pub canceled_call_connection_ids: Vec<ConnectionId>,
}
101
/// Aggregate counters produced by `Store::metrics`.
#[derive(Copy, Clone)]
pub struct Metrics {
    /// Number of non-admin connections.
    pub connections: usize,
    /// Number of projects whose host connection is registered and not an admin.
    pub shared_projects: usize,
}
107
108impl Store {
109 pub fn metrics(&self) -> Metrics {
110 let connections = self.connections.values().filter(|c| !c.admin).count();
111 let mut shared_projects = 0;
112 for project in self.projects.values() {
113 if let Some(connection) = self.connections.get(&project.host_connection_id) {
114 if !connection.admin {
115 shared_projects += 1;
116 }
117 }
118 }
119
120 Metrics {
121 connections,
122 shared_projects,
123 }
124 }
125
126 #[instrument(skip(self))]
127 pub fn add_connection(
128 &mut self,
129 connection_id: ConnectionId,
130 user_id: UserId,
131 admin: bool,
132 ) -> Option<proto::IncomingCall> {
133 self.connections.insert(
134 connection_id,
135 ConnectionState {
136 user_id,
137 admin,
138 projects: Default::default(),
139 },
140 );
141 let connected_user = self.connected_users.entry(user_id).or_default();
142 connected_user.connection_ids.insert(connection_id);
143 if let Some(active_call) = connected_user.active_call {
144 if active_call.connection_id.is_some() {
145 None
146 } else {
147 let room = self.room(active_call.room_id)?;
148 Some(proto::IncomingCall {
149 room_id: active_call.room_id,
150 caller_user_id: active_call.caller_user_id.to_proto(),
151 participant_user_ids: room
152 .participants
153 .iter()
154 .map(|participant| participant.user_id)
155 .collect(),
156 initial_project: active_call
157 .initial_project_id
158 .and_then(|id| Self::build_participant_project(id, &self.projects)),
159 })
160 }
161 } else {
162 None
163 }
164 }
165
166 #[instrument(skip(self))]
167 pub fn remove_connection(
168 &mut self,
169 connection_id: ConnectionId,
170 ) -> Result<RemovedConnectionState> {
171 let connection = self
172 .connections
173 .get_mut(&connection_id)
174 .ok_or_else(|| anyhow!("no such connection"))?;
175
176 let user_id = connection.user_id;
177
178 let mut result = RemovedConnectionState {
179 user_id,
180 ..Default::default()
181 };
182
183 let connected_user = self.connected_users.get(&user_id).unwrap();
184 if let Some(active_call) = connected_user.active_call.as_ref() {
185 let room_id = active_call.room_id;
186 if active_call.connection_id == Some(connection_id) {
187 let left_room = self.leave_room(room_id, connection_id)?;
188 result.hosted_projects = left_room.unshared_projects;
189 result.guest_projects = left_room.left_projects;
190 result.room = Some(Cow::Owned(left_room.room.into_owned()));
191 result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
192 } else if connected_user.connection_ids.len() == 1 {
193 let (room, _) = self.decline_call(room_id, connection_id)?;
194 result.room = Some(Cow::Owned(room.clone()));
195 }
196 }
197
198 let connected_user = self.connected_users.get_mut(&user_id).unwrap();
199 connected_user.connection_ids.remove(&connection_id);
200 if connected_user.connection_ids.is_empty() {
201 self.connected_users.remove(&user_id);
202 }
203 self.connections.remove(&connection_id).unwrap();
204
205 Ok(result)
206 }
207
208 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
209 Ok(self
210 .connections
211 .get(&connection_id)
212 .ok_or_else(|| anyhow!("unknown connection"))?
213 .user_id)
214 }
215
216 pub fn connection_ids_for_user(
217 &self,
218 user_id: UserId,
219 ) -> impl Iterator<Item = ConnectionId> + '_ {
220 self.connected_users
221 .get(&user_id)
222 .into_iter()
223 .map(|state| &state.connection_ids)
224 .flatten()
225 .copied()
226 }
227
228 pub fn is_user_online(&self, user_id: UserId) -> bool {
229 !self
230 .connected_users
231 .get(&user_id)
232 .unwrap_or(&Default::default())
233 .connection_ids
234 .is_empty()
235 }
236
237 fn is_user_busy(&self, user_id: UserId) -> bool {
238 self.connected_users
239 .get(&user_id)
240 .unwrap_or(&Default::default())
241 .active_call
242 .is_some()
243 }
244
245 pub fn build_initial_contacts_update(
246 &self,
247 contacts: Vec<db::Contact>,
248 ) -> proto::UpdateContacts {
249 let mut update = proto::UpdateContacts::default();
250
251 for contact in contacts {
252 match contact {
253 db::Contact::Accepted {
254 user_id,
255 should_notify,
256 } => {
257 update
258 .contacts
259 .push(self.contact_for_user(user_id, should_notify));
260 }
261 db::Contact::Outgoing { user_id } => {
262 update.outgoing_requests.push(user_id.to_proto())
263 }
264 db::Contact::Incoming {
265 user_id,
266 should_notify,
267 } => update
268 .incoming_requests
269 .push(proto::IncomingContactRequest {
270 requester_id: user_id.to_proto(),
271 should_notify,
272 }),
273 }
274 }
275
276 update
277 }
278
279 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
280 proto::Contact {
281 user_id: user_id.to_proto(),
282 online: self.is_user_online(user_id),
283 busy: self.is_user_busy(user_id),
284 should_notify,
285 }
286 }
287
288 pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
289 let connection = self
290 .connections
291 .get_mut(&creator_connection_id)
292 .ok_or_else(|| anyhow!("no such connection"))?;
293 let connected_user = self
294 .connected_users
295 .get_mut(&connection.user_id)
296 .ok_or_else(|| anyhow!("no such connection"))?;
297 anyhow::ensure!(
298 connected_user.active_call.is_none(),
299 "can't create a room with an active call"
300 );
301
302 let room_id = post_inc(&mut self.next_room_id);
303 let room = proto::Room {
304 id: room_id,
305 participants: vec![proto::Participant {
306 user_id: connection.user_id.to_proto(),
307 peer_id: creator_connection_id.0,
308 projects: Default::default(),
309 location: Some(proto::ParticipantLocation {
310 variant: Some(proto::participant_location::Variant::External(
311 proto::participant_location::External {},
312 )),
313 }),
314 }],
315 pending_participant_user_ids: Default::default(),
316 live_kit_room: nanoid!(30),
317 };
318
319 self.rooms.insert(room_id, room);
320 connected_user.active_call = Some(Call {
321 caller_user_id: connection.user_id,
322 room_id,
323 connection_id: Some(creator_connection_id),
324 initial_project_id: None,
325 });
326 Ok(self.rooms.get(&room_id).unwrap())
327 }
328
329 pub fn join_room(
330 &mut self,
331 room_id: RoomId,
332 connection_id: ConnectionId,
333 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
334 let connection = self
335 .connections
336 .get_mut(&connection_id)
337 .ok_or_else(|| anyhow!("no such connection"))?;
338 let user_id = connection.user_id;
339 let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
340
341 let connected_user = self
342 .connected_users
343 .get_mut(&user_id)
344 .ok_or_else(|| anyhow!("no such connection"))?;
345 let active_call = connected_user
346 .active_call
347 .as_mut()
348 .ok_or_else(|| anyhow!("not being called"))?;
349 anyhow::ensure!(
350 active_call.room_id == room_id && active_call.connection_id.is_none(),
351 "not being called on this room"
352 );
353
354 let room = self
355 .rooms
356 .get_mut(&room_id)
357 .ok_or_else(|| anyhow!("no such room"))?;
358 anyhow::ensure!(
359 room.pending_participant_user_ids
360 .contains(&user_id.to_proto()),
361 anyhow!("no such room")
362 );
363 room.pending_participant_user_ids
364 .retain(|pending| *pending != user_id.to_proto());
365 room.participants.push(proto::Participant {
366 user_id: user_id.to_proto(),
367 peer_id: connection_id.0,
368 projects: Default::default(),
369 location: Some(proto::ParticipantLocation {
370 variant: Some(proto::participant_location::Variant::External(
371 proto::participant_location::External {},
372 )),
373 }),
374 });
375 active_call.connection_id = Some(connection_id);
376
377 Ok((room, recipient_connection_ids))
378 }
379
    /// Removes `connection_id` from `room_id`.
    ///
    /// The connection's projects are unshared (when it hosts them) or left
    /// (when it is a guest), the user's active call is cleared, and any
    /// still-pending calls that were placed by the leaving user are canceled.
    /// The room itself is removed once it has no participants left.
    ///
    /// # Errors
    /// Fails when the connection or its user is unknown, when the user's
    /// active call is not a joined call on this room via this connection, or
    /// when the room does not exist.
    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
        let connection = self
            .connections
            .get_mut(&connection_id)
            .ok_or_else(|| anyhow!("no such connection"))?;
        let user_id = connection.user_id;

        let connected_user = self
            .connected_users
            .get(&user_id)
            .ok_or_else(|| anyhow!("no such connection"))?;
        // The caller must have joined this exact room through this exact connection.
        anyhow::ensure!(
            connected_user
                .active_call
                .map_or(false, |call| call.room_id == room_id
                    && call.connection_id == Some(connection_id)),
            "cannot leave a room before joining it"
        );

        // Given that users can only join one room at a time, we can safely unshare
        // and leave all projects associated with the connection.
        let mut unshared_projects = Vec::new();
        let mut left_projects = Vec::new();
        // Iterate over a clone: both calls below mutate `connection.projects`.
        for project_id in connection.projects.clone() {
            // `unshare_project` only succeeds for the host; otherwise fall back
            // to leaving the project. Failures of both are ignored.
            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
                unshared_projects.push(project);
            } else if let Ok(project) = self.leave_project(project_id, connection_id) {
                left_projects.push(project);
            }
        }
        self.connected_users.get_mut(&user_id).unwrap().active_call = None;

        let room = self
            .rooms
            .get_mut(&room_id)
            .ok_or_else(|| anyhow!("no such room"))?;
        room.participants
            .retain(|participant| participant.peer_id != connection_id.0);

        // Cancel calls to pending participants that were invited by the leaving
        // user; pending participants called by someone else (or no longer
        // connected / no longer on a call) stay in the list.
        let mut canceled_call_connection_ids = Vec::new();
        room.pending_participant_user_ids
            .retain(|pending_participant_user_id| {
                if let Some(connected_user) = self
                    .connected_users
                    .get_mut(&UserId::from_proto(*pending_participant_user_id))
                {
                    if let Some(call) = connected_user.active_call.as_ref() {
                        if call.caller_user_id == user_id {
                            connected_user.active_call.take();
                            canceled_call_connection_ids
                                .extend(connected_user.connection_ids.iter().copied());
                            false
                        } else {
                            true
                        }
                    } else {
                        true
                    }
                } else {
                    true
                }
            });

        // Drop the room once the last participant is gone; hand back an owned
        // copy in that case since it no longer lives in `self.rooms`.
        let room = if room.participants.is_empty() {
            Cow::Owned(self.rooms.remove(&room_id).unwrap())
        } else {
            Cow::Borrowed(self.rooms.get(&room_id).unwrap())
        };

        Ok(LeftRoom {
            room,
            unshared_projects,
            left_projects,
            canceled_call_connection_ids,
        })
    }
456
457 pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
458 self.rooms.get(&room_id)
459 }
460
461 pub fn rooms(&self) -> &BTreeMap<RoomId, proto::Room> {
462 &self.rooms
463 }
464
465 pub fn call(
466 &mut self,
467 room_id: RoomId,
468 recipient_user_id: UserId,
469 initial_project_id: Option<ProjectId>,
470 from_connection_id: ConnectionId,
471 ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
472 let caller_user_id = self.user_id_for_connection(from_connection_id)?;
473
474 let recipient_connection_ids = self
475 .connection_ids_for_user(recipient_user_id)
476 .collect::<Vec<_>>();
477 let mut recipient = self
478 .connected_users
479 .get_mut(&recipient_user_id)
480 .ok_or_else(|| anyhow!("no such connection"))?;
481 anyhow::ensure!(
482 recipient.active_call.is_none(),
483 "recipient is already on another call"
484 );
485
486 let room = self
487 .rooms
488 .get_mut(&room_id)
489 .ok_or_else(|| anyhow!("no such room"))?;
490 anyhow::ensure!(
491 room.participants
492 .iter()
493 .any(|participant| participant.peer_id == from_connection_id.0),
494 "no such room"
495 );
496 anyhow::ensure!(
497 room.pending_participant_user_ids
498 .iter()
499 .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
500 "cannot call the same user more than once"
501 );
502 room.pending_participant_user_ids
503 .push(recipient_user_id.to_proto());
504
505 if let Some(initial_project_id) = initial_project_id {
506 let project = self
507 .projects
508 .get(&initial_project_id)
509 .ok_or_else(|| anyhow!("no such project"))?;
510 anyhow::ensure!(project.room_id == room_id, "no such project");
511 }
512
513 recipient.active_call = Some(Call {
514 caller_user_id,
515 room_id,
516 connection_id: None,
517 initial_project_id,
518 });
519
520 Ok((
521 room,
522 recipient_connection_ids,
523 proto::IncomingCall {
524 room_id,
525 caller_user_id: caller_user_id.to_proto(),
526 participant_user_ids: room
527 .participants
528 .iter()
529 .map(|participant| participant.user_id)
530 .collect(),
531 initial_project: initial_project_id
532 .and_then(|id| Self::build_participant_project(id, &self.projects)),
533 },
534 ))
535 }
536
537 pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
538 let mut recipient = self
539 .connected_users
540 .get_mut(&to_user_id)
541 .ok_or_else(|| anyhow!("no such connection"))?;
542 anyhow::ensure!(recipient
543 .active_call
544 .map_or(false, |call| call.room_id == room_id
545 && call.connection_id.is_none()));
546 recipient.active_call = None;
547 let room = self
548 .rooms
549 .get_mut(&room_id)
550 .ok_or_else(|| anyhow!("no such room"))?;
551 room.pending_participant_user_ids
552 .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
553 Ok(room)
554 }
555
556 pub fn cancel_call(
557 &mut self,
558 room_id: RoomId,
559 recipient_user_id: UserId,
560 canceller_connection_id: ConnectionId,
561 ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
562 let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
563 let canceller = self
564 .connected_users
565 .get(&canceller_user_id)
566 .ok_or_else(|| anyhow!("no such connection"))?;
567 let recipient = self
568 .connected_users
569 .get(&recipient_user_id)
570 .ok_or_else(|| anyhow!("no such connection"))?;
571 let canceller_active_call = canceller
572 .active_call
573 .as_ref()
574 .ok_or_else(|| anyhow!("no active call"))?;
575 let recipient_active_call = recipient
576 .active_call
577 .as_ref()
578 .ok_or_else(|| anyhow!("no active call for recipient"))?;
579
580 anyhow::ensure!(
581 canceller_active_call.room_id == room_id,
582 "users are on different calls"
583 );
584 anyhow::ensure!(
585 recipient_active_call.room_id == room_id,
586 "users are on different calls"
587 );
588 anyhow::ensure!(
589 recipient_active_call.connection_id.is_none(),
590 "recipient has already answered"
591 );
592 let room_id = recipient_active_call.room_id;
593 let room = self
594 .rooms
595 .get_mut(&room_id)
596 .ok_or_else(|| anyhow!("no such room"))?;
597 room.pending_participant_user_ids
598 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
599
600 let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
601 recipient.active_call.take();
602
603 Ok((room, recipient.connection_ids.clone()))
604 }
605
606 pub fn decline_call(
607 &mut self,
608 room_id: RoomId,
609 recipient_connection_id: ConnectionId,
610 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
611 let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
612 let recipient = self
613 .connected_users
614 .get_mut(&recipient_user_id)
615 .ok_or_else(|| anyhow!("no such connection"))?;
616 if let Some(active_call) = recipient.active_call {
617 anyhow::ensure!(active_call.room_id == room_id, "no such room");
618 anyhow::ensure!(
619 active_call.connection_id.is_none(),
620 "cannot decline a call after joining room"
621 );
622 recipient.active_call.take();
623 let recipient_connection_ids = self
624 .connection_ids_for_user(recipient_user_id)
625 .collect::<Vec<_>>();
626 let room = self
627 .rooms
628 .get_mut(&active_call.room_id)
629 .ok_or_else(|| anyhow!("no such room"))?;
630 room.pending_participant_user_ids
631 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
632 Ok((room, recipient_connection_ids))
633 } else {
634 Err(anyhow!("user is not being called"))
635 }
636 }
637
638 pub fn update_participant_location(
639 &mut self,
640 room_id: RoomId,
641 location: proto::ParticipantLocation,
642 connection_id: ConnectionId,
643 ) -> Result<&proto::Room> {
644 let room = self
645 .rooms
646 .get_mut(&room_id)
647 .ok_or_else(|| anyhow!("no such room"))?;
648 if let Some(proto::participant_location::Variant::SharedProject(project)) =
649 location.variant.as_ref()
650 {
651 anyhow::ensure!(
652 room.participants
653 .iter()
654 .flat_map(|participant| &participant.projects)
655 .any(|participant_project| participant_project.id == project.id),
656 "no such project"
657 );
658 }
659
660 let participant = room
661 .participants
662 .iter_mut()
663 .find(|participant| participant.peer_id == connection_id.0)
664 .ok_or_else(|| anyhow!("no such room"))?;
665 participant.location = Some(location);
666
667 Ok(room)
668 }
669
670 pub fn share_project(
671 &mut self,
672 room_id: RoomId,
673 project_id: ProjectId,
674 worktrees: Vec<proto::WorktreeMetadata>,
675 host_connection_id: ConnectionId,
676 ) -> Result<&proto::Room> {
677 let connection = self
678 .connections
679 .get_mut(&host_connection_id)
680 .ok_or_else(|| anyhow!("no such connection"))?;
681
682 let room = self
683 .rooms
684 .get_mut(&room_id)
685 .ok_or_else(|| anyhow!("no such room"))?;
686 let participant = room
687 .participants
688 .iter_mut()
689 .find(|participant| participant.peer_id == host_connection_id.0)
690 .ok_or_else(|| anyhow!("no such room"))?;
691
692 connection.projects.insert(project_id);
693 self.projects.insert(
694 project_id,
695 Project {
696 id: project_id,
697 room_id,
698 host_connection_id,
699 host: Collaborator {
700 user_id: connection.user_id,
701 replica_id: 0,
702 admin: connection.admin,
703 },
704 guests: Default::default(),
705 active_replica_ids: Default::default(),
706 worktrees: worktrees
707 .into_iter()
708 .map(|worktree| {
709 (
710 worktree.id,
711 Worktree {
712 root_name: worktree.root_name,
713 visible: worktree.visible,
714 abs_path: worktree.abs_path.clone(),
715 entries: Default::default(),
716 diagnostic_summaries: Default::default(),
717 scan_id: Default::default(),
718 is_complete: Default::default(),
719 },
720 )
721 })
722 .collect(),
723 language_servers: Default::default(),
724 },
725 );
726
727 participant
728 .projects
729 .extend(Self::build_participant_project(project_id, &self.projects));
730
731 Ok(room)
732 }
733
734 pub fn unshare_project(
735 &mut self,
736 project_id: ProjectId,
737 connection_id: ConnectionId,
738 ) -> Result<(&proto::Room, Project)> {
739 match self.projects.entry(project_id) {
740 btree_map::Entry::Occupied(e) => {
741 if e.get().host_connection_id == connection_id {
742 let project = e.remove();
743
744 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
745 host_connection.projects.remove(&project_id);
746 }
747
748 for guest_connection in project.guests.keys() {
749 if let Some(connection) = self.connections.get_mut(guest_connection) {
750 connection.projects.remove(&project_id);
751 }
752 }
753
754 let room = self
755 .rooms
756 .get_mut(&project.room_id)
757 .ok_or_else(|| anyhow!("no such room"))?;
758 let participant = room
759 .participants
760 .iter_mut()
761 .find(|participant| participant.peer_id == connection_id.0)
762 .ok_or_else(|| anyhow!("no such room"))?;
763 participant
764 .projects
765 .retain(|project| project.id != project_id.to_proto());
766
767 Ok((room, project))
768 } else {
769 Err(anyhow!("no such project"))?
770 }
771 }
772 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
773 }
774 }
775
776 pub fn update_project(
777 &mut self,
778 project_id: ProjectId,
779 worktrees: &[proto::WorktreeMetadata],
780 connection_id: ConnectionId,
781 ) -> Result<&proto::Room> {
782 let project = self
783 .projects
784 .get_mut(&project_id)
785 .ok_or_else(|| anyhow!("no such project"))?;
786 if project.host_connection_id == connection_id {
787 let mut old_worktrees = mem::take(&mut project.worktrees);
788 for worktree in worktrees {
789 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
790 project.worktrees.insert(worktree.id, old_worktree);
791 } else {
792 project.worktrees.insert(
793 worktree.id,
794 Worktree {
795 root_name: worktree.root_name.clone(),
796 visible: worktree.visible,
797 abs_path: worktree.abs_path.clone(),
798 entries: Default::default(),
799 diagnostic_summaries: Default::default(),
800 scan_id: Default::default(),
801 is_complete: false,
802 },
803 );
804 }
805 }
806
807 let room = self
808 .rooms
809 .get_mut(&project.room_id)
810 .ok_or_else(|| anyhow!("no such room"))?;
811 let participant_project = room
812 .participants
813 .iter_mut()
814 .flat_map(|participant| &mut participant.projects)
815 .find(|project| project.id == project_id.to_proto())
816 .ok_or_else(|| anyhow!("no such project"))?;
817 participant_project.worktree_root_names = worktrees
818 .iter()
819 .filter(|worktree| worktree.visible)
820 .map(|worktree| worktree.root_name.clone())
821 .collect();
822
823 Ok(room)
824 } else {
825 Err(anyhow!("no such project"))?
826 }
827 }
828
829 pub fn update_diagnostic_summary(
830 &mut self,
831 project_id: ProjectId,
832 worktree_id: u64,
833 connection_id: ConnectionId,
834 summary: proto::DiagnosticSummary,
835 ) -> Result<Vec<ConnectionId>> {
836 let project = self
837 .projects
838 .get_mut(&project_id)
839 .ok_or_else(|| anyhow!("no such project"))?;
840 if project.host_connection_id == connection_id {
841 let worktree = project
842 .worktrees
843 .get_mut(&worktree_id)
844 .ok_or_else(|| anyhow!("no such worktree"))?;
845 worktree
846 .diagnostic_summaries
847 .insert(summary.path.clone().into(), summary);
848 return Ok(project.connection_ids());
849 }
850
851 Err(anyhow!("no such worktree"))?
852 }
853
854 pub fn start_language_server(
855 &mut self,
856 project_id: ProjectId,
857 connection_id: ConnectionId,
858 language_server: proto::LanguageServer,
859 ) -> Result<Vec<ConnectionId>> {
860 let project = self
861 .projects
862 .get_mut(&project_id)
863 .ok_or_else(|| anyhow!("no such project"))?;
864 if project.host_connection_id == connection_id {
865 project.language_servers.push(language_server);
866 return Ok(project.connection_ids());
867 }
868
869 Err(anyhow!("no such project"))?
870 }
871
872 pub fn join_project(
873 &mut self,
874 requester_connection_id: ConnectionId,
875 project_id: ProjectId,
876 ) -> Result<(&Project, ReplicaId)> {
877 let connection = self
878 .connections
879 .get_mut(&requester_connection_id)
880 .ok_or_else(|| anyhow!("no such connection"))?;
881 let user = self
882 .connected_users
883 .get(&connection.user_id)
884 .ok_or_else(|| anyhow!("no such connection"))?;
885 let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
886 anyhow::ensure!(
887 active_call.connection_id == Some(requester_connection_id),
888 "no such project"
889 );
890
891 let project = self
892 .projects
893 .get_mut(&project_id)
894 .ok_or_else(|| anyhow!("no such project"))?;
895 anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
896
897 connection.projects.insert(project_id);
898 let mut replica_id = 1;
899 while project.active_replica_ids.contains(&replica_id) {
900 replica_id += 1;
901 }
902 project.active_replica_ids.insert(replica_id);
903 project.guests.insert(
904 requester_connection_id,
905 Collaborator {
906 replica_id,
907 user_id: connection.user_id,
908 admin: connection.admin,
909 },
910 );
911
912 Ok((project, replica_id))
913 }
914
915 pub fn leave_project(
916 &mut self,
917 project_id: ProjectId,
918 connection_id: ConnectionId,
919 ) -> Result<LeftProject> {
920 let project = self
921 .projects
922 .get_mut(&project_id)
923 .ok_or_else(|| anyhow!("no such project"))?;
924
925 // If the connection leaving the project is a collaborator, remove it.
926 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
927 project.active_replica_ids.remove(&guest.replica_id);
928 true
929 } else {
930 false
931 };
932
933 if let Some(connection) = self.connections.get_mut(&connection_id) {
934 connection.projects.remove(&project_id);
935 }
936
937 Ok(LeftProject {
938 id: project.id,
939 host_connection_id: project.host_connection_id,
940 host_user_id: project.host.user_id,
941 connection_ids: project.connection_ids(),
942 remove_collaborator,
943 })
944 }
945
946 #[allow(clippy::too_many_arguments)]
947 pub fn update_worktree(
948 &mut self,
949 connection_id: ConnectionId,
950 project_id: ProjectId,
951 worktree_id: u64,
952 worktree_root_name: &str,
953 worktree_abs_path: &[u8],
954 removed_entries: &[u64],
955 updated_entries: &[proto::Entry],
956 scan_id: u64,
957 is_last_update: bool,
958 ) -> Result<Vec<ConnectionId>> {
959 let project = self.write_project(project_id, connection_id)?;
960
961 let connection_ids = project.connection_ids();
962 let mut worktree = project.worktrees.entry(worktree_id).or_default();
963 worktree.root_name = worktree_root_name.to_string();
964 worktree.abs_path = worktree_abs_path.to_vec();
965
966 for entry_id in removed_entries {
967 worktree.entries.remove(entry_id);
968 }
969
970 for entry in updated_entries {
971 worktree.entries.insert(entry.id, entry.clone());
972 }
973
974 worktree.scan_id = scan_id;
975 worktree.is_complete = is_last_update;
976 Ok(connection_ids)
977 }
978
979 fn build_participant_project(
980 project_id: ProjectId,
981 projects: &BTreeMap<ProjectId, Project>,
982 ) -> Option<proto::ParticipantProject> {
983 Some(proto::ParticipantProject {
984 id: project_id.to_proto(),
985 worktree_root_names: projects
986 .get(&project_id)?
987 .worktrees
988 .values()
989 .filter(|worktree| worktree.visible)
990 .map(|worktree| worktree.root_name.clone())
991 .collect(),
992 })
993 }
994
995 pub fn project_connection_ids(
996 &self,
997 project_id: ProjectId,
998 acting_connection_id: ConnectionId,
999 ) -> Result<Vec<ConnectionId>> {
1000 Ok(self
1001 .read_project(project_id, acting_connection_id)?
1002 .connection_ids())
1003 }
1004
1005 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1006 self.projects
1007 .get(&project_id)
1008 .ok_or_else(|| anyhow!("no such project"))
1009 }
1010
1011 pub fn read_project(
1012 &self,
1013 project_id: ProjectId,
1014 connection_id: ConnectionId,
1015 ) -> Result<&Project> {
1016 let project = self
1017 .projects
1018 .get(&project_id)
1019 .ok_or_else(|| anyhow!("no such project"))?;
1020 if project.host_connection_id == connection_id
1021 || project.guests.contains_key(&connection_id)
1022 {
1023 Ok(project)
1024 } else {
1025 Err(anyhow!("no such project"))?
1026 }
1027 }
1028
1029 fn write_project(
1030 &mut self,
1031 project_id: ProjectId,
1032 connection_id: ConnectionId,
1033 ) -> Result<&mut Project> {
1034 let project = self
1035 .projects
1036 .get_mut(&project_id)
1037 .ok_or_else(|| anyhow!("no such project"))?;
1038 if project.host_connection_id == connection_id
1039 || project.guests.contains_key(&connection_id)
1040 {
1041 Ok(project)
1042 } else {
1043 Err(anyhow!("no such project"))?
1044 }
1045 }
1046
1047 #[cfg(test)]
1048 pub fn check_invariants(&self) {
1049 for (connection_id, connection) in &self.connections {
1050 for project_id in &connection.projects {
1051 let project = &self.projects.get(project_id).unwrap();
1052 if project.host_connection_id != *connection_id {
1053 assert!(project.guests.contains_key(connection_id));
1054 }
1055
1056 for (worktree_id, worktree) in project.worktrees.iter() {
1057 let mut paths = HashMap::default();
1058 for entry in worktree.entries.values() {
1059 let prev_entry = paths.insert(&entry.path, entry);
1060 assert_eq!(
1061 prev_entry,
1062 None,
1063 "worktree {:?}, duplicate path for entries {:?} and {:?}",
1064 worktree_id,
1065 prev_entry.unwrap(),
1066 entry
1067 );
1068 }
1069 }
1070 }
1071
1072 assert!(self
1073 .connected_users
1074 .get(&connection.user_id)
1075 .unwrap()
1076 .connection_ids
1077 .contains(connection_id));
1078 }
1079
1080 for (user_id, state) in &self.connected_users {
1081 for connection_id in &state.connection_ids {
1082 assert_eq!(
1083 self.connections.get(connection_id).unwrap().user_id,
1084 *user_id
1085 );
1086 }
1087
1088 if let Some(active_call) = state.active_call.as_ref() {
1089 if let Some(active_call_connection_id) = active_call.connection_id {
1090 assert!(
1091 state.connection_ids.contains(&active_call_connection_id),
1092 "call is active on a dead connection"
1093 );
1094 assert!(
1095 state.connection_ids.contains(&active_call_connection_id),
1096 "call is active on a dead connection"
1097 );
1098 }
1099 }
1100 }
1101
1102 for (room_id, room) in &self.rooms {
1103 for pending_user_id in &room.pending_participant_user_ids {
1104 assert!(
1105 self.connected_users
1106 .contains_key(&UserId::from_proto(*pending_user_id)),
1107 "call is active on a user that has disconnected"
1108 );
1109 }
1110
1111 for participant in &room.participants {
1112 assert!(
1113 self.connections
1114 .contains_key(&ConnectionId(participant.peer_id)),
1115 "room {} contains participant {:?} that has disconnected",
1116 room_id,
1117 participant
1118 );
1119
1120 for participant_project in &participant.projects {
1121 let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1122 assert_eq!(
1123 project.room_id, *room_id,
1124 "project was shared on a different room"
1125 );
1126 }
1127 }
1128
1129 assert!(
1130 !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1131 "room can't be empty"
1132 );
1133 }
1134
1135 for (project_id, project) in &self.projects {
1136 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1137 assert!(host_connection.projects.contains(project_id));
1138
1139 for guest_connection_id in project.guests.keys() {
1140 let guest_connection = self.connections.get(guest_connection_id).unwrap();
1141 assert!(guest_connection.projects.contains(project_id));
1142 }
1143 assert_eq!(project.active_replica_ids.len(), project.guests.len());
1144 assert_eq!(
1145 project.active_replica_ids,
1146 project
1147 .guests
1148 .values()
1149 .map(|guest| guest.replica_id)
1150 .collect::<HashSet<_>>(),
1151 );
1152
1153 let room = &self.rooms[&project.room_id];
1154 let room_participant = room
1155 .participants
1156 .iter()
1157 .find(|participant| participant.peer_id == project.host_connection_id.0)
1158 .unwrap();
1159 assert!(
1160 room_participant
1161 .projects
1162 .iter()
1163 .any(|project| project.id == project_id.to_proto()),
1164 "project was not shared in room"
1165 );
1166 }
1167 }
1168}
1169
1170impl Project {
1171 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1172 self.guests.keys().copied().collect()
1173 }
1174
1175 pub fn connection_ids(&self) -> Vec<ConnectionId> {
1176 self.guests
1177 .keys()
1178 .copied()
1179 .chain(Some(self.host_connection_id))
1180 .collect()
1181 }
1182}