1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet};
4use rpc::{proto, ConnectionId, Receipt};
5use serde::Serialize;
6use std::{
7 mem,
8 path::{Path, PathBuf},
9 str,
10 time::Duration,
11};
12use time::OffsetDateTime;
13use tracing::instrument;
14
15#[derive(Default, Serialize)]
16pub struct Store {
17 connections: BTreeMap<ConnectionId, ConnectionState>,
18 connections_by_user_id: BTreeMap<UserId, HashSet<ConnectionId>>,
19 projects: BTreeMap<ProjectId, Project>,
20 #[serde(skip)]
21 channels: BTreeMap<ChannelId, Channel>,
22}
23
24#[derive(Serialize)]
25struct ConnectionState {
26 user_id: UserId,
27 admin: bool,
28 projects: BTreeSet<ProjectId>,
29 requested_projects: HashSet<ProjectId>,
30 channels: HashSet<ChannelId>,
31}
32
33#[derive(Serialize)]
34pub struct Project {
35 pub online: bool,
36 pub host_connection_id: ConnectionId,
37 pub host: Collaborator,
38 pub guests: HashMap<ConnectionId, Collaborator>,
39 #[serde(skip)]
40 pub join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
41 pub active_replica_ids: HashSet<ReplicaId>,
42 pub worktrees: BTreeMap<u64, Worktree>,
43 pub language_servers: Vec<proto::LanguageServer>,
44}
45
46#[derive(Serialize)]
47pub struct Collaborator {
48 pub replica_id: ReplicaId,
49 pub user_id: UserId,
50 #[serde(skip)]
51 pub last_activity: Option<OffsetDateTime>,
52 pub admin: bool,
53}
54
55#[derive(Default, Serialize)]
56pub struct Worktree {
57 pub root_name: String,
58 pub visible: bool,
59 #[serde(skip)]
60 pub entries: BTreeMap<u64, proto::Entry>,
61 #[serde(skip)]
62 pub extension_counts: HashMap<String, usize>,
63 #[serde(skip)]
64 pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
65 pub scan_id: u64,
66}
67
68#[derive(Default)]
69pub struct Channel {
70 pub connection_ids: HashSet<ConnectionId>,
71}
72
73pub type ReplicaId = u16;
74
75#[derive(Default)]
76pub struct RemovedConnectionState {
77 pub user_id: UserId,
78 pub hosted_projects: HashMap<ProjectId, Project>,
79 pub guest_project_ids: HashSet<ProjectId>,
80 pub contact_ids: HashSet<UserId>,
81}
82
83pub struct LeftProject {
84 pub host_user_id: UserId,
85 pub host_connection_id: ConnectionId,
86 pub connection_ids: Vec<ConnectionId>,
87 pub remove_collaborator: bool,
88 pub cancel_request: Option<UserId>,
89 pub unshare: bool,
90}
91
92pub struct UnsharedProject {
93 pub guests: HashMap<ConnectionId, Collaborator>,
94 pub pending_join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
95}
96
97#[derive(Copy, Clone)]
98pub struct Metrics {
99 pub connections: usize,
100 pub registered_projects: usize,
101 pub active_projects: usize,
102 pub shared_projects: usize,
103}
104
105impl Store {
106 pub fn metrics(&self) -> Metrics {
107 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
108 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
109
110 let connections = self.connections.values().filter(|c| !c.admin).count();
111 let mut registered_projects = 0;
112 let mut active_projects = 0;
113 let mut shared_projects = 0;
114 for project in self.projects.values() {
115 if let Some(connection) = self.connections.get(&project.host_connection_id) {
116 if !connection.admin {
117 registered_projects += 1;
118 if project.is_active_since(active_window_start) {
119 active_projects += 1;
120 if !project.guests.is_empty() {
121 shared_projects += 1;
122 }
123 }
124 }
125 }
126 }
127
128 Metrics {
129 connections,
130 registered_projects,
131 active_projects,
132 shared_projects,
133 }
134 }
135
136 #[instrument(skip(self))]
137 pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
138 self.connections.insert(
139 connection_id,
140 ConnectionState {
141 user_id,
142 admin,
143 projects: Default::default(),
144 requested_projects: Default::default(),
145 channels: Default::default(),
146 },
147 );
148 self.connections_by_user_id
149 .entry(user_id)
150 .or_default()
151 .insert(connection_id);
152 }
153
154 #[instrument(skip(self))]
155 pub fn remove_connection(
156 &mut self,
157 connection_id: ConnectionId,
158 ) -> Result<RemovedConnectionState> {
159 let connection = self
160 .connections
161 .get_mut(&connection_id)
162 .ok_or_else(|| anyhow!("no such connection"))?;
163
164 let user_id = connection.user_id;
165 let connection_projects = mem::take(&mut connection.projects);
166 let connection_channels = mem::take(&mut connection.channels);
167
168 let mut result = RemovedConnectionState::default();
169 result.user_id = user_id;
170
171 // Leave all channels.
172 for channel_id in connection_channels {
173 self.leave_channel(connection_id, channel_id);
174 }
175
176 // Unregister and leave all projects.
177 for project_id in connection_projects {
178 if let Ok(project) = self.unregister_project(project_id, connection_id) {
179 result.hosted_projects.insert(project_id, project);
180 } else if self.leave_project(connection_id, project_id).is_ok() {
181 result.guest_project_ids.insert(project_id);
182 }
183 }
184
185 let user_connections = self.connections_by_user_id.get_mut(&user_id).unwrap();
186 user_connections.remove(&connection_id);
187 if user_connections.is_empty() {
188 self.connections_by_user_id.remove(&user_id);
189 }
190
191 self.connections.remove(&connection_id).unwrap();
192
193 Ok(result)
194 }
195
196 #[cfg(test)]
197 pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
198 self.channels.get(&id)
199 }
200
201 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
202 if let Some(connection) = self.connections.get_mut(&connection_id) {
203 connection.channels.insert(channel_id);
204 self.channels
205 .entry(channel_id)
206 .or_default()
207 .connection_ids
208 .insert(connection_id);
209 }
210 }
211
212 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
213 if let Some(connection) = self.connections.get_mut(&connection_id) {
214 connection.channels.remove(&channel_id);
215 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
216 entry.get_mut().connection_ids.remove(&connection_id);
217 if entry.get_mut().connection_ids.is_empty() {
218 entry.remove();
219 }
220 }
221 }
222 }
223
224 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
225 Ok(self
226 .connections
227 .get(&connection_id)
228 .ok_or_else(|| anyhow!("unknown connection"))?
229 .user_id)
230 }
231
232 pub fn connection_ids_for_user<'a>(
233 &'a self,
234 user_id: UserId,
235 ) -> impl 'a + Iterator<Item = ConnectionId> {
236 self.connections_by_user_id
237 .get(&user_id)
238 .into_iter()
239 .flatten()
240 .copied()
241 }
242
243 pub fn is_user_online(&self, user_id: UserId) -> bool {
244 !self
245 .connections_by_user_id
246 .get(&user_id)
247 .unwrap_or(&Default::default())
248 .is_empty()
249 }
250
251 pub fn build_initial_contacts_update(
252 &self,
253 contacts: Vec<db::Contact>,
254 ) -> proto::UpdateContacts {
255 let mut update = proto::UpdateContacts::default();
256
257 for contact in contacts {
258 match contact {
259 db::Contact::Accepted {
260 user_id,
261 should_notify,
262 } => {
263 update
264 .contacts
265 .push(self.contact_for_user(user_id, should_notify));
266 }
267 db::Contact::Outgoing { user_id } => {
268 update.outgoing_requests.push(user_id.to_proto())
269 }
270 db::Contact::Incoming {
271 user_id,
272 should_notify,
273 } => update
274 .incoming_requests
275 .push(proto::IncomingContactRequest {
276 requester_id: user_id.to_proto(),
277 should_notify,
278 }),
279 }
280 }
281
282 update
283 }
284
285 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
286 proto::Contact {
287 user_id: user_id.to_proto(),
288 projects: self.project_metadata_for_user(user_id),
289 online: self.is_user_online(user_id),
290 should_notify,
291 }
292 }
293
294 pub fn project_metadata_for_user(&self, user_id: UserId) -> Vec<proto::ProjectMetadata> {
295 let connection_ids = self.connections_by_user_id.get(&user_id);
296 let project_ids = connection_ids.iter().flat_map(|connection_ids| {
297 connection_ids
298 .iter()
299 .filter_map(|connection_id| self.connections.get(connection_id))
300 .flat_map(|connection| connection.projects.iter().copied())
301 });
302
303 let mut metadata = Vec::new();
304 for project_id in project_ids {
305 if let Some(project) = self.projects.get(&project_id) {
306 if project.host.user_id == user_id && project.online {
307 metadata.push(proto::ProjectMetadata {
308 id: project_id.to_proto(),
309 visible_worktree_root_names: project
310 .worktrees
311 .values()
312 .filter(|worktree| worktree.visible)
313 .map(|worktree| worktree.root_name.clone())
314 .collect(),
315 guests: project
316 .guests
317 .values()
318 .map(|guest| guest.user_id.to_proto())
319 .collect(),
320 });
321 }
322 }
323 }
324
325 metadata
326 }
327
328 pub fn register_project(
329 &mut self,
330 host_connection_id: ConnectionId,
331 project_id: ProjectId,
332 online: bool,
333 ) -> Result<()> {
334 let connection = self
335 .connections
336 .get_mut(&host_connection_id)
337 .ok_or_else(|| anyhow!("no such connection"))?;
338 connection.projects.insert(project_id);
339 self.projects.insert(
340 project_id,
341 Project {
342 online,
343 host_connection_id,
344 host: Collaborator {
345 user_id: connection.user_id,
346 replica_id: 0,
347 last_activity: None,
348 admin: connection.admin,
349 },
350 guests: Default::default(),
351 join_requests: Default::default(),
352 active_replica_ids: Default::default(),
353 worktrees: Default::default(),
354 language_servers: Default::default(),
355 },
356 );
357 Ok(())
358 }
359
360 pub fn update_project(
361 &mut self,
362 project_id: ProjectId,
363 worktrees: &[proto::WorktreeMetadata],
364 online: bool,
365 connection_id: ConnectionId,
366 ) -> Result<Option<UnsharedProject>> {
367 let project = self
368 .projects
369 .get_mut(&project_id)
370 .ok_or_else(|| anyhow!("no such project"))?;
371 if project.host_connection_id == connection_id {
372 let mut old_worktrees = mem::take(&mut project.worktrees);
373 for worktree in worktrees {
374 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
375 project.worktrees.insert(worktree.id, old_worktree);
376 } else {
377 project.worktrees.insert(
378 worktree.id,
379 Worktree {
380 root_name: worktree.root_name.clone(),
381 visible: worktree.visible,
382 ..Default::default()
383 },
384 );
385 }
386 }
387
388 if online != project.online {
389 project.online = online;
390 if project.online {
391 Ok(None)
392 } else {
393 for connection_id in project.guest_connection_ids() {
394 if let Some(connection) = self.connections.get_mut(&connection_id) {
395 connection.projects.remove(&project_id);
396 }
397 }
398
399 project.active_replica_ids.clear();
400 project.language_servers.clear();
401 for worktree in project.worktrees.values_mut() {
402 worktree.diagnostic_summaries.clear();
403 worktree.entries.clear();
404 worktree.extension_counts.clear();
405 }
406
407 Ok(Some(UnsharedProject {
408 guests: mem::take(&mut project.guests),
409 pending_join_requests: mem::take(&mut project.join_requests),
410 }))
411 }
412 } else {
413 Ok(None)
414 }
415 } else {
416 Err(anyhow!("no such project"))?
417 }
418 }
419
420 pub fn unregister_project(
421 &mut self,
422 project_id: ProjectId,
423 connection_id: ConnectionId,
424 ) -> Result<Project> {
425 match self.projects.entry(project_id) {
426 btree_map::Entry::Occupied(e) => {
427 if e.get().host_connection_id == connection_id {
428 let project = e.remove();
429
430 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
431 host_connection.projects.remove(&project_id);
432 }
433
434 for guest_connection in project.guests.keys() {
435 if let Some(connection) = self.connections.get_mut(&guest_connection) {
436 connection.projects.remove(&project_id);
437 }
438 }
439
440 for requester_user_id in project.join_requests.keys() {
441 if let Some(requester_connection_ids) =
442 self.connections_by_user_id.get_mut(&requester_user_id)
443 {
444 for requester_connection_id in requester_connection_ids.iter() {
445 if let Some(requester_connection) =
446 self.connections.get_mut(requester_connection_id)
447 {
448 requester_connection.requested_projects.remove(&project_id);
449 }
450 }
451 }
452 }
453
454 Ok(project)
455 } else {
456 Err(anyhow!("no such project"))?
457 }
458 }
459 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
460 }
461 }
462
463 pub fn update_diagnostic_summary(
464 &mut self,
465 project_id: ProjectId,
466 worktree_id: u64,
467 connection_id: ConnectionId,
468 summary: proto::DiagnosticSummary,
469 ) -> Result<Vec<ConnectionId>> {
470 let project = self
471 .projects
472 .get_mut(&project_id)
473 .ok_or_else(|| anyhow!("no such project"))?;
474 if project.host_connection_id == connection_id {
475 let worktree = project
476 .worktrees
477 .get_mut(&worktree_id)
478 .ok_or_else(|| anyhow!("no such worktree"))?;
479 worktree
480 .diagnostic_summaries
481 .insert(summary.path.clone().into(), summary);
482 return Ok(project.connection_ids());
483 }
484
485 Err(anyhow!("no such worktree"))?
486 }
487
488 pub fn start_language_server(
489 &mut self,
490 project_id: ProjectId,
491 connection_id: ConnectionId,
492 language_server: proto::LanguageServer,
493 ) -> Result<Vec<ConnectionId>> {
494 let project = self
495 .projects
496 .get_mut(&project_id)
497 .ok_or_else(|| anyhow!("no such project"))?;
498 if project.host_connection_id == connection_id {
499 project.language_servers.push(language_server);
500 return Ok(project.connection_ids());
501 }
502
503 Err(anyhow!("no such project"))?
504 }
505
506 pub fn request_join_project(
507 &mut self,
508 requester_id: UserId,
509 project_id: ProjectId,
510 receipt: Receipt<proto::JoinProject>,
511 ) -> Result<()> {
512 let connection = self
513 .connections
514 .get_mut(&receipt.sender_id)
515 .ok_or_else(|| anyhow!("no such connection"))?;
516 let project = self
517 .projects
518 .get_mut(&project_id)
519 .ok_or_else(|| anyhow!("no such project"))?;
520 if project.online {
521 connection.requested_projects.insert(project_id);
522 project
523 .join_requests
524 .entry(requester_id)
525 .or_default()
526 .push(receipt);
527 Ok(())
528 } else {
529 Err(anyhow!("no such project"))
530 }
531 }
532
533 pub fn deny_join_project_request(
534 &mut self,
535 responder_connection_id: ConnectionId,
536 requester_id: UserId,
537 project_id: ProjectId,
538 ) -> Option<Vec<Receipt<proto::JoinProject>>> {
539 let project = self.projects.get_mut(&project_id)?;
540 if responder_connection_id != project.host_connection_id {
541 return None;
542 }
543
544 let receipts = project.join_requests.remove(&requester_id)?;
545 for receipt in &receipts {
546 let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
547 requester_connection.requested_projects.remove(&project_id);
548 }
549 project.host.last_activity = Some(OffsetDateTime::now_utc());
550
551 Some(receipts)
552 }
553
554 pub fn accept_join_project_request(
555 &mut self,
556 responder_connection_id: ConnectionId,
557 requester_id: UserId,
558 project_id: ProjectId,
559 ) -> Option<(Vec<(Receipt<proto::JoinProject>, ReplicaId)>, &Project)> {
560 let project = self.projects.get_mut(&project_id)?;
561 if responder_connection_id != project.host_connection_id {
562 return None;
563 }
564
565 let receipts = project.join_requests.remove(&requester_id)?;
566 let mut receipts_with_replica_ids = Vec::new();
567 for receipt in receipts {
568 let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
569 requester_connection.requested_projects.remove(&project_id);
570 requester_connection.projects.insert(project_id);
571 let mut replica_id = 1;
572 while project.active_replica_ids.contains(&replica_id) {
573 replica_id += 1;
574 }
575 project.active_replica_ids.insert(replica_id);
576 project.guests.insert(
577 receipt.sender_id,
578 Collaborator {
579 replica_id,
580 user_id: requester_id,
581 last_activity: Some(OffsetDateTime::now_utc()),
582 admin: requester_connection.admin,
583 },
584 );
585 receipts_with_replica_ids.push((receipt, replica_id));
586 }
587
588 project.host.last_activity = Some(OffsetDateTime::now_utc());
589 Some((receipts_with_replica_ids, project))
590 }
591
592 pub fn leave_project(
593 &mut self,
594 connection_id: ConnectionId,
595 project_id: ProjectId,
596 ) -> Result<LeftProject> {
597 let user_id = self.user_id_for_connection(connection_id)?;
598 let project = self
599 .projects
600 .get_mut(&project_id)
601 .ok_or_else(|| anyhow!("no such project"))?;
602
603 // If the connection leaving the project is a collaborator, remove it.
604 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
605 project.active_replica_ids.remove(&guest.replica_id);
606 true
607 } else {
608 false
609 };
610
611 // If the connection leaving the project has a pending request, remove it.
612 // If that user has no other pending requests on other connections, indicate that the request should be cancelled.
613 let mut cancel_request = None;
614 if let Entry::Occupied(mut entry) = project.join_requests.entry(user_id) {
615 entry
616 .get_mut()
617 .retain(|receipt| receipt.sender_id != connection_id);
618 if entry.get().is_empty() {
619 entry.remove();
620 cancel_request = Some(user_id);
621 }
622 }
623
624 if let Some(connection) = self.connections.get_mut(&connection_id) {
625 connection.projects.remove(&project_id);
626 }
627
628 let connection_ids = project.connection_ids();
629 let unshare = connection_ids.len() <= 1 && project.join_requests.is_empty();
630 if unshare {
631 project.language_servers.clear();
632 for worktree in project.worktrees.values_mut() {
633 worktree.diagnostic_summaries.clear();
634 worktree.entries.clear();
635 worktree.extension_counts.clear();
636 }
637 }
638
639 Ok(LeftProject {
640 host_connection_id: project.host_connection_id,
641 host_user_id: project.host.user_id,
642 connection_ids,
643 cancel_request,
644 unshare,
645 remove_collaborator,
646 })
647 }
648
649 pub fn update_worktree(
650 &mut self,
651 connection_id: ConnectionId,
652 project_id: ProjectId,
653 worktree_id: u64,
654 worktree_root_name: &str,
655 removed_entries: &[u64],
656 updated_entries: &[proto::Entry],
657 scan_id: u64,
658 ) -> Result<(Vec<ConnectionId>, bool, HashMap<String, usize>)> {
659 let project = self.write_project(project_id, connection_id)?;
660 if !project.online {
661 return Err(anyhow!("project is not online"));
662 }
663
664 let connection_ids = project.connection_ids();
665 let mut worktree = project.worktrees.entry(worktree_id).or_default();
666 let metadata_changed = worktree_root_name != worktree.root_name;
667 worktree.root_name = worktree_root_name.to_string();
668
669 for entry_id in removed_entries {
670 if let Some(entry) = worktree.entries.remove(&entry_id) {
671 if !entry.is_ignored {
672 if let Some(extension) = extension_for_entry(&entry) {
673 if let Some(count) = worktree.extension_counts.get_mut(extension) {
674 *count = count.saturating_sub(1);
675 }
676 }
677 }
678 }
679 }
680
681 for entry in updated_entries {
682 if let Some(old_entry) = worktree.entries.insert(entry.id, entry.clone()) {
683 if !old_entry.is_ignored {
684 if let Some(extension) = extension_for_entry(&old_entry) {
685 if let Some(count) = worktree.extension_counts.get_mut(extension) {
686 *count = count.saturating_sub(1);
687 }
688 }
689 }
690 }
691
692 if !entry.is_ignored {
693 if let Some(extension) = extension_for_entry(&entry) {
694 if let Some(count) = worktree.extension_counts.get_mut(extension) {
695 *count += 1;
696 } else {
697 worktree.extension_counts.insert(extension.into(), 1);
698 }
699 }
700 }
701 }
702
703 worktree.scan_id = scan_id;
704 Ok((
705 connection_ids,
706 metadata_changed,
707 worktree.extension_counts.clone(),
708 ))
709 }
710
711 pub fn project_connection_ids(
712 &self,
713 project_id: ProjectId,
714 acting_connection_id: ConnectionId,
715 ) -> Result<Vec<ConnectionId>> {
716 Ok(self
717 .read_project(project_id, acting_connection_id)?
718 .connection_ids())
719 }
720
721 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
722 Ok(self
723 .channels
724 .get(&channel_id)
725 .ok_or_else(|| anyhow!("no such channel"))?
726 .connection_ids())
727 }
728
729 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
730 self.projects
731 .get(&project_id)
732 .ok_or_else(|| anyhow!("no such project"))
733 }
734
735 pub fn register_project_activity(
736 &mut self,
737 project_id: ProjectId,
738 connection_id: ConnectionId,
739 ) -> Result<()> {
740 let project = self
741 .projects
742 .get_mut(&project_id)
743 .ok_or_else(|| anyhow!("no such project"))?;
744 let collaborator = if connection_id == project.host_connection_id {
745 &mut project.host
746 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
747 guest
748 } else {
749 return Err(anyhow!("no such project"))?;
750 };
751 collaborator.last_activity = Some(OffsetDateTime::now_utc());
752 Ok(())
753 }
754
755 pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
756 self.projects.iter()
757 }
758
759 pub fn read_project(
760 &self,
761 project_id: ProjectId,
762 connection_id: ConnectionId,
763 ) -> Result<&Project> {
764 let project = self
765 .projects
766 .get(&project_id)
767 .ok_or_else(|| anyhow!("no such project"))?;
768 if project.host_connection_id == connection_id
769 || project.guests.contains_key(&connection_id)
770 {
771 Ok(project)
772 } else {
773 Err(anyhow!("no such project"))?
774 }
775 }
776
777 fn write_project(
778 &mut self,
779 project_id: ProjectId,
780 connection_id: ConnectionId,
781 ) -> Result<&mut Project> {
782 let project = self
783 .projects
784 .get_mut(&project_id)
785 .ok_or_else(|| anyhow!("no such project"))?;
786 if project.host_connection_id == connection_id
787 || project.guests.contains_key(&connection_id)
788 {
789 Ok(project)
790 } else {
791 Err(anyhow!("no such project"))?
792 }
793 }
794
795 #[cfg(test)]
796 pub fn check_invariants(&self) {
797 for (connection_id, connection) in &self.connections {
798 for project_id in &connection.projects {
799 let project = &self.projects.get(&project_id).unwrap();
800 if project.host_connection_id != *connection_id {
801 assert!(project.guests.contains_key(connection_id));
802 }
803
804 for (worktree_id, worktree) in project.worktrees.iter() {
805 let mut paths = HashMap::default();
806 for entry in worktree.entries.values() {
807 let prev_entry = paths.insert(&entry.path, entry);
808 assert_eq!(
809 prev_entry,
810 None,
811 "worktree {:?}, duplicate path for entries {:?} and {:?}",
812 worktree_id,
813 prev_entry.unwrap(),
814 entry
815 );
816 }
817 }
818 }
819 for channel_id in &connection.channels {
820 let channel = self.channels.get(channel_id).unwrap();
821 assert!(channel.connection_ids.contains(connection_id));
822 }
823 assert!(self
824 .connections_by_user_id
825 .get(&connection.user_id)
826 .unwrap()
827 .contains(connection_id));
828 }
829
830 for (user_id, connection_ids) in &self.connections_by_user_id {
831 for connection_id in connection_ids {
832 assert_eq!(
833 self.connections.get(connection_id).unwrap().user_id,
834 *user_id
835 );
836 }
837 }
838
839 for (project_id, project) in &self.projects {
840 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
841 assert!(host_connection.projects.contains(project_id));
842
843 for guest_connection_id in project.guests.keys() {
844 let guest_connection = self.connections.get(guest_connection_id).unwrap();
845 assert!(guest_connection.projects.contains(project_id));
846 }
847 assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
848 assert_eq!(
849 project.active_replica_ids,
850 project
851 .guests
852 .values()
853 .map(|guest| guest.replica_id)
854 .collect::<HashSet<_>>(),
855 );
856 }
857
858 for (channel_id, channel) in &self.channels {
859 for connection_id in &channel.connection_ids {
860 let connection = self.connections.get(connection_id).unwrap();
861 assert!(connection.channels.contains(channel_id));
862 }
863 }
864 }
865}
866
867impl Project {
868 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
869 self.guests
870 .values()
871 .chain([&self.host])
872 .any(|collaborator| {
873 collaborator
874 .last_activity
875 .map_or(false, |active_time| active_time > start_time)
876 })
877 }
878
879 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
880 self.guests.keys().copied().collect()
881 }
882
883 pub fn connection_ids(&self) -> Vec<ConnectionId> {
884 self.guests
885 .keys()
886 .copied()
887 .chain(Some(self.host_connection_id))
888 .collect()
889 }
890}
891
892impl Channel {
893 fn connection_ids(&self) -> Vec<ConnectionId> {
894 self.connection_ids.iter().copied().collect()
895 }
896}
897
898fn extension_for_entry(entry: &proto::Entry) -> Option<&str> {
899 str::from_utf8(&entry.path)
900 .ok()
901 .map(Path::new)
902 .and_then(|p| p.extension())
903 .and_then(|e| e.to_str())
904}