1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
4use rpc::{proto, ConnectionId};
5use serde::Serialize;
6use std::{mem, path::PathBuf, str, time::Duration};
7use time::OffsetDateTime;
8use tracing::instrument;
9use util::post_inc;
10
/// Identifier under which a room is stored in [`Store`].
pub type RoomId = u64;
12
13#[derive(Default, Serialize)]
14pub struct Store {
15 connections: BTreeMap<ConnectionId, ConnectionState>,
16 connected_users: BTreeMap<UserId, ConnectedUser>,
17 next_room_id: RoomId,
18 rooms: BTreeMap<RoomId, proto::Room>,
19 projects: BTreeMap<ProjectId, Project>,
20 #[serde(skip)]
21 channels: BTreeMap<ChannelId, Channel>,
22}
23
24#[derive(Default, Serialize)]
25struct ConnectedUser {
26 connection_ids: HashSet<ConnectionId>,
27 active_call: Option<Call>,
28}
29
30#[derive(Serialize)]
31struct ConnectionState {
32 user_id: UserId,
33 admin: bool,
34 projects: BTreeSet<ProjectId>,
35 channels: HashSet<ChannelId>,
36}
37
38#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
39pub struct Call {
40 pub caller_user_id: UserId,
41 pub room_id: RoomId,
42 pub connection_id: Option<ConnectionId>,
43 pub initial_project_id: Option<ProjectId>,
44}
45
46#[derive(Serialize)]
47pub struct Project {
48 pub id: ProjectId,
49 pub room_id: RoomId,
50 pub host_connection_id: ConnectionId,
51 pub host: Collaborator,
52 pub guests: HashMap<ConnectionId, Collaborator>,
53 pub active_replica_ids: HashSet<ReplicaId>,
54 pub worktrees: BTreeMap<u64, Worktree>,
55 pub language_servers: Vec<proto::LanguageServer>,
56}
57
58#[derive(Serialize)]
59pub struct Collaborator {
60 pub replica_id: ReplicaId,
61 pub user_id: UserId,
62 #[serde(skip)]
63 pub last_activity: Option<OffsetDateTime>,
64 pub admin: bool,
65}
66
67#[derive(Default, Serialize)]
68pub struct Worktree {
69 pub root_name: String,
70 pub visible: bool,
71 #[serde(skip)]
72 pub entries: BTreeMap<u64, proto::Entry>,
73 #[serde(skip)]
74 pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
75 pub scan_id: u64,
76 pub is_complete: bool,
77}
78
79#[derive(Default)]
80pub struct Channel {
81 pub connection_ids: HashSet<ConnectionId>,
82}
83
/// Identifier of a collaborator's replica within a project.
pub type ReplicaId = u16;
85
86#[derive(Default)]
87pub struct RemovedConnectionState {
88 pub user_id: UserId,
89 pub hosted_projects: HashMap<ProjectId, Project>,
90 pub guest_project_ids: HashSet<ProjectId>,
91 pub contact_ids: HashSet<UserId>,
92 pub room_id: Option<RoomId>,
93}
94
95pub struct LeftProject {
96 pub id: ProjectId,
97 pub host_user_id: UserId,
98 pub host_connection_id: ConnectionId,
99 pub connection_ids: Vec<ConnectionId>,
100 pub remove_collaborator: bool,
101}
102
103pub struct LeftRoom<'a> {
104 pub room: Option<&'a proto::Room>,
105 pub unshared_projects: Vec<Project>,
106 pub left_projects: Vec<LeftProject>,
107}
108
/// Point-in-time counters produced by `Store::metrics`.
///
/// All fields are plain counters, so the type also derives `Debug`,
/// `Default`, `PartialEq` and `Eq` to make it easy to log and compare.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct Metrics {
    /// Number of non-admin connections.
    pub connections: usize,
    /// Number of projects hosted by non-admin connections.
    pub registered_projects: usize,
    /// Registered projects with recent collaborator activity.
    pub active_projects: usize,
    /// Active projects that have at least one guest.
    pub shared_projects: usize,
}
116
117impl Store {
118 pub fn metrics(&self) -> Metrics {
119 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
120 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
121
122 let connections = self.connections.values().filter(|c| !c.admin).count();
123 let mut registered_projects = 0;
124 let mut active_projects = 0;
125 let mut shared_projects = 0;
126 for project in self.projects.values() {
127 if let Some(connection) = self.connections.get(&project.host_connection_id) {
128 if !connection.admin {
129 registered_projects += 1;
130 if project.is_active_since(active_window_start) {
131 active_projects += 1;
132 if !project.guests.is_empty() {
133 shared_projects += 1;
134 }
135 }
136 }
137 }
138 }
139
140 Metrics {
141 connections,
142 registered_projects,
143 active_projects,
144 shared_projects,
145 }
146 }
147
148 #[instrument(skip(self))]
149 pub fn add_connection(
150 &mut self,
151 connection_id: ConnectionId,
152 user_id: UserId,
153 admin: bool,
154 ) -> Option<proto::IncomingCall> {
155 self.connections.insert(
156 connection_id,
157 ConnectionState {
158 user_id,
159 admin,
160 projects: Default::default(),
161 channels: Default::default(),
162 },
163 );
164 let connected_user = self.connected_users.entry(user_id).or_default();
165 connected_user.connection_ids.insert(connection_id);
166 if let Some(active_call) = connected_user.active_call {
167 if active_call.connection_id.is_some() {
168 None
169 } else {
170 let room = self.room(active_call.room_id)?;
171 Some(proto::IncomingCall {
172 room_id: active_call.room_id,
173 caller_user_id: active_call.caller_user_id.to_proto(),
174 participant_user_ids: room
175 .participants
176 .iter()
177 .map(|participant| participant.user_id)
178 .collect(),
179 initial_project_id: active_call
180 .initial_project_id
181 .map(|project_id| project_id.to_proto()),
182 })
183 }
184 } else {
185 None
186 }
187 }
188
189 #[instrument(skip(self))]
190 pub fn remove_connection(
191 &mut self,
192 connection_id: ConnectionId,
193 ) -> Result<RemovedConnectionState> {
194 let connection = self
195 .connections
196 .get_mut(&connection_id)
197 .ok_or_else(|| anyhow!("no such connection"))?;
198
199 let user_id = connection.user_id;
200 let connection_projects = mem::take(&mut connection.projects);
201 let connection_channels = mem::take(&mut connection.channels);
202
203 let mut result = RemovedConnectionState {
204 user_id,
205 ..Default::default()
206 };
207
208 // Leave all channels.
209 for channel_id in connection_channels {
210 self.leave_channel(connection_id, channel_id);
211 }
212
213 // Unshare and leave all projects.
214 for project_id in connection_projects {
215 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
216 result.hosted_projects.insert(project_id, project);
217 } else if self.leave_project(project_id, connection_id).is_ok() {
218 result.guest_project_ids.insert(project_id);
219 }
220 }
221
222 let connected_user = self.connected_users.get_mut(&user_id).unwrap();
223 connected_user.connection_ids.remove(&connection_id);
224 if let Some(active_call) = connected_user.active_call.as_ref() {
225 let room_id = active_call.room_id;
226 if let Some(room) = self.rooms.get_mut(&room_id) {
227 let prev_participant_count = room.participants.len();
228 room.participants
229 .retain(|participant| participant.peer_id != connection_id.0);
230 if prev_participant_count == room.participants.len() {
231 if connected_user.connection_ids.is_empty() {
232 room.pending_user_ids
233 .retain(|pending_user_id| *pending_user_id != user_id.to_proto());
234 result.room_id = Some(room_id);
235 connected_user.active_call = None;
236 }
237 } else {
238 result.room_id = Some(room_id);
239 connected_user.active_call = None;
240 }
241
242 if room.participants.is_empty() && room.pending_user_ids.is_empty() {
243 self.rooms.remove(&room_id);
244 }
245 } else {
246 tracing::error!("disconnected user claims to be in a room that does not exist");
247 connected_user.active_call = None;
248 }
249 }
250
251 if connected_user.connection_ids.is_empty() {
252 self.connected_users.remove(&user_id);
253 }
254
255 self.connections.remove(&connection_id).unwrap();
256
257 Ok(result)
258 }
259
260 #[cfg(test)]
261 pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
262 self.channels.get(&id)
263 }
264
265 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
266 if let Some(connection) = self.connections.get_mut(&connection_id) {
267 connection.channels.insert(channel_id);
268 self.channels
269 .entry(channel_id)
270 .or_default()
271 .connection_ids
272 .insert(connection_id);
273 }
274 }
275
276 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
277 if let Some(connection) = self.connections.get_mut(&connection_id) {
278 connection.channels.remove(&channel_id);
279 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
280 entry.get_mut().connection_ids.remove(&connection_id);
281 if entry.get_mut().connection_ids.is_empty() {
282 entry.remove();
283 }
284 }
285 }
286 }
287
288 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
289 Ok(self
290 .connections
291 .get(&connection_id)
292 .ok_or_else(|| anyhow!("unknown connection"))?
293 .user_id)
294 }
295
296 pub fn connection_ids_for_user(
297 &self,
298 user_id: UserId,
299 ) -> impl Iterator<Item = ConnectionId> + '_ {
300 self.connected_users
301 .get(&user_id)
302 .into_iter()
303 .map(|state| &state.connection_ids)
304 .flatten()
305 .copied()
306 }
307
308 pub fn is_user_online(&self, user_id: UserId) -> bool {
309 !self
310 .connected_users
311 .get(&user_id)
312 .unwrap_or(&Default::default())
313 .connection_ids
314 .is_empty()
315 }
316
317 pub fn build_initial_contacts_update(
318 &self,
319 contacts: Vec<db::Contact>,
320 ) -> proto::UpdateContacts {
321 let mut update = proto::UpdateContacts::default();
322
323 for contact in contacts {
324 match contact {
325 db::Contact::Accepted {
326 user_id,
327 should_notify,
328 } => {
329 update
330 .contacts
331 .push(self.contact_for_user(user_id, should_notify));
332 }
333 db::Contact::Outgoing { user_id } => {
334 update.outgoing_requests.push(user_id.to_proto())
335 }
336 db::Contact::Incoming {
337 user_id,
338 should_notify,
339 } => update
340 .incoming_requests
341 .push(proto::IncomingContactRequest {
342 requester_id: user_id.to_proto(),
343 should_notify,
344 }),
345 }
346 }
347
348 update
349 }
350
351 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
352 proto::Contact {
353 user_id: user_id.to_proto(),
354 online: self.is_user_online(user_id),
355 should_notify,
356 }
357 }
358
359 pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
360 let connection = self
361 .connections
362 .get_mut(&creator_connection_id)
363 .ok_or_else(|| anyhow!("no such connection"))?;
364 let connected_user = self
365 .connected_users
366 .get_mut(&connection.user_id)
367 .ok_or_else(|| anyhow!("no such connection"))?;
368 anyhow::ensure!(
369 connected_user.active_call.is_none(),
370 "can't create a room with an active call"
371 );
372
373 let mut room = proto::Room::default();
374 room.participants.push(proto::Participant {
375 user_id: connection.user_id.to_proto(),
376 peer_id: creator_connection_id.0,
377 project_ids: Default::default(),
378 location: Some(proto::ParticipantLocation {
379 variant: Some(proto::participant_location::Variant::External(
380 proto::participant_location::External {},
381 )),
382 }),
383 });
384
385 let room_id = post_inc(&mut self.next_room_id);
386 self.rooms.insert(room_id, room);
387 connected_user.active_call = Some(Call {
388 caller_user_id: connection.user_id,
389 room_id,
390 connection_id: Some(creator_connection_id),
391 initial_project_id: None,
392 });
393 Ok(room_id)
394 }
395
396 pub fn join_room(
397 &mut self,
398 room_id: RoomId,
399 connection_id: ConnectionId,
400 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
401 let connection = self
402 .connections
403 .get_mut(&connection_id)
404 .ok_or_else(|| anyhow!("no such connection"))?;
405 let user_id = connection.user_id;
406 let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
407
408 let connected_user = self
409 .connected_users
410 .get_mut(&user_id)
411 .ok_or_else(|| anyhow!("no such connection"))?;
412 let active_call = connected_user
413 .active_call
414 .as_mut()
415 .ok_or_else(|| anyhow!("not being called"))?;
416 anyhow::ensure!(
417 active_call.room_id == room_id && active_call.connection_id.is_none(),
418 "not being called on this room"
419 );
420
421 let room = self
422 .rooms
423 .get_mut(&room_id)
424 .ok_or_else(|| anyhow!("no such room"))?;
425 anyhow::ensure!(
426 room.pending_user_ids.contains(&user_id.to_proto()),
427 anyhow!("no such room")
428 );
429 room.pending_user_ids
430 .retain(|pending| *pending != user_id.to_proto());
431 room.participants.push(proto::Participant {
432 user_id: user_id.to_proto(),
433 peer_id: connection_id.0,
434 project_ids: Default::default(),
435 location: Some(proto::ParticipantLocation {
436 variant: Some(proto::participant_location::Variant::External(
437 proto::participant_location::External {},
438 )),
439 }),
440 });
441 active_call.connection_id = Some(connection_id);
442
443 Ok((room, recipient_connection_ids))
444 }
445
446 pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
447 let connection = self
448 .connections
449 .get_mut(&connection_id)
450 .ok_or_else(|| anyhow!("no such connection"))?;
451 let user_id = connection.user_id;
452
453 let connected_user = self
454 .connected_users
455 .get(&user_id)
456 .ok_or_else(|| anyhow!("no such connection"))?;
457 anyhow::ensure!(
458 connected_user
459 .active_call
460 .map_or(false, |call| call.room_id == room_id
461 && call.connection_id == Some(connection_id)),
462 "cannot leave a room before joining it"
463 );
464
465 // Given that users can only join one room at a time, we can safely unshare
466 // and leave all projects associated with the connection.
467 let mut unshared_projects = Vec::new();
468 let mut left_projects = Vec::new();
469 for project_id in connection.projects.clone() {
470 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
471 unshared_projects.push(project);
472 } else if let Ok(project) = self.leave_project(project_id, connection_id) {
473 left_projects.push(project);
474 }
475 }
476 self.connected_users.get_mut(&user_id).unwrap().active_call = None;
477
478 let room = self
479 .rooms
480 .get_mut(&room_id)
481 .ok_or_else(|| anyhow!("no such room"))?;
482 room.participants
483 .retain(|participant| participant.peer_id != connection_id.0);
484 if room.participants.is_empty() && room.pending_user_ids.is_empty() {
485 self.rooms.remove(&room_id);
486 }
487
488 Ok(LeftRoom {
489 room: self.rooms.get(&room_id),
490 unshared_projects,
491 left_projects,
492 })
493 }
494
495 pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
496 self.rooms.get(&room_id)
497 }
498
499 pub fn call(
500 &mut self,
501 room_id: RoomId,
502 recipient_user_id: UserId,
503 initial_project_id: Option<ProjectId>,
504 from_connection_id: ConnectionId,
505 ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
506 let caller_user_id = self.user_id_for_connection(from_connection_id)?;
507
508 let recipient_connection_ids = self
509 .connection_ids_for_user(recipient_user_id)
510 .collect::<Vec<_>>();
511 let mut recipient = self
512 .connected_users
513 .get_mut(&recipient_user_id)
514 .ok_or_else(|| anyhow!("no such connection"))?;
515 anyhow::ensure!(
516 recipient.active_call.is_none(),
517 "recipient is already on another call"
518 );
519
520 let room = self
521 .rooms
522 .get_mut(&room_id)
523 .ok_or_else(|| anyhow!("no such room"))?;
524 anyhow::ensure!(
525 room.participants
526 .iter()
527 .any(|participant| participant.peer_id == from_connection_id.0),
528 "no such room"
529 );
530 anyhow::ensure!(
531 room.pending_user_ids
532 .iter()
533 .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
534 "cannot call the same user more than once"
535 );
536 room.pending_user_ids.push(recipient_user_id.to_proto());
537
538 if let Some(initial_project_id) = initial_project_id {
539 let project = self
540 .projects
541 .get(&initial_project_id)
542 .ok_or_else(|| anyhow!("no such project"))?;
543 anyhow::ensure!(project.room_id == room_id, "no such project");
544 }
545
546 recipient.active_call = Some(Call {
547 caller_user_id,
548 room_id,
549 connection_id: None,
550 initial_project_id,
551 });
552
553 Ok((
554 room,
555 recipient_connection_ids,
556 proto::IncomingCall {
557 room_id,
558 caller_user_id: caller_user_id.to_proto(),
559 participant_user_ids: room
560 .participants
561 .iter()
562 .map(|participant| participant.user_id)
563 .collect(),
564 initial_project_id: initial_project_id.map(|project_id| project_id.to_proto()),
565 },
566 ))
567 }
568
569 pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
570 let mut recipient = self
571 .connected_users
572 .get_mut(&to_user_id)
573 .ok_or_else(|| anyhow!("no such connection"))?;
574 anyhow::ensure!(recipient
575 .active_call
576 .map_or(false, |call| call.room_id == room_id
577 && call.connection_id.is_none()));
578 recipient.active_call = None;
579 let room = self
580 .rooms
581 .get_mut(&room_id)
582 .ok_or_else(|| anyhow!("no such room"))?;
583 room.pending_user_ids
584 .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
585 Ok(room)
586 }
587
588 pub fn call_declined(
589 &mut self,
590 recipient_connection_id: ConnectionId,
591 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
592 let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
593 let recipient = self
594 .connected_users
595 .get_mut(&recipient_user_id)
596 .ok_or_else(|| anyhow!("no such connection"))?;
597 if let Some(active_call) = recipient.active_call.take() {
598 let recipient_connection_ids = self
599 .connection_ids_for_user(recipient_user_id)
600 .collect::<Vec<_>>();
601 let room = self
602 .rooms
603 .get_mut(&active_call.room_id)
604 .ok_or_else(|| anyhow!("no such room"))?;
605 room.pending_user_ids
606 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
607 Ok((room, recipient_connection_ids))
608 } else {
609 Err(anyhow!("user is not being called"))
610 }
611 }
612
613 pub fn update_participant_location(
614 &mut self,
615 room_id: RoomId,
616 location: proto::ParticipantLocation,
617 connection_id: ConnectionId,
618 ) -> Result<&proto::Room> {
619 let room = self
620 .rooms
621 .get_mut(&room_id)
622 .ok_or_else(|| anyhow!("no such room"))?;
623 if let Some(proto::participant_location::Variant::Project(project)) =
624 location.variant.as_ref()
625 {
626 anyhow::ensure!(
627 room.participants
628 .iter()
629 .any(|participant| participant.project_ids.contains(&project.id)),
630 "no such project"
631 );
632 }
633
634 let participant = room
635 .participants
636 .iter_mut()
637 .find(|participant| participant.peer_id == connection_id.0)
638 .ok_or_else(|| anyhow!("no such room"))?;
639 participant.location = Some(location);
640
641 Ok(room)
642 }
643
644 pub fn share_project(
645 &mut self,
646 room_id: RoomId,
647 project_id: ProjectId,
648 host_connection_id: ConnectionId,
649 ) -> Result<&proto::Room> {
650 let connection = self
651 .connections
652 .get_mut(&host_connection_id)
653 .ok_or_else(|| anyhow!("no such connection"))?;
654
655 let room = self
656 .rooms
657 .get_mut(&room_id)
658 .ok_or_else(|| anyhow!("no such room"))?;
659 let participant = room
660 .participants
661 .iter_mut()
662 .find(|participant| participant.peer_id == host_connection_id.0)
663 .ok_or_else(|| anyhow!("no such room"))?;
664 participant.project_ids.push(project_id.to_proto());
665
666 connection.projects.insert(project_id);
667 self.projects.insert(
668 project_id,
669 Project {
670 id: project_id,
671 room_id,
672 host_connection_id,
673 host: Collaborator {
674 user_id: connection.user_id,
675 replica_id: 0,
676 last_activity: None,
677 admin: connection.admin,
678 },
679 guests: Default::default(),
680 active_replica_ids: Default::default(),
681 worktrees: Default::default(),
682 language_servers: Default::default(),
683 },
684 );
685
686 Ok(room)
687 }
688
689 pub fn unshare_project(
690 &mut self,
691 project_id: ProjectId,
692 connection_id: ConnectionId,
693 ) -> Result<(&proto::Room, Project)> {
694 match self.projects.entry(project_id) {
695 btree_map::Entry::Occupied(e) => {
696 if e.get().host_connection_id == connection_id {
697 let project = e.remove();
698
699 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
700 host_connection.projects.remove(&project_id);
701 }
702
703 for guest_connection in project.guests.keys() {
704 if let Some(connection) = self.connections.get_mut(guest_connection) {
705 connection.projects.remove(&project_id);
706 }
707 }
708
709 let room = self
710 .rooms
711 .get_mut(&project.room_id)
712 .ok_or_else(|| anyhow!("no such room"))?;
713 let participant = room
714 .participants
715 .iter_mut()
716 .find(|participant| participant.peer_id == connection_id.0)
717 .ok_or_else(|| anyhow!("no such room"))?;
718 participant
719 .project_ids
720 .retain(|id| *id != project_id.to_proto());
721
722 Ok((room, project))
723 } else {
724 Err(anyhow!("no such project"))?
725 }
726 }
727 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
728 }
729 }
730
731 pub fn update_project(
732 &mut self,
733 project_id: ProjectId,
734 worktrees: &[proto::WorktreeMetadata],
735 connection_id: ConnectionId,
736 ) -> Result<()> {
737 let project = self
738 .projects
739 .get_mut(&project_id)
740 .ok_or_else(|| anyhow!("no such project"))?;
741 if project.host_connection_id == connection_id {
742 let mut old_worktrees = mem::take(&mut project.worktrees);
743 for worktree in worktrees {
744 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
745 project.worktrees.insert(worktree.id, old_worktree);
746 } else {
747 project.worktrees.insert(
748 worktree.id,
749 Worktree {
750 root_name: worktree.root_name.clone(),
751 visible: worktree.visible,
752 ..Default::default()
753 },
754 );
755 }
756 }
757
758 Ok(())
759 } else {
760 Err(anyhow!("no such project"))?
761 }
762 }
763
764 pub fn update_diagnostic_summary(
765 &mut self,
766 project_id: ProjectId,
767 worktree_id: u64,
768 connection_id: ConnectionId,
769 summary: proto::DiagnosticSummary,
770 ) -> Result<Vec<ConnectionId>> {
771 let project = self
772 .projects
773 .get_mut(&project_id)
774 .ok_or_else(|| anyhow!("no such project"))?;
775 if project.host_connection_id == connection_id {
776 let worktree = project
777 .worktrees
778 .get_mut(&worktree_id)
779 .ok_or_else(|| anyhow!("no such worktree"))?;
780 worktree
781 .diagnostic_summaries
782 .insert(summary.path.clone().into(), summary);
783 return Ok(project.connection_ids());
784 }
785
786 Err(anyhow!("no such worktree"))?
787 }
788
789 pub fn start_language_server(
790 &mut self,
791 project_id: ProjectId,
792 connection_id: ConnectionId,
793 language_server: proto::LanguageServer,
794 ) -> Result<Vec<ConnectionId>> {
795 let project = self
796 .projects
797 .get_mut(&project_id)
798 .ok_or_else(|| anyhow!("no such project"))?;
799 if project.host_connection_id == connection_id {
800 project.language_servers.push(language_server);
801 return Ok(project.connection_ids());
802 }
803
804 Err(anyhow!("no such project"))?
805 }
806
807 pub fn join_project(
808 &mut self,
809 requester_connection_id: ConnectionId,
810 project_id: ProjectId,
811 ) -> Result<(&Project, ReplicaId)> {
812 let connection = self
813 .connections
814 .get_mut(&requester_connection_id)
815 .ok_or_else(|| anyhow!("no such connection"))?;
816 let user = self
817 .connected_users
818 .get(&connection.user_id)
819 .ok_or_else(|| anyhow!("no such connection"))?;
820 let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
821 anyhow::ensure!(
822 active_call.connection_id == Some(requester_connection_id),
823 "no such project"
824 );
825
826 let project = self
827 .projects
828 .get_mut(&project_id)
829 .ok_or_else(|| anyhow!("no such project"))?;
830 anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
831
832 connection.projects.insert(project_id);
833 let mut replica_id = 1;
834 while project.active_replica_ids.contains(&replica_id) {
835 replica_id += 1;
836 }
837 project.active_replica_ids.insert(replica_id);
838 project.guests.insert(
839 requester_connection_id,
840 Collaborator {
841 replica_id,
842 user_id: connection.user_id,
843 last_activity: Some(OffsetDateTime::now_utc()),
844 admin: connection.admin,
845 },
846 );
847
848 project.host.last_activity = Some(OffsetDateTime::now_utc());
849 Ok((project, replica_id))
850 }
851
852 pub fn leave_project(
853 &mut self,
854 project_id: ProjectId,
855 connection_id: ConnectionId,
856 ) -> Result<LeftProject> {
857 let project = self
858 .projects
859 .get_mut(&project_id)
860 .ok_or_else(|| anyhow!("no such project"))?;
861
862 // If the connection leaving the project is a collaborator, remove it.
863 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
864 project.active_replica_ids.remove(&guest.replica_id);
865 true
866 } else {
867 false
868 };
869
870 if let Some(connection) = self.connections.get_mut(&connection_id) {
871 connection.projects.remove(&project_id);
872 }
873
874 Ok(LeftProject {
875 id: project.id,
876 host_connection_id: project.host_connection_id,
877 host_user_id: project.host.user_id,
878 connection_ids: project.connection_ids(),
879 remove_collaborator,
880 })
881 }
882
883 #[allow(clippy::too_many_arguments)]
884 pub fn update_worktree(
885 &mut self,
886 connection_id: ConnectionId,
887 project_id: ProjectId,
888 worktree_id: u64,
889 worktree_root_name: &str,
890 removed_entries: &[u64],
891 updated_entries: &[proto::Entry],
892 scan_id: u64,
893 is_last_update: bool,
894 ) -> Result<Vec<ConnectionId>> {
895 let project = self.write_project(project_id, connection_id)?;
896
897 let connection_ids = project.connection_ids();
898 let mut worktree = project.worktrees.entry(worktree_id).or_default();
899 worktree.root_name = worktree_root_name.to_string();
900
901 for entry_id in removed_entries {
902 worktree.entries.remove(entry_id);
903 }
904
905 for entry in updated_entries {
906 worktree.entries.insert(entry.id, entry.clone());
907 }
908
909 worktree.scan_id = scan_id;
910 worktree.is_complete = is_last_update;
911 Ok(connection_ids)
912 }
913
914 pub fn project_connection_ids(
915 &self,
916 project_id: ProjectId,
917 acting_connection_id: ConnectionId,
918 ) -> Result<Vec<ConnectionId>> {
919 Ok(self
920 .read_project(project_id, acting_connection_id)?
921 .connection_ids())
922 }
923
924 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
925 Ok(self
926 .channels
927 .get(&channel_id)
928 .ok_or_else(|| anyhow!("no such channel"))?
929 .connection_ids())
930 }
931
932 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
933 self.projects
934 .get(&project_id)
935 .ok_or_else(|| anyhow!("no such project"))
936 }
937
938 pub fn register_project_activity(
939 &mut self,
940 project_id: ProjectId,
941 connection_id: ConnectionId,
942 ) -> Result<()> {
943 let project = self
944 .projects
945 .get_mut(&project_id)
946 .ok_or_else(|| anyhow!("no such project"))?;
947 let collaborator = if connection_id == project.host_connection_id {
948 &mut project.host
949 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
950 guest
951 } else {
952 return Err(anyhow!("no such project"))?;
953 };
954 collaborator.last_activity = Some(OffsetDateTime::now_utc());
955 Ok(())
956 }
957
958 pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
959 self.projects.iter()
960 }
961
962 pub fn read_project(
963 &self,
964 project_id: ProjectId,
965 connection_id: ConnectionId,
966 ) -> Result<&Project> {
967 let project = self
968 .projects
969 .get(&project_id)
970 .ok_or_else(|| anyhow!("no such project"))?;
971 if project.host_connection_id == connection_id
972 || project.guests.contains_key(&connection_id)
973 {
974 Ok(project)
975 } else {
976 Err(anyhow!("no such project"))?
977 }
978 }
979
980 fn write_project(
981 &mut self,
982 project_id: ProjectId,
983 connection_id: ConnectionId,
984 ) -> Result<&mut Project> {
985 let project = self
986 .projects
987 .get_mut(&project_id)
988 .ok_or_else(|| anyhow!("no such project"))?;
989 if project.host_connection_id == connection_id
990 || project.guests.contains_key(&connection_id)
991 {
992 Ok(project)
993 } else {
994 Err(anyhow!("no such project"))?
995 }
996 }
997
998 #[cfg(test)]
999 pub fn check_invariants(&self) {
1000 for (connection_id, connection) in &self.connections {
1001 for project_id in &connection.projects {
1002 let project = &self.projects.get(project_id).unwrap();
1003 if project.host_connection_id != *connection_id {
1004 assert!(project.guests.contains_key(connection_id));
1005 }
1006
1007 for (worktree_id, worktree) in project.worktrees.iter() {
1008 let mut paths = HashMap::default();
1009 for entry in worktree.entries.values() {
1010 let prev_entry = paths.insert(&entry.path, entry);
1011 assert_eq!(
1012 prev_entry,
1013 None,
1014 "worktree {:?}, duplicate path for entries {:?} and {:?}",
1015 worktree_id,
1016 prev_entry.unwrap(),
1017 entry
1018 );
1019 }
1020 }
1021 }
1022 for channel_id in &connection.channels {
1023 let channel = self.channels.get(channel_id).unwrap();
1024 assert!(channel.connection_ids.contains(connection_id));
1025 }
1026 assert!(self
1027 .connected_users
1028 .get(&connection.user_id)
1029 .unwrap()
1030 .connection_ids
1031 .contains(connection_id));
1032 }
1033
1034 for (user_id, state) in &self.connected_users {
1035 for connection_id in &state.connection_ids {
1036 assert_eq!(
1037 self.connections.get(connection_id).unwrap().user_id,
1038 *user_id
1039 );
1040 }
1041
1042 if let Some(active_call) = state.active_call.as_ref() {
1043 if let Some(active_call_connection_id) = active_call.connection_id {
1044 assert!(
1045 state.connection_ids.contains(&active_call_connection_id),
1046 "call is active on a dead connection"
1047 );
1048 assert!(
1049 state.connection_ids.contains(&active_call_connection_id),
1050 "call is active on a dead connection"
1051 );
1052 }
1053 }
1054 }
1055
1056 for (room_id, room) in &self.rooms {
1057 for pending_user_id in &room.pending_user_ids {
1058 assert!(
1059 self.connected_users
1060 .contains_key(&UserId::from_proto(*pending_user_id)),
1061 "call is active on a user that has disconnected"
1062 );
1063 }
1064
1065 for participant in &room.participants {
1066 assert!(
1067 self.connections
1068 .contains_key(&ConnectionId(participant.peer_id)),
1069 "room contains participant that has disconnected"
1070 );
1071
1072 for project_id in &participant.project_ids {
1073 let project = &self.projects[&ProjectId::from_proto(*project_id)];
1074 assert_eq!(
1075 project.room_id, *room_id,
1076 "project was shared on a different room"
1077 );
1078 }
1079 }
1080
1081 assert!(
1082 !room.pending_user_ids.is_empty() || !room.participants.is_empty(),
1083 "room can't be empty"
1084 );
1085 }
1086
1087 for (project_id, project) in &self.projects {
1088 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1089 assert!(host_connection.projects.contains(project_id));
1090
1091 for guest_connection_id in project.guests.keys() {
1092 let guest_connection = self.connections.get(guest_connection_id).unwrap();
1093 assert!(guest_connection.projects.contains(project_id));
1094 }
1095 assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
1096 assert_eq!(
1097 project.active_replica_ids,
1098 project
1099 .guests
1100 .values()
1101 .map(|guest| guest.replica_id)
1102 .collect::<HashSet<_>>(),
1103 );
1104
1105 let room = &self.rooms[&project.room_id];
1106 let room_participant = room
1107 .participants
1108 .iter()
1109 .find(|participant| participant.peer_id == project.host_connection_id.0)
1110 .unwrap();
1111 assert!(
1112 room_participant
1113 .project_ids
1114 .contains(&project_id.to_proto()),
1115 "project was not shared in room"
1116 );
1117 }
1118
1119 for (channel_id, channel) in &self.channels {
1120 for connection_id in &channel.connection_ids {
1121 let connection = self.connections.get(connection_id).unwrap();
1122 assert!(connection.channels.contains(channel_id));
1123 }
1124 }
1125 }
1126}
1127
1128impl Project {
1129 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1130 self.guests
1131 .values()
1132 .chain([&self.host])
1133 .any(|collaborator| {
1134 collaborator
1135 .last_activity
1136 .map_or(false, |active_time| active_time > start_time)
1137 })
1138 }
1139
1140 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1141 self.guests.keys().copied().collect()
1142 }
1143
1144 pub fn connection_ids(&self) -> Vec<ConnectionId> {
1145 self.guests
1146 .keys()
1147 .copied()
1148 .chain(Some(self.host_connection_id))
1149 .collect()
1150 }
1151}
1152
1153impl Channel {
1154 fn connection_ids(&self) -> Vec<ConnectionId> {
1155 self.connection_ids.iter().copied().collect()
1156 }
1157}