store.rs

  1use crate::db::{self, ChannelId, ProjectId, UserId};
  2use anyhow::{anyhow, Result};
  3use collections::{
  4    btree_map,
  5    hash_map::{self, Entry},
  6    BTreeMap, BTreeSet, HashMap, HashSet,
  7};
  8use rpc::{proto, ConnectionId, Receipt};
  9use serde::Serialize;
 10use std::{
 11    mem,
 12    path::{Path, PathBuf},
 13    str,
 14    time::Duration,
 15};
 16use time::OffsetDateTime;
 17use tracing::instrument;
 18
 19#[derive(Default, Serialize)]
 20pub struct Store {
 21    connections: HashMap<ConnectionId, ConnectionState>,
 22    connections_by_user_id: HashMap<UserId, HashSet<ConnectionId>>,
 23    projects: BTreeMap<ProjectId, Project>,
 24    #[serde(skip)]
 25    channels: HashMap<ChannelId, Channel>,
 26}
 27
 28#[derive(Serialize)]
 29struct ConnectionState {
 30    user_id: UserId,
 31    admin: bool,
 32    projects: BTreeSet<ProjectId>,
 33    requested_projects: HashSet<ProjectId>,
 34    channels: HashSet<ChannelId>,
 35}
 36
 37#[derive(Serialize)]
 38pub struct Project {
 39    pub host_connection_id: ConnectionId,
 40    pub host: Collaborator,
 41    pub guests: HashMap<ConnectionId, Collaborator>,
 42    #[serde(skip)]
 43    pub join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
 44    pub active_replica_ids: HashSet<ReplicaId>,
 45    pub worktrees: BTreeMap<u64, Worktree>,
 46    pub language_servers: Vec<proto::LanguageServer>,
 47}
 48
 49#[derive(Serialize)]
 50pub struct Collaborator {
 51    pub replica_id: ReplicaId,
 52    pub user_id: UserId,
 53    #[serde(skip)]
 54    pub last_activity: Option<OffsetDateTime>,
 55    pub admin: bool,
 56}
 57
 58#[derive(Default, Serialize)]
 59pub struct Worktree {
 60    pub root_name: String,
 61    pub visible: bool,
 62    #[serde(skip)]
 63    pub entries: HashMap<u64, proto::Entry>,
 64    #[serde(skip)]
 65    pub extension_counts: HashMap<String, usize>,
 66    #[serde(skip)]
 67    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
 68    pub scan_id: u64,
 69}
 70
 71#[derive(Default)]
 72pub struct Channel {
 73    pub connection_ids: HashSet<ConnectionId>,
 74}
 75
 76pub type ReplicaId = u16;
 77
 78#[derive(Default)]
 79pub struct RemovedConnectionState {
 80    pub user_id: UserId,
 81    pub hosted_projects: HashMap<ProjectId, Project>,
 82    pub guest_project_ids: HashSet<ProjectId>,
 83    pub contact_ids: HashSet<UserId>,
 84}
 85
 86pub struct LeftProject {
 87    pub host_user_id: UserId,
 88    pub host_connection_id: ConnectionId,
 89    pub connection_ids: Vec<ConnectionId>,
 90    pub remove_collaborator: bool,
 91    pub cancel_request: Option<UserId>,
 92    pub unshare: bool,
 93}
 94
 95#[derive(Copy, Clone)]
 96pub struct Metrics {
 97    pub connections: usize,
 98    pub registered_projects: usize,
 99    pub active_projects: usize,
100    pub shared_projects: usize,
101}
102
103impl Store {
104    pub fn metrics(&self) -> Metrics {
105        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
106        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
107
108        let connections = self.connections.values().filter(|c| !c.admin).count();
109        let mut registered_projects = 0;
110        let mut active_projects = 0;
111        let mut shared_projects = 0;
112        for project in self.projects.values() {
113            if let Some(connection) = self.connections.get(&project.host_connection_id) {
114                if !connection.admin {
115                    registered_projects += 1;
116                    if project.is_active_since(active_window_start) {
117                        active_projects += 1;
118                        if !project.guests.is_empty() {
119                            shared_projects += 1;
120                        }
121                    }
122                }
123            }
124        }
125
126        Metrics {
127            connections,
128            registered_projects,
129            active_projects,
130            shared_projects,
131        }
132    }
133
134    #[instrument(skip(self))]
135    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
136        self.connections.insert(
137            connection_id,
138            ConnectionState {
139                user_id,
140                admin,
141                projects: Default::default(),
142                requested_projects: Default::default(),
143                channels: Default::default(),
144            },
145        );
146        self.connections_by_user_id
147            .entry(user_id)
148            .or_default()
149            .insert(connection_id);
150    }
151
152    #[instrument(skip(self))]
153    pub fn remove_connection(
154        &mut self,
155        connection_id: ConnectionId,
156    ) -> Result<RemovedConnectionState> {
157        let connection = self
158            .connections
159            .get_mut(&connection_id)
160            .ok_or_else(|| anyhow!("no such connection"))?;
161
162        let user_id = connection.user_id;
163        let connection_projects = mem::take(&mut connection.projects);
164        let connection_channels = mem::take(&mut connection.channels);
165
166        let mut result = RemovedConnectionState::default();
167        result.user_id = user_id;
168
169        // Leave all channels.
170        for channel_id in connection_channels {
171            self.leave_channel(connection_id, channel_id);
172        }
173
174        // Unregister and leave all projects.
175        for project_id in connection_projects {
176            if let Ok(project) = self.unregister_project(project_id, connection_id) {
177                result.hosted_projects.insert(project_id, project);
178            } else if self.leave_project(connection_id, project_id).is_ok() {
179                result.guest_project_ids.insert(project_id);
180            }
181        }
182
183        let user_connections = self.connections_by_user_id.get_mut(&user_id).unwrap();
184        user_connections.remove(&connection_id);
185        if user_connections.is_empty() {
186            self.connections_by_user_id.remove(&user_id);
187        }
188
189        self.connections.remove(&connection_id).unwrap();
190
191        Ok(result)
192    }
193
194    #[cfg(test)]
195    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
196        self.channels.get(&id)
197    }
198
199    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
200        if let Some(connection) = self.connections.get_mut(&connection_id) {
201            connection.channels.insert(channel_id);
202            self.channels
203                .entry(channel_id)
204                .or_default()
205                .connection_ids
206                .insert(connection_id);
207        }
208    }
209
210    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
211        if let Some(connection) = self.connections.get_mut(&connection_id) {
212            connection.channels.remove(&channel_id);
213            if let hash_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
214                entry.get_mut().connection_ids.remove(&connection_id);
215                if entry.get_mut().connection_ids.is_empty() {
216                    entry.remove();
217                }
218            }
219        }
220    }
221
222    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
223        Ok(self
224            .connections
225            .get(&connection_id)
226            .ok_or_else(|| anyhow!("unknown connection"))?
227            .user_id)
228    }
229
230    pub fn connection_ids_for_user<'a>(
231        &'a self,
232        user_id: UserId,
233    ) -> impl 'a + Iterator<Item = ConnectionId> {
234        self.connections_by_user_id
235            .get(&user_id)
236            .into_iter()
237            .flatten()
238            .copied()
239    }
240
241    pub fn is_user_online(&self, user_id: UserId) -> bool {
242        !self
243            .connections_by_user_id
244            .get(&user_id)
245            .unwrap_or(&Default::default())
246            .is_empty()
247    }
248
249    pub fn build_initial_contacts_update(
250        &self,
251        contacts: Vec<db::Contact>,
252    ) -> proto::UpdateContacts {
253        let mut update = proto::UpdateContacts::default();
254
255        for contact in contacts {
256            match contact {
257                db::Contact::Accepted {
258                    user_id,
259                    should_notify,
260                } => {
261                    update
262                        .contacts
263                        .push(self.contact_for_user(user_id, should_notify));
264                }
265                db::Contact::Outgoing { user_id } => {
266                    update.outgoing_requests.push(user_id.to_proto())
267                }
268                db::Contact::Incoming {
269                    user_id,
270                    should_notify,
271                } => update
272                    .incoming_requests
273                    .push(proto::IncomingContactRequest {
274                        requester_id: user_id.to_proto(),
275                        should_notify,
276                    }),
277            }
278        }
279
280        update
281    }
282
283    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
284        proto::Contact {
285            user_id: user_id.to_proto(),
286            projects: self.project_metadata_for_user(user_id),
287            online: self.is_user_online(user_id),
288            should_notify,
289        }
290    }
291
292    pub fn project_metadata_for_user(&self, user_id: UserId) -> Vec<proto::ProjectMetadata> {
293        let connection_ids = self.connections_by_user_id.get(&user_id);
294        let project_ids = connection_ids.iter().flat_map(|connection_ids| {
295            connection_ids
296                .iter()
297                .filter_map(|connection_id| self.connections.get(connection_id))
298                .flat_map(|connection| connection.projects.iter().copied())
299        });
300
301        let mut metadata = Vec::new();
302        for project_id in project_ids {
303            if let Some(project) = self.projects.get(&project_id) {
304                if project.host.user_id == user_id {
305                    metadata.push(proto::ProjectMetadata {
306                        id: project_id.to_proto(),
307                        visible_worktree_root_names: project
308                            .worktrees
309                            .values()
310                            .filter(|worktree| worktree.visible)
311                            .map(|worktree| worktree.root_name.clone())
312                            .collect(),
313                        guests: project
314                            .guests
315                            .values()
316                            .map(|guest| guest.user_id.to_proto())
317                            .collect(),
318                    });
319                }
320            }
321        }
322
323        metadata
324    }
325
326    pub fn register_project(
327        &mut self,
328        host_connection_id: ConnectionId,
329        project_id: ProjectId,
330    ) -> Result<()> {
331        let connection = self
332            .connections
333            .get_mut(&host_connection_id)
334            .ok_or_else(|| anyhow!("no such connection"))?;
335        connection.projects.insert(project_id);
336        self.projects.insert(
337            project_id,
338            Project {
339                host_connection_id,
340                host: Collaborator {
341                    user_id: connection.user_id,
342                    replica_id: 0,
343                    last_activity: None,
344                    admin: connection.admin,
345                },
346                guests: Default::default(),
347                join_requests: Default::default(),
348                active_replica_ids: Default::default(),
349                worktrees: Default::default(),
350                language_servers: Default::default(),
351            },
352        );
353        Ok(())
354    }
355
356    pub fn update_project(
357        &mut self,
358        project_id: ProjectId,
359        worktrees: &[proto::WorktreeMetadata],
360        connection_id: ConnectionId,
361    ) -> Result<()> {
362        let project = self
363            .projects
364            .get_mut(&project_id)
365            .ok_or_else(|| anyhow!("no such project"))?;
366        if project.host_connection_id == connection_id {
367            let mut old_worktrees = mem::take(&mut project.worktrees);
368            for worktree in worktrees {
369                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
370                    project.worktrees.insert(worktree.id, old_worktree);
371                } else {
372                    project.worktrees.insert(
373                        worktree.id,
374                        Worktree {
375                            root_name: worktree.root_name.clone(),
376                            visible: worktree.visible,
377                            ..Default::default()
378                        },
379                    );
380                }
381            }
382            Ok(())
383        } else {
384            Err(anyhow!("no such project"))?
385        }
386    }
387
388    pub fn unregister_project(
389        &mut self,
390        project_id: ProjectId,
391        connection_id: ConnectionId,
392    ) -> Result<Project> {
393        match self.projects.entry(project_id) {
394            btree_map::Entry::Occupied(e) => {
395                if e.get().host_connection_id == connection_id {
396                    let project = e.remove();
397
398                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
399                        host_connection.projects.remove(&project_id);
400                    }
401
402                    for guest_connection in project.guests.keys() {
403                        if let Some(connection) = self.connections.get_mut(&guest_connection) {
404                            connection.projects.remove(&project_id);
405                        }
406                    }
407
408                    for requester_user_id in project.join_requests.keys() {
409                        if let Some(requester_connection_ids) =
410                            self.connections_by_user_id.get_mut(&requester_user_id)
411                        {
412                            for requester_connection_id in requester_connection_ids.iter() {
413                                if let Some(requester_connection) =
414                                    self.connections.get_mut(requester_connection_id)
415                                {
416                                    requester_connection.requested_projects.remove(&project_id);
417                                }
418                            }
419                        }
420                    }
421
422                    Ok(project)
423                } else {
424                    Err(anyhow!("no such project"))?
425                }
426            }
427            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
428        }
429    }
430
431    pub fn update_diagnostic_summary(
432        &mut self,
433        project_id: ProjectId,
434        worktree_id: u64,
435        connection_id: ConnectionId,
436        summary: proto::DiagnosticSummary,
437    ) -> Result<Vec<ConnectionId>> {
438        let project = self
439            .projects
440            .get_mut(&project_id)
441            .ok_or_else(|| anyhow!("no such project"))?;
442        if project.host_connection_id == connection_id {
443            let worktree = project
444                .worktrees
445                .get_mut(&worktree_id)
446                .ok_or_else(|| anyhow!("no such worktree"))?;
447            worktree
448                .diagnostic_summaries
449                .insert(summary.path.clone().into(), summary);
450            return Ok(project.connection_ids());
451        }
452
453        Err(anyhow!("no such worktree"))?
454    }
455
456    pub fn start_language_server(
457        &mut self,
458        project_id: ProjectId,
459        connection_id: ConnectionId,
460        language_server: proto::LanguageServer,
461    ) -> Result<Vec<ConnectionId>> {
462        let project = self
463            .projects
464            .get_mut(&project_id)
465            .ok_or_else(|| anyhow!("no such project"))?;
466        if project.host_connection_id == connection_id {
467            project.language_servers.push(language_server);
468            return Ok(project.connection_ids());
469        }
470
471        Err(anyhow!("no such project"))?
472    }
473
474    pub fn request_join_project(
475        &mut self,
476        requester_id: UserId,
477        project_id: ProjectId,
478        receipt: Receipt<proto::JoinProject>,
479    ) -> Result<()> {
480        let connection = self
481            .connections
482            .get_mut(&receipt.sender_id)
483            .ok_or_else(|| anyhow!("no such connection"))?;
484        let project = self
485            .projects
486            .get_mut(&project_id)
487            .ok_or_else(|| anyhow!("no such project"))?;
488        connection.requested_projects.insert(project_id);
489        project
490            .join_requests
491            .entry(requester_id)
492            .or_default()
493            .push(receipt);
494        Ok(())
495    }
496
497    pub fn deny_join_project_request(
498        &mut self,
499        responder_connection_id: ConnectionId,
500        requester_id: UserId,
501        project_id: ProjectId,
502    ) -> Option<Vec<Receipt<proto::JoinProject>>> {
503        let project = self.projects.get_mut(&project_id)?;
504        if responder_connection_id != project.host_connection_id {
505            return None;
506        }
507
508        let receipts = project.join_requests.remove(&requester_id)?;
509        for receipt in &receipts {
510            let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
511            requester_connection.requested_projects.remove(&project_id);
512        }
513        project.host.last_activity = Some(OffsetDateTime::now_utc());
514
515        Some(receipts)
516    }
517
518    pub fn accept_join_project_request(
519        &mut self,
520        responder_connection_id: ConnectionId,
521        requester_id: UserId,
522        project_id: ProjectId,
523    ) -> Option<(Vec<(Receipt<proto::JoinProject>, ReplicaId)>, &Project)> {
524        let project = self.projects.get_mut(&project_id)?;
525        if responder_connection_id != project.host_connection_id {
526            return None;
527        }
528
529        let receipts = project.join_requests.remove(&requester_id)?;
530        let mut receipts_with_replica_ids = Vec::new();
531        for receipt in receipts {
532            let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
533            requester_connection.requested_projects.remove(&project_id);
534            requester_connection.projects.insert(project_id);
535            let mut replica_id = 1;
536            while project.active_replica_ids.contains(&replica_id) {
537                replica_id += 1;
538            }
539            project.active_replica_ids.insert(replica_id);
540            project.guests.insert(
541                receipt.sender_id,
542                Collaborator {
543                    replica_id,
544                    user_id: requester_id,
545                    last_activity: Some(OffsetDateTime::now_utc()),
546                    admin: requester_connection.admin,
547                },
548            );
549            receipts_with_replica_ids.push((receipt, replica_id));
550        }
551
552        project.host.last_activity = Some(OffsetDateTime::now_utc());
553        Some((receipts_with_replica_ids, project))
554    }
555
556    pub fn leave_project(
557        &mut self,
558        connection_id: ConnectionId,
559        project_id: ProjectId,
560    ) -> Result<LeftProject> {
561        let user_id = self.user_id_for_connection(connection_id)?;
562        let project = self
563            .projects
564            .get_mut(&project_id)
565            .ok_or_else(|| anyhow!("no such project"))?;
566
567        // If the connection leaving the project is a collaborator, remove it.
568        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
569            project.active_replica_ids.remove(&guest.replica_id);
570            true
571        } else {
572            false
573        };
574
575        // If the connection leaving the project has a pending request, remove it.
576        // If that user has no other pending requests on other connections, indicate that the request should be cancelled.
577        let mut cancel_request = None;
578        if let Entry::Occupied(mut entry) = project.join_requests.entry(user_id) {
579            entry
580                .get_mut()
581                .retain(|receipt| receipt.sender_id != connection_id);
582            if entry.get().is_empty() {
583                entry.remove();
584                cancel_request = Some(user_id);
585            }
586        }
587
588        if let Some(connection) = self.connections.get_mut(&connection_id) {
589            connection.projects.remove(&project_id);
590        }
591
592        let connection_ids = project.connection_ids();
593        let unshare = connection_ids.len() <= 1 && project.join_requests.is_empty();
594        if unshare {
595            project.language_servers.clear();
596            for worktree in project.worktrees.values_mut() {
597                worktree.diagnostic_summaries.clear();
598                worktree.entries.clear();
599            }
600        }
601
602        Ok(LeftProject {
603            host_connection_id: project.host_connection_id,
604            host_user_id: project.host.user_id,
605            connection_ids,
606            cancel_request,
607            unshare,
608            remove_collaborator,
609        })
610    }
611
612    pub fn update_worktree(
613        &mut self,
614        connection_id: ConnectionId,
615        project_id: ProjectId,
616        worktree_id: u64,
617        worktree_root_name: &str,
618        removed_entries: &[u64],
619        updated_entries: &[proto::Entry],
620        scan_id: u64,
621    ) -> Result<(Vec<ConnectionId>, bool, HashMap<String, usize>)> {
622        let project = self.write_project(project_id, connection_id)?;
623        let connection_ids = project.connection_ids();
624        let mut worktree = project.worktrees.entry(worktree_id).or_default();
625        let metadata_changed = worktree_root_name != worktree.root_name;
626        worktree.root_name = worktree_root_name.to_string();
627
628        for entry_id in removed_entries {
629            if let Some(entry) = worktree.entries.remove(&entry_id) {
630                if !entry.is_ignored {
631                    if let Some(extension) = extension_for_entry(&entry) {
632                        if let Some(count) = worktree.extension_counts.get_mut(extension) {
633                            *count = count.saturating_sub(1);
634                        }
635                    }
636                }
637            }
638        }
639
640        for entry in updated_entries {
641            if let Some(old_entry) = worktree.entries.insert(entry.id, entry.clone()) {
642                if !old_entry.is_ignored {
643                    if let Some(extension) = extension_for_entry(&old_entry) {
644                        if let Some(count) = worktree.extension_counts.get_mut(extension) {
645                            *count = count.saturating_sub(1);
646                        }
647                    }
648                }
649            }
650
651            if !entry.is_ignored {
652                if let Some(extension) = extension_for_entry(&entry) {
653                    if let Some(count) = worktree.extension_counts.get_mut(extension) {
654                        *count += 1;
655                    } else {
656                        worktree.extension_counts.insert(extension.into(), 1);
657                    }
658                }
659            }
660        }
661
662        worktree.scan_id = scan_id;
663        Ok((
664            connection_ids,
665            metadata_changed,
666            worktree.extension_counts.clone(),
667        ))
668    }
669
670    pub fn project_connection_ids(
671        &self,
672        project_id: ProjectId,
673        acting_connection_id: ConnectionId,
674    ) -> Result<Vec<ConnectionId>> {
675        Ok(self
676            .read_project(project_id, acting_connection_id)?
677            .connection_ids())
678    }
679
680    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
681        Ok(self
682            .channels
683            .get(&channel_id)
684            .ok_or_else(|| anyhow!("no such channel"))?
685            .connection_ids())
686    }
687
688    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
689        self.projects
690            .get(&project_id)
691            .ok_or_else(|| anyhow!("no such project"))
692    }
693
694    pub fn register_project_activity(
695        &mut self,
696        project_id: ProjectId,
697        connection_id: ConnectionId,
698    ) -> Result<()> {
699        let project = self
700            .projects
701            .get_mut(&project_id)
702            .ok_or_else(|| anyhow!("no such project"))?;
703        let collaborator = if connection_id == project.host_connection_id {
704            &mut project.host
705        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
706            guest
707        } else {
708            return Err(anyhow!("no such project"))?;
709        };
710        collaborator.last_activity = Some(OffsetDateTime::now_utc());
711        Ok(())
712    }
713
714    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
715        self.projects.iter()
716    }
717
718    pub fn read_project(
719        &self,
720        project_id: ProjectId,
721        connection_id: ConnectionId,
722    ) -> Result<&Project> {
723        let project = self
724            .projects
725            .get(&project_id)
726            .ok_or_else(|| anyhow!("no such project"))?;
727        if project.host_connection_id == connection_id
728            || project.guests.contains_key(&connection_id)
729        {
730            Ok(project)
731        } else {
732            Err(anyhow!("no such project"))?
733        }
734    }
735
736    fn write_project(
737        &mut self,
738        project_id: ProjectId,
739        connection_id: ConnectionId,
740    ) -> Result<&mut Project> {
741        let project = self
742            .projects
743            .get_mut(&project_id)
744            .ok_or_else(|| anyhow!("no such project"))?;
745        if project.host_connection_id == connection_id
746            || project.guests.contains_key(&connection_id)
747        {
748            Ok(project)
749        } else {
750            Err(anyhow!("no such project"))?
751        }
752    }
753
754    #[cfg(test)]
755    pub fn check_invariants(&self) {
756        for (connection_id, connection) in &self.connections {
757            for project_id in &connection.projects {
758                let project = &self.projects.get(&project_id).unwrap();
759                if project.host_connection_id != *connection_id {
760                    assert!(project.guests.contains_key(connection_id));
761                }
762
763                for (worktree_id, worktree) in project.worktrees.iter() {
764                    let mut paths = HashMap::default();
765                    for entry in worktree.entries.values() {
766                        let prev_entry = paths.insert(&entry.path, entry);
767                        assert_eq!(
768                            prev_entry,
769                            None,
770                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
771                            worktree_id,
772                            prev_entry.unwrap(),
773                            entry
774                        );
775                    }
776                }
777            }
778            for channel_id in &connection.channels {
779                let channel = self.channels.get(channel_id).unwrap();
780                assert!(channel.connection_ids.contains(connection_id));
781            }
782            assert!(self
783                .connections_by_user_id
784                .get(&connection.user_id)
785                .unwrap()
786                .contains(connection_id));
787        }
788
789        for (user_id, connection_ids) in &self.connections_by_user_id {
790            for connection_id in connection_ids {
791                assert_eq!(
792                    self.connections.get(connection_id).unwrap().user_id,
793                    *user_id
794                );
795            }
796        }
797
798        for (project_id, project) in &self.projects {
799            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
800            assert!(host_connection.projects.contains(project_id));
801
802            for guest_connection_id in project.guests.keys() {
803                let guest_connection = self.connections.get(guest_connection_id).unwrap();
804                assert!(guest_connection.projects.contains(project_id));
805            }
806            assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
807            assert_eq!(
808                project.active_replica_ids,
809                project
810                    .guests
811                    .values()
812                    .map(|guest| guest.replica_id)
813                    .collect::<HashSet<_>>(),
814            );
815        }
816
817        for (channel_id, channel) in &self.channels {
818            for connection_id in &channel.connection_ids {
819                let connection = self.connections.get(connection_id).unwrap();
820                assert!(connection.channels.contains(channel_id));
821            }
822        }
823    }
824}
825
826impl Project {
827    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
828        self.guests
829            .values()
830            .chain([&self.host])
831            .any(|collaborator| {
832                collaborator
833                    .last_activity
834                    .map_or(false, |active_time| active_time > start_time)
835            })
836    }
837
838    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
839        self.guests.keys().copied().collect()
840    }
841
842    pub fn connection_ids(&self) -> Vec<ConnectionId> {
843        self.guests
844            .keys()
845            .copied()
846            .chain(Some(self.host_connection_id))
847            .collect()
848    }
849}
850
851impl Channel {
852    fn connection_ids(&self) -> Vec<ConnectionId> {
853        self.connection_ids.iter().copied().collect()
854    }
855}
856
857fn extension_for_entry(entry: &proto::Entry) -> Option<&str> {
858    str::from_utf8(&entry.path)
859        .ok()
860        .map(Path::new)
861        .and_then(|p| p.extension())
862        .and_then(|e| e.to_str())
863}