store.rs

  1use crate::db::{self, ChannelId, ProjectId, UserId};
  2use anyhow::{anyhow, Result};
  3use collections::{btree_map, hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet};
  4use rpc::{proto, ConnectionId, Receipt};
  5use serde::Serialize;
  6use std::{
  7    mem,
  8    path::{Path, PathBuf},
  9    str,
 10    time::Duration,
 11};
 12use time::OffsetDateTime;
 13use tracing::instrument;
 14
 15#[derive(Default, Serialize)]
 16pub struct Store {
 17    connections: BTreeMap<ConnectionId, ConnectionState>,
 18    connections_by_user_id: BTreeMap<UserId, HashSet<ConnectionId>>,
 19    projects: BTreeMap<ProjectId, Project>,
 20    #[serde(skip)]
 21    channels: BTreeMap<ChannelId, Channel>,
 22}
 23
 24#[derive(Serialize)]
 25struct ConnectionState {
 26    user_id: UserId,
 27    admin: bool,
 28    projects: BTreeSet<ProjectId>,
 29    requested_projects: HashSet<ProjectId>,
 30    channels: HashSet<ChannelId>,
 31}
 32
 33#[derive(Serialize)]
 34pub struct Project {
 35    pub online: bool,
 36    pub host_connection_id: ConnectionId,
 37    pub host: Collaborator,
 38    pub guests: HashMap<ConnectionId, Collaborator>,
 39    #[serde(skip)]
 40    pub join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
 41    pub active_replica_ids: HashSet<ReplicaId>,
 42    pub worktrees: BTreeMap<u64, Worktree>,
 43    pub language_servers: Vec<proto::LanguageServer>,
 44}
 45
 46#[derive(Serialize)]
 47pub struct Collaborator {
 48    pub replica_id: ReplicaId,
 49    pub user_id: UserId,
 50    #[serde(skip)]
 51    pub last_activity: Option<OffsetDateTime>,
 52    pub admin: bool,
 53}
 54
 55#[derive(Default, Serialize)]
 56pub struct Worktree {
 57    pub root_name: String,
 58    pub visible: bool,
 59    #[serde(skip)]
 60    pub entries: BTreeMap<u64, proto::Entry>,
 61    #[serde(skip)]
 62    pub extension_counts: HashMap<String, usize>,
 63    #[serde(skip)]
 64    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
 65    pub scan_id: u64,
 66}
 67
 68#[derive(Default)]
 69pub struct Channel {
 70    pub connection_ids: HashSet<ConnectionId>,
 71}
 72
 73pub type ReplicaId = u16;
 74
 75#[derive(Default)]
 76pub struct RemovedConnectionState {
 77    pub user_id: UserId,
 78    pub hosted_projects: HashMap<ProjectId, Project>,
 79    pub guest_project_ids: HashSet<ProjectId>,
 80    pub contact_ids: HashSet<UserId>,
 81}
 82
 83pub struct LeftProject {
 84    pub host_user_id: UserId,
 85    pub host_connection_id: ConnectionId,
 86    pub connection_ids: Vec<ConnectionId>,
 87    pub remove_collaborator: bool,
 88    pub cancel_request: Option<UserId>,
 89    pub unshare: bool,
 90}
 91
 92pub struct UnsharedProject {
 93    pub guests: HashMap<ConnectionId, Collaborator>,
 94    pub pending_join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
 95}
 96
 97#[derive(Copy, Clone)]
 98pub struct Metrics {
 99    pub connections: usize,
100    pub registered_projects: usize,
101    pub active_projects: usize,
102    pub shared_projects: usize,
103}
104
105impl Store {
106    pub fn metrics(&self) -> Metrics {
107        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
108        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
109
110        let connections = self.connections.values().filter(|c| !c.admin).count();
111        let mut registered_projects = 0;
112        let mut active_projects = 0;
113        let mut shared_projects = 0;
114        for project in self.projects.values() {
115            if let Some(connection) = self.connections.get(&project.host_connection_id) {
116                if !connection.admin {
117                    registered_projects += 1;
118                    if project.is_active_since(active_window_start) {
119                        active_projects += 1;
120                        if !project.guests.is_empty() {
121                            shared_projects += 1;
122                        }
123                    }
124                }
125            }
126        }
127
128        Metrics {
129            connections,
130            registered_projects,
131            active_projects,
132            shared_projects,
133        }
134    }
135
136    #[instrument(skip(self))]
137    pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
138        self.connections.insert(
139            connection_id,
140            ConnectionState {
141                user_id,
142                admin,
143                projects: Default::default(),
144                requested_projects: Default::default(),
145                channels: Default::default(),
146            },
147        );
148        self.connections_by_user_id
149            .entry(user_id)
150            .or_default()
151            .insert(connection_id);
152    }
153
154    #[instrument(skip(self))]
155    pub fn remove_connection(
156        &mut self,
157        connection_id: ConnectionId,
158    ) -> Result<RemovedConnectionState> {
159        let connection = self
160            .connections
161            .get_mut(&connection_id)
162            .ok_or_else(|| anyhow!("no such connection"))?;
163
164        let user_id = connection.user_id;
165        let connection_projects = mem::take(&mut connection.projects);
166        let connection_channels = mem::take(&mut connection.channels);
167
168        let mut result = RemovedConnectionState::default();
169        result.user_id = user_id;
170
171        // Leave all channels.
172        for channel_id in connection_channels {
173            self.leave_channel(connection_id, channel_id);
174        }
175
176        // Unregister and leave all projects.
177        for project_id in connection_projects {
178            if let Ok(project) = self.unregister_project(project_id, connection_id) {
179                result.hosted_projects.insert(project_id, project);
180            } else if self.leave_project(connection_id, project_id).is_ok() {
181                result.guest_project_ids.insert(project_id);
182            }
183        }
184
185        let user_connections = self.connections_by_user_id.get_mut(&user_id).unwrap();
186        user_connections.remove(&connection_id);
187        if user_connections.is_empty() {
188            self.connections_by_user_id.remove(&user_id);
189        }
190
191        self.connections.remove(&connection_id).unwrap();
192
193        Ok(result)
194    }
195
196    #[cfg(test)]
197    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
198        self.channels.get(&id)
199    }
200
201    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
202        if let Some(connection) = self.connections.get_mut(&connection_id) {
203            connection.channels.insert(channel_id);
204            self.channels
205                .entry(channel_id)
206                .or_default()
207                .connection_ids
208                .insert(connection_id);
209        }
210    }
211
212    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
213        if let Some(connection) = self.connections.get_mut(&connection_id) {
214            connection.channels.remove(&channel_id);
215            if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
216                entry.get_mut().connection_ids.remove(&connection_id);
217                if entry.get_mut().connection_ids.is_empty() {
218                    entry.remove();
219                }
220            }
221        }
222    }
223
224    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
225        Ok(self
226            .connections
227            .get(&connection_id)
228            .ok_or_else(|| anyhow!("unknown connection"))?
229            .user_id)
230    }
231
232    pub fn connection_ids_for_user<'a>(
233        &'a self,
234        user_id: UserId,
235    ) -> impl 'a + Iterator<Item = ConnectionId> {
236        self.connections_by_user_id
237            .get(&user_id)
238            .into_iter()
239            .flatten()
240            .copied()
241    }
242
243    pub fn is_user_online(&self, user_id: UserId) -> bool {
244        !self
245            .connections_by_user_id
246            .get(&user_id)
247            .unwrap_or(&Default::default())
248            .is_empty()
249    }
250
251    pub fn build_initial_contacts_update(
252        &self,
253        contacts: Vec<db::Contact>,
254    ) -> proto::UpdateContacts {
255        let mut update = proto::UpdateContacts::default();
256
257        for contact in contacts {
258            match contact {
259                db::Contact::Accepted {
260                    user_id,
261                    should_notify,
262                } => {
263                    update
264                        .contacts
265                        .push(self.contact_for_user(user_id, should_notify));
266                }
267                db::Contact::Outgoing { user_id } => {
268                    update.outgoing_requests.push(user_id.to_proto())
269                }
270                db::Contact::Incoming {
271                    user_id,
272                    should_notify,
273                } => update
274                    .incoming_requests
275                    .push(proto::IncomingContactRequest {
276                        requester_id: user_id.to_proto(),
277                        should_notify,
278                    }),
279            }
280        }
281
282        update
283    }
284
285    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
286        proto::Contact {
287            user_id: user_id.to_proto(),
288            projects: self.project_metadata_for_user(user_id),
289            online: self.is_user_online(user_id),
290            should_notify,
291        }
292    }
293
294    pub fn project_metadata_for_user(&self, user_id: UserId) -> Vec<proto::ProjectMetadata> {
295        let connection_ids = self.connections_by_user_id.get(&user_id);
296        let project_ids = connection_ids.iter().flat_map(|connection_ids| {
297            connection_ids
298                .iter()
299                .filter_map(|connection_id| self.connections.get(connection_id))
300                .flat_map(|connection| connection.projects.iter().copied())
301        });
302
303        let mut metadata = Vec::new();
304        for project_id in project_ids {
305            if let Some(project) = self.projects.get(&project_id) {
306                if project.host.user_id == user_id && project.online {
307                    metadata.push(proto::ProjectMetadata {
308                        id: project_id.to_proto(),
309                        visible_worktree_root_names: project
310                            .worktrees
311                            .values()
312                            .filter(|worktree| worktree.visible)
313                            .map(|worktree| worktree.root_name.clone())
314                            .collect(),
315                        guests: project
316                            .guests
317                            .values()
318                            .map(|guest| guest.user_id.to_proto())
319                            .collect(),
320                    });
321                }
322            }
323        }
324
325        metadata
326    }
327
328    pub fn register_project(
329        &mut self,
330        host_connection_id: ConnectionId,
331        project_id: ProjectId,
332        online: bool,
333    ) -> Result<()> {
334        let connection = self
335            .connections
336            .get_mut(&host_connection_id)
337            .ok_or_else(|| anyhow!("no such connection"))?;
338        connection.projects.insert(project_id);
339        self.projects.insert(
340            project_id,
341            Project {
342                online,
343                host_connection_id,
344                host: Collaborator {
345                    user_id: connection.user_id,
346                    replica_id: 0,
347                    last_activity: None,
348                    admin: connection.admin,
349                },
350                guests: Default::default(),
351                join_requests: Default::default(),
352                active_replica_ids: Default::default(),
353                worktrees: Default::default(),
354                language_servers: Default::default(),
355            },
356        );
357        Ok(())
358    }
359
360    pub fn update_project(
361        &mut self,
362        project_id: ProjectId,
363        worktrees: &[proto::WorktreeMetadata],
364        online: bool,
365        connection_id: ConnectionId,
366    ) -> Result<Option<UnsharedProject>> {
367        let project = self
368            .projects
369            .get_mut(&project_id)
370            .ok_or_else(|| anyhow!("no such project"))?;
371        if project.host_connection_id == connection_id {
372            let mut old_worktrees = mem::take(&mut project.worktrees);
373            for worktree in worktrees {
374                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
375                    project.worktrees.insert(worktree.id, old_worktree);
376                } else {
377                    project.worktrees.insert(
378                        worktree.id,
379                        Worktree {
380                            root_name: worktree.root_name.clone(),
381                            visible: worktree.visible,
382                            ..Default::default()
383                        },
384                    );
385                }
386            }
387
388            if online != project.online {
389                project.online = online;
390                if project.online {
391                    Ok(None)
392                } else {
393                    for connection_id in project.guest_connection_ids() {
394                        if let Some(connection) = self.connections.get_mut(&connection_id) {
395                            connection.projects.remove(&project_id);
396                        }
397                    }
398
399                    project.active_replica_ids.clear();
400                    project.language_servers.clear();
401                    for worktree in project.worktrees.values_mut() {
402                        worktree.diagnostic_summaries.clear();
403                        worktree.entries.clear();
404                        worktree.extension_counts.clear();
405                    }
406
407                    Ok(Some(UnsharedProject {
408                        guests: mem::take(&mut project.guests),
409                        pending_join_requests: mem::take(&mut project.join_requests),
410                    }))
411                }
412            } else {
413                Ok(None)
414            }
415        } else {
416            Err(anyhow!("no such project"))?
417        }
418    }
419
420    pub fn unregister_project(
421        &mut self,
422        project_id: ProjectId,
423        connection_id: ConnectionId,
424    ) -> Result<Project> {
425        match self.projects.entry(project_id) {
426            btree_map::Entry::Occupied(e) => {
427                if e.get().host_connection_id == connection_id {
428                    let project = e.remove();
429
430                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
431                        host_connection.projects.remove(&project_id);
432                    }
433
434                    for guest_connection in project.guests.keys() {
435                        if let Some(connection) = self.connections.get_mut(&guest_connection) {
436                            connection.projects.remove(&project_id);
437                        }
438                    }
439
440                    for requester_user_id in project.join_requests.keys() {
441                        if let Some(requester_connection_ids) =
442                            self.connections_by_user_id.get_mut(&requester_user_id)
443                        {
444                            for requester_connection_id in requester_connection_ids.iter() {
445                                if let Some(requester_connection) =
446                                    self.connections.get_mut(requester_connection_id)
447                                {
448                                    requester_connection.requested_projects.remove(&project_id);
449                                }
450                            }
451                        }
452                    }
453
454                    Ok(project)
455                } else {
456                    Err(anyhow!("no such project"))?
457                }
458            }
459            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
460        }
461    }
462
463    pub fn update_diagnostic_summary(
464        &mut self,
465        project_id: ProjectId,
466        worktree_id: u64,
467        connection_id: ConnectionId,
468        summary: proto::DiagnosticSummary,
469    ) -> Result<Vec<ConnectionId>> {
470        let project = self
471            .projects
472            .get_mut(&project_id)
473            .ok_or_else(|| anyhow!("no such project"))?;
474        if project.host_connection_id == connection_id {
475            let worktree = project
476                .worktrees
477                .get_mut(&worktree_id)
478                .ok_or_else(|| anyhow!("no such worktree"))?;
479            worktree
480                .diagnostic_summaries
481                .insert(summary.path.clone().into(), summary);
482            return Ok(project.connection_ids());
483        }
484
485        Err(anyhow!("no such worktree"))?
486    }
487
488    pub fn start_language_server(
489        &mut self,
490        project_id: ProjectId,
491        connection_id: ConnectionId,
492        language_server: proto::LanguageServer,
493    ) -> Result<Vec<ConnectionId>> {
494        let project = self
495            .projects
496            .get_mut(&project_id)
497            .ok_or_else(|| anyhow!("no such project"))?;
498        if project.host_connection_id == connection_id {
499            project.language_servers.push(language_server);
500            return Ok(project.connection_ids());
501        }
502
503        Err(anyhow!("no such project"))?
504    }
505
506    pub fn request_join_project(
507        &mut self,
508        requester_id: UserId,
509        project_id: ProjectId,
510        receipt: Receipt<proto::JoinProject>,
511    ) -> Result<()> {
512        let connection = self
513            .connections
514            .get_mut(&receipt.sender_id)
515            .ok_or_else(|| anyhow!("no such connection"))?;
516        let project = self
517            .projects
518            .get_mut(&project_id)
519            .ok_or_else(|| anyhow!("no such project"))?;
520        if project.online {
521            connection.requested_projects.insert(project_id);
522            project
523                .join_requests
524                .entry(requester_id)
525                .or_default()
526                .push(receipt);
527            Ok(())
528        } else {
529            Err(anyhow!("no such project"))
530        }
531    }
532
533    pub fn deny_join_project_request(
534        &mut self,
535        responder_connection_id: ConnectionId,
536        requester_id: UserId,
537        project_id: ProjectId,
538    ) -> Option<Vec<Receipt<proto::JoinProject>>> {
539        let project = self.projects.get_mut(&project_id)?;
540        if responder_connection_id != project.host_connection_id {
541            return None;
542        }
543
544        let receipts = project.join_requests.remove(&requester_id)?;
545        for receipt in &receipts {
546            let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
547            requester_connection.requested_projects.remove(&project_id);
548        }
549        project.host.last_activity = Some(OffsetDateTime::now_utc());
550
551        Some(receipts)
552    }
553
554    pub fn accept_join_project_request(
555        &mut self,
556        responder_connection_id: ConnectionId,
557        requester_id: UserId,
558        project_id: ProjectId,
559    ) -> Option<(Vec<(Receipt<proto::JoinProject>, ReplicaId)>, &Project)> {
560        let project = self.projects.get_mut(&project_id)?;
561        if responder_connection_id != project.host_connection_id {
562            return None;
563        }
564
565        let receipts = project.join_requests.remove(&requester_id)?;
566        let mut receipts_with_replica_ids = Vec::new();
567        for receipt in receipts {
568            let requester_connection = self.connections.get_mut(&receipt.sender_id)?;
569            requester_connection.requested_projects.remove(&project_id);
570            requester_connection.projects.insert(project_id);
571            let mut replica_id = 1;
572            while project.active_replica_ids.contains(&replica_id) {
573                replica_id += 1;
574            }
575            project.active_replica_ids.insert(replica_id);
576            project.guests.insert(
577                receipt.sender_id,
578                Collaborator {
579                    replica_id,
580                    user_id: requester_id,
581                    last_activity: Some(OffsetDateTime::now_utc()),
582                    admin: requester_connection.admin,
583                },
584            );
585            receipts_with_replica_ids.push((receipt, replica_id));
586        }
587
588        project.host.last_activity = Some(OffsetDateTime::now_utc());
589        Some((receipts_with_replica_ids, project))
590    }
591
592    pub fn leave_project(
593        &mut self,
594        connection_id: ConnectionId,
595        project_id: ProjectId,
596    ) -> Result<LeftProject> {
597        let user_id = self.user_id_for_connection(connection_id)?;
598        let project = self
599            .projects
600            .get_mut(&project_id)
601            .ok_or_else(|| anyhow!("no such project"))?;
602
603        // If the connection leaving the project is a collaborator, remove it.
604        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
605            project.active_replica_ids.remove(&guest.replica_id);
606            true
607        } else {
608            false
609        };
610
611        // If the connection leaving the project has a pending request, remove it.
612        // If that user has no other pending requests on other connections, indicate that the request should be cancelled.
613        let mut cancel_request = None;
614        if let Entry::Occupied(mut entry) = project.join_requests.entry(user_id) {
615            entry
616                .get_mut()
617                .retain(|receipt| receipt.sender_id != connection_id);
618            if entry.get().is_empty() {
619                entry.remove();
620                cancel_request = Some(user_id);
621            }
622        }
623
624        if let Some(connection) = self.connections.get_mut(&connection_id) {
625            connection.projects.remove(&project_id);
626        }
627
628        let connection_ids = project.connection_ids();
629        let unshare = connection_ids.len() <= 1 && project.join_requests.is_empty();
630        if unshare {
631            project.language_servers.clear();
632            for worktree in project.worktrees.values_mut() {
633                worktree.diagnostic_summaries.clear();
634                worktree.entries.clear();
635                worktree.extension_counts.clear();
636            }
637        }
638
639        Ok(LeftProject {
640            host_connection_id: project.host_connection_id,
641            host_user_id: project.host.user_id,
642            connection_ids,
643            cancel_request,
644            unshare,
645            remove_collaborator,
646        })
647    }
648
649    pub fn update_worktree(
650        &mut self,
651        connection_id: ConnectionId,
652        project_id: ProjectId,
653        worktree_id: u64,
654        worktree_root_name: &str,
655        removed_entries: &[u64],
656        updated_entries: &[proto::Entry],
657        scan_id: u64,
658    ) -> Result<(Vec<ConnectionId>, bool, HashMap<String, usize>)> {
659        let project = self.write_project(project_id, connection_id)?;
660        if !project.online {
661            return Err(anyhow!("project is not online"));
662        }
663
664        let connection_ids = project.connection_ids();
665        let mut worktree = project.worktrees.entry(worktree_id).or_default();
666        let metadata_changed = worktree_root_name != worktree.root_name;
667        worktree.root_name = worktree_root_name.to_string();
668
669        for entry_id in removed_entries {
670            if let Some(entry) = worktree.entries.remove(&entry_id) {
671                if !entry.is_ignored {
672                    if let Some(extension) = extension_for_entry(&entry) {
673                        if let Some(count) = worktree.extension_counts.get_mut(extension) {
674                            *count = count.saturating_sub(1);
675                        }
676                    }
677                }
678            }
679        }
680
681        for entry in updated_entries {
682            if let Some(old_entry) = worktree.entries.insert(entry.id, entry.clone()) {
683                if !old_entry.is_ignored {
684                    if let Some(extension) = extension_for_entry(&old_entry) {
685                        if let Some(count) = worktree.extension_counts.get_mut(extension) {
686                            *count = count.saturating_sub(1);
687                        }
688                    }
689                }
690            }
691
692            if !entry.is_ignored {
693                if let Some(extension) = extension_for_entry(&entry) {
694                    if let Some(count) = worktree.extension_counts.get_mut(extension) {
695                        *count += 1;
696                    } else {
697                        worktree.extension_counts.insert(extension.into(), 1);
698                    }
699                }
700            }
701        }
702
703        worktree.scan_id = scan_id;
704        Ok((
705            connection_ids,
706            metadata_changed,
707            worktree.extension_counts.clone(),
708        ))
709    }
710
711    pub fn project_connection_ids(
712        &self,
713        project_id: ProjectId,
714        acting_connection_id: ConnectionId,
715    ) -> Result<Vec<ConnectionId>> {
716        Ok(self
717            .read_project(project_id, acting_connection_id)?
718            .connection_ids())
719    }
720
721    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
722        Ok(self
723            .channels
724            .get(&channel_id)
725            .ok_or_else(|| anyhow!("no such channel"))?
726            .connection_ids())
727    }
728
729    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
730        self.projects
731            .get(&project_id)
732            .ok_or_else(|| anyhow!("no such project"))
733    }
734
735    pub fn register_project_activity(
736        &mut self,
737        project_id: ProjectId,
738        connection_id: ConnectionId,
739    ) -> Result<()> {
740        let project = self
741            .projects
742            .get_mut(&project_id)
743            .ok_or_else(|| anyhow!("no such project"))?;
744        let collaborator = if connection_id == project.host_connection_id {
745            &mut project.host
746        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
747            guest
748        } else {
749            return Err(anyhow!("no such project"))?;
750        };
751        collaborator.last_activity = Some(OffsetDateTime::now_utc());
752        Ok(())
753    }
754
755    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
756        self.projects.iter()
757    }
758
759    pub fn read_project(
760        &self,
761        project_id: ProjectId,
762        connection_id: ConnectionId,
763    ) -> Result<&Project> {
764        let project = self
765            .projects
766            .get(&project_id)
767            .ok_or_else(|| anyhow!("no such project"))?;
768        if project.host_connection_id == connection_id
769            || project.guests.contains_key(&connection_id)
770        {
771            Ok(project)
772        } else {
773            Err(anyhow!("no such project"))?
774        }
775    }
776
777    fn write_project(
778        &mut self,
779        project_id: ProjectId,
780        connection_id: ConnectionId,
781    ) -> Result<&mut Project> {
782        let project = self
783            .projects
784            .get_mut(&project_id)
785            .ok_or_else(|| anyhow!("no such project"))?;
786        if project.host_connection_id == connection_id
787            || project.guests.contains_key(&connection_id)
788        {
789            Ok(project)
790        } else {
791            Err(anyhow!("no such project"))?
792        }
793    }
794
795    #[cfg(test)]
796    pub fn check_invariants(&self) {
797        for (connection_id, connection) in &self.connections {
798            for project_id in &connection.projects {
799                let project = &self.projects.get(&project_id).unwrap();
800                if project.host_connection_id != *connection_id {
801                    assert!(project.guests.contains_key(connection_id));
802                }
803
804                for (worktree_id, worktree) in project.worktrees.iter() {
805                    let mut paths = HashMap::default();
806                    for entry in worktree.entries.values() {
807                        let prev_entry = paths.insert(&entry.path, entry);
808                        assert_eq!(
809                            prev_entry,
810                            None,
811                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
812                            worktree_id,
813                            prev_entry.unwrap(),
814                            entry
815                        );
816                    }
817                }
818            }
819            for channel_id in &connection.channels {
820                let channel = self.channels.get(channel_id).unwrap();
821                assert!(channel.connection_ids.contains(connection_id));
822            }
823            assert!(self
824                .connections_by_user_id
825                .get(&connection.user_id)
826                .unwrap()
827                .contains(connection_id));
828        }
829
830        for (user_id, connection_ids) in &self.connections_by_user_id {
831            for connection_id in connection_ids {
832                assert_eq!(
833                    self.connections.get(connection_id).unwrap().user_id,
834                    *user_id
835                );
836            }
837        }
838
839        for (project_id, project) in &self.projects {
840            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
841            assert!(host_connection.projects.contains(project_id));
842
843            for guest_connection_id in project.guests.keys() {
844                let guest_connection = self.connections.get(guest_connection_id).unwrap();
845                assert!(guest_connection.projects.contains(project_id));
846            }
847            assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
848            assert_eq!(
849                project.active_replica_ids,
850                project
851                    .guests
852                    .values()
853                    .map(|guest| guest.replica_id)
854                    .collect::<HashSet<_>>(),
855            );
856        }
857
858        for (channel_id, channel) in &self.channels {
859            for connection_id in &channel.connection_ids {
860                let connection = self.connections.get(connection_id).unwrap();
861                assert!(connection.channels.contains(channel_id));
862            }
863        }
864    }
865}
866
867impl Project {
868    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
869        self.guests
870            .values()
871            .chain([&self.host])
872            .any(|collaborator| {
873                collaborator
874                    .last_activity
875                    .map_or(false, |active_time| active_time > start_time)
876            })
877    }
878
879    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
880        self.guests.keys().copied().collect()
881    }
882
883    pub fn connection_ids(&self) -> Vec<ConnectionId> {
884        self.guests
885            .keys()
886            .copied()
887            .chain(Some(self.host_connection_id))
888            .collect()
889    }
890}
891
892impl Channel {
893    fn connection_ids(&self) -> Vec<ConnectionId> {
894        self.connection_ids.iter().copied().collect()
895    }
896}
897
898fn extension_for_entry(entry: &proto::Entry) -> Option<&str> {
899    str::from_utf8(&entry.path)
900        .ok()
901        .map(Path::new)
902        .and_then(|p| p.extension())
903        .and_then(|e| e.to_str())
904}