store.rs

  1use crate::db::{self, ChannelId, ProjectId, UserId};
  2use anyhow::{anyhow, Result};
  3use collections::{btree_map, hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet};
  4use rpc::{proto, ConnectionId, Receipt};
  5use serde::Serialize;
  6use std::{
  7    mem,
  8    path::{Path, PathBuf},
  9    str,
 10    time::Duration,
 11};
 12use time::OffsetDateTime;
 13use tracing::instrument;
 14
 15#[derive(Default, Serialize)]
 16pub struct Store {
 17    connections: BTreeMap<ConnectionId, ConnectionState>,
 18    connections_by_user_id: BTreeMap<UserId, HashSet<ConnectionId>>,
 19    projects: BTreeMap<ProjectId, Project>,
 20    #[serde(skip)]
 21    channels: BTreeMap<ChannelId, Channel>,
 22}
 23
 24#[derive(Serialize)]
 25struct ConnectionState {
 26    user_id: UserId,
 27    admin: bool,
 28    projects: BTreeSet<ProjectId>,
 29    requested_projects: HashSet<ProjectId>,
 30    channels: HashSet<ChannelId>,
 31}
 32
 33#[derive(Serialize)]
 34pub struct Project {
 35    pub host_connection_id: ConnectionId,
 36    pub host: Collaborator,
 37    pub guests: HashMap<ConnectionId, Collaborator>,
 38    #[serde(skip)]
 39    pub join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
 40    pub active_replica_ids: HashSet<ReplicaId>,
 41    pub worktrees: BTreeMap<u64, Worktree>,
 42    pub language_servers: Vec<proto::LanguageServer>,
 43}
 44
 45#[derive(Serialize)]
 46pub struct Collaborator {
 47    pub replica_id: ReplicaId,
 48    pub user_id: UserId,
 49    #[serde(skip)]
 50    pub last_activity: Option<OffsetDateTime>,
 51    pub admin: bool,
 52}
 53
 54#[derive(Default, Serialize)]
 55pub struct Worktree {
 56    pub root_name: String,
 57    pub visible: bool,
 58    #[serde(skip)]
 59    pub entries: BTreeMap<u64, proto::Entry>,
 60    #[serde(skip)]
 61    pub extension_counts: HashMap<String, usize>,
 62    #[serde(skip)]
 63    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
 64    pub scan_id: u64,
 65}
 66
 67#[derive(Default)]
 68pub struct Channel {
 69    pub connection_ids: HashSet<ConnectionId>,
 70}
 71
 72pub type ReplicaId = u16;
 73
 74#[derive(Default)]
 75pub struct RemovedConnectionState {
 76    pub user_id: UserId,
 77    pub hosted_projects: HashMap<ProjectId, Project>,
 78    pub guest_project_ids: HashSet<ProjectId>,
 79    pub contact_ids: HashSet<UserId>,
 80}
 81
 82pub struct LeftProject {
 83    pub host_user_id: UserId,
 84    pub host_connection_id: ConnectionId,
 85    pub connection_ids: Vec<ConnectionId>,
 86    pub remove_collaborator: bool,
 87    pub cancel_request: Option<UserId>,
 88    pub unshare: bool,
 89}
 90
 91#[derive(Copy, Clone)]
 92pub struct Metrics {
 93    pub connections: usize,
 94    pub registered_projects: usize,
 95    pub active_projects: usize,
 96    pub shared_projects: usize,
 97}
 98
 99impl Store {
100    pub fn metrics(&self) -> Metrics {
101        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
102        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
103
104        let connections = self.connections.values().filter(|c| !c.admin).count();
105        let mut registered_projects = 0;
106        let mut active_projects = 0;
107        let mut shared_projects = 0;
108        for project in self.projects.values() {
109            if let Some(connection) = self.connections.get(&project.host_connection_id) {
110                if !connection.admin {
111                    registered_projects += 1;
112                    if project.is_active_since(active_window_start) {
113                        active_projects += 1;
114                        if !project.guests.is_empty() {
115                            shared_projects += 1;
116                        }
117                    }
118                }
119            }
120        }
121
122        Metrics {
123            connections,
124            registered_projects,
125            active_projects,
126            shared_projects,
127        }
128    }
129
130    #[instrument(skip(self))]
131    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
132        self.connections.insert(
133            connection_id,
134            ConnectionState {
135                user_id,
136                admin,
137                projects: Default::default(),
138                requested_projects: Default::default(),
139                channels: Default::default(),
140            },
141        );
142        self.connections_by_user_id
143            .entry(user_id)
144            .or_default()
145            .insert(connection_id);
146    }
147
148    #[instrument(skip(self))]
149    pub fn remove_connection(
150        &mut self,
151        connection_id: ConnectionId,
152    ) -> Result<RemovedConnectionState> {
153        let connection = self
154            .connections
155            .get_mut(&connection_id)
156            .ok_or_else(|| anyhow!("no such connection"))?;
157
158        let user_id = connection.user_id;
159        let connection_projects = mem::take(&mut connection.projects);
160        let connection_channels = mem::take(&mut connection.channels);
161
162        let mut result = RemovedConnectionState::default();
163        result.user_id = user_id;
164
165        // Leave all channels.
166        for channel_id in connection_channels {
167            self.leave_channel(connection_id, channel_id);
168        }
169
170        // Unregister and leave all projects.
171        for project_id in connection_projects {
172            if let Ok(project) = self.unregister_project(project_id, connection_id) {
173                result.hosted_projects.insert(project_id, project);
174            } else if self.leave_project(connection_id, project_id).is_ok() {
175                result.guest_project_ids.insert(project_id);
176            }
177        }
178
179        let user_connections = self.connections_by_user_id.get_mut(&user_id).unwrap();
180        user_connections.remove(&connection_id);
181        if user_connections.is_empty() {
182            self.connections_by_user_id.remove(&user_id);
183        }
184
185        self.connections.remove(&connection_id).unwrap();
186
187        Ok(result)
188    }
189
190    #[cfg(test)]
191    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
192        self.channels.get(&id)
193    }
194
195    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
196        if let Some(connection) = self.connections.get_mut(&connection_id) {
197            connection.channels.insert(channel_id);
198            self.channels
199                .entry(channel_id)
200                .or_default()
201                .connection_ids
202                .insert(connection_id);
203        }
204    }
205
206    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
207        if let Some(connection) = self.connections.get_mut(&connection_id) {
208            connection.channels.remove(&channel_id);
209            if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
210                entry.get_mut().connection_ids.remove(&connection_id);
211                if entry.get_mut().connection_ids.is_empty() {
212                    entry.remove();
213                }
214            }
215        }
216    }
217
218    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
219        Ok(self
220            .connections
221            .get(&connection_id)
222            .ok_or_else(|| anyhow!("unknown connection"))?
223            .user_id)
224    }
225
226    pub fn connection_ids_for_user<'a>(
227        &'a self,
228        user_id: UserId,
229    ) -> impl 'a + Iterator<Item = ConnectionId> {
230        self.connections_by_user_id
231            .get(&user_id)
232            .into_iter()
233            .flatten()
234            .copied()
235    }
236
237    pub fn is_user_online(&self, user_id: UserId) -> bool {
238        !self
239            .connections_by_user_id
240            .get(&user_id)
241            .unwrap_or(&Default::default())
242            .is_empty()
243    }
244
245    pub fn build_initial_contacts_update(
246        &self,
247        contacts: Vec<db::Contact>,
248    ) -> proto::UpdateContacts {
249        let mut update = proto::UpdateContacts::default();
250
251        for contact in contacts {
252            match contact {
253                db::Contact::Accepted {
254                    user_id,
255                    should_notify,
256                } => {
257                    update
258                        .contacts
259                        .push(self.contact_for_user(user_id, should_notify));
260                }
261                db::Contact::Outgoing { user_id } => {
262                    update.outgoing_requests.push(user_id.to_proto())
263                }
264                db::Contact::Incoming {
265                    user_id,
266                    should_notify,
267                } => update
268                    .incoming_requests
269                    .push(proto::IncomingContactRequest {
270                        requester_id: user_id.to_proto(),
271                        should_notify,
272                    }),
273            }
274        }
275
276        update
277    }
278
279    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
280        proto::Contact {
281            user_id: user_id.to_proto(),
282            projects: self.project_metadata_for_user(user_id),
283            online: self.is_user_online(user_id),
284            should_notify,
285        }
286    }
287
288    pub fn project_metadata_for_user(&self, user_id: UserId) -> Vec<proto::ProjectMetadata> {
289        let connection_ids = self.connections_by_user_id.get(&user_id);
290        let project_ids = connection_ids.iter().flat_map(|connection_ids| {
291            connection_ids
292                .iter()
293                .filter_map(|connection_id| self.connections.get(connection_id))
294                .flat_map(|connection| connection.projects.iter().copied())
295        });
296
297        let mut metadata = Vec::new();
298        for project_id in project_ids {
299            if let Some(project) = self.projects.get(&project_id) {
300                if project.host.user_id == user_id {
301                    metadata.push(proto::ProjectMetadata {
302                        id: project_id.to_proto(),
303                        visible_worktree_root_names: project
304                            .worktrees
305                            .values()
306                            .filter(|worktree| worktree.visible)
307                            .map(|worktree| worktree.root_name.clone())
308                            .collect(),
309                        guests: project
310                            .guests
311                            .values()
312                            .map(|guest| guest.user_id.to_proto())
313                            .collect(),
314                    });
315                }
316            }
317        }
318
319        metadata
320    }
321
322    pub fn register_project(
323        &mut self,
324        host_connection_id: ConnectionId,
325        project_id: ProjectId,
326    ) -> Result<()> {
327        let connection = self
328            .connections
329            .get_mut(&host_connection_id)
330            .ok_or_else(|| anyhow!("no such connection"))?;
331        connection.projects.insert(project_id);
332        self.projects.insert(
333            project_id,
334            Project {
335                host_connection_id,
336                host: Collaborator {
337                    user_id: connection.user_id,
338                    replica_id: 0,
339                    last_activity: None,
340                    admin: connection.admin,
341                },
342                guests: Default::default(),
343                join_requests: Default::default(),
344                active_replica_ids: Default::default(),
345                worktrees: Default::default(),
346                language_servers: Default::default(),
347            },
348        );
349        Ok(())
350    }
351
352    pub fn update_project(
353        &mut self,
354        project_id: ProjectId,
355        worktrees: &[proto::WorktreeMetadata],
356        connection_id: ConnectionId,
357    ) -> Result<()> {
358        let project = self
359            .projects
360            .get_mut(&project_id)
361            .ok_or_else(|| anyhow!("no such project"))?;
362        if project.host_connection_id == connection_id {
363            let mut old_worktrees = mem::take(&mut project.worktrees);
364            for worktree in worktrees {
365                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
366                    project.worktrees.insert(worktree.id, old_worktree);
367                } else {
368                    project.worktrees.insert(
369                        worktree.id,
370                        Worktree {
371                            root_name: worktree.root_name.clone(),
372                            visible: worktree.visible,
373                            ..Default::default()
374                        },
375                    );
376                }
377            }
378            Ok(())
379        } else {
380            Err(anyhow!("no such project"))?
381        }
382    }
383
384    pub fn unregister_project(
385        &mut self,
386        project_id: ProjectId,
387        connection_id: ConnectionId,
388    ) -> Result<Project> {
389        match self.projects.entry(project_id) {
390            btree_map::Entry::Occupied(e) => {
391                if e.get().host_connection_id == connection_id {
392                    let project = e.remove();
393
394                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
395                        host_connection.projects.remove(&project_id);
396                    }
397
398                    for guest_connection in project.guests.keys() {
399                        if let Some(connection) = self.connections.get_mut(&guest_connection) {
400                            connection.projects.remove(&project_id);
401                        }
402                    }
403
404                    for requester_user_id in project.join_requests.keys() {
405                        if let Some(requester_connection_ids) =
406                            self.connections_by_user_id.get_mut(&requester_user_id)
407                        {
408                            for requester_connection_id in requester_connection_ids.iter() {
409                                if let Some(requester_connection) =
410                                    self.connections.get_mut(requester_connection_id)
411                                {
412                                    requester_connection.requested_projects.remove(&project_id);
413                                }
414                            }
415                        }
416                    }
417
418                    Ok(project)
419                } else {
420                    Err(anyhow!("no such project"))?
421                }
422            }
423            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
424        }
425    }
426
427    pub fn update_diagnostic_summary(
428        &mut self,
429        project_id: ProjectId,
430        worktree_id: u64,
431        connection_id: ConnectionId,
432        summary: proto::DiagnosticSummary,
433    ) -> Result<Vec<ConnectionId>> {
434        let project = self
435            .projects
436            .get_mut(&project_id)
437            .ok_or_else(|| anyhow!("no such project"))?;
438        if project.host_connection_id == connection_id {
439            let worktree = project
440                .worktrees
441                .get_mut(&worktree_id)
442                .ok_or_else(|| anyhow!("no such worktree"))?;
443            worktree
444                .diagnostic_summaries
445                .insert(summary.path.clone().into(), summary);
446            return Ok(project.connection_ids());
447        }
448
449        Err(anyhow!("no such worktree"))?
450    }
451
452    pub fn start_language_server(
453        &mut self,
454        project_id: ProjectId,
455        connection_id: ConnectionId,
456        language_server: proto::LanguageServer,
457    ) -> Result<Vec<ConnectionId>> {
458        let project = self
459            .projects
460            .get_mut(&project_id)
461            .ok_or_else(|| anyhow!("no such project"))?;
462        if project.host_connection_id == connection_id {
463            project.language_servers.push(language_server);
464            return Ok(project.connection_ids());
465        }
466
467        Err(anyhow!("no such project"))?
468    }
469
470    pub fn request_join_project(
471        &mut self,
472        requester_id: UserId,
473        project_id: ProjectId,
474        receipt: Receipt<proto::JoinProject>,
475    ) -> Result<()> {
476        let connection = self
477            .connections
478            .get_mut(&receipt.sender_id)
479            .ok_or_else(|| anyhow!("no such connection"))?;
480        let project = self
481            .projects
482            .get_mut(&project_id)
483            .ok_or_else(|| anyhow!("no such project"))?;
484        connection.requested_projects.insert(project_id);
485        project
486            .join_requests
487            .entry(requester_id)
488            .or_default()
489            .push(receipt);
490        Ok(())
491    }
492
493    pub fn deny_join_project_request(
494        &mut self,
495        responder_connection_id: ConnectionId,
496        requester_id: UserId,
497        project_id: ProjectId,
498    ) -> Option<Vec<Receipt<proto::JoinProject>>> {
499        let project = self.projects.get_mut(&project_id)?;
500        if responder_connection_id != project.host_connection_id {
501            return None;
502        }
503
504        let receipts = project.join_requests.remove(&requester_id)?;
505        for receipt in &receipts {
506            let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
507            requester_connection.requested_projects.remove(&project_id);
508        }
509        project.host.last_activity = Some(OffsetDateTime::now_utc());
510
511        Some(receipts)
512    }
513
514    pub fn accept_join_project_request(
515        &mut self,
516        responder_connection_id: ConnectionId,
517        requester_id: UserId,
518        project_id: ProjectId,
519    ) -> Option<(Vec<(Receipt<proto::JoinProject>, ReplicaId)>, &Project)> {
520        let project = self.projects.get_mut(&project_id)?;
521        if responder_connection_id != project.host_connection_id {
522            return None;
523        }
524
525        let receipts = project.join_requests.remove(&requester_id)?;
526        let mut receipts_with_replica_ids = Vec::new();
527        for receipt in receipts {
528            let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
529            requester_connection.requested_projects.remove(&project_id);
530            requester_connection.projects.insert(project_id);
531            let mut replica_id = 1;
532            while project.active_replica_ids.contains(&replica_id) {
533                replica_id += 1;
534            }
535            project.active_replica_ids.insert(replica_id);
536            project.guests.insert(
537                receipt.sender_id,
538                Collaborator {
539                    replica_id,
540                    user_id: requester_id,
541                    last_activity: Some(OffsetDateTime::now_utc()),
542                    admin: requester_connection.admin,
543                },
544            );
545            receipts_with_replica_ids.push((receipt, replica_id));
546        }
547
548        project.host.last_activity = Some(OffsetDateTime::now_utc());
549        Some((receipts_with_replica_ids, project))
550    }
551
552    pub fn leave_project(
553        &mut self,
554        connection_id: ConnectionId,
555        project_id: ProjectId,
556    ) -> Result<LeftProject> {
557        let user_id = self.user_id_for_connection(connection_id)?;
558        let project = self
559            .projects
560            .get_mut(&project_id)
561            .ok_or_else(|| anyhow!("no such project"))?;
562
563        // If the connection leaving the project is a collaborator, remove it.
564        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
565            project.active_replica_ids.remove(&guest.replica_id);
566            true
567        } else {
568            false
569        };
570
571        // If the connection leaving the project has a pending request, remove it.
572        // If that user has no other pending requests on other connections, indicate that the request should be cancelled.
573        let mut cancel_request = None;
574        if let Entry::Occupied(mut entry) = project.join_requests.entry(user_id) {
575            entry
576                .get_mut()
577                .retain(|receipt| receipt.sender_id != connection_id);
578            if entry.get().is_empty() {
579                entry.remove();
580                cancel_request = Some(user_id);
581            }
582        }
583
584        if let Some(connection) = self.connections.get_mut(&connection_id) {
585            connection.projects.remove(&project_id);
586        }
587
588        let connection_ids = project.connection_ids();
589        let unshare = connection_ids.len() <= 1 && project.join_requests.is_empty();
590        if unshare {
591            project.language_servers.clear();
592            for worktree in project.worktrees.values_mut() {
593                worktree.diagnostic_summaries.clear();
594                worktree.entries.clear();
595                worktree.extension_counts.clear();
596            }
597        }
598
599        Ok(LeftProject {
600            host_connection_id: project.host_connection_id,
601            host_user_id: project.host.user_id,
602            connection_ids,
603            cancel_request,
604            unshare,
605            remove_collaborator,
606        })
607    }
608
609    pub fn update_worktree(
610        &mut self,
611        connection_id: ConnectionId,
612        project_id: ProjectId,
613        worktree_id: u64,
614        worktree_root_name: &str,
615        removed_entries: &[u64],
616        updated_entries: &[proto::Entry],
617        scan_id: u64,
618    ) -> Result<(Vec<ConnectionId>, bool, HashMap<String, usize>)> {
619        let project = self.write_project(project_id, connection_id)?;
620        let connection_ids = project.connection_ids();
621        let mut worktree = project.worktrees.entry(worktree_id).or_default();
622        let metadata_changed = worktree_root_name != worktree.root_name;
623        worktree.root_name = worktree_root_name.to_string();
624
625        for entry_id in removed_entries {
626            if let Some(entry) = worktree.entries.remove(&entry_id) {
627                if !entry.is_ignored {
628                    if let Some(extension) = extension_for_entry(&entry) {
629                        if let Some(count) = worktree.extension_counts.get_mut(extension) {
630                            *count = count.saturating_sub(1);
631                        }
632                    }
633                }
634            }
635        }
636
637        for entry in updated_entries {
638            if let Some(old_entry) = worktree.entries.insert(entry.id, entry.clone()) {
639                if !old_entry.is_ignored {
640                    if let Some(extension) = extension_for_entry(&old_entry) {
641                        if let Some(count) = worktree.extension_counts.get_mut(extension) {
642                            *count = count.saturating_sub(1);
643                        }
644                    }
645                }
646            }
647
648            if !entry.is_ignored {
649                if let Some(extension) = extension_for_entry(&entry) {
650                    if let Some(count) = worktree.extension_counts.get_mut(extension) {
651                        *count += 1;
652                    } else {
653                        worktree.extension_counts.insert(extension.into(), 1);
654                    }
655                }
656            }
657        }
658
659        worktree.scan_id = scan_id;
660        Ok((
661            connection_ids,
662            metadata_changed,
663            worktree.extension_counts.clone(),
664        ))
665    }
666
667    pub fn project_connection_ids(
668        &self,
669        project_id: ProjectId,
670        acting_connection_id: ConnectionId,
671    ) -> Result<Vec<ConnectionId>> {
672        Ok(self
673            .read_project(project_id, acting_connection_id)?
674            .connection_ids())
675    }
676
677    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
678        Ok(self
679            .channels
680            .get(&channel_id)
681            .ok_or_else(|| anyhow!("no such channel"))?
682            .connection_ids())
683    }
684
685    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
686        self.projects
687            .get(&project_id)
688            .ok_or_else(|| anyhow!("no such project"))
689    }
690
691    pub fn register_project_activity(
692        &mut self,
693        project_id: ProjectId,
694        connection_id: ConnectionId,
695    ) -> Result<()> {
696        let project = self
697            .projects
698            .get_mut(&project_id)
699            .ok_or_else(|| anyhow!("no such project"))?;
700        let collaborator = if connection_id == project.host_connection_id {
701            &mut project.host
702        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
703            guest
704        } else {
705            return Err(anyhow!("no such project"))?;
706        };
707        collaborator.last_activity = Some(OffsetDateTime::now_utc());
708        Ok(())
709    }
710
711    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
712        self.projects.iter()
713    }
714
715    pub fn read_project(
716        &self,
717        project_id: ProjectId,
718        connection_id: ConnectionId,
719    ) -> Result<&Project> {
720        let project = self
721            .projects
722            .get(&project_id)
723            .ok_or_else(|| anyhow!("no such project"))?;
724        if project.host_connection_id == connection_id
725            || project.guests.contains_key(&connection_id)
726        {
727            Ok(project)
728        } else {
729            Err(anyhow!("no such project"))?
730        }
731    }
732
733    fn write_project(
734        &mut self,
735        project_id: ProjectId,
736        connection_id: ConnectionId,
737    ) -> Result<&mut Project> {
738        let project = self
739            .projects
740            .get_mut(&project_id)
741            .ok_or_else(|| anyhow!("no such project"))?;
742        if project.host_connection_id == connection_id
743            || project.guests.contains_key(&connection_id)
744        {
745            Ok(project)
746        } else {
747            Err(anyhow!("no such project"))?
748        }
749    }
750
751    #[cfg(test)]
752    pub fn check_invariants(&self) {
753        for (connection_id, connection) in &self.connections {
754            for project_id in &connection.projects {
755                let project = &self.projects.get(&project_id).unwrap();
756                if project.host_connection_id != *connection_id {
757                    assert!(project.guests.contains_key(connection_id));
758                }
759
760                for (worktree_id, worktree) in project.worktrees.iter() {
761                    let mut paths = HashMap::default();
762                    for entry in worktree.entries.values() {
763                        let prev_entry = paths.insert(&entry.path, entry);
764                        assert_eq!(
765                            prev_entry,
766                            None,
767                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
768                            worktree_id,
769                            prev_entry.unwrap(),
770                            entry
771                        );
772                    }
773                }
774            }
775            for channel_id in &connection.channels {
776                let channel = self.channels.get(channel_id).unwrap();
777                assert!(channel.connection_ids.contains(connection_id));
778            }
779            assert!(self
780                .connections_by_user_id
781                .get(&connection.user_id)
782                .unwrap()
783                .contains(connection_id));
784        }
785
786        for (user_id, connection_ids) in &self.connections_by_user_id {
787            for connection_id in connection_ids {
788                assert_eq!(
789                    self.connections.get(connection_id).unwrap().user_id,
790                    *user_id
791                );
792            }
793        }
794
795        for (project_id, project) in &self.projects {
796            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
797            assert!(host_connection.projects.contains(project_id));
798
799            for guest_connection_id in project.guests.keys() {
800                let guest_connection = self.connections.get(guest_connection_id).unwrap();
801                assert!(guest_connection.projects.contains(project_id));
802            }
803            assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
804            assert_eq!(
805                project.active_replica_ids,
806                project
807                    .guests
808                    .values()
809                    .map(|guest| guest.replica_id)
810                    .collect::<HashSet<_>>(),
811            );
812        }
813
814        for (channel_id, channel) in &self.channels {
815            for connection_id in &channel.connection_ids {
816                let connection = self.connections.get(connection_id).unwrap();
817                assert!(connection.channels.contains(channel_id));
818            }
819        }
820    }
821}
822
823impl Project {
824    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
825        self.guests
826            .values()
827            .chain([&self.host])
828            .any(|collaborator| {
829                collaborator
830                    .last_activity
831                    .map_or(false, |active_time| active_time > start_time)
832            })
833    }
834
835    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
836        self.guests.keys().copied().collect()
837    }
838
839    pub fn connection_ids(&self) -> Vec<ConnectionId> {
840        self.guests
841            .keys()
842            .copied()
843            .chain(Some(self.host_connection_id))
844            .collect()
845    }
846}
847
848impl Channel {
849    fn connection_ids(&self) -> Vec<ConnectionId> {
850        self.connection_ids.iter().copied().collect()
851    }
852}
853
854fn extension_for_entry(entry: &proto::Entry) -> Option<&str> {
855    str::from_utf8(&entry.path)
856        .ok()
857        .map(Path::new)
858        .and_then(|p| p.extension())
859        .and_then(|e| e.to_str())
860}