store.rs

  1use crate::db::{self, ChannelId, ProjectId, UserId};
  2use anyhow::{anyhow, Result};
  3use collections::{
  4    btree_map,
  5    hash_map::{self, Entry},
  6    BTreeMap, BTreeSet, HashMap, HashSet,
  7};
  8use rpc::{proto, ConnectionId, Receipt};
  9use serde::Serialize;
 10use std::{
 11    mem,
 12    path::{Path, PathBuf},
 13    str,
 14    time::Duration,
 15};
 16use time::OffsetDateTime;
 17use tracing::instrument;
 18
 19#[derive(Default, Serialize)]
 20pub struct Store {
 21    connections: HashMap<ConnectionId, ConnectionState>,
 22    connections_by_user_id: HashMap<UserId, HashSet<ConnectionId>>,
 23    projects: BTreeMap<ProjectId, Project>,
 24    #[serde(skip)]
 25    channels: HashMap<ChannelId, Channel>,
 26}
 27
 28#[derive(Serialize)]
 29struct ConnectionState {
 30    user_id: UserId,
 31    admin: bool,
 32    projects: BTreeSet<ProjectId>,
 33    requested_projects: HashSet<ProjectId>,
 34    channels: HashSet<ChannelId>,
 35}
 36
 37#[derive(Serialize)]
 38pub struct Project {
 39    pub host_connection_id: ConnectionId,
 40    pub host: Collaborator,
 41    pub guests: HashMap<ConnectionId, Collaborator>,
 42    #[serde(skip)]
 43    pub join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
 44    pub active_replica_ids: HashSet<ReplicaId>,
 45    pub worktrees: BTreeMap<u64, Worktree>,
 46    pub language_servers: Vec<proto::LanguageServer>,
 47}
 48
 49#[derive(Serialize)]
 50pub struct Collaborator {
 51    pub replica_id: ReplicaId,
 52    pub user_id: UserId,
 53    #[serde(skip)]
 54    pub last_activity: Option<OffsetDateTime>,
 55}
 56
 57#[derive(Default, Serialize)]
 58pub struct Worktree {
 59    pub root_name: String,
 60    pub visible: bool,
 61    #[serde(skip)]
 62    pub entries: HashMap<u64, proto::Entry>,
 63    #[serde(skip)]
 64    pub extension_counts: HashMap<String, usize>,
 65    #[serde(skip)]
 66    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
 67    pub scan_id: u64,
 68}
 69
 70#[derive(Default)]
 71pub struct Channel {
 72    pub connection_ids: HashSet<ConnectionId>,
 73}
 74
 75pub type ReplicaId = u16;
 76
 77#[derive(Default)]
 78pub struct RemovedConnectionState {
 79    pub user_id: UserId,
 80    pub hosted_projects: HashMap<ProjectId, Project>,
 81    pub guest_project_ids: HashSet<ProjectId>,
 82    pub contact_ids: HashSet<UserId>,
 83}
 84
 85pub struct LeftProject {
 86    pub host_user_id: UserId,
 87    pub host_connection_id: ConnectionId,
 88    pub connection_ids: Vec<ConnectionId>,
 89    pub remove_collaborator: bool,
 90    pub cancel_request: Option<UserId>,
 91    pub unshare: bool,
 92}
 93
 94#[derive(Copy, Clone)]
 95pub struct Metrics {
 96    pub connections: usize,
 97    pub registered_projects: usize,
 98    pub active_projects: usize,
 99    pub shared_projects: usize,
100}
101
102impl Store {
103    pub fn metrics(&self) -> Metrics {
104        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
105        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
106
107        let connections = self.connections.values().filter(|c| !c.admin).count();
108        let mut registered_projects = 0;
109        let mut active_projects = 0;
110        let mut shared_projects = 0;
111        for project in self.projects.values() {
112            if let Some(connection) = self.connections.get(&project.host_connection_id) {
113                if !connection.admin {
114                    registered_projects += 1;
115                    if project.is_active_since(active_window_start) {
116                        active_projects += 1;
117                        if !project.guests.is_empty() {
118                            shared_projects += 1;
119                        }
120                    }
121                }
122            }
123        }
124
125        Metrics {
126            connections,
127            registered_projects,
128            active_projects,
129            shared_projects,
130        }
131    }
132
133    #[instrument(skip(self))]
134    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
135        self.connections.insert(
136            connection_id,
137            ConnectionState {
138                user_id,
139                admin,
140                projects: Default::default(),
141                requested_projects: Default::default(),
142                channels: Default::default(),
143            },
144        );
145        self.connections_by_user_id
146            .entry(user_id)
147            .or_default()
148            .insert(connection_id);
149    }
150
151    #[instrument(skip(self))]
152    pub fn remove_connection(
153        &mut self,
154        connection_id: ConnectionId,
155    ) -> Result<RemovedConnectionState> {
156        let connection = self
157            .connections
158            .get_mut(&connection_id)
159            .ok_or_else(|| anyhow!("no such connection"))?;
160
161        let user_id = connection.user_id;
162        let connection_projects = mem::take(&mut connection.projects);
163        let connection_channels = mem::take(&mut connection.channels);
164
165        let mut result = RemovedConnectionState::default();
166        result.user_id = user_id;
167
168        // Leave all channels.
169        for channel_id in connection_channels {
170            self.leave_channel(connection_id, channel_id);
171        }
172
173        // Unregister and leave all projects.
174        for project_id in connection_projects {
175            if let Ok(project) = self.unregister_project(project_id, connection_id) {
176                result.hosted_projects.insert(project_id, project);
177            } else if self.leave_project(connection_id, project_id).is_ok() {
178                result.guest_project_ids.insert(project_id);
179            }
180        }
181
182        let user_connections = self.connections_by_user_id.get_mut(&user_id).unwrap();
183        user_connections.remove(&connection_id);
184        if user_connections.is_empty() {
185            self.connections_by_user_id.remove(&user_id);
186        }
187
188        self.connections.remove(&connection_id).unwrap();
189
190        Ok(result)
191    }
192
193    #[cfg(test)]
194    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
195        self.channels.get(&id)
196    }
197
198    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
199        if let Some(connection) = self.connections.get_mut(&connection_id) {
200            connection.channels.insert(channel_id);
201            self.channels
202                .entry(channel_id)
203                .or_default()
204                .connection_ids
205                .insert(connection_id);
206        }
207    }
208
209    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
210        if let Some(connection) = self.connections.get_mut(&connection_id) {
211            connection.channels.remove(&channel_id);
212            if let hash_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
213                entry.get_mut().connection_ids.remove(&connection_id);
214                if entry.get_mut().connection_ids.is_empty() {
215                    entry.remove();
216                }
217            }
218        }
219    }
220
221    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
222        Ok(self
223            .connections
224            .get(&connection_id)
225            .ok_or_else(|| anyhow!("unknown connection"))?
226            .user_id)
227    }
228
229    pub fn connection_ids_for_user<'a>(
230        &'a self,
231        user_id: UserId,
232    ) -> impl 'a + Iterator<Item = ConnectionId> {
233        self.connections_by_user_id
234            .get(&user_id)
235            .into_iter()
236            .flatten()
237            .copied()
238    }
239
240    pub fn is_user_online(&self, user_id: UserId) -> bool {
241        !self
242            .connections_by_user_id
243            .get(&user_id)
244            .unwrap_or(&Default::default())
245            .is_empty()
246    }
247
248    pub fn build_initial_contacts_update(
249        &self,
250        contacts: Vec<db::Contact>,
251    ) -> proto::UpdateContacts {
252        let mut update = proto::UpdateContacts::default();
253
254        for contact in contacts {
255            match contact {
256                db::Contact::Accepted {
257                    user_id,
258                    should_notify,
259                } => {
260                    update
261                        .contacts
262                        .push(self.contact_for_user(user_id, should_notify));
263                }
264                db::Contact::Outgoing { user_id } => {
265                    update.outgoing_requests.push(user_id.to_proto())
266                }
267                db::Contact::Incoming {
268                    user_id,
269                    should_notify,
270                } => update
271                    .incoming_requests
272                    .push(proto::IncomingContactRequest {
273                        requester_id: user_id.to_proto(),
274                        should_notify,
275                    }),
276            }
277        }
278
279        update
280    }
281
282    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
283        proto::Contact {
284            user_id: user_id.to_proto(),
285            projects: self.project_metadata_for_user(user_id),
286            online: self.is_user_online(user_id),
287            should_notify,
288        }
289    }
290
291    pub fn project_metadata_for_user(&self, user_id: UserId) -> Vec<proto::ProjectMetadata> {
292        let connection_ids = self.connections_by_user_id.get(&user_id);
293        let project_ids = connection_ids.iter().flat_map(|connection_ids| {
294            connection_ids
295                .iter()
296                .filter_map(|connection_id| self.connections.get(connection_id))
297                .flat_map(|connection| connection.projects.iter().copied())
298        });
299
300        let mut metadata = Vec::new();
301        for project_id in project_ids {
302            if let Some(project) = self.projects.get(&project_id) {
303                if project.host.user_id == user_id {
304                    metadata.push(proto::ProjectMetadata {
305                        id: project_id.to_proto(),
306                        visible_worktree_root_names: project
307                            .worktrees
308                            .values()
309                            .filter(|worktree| worktree.visible)
310                            .map(|worktree| worktree.root_name.clone())
311                            .collect(),
312                        guests: project
313                            .guests
314                            .values()
315                            .map(|guest| guest.user_id.to_proto())
316                            .collect(),
317                    });
318                }
319            }
320        }
321
322        metadata
323    }
324
325    pub fn register_project(
326        &mut self,
327        host_connection_id: ConnectionId,
328        project_id: ProjectId,
329    ) -> Result<()> {
330        let connection = self
331            .connections
332            .get_mut(&host_connection_id)
333            .ok_or_else(|| anyhow!("no such connection"))?;
334        connection.projects.insert(project_id);
335        self.projects.insert(
336            project_id,
337            Project {
338                host_connection_id,
339                host: Collaborator {
340                    user_id: connection.user_id,
341                    replica_id: 0,
342                    last_activity: None,
343                },
344                guests: Default::default(),
345                join_requests: Default::default(),
346                active_replica_ids: Default::default(),
347                worktrees: Default::default(),
348                language_servers: Default::default(),
349            },
350        );
351        Ok(())
352    }
353
354    pub fn update_project(
355        &mut self,
356        project_id: ProjectId,
357        worktrees: &[proto::WorktreeMetadata],
358        connection_id: ConnectionId,
359    ) -> Result<()> {
360        let project = self
361            .projects
362            .get_mut(&project_id)
363            .ok_or_else(|| anyhow!("no such project"))?;
364        if project.host_connection_id == connection_id {
365            let mut old_worktrees = mem::take(&mut project.worktrees);
366            for worktree in worktrees {
367                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
368                    project.worktrees.insert(worktree.id, old_worktree);
369                } else {
370                    project.worktrees.insert(
371                        worktree.id,
372                        Worktree {
373                            root_name: worktree.root_name.clone(),
374                            visible: worktree.visible,
375                            ..Default::default()
376                        },
377                    );
378                }
379            }
380            Ok(())
381        } else {
382            Err(anyhow!("no such project"))?
383        }
384    }
385
386    pub fn unregister_project(
387        &mut self,
388        project_id: ProjectId,
389        connection_id: ConnectionId,
390    ) -> Result<Project> {
391        match self.projects.entry(project_id) {
392            btree_map::Entry::Occupied(e) => {
393                if e.get().host_connection_id == connection_id {
394                    let project = e.remove();
395
396                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
397                        host_connection.projects.remove(&project_id);
398                    }
399
400                    for guest_connection in project.guests.keys() {
401                        if let Some(connection) = self.connections.get_mut(&guest_connection) {
402                            connection.projects.remove(&project_id);
403                        }
404                    }
405
406                    for requester_user_id in project.join_requests.keys() {
407                        if let Some(requester_connection_ids) =
408                            self.connections_by_user_id.get_mut(&requester_user_id)
409                        {
410                            for requester_connection_id in requester_connection_ids.iter() {
411                                if let Some(requester_connection) =
412                                    self.connections.get_mut(requester_connection_id)
413                                {
414                                    requester_connection.requested_projects.remove(&project_id);
415                                }
416                            }
417                        }
418                    }
419
420                    Ok(project)
421                } else {
422                    Err(anyhow!("no such project"))?
423                }
424            }
425            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
426        }
427    }
428
429    pub fn update_diagnostic_summary(
430        &mut self,
431        project_id: ProjectId,
432        worktree_id: u64,
433        connection_id: ConnectionId,
434        summary: proto::DiagnosticSummary,
435    ) -> Result<Vec<ConnectionId>> {
436        let project = self
437            .projects
438            .get_mut(&project_id)
439            .ok_or_else(|| anyhow!("no such project"))?;
440        if project.host_connection_id == connection_id {
441            let worktree = project
442                .worktrees
443                .get_mut(&worktree_id)
444                .ok_or_else(|| anyhow!("no such worktree"))?;
445            worktree
446                .diagnostic_summaries
447                .insert(summary.path.clone().into(), summary);
448            return Ok(project.connection_ids());
449        }
450
451        Err(anyhow!("no such worktree"))?
452    }
453
454    pub fn start_language_server(
455        &mut self,
456        project_id: ProjectId,
457        connection_id: ConnectionId,
458        language_server: proto::LanguageServer,
459    ) -> Result<Vec<ConnectionId>> {
460        let project = self
461            .projects
462            .get_mut(&project_id)
463            .ok_or_else(|| anyhow!("no such project"))?;
464        if project.host_connection_id == connection_id {
465            project.language_servers.push(language_server);
466            return Ok(project.connection_ids());
467        }
468
469        Err(anyhow!("no such project"))?
470    }
471
472    pub fn request_join_project(
473        &mut self,
474        requester_id: UserId,
475        project_id: ProjectId,
476        receipt: Receipt<proto::JoinProject>,
477    ) -> Result<()> {
478        let connection = self
479            .connections
480            .get_mut(&receipt.sender_id)
481            .ok_or_else(|| anyhow!("no such connection"))?;
482        let project = self
483            .projects
484            .get_mut(&project_id)
485            .ok_or_else(|| anyhow!("no such project"))?;
486        connection.requested_projects.insert(project_id);
487        project
488            .join_requests
489            .entry(requester_id)
490            .or_default()
491            .push(receipt);
492        Ok(())
493    }
494
495    pub fn deny_join_project_request(
496        &mut self,
497        responder_connection_id: ConnectionId,
498        requester_id: UserId,
499        project_id: ProjectId,
500    ) -> Option<Vec<Receipt<proto::JoinProject>>> {
501        let project = self.projects.get_mut(&project_id)?;
502        if responder_connection_id != project.host_connection_id {
503            return None;
504        }
505
506        let receipts = project.join_requests.remove(&requester_id)?;
507        for receipt in &receipts {
508            let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
509            requester_connection.requested_projects.remove(&project_id);
510        }
511        project.host.last_activity = Some(OffsetDateTime::now_utc());
512
513        Some(receipts)
514    }
515
516    pub fn accept_join_project_request(
517        &mut self,
518        responder_connection_id: ConnectionId,
519        requester_id: UserId,
520        project_id: ProjectId,
521    ) -> Option<(Vec<(Receipt<proto::JoinProject>, ReplicaId)>, &Project)> {
522        let project = self.projects.get_mut(&project_id)?;
523        if responder_connection_id != project.host_connection_id {
524            return None;
525        }
526
527        let receipts = project.join_requests.remove(&requester_id)?;
528        let mut receipts_with_replica_ids = Vec::new();
529        for receipt in receipts {
530            let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
531            requester_connection.requested_projects.remove(&project_id);
532            requester_connection.projects.insert(project_id);
533            let mut replica_id = 1;
534            while project.active_replica_ids.contains(&replica_id) {
535                replica_id += 1;
536            }
537            project.active_replica_ids.insert(replica_id);
538            project.guests.insert(
539                receipt.sender_id,
540                Collaborator {
541                    replica_id,
542                    user_id: requester_id,
543                    last_activity: Some(OffsetDateTime::now_utc()),
544                },
545            );
546            receipts_with_replica_ids.push((receipt, replica_id));
547        }
548
549        project.host.last_activity = Some(OffsetDateTime::now_utc());
550        Some((receipts_with_replica_ids, project))
551    }
552
553    pub fn leave_project(
554        &mut self,
555        connection_id: ConnectionId,
556        project_id: ProjectId,
557    ) -> Result<LeftProject> {
558        let user_id = self.user_id_for_connection(connection_id)?;
559        let project = self
560            .projects
561            .get_mut(&project_id)
562            .ok_or_else(|| anyhow!("no such project"))?;
563
564        // If the connection leaving the project is a collaborator, remove it.
565        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
566            project.active_replica_ids.remove(&guest.replica_id);
567            true
568        } else {
569            false
570        };
571
572        // If the connection leaving the project has a pending request, remove it.
573        // If that user has no other pending requests on other connections, indicate that the request should be cancelled.
574        let mut cancel_request = None;
575        if let Entry::Occupied(mut entry) = project.join_requests.entry(user_id) {
576            entry
577                .get_mut()
578                .retain(|receipt| receipt.sender_id != connection_id);
579            if entry.get().is_empty() {
580                entry.remove();
581                cancel_request = Some(user_id);
582            }
583        }
584
585        if let Some(connection) = self.connections.get_mut(&connection_id) {
586            connection.projects.remove(&project_id);
587        }
588
589        let connection_ids = project.connection_ids();
590        let unshare = connection_ids.len() <= 1 && project.join_requests.is_empty();
591        if unshare {
592            project.language_servers.clear();
593            for worktree in project.worktrees.values_mut() {
594                worktree.diagnostic_summaries.clear();
595                worktree.entries.clear();
596            }
597        }
598
599        Ok(LeftProject {
600            host_connection_id: project.host_connection_id,
601            host_user_id: project.host.user_id,
602            connection_ids,
603            cancel_request,
604            unshare,
605            remove_collaborator,
606        })
607    }
608
609    pub fn update_worktree(
610        &mut self,
611        connection_id: ConnectionId,
612        project_id: ProjectId,
613        worktree_id: u64,
614        worktree_root_name: &str,
615        removed_entries: &[u64],
616        updated_entries: &[proto::Entry],
617        scan_id: u64,
618    ) -> Result<(Vec<ConnectionId>, bool, HashMap<String, usize>)> {
619        let project = self.write_project(project_id, connection_id)?;
620        let connection_ids = project.connection_ids();
621        let mut worktree = project.worktrees.entry(worktree_id).or_default();
622        let metadata_changed = worktree_root_name != worktree.root_name;
623        worktree.root_name = worktree_root_name.to_string();
624
625        for entry_id in removed_entries {
626            if let Some(entry) = worktree.entries.remove(&entry_id) {
627                if !entry.is_ignored {
628                    if let Some(extension) = extension_for_entry(&entry) {
629                        if let Some(count) = worktree.extension_counts.get_mut(extension) {
630                            *count = count.saturating_sub(1);
631                        }
632                    }
633                }
634            }
635        }
636
637        for entry in updated_entries {
638            if let Some(old_entry) = worktree.entries.insert(entry.id, entry.clone()) {
639                if !old_entry.is_ignored {
640                    if let Some(extension) = extension_for_entry(&old_entry) {
641                        if let Some(count) = worktree.extension_counts.get_mut(extension) {
642                            *count = count.saturating_sub(1);
643                        }
644                    }
645                }
646            }
647
648            if !entry.is_ignored {
649                if let Some(extension) = extension_for_entry(&entry) {
650                    if let Some(count) = worktree.extension_counts.get_mut(extension) {
651                        *count += 1;
652                    } else {
653                        worktree.extension_counts.insert(extension.into(), 1);
654                    }
655                }
656            }
657        }
658
659        worktree.scan_id = scan_id;
660        Ok((
661            connection_ids,
662            metadata_changed,
663            worktree.extension_counts.clone(),
664        ))
665    }
666
667    pub fn project_connection_ids(
668        &self,
669        project_id: ProjectId,
670        acting_connection_id: ConnectionId,
671    ) -> Result<Vec<ConnectionId>> {
672        Ok(self
673            .read_project(project_id, acting_connection_id)?
674            .connection_ids())
675    }
676
677    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
678        Ok(self
679            .channels
680            .get(&channel_id)
681            .ok_or_else(|| anyhow!("no such channel"))?
682            .connection_ids())
683    }
684
685    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
686        self.projects
687            .get(&project_id)
688            .ok_or_else(|| anyhow!("no such project"))
689    }
690
691    pub fn register_project_activity(
692        &mut self,
693        project_id: ProjectId,
694        connection_id: ConnectionId,
695    ) -> Result<()> {
696        let project = self
697            .projects
698            .get_mut(&project_id)
699            .ok_or_else(|| anyhow!("no such project"))?;
700        let collaborator = if connection_id == project.host_connection_id {
701            &mut project.host
702        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
703            guest
704        } else {
705            return Err(anyhow!("no such project"))?;
706        };
707        collaborator.last_activity = Some(OffsetDateTime::now_utc());
708        Ok(())
709    }
710
711    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
712        self.projects.iter()
713    }
714
715    pub fn read_project(
716        &self,
717        project_id: ProjectId,
718        connection_id: ConnectionId,
719    ) -> Result<&Project> {
720        let project = self
721            .projects
722            .get(&project_id)
723            .ok_or_else(|| anyhow!("no such project"))?;
724        if project.host_connection_id == connection_id
725            || project.guests.contains_key(&connection_id)
726        {
727            Ok(project)
728        } else {
729            Err(anyhow!("no such project"))?
730        }
731    }
732
733    fn write_project(
734        &mut self,
735        project_id: ProjectId,
736        connection_id: ConnectionId,
737    ) -> Result<&mut Project> {
738        let project = self
739            .projects
740            .get_mut(&project_id)
741            .ok_or_else(|| anyhow!("no such project"))?;
742        if project.host_connection_id == connection_id
743            || project.guests.contains_key(&connection_id)
744        {
745            Ok(project)
746        } else {
747            Err(anyhow!("no such project"))?
748        }
749    }
750
751    #[cfg(test)]
752    pub fn check_invariants(&self) {
753        for (connection_id, connection) in &self.connections {
754            for project_id in &connection.projects {
755                let project = &self.projects.get(&project_id).unwrap();
756                if project.host_connection_id != *connection_id {
757                    assert!(project.guests.contains_key(connection_id));
758                }
759
760                for (worktree_id, worktree) in project.worktrees.iter() {
761                    let mut paths = HashMap::default();
762                    for entry in worktree.entries.values() {
763                        let prev_entry = paths.insert(&entry.path, entry);
764                        assert_eq!(
765                            prev_entry,
766                            None,
767                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
768                            worktree_id,
769                            prev_entry.unwrap(),
770                            entry
771                        );
772                    }
773                }
774            }
775            for channel_id in &connection.channels {
776                let channel = self.channels.get(channel_id).unwrap();
777                assert!(channel.connection_ids.contains(connection_id));
778            }
779            assert!(self
780                .connections_by_user_id
781                .get(&connection.user_id)
782                .unwrap()
783                .contains(connection_id));
784        }
785
786        for (user_id, connection_ids) in &self.connections_by_user_id {
787            for connection_id in connection_ids {
788                assert_eq!(
789                    self.connections.get(connection_id).unwrap().user_id,
790                    *user_id
791                );
792            }
793        }
794
795        for (project_id, project) in &self.projects {
796            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
797            assert!(host_connection.projects.contains(project_id));
798
799            for guest_connection_id in project.guests.keys() {
800                let guest_connection = self.connections.get(guest_connection_id).unwrap();
801                assert!(guest_connection.projects.contains(project_id));
802            }
803            assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
804            assert_eq!(
805                project.active_replica_ids,
806                project
807                    .guests
808                    .values()
809                    .map(|guest| guest.replica_id)
810                    .collect::<HashSet<_>>(),
811            );
812        }
813
814        for (channel_id, channel) in &self.channels {
815            for connection_id in &channel.connection_ids {
816                let connection = self.connections.get(connection_id).unwrap();
817                assert!(connection.channels.contains(channel_id));
818            }
819        }
820    }
821}
822
823impl Project {
824    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
825        self.guests
826            .values()
827            .chain([&self.host])
828            .any(|collaborator| {
829                collaborator
830                    .last_activity
831                    .map_or(false, |active_time| active_time > start_time)
832            })
833    }
834
835    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
836        self.guests.keys().copied().collect()
837    }
838
839    pub fn connection_ids(&self) -> Vec<ConnectionId> {
840        self.guests
841            .keys()
842            .copied()
843            .chain(Some(self.host_connection_id))
844            .collect()
845    }
846}
847
848impl Channel {
849    fn connection_ids(&self) -> Vec<ConnectionId> {
850        self.connection_ids.iter().copied().collect()
851    }
852}
853
854fn extension_for_entry(entry: &proto::Entry) -> Option<&str> {
855    str::from_utf8(&entry.path)
856        .ok()
857        .map(Path::new)
858        .and_then(|p| p.extension())
859        .and_then(|e| e.to_str())
860}