1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
4use rpc::{proto, ConnectionId};
5use serde::Serialize;
6use std::{mem, path::PathBuf, str, time::Duration};
7use time::OffsetDateTime;
8use tracing::instrument;
9use util::post_inc;
10
11pub type RoomId = u64;
12
13#[derive(Default, Serialize)]
14pub struct Store {
15 connections: BTreeMap<ConnectionId, ConnectionState>,
16 connected_users: BTreeMap<UserId, ConnectedUser>,
17 next_room_id: RoomId,
18 rooms: BTreeMap<RoomId, proto::Room>,
19 projects: BTreeMap<ProjectId, Project>,
20 #[serde(skip)]
21 channels: BTreeMap<ChannelId, Channel>,
22}
23
24#[derive(Default, Serialize)]
25struct ConnectedUser {
26 connection_ids: HashSet<ConnectionId>,
27 active_call: Option<Call>,
28}
29
30#[derive(Serialize)]
31struct ConnectionState {
32 user_id: UserId,
33 admin: bool,
34 projects: BTreeSet<ProjectId>,
35 channels: HashSet<ChannelId>,
36}
37
38#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
39pub struct Call {
40 pub caller_user_id: UserId,
41 pub room_id: RoomId,
42 pub connection_id: Option<ConnectionId>,
43 pub initial_project_id: Option<ProjectId>,
44}
45
46#[derive(Serialize)]
47pub struct Project {
48 pub id: ProjectId,
49 pub room_id: RoomId,
50 pub host_connection_id: ConnectionId,
51 pub host: Collaborator,
52 pub guests: HashMap<ConnectionId, Collaborator>,
53 pub active_replica_ids: HashSet<ReplicaId>,
54 pub worktrees: BTreeMap<u64, Worktree>,
55 pub language_servers: Vec<proto::LanguageServer>,
56}
57
58#[derive(Serialize)]
59pub struct Collaborator {
60 pub replica_id: ReplicaId,
61 pub user_id: UserId,
62 #[serde(skip)]
63 pub last_activity: Option<OffsetDateTime>,
64 pub admin: bool,
65}
66
67#[derive(Default, Serialize)]
68pub struct Worktree {
69 pub abs_path: PathBuf,
70 pub root_name: String,
71 pub visible: bool,
72 #[serde(skip)]
73 pub entries: BTreeMap<u64, proto::Entry>,
74 #[serde(skip)]
75 pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
76 pub scan_id: u64,
77 pub is_complete: bool,
78}
79
80#[derive(Default)]
81pub struct Channel {
82 pub connection_ids: HashSet<ConnectionId>,
83}
84
85pub type ReplicaId = u16;
86
87#[derive(Default)]
88pub struct RemovedConnectionState {
89 pub user_id: UserId,
90 pub hosted_projects: Vec<Project>,
91 pub guest_projects: Vec<LeftProject>,
92 pub contact_ids: HashSet<UserId>,
93 pub room_id: Option<RoomId>,
94 pub canceled_call_connection_ids: Vec<ConnectionId>,
95}
96
97pub struct LeftProject {
98 pub id: ProjectId,
99 pub host_user_id: UserId,
100 pub host_connection_id: ConnectionId,
101 pub connection_ids: Vec<ConnectionId>,
102 pub remove_collaborator: bool,
103}
104
105pub struct LeftRoom<'a> {
106 pub room: Option<&'a proto::Room>,
107 pub unshared_projects: Vec<Project>,
108 pub left_projects: Vec<LeftProject>,
109 pub canceled_call_connection_ids: Vec<ConnectionId>,
110}
111
112#[derive(Copy, Clone)]
113pub struct Metrics {
114 pub connections: usize,
115 pub registered_projects: usize,
116 pub active_projects: usize,
117 pub shared_projects: usize,
118}
119
120impl Store {
121 pub fn metrics(&self) -> Metrics {
122 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
123 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
124
125 let connections = self.connections.values().filter(|c| !c.admin).count();
126 let mut registered_projects = 0;
127 let mut active_projects = 0;
128 let mut shared_projects = 0;
129 for project in self.projects.values() {
130 if let Some(connection) = self.connections.get(&project.host_connection_id) {
131 if !connection.admin {
132 registered_projects += 1;
133 if project.is_active_since(active_window_start) {
134 active_projects += 1;
135 if !project.guests.is_empty() {
136 shared_projects += 1;
137 }
138 }
139 }
140 }
141 }
142
143 Metrics {
144 connections,
145 registered_projects,
146 active_projects,
147 shared_projects,
148 }
149 }
150
151 #[instrument(skip(self))]
152 pub fn add_connection(
153 &mut self,
154 connection_id: ConnectionId,
155 user_id: UserId,
156 admin: bool,
157 ) -> Option<proto::IncomingCall> {
158 self.connections.insert(
159 connection_id,
160 ConnectionState {
161 user_id,
162 admin,
163 projects: Default::default(),
164 channels: Default::default(),
165 },
166 );
167 let connected_user = self.connected_users.entry(user_id).or_default();
168 connected_user.connection_ids.insert(connection_id);
169 if let Some(active_call) = connected_user.active_call {
170 if active_call.connection_id.is_some() {
171 None
172 } else {
173 let room = self.room(active_call.room_id)?;
174 Some(proto::IncomingCall {
175 room_id: active_call.room_id,
176 caller_user_id: active_call.caller_user_id.to_proto(),
177 participant_user_ids: room
178 .participants
179 .iter()
180 .map(|participant| participant.user_id)
181 .collect(),
182 initial_project: active_call
183 .initial_project_id
184 .and_then(|id| Self::build_participant_project(id, &self.projects)),
185 })
186 }
187 } else {
188 None
189 }
190 }
191
192 #[instrument(skip(self))]
193 pub fn remove_connection(
194 &mut self,
195 connection_id: ConnectionId,
196 ) -> Result<RemovedConnectionState> {
197 let connection = self
198 .connections
199 .get_mut(&connection_id)
200 .ok_or_else(|| anyhow!("no such connection"))?;
201
202 let user_id = connection.user_id;
203 let connection_channels = mem::take(&mut connection.channels);
204
205 let mut result = RemovedConnectionState {
206 user_id,
207 ..Default::default()
208 };
209
210 // Leave all channels.
211 for channel_id in connection_channels {
212 self.leave_channel(connection_id, channel_id);
213 }
214
215 let connected_user = self.connected_users.get(&user_id).unwrap();
216 if let Some(active_call) = connected_user.active_call.as_ref() {
217 let room_id = active_call.room_id;
218 if active_call.connection_id == Some(connection_id) {
219 let left_room = self.leave_room(room_id, connection_id)?;
220 result.hosted_projects = left_room.unshared_projects;
221 result.guest_projects = left_room.left_projects;
222 result.room_id = Some(room_id);
223 result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
224 }
225 }
226
227 let connected_user = self.connected_users.get_mut(&user_id).unwrap();
228 connected_user.connection_ids.remove(&connection_id);
229 if connected_user.connection_ids.is_empty() {
230 self.connected_users.remove(&user_id);
231 }
232 self.connections.remove(&connection_id).unwrap();
233
234 Ok(result)
235 }
236
237 #[cfg(test)]
238 pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
239 self.channels.get(&id)
240 }
241
242 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
243 if let Some(connection) = self.connections.get_mut(&connection_id) {
244 connection.channels.insert(channel_id);
245 self.channels
246 .entry(channel_id)
247 .or_default()
248 .connection_ids
249 .insert(connection_id);
250 }
251 }
252
253 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
254 if let Some(connection) = self.connections.get_mut(&connection_id) {
255 connection.channels.remove(&channel_id);
256 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
257 entry.get_mut().connection_ids.remove(&connection_id);
258 if entry.get_mut().connection_ids.is_empty() {
259 entry.remove();
260 }
261 }
262 }
263 }
264
265 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
266 Ok(self
267 .connections
268 .get(&connection_id)
269 .ok_or_else(|| anyhow!("unknown connection"))?
270 .user_id)
271 }
272
273 pub fn connection_ids_for_user(
274 &self,
275 user_id: UserId,
276 ) -> impl Iterator<Item = ConnectionId> + '_ {
277 self.connected_users
278 .get(&user_id)
279 .into_iter()
280 .map(|state| &state.connection_ids)
281 .flatten()
282 .copied()
283 }
284
285 pub fn is_user_online(&self, user_id: UserId) -> bool {
286 !self
287 .connected_users
288 .get(&user_id)
289 .unwrap_or(&Default::default())
290 .connection_ids
291 .is_empty()
292 }
293
294 fn is_user_busy(&self, user_id: UserId) -> bool {
295 self.connected_users
296 .get(&user_id)
297 .unwrap_or(&Default::default())
298 .active_call
299 .is_some()
300 }
301
302 pub fn build_initial_contacts_update(
303 &self,
304 contacts: Vec<db::Contact>,
305 ) -> proto::UpdateContacts {
306 let mut update = proto::UpdateContacts::default();
307
308 for contact in contacts {
309 match contact {
310 db::Contact::Accepted {
311 user_id,
312 should_notify,
313 } => {
314 update
315 .contacts
316 .push(self.contact_for_user(user_id, should_notify));
317 }
318 db::Contact::Outgoing { user_id } => {
319 update.outgoing_requests.push(user_id.to_proto())
320 }
321 db::Contact::Incoming {
322 user_id,
323 should_notify,
324 } => update
325 .incoming_requests
326 .push(proto::IncomingContactRequest {
327 requester_id: user_id.to_proto(),
328 should_notify,
329 }),
330 }
331 }
332
333 update
334 }
335
336 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
337 proto::Contact {
338 user_id: user_id.to_proto(),
339 online: self.is_user_online(user_id),
340 busy: self.is_user_busy(user_id),
341 should_notify,
342 }
343 }
344
345 pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
346 let connection = self
347 .connections
348 .get_mut(&creator_connection_id)
349 .ok_or_else(|| anyhow!("no such connection"))?;
350 let connected_user = self
351 .connected_users
352 .get_mut(&connection.user_id)
353 .ok_or_else(|| anyhow!("no such connection"))?;
354 anyhow::ensure!(
355 connected_user.active_call.is_none(),
356 "can't create a room with an active call"
357 );
358
359 let mut room = proto::Room::default();
360 room.participants.push(proto::Participant {
361 user_id: connection.user_id.to_proto(),
362 peer_id: creator_connection_id.0,
363 projects: Default::default(),
364 location: Some(proto::ParticipantLocation {
365 variant: Some(proto::participant_location::Variant::External(
366 proto::participant_location::External {},
367 )),
368 }),
369 });
370
371 let room_id = post_inc(&mut self.next_room_id);
372 self.rooms.insert(room_id, room);
373 connected_user.active_call = Some(Call {
374 caller_user_id: connection.user_id,
375 room_id,
376 connection_id: Some(creator_connection_id),
377 initial_project_id: None,
378 });
379 Ok(room_id)
380 }
381
382 pub fn join_room(
383 &mut self,
384 room_id: RoomId,
385 connection_id: ConnectionId,
386 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
387 let connection = self
388 .connections
389 .get_mut(&connection_id)
390 .ok_or_else(|| anyhow!("no such connection"))?;
391 let user_id = connection.user_id;
392 let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
393
394 let connected_user = self
395 .connected_users
396 .get_mut(&user_id)
397 .ok_or_else(|| anyhow!("no such connection"))?;
398 let active_call = connected_user
399 .active_call
400 .as_mut()
401 .ok_or_else(|| anyhow!("not being called"))?;
402 anyhow::ensure!(
403 active_call.room_id == room_id && active_call.connection_id.is_none(),
404 "not being called on this room"
405 );
406
407 let room = self
408 .rooms
409 .get_mut(&room_id)
410 .ok_or_else(|| anyhow!("no such room"))?;
411 anyhow::ensure!(
412 room.pending_participant_user_ids
413 .contains(&user_id.to_proto()),
414 anyhow!("no such room")
415 );
416 room.pending_participant_user_ids
417 .retain(|pending| *pending != user_id.to_proto());
418 room.participants.push(proto::Participant {
419 user_id: user_id.to_proto(),
420 peer_id: connection_id.0,
421 projects: Default::default(),
422 location: Some(proto::ParticipantLocation {
423 variant: Some(proto::participant_location::Variant::External(
424 proto::participant_location::External {},
425 )),
426 }),
427 });
428 active_call.connection_id = Some(connection_id);
429
430 Ok((room, recipient_connection_ids))
431 }
432
433 pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
434 let connection = self
435 .connections
436 .get_mut(&connection_id)
437 .ok_or_else(|| anyhow!("no such connection"))?;
438 let user_id = connection.user_id;
439
440 let connected_user = self
441 .connected_users
442 .get(&user_id)
443 .ok_or_else(|| anyhow!("no such connection"))?;
444 anyhow::ensure!(
445 connected_user
446 .active_call
447 .map_or(false, |call| call.room_id == room_id
448 && call.connection_id == Some(connection_id)),
449 "cannot leave a room before joining it"
450 );
451
452 // Given that users can only join one room at a time, we can safely unshare
453 // and leave all projects associated with the connection.
454 let mut unshared_projects = Vec::new();
455 let mut left_projects = Vec::new();
456 for project_id in connection.projects.clone() {
457 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
458 unshared_projects.push(project);
459 } else if let Ok(project) = self.leave_project(project_id, connection_id) {
460 left_projects.push(project);
461 }
462 }
463 self.connected_users.get_mut(&user_id).unwrap().active_call = None;
464
465 let room = self
466 .rooms
467 .get_mut(&room_id)
468 .ok_or_else(|| anyhow!("no such room"))?;
469 room.participants
470 .retain(|participant| participant.peer_id != connection_id.0);
471
472 let mut canceled_call_connection_ids = Vec::new();
473 room.pending_participant_user_ids
474 .retain(|pending_participant_user_id| {
475 if let Some(connected_user) = self
476 .connected_users
477 .get_mut(&UserId::from_proto(*pending_participant_user_id))
478 {
479 if let Some(call) = connected_user.active_call.as_ref() {
480 if call.caller_user_id == user_id {
481 connected_user.active_call.take();
482 canceled_call_connection_ids
483 .extend(connected_user.connection_ids.iter().copied());
484 false
485 } else {
486 true
487 }
488 } else {
489 true
490 }
491 } else {
492 true
493 }
494 });
495
496 if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
497 self.rooms.remove(&room_id);
498 }
499
500 Ok(LeftRoom {
501 room: self.rooms.get(&room_id),
502 unshared_projects,
503 left_projects,
504 canceled_call_connection_ids,
505 })
506 }
507
508 pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
509 self.rooms.get(&room_id)
510 }
511
512 pub fn call(
513 &mut self,
514 room_id: RoomId,
515 recipient_user_id: UserId,
516 initial_project_id: Option<ProjectId>,
517 from_connection_id: ConnectionId,
518 ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
519 let caller_user_id = self.user_id_for_connection(from_connection_id)?;
520
521 let recipient_connection_ids = self
522 .connection_ids_for_user(recipient_user_id)
523 .collect::<Vec<_>>();
524 let mut recipient = self
525 .connected_users
526 .get_mut(&recipient_user_id)
527 .ok_or_else(|| anyhow!("no such connection"))?;
528 anyhow::ensure!(
529 recipient.active_call.is_none(),
530 "recipient is already on another call"
531 );
532
533 let room = self
534 .rooms
535 .get_mut(&room_id)
536 .ok_or_else(|| anyhow!("no such room"))?;
537 anyhow::ensure!(
538 room.participants
539 .iter()
540 .any(|participant| participant.peer_id == from_connection_id.0),
541 "no such room"
542 );
543 anyhow::ensure!(
544 room.pending_participant_user_ids
545 .iter()
546 .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
547 "cannot call the same user more than once"
548 );
549 room.pending_participant_user_ids
550 .push(recipient_user_id.to_proto());
551
552 if let Some(initial_project_id) = initial_project_id {
553 let project = self
554 .projects
555 .get(&initial_project_id)
556 .ok_or_else(|| anyhow!("no such project"))?;
557 anyhow::ensure!(project.room_id == room_id, "no such project");
558 }
559
560 recipient.active_call = Some(Call {
561 caller_user_id,
562 room_id,
563 connection_id: None,
564 initial_project_id,
565 });
566
567 Ok((
568 room,
569 recipient_connection_ids,
570 proto::IncomingCall {
571 room_id,
572 caller_user_id: caller_user_id.to_proto(),
573 participant_user_ids: room
574 .participants
575 .iter()
576 .map(|participant| participant.user_id)
577 .collect(),
578 initial_project: initial_project_id
579 .and_then(|id| Self::build_participant_project(id, &self.projects)),
580 },
581 ))
582 }
583
584 pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
585 let mut recipient = self
586 .connected_users
587 .get_mut(&to_user_id)
588 .ok_or_else(|| anyhow!("no such connection"))?;
589 anyhow::ensure!(recipient
590 .active_call
591 .map_or(false, |call| call.room_id == room_id
592 && call.connection_id.is_none()));
593 recipient.active_call = None;
594 let room = self
595 .rooms
596 .get_mut(&room_id)
597 .ok_or_else(|| anyhow!("no such room"))?;
598 room.pending_participant_user_ids
599 .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
600 Ok(room)
601 }
602
603 pub fn cancel_call(
604 &mut self,
605 room_id: RoomId,
606 recipient_user_id: UserId,
607 canceller_connection_id: ConnectionId,
608 ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
609 let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
610 let canceller = self
611 .connected_users
612 .get(&canceller_user_id)
613 .ok_or_else(|| anyhow!("no such connection"))?;
614 let recipient = self
615 .connected_users
616 .get(&recipient_user_id)
617 .ok_or_else(|| anyhow!("no such connection"))?;
618 let canceller_active_call = canceller
619 .active_call
620 .as_ref()
621 .ok_or_else(|| anyhow!("no active call"))?;
622 let recipient_active_call = recipient
623 .active_call
624 .as_ref()
625 .ok_or_else(|| anyhow!("no active call for recipient"))?;
626
627 anyhow::ensure!(
628 canceller_active_call.room_id == room_id,
629 "users are on different calls"
630 );
631 anyhow::ensure!(
632 recipient_active_call.room_id == room_id,
633 "users are on different calls"
634 );
635 anyhow::ensure!(
636 recipient_active_call.connection_id.is_none(),
637 "recipient has already answered"
638 );
639 let room_id = recipient_active_call.room_id;
640 let room = self
641 .rooms
642 .get_mut(&room_id)
643 .ok_or_else(|| anyhow!("no such room"))?;
644 room.pending_participant_user_ids
645 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
646
647 let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
648 recipient.active_call.take();
649
650 Ok((room, recipient.connection_ids.clone()))
651 }
652
653 pub fn decline_call(
654 &mut self,
655 room_id: RoomId,
656 recipient_connection_id: ConnectionId,
657 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
658 let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
659 let recipient = self
660 .connected_users
661 .get_mut(&recipient_user_id)
662 .ok_or_else(|| anyhow!("no such connection"))?;
663 if let Some(active_call) = recipient.active_call.take() {
664 anyhow::ensure!(active_call.room_id == room_id, "no such room");
665 let recipient_connection_ids = self
666 .connection_ids_for_user(recipient_user_id)
667 .collect::<Vec<_>>();
668 let room = self
669 .rooms
670 .get_mut(&active_call.room_id)
671 .ok_or_else(|| anyhow!("no such room"))?;
672 room.pending_participant_user_ids
673 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
674 Ok((room, recipient_connection_ids))
675 } else {
676 Err(anyhow!("user is not being called"))
677 }
678 }
679
680 pub fn update_participant_location(
681 &mut self,
682 room_id: RoomId,
683 location: proto::ParticipantLocation,
684 connection_id: ConnectionId,
685 ) -> Result<&proto::Room> {
686 let room = self
687 .rooms
688 .get_mut(&room_id)
689 .ok_or_else(|| anyhow!("no such room"))?;
690 if let Some(proto::participant_location::Variant::SharedProject(project)) =
691 location.variant.as_ref()
692 {
693 anyhow::ensure!(
694 room.participants
695 .iter()
696 .flat_map(|participant| &participant.projects)
697 .any(|participant_project| participant_project.id == project.id),
698 "no such project"
699 );
700 }
701
702 let participant = room
703 .participants
704 .iter_mut()
705 .find(|participant| participant.peer_id == connection_id.0)
706 .ok_or_else(|| anyhow!("no such room"))?;
707 participant.location = Some(location);
708
709 Ok(room)
710 }
711
712 pub fn share_project(
713 &mut self,
714 room_id: RoomId,
715 project_id: ProjectId,
716 worktrees: Vec<proto::WorktreeMetadata>,
717 host_connection_id: ConnectionId,
718 ) -> Result<&proto::Room> {
719 let connection = self
720 .connections
721 .get_mut(&host_connection_id)
722 .ok_or_else(|| anyhow!("no such connection"))?;
723
724 let room = self
725 .rooms
726 .get_mut(&room_id)
727 .ok_or_else(|| anyhow!("no such room"))?;
728 let participant = room
729 .participants
730 .iter_mut()
731 .find(|participant| participant.peer_id == host_connection_id.0)
732 .ok_or_else(|| anyhow!("no such room"))?;
733
734 connection.projects.insert(project_id);
735 self.projects.insert(
736 project_id,
737 Project {
738 id: project_id,
739 room_id,
740 host_connection_id,
741 host: Collaborator {
742 user_id: connection.user_id,
743 replica_id: 0,
744 last_activity: None,
745 admin: connection.admin,
746 },
747 guests: Default::default(),
748 active_replica_ids: Default::default(),
749 worktrees: worktrees
750 .into_iter()
751 .map(|worktree| {
752 (
753 worktree.id,
754 Worktree {
755 root_name: worktree.root_name,
756 visible: worktree.visible,
757 ..Default::default()
758 },
759 )
760 })
761 .collect(),
762 language_servers: Default::default(),
763 },
764 );
765
766 participant
767 .projects
768 .extend(Self::build_participant_project(project_id, &self.projects));
769
770 Ok(room)
771 }
772
773 pub fn unshare_project(
774 &mut self,
775 project_id: ProjectId,
776 connection_id: ConnectionId,
777 ) -> Result<(&proto::Room, Project)> {
778 match self.projects.entry(project_id) {
779 btree_map::Entry::Occupied(e) => {
780 if e.get().host_connection_id == connection_id {
781 let project = e.remove();
782
783 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
784 host_connection.projects.remove(&project_id);
785 }
786
787 for guest_connection in project.guests.keys() {
788 if let Some(connection) = self.connections.get_mut(guest_connection) {
789 connection.projects.remove(&project_id);
790 }
791 }
792
793 let room = self
794 .rooms
795 .get_mut(&project.room_id)
796 .ok_or_else(|| anyhow!("no such room"))?;
797 let participant = room
798 .participants
799 .iter_mut()
800 .find(|participant| participant.peer_id == connection_id.0)
801 .ok_or_else(|| anyhow!("no such room"))?;
802 participant
803 .projects
804 .retain(|project| project.id != project_id.to_proto());
805
806 Ok((room, project))
807 } else {
808 Err(anyhow!("no such project"))?
809 }
810 }
811 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
812 }
813 }
814
815 pub fn update_project(
816 &mut self,
817 project_id: ProjectId,
818 worktrees: &[proto::WorktreeMetadata],
819 connection_id: ConnectionId,
820 ) -> Result<&proto::Room> {
821 let project = self
822 .projects
823 .get_mut(&project_id)
824 .ok_or_else(|| anyhow!("no such project"))?;
825 if project.host_connection_id == connection_id {
826 let mut old_worktrees = mem::take(&mut project.worktrees);
827 for worktree in worktrees {
828 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
829 project.worktrees.insert(worktree.id, old_worktree);
830 } else {
831 project.worktrees.insert(
832 worktree.id,
833 Worktree {
834 root_name: worktree.root_name.clone(),
835 visible: worktree.visible,
836 ..Default::default()
837 },
838 );
839 }
840 }
841
842 let room = self
843 .rooms
844 .get_mut(&project.room_id)
845 .ok_or_else(|| anyhow!("no such room"))?;
846 let participant_project = room
847 .participants
848 .iter_mut()
849 .flat_map(|participant| &mut participant.projects)
850 .find(|project| project.id == project_id.to_proto())
851 .ok_or_else(|| anyhow!("no such project"))?;
852 participant_project.worktree_root_names = worktrees
853 .iter()
854 .filter(|worktree| worktree.visible)
855 .map(|worktree| worktree.root_name.clone())
856 .collect();
857
858 Ok(room)
859 } else {
860 Err(anyhow!("no such project"))?
861 }
862 }
863
864 pub fn update_diagnostic_summary(
865 &mut self,
866 project_id: ProjectId,
867 worktree_id: u64,
868 connection_id: ConnectionId,
869 summary: proto::DiagnosticSummary,
870 ) -> Result<Vec<ConnectionId>> {
871 let project = self
872 .projects
873 .get_mut(&project_id)
874 .ok_or_else(|| anyhow!("no such project"))?;
875 if project.host_connection_id == connection_id {
876 let worktree = project
877 .worktrees
878 .get_mut(&worktree_id)
879 .ok_or_else(|| anyhow!("no such worktree"))?;
880 worktree
881 .diagnostic_summaries
882 .insert(summary.path.clone().into(), summary);
883 return Ok(project.connection_ids());
884 }
885
886 Err(anyhow!("no such worktree"))?
887 }
888
889 pub fn start_language_server(
890 &mut self,
891 project_id: ProjectId,
892 connection_id: ConnectionId,
893 language_server: proto::LanguageServer,
894 ) -> Result<Vec<ConnectionId>> {
895 let project = self
896 .projects
897 .get_mut(&project_id)
898 .ok_or_else(|| anyhow!("no such project"))?;
899 if project.host_connection_id == connection_id {
900 project.language_servers.push(language_server);
901 return Ok(project.connection_ids());
902 }
903
904 Err(anyhow!("no such project"))?
905 }
906
907 pub fn join_project(
908 &mut self,
909 requester_connection_id: ConnectionId,
910 project_id: ProjectId,
911 ) -> Result<(&Project, ReplicaId)> {
912 let connection = self
913 .connections
914 .get_mut(&requester_connection_id)
915 .ok_or_else(|| anyhow!("no such connection"))?;
916 let user = self
917 .connected_users
918 .get(&connection.user_id)
919 .ok_or_else(|| anyhow!("no such connection"))?;
920 let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
921 anyhow::ensure!(
922 active_call.connection_id == Some(requester_connection_id),
923 "no such project"
924 );
925
926 let project = self
927 .projects
928 .get_mut(&project_id)
929 .ok_or_else(|| anyhow!("no such project"))?;
930 anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
931
932 connection.projects.insert(project_id);
933 let mut replica_id = 1;
934 while project.active_replica_ids.contains(&replica_id) {
935 replica_id += 1;
936 }
937 project.active_replica_ids.insert(replica_id);
938 project.guests.insert(
939 requester_connection_id,
940 Collaborator {
941 replica_id,
942 user_id: connection.user_id,
943 last_activity: Some(OffsetDateTime::now_utc()),
944 admin: connection.admin,
945 },
946 );
947
948 project.host.last_activity = Some(OffsetDateTime::now_utc());
949 Ok((project, replica_id))
950 }
951
952 pub fn leave_project(
953 &mut self,
954 project_id: ProjectId,
955 connection_id: ConnectionId,
956 ) -> Result<LeftProject> {
957 let project = self
958 .projects
959 .get_mut(&project_id)
960 .ok_or_else(|| anyhow!("no such project"))?;
961
962 // If the connection leaving the project is a collaborator, remove it.
963 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
964 project.active_replica_ids.remove(&guest.replica_id);
965 true
966 } else {
967 false
968 };
969
970 if let Some(connection) = self.connections.get_mut(&connection_id) {
971 connection.projects.remove(&project_id);
972 }
973
974 Ok(LeftProject {
975 id: project.id,
976 host_connection_id: project.host_connection_id,
977 host_user_id: project.host.user_id,
978 connection_ids: project.connection_ids(),
979 remove_collaborator,
980 })
981 }
982
983 #[allow(clippy::too_many_arguments)]
984 pub fn update_worktree(
985 &mut self,
986 connection_id: ConnectionId,
987 project_id: ProjectId,
988 worktree_id: u64,
989 worktree_root_name: &str,
990 removed_entries: &[u64],
991 updated_entries: &[proto::Entry],
992 scan_id: u64,
993 is_last_update: bool,
994 ) -> Result<Vec<ConnectionId>> {
995 let project = self.write_project(project_id, connection_id)?;
996
997 let connection_ids = project.connection_ids();
998 let mut worktree = project.worktrees.entry(worktree_id).or_default();
999 worktree.root_name = worktree_root_name.to_string();
1000
1001 for entry_id in removed_entries {
1002 worktree.entries.remove(entry_id);
1003 }
1004
1005 for entry in updated_entries {
1006 worktree.entries.insert(entry.id, entry.clone());
1007 }
1008
1009 worktree.scan_id = scan_id;
1010 worktree.is_complete = is_last_update;
1011 Ok(connection_ids)
1012 }
1013
1014 fn build_participant_project(
1015 project_id: ProjectId,
1016 projects: &BTreeMap<ProjectId, Project>,
1017 ) -> Option<proto::ParticipantProject> {
1018 Some(proto::ParticipantProject {
1019 id: project_id.to_proto(),
1020 worktree_root_names: projects
1021 .get(&project_id)?
1022 .worktrees
1023 .values()
1024 .filter(|worktree| worktree.visible)
1025 .map(|worktree| worktree.root_name.clone())
1026 .collect(),
1027 })
1028 }
1029
1030 pub fn project_connection_ids(
1031 &self,
1032 project_id: ProjectId,
1033 acting_connection_id: ConnectionId,
1034 ) -> Result<Vec<ConnectionId>> {
1035 Ok(self
1036 .read_project(project_id, acting_connection_id)?
1037 .connection_ids())
1038 }
1039
1040 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1041 Ok(self
1042 .channels
1043 .get(&channel_id)
1044 .ok_or_else(|| anyhow!("no such channel"))?
1045 .connection_ids())
1046 }
1047
1048 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1049 self.projects
1050 .get(&project_id)
1051 .ok_or_else(|| anyhow!("no such project"))
1052 }
1053
1054 pub fn register_project_activity(
1055 &mut self,
1056 project_id: ProjectId,
1057 connection_id: ConnectionId,
1058 ) -> Result<()> {
1059 let project = self
1060 .projects
1061 .get_mut(&project_id)
1062 .ok_or_else(|| anyhow!("no such project"))?;
1063 let collaborator = if connection_id == project.host_connection_id {
1064 &mut project.host
1065 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1066 guest
1067 } else {
1068 return Err(anyhow!("no such project"))?;
1069 };
1070 collaborator.last_activity = Some(OffsetDateTime::now_utc());
1071 Ok(())
1072 }
1073
1074 pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1075 self.projects.iter()
1076 }
1077
1078 pub fn read_project(
1079 &self,
1080 project_id: ProjectId,
1081 connection_id: ConnectionId,
1082 ) -> Result<&Project> {
1083 let project = self
1084 .projects
1085 .get(&project_id)
1086 .ok_or_else(|| anyhow!("no such project"))?;
1087 if project.host_connection_id == connection_id
1088 || project.guests.contains_key(&connection_id)
1089 {
1090 Ok(project)
1091 } else {
1092 Err(anyhow!("no such project"))?
1093 }
1094 }
1095
1096 fn write_project(
1097 &mut self,
1098 project_id: ProjectId,
1099 connection_id: ConnectionId,
1100 ) -> Result<&mut Project> {
1101 let project = self
1102 .projects
1103 .get_mut(&project_id)
1104 .ok_or_else(|| anyhow!("no such project"))?;
1105 if project.host_connection_id == connection_id
1106 || project.guests.contains_key(&connection_id)
1107 {
1108 Ok(project)
1109 } else {
1110 Err(anyhow!("no such project"))?
1111 }
1112 }
1113
1114 #[cfg(test)]
1115 pub fn check_invariants(&self) {
1116 for (connection_id, connection) in &self.connections {
1117 for project_id in &connection.projects {
1118 let project = &self.projects.get(project_id).unwrap();
1119 if project.host_connection_id != *connection_id {
1120 assert!(project.guests.contains_key(connection_id));
1121 }
1122
1123 for (worktree_id, worktree) in project.worktrees.iter() {
1124 let mut paths = HashMap::default();
1125 for entry in worktree.entries.values() {
1126 let prev_entry = paths.insert(&entry.path, entry);
1127 assert_eq!(
1128 prev_entry,
1129 None,
1130 "worktree {:?}, duplicate path for entries {:?} and {:?}",
1131 worktree_id,
1132 prev_entry.unwrap(),
1133 entry
1134 );
1135 }
1136 }
1137 }
1138 for channel_id in &connection.channels {
1139 let channel = self.channels.get(channel_id).unwrap();
1140 assert!(channel.connection_ids.contains(connection_id));
1141 }
1142 assert!(self
1143 .connected_users
1144 .get(&connection.user_id)
1145 .unwrap()
1146 .connection_ids
1147 .contains(connection_id));
1148 }
1149
1150 for (user_id, state) in &self.connected_users {
1151 for connection_id in &state.connection_ids {
1152 assert_eq!(
1153 self.connections.get(connection_id).unwrap().user_id,
1154 *user_id
1155 );
1156 }
1157
1158 if let Some(active_call) = state.active_call.as_ref() {
1159 if let Some(active_call_connection_id) = active_call.connection_id {
1160 assert!(
1161 state.connection_ids.contains(&active_call_connection_id),
1162 "call is active on a dead connection"
1163 );
1164 assert!(
1165 state.connection_ids.contains(&active_call_connection_id),
1166 "call is active on a dead connection"
1167 );
1168 }
1169 }
1170 }
1171
1172 for (room_id, room) in &self.rooms {
1173 for pending_user_id in &room.pending_participant_user_ids {
1174 assert!(
1175 self.connected_users
1176 .contains_key(&UserId::from_proto(*pending_user_id)),
1177 "call is active on a user that has disconnected"
1178 );
1179 }
1180
1181 for participant in &room.participants {
1182 assert!(
1183 self.connections
1184 .contains_key(&ConnectionId(participant.peer_id)),
1185 "room contains participant that has disconnected"
1186 );
1187
1188 for participant_project in &participant.projects {
1189 let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1190 assert_eq!(
1191 project.room_id, *room_id,
1192 "project was shared on a different room"
1193 );
1194 }
1195 }
1196
1197 assert!(
1198 !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1199 "room can't be empty"
1200 );
1201 }
1202
1203 for (project_id, project) in &self.projects {
1204 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1205 assert!(host_connection.projects.contains(project_id));
1206
1207 for guest_connection_id in project.guests.keys() {
1208 let guest_connection = self.connections.get(guest_connection_id).unwrap();
1209 assert!(guest_connection.projects.contains(project_id));
1210 }
1211 assert_eq!(project.active_replica_ids.len(), project.guests.len());
1212 assert_eq!(
1213 project.active_replica_ids,
1214 project
1215 .guests
1216 .values()
1217 .map(|guest| guest.replica_id)
1218 .collect::<HashSet<_>>(),
1219 );
1220
1221 let room = &self.rooms[&project.room_id];
1222 let room_participant = room
1223 .participants
1224 .iter()
1225 .find(|participant| participant.peer_id == project.host_connection_id.0)
1226 .unwrap();
1227 assert!(
1228 room_participant
1229 .projects
1230 .iter()
1231 .any(|project| project.id == project_id.to_proto()),
1232 "project was not shared in room"
1233 );
1234 }
1235
1236 for (channel_id, channel) in &self.channels {
1237 for connection_id in &channel.connection_ids {
1238 let connection = self.connections.get(connection_id).unwrap();
1239 assert!(connection.channels.contains(channel_id));
1240 }
1241 }
1242 }
1243}
1244
1245impl Project {
1246 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1247 self.guests
1248 .values()
1249 .chain([&self.host])
1250 .any(|collaborator| {
1251 collaborator
1252 .last_activity
1253 .map_or(false, |active_time| active_time > start_time)
1254 })
1255 }
1256
1257 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1258 self.guests.keys().copied().collect()
1259 }
1260
1261 pub fn connection_ids(&self) -> Vec<ConnectionId> {
1262 self.guests
1263 .keys()
1264 .copied()
1265 .chain(Some(self.host_connection_id))
1266 .collect()
1267 }
1268}
1269
1270impl Channel {
1271 fn connection_ids(&self) -> Vec<ConnectionId> {
1272 self.connection_ids.iter().copied().collect()
1273 }
1274}