1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet};
4use rpc::{proto, ConnectionId, Receipt};
5use serde::Serialize;
6use std::{mem, path::PathBuf, str, time::Duration};
7use time::OffsetDateTime;
8use tracing::instrument;
9
10#[derive(Default, Serialize)]
11pub struct Store {
12 connections: BTreeMap<ConnectionId, ConnectionState>,
13 connections_by_user_id: BTreeMap<UserId, HashSet<ConnectionId>>,
14 projects: BTreeMap<ProjectId, Project>,
15 #[serde(skip)]
16 channels: BTreeMap<ChannelId, Channel>,
17}
18
19#[derive(Serialize)]
20struct ConnectionState {
21 user_id: UserId,
22 admin: bool,
23 projects: BTreeSet<ProjectId>,
24 requested_projects: HashSet<ProjectId>,
25 channels: HashSet<ChannelId>,
26}
27
28#[derive(Serialize)]
29pub struct Project {
30 pub online: bool,
31 pub host_connection_id: ConnectionId,
32 pub host: Collaborator,
33 pub guests: HashMap<ConnectionId, Collaborator>,
34 #[serde(skip)]
35 pub join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
36 pub active_replica_ids: HashSet<ReplicaId>,
37 pub worktrees: BTreeMap<u64, Worktree>,
38 pub language_servers: Vec<proto::LanguageServer>,
39}
40
41#[derive(Serialize)]
42pub struct Collaborator {
43 pub replica_id: ReplicaId,
44 pub user_id: UserId,
45 #[serde(skip)]
46 pub last_activity: Option<OffsetDateTime>,
47 pub admin: bool,
48}
49
50#[derive(Default, Serialize)]
51pub struct Worktree {
52 pub root_name: String,
53 pub visible: bool,
54 #[serde(skip)]
55 pub entries: BTreeMap<u64, proto::Entry>,
56 #[serde(skip)]
57 pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
58 pub scan_id: u64,
59 pub is_complete: bool,
60}
61
62#[derive(Default)]
63pub struct Channel {
64 pub connection_ids: HashSet<ConnectionId>,
65}
66
/// Identifier of a collaborator's replica within a project (the host uses 0).
pub type ReplicaId = u16;
68
69#[derive(Default)]
70pub struct RemovedConnectionState {
71 pub user_id: UserId,
72 pub hosted_projects: HashMap<ProjectId, Project>,
73 pub guest_project_ids: HashSet<ProjectId>,
74 pub contact_ids: HashSet<UserId>,
75}
76
77pub struct LeftProject {
78 pub host_user_id: UserId,
79 pub host_connection_id: ConnectionId,
80 pub connection_ids: Vec<ConnectionId>,
81 pub remove_collaborator: bool,
82 pub cancel_request: Option<UserId>,
83 pub unshare: bool,
84}
85
86pub struct UnsharedProject {
87 pub guests: HashMap<ConnectionId, Collaborator>,
88 pub pending_join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
89}
90
/// Counters produced by `Store::metrics`.
#[derive(Copy, Clone)]
pub struct Metrics {
    /// Number of non-admin connections.
    pub connections: usize,
    /// Number of projects whose host connection is not an admin.
    pub registered_projects: usize,
    /// Registered projects that saw activity inside the active window.
    pub active_projects: usize,
    /// Active projects that have at least one guest.
    pub shared_projects: usize,
}
98
99impl Store {
100 pub fn metrics(&self) -> Metrics {
101 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
102 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
103
104 let connections = self.connections.values().filter(|c| !c.admin).count();
105 let mut registered_projects = 0;
106 let mut active_projects = 0;
107 let mut shared_projects = 0;
108 for project in self.projects.values() {
109 if let Some(connection) = self.connections.get(&project.host_connection_id) {
110 if !connection.admin {
111 registered_projects += 1;
112 if project.is_active_since(active_window_start) {
113 active_projects += 1;
114 if !project.guests.is_empty() {
115 shared_projects += 1;
116 }
117 }
118 }
119 }
120 }
121
122 Metrics {
123 connections,
124 registered_projects,
125 active_projects,
126 shared_projects,
127 }
128 }
129
130 #[instrument(skip(self))]
131 pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
132 self.connections.insert(
133 connection_id,
134 ConnectionState {
135 user_id,
136 admin,
137 projects: Default::default(),
138 requested_projects: Default::default(),
139 channels: Default::default(),
140 },
141 );
142 self.connections_by_user_id
143 .entry(user_id)
144 .or_default()
145 .insert(connection_id);
146 }
147
148 #[instrument(skip(self))]
149 pub fn remove_connection(
150 &mut self,
151 connection_id: ConnectionId,
152 ) -> Result<RemovedConnectionState> {
153 let connection = self
154 .connections
155 .get_mut(&connection_id)
156 .ok_or_else(|| anyhow!("no such connection"))?;
157
158 let user_id = connection.user_id;
159 let connection_projects = mem::take(&mut connection.projects);
160 let connection_channels = mem::take(&mut connection.channels);
161
162 let mut result = RemovedConnectionState {
163 user_id,
164 ..Default::default()
165 };
166
167 // Leave all channels.
168 for channel_id in connection_channels {
169 self.leave_channel(connection_id, channel_id);
170 }
171
172 // Unregister and leave all projects.
173 for project_id in connection_projects {
174 if let Ok(project) = self.unregister_project(project_id, connection_id) {
175 result.hosted_projects.insert(project_id, project);
176 } else if self.leave_project(connection_id, project_id).is_ok() {
177 result.guest_project_ids.insert(project_id);
178 }
179 }
180
181 let user_connections = self.connections_by_user_id.get_mut(&user_id).unwrap();
182 user_connections.remove(&connection_id);
183 if user_connections.is_empty() {
184 self.connections_by_user_id.remove(&user_id);
185 }
186
187 self.connections.remove(&connection_id).unwrap();
188
189 Ok(result)
190 }
191
192 #[cfg(test)]
193 pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
194 self.channels.get(&id)
195 }
196
197 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
198 if let Some(connection) = self.connections.get_mut(&connection_id) {
199 connection.channels.insert(channel_id);
200 self.channels
201 .entry(channel_id)
202 .or_default()
203 .connection_ids
204 .insert(connection_id);
205 }
206 }
207
208 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
209 if let Some(connection) = self.connections.get_mut(&connection_id) {
210 connection.channels.remove(&channel_id);
211 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
212 entry.get_mut().connection_ids.remove(&connection_id);
213 if entry.get_mut().connection_ids.is_empty() {
214 entry.remove();
215 }
216 }
217 }
218 }
219
220 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
221 Ok(self
222 .connections
223 .get(&connection_id)
224 .ok_or_else(|| anyhow!("unknown connection"))?
225 .user_id)
226 }
227
228 pub fn connection_ids_for_user(
229 &self,
230 user_id: UserId,
231 ) -> impl Iterator<Item = ConnectionId> + '_ {
232 self.connections_by_user_id
233 .get(&user_id)
234 .into_iter()
235 .flatten()
236 .copied()
237 }
238
239 pub fn is_user_online(&self, user_id: UserId) -> bool {
240 !self
241 .connections_by_user_id
242 .get(&user_id)
243 .unwrap_or(&Default::default())
244 .is_empty()
245 }
246
247 pub fn build_initial_contacts_update(
248 &self,
249 contacts: Vec<db::Contact>,
250 ) -> proto::UpdateContacts {
251 let mut update = proto::UpdateContacts::default();
252
253 for contact in contacts {
254 match contact {
255 db::Contact::Accepted {
256 user_id,
257 should_notify,
258 } => {
259 update
260 .contacts
261 .push(self.contact_for_user(user_id, should_notify));
262 }
263 db::Contact::Outgoing { user_id } => {
264 update.outgoing_requests.push(user_id.to_proto())
265 }
266 db::Contact::Incoming {
267 user_id,
268 should_notify,
269 } => update
270 .incoming_requests
271 .push(proto::IncomingContactRequest {
272 requester_id: user_id.to_proto(),
273 should_notify,
274 }),
275 }
276 }
277
278 update
279 }
280
281 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
282 proto::Contact {
283 user_id: user_id.to_proto(),
284 projects: self.project_metadata_for_user(user_id),
285 online: self.is_user_online(user_id),
286 should_notify,
287 }
288 }
289
290 pub fn project_metadata_for_user(&self, user_id: UserId) -> Vec<proto::ProjectMetadata> {
291 let connection_ids = self.connections_by_user_id.get(&user_id);
292 let project_ids = connection_ids.iter().flat_map(|connection_ids| {
293 connection_ids
294 .iter()
295 .filter_map(|connection_id| self.connections.get(connection_id))
296 .flat_map(|connection| connection.projects.iter().copied())
297 });
298
299 let mut metadata = Vec::new();
300 for project_id in project_ids {
301 if let Some(project) = self.projects.get(&project_id) {
302 if project.host.user_id == user_id && project.online {
303 metadata.push(proto::ProjectMetadata {
304 id: project_id.to_proto(),
305 visible_worktree_root_names: project
306 .worktrees
307 .values()
308 .filter(|worktree| worktree.visible)
309 .map(|worktree| worktree.root_name.clone())
310 .collect(),
311 guests: project
312 .guests
313 .values()
314 .map(|guest| guest.user_id.to_proto())
315 .collect(),
316 });
317 }
318 }
319 }
320
321 metadata
322 }
323
324 pub fn register_project(
325 &mut self,
326 host_connection_id: ConnectionId,
327 project_id: ProjectId,
328 online: bool,
329 ) -> Result<()> {
330 let connection = self
331 .connections
332 .get_mut(&host_connection_id)
333 .ok_or_else(|| anyhow!("no such connection"))?;
334 connection.projects.insert(project_id);
335 self.projects.insert(
336 project_id,
337 Project {
338 online,
339 host_connection_id,
340 host: Collaborator {
341 user_id: connection.user_id,
342 replica_id: 0,
343 last_activity: None,
344 admin: connection.admin,
345 },
346 guests: Default::default(),
347 join_requests: Default::default(),
348 active_replica_ids: Default::default(),
349 worktrees: Default::default(),
350 language_servers: Default::default(),
351 },
352 );
353 Ok(())
354 }
355
356 pub fn update_project(
357 &mut self,
358 project_id: ProjectId,
359 worktrees: &[proto::WorktreeMetadata],
360 online: bool,
361 connection_id: ConnectionId,
362 ) -> Result<Option<UnsharedProject>> {
363 let project = self
364 .projects
365 .get_mut(&project_id)
366 .ok_or_else(|| anyhow!("no such project"))?;
367 if project.host_connection_id == connection_id {
368 let mut old_worktrees = mem::take(&mut project.worktrees);
369 for worktree in worktrees {
370 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
371 project.worktrees.insert(worktree.id, old_worktree);
372 } else {
373 project.worktrees.insert(
374 worktree.id,
375 Worktree {
376 root_name: worktree.root_name.clone(),
377 visible: worktree.visible,
378 ..Default::default()
379 },
380 );
381 }
382 }
383
384 if online != project.online {
385 project.online = online;
386 if project.online {
387 Ok(None)
388 } else {
389 for connection_id in project.guest_connection_ids() {
390 if let Some(connection) = self.connections.get_mut(&connection_id) {
391 connection.projects.remove(&project_id);
392 }
393 }
394
395 project.active_replica_ids.clear();
396 project.language_servers.clear();
397 for worktree in project.worktrees.values_mut() {
398 worktree.diagnostic_summaries.clear();
399 worktree.entries.clear();
400 }
401
402 Ok(Some(UnsharedProject {
403 guests: mem::take(&mut project.guests),
404 pending_join_requests: mem::take(&mut project.join_requests),
405 }))
406 }
407 } else {
408 Ok(None)
409 }
410 } else {
411 Err(anyhow!("no such project"))?
412 }
413 }
414
415 pub fn unregister_project(
416 &mut self,
417 project_id: ProjectId,
418 connection_id: ConnectionId,
419 ) -> Result<Project> {
420 match self.projects.entry(project_id) {
421 btree_map::Entry::Occupied(e) => {
422 if e.get().host_connection_id == connection_id {
423 let project = e.remove();
424
425 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
426 host_connection.projects.remove(&project_id);
427 }
428
429 for guest_connection in project.guests.keys() {
430 if let Some(connection) = self.connections.get_mut(guest_connection) {
431 connection.projects.remove(&project_id);
432 }
433 }
434
435 for requester_user_id in project.join_requests.keys() {
436 if let Some(requester_connection_ids) =
437 self.connections_by_user_id.get_mut(requester_user_id)
438 {
439 for requester_connection_id in requester_connection_ids.iter() {
440 if let Some(requester_connection) =
441 self.connections.get_mut(requester_connection_id)
442 {
443 requester_connection.requested_projects.remove(&project_id);
444 }
445 }
446 }
447 }
448
449 Ok(project)
450 } else {
451 Err(anyhow!("no such project"))?
452 }
453 }
454 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
455 }
456 }
457
458 pub fn update_diagnostic_summary(
459 &mut self,
460 project_id: ProjectId,
461 worktree_id: u64,
462 connection_id: ConnectionId,
463 summary: proto::DiagnosticSummary,
464 ) -> Result<Vec<ConnectionId>> {
465 let project = self
466 .projects
467 .get_mut(&project_id)
468 .ok_or_else(|| anyhow!("no such project"))?;
469 if project.host_connection_id == connection_id {
470 let worktree = project
471 .worktrees
472 .get_mut(&worktree_id)
473 .ok_or_else(|| anyhow!("no such worktree"))?;
474 worktree
475 .diagnostic_summaries
476 .insert(summary.path.clone().into(), summary);
477 return Ok(project.connection_ids());
478 }
479
480 Err(anyhow!("no such worktree"))?
481 }
482
483 pub fn start_language_server(
484 &mut self,
485 project_id: ProjectId,
486 connection_id: ConnectionId,
487 language_server: proto::LanguageServer,
488 ) -> Result<Vec<ConnectionId>> {
489 let project = self
490 .projects
491 .get_mut(&project_id)
492 .ok_or_else(|| anyhow!("no such project"))?;
493 if project.host_connection_id == connection_id {
494 project.language_servers.push(language_server);
495 return Ok(project.connection_ids());
496 }
497
498 Err(anyhow!("no such project"))?
499 }
500
501 pub fn request_join_project(
502 &mut self,
503 requester_id: UserId,
504 project_id: ProjectId,
505 receipt: Receipt<proto::JoinProject>,
506 ) -> Result<()> {
507 let connection = self
508 .connections
509 .get_mut(&receipt.sender_id)
510 .ok_or_else(|| anyhow!("no such connection"))?;
511 let project = self
512 .projects
513 .get_mut(&project_id)
514 .ok_or_else(|| anyhow!("no such project"))?;
515 if project.online {
516 connection.requested_projects.insert(project_id);
517 project
518 .join_requests
519 .entry(requester_id)
520 .or_default()
521 .push(receipt);
522 Ok(())
523 } else {
524 Err(anyhow!("no such project"))
525 }
526 }
527
528 pub fn deny_join_project_request(
529 &mut self,
530 responder_connection_id: ConnectionId,
531 requester_id: UserId,
532 project_id: ProjectId,
533 ) -> Option<Vec<Receipt<proto::JoinProject>>> {
534 let project = self.projects.get_mut(&project_id)?;
535 if responder_connection_id != project.host_connection_id {
536 return None;
537 }
538
539 let receipts = project.join_requests.remove(&requester_id)?;
540 for receipt in &receipts {
541 let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
542 requester_connection.requested_projects.remove(&project_id);
543 }
544 project.host.last_activity = Some(OffsetDateTime::now_utc());
545
546 Some(receipts)
547 }
548
549 #[allow(clippy::type_complexity)]
550 pub fn accept_join_project_request(
551 &mut self,
552 responder_connection_id: ConnectionId,
553 requester_id: UserId,
554 project_id: ProjectId,
555 ) -> Option<(Vec<(Receipt<proto::JoinProject>, ReplicaId)>, &Project)> {
556 let project = self.projects.get_mut(&project_id)?;
557 if responder_connection_id != project.host_connection_id {
558 return None;
559 }
560
561 let receipts = project.join_requests.remove(&requester_id)?;
562 let mut receipts_with_replica_ids = Vec::new();
563 for receipt in receipts {
564 let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
565 requester_connection.requested_projects.remove(&project_id);
566 requester_connection.projects.insert(project_id);
567 let mut replica_id = 1;
568 while project.active_replica_ids.contains(&replica_id) {
569 replica_id += 1;
570 }
571 project.active_replica_ids.insert(replica_id);
572 project.guests.insert(
573 receipt.sender_id,
574 Collaborator {
575 replica_id,
576 user_id: requester_id,
577 last_activity: Some(OffsetDateTime::now_utc()),
578 admin: requester_connection.admin,
579 },
580 );
581 receipts_with_replica_ids.push((receipt, replica_id));
582 }
583
584 project.host.last_activity = Some(OffsetDateTime::now_utc());
585 Some((receipts_with_replica_ids, project))
586 }
587
588 pub fn leave_project(
589 &mut self,
590 connection_id: ConnectionId,
591 project_id: ProjectId,
592 ) -> Result<LeftProject> {
593 let user_id = self.user_id_for_connection(connection_id)?;
594 let project = self
595 .projects
596 .get_mut(&project_id)
597 .ok_or_else(|| anyhow!("no such project"))?;
598
599 // If the connection leaving the project is a collaborator, remove it.
600 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
601 project.active_replica_ids.remove(&guest.replica_id);
602 true
603 } else {
604 false
605 };
606
607 // If the connection leaving the project has a pending request, remove it.
608 // If that user has no other pending requests on other connections, indicate that the request should be cancelled.
609 let mut cancel_request = None;
610 if let Entry::Occupied(mut entry) = project.join_requests.entry(user_id) {
611 entry
612 .get_mut()
613 .retain(|receipt| receipt.sender_id != connection_id);
614 if entry.get().is_empty() {
615 entry.remove();
616 cancel_request = Some(user_id);
617 }
618 }
619
620 if let Some(connection) = self.connections.get_mut(&connection_id) {
621 connection.projects.remove(&project_id);
622 }
623
624 let connection_ids = project.connection_ids();
625 let unshare = connection_ids.len() <= 1 && project.join_requests.is_empty();
626 if unshare {
627 project.language_servers.clear();
628 for worktree in project.worktrees.values_mut() {
629 worktree.diagnostic_summaries.clear();
630 worktree.entries.clear();
631 }
632 }
633
634 Ok(LeftProject {
635 host_connection_id: project.host_connection_id,
636 host_user_id: project.host.user_id,
637 connection_ids,
638 cancel_request,
639 unshare,
640 remove_collaborator,
641 })
642 }
643
644 #[allow(clippy::too_many_arguments)]
645 pub fn update_worktree(
646 &mut self,
647 connection_id: ConnectionId,
648 project_id: ProjectId,
649 worktree_id: u64,
650 worktree_root_name: &str,
651 removed_entries: &[u64],
652 updated_entries: &[proto::Entry],
653 scan_id: u64,
654 is_last_update: bool,
655 ) -> Result<(Vec<ConnectionId>, bool)> {
656 let project = self.write_project(project_id, connection_id)?;
657 if !project.online {
658 return Err(anyhow!("project is not online"));
659 }
660
661 let connection_ids = project.connection_ids();
662 let mut worktree = project.worktrees.entry(worktree_id).or_default();
663 let metadata_changed = worktree_root_name != worktree.root_name;
664 worktree.root_name = worktree_root_name.to_string();
665
666 for entry_id in removed_entries {
667 worktree.entries.remove(entry_id);
668 }
669
670 for entry in updated_entries {
671 worktree.entries.insert(entry.id, entry.clone());
672 }
673
674 worktree.scan_id = scan_id;
675 worktree.is_complete = is_last_update;
676 Ok((connection_ids, metadata_changed))
677 }
678
679 pub fn project_connection_ids(
680 &self,
681 project_id: ProjectId,
682 acting_connection_id: ConnectionId,
683 ) -> Result<Vec<ConnectionId>> {
684 Ok(self
685 .read_project(project_id, acting_connection_id)?
686 .connection_ids())
687 }
688
689 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
690 Ok(self
691 .channels
692 .get(&channel_id)
693 .ok_or_else(|| anyhow!("no such channel"))?
694 .connection_ids())
695 }
696
697 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
698 self.projects
699 .get(&project_id)
700 .ok_or_else(|| anyhow!("no such project"))
701 }
702
703 pub fn register_project_activity(
704 &mut self,
705 project_id: ProjectId,
706 connection_id: ConnectionId,
707 ) -> Result<()> {
708 let project = self
709 .projects
710 .get_mut(&project_id)
711 .ok_or_else(|| anyhow!("no such project"))?;
712 let collaborator = if connection_id == project.host_connection_id {
713 &mut project.host
714 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
715 guest
716 } else {
717 return Err(anyhow!("no such project"))?;
718 };
719 collaborator.last_activity = Some(OffsetDateTime::now_utc());
720 Ok(())
721 }
722
723 pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
724 self.projects.iter()
725 }
726
727 pub fn read_project(
728 &self,
729 project_id: ProjectId,
730 connection_id: ConnectionId,
731 ) -> Result<&Project> {
732 let project = self
733 .projects
734 .get(&project_id)
735 .ok_or_else(|| anyhow!("no such project"))?;
736 if project.host_connection_id == connection_id
737 || project.guests.contains_key(&connection_id)
738 {
739 Ok(project)
740 } else {
741 Err(anyhow!("no such project"))?
742 }
743 }
744
745 fn write_project(
746 &mut self,
747 project_id: ProjectId,
748 connection_id: ConnectionId,
749 ) -> Result<&mut Project> {
750 let project = self
751 .projects
752 .get_mut(&project_id)
753 .ok_or_else(|| anyhow!("no such project"))?;
754 if project.host_connection_id == connection_id
755 || project.guests.contains_key(&connection_id)
756 {
757 Ok(project)
758 } else {
759 Err(anyhow!("no such project"))?
760 }
761 }
762
763 #[cfg(test)]
764 pub fn check_invariants(&self) {
765 for (connection_id, connection) in &self.connections {
766 for project_id in &connection.projects {
767 let project = &self.projects.get(project_id).unwrap();
768 if project.host_connection_id != *connection_id {
769 assert!(project.guests.contains_key(connection_id));
770 }
771
772 for (worktree_id, worktree) in project.worktrees.iter() {
773 let mut paths = HashMap::default();
774 for entry in worktree.entries.values() {
775 let prev_entry = paths.insert(&entry.path, entry);
776 assert_eq!(
777 prev_entry,
778 None,
779 "worktree {:?}, duplicate path for entries {:?} and {:?}",
780 worktree_id,
781 prev_entry.unwrap(),
782 entry
783 );
784 }
785 }
786 }
787 for channel_id in &connection.channels {
788 let channel = self.channels.get(channel_id).unwrap();
789 assert!(channel.connection_ids.contains(connection_id));
790 }
791 assert!(self
792 .connections_by_user_id
793 .get(&connection.user_id)
794 .unwrap()
795 .contains(connection_id));
796 }
797
798 for (user_id, connection_ids) in &self.connections_by_user_id {
799 for connection_id in connection_ids {
800 assert_eq!(
801 self.connections.get(connection_id).unwrap().user_id,
802 *user_id
803 );
804 }
805 }
806
807 for (project_id, project) in &self.projects {
808 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
809 assert!(host_connection.projects.contains(project_id));
810
811 for guest_connection_id in project.guests.keys() {
812 let guest_connection = self.connections.get(guest_connection_id).unwrap();
813 assert!(guest_connection.projects.contains(project_id));
814 }
815 assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
816 assert_eq!(
817 project.active_replica_ids,
818 project
819 .guests
820 .values()
821 .map(|guest| guest.replica_id)
822 .collect::<HashSet<_>>(),
823 );
824 }
825
826 for (channel_id, channel) in &self.channels {
827 for connection_id in &channel.connection_ids {
828 let connection = self.connections.get(connection_id).unwrap();
829 assert!(connection.channels.contains(channel_id));
830 }
831 }
832 }
833}
834
835impl Project {
836 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
837 self.guests
838 .values()
839 .chain([&self.host])
840 .any(|collaborator| {
841 collaborator
842 .last_activity
843 .map_or(false, |active_time| active_time > start_time)
844 })
845 }
846
847 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
848 self.guests.keys().copied().collect()
849 }
850
851 pub fn connection_ids(&self) -> Vec<ConnectionId> {
852 self.guests
853 .keys()
854 .copied()
855 .chain(Some(self.host_connection_id))
856 .collect()
857 }
858}
859
860impl Channel {
861 fn connection_ids(&self) -> Vec<ConnectionId> {
862 self.connection_ids.iter().copied().collect()
863 }
864}