store.rs

  1use crate::db::{self, ChannelId, ProjectId, UserId};
  2use anyhow::{anyhow, Result};
  3use collections::{btree_map, hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet};
  4use rpc::{proto, ConnectionId, Receipt};
  5use serde::Serialize;
  6use std::{
  7    mem,
  8    path::{Path, PathBuf},
  9    str,
 10    time::Duration,
 11};
 12use time::OffsetDateTime;
 13use tracing::instrument;
 14
 15#[derive(Default, Serialize)]
 16pub struct Store {
 17    connections: BTreeMap<ConnectionId, ConnectionState>,
 18    connections_by_user_id: BTreeMap<UserId, HashSet<ConnectionId>>,
 19    projects: BTreeMap<ProjectId, Project>,
 20    #[serde(skip)]
 21    channels: BTreeMap<ChannelId, Channel>,
 22}
 23
 24#[derive(Serialize)]
 25struct ConnectionState {
 26    user_id: UserId,
 27    admin: bool,
 28    projects: BTreeSet<ProjectId>,
 29    requested_projects: HashSet<ProjectId>,
 30    channels: HashSet<ChannelId>,
 31}
 32
 33#[derive(Serialize)]
 34pub struct Project {
 35    pub host_connection_id: ConnectionId,
 36    pub host: Collaborator,
 37    pub guests: HashMap<ConnectionId, Collaborator>,
 38    #[serde(skip)]
 39    pub join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
 40    pub active_replica_ids: HashSet<ReplicaId>,
 41    pub worktrees: BTreeMap<u64, Worktree>,
 42    pub language_servers: Vec<proto::LanguageServer>,
 43}
 44
 45#[derive(Serialize)]
 46pub struct Collaborator {
 47    pub replica_id: ReplicaId,
 48    pub user_id: UserId,
 49    #[serde(skip)]
 50    pub last_activity: Option<OffsetDateTime>,
 51    pub admin: bool,
 52}
 53
 54#[derive(Default, Serialize)]
 55pub struct Worktree {
 56    pub root_name: String,
 57    pub visible: bool,
 58    #[serde(skip)]
 59    pub entries: BTreeMap<u64, proto::Entry>,
 60    #[serde(skip)]
 61    pub extension_counts: HashMap<String, usize>,
 62    #[serde(skip)]
 63    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
 64    pub scan_id: u64,
 65    pub is_complete: bool,
 66}
 67
 68#[derive(Default)]
 69pub struct Channel {
 70    pub connection_ids: HashSet<ConnectionId>,
 71}
 72
 73pub type ReplicaId = u16;
 74
 75#[derive(Default)]
 76pub struct RemovedConnectionState {
 77    pub user_id: UserId,
 78    pub hosted_projects: HashMap<ProjectId, Project>,
 79    pub guest_project_ids: HashSet<ProjectId>,
 80    pub contact_ids: HashSet<UserId>,
 81}
 82
 83pub struct LeftProject {
 84    pub host_user_id: UserId,
 85    pub host_connection_id: ConnectionId,
 86    pub connection_ids: Vec<ConnectionId>,
 87    pub remove_collaborator: bool,
 88    pub cancel_request: Option<UserId>,
 89    pub unshare: bool,
 90}
 91
 92#[derive(Copy, Clone)]
 93pub struct Metrics {
 94    pub connections: usize,
 95    pub registered_projects: usize,
 96    pub active_projects: usize,
 97    pub shared_projects: usize,
 98}
 99
100impl Store {
101    pub fn metrics(&self) -> Metrics {
102        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
103        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
104
105        let connections = self.connections.values().filter(|c| !c.admin).count();
106        let mut registered_projects = 0;
107        let mut active_projects = 0;
108        let mut shared_projects = 0;
109        for project in self.projects.values() {
110            if let Some(connection) = self.connections.get(&project.host_connection_id) {
111                if !connection.admin {
112                    registered_projects += 1;
113                    if project.is_active_since(active_window_start) {
114                        active_projects += 1;
115                        if !project.guests.is_empty() {
116                            shared_projects += 1;
117                        }
118                    }
119                }
120            }
121        }
122
123        Metrics {
124            connections,
125            registered_projects,
126            active_projects,
127            shared_projects,
128        }
129    }
130
131    #[instrument(skip(self))]
132    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
133        self.connections.insert(
134            connection_id,
135            ConnectionState {
136                user_id,
137                admin,
138                projects: Default::default(),
139                requested_projects: Default::default(),
140                channels: Default::default(),
141            },
142        );
143        self.connections_by_user_id
144            .entry(user_id)
145            .or_default()
146            .insert(connection_id);
147    }
148
149    #[instrument(skip(self))]
150    pub fn remove_connection(
151        &mut self,
152        connection_id: ConnectionId,
153    ) -> Result<RemovedConnectionState> {
154        let connection = self
155            .connections
156            .get_mut(&connection_id)
157            .ok_or_else(|| anyhow!("no such connection"))?;
158
159        let user_id = connection.user_id;
160        let connection_projects = mem::take(&mut connection.projects);
161        let connection_channels = mem::take(&mut connection.channels);
162
163        let mut result = RemovedConnectionState::default();
164        result.user_id = user_id;
165
166        // Leave all channels.
167        for channel_id in connection_channels {
168            self.leave_channel(connection_id, channel_id);
169        }
170
171        // Unregister and leave all projects.
172        for project_id in connection_projects {
173            if let Ok(project) = self.unregister_project(project_id, connection_id) {
174                result.hosted_projects.insert(project_id, project);
175            } else if self.leave_project(connection_id, project_id).is_ok() {
176                result.guest_project_ids.insert(project_id);
177            }
178        }
179
180        let user_connections = self.connections_by_user_id.get_mut(&user_id).unwrap();
181        user_connections.remove(&connection_id);
182        if user_connections.is_empty() {
183            self.connections_by_user_id.remove(&user_id);
184        }
185
186        self.connections.remove(&connection_id).unwrap();
187
188        Ok(result)
189    }
190
191    #[cfg(test)]
192    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
193        self.channels.get(&id)
194    }
195
196    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
197        if let Some(connection) = self.connections.get_mut(&connection_id) {
198            connection.channels.insert(channel_id);
199            self.channels
200                .entry(channel_id)
201                .or_default()
202                .connection_ids
203                .insert(connection_id);
204        }
205    }
206
207    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
208        if let Some(connection) = self.connections.get_mut(&connection_id) {
209            connection.channels.remove(&channel_id);
210            if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
211                entry.get_mut().connection_ids.remove(&connection_id);
212                if entry.get_mut().connection_ids.is_empty() {
213                    entry.remove();
214                }
215            }
216        }
217    }
218
219    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
220        Ok(self
221            .connections
222            .get(&connection_id)
223            .ok_or_else(|| anyhow!("unknown connection"))?
224            .user_id)
225    }
226
227    pub fn connection_ids_for_user<'a>(
228        &'a self,
229        user_id: UserId,
230    ) -> impl 'a + Iterator<Item = ConnectionId> {
231        self.connections_by_user_id
232            .get(&user_id)
233            .into_iter()
234            .flatten()
235            .copied()
236    }
237
238    pub fn is_user_online(&self, user_id: UserId) -> bool {
239        !self
240            .connections_by_user_id
241            .get(&user_id)
242            .unwrap_or(&Default::default())
243            .is_empty()
244    }
245
246    pub fn build_initial_contacts_update(
247        &self,
248        contacts: Vec<db::Contact>,
249    ) -> proto::UpdateContacts {
250        let mut update = proto::UpdateContacts::default();
251
252        for contact in contacts {
253            match contact {
254                db::Contact::Accepted {
255                    user_id,
256                    should_notify,
257                } => {
258                    update
259                        .contacts
260                        .push(self.contact_for_user(user_id, should_notify));
261                }
262                db::Contact::Outgoing { user_id } => {
263                    update.outgoing_requests.push(user_id.to_proto())
264                }
265                db::Contact::Incoming {
266                    user_id,
267                    should_notify,
268                } => update
269                    .incoming_requests
270                    .push(proto::IncomingContactRequest {
271                        requester_id: user_id.to_proto(),
272                        should_notify,
273                    }),
274            }
275        }
276
277        update
278    }
279
280    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
281        proto::Contact {
282            user_id: user_id.to_proto(),
283            projects: self.project_metadata_for_user(user_id),
284            online: self.is_user_online(user_id),
285            should_notify,
286        }
287    }
288
289    pub fn project_metadata_for_user(&self, user_id: UserId) -> Vec<proto::ProjectMetadata> {
290        let connection_ids = self.connections_by_user_id.get(&user_id);
291        let project_ids = connection_ids.iter().flat_map(|connection_ids| {
292            connection_ids
293                .iter()
294                .filter_map(|connection_id| self.connections.get(connection_id))
295                .flat_map(|connection| connection.projects.iter().copied())
296        });
297
298        let mut metadata = Vec::new();
299        for project_id in project_ids {
300            if let Some(project) = self.projects.get(&project_id) {
301                if project.host.user_id == user_id {
302                    metadata.push(proto::ProjectMetadata {
303                        id: project_id.to_proto(),
304                        visible_worktree_root_names: project
305                            .worktrees
306                            .values()
307                            .filter(|worktree| worktree.visible)
308                            .map(|worktree| worktree.root_name.clone())
309                            .collect(),
310                        guests: project
311                            .guests
312                            .values()
313                            .map(|guest| guest.user_id.to_proto())
314                            .collect(),
315                    });
316                }
317            }
318        }
319
320        metadata
321    }
322
323    pub fn register_project(
324        &mut self,
325        host_connection_id: ConnectionId,
326        project_id: ProjectId,
327    ) -> Result<()> {
328        let connection = self
329            .connections
330            .get_mut(&host_connection_id)
331            .ok_or_else(|| anyhow!("no such connection"))?;
332        connection.projects.insert(project_id);
333        self.projects.insert(
334            project_id,
335            Project {
336                host_connection_id,
337                host: Collaborator {
338                    user_id: connection.user_id,
339                    replica_id: 0,
340                    last_activity: None,
341                    admin: connection.admin,
342                },
343                guests: Default::default(),
344                join_requests: Default::default(),
345                active_replica_ids: Default::default(),
346                worktrees: Default::default(),
347                language_servers: Default::default(),
348            },
349        );
350        Ok(())
351    }
352
353    pub fn update_project(
354        &mut self,
355        project_id: ProjectId,
356        worktrees: &[proto::WorktreeMetadata],
357        connection_id: ConnectionId,
358    ) -> Result<()> {
359        let project = self
360            .projects
361            .get_mut(&project_id)
362            .ok_or_else(|| anyhow!("no such project"))?;
363        if project.host_connection_id == connection_id {
364            let mut old_worktrees = mem::take(&mut project.worktrees);
365            for worktree in worktrees {
366                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
367                    project.worktrees.insert(worktree.id, old_worktree);
368                } else {
369                    project.worktrees.insert(
370                        worktree.id,
371                        Worktree {
372                            root_name: worktree.root_name.clone(),
373                            visible: worktree.visible,
374                            ..Default::default()
375                        },
376                    );
377                }
378            }
379            Ok(())
380        } else {
381            Err(anyhow!("no such project"))?
382        }
383    }
384
385    pub fn unregister_project(
386        &mut self,
387        project_id: ProjectId,
388        connection_id: ConnectionId,
389    ) -> Result<Project> {
390        match self.projects.entry(project_id) {
391            btree_map::Entry::Occupied(e) => {
392                if e.get().host_connection_id == connection_id {
393                    let project = e.remove();
394
395                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
396                        host_connection.projects.remove(&project_id);
397                    }
398
399                    for guest_connection in project.guests.keys() {
400                        if let Some(connection) = self.connections.get_mut(&guest_connection) {
401                            connection.projects.remove(&project_id);
402                        }
403                    }
404
405                    for requester_user_id in project.join_requests.keys() {
406                        if let Some(requester_connection_ids) =
407                            self.connections_by_user_id.get_mut(&requester_user_id)
408                        {
409                            for requester_connection_id in requester_connection_ids.iter() {
410                                if let Some(requester_connection) =
411                                    self.connections.get_mut(requester_connection_id)
412                                {
413                                    requester_connection.requested_projects.remove(&project_id);
414                                }
415                            }
416                        }
417                    }
418
419                    Ok(project)
420                } else {
421                    Err(anyhow!("no such project"))?
422                }
423            }
424            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
425        }
426    }
427
428    pub fn update_diagnostic_summary(
429        &mut self,
430        project_id: ProjectId,
431        worktree_id: u64,
432        connection_id: ConnectionId,
433        summary: proto::DiagnosticSummary,
434    ) -> Result<Vec<ConnectionId>> {
435        let project = self
436            .projects
437            .get_mut(&project_id)
438            .ok_or_else(|| anyhow!("no such project"))?;
439        if project.host_connection_id == connection_id {
440            let worktree = project
441                .worktrees
442                .get_mut(&worktree_id)
443                .ok_or_else(|| anyhow!("no such worktree"))?;
444            worktree
445                .diagnostic_summaries
446                .insert(summary.path.clone().into(), summary);
447            return Ok(project.connection_ids());
448        }
449
450        Err(anyhow!("no such worktree"))?
451    }
452
453    pub fn start_language_server(
454        &mut self,
455        project_id: ProjectId,
456        connection_id: ConnectionId,
457        language_server: proto::LanguageServer,
458    ) -> Result<Vec<ConnectionId>> {
459        let project = self
460            .projects
461            .get_mut(&project_id)
462            .ok_or_else(|| anyhow!("no such project"))?;
463        if project.host_connection_id == connection_id {
464            project.language_servers.push(language_server);
465            return Ok(project.connection_ids());
466        }
467
468        Err(anyhow!("no such project"))?
469    }
470
471    pub fn request_join_project(
472        &mut self,
473        requester_id: UserId,
474        project_id: ProjectId,
475        receipt: Receipt<proto::JoinProject>,
476    ) -> Result<()> {
477        let connection = self
478            .connections
479            .get_mut(&receipt.sender_id)
480            .ok_or_else(|| anyhow!("no such connection"))?;
481        let project = self
482            .projects
483            .get_mut(&project_id)
484            .ok_or_else(|| anyhow!("no such project"))?;
485        connection.requested_projects.insert(project_id);
486        project
487            .join_requests
488            .entry(requester_id)
489            .or_default()
490            .push(receipt);
491        Ok(())
492    }
493
494    pub fn deny_join_project_request(
495        &mut self,
496        responder_connection_id: ConnectionId,
497        requester_id: UserId,
498        project_id: ProjectId,
499    ) -> Option<Vec<Receipt<proto::JoinProject>>> {
500        let project = self.projects.get_mut(&project_id)?;
501        if responder_connection_id != project.host_connection_id {
502            return None;
503        }
504
505        let receipts = project.join_requests.remove(&requester_id)?;
506        for receipt in &receipts {
507            let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
508            requester_connection.requested_projects.remove(&project_id);
509        }
510        project.host.last_activity = Some(OffsetDateTime::now_utc());
511
512        Some(receipts)
513    }
514
515    pub fn accept_join_project_request(
516        &mut self,
517        responder_connection_id: ConnectionId,
518        requester_id: UserId,
519        project_id: ProjectId,
520    ) -> Option<(Vec<(Receipt<proto::JoinProject>, ReplicaId)>, &Project)> {
521        let project = self.projects.get_mut(&project_id)?;
522        if responder_connection_id != project.host_connection_id {
523            return None;
524        }
525
526        let receipts = project.join_requests.remove(&requester_id)?;
527        let mut receipts_with_replica_ids = Vec::new();
528        for receipt in receipts {
529            let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
530            requester_connection.requested_projects.remove(&project_id);
531            requester_connection.projects.insert(project_id);
532            let mut replica_id = 1;
533            while project.active_replica_ids.contains(&replica_id) {
534                replica_id += 1;
535            }
536            project.active_replica_ids.insert(replica_id);
537            project.guests.insert(
538                receipt.sender_id,
539                Collaborator {
540                    replica_id,
541                    user_id: requester_id,
542                    last_activity: Some(OffsetDateTime::now_utc()),
543                    admin: requester_connection.admin,
544                },
545            );
546            receipts_with_replica_ids.push((receipt, replica_id));
547        }
548
549        project.host.last_activity = Some(OffsetDateTime::now_utc());
550        Some((receipts_with_replica_ids, project))
551    }
552
553    pub fn leave_project(
554        &mut self,
555        connection_id: ConnectionId,
556        project_id: ProjectId,
557    ) -> Result<LeftProject> {
558        let user_id = self.user_id_for_connection(connection_id)?;
559        let project = self
560            .projects
561            .get_mut(&project_id)
562            .ok_or_else(|| anyhow!("no such project"))?;
563
564        // If the connection leaving the project is a collaborator, remove it.
565        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
566            project.active_replica_ids.remove(&guest.replica_id);
567            true
568        } else {
569            false
570        };
571
572        // If the connection leaving the project has a pending request, remove it.
573        // If that user has no other pending requests on other connections, indicate that the request should be cancelled.
574        let mut cancel_request = None;
575        if let Entry::Occupied(mut entry) = project.join_requests.entry(user_id) {
576            entry
577                .get_mut()
578                .retain(|receipt| receipt.sender_id != connection_id);
579            if entry.get().is_empty() {
580                entry.remove();
581                cancel_request = Some(user_id);
582            }
583        }
584
585        if let Some(connection) = self.connections.get_mut(&connection_id) {
586            connection.projects.remove(&project_id);
587        }
588
589        let connection_ids = project.connection_ids();
590        let unshare = connection_ids.len() <= 1 && project.join_requests.is_empty();
591        if unshare {
592            project.language_servers.clear();
593            for worktree in project.worktrees.values_mut() {
594                worktree.diagnostic_summaries.clear();
595                worktree.entries.clear();
596                worktree.extension_counts.clear();
597            }
598        }
599
600        Ok(LeftProject {
601            host_connection_id: project.host_connection_id,
602            host_user_id: project.host.user_id,
603            connection_ids,
604            cancel_request,
605            unshare,
606            remove_collaborator,
607        })
608    }
609
610    pub fn update_worktree(
611        &mut self,
612        connection_id: ConnectionId,
613        project_id: ProjectId,
614        worktree_id: u64,
615        worktree_root_name: &str,
616        removed_entries: &[u64],
617        updated_entries: &[proto::Entry],
618        scan_id: u64,
619        is_last_update: bool,
620    ) -> Result<(Vec<ConnectionId>, bool, HashMap<String, usize>)> {
621        let project = self.write_project(project_id, connection_id)?;
622        let connection_ids = project.connection_ids();
623        let mut worktree = project.worktrees.entry(worktree_id).or_default();
624        let metadata_changed = worktree_root_name != worktree.root_name;
625        worktree.root_name = worktree_root_name.to_string();
626
627        for entry_id in removed_entries {
628            if let Some(entry) = worktree.entries.remove(&entry_id) {
629                if !entry.is_ignored {
630                    if let Some(extension) = extension_for_entry(&entry) {
631                        if let Some(count) = worktree.extension_counts.get_mut(extension) {
632                            *count = count.saturating_sub(1);
633                        }
634                    }
635                }
636            }
637        }
638
639        for entry in updated_entries {
640            if let Some(old_entry) = worktree.entries.insert(entry.id, entry.clone()) {
641                if !old_entry.is_ignored {
642                    if let Some(extension) = extension_for_entry(&old_entry) {
643                        if let Some(count) = worktree.extension_counts.get_mut(extension) {
644                            *count = count.saturating_sub(1);
645                        }
646                    }
647                }
648            }
649
650            if !entry.is_ignored {
651                if let Some(extension) = extension_for_entry(&entry) {
652                    if let Some(count) = worktree.extension_counts.get_mut(extension) {
653                        *count += 1;
654                    } else {
655                        worktree.extension_counts.insert(extension.into(), 1);
656                    }
657                }
658            }
659        }
660
661        worktree.scan_id = scan_id;
662        worktree.is_complete = is_last_update;
663        Ok((
664            connection_ids,
665            metadata_changed,
666            worktree.extension_counts.clone(),
667        ))
668    }
669
670    pub fn project_connection_ids(
671        &self,
672        project_id: ProjectId,
673        acting_connection_id: ConnectionId,
674    ) -> Result<Vec<ConnectionId>> {
675        Ok(self
676            .read_project(project_id, acting_connection_id)?
677            .connection_ids())
678    }
679
680    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
681        Ok(self
682            .channels
683            .get(&channel_id)
684            .ok_or_else(|| anyhow!("no such channel"))?
685            .connection_ids())
686    }
687
688    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
689        self.projects
690            .get(&project_id)
691            .ok_or_else(|| anyhow!("no such project"))
692    }
693
694    pub fn register_project_activity(
695        &mut self,
696        project_id: ProjectId,
697        connection_id: ConnectionId,
698    ) -> Result<()> {
699        let project = self
700            .projects
701            .get_mut(&project_id)
702            .ok_or_else(|| anyhow!("no such project"))?;
703        let collaborator = if connection_id == project.host_connection_id {
704            &mut project.host
705        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
706            guest
707        } else {
708            return Err(anyhow!("no such project"))?;
709        };
710        collaborator.last_activity = Some(OffsetDateTime::now_utc());
711        Ok(())
712    }
713
714    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
715        self.projects.iter()
716    }
717
718    pub fn read_project(
719        &self,
720        project_id: ProjectId,
721        connection_id: ConnectionId,
722    ) -> Result<&Project> {
723        let project = self
724            .projects
725            .get(&project_id)
726            .ok_or_else(|| anyhow!("no such project"))?;
727        if project.host_connection_id == connection_id
728            || project.guests.contains_key(&connection_id)
729        {
730            Ok(project)
731        } else {
732            Err(anyhow!("no such project"))?
733        }
734    }
735
736    fn write_project(
737        &mut self,
738        project_id: ProjectId,
739        connection_id: ConnectionId,
740    ) -> Result<&mut Project> {
741        let project = self
742            .projects
743            .get_mut(&project_id)
744            .ok_or_else(|| anyhow!("no such project"))?;
745        if project.host_connection_id == connection_id
746            || project.guests.contains_key(&connection_id)
747        {
748            Ok(project)
749        } else {
750            Err(anyhow!("no such project"))?
751        }
752    }
753
754    #[cfg(test)]
755    pub fn check_invariants(&self) {
756        for (connection_id, connection) in &self.connections {
757            for project_id in &connection.projects {
758                let project = &self.projects.get(&project_id).unwrap();
759                if project.host_connection_id != *connection_id {
760                    assert!(project.guests.contains_key(connection_id));
761                }
762
763                for (worktree_id, worktree) in project.worktrees.iter() {
764                    let mut paths = HashMap::default();
765                    for entry in worktree.entries.values() {
766                        let prev_entry = paths.insert(&entry.path, entry);
767                        assert_eq!(
768                            prev_entry,
769                            None,
770                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
771                            worktree_id,
772                            prev_entry.unwrap(),
773                            entry
774                        );
775                    }
776                }
777            }
778            for channel_id in &connection.channels {
779                let channel = self.channels.get(channel_id).unwrap();
780                assert!(channel.connection_ids.contains(connection_id));
781            }
782            assert!(self
783                .connections_by_user_id
784                .get(&connection.user_id)
785                .unwrap()
786                .contains(connection_id));
787        }
788
789        for (user_id, connection_ids) in &self.connections_by_user_id {
790            for connection_id in connection_ids {
791                assert_eq!(
792                    self.connections.get(connection_id).unwrap().user_id,
793                    *user_id
794                );
795            }
796        }
797
798        for (project_id, project) in &self.projects {
799            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
800            assert!(host_connection.projects.contains(project_id));
801
802            for guest_connection_id in project.guests.keys() {
803                let guest_connection = self.connections.get(guest_connection_id).unwrap();
804                assert!(guest_connection.projects.contains(project_id));
805            }
806            assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
807            assert_eq!(
808                project.active_replica_ids,
809                project
810                    .guests
811                    .values()
812                    .map(|guest| guest.replica_id)
813                    .collect::<HashSet<_>>(),
814            );
815        }
816
817        for (channel_id, channel) in &self.channels {
818            for connection_id in &channel.connection_ids {
819                let connection = self.connections.get(connection_id).unwrap();
820                assert!(connection.channels.contains(channel_id));
821            }
822        }
823    }
824}
825
826impl Project {
827    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
828        self.guests
829            .values()
830            .chain([&self.host])
831            .any(|collaborator| {
832                collaborator
833                    .last_activity
834                    .map_or(false, |active_time| active_time > start_time)
835            })
836    }
837
838    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
839        self.guests.keys().copied().collect()
840    }
841
842    pub fn connection_ids(&self) -> Vec<ConnectionId> {
843        self.guests
844            .keys()
845            .copied()
846            .chain(Some(self.host_connection_id))
847            .collect()
848    }
849}
850
851impl Channel {
852    fn connection_ids(&self) -> Vec<ConnectionId> {
853        self.connection_ids.iter().copied().collect()
854    }
855}
856
857fn extension_for_entry(entry: &proto::Entry) -> Option<&str> {
858    str::from_utf8(&entry.path)
859        .ok()
860        .map(Path::new)
861        .and_then(|p| p.extension())
862        .and_then(|e| e.to_str())
863}