1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet};
4use rpc::{proto, ConnectionId, Receipt};
5use serde::Serialize;
6use std::{mem, path::PathBuf, str, time::Duration};
7use time::OffsetDateTime;
8use tracing::instrument;
9
10#[derive(Default, Serialize)]
11pub struct Store {
12 connections: BTreeMap<ConnectionId, ConnectionState>,
13 connections_by_user_id: BTreeMap<UserId, HashSet<ConnectionId>>,
14 projects: BTreeMap<ProjectId, Project>,
15 #[serde(skip)]
16 channels: BTreeMap<ChannelId, Channel>,
17}
18
19#[derive(Serialize)]
20struct ConnectionState {
21 user_id: UserId,
22 admin: bool,
23 projects: BTreeSet<ProjectId>,
24 requested_projects: HashSet<ProjectId>,
25 channels: HashSet<ChannelId>,
26}
27
28#[derive(Serialize)]
29pub struct Project {
30 pub online: bool,
31 pub host_connection_id: ConnectionId,
32 pub host: Collaborator,
33 pub guests: HashMap<ConnectionId, Collaborator>,
34 #[serde(skip)]
35 pub join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
36 pub active_replica_ids: HashSet<ReplicaId>,
37 pub worktrees: BTreeMap<u64, Worktree>,
38 pub language_servers: Vec<proto::LanguageServer>,
39}
40
41#[derive(Serialize)]
42pub struct Collaborator {
43 pub replica_id: ReplicaId,
44 pub user_id: UserId,
45 #[serde(skip)]
46 pub last_activity: Option<OffsetDateTime>,
47 pub admin: bool,
48}
49
50#[derive(Default, Serialize)]
51pub struct Worktree {
52 pub root_name: String,
53 pub visible: bool,
54 #[serde(skip)]
55 pub entries: BTreeMap<u64, proto::Entry>,
56 #[serde(skip)]
57 pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
58 pub scan_id: u64,
59 pub is_complete: bool,
60}
61
62#[derive(Default)]
63pub struct Channel {
64 pub connection_ids: HashSet<ConnectionId>,
65}
66
/// Identifier of a participant's replica within a project. The host is
/// registered with id 0; guests receive the lowest free id starting at 1.
pub type ReplicaId = u16;
68
69#[derive(Default)]
70pub struct RemovedConnectionState {
71 pub user_id: UserId,
72 pub hosted_projects: HashMap<ProjectId, Project>,
73 pub guest_project_ids: HashSet<ProjectId>,
74 pub contact_ids: HashSet<UserId>,
75}
76
77pub struct LeftProject {
78 pub host_user_id: UserId,
79 pub host_connection_id: ConnectionId,
80 pub connection_ids: Vec<ConnectionId>,
81 pub remove_collaborator: bool,
82 pub cancel_request: Option<UserId>,
83 pub unshare: bool,
84}
85
86pub struct UnsharedProject {
87 pub guests: HashMap<ConnectionId, Collaborator>,
88 pub pending_join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
89}
90
/// Usage counters produced by `Store::metrics`. Admin connections and the
/// projects they host are not counted.
#[derive(Copy, Clone)]
pub struct Metrics {
    /// Number of non-admin connections.
    pub connections: usize,
    /// Number of projects hosted by a non-admin connection.
    pub registered_projects: usize,
    /// Registered projects with collaborator activity inside the active
    /// window.
    pub active_projects: usize,
    /// Active projects that have at least one guest.
    pub shared_projects: usize,
}
98
99impl Store {
100 pub fn metrics(&self) -> Metrics {
101 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
102 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
103
104 let connections = self.connections.values().filter(|c| !c.admin).count();
105 let mut registered_projects = 0;
106 let mut active_projects = 0;
107 let mut shared_projects = 0;
108 for project in self.projects.values() {
109 if let Some(connection) = self.connections.get(&project.host_connection_id) {
110 if !connection.admin {
111 registered_projects += 1;
112 if project.is_active_since(active_window_start) {
113 active_projects += 1;
114 if !project.guests.is_empty() {
115 shared_projects += 1;
116 }
117 }
118 }
119 }
120 }
121
122 Metrics {
123 connections,
124 registered_projects,
125 active_projects,
126 shared_projects,
127 }
128 }
129
130 #[instrument(skip(self))]
131 pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
132 self.connections.insert(
133 connection_id,
134 ConnectionState {
135 user_id,
136 admin,
137 projects: Default::default(),
138 requested_projects: Default::default(),
139 channels: Default::default(),
140 },
141 );
142 self.connections_by_user_id
143 .entry(user_id)
144 .or_default()
145 .insert(connection_id);
146 }
147
148 #[instrument(skip(self))]
149 pub fn remove_connection(
150 &mut self,
151 connection_id: ConnectionId,
152 ) -> Result<RemovedConnectionState> {
153 let connection = self
154 .connections
155 .get_mut(&connection_id)
156 .ok_or_else(|| anyhow!("no such connection"))?;
157
158 let user_id = connection.user_id;
159 let connection_projects = mem::take(&mut connection.projects);
160 let connection_channels = mem::take(&mut connection.channels);
161
162 let mut result = RemovedConnectionState::default();
163 result.user_id = user_id;
164
165 // Leave all channels.
166 for channel_id in connection_channels {
167 self.leave_channel(connection_id, channel_id);
168 }
169
170 // Unregister and leave all projects.
171 for project_id in connection_projects {
172 if let Ok(project) = self.unregister_project(project_id, connection_id) {
173 result.hosted_projects.insert(project_id, project);
174 } else if self.leave_project(connection_id, project_id).is_ok() {
175 result.guest_project_ids.insert(project_id);
176 }
177 }
178
179 let user_connections = self.connections_by_user_id.get_mut(&user_id).unwrap();
180 user_connections.remove(&connection_id);
181 if user_connections.is_empty() {
182 self.connections_by_user_id.remove(&user_id);
183 }
184
185 self.connections.remove(&connection_id).unwrap();
186
187 Ok(result)
188 }
189
/// Test-only accessor: looks up a channel by id.
#[cfg(test)]
pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
    let channels = &self.channels;
    channels.get(&id)
}
194
195 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
196 if let Some(connection) = self.connections.get_mut(&connection_id) {
197 connection.channels.insert(channel_id);
198 self.channels
199 .entry(channel_id)
200 .or_default()
201 .connection_ids
202 .insert(connection_id);
203 }
204 }
205
206 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
207 if let Some(connection) = self.connections.get_mut(&connection_id) {
208 connection.channels.remove(&channel_id);
209 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
210 entry.get_mut().connection_ids.remove(&connection_id);
211 if entry.get_mut().connection_ids.is_empty() {
212 entry.remove();
213 }
214 }
215 }
216 }
217
218 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
219 Ok(self
220 .connections
221 .get(&connection_id)
222 .ok_or_else(|| anyhow!("unknown connection"))?
223 .user_id)
224 }
225
226 pub fn connection_ids_for_user<'a>(
227 &'a self,
228 user_id: UserId,
229 ) -> impl 'a + Iterator<Item = ConnectionId> {
230 self.connections_by_user_id
231 .get(&user_id)
232 .into_iter()
233 .flatten()
234 .copied()
235 }
236
237 pub fn is_user_online(&self, user_id: UserId) -> bool {
238 !self
239 .connections_by_user_id
240 .get(&user_id)
241 .unwrap_or(&Default::default())
242 .is_empty()
243 }
244
245 pub fn build_initial_contacts_update(
246 &self,
247 contacts: Vec<db::Contact>,
248 ) -> proto::UpdateContacts {
249 let mut update = proto::UpdateContacts::default();
250
251 for contact in contacts {
252 match contact {
253 db::Contact::Accepted {
254 user_id,
255 should_notify,
256 } => {
257 update
258 .contacts
259 .push(self.contact_for_user(user_id, should_notify));
260 }
261 db::Contact::Outgoing { user_id } => {
262 update.outgoing_requests.push(user_id.to_proto())
263 }
264 db::Contact::Incoming {
265 user_id,
266 should_notify,
267 } => update
268 .incoming_requests
269 .push(proto::IncomingContactRequest {
270 requester_id: user_id.to_proto(),
271 should_notify,
272 }),
273 }
274 }
275
276 update
277 }
278
279 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
280 proto::Contact {
281 user_id: user_id.to_proto(),
282 projects: self.project_metadata_for_user(user_id),
283 online: self.is_user_online(user_id),
284 should_notify,
285 }
286 }
287
288 pub fn project_metadata_for_user(&self, user_id: UserId) -> Vec<proto::ProjectMetadata> {
289 let connection_ids = self.connections_by_user_id.get(&user_id);
290 let project_ids = connection_ids.iter().flat_map(|connection_ids| {
291 connection_ids
292 .iter()
293 .filter_map(|connection_id| self.connections.get(connection_id))
294 .flat_map(|connection| connection.projects.iter().copied())
295 });
296
297 let mut metadata = Vec::new();
298 for project_id in project_ids {
299 if let Some(project) = self.projects.get(&project_id) {
300 if project.host.user_id == user_id && project.online {
301 metadata.push(proto::ProjectMetadata {
302 id: project_id.to_proto(),
303 visible_worktree_root_names: project
304 .worktrees
305 .values()
306 .filter(|worktree| worktree.visible)
307 .map(|worktree| worktree.root_name.clone())
308 .collect(),
309 guests: project
310 .guests
311 .values()
312 .map(|guest| guest.user_id.to_proto())
313 .collect(),
314 });
315 }
316 }
317 }
318
319 metadata
320 }
321
322 pub fn register_project(
323 &mut self,
324 host_connection_id: ConnectionId,
325 project_id: ProjectId,
326 online: bool,
327 ) -> Result<()> {
328 let connection = self
329 .connections
330 .get_mut(&host_connection_id)
331 .ok_or_else(|| anyhow!("no such connection"))?;
332 connection.projects.insert(project_id);
333 self.projects.insert(
334 project_id,
335 Project {
336 online,
337 host_connection_id,
338 host: Collaborator {
339 user_id: connection.user_id,
340 replica_id: 0,
341 last_activity: None,
342 admin: connection.admin,
343 },
344 guests: Default::default(),
345 join_requests: Default::default(),
346 active_replica_ids: Default::default(),
347 worktrees: Default::default(),
348 language_servers: Default::default(),
349 },
350 );
351 Ok(())
352 }
353
354 pub fn update_project(
355 &mut self,
356 project_id: ProjectId,
357 worktrees: &[proto::WorktreeMetadata],
358 online: bool,
359 connection_id: ConnectionId,
360 ) -> Result<Option<UnsharedProject>> {
361 let project = self
362 .projects
363 .get_mut(&project_id)
364 .ok_or_else(|| anyhow!("no such project"))?;
365 if project.host_connection_id == connection_id {
366 let mut old_worktrees = mem::take(&mut project.worktrees);
367 for worktree in worktrees {
368 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
369 project.worktrees.insert(worktree.id, old_worktree);
370 } else {
371 project.worktrees.insert(
372 worktree.id,
373 Worktree {
374 root_name: worktree.root_name.clone(),
375 visible: worktree.visible,
376 ..Default::default()
377 },
378 );
379 }
380 }
381
382 if online != project.online {
383 project.online = online;
384 if project.online {
385 Ok(None)
386 } else {
387 for connection_id in project.guest_connection_ids() {
388 if let Some(connection) = self.connections.get_mut(&connection_id) {
389 connection.projects.remove(&project_id);
390 }
391 }
392
393 project.active_replica_ids.clear();
394 project.language_servers.clear();
395 for worktree in project.worktrees.values_mut() {
396 worktree.diagnostic_summaries.clear();
397 worktree.entries.clear();
398 }
399
400 Ok(Some(UnsharedProject {
401 guests: mem::take(&mut project.guests),
402 pending_join_requests: mem::take(&mut project.join_requests),
403 }))
404 }
405 } else {
406 Ok(None)
407 }
408 } else {
409 Err(anyhow!("no such project"))?
410 }
411 }
412
413 pub fn unregister_project(
414 &mut self,
415 project_id: ProjectId,
416 connection_id: ConnectionId,
417 ) -> Result<Project> {
418 match self.projects.entry(project_id) {
419 btree_map::Entry::Occupied(e) => {
420 if e.get().host_connection_id == connection_id {
421 let project = e.remove();
422
423 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
424 host_connection.projects.remove(&project_id);
425 }
426
427 for guest_connection in project.guests.keys() {
428 if let Some(connection) = self.connections.get_mut(&guest_connection) {
429 connection.projects.remove(&project_id);
430 }
431 }
432
433 for requester_user_id in project.join_requests.keys() {
434 if let Some(requester_connection_ids) =
435 self.connections_by_user_id.get_mut(&requester_user_id)
436 {
437 for requester_connection_id in requester_connection_ids.iter() {
438 if let Some(requester_connection) =
439 self.connections.get_mut(requester_connection_id)
440 {
441 requester_connection.requested_projects.remove(&project_id);
442 }
443 }
444 }
445 }
446
447 Ok(project)
448 } else {
449 Err(anyhow!("no such project"))?
450 }
451 }
452 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
453 }
454 }
455
456 pub fn update_diagnostic_summary(
457 &mut self,
458 project_id: ProjectId,
459 worktree_id: u64,
460 connection_id: ConnectionId,
461 summary: proto::DiagnosticSummary,
462 ) -> Result<Vec<ConnectionId>> {
463 let project = self
464 .projects
465 .get_mut(&project_id)
466 .ok_or_else(|| anyhow!("no such project"))?;
467 if project.host_connection_id == connection_id {
468 let worktree = project
469 .worktrees
470 .get_mut(&worktree_id)
471 .ok_or_else(|| anyhow!("no such worktree"))?;
472 worktree
473 .diagnostic_summaries
474 .insert(summary.path.clone().into(), summary);
475 return Ok(project.connection_ids());
476 }
477
478 Err(anyhow!("no such worktree"))?
479 }
480
481 pub fn start_language_server(
482 &mut self,
483 project_id: ProjectId,
484 connection_id: ConnectionId,
485 language_server: proto::LanguageServer,
486 ) -> Result<Vec<ConnectionId>> {
487 let project = self
488 .projects
489 .get_mut(&project_id)
490 .ok_or_else(|| anyhow!("no such project"))?;
491 if project.host_connection_id == connection_id {
492 project.language_servers.push(language_server);
493 return Ok(project.connection_ids());
494 }
495
496 Err(anyhow!("no such project"))?
497 }
498
499 pub fn request_join_project(
500 &mut self,
501 requester_id: UserId,
502 project_id: ProjectId,
503 receipt: Receipt<proto::JoinProject>,
504 ) -> Result<()> {
505 let connection = self
506 .connections
507 .get_mut(&receipt.sender_id)
508 .ok_or_else(|| anyhow!("no such connection"))?;
509 let project = self
510 .projects
511 .get_mut(&project_id)
512 .ok_or_else(|| anyhow!("no such project"))?;
513 if project.online {
514 connection.requested_projects.insert(project_id);
515 project
516 .join_requests
517 .entry(requester_id)
518 .or_default()
519 .push(receipt);
520 Ok(())
521 } else {
522 Err(anyhow!("no such project"))
523 }
524 }
525
526 pub fn deny_join_project_request(
527 &mut self,
528 responder_connection_id: ConnectionId,
529 requester_id: UserId,
530 project_id: ProjectId,
531 ) -> Option<Vec<Receipt<proto::JoinProject>>> {
532 let project = self.projects.get_mut(&project_id)?;
533 if responder_connection_id != project.host_connection_id {
534 return None;
535 }
536
537 let receipts = project.join_requests.remove(&requester_id)?;
538 for receipt in &receipts {
539 let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
540 requester_connection.requested_projects.remove(&project_id);
541 }
542 project.host.last_activity = Some(OffsetDateTime::now_utc());
543
544 Some(receipts)
545 }
546
547 pub fn accept_join_project_request(
548 &mut self,
549 responder_connection_id: ConnectionId,
550 requester_id: UserId,
551 project_id: ProjectId,
552 ) -> Option<(Vec<(Receipt<proto::JoinProject>, ReplicaId)>, &Project)> {
553 let project = self.projects.get_mut(&project_id)?;
554 if responder_connection_id != project.host_connection_id {
555 return None;
556 }
557
558 let receipts = project.join_requests.remove(&requester_id)?;
559 let mut receipts_with_replica_ids = Vec::new();
560 for receipt in receipts {
561 let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
562 requester_connection.requested_projects.remove(&project_id);
563 requester_connection.projects.insert(project_id);
564 let mut replica_id = 1;
565 while project.active_replica_ids.contains(&replica_id) {
566 replica_id += 1;
567 }
568 project.active_replica_ids.insert(replica_id);
569 project.guests.insert(
570 receipt.sender_id,
571 Collaborator {
572 replica_id,
573 user_id: requester_id,
574 last_activity: Some(OffsetDateTime::now_utc()),
575 admin: requester_connection.admin,
576 },
577 );
578 receipts_with_replica_ids.push((receipt, replica_id));
579 }
580
581 project.host.last_activity = Some(OffsetDateTime::now_utc());
582 Some((receipts_with_replica_ids, project))
583 }
584
585 pub fn leave_project(
586 &mut self,
587 connection_id: ConnectionId,
588 project_id: ProjectId,
589 ) -> Result<LeftProject> {
590 let user_id = self.user_id_for_connection(connection_id)?;
591 let project = self
592 .projects
593 .get_mut(&project_id)
594 .ok_or_else(|| anyhow!("no such project"))?;
595
596 // If the connection leaving the project is a collaborator, remove it.
597 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
598 project.active_replica_ids.remove(&guest.replica_id);
599 true
600 } else {
601 false
602 };
603
604 // If the connection leaving the project has a pending request, remove it.
605 // If that user has no other pending requests on other connections, indicate that the request should be cancelled.
606 let mut cancel_request = None;
607 if let Entry::Occupied(mut entry) = project.join_requests.entry(user_id) {
608 entry
609 .get_mut()
610 .retain(|receipt| receipt.sender_id != connection_id);
611 if entry.get().is_empty() {
612 entry.remove();
613 cancel_request = Some(user_id);
614 }
615 }
616
617 if let Some(connection) = self.connections.get_mut(&connection_id) {
618 connection.projects.remove(&project_id);
619 }
620
621 let connection_ids = project.connection_ids();
622 let unshare = connection_ids.len() <= 1 && project.join_requests.is_empty();
623 if unshare {
624 project.language_servers.clear();
625 for worktree in project.worktrees.values_mut() {
626 worktree.diagnostic_summaries.clear();
627 worktree.entries.clear();
628 }
629 }
630
631 Ok(LeftProject {
632 host_connection_id: project.host_connection_id,
633 host_user_id: project.host.user_id,
634 connection_ids,
635 cancel_request,
636 unshare,
637 remove_collaborator,
638 })
639 }
640
641 pub fn update_worktree(
642 &mut self,
643 connection_id: ConnectionId,
644 project_id: ProjectId,
645 worktree_id: u64,
646 worktree_root_name: &str,
647 removed_entries: &[u64],
648 updated_entries: &[proto::Entry],
649 scan_id: u64,
650 is_last_update: bool,
651 ) -> Result<(Vec<ConnectionId>, bool)> {
652 let project = self.write_project(project_id, connection_id)?;
653 if !project.online {
654 return Err(anyhow!("project is not online"));
655 }
656
657 let connection_ids = project.connection_ids();
658 let mut worktree = project.worktrees.entry(worktree_id).or_default();
659 let metadata_changed = worktree_root_name != worktree.root_name;
660 worktree.root_name = worktree_root_name.to_string();
661
662 for entry_id in removed_entries {
663 worktree.entries.remove(&entry_id);
664 }
665
666 for entry in updated_entries {
667 worktree.entries.insert(entry.id, entry.clone());
668 }
669
670 worktree.scan_id = scan_id;
671 worktree.is_complete = is_last_update;
672 Ok((connection_ids, metadata_changed))
673 }
674
675 pub fn project_connection_ids(
676 &self,
677 project_id: ProjectId,
678 acting_connection_id: ConnectionId,
679 ) -> Result<Vec<ConnectionId>> {
680 Ok(self
681 .read_project(project_id, acting_connection_id)?
682 .connection_ids())
683 }
684
685 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
686 Ok(self
687 .channels
688 .get(&channel_id)
689 .ok_or_else(|| anyhow!("no such channel"))?
690 .connection_ids())
691 }
692
693 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
694 self.projects
695 .get(&project_id)
696 .ok_or_else(|| anyhow!("no such project"))
697 }
698
699 pub fn register_project_activity(
700 &mut self,
701 project_id: ProjectId,
702 connection_id: ConnectionId,
703 ) -> Result<()> {
704 let project = self
705 .projects
706 .get_mut(&project_id)
707 .ok_or_else(|| anyhow!("no such project"))?;
708 let collaborator = if connection_id == project.host_connection_id {
709 &mut project.host
710 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
711 guest
712 } else {
713 return Err(anyhow!("no such project"))?;
714 };
715 collaborator.last_activity = Some(OffsetDateTime::now_utc());
716 Ok(())
717 }
718
719 pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
720 self.projects.iter()
721 }
722
723 pub fn read_project(
724 &self,
725 project_id: ProjectId,
726 connection_id: ConnectionId,
727 ) -> Result<&Project> {
728 let project = self
729 .projects
730 .get(&project_id)
731 .ok_or_else(|| anyhow!("no such project"))?;
732 if project.host_connection_id == connection_id
733 || project.guests.contains_key(&connection_id)
734 {
735 Ok(project)
736 } else {
737 Err(anyhow!("no such project"))?
738 }
739 }
740
741 fn write_project(
742 &mut self,
743 project_id: ProjectId,
744 connection_id: ConnectionId,
745 ) -> Result<&mut Project> {
746 let project = self
747 .projects
748 .get_mut(&project_id)
749 .ok_or_else(|| anyhow!("no such project"))?;
750 if project.host_connection_id == connection_id
751 || project.guests.contains_key(&connection_id)
752 {
753 Ok(project)
754 } else {
755 Err(anyhow!("no such project"))?
756 }
757 }
758
/// Test-only consistency check over the whole store.
///
/// Panics when connections, the per-user index, projects and channels
/// disagree with one another, or when a worktree holds two entries with the
/// same path.
#[cfg(test)]
pub fn check_invariants(&self) {
    // Everything a connection references must exist and refer back to it.
    for (connection_id, connection) in self.connections.iter() {
        for project_id in connection.projects.iter() {
            let project = self.projects.get(project_id).unwrap();
            if project.host_connection_id != *connection_id {
                assert!(project.guests.contains_key(connection_id));
            }

            for (worktree_id, worktree) in &project.worktrees {
                let mut entries_by_path = HashMap::default();
                for entry in worktree.entries.values() {
                    let prev_entry = entries_by_path.insert(&entry.path, entry);
                    assert_eq!(
                        prev_entry,
                        None,
                        "worktree {:?}, duplicate path for entries {:?} and {:?}",
                        worktree_id,
                        prev_entry.unwrap(),
                        entry
                    );
                }
            }
        }

        for channel_id in connection.channels.iter() {
            let channel = self.channels.get(channel_id).unwrap();
            assert!(channel.connection_ids.contains(connection_id));
        }

        let user_connections = self
            .connections_by_user_id
            .get(&connection.user_id)
            .unwrap();
        assert!(user_connections.contains(connection_id));
    }

    // The per-user index must only list connections owned by that user.
    for (user_id, connection_ids) in self.connections_by_user_id.iter() {
        for connection_id in connection_ids.iter() {
            let connection = self.connections.get(connection_id).unwrap();
            assert_eq!(connection.user_id, *user_id);
        }
    }

    // Every project must be known to its host and guests, and its replica
    // ids must match its guests exactly.
    for (project_id, project) in self.projects.iter() {
        let host_connection = self.connections.get(&project.host_connection_id).unwrap();
        assert!(host_connection.projects.contains(project_id));

        for guest_connection_id in project.guests.keys() {
            let guest_connection = self.connections.get(guest_connection_id).unwrap();
            assert!(guest_connection.projects.contains(project_id));
        }

        assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
        let guest_replica_ids = project
            .guests
            .values()
            .map(|guest| guest.replica_id)
            .collect::<HashSet<_>>();
        assert_eq!(project.active_replica_ids, guest_replica_ids,);
    }

    // Channel members must themselves record the membership.
    for (channel_id, channel) in self.channels.iter() {
        for connection_id in channel.connection_ids.iter() {
            let connection = self.connections.get(connection_id).unwrap();
            assert!(connection.channels.contains(channel_id));
        }
    }
}
829}
830
831impl Project {
832 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
833 self.guests
834 .values()
835 .chain([&self.host])
836 .any(|collaborator| {
837 collaborator
838 .last_activity
839 .map_or(false, |active_time| active_time > start_time)
840 })
841 }
842
843 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
844 self.guests.keys().copied().collect()
845 }
846
847 pub fn connection_ids(&self) -> Vec<ConnectionId> {
848 self.guests
849 .keys()
850 .copied()
851 .chain(Some(self.host_connection_id))
852 .collect()
853 }
854}
855
856impl Channel {
857 fn connection_ids(&self) -> Vec<ConnectionId> {
858 self.connection_ids.iter().copied().collect()
859 }
860}