1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
4use rpc::{proto, ConnectionId};
5use serde::Serialize;
6use std::{mem, path::PathBuf, str, time::Duration};
7use time::OffsetDateTime;
8use tracing::instrument;
9use util::post_inc;
10
11pub type RoomId = u64;
12
13#[derive(Default, Serialize)]
14pub struct Store {
15 connections: BTreeMap<ConnectionId, ConnectionState>,
16 connected_users: BTreeMap<UserId, ConnectedUser>,
17 next_room_id: RoomId,
18 rooms: BTreeMap<RoomId, proto::Room>,
19 projects: BTreeMap<ProjectId, Project>,
20 #[serde(skip)]
21 channels: BTreeMap<ChannelId, Channel>,
22}
23
24#[derive(Default, Serialize)]
25struct ConnectedUser {
26 connection_ids: HashSet<ConnectionId>,
27 active_call: Option<Call>,
28}
29
30#[derive(Serialize)]
31struct ConnectionState {
32 user_id: UserId,
33 admin: bool,
34 projects: BTreeSet<ProjectId>,
35 channels: HashSet<ChannelId>,
36}
37
38#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
39pub struct Call {
40 pub caller_user_id: UserId,
41 pub room_id: RoomId,
42 pub connection_id: Option<ConnectionId>,
43 pub initial_project_id: Option<ProjectId>,
44}
45
46#[derive(Serialize)]
47pub struct Project {
48 pub id: ProjectId,
49 pub room_id: RoomId,
50 pub host_connection_id: ConnectionId,
51 pub host: Collaborator,
52 pub guests: HashMap<ConnectionId, Collaborator>,
53 pub active_replica_ids: HashSet<ReplicaId>,
54 pub worktrees: BTreeMap<u64, Worktree>,
55 pub language_servers: Vec<proto::LanguageServer>,
56}
57
58#[derive(Serialize)]
59pub struct Collaborator {
60 pub replica_id: ReplicaId,
61 pub user_id: UserId,
62 #[serde(skip)]
63 pub last_activity: Option<OffsetDateTime>,
64 pub admin: bool,
65}
66
67#[derive(Default, Serialize)]
68pub struct Worktree {
69 pub root_name: String,
70 pub visible: bool,
71 #[serde(skip)]
72 pub entries: BTreeMap<u64, proto::Entry>,
73 #[serde(skip)]
74 pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
75 pub scan_id: u64,
76 pub is_complete: bool,
77}
78
79#[derive(Default)]
80pub struct Channel {
81 pub connection_ids: HashSet<ConnectionId>,
82}
83
84pub type ReplicaId = u16;
85
86#[derive(Default)]
87pub struct RemovedConnectionState {
88 pub user_id: UserId,
89 pub hosted_projects: Vec<Project>,
90 pub guest_projects: Vec<LeftProject>,
91 pub contact_ids: HashSet<UserId>,
92 pub room_id: Option<RoomId>,
93 pub canceled_call_connection_ids: Vec<ConnectionId>,
94}
95
96pub struct LeftProject {
97 pub id: ProjectId,
98 pub host_user_id: UserId,
99 pub host_connection_id: ConnectionId,
100 pub connection_ids: Vec<ConnectionId>,
101 pub remove_collaborator: bool,
102}
103
104pub struct LeftRoom<'a> {
105 pub room: Option<&'a proto::Room>,
106 pub unshared_projects: Vec<Project>,
107 pub left_projects: Vec<LeftProject>,
108 pub canceled_call_connection_ids: Vec<ConnectionId>,
109}
110
111#[derive(Copy, Clone)]
112pub struct Metrics {
113 pub connections: usize,
114 pub registered_projects: usize,
115 pub active_projects: usize,
116 pub shared_projects: usize,
117}
118
119impl Store {
120 pub fn metrics(&self) -> Metrics {
121 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
122 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
123
124 let connections = self.connections.values().filter(|c| !c.admin).count();
125 let mut registered_projects = 0;
126 let mut active_projects = 0;
127 let mut shared_projects = 0;
128 for project in self.projects.values() {
129 if let Some(connection) = self.connections.get(&project.host_connection_id) {
130 if !connection.admin {
131 registered_projects += 1;
132 if project.is_active_since(active_window_start) {
133 active_projects += 1;
134 if !project.guests.is_empty() {
135 shared_projects += 1;
136 }
137 }
138 }
139 }
140 }
141
142 Metrics {
143 connections,
144 registered_projects,
145 active_projects,
146 shared_projects,
147 }
148 }
149
150 #[instrument(skip(self))]
151 pub fn add_connection(
152 &mut self,
153 connection_id: ConnectionId,
154 user_id: UserId,
155 admin: bool,
156 ) -> Option<proto::IncomingCall> {
157 self.connections.insert(
158 connection_id,
159 ConnectionState {
160 user_id,
161 admin,
162 projects: Default::default(),
163 channels: Default::default(),
164 },
165 );
166 let connected_user = self.connected_users.entry(user_id).or_default();
167 connected_user.connection_ids.insert(connection_id);
168 if let Some(active_call) = connected_user.active_call {
169 if active_call.connection_id.is_some() {
170 None
171 } else {
172 let room = self.room(active_call.room_id)?;
173 Some(proto::IncomingCall {
174 room_id: active_call.room_id,
175 caller_user_id: active_call.caller_user_id.to_proto(),
176 participant_user_ids: room
177 .participants
178 .iter()
179 .map(|participant| participant.user_id)
180 .collect(),
181 initial_project: active_call
182 .initial_project_id
183 .and_then(|id| Self::build_participant_project(id, &self.projects)),
184 })
185 }
186 } else {
187 None
188 }
189 }
190
191 #[instrument(skip(self))]
192 pub fn remove_connection(
193 &mut self,
194 connection_id: ConnectionId,
195 ) -> Result<RemovedConnectionState> {
196 let connection = self
197 .connections
198 .get_mut(&connection_id)
199 .ok_or_else(|| anyhow!("no such connection"))?;
200
201 let user_id = connection.user_id;
202 let connection_channels = mem::take(&mut connection.channels);
203
204 let mut result = RemovedConnectionState {
205 user_id,
206 ..Default::default()
207 };
208
209 // Leave all channels.
210 for channel_id in connection_channels {
211 self.leave_channel(connection_id, channel_id);
212 }
213
214 let connected_user = self.connected_users.get(&user_id).unwrap();
215 if let Some(active_call) = connected_user.active_call.as_ref() {
216 let room_id = active_call.room_id;
217 let left_room = self.leave_room(room_id, connection_id)?;
218 result.hosted_projects = left_room.unshared_projects;
219 result.guest_projects = left_room.left_projects;
220 result.room_id = Some(room_id);
221 result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
222 }
223
224 let connected_user = self.connected_users.get_mut(&user_id).unwrap();
225 connected_user.connection_ids.remove(&connection_id);
226 if connected_user.connection_ids.is_empty() {
227 self.connected_users.remove(&user_id);
228 }
229 self.connections.remove(&connection_id).unwrap();
230
231 Ok(result)
232 }
233
234 #[cfg(test)]
235 pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
236 self.channels.get(&id)
237 }
238
239 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
240 if let Some(connection) = self.connections.get_mut(&connection_id) {
241 connection.channels.insert(channel_id);
242 self.channels
243 .entry(channel_id)
244 .or_default()
245 .connection_ids
246 .insert(connection_id);
247 }
248 }
249
250 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
251 if let Some(connection) = self.connections.get_mut(&connection_id) {
252 connection.channels.remove(&channel_id);
253 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
254 entry.get_mut().connection_ids.remove(&connection_id);
255 if entry.get_mut().connection_ids.is_empty() {
256 entry.remove();
257 }
258 }
259 }
260 }
261
262 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
263 Ok(self
264 .connections
265 .get(&connection_id)
266 .ok_or_else(|| anyhow!("unknown connection"))?
267 .user_id)
268 }
269
270 pub fn connection_ids_for_user(
271 &self,
272 user_id: UserId,
273 ) -> impl Iterator<Item = ConnectionId> + '_ {
274 self.connected_users
275 .get(&user_id)
276 .into_iter()
277 .map(|state| &state.connection_ids)
278 .flatten()
279 .copied()
280 }
281
282 pub fn is_user_online(&self, user_id: UserId) -> bool {
283 !self
284 .connected_users
285 .get(&user_id)
286 .unwrap_or(&Default::default())
287 .connection_ids
288 .is_empty()
289 }
290
291 fn is_user_busy(&self, user_id: UserId) -> bool {
292 self.connected_users
293 .get(&user_id)
294 .unwrap_or(&Default::default())
295 .active_call
296 .is_some()
297 }
298
299 pub fn build_initial_contacts_update(
300 &self,
301 contacts: Vec<db::Contact>,
302 ) -> proto::UpdateContacts {
303 let mut update = proto::UpdateContacts::default();
304
305 for contact in contacts {
306 match contact {
307 db::Contact::Accepted {
308 user_id,
309 should_notify,
310 } => {
311 update
312 .contacts
313 .push(self.contact_for_user(user_id, should_notify));
314 }
315 db::Contact::Outgoing { user_id } => {
316 update.outgoing_requests.push(user_id.to_proto())
317 }
318 db::Contact::Incoming {
319 user_id,
320 should_notify,
321 } => update
322 .incoming_requests
323 .push(proto::IncomingContactRequest {
324 requester_id: user_id.to_proto(),
325 should_notify,
326 }),
327 }
328 }
329
330 update
331 }
332
333 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
334 proto::Contact {
335 user_id: user_id.to_proto(),
336 online: self.is_user_online(user_id),
337 busy: self.is_user_busy(user_id),
338 should_notify,
339 }
340 }
341
342 pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
343 let connection = self
344 .connections
345 .get_mut(&creator_connection_id)
346 .ok_or_else(|| anyhow!("no such connection"))?;
347 let connected_user = self
348 .connected_users
349 .get_mut(&connection.user_id)
350 .ok_or_else(|| anyhow!("no such connection"))?;
351 anyhow::ensure!(
352 connected_user.active_call.is_none(),
353 "can't create a room with an active call"
354 );
355
356 let mut room = proto::Room::default();
357 room.participants.push(proto::Participant {
358 user_id: connection.user_id.to_proto(),
359 peer_id: creator_connection_id.0,
360 projects: Default::default(),
361 location: Some(proto::ParticipantLocation {
362 variant: Some(proto::participant_location::Variant::External(
363 proto::participant_location::External {},
364 )),
365 }),
366 });
367
368 let room_id = post_inc(&mut self.next_room_id);
369 self.rooms.insert(room_id, room);
370 connected_user.active_call = Some(Call {
371 caller_user_id: connection.user_id,
372 room_id,
373 connection_id: Some(creator_connection_id),
374 initial_project_id: None,
375 });
376 Ok(room_id)
377 }
378
379 pub fn join_room(
380 &mut self,
381 room_id: RoomId,
382 connection_id: ConnectionId,
383 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
384 let connection = self
385 .connections
386 .get_mut(&connection_id)
387 .ok_or_else(|| anyhow!("no such connection"))?;
388 let user_id = connection.user_id;
389 let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
390
391 let connected_user = self
392 .connected_users
393 .get_mut(&user_id)
394 .ok_or_else(|| anyhow!("no such connection"))?;
395 let active_call = connected_user
396 .active_call
397 .as_mut()
398 .ok_or_else(|| anyhow!("not being called"))?;
399 anyhow::ensure!(
400 active_call.room_id == room_id && active_call.connection_id.is_none(),
401 "not being called on this room"
402 );
403
404 let room = self
405 .rooms
406 .get_mut(&room_id)
407 .ok_or_else(|| anyhow!("no such room"))?;
408 anyhow::ensure!(
409 room.pending_participant_user_ids
410 .contains(&user_id.to_proto()),
411 anyhow!("no such room")
412 );
413 room.pending_participant_user_ids
414 .retain(|pending| *pending != user_id.to_proto());
415 room.participants.push(proto::Participant {
416 user_id: user_id.to_proto(),
417 peer_id: connection_id.0,
418 projects: Default::default(),
419 location: Some(proto::ParticipantLocation {
420 variant: Some(proto::participant_location::Variant::External(
421 proto::participant_location::External {},
422 )),
423 }),
424 });
425 active_call.connection_id = Some(connection_id);
426
427 Ok((room, recipient_connection_ids))
428 }
429
430 pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
431 let connection = self
432 .connections
433 .get_mut(&connection_id)
434 .ok_or_else(|| anyhow!("no such connection"))?;
435 let user_id = connection.user_id;
436
437 let connected_user = self
438 .connected_users
439 .get(&user_id)
440 .ok_or_else(|| anyhow!("no such connection"))?;
441 anyhow::ensure!(
442 connected_user
443 .active_call
444 .map_or(false, |call| call.room_id == room_id
445 && call.connection_id == Some(connection_id)),
446 "cannot leave a room before joining it"
447 );
448
449 // Given that users can only join one room at a time, we can safely unshare
450 // and leave all projects associated with the connection.
451 let mut unshared_projects = Vec::new();
452 let mut left_projects = Vec::new();
453 for project_id in connection.projects.clone() {
454 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
455 unshared_projects.push(project);
456 } else if let Ok(project) = self.leave_project(project_id, connection_id) {
457 left_projects.push(project);
458 }
459 }
460 self.connected_users.get_mut(&user_id).unwrap().active_call = None;
461
462 let room = self
463 .rooms
464 .get_mut(&room_id)
465 .ok_or_else(|| anyhow!("no such room"))?;
466 room.participants
467 .retain(|participant| participant.peer_id != connection_id.0);
468
469 let mut canceled_call_connection_ids = Vec::new();
470 room.pending_participant_user_ids
471 .retain(|pending_participant_user_id| {
472 if let Some(connected_user) = self
473 .connected_users
474 .get_mut(&UserId::from_proto(*pending_participant_user_id))
475 {
476 if let Some(call) = connected_user.active_call.as_ref() {
477 if call.caller_user_id == user_id {
478 connected_user.active_call.take();
479 canceled_call_connection_ids
480 .extend(connected_user.connection_ids.iter().copied());
481 false
482 } else {
483 true
484 }
485 } else {
486 true
487 }
488 } else {
489 true
490 }
491 });
492
493 if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
494 self.rooms.remove(&room_id);
495 }
496
497 Ok(LeftRoom {
498 room: self.rooms.get(&room_id),
499 unshared_projects,
500 left_projects,
501 canceled_call_connection_ids,
502 })
503 }
504
505 pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
506 self.rooms.get(&room_id)
507 }
508
509 pub fn call(
510 &mut self,
511 room_id: RoomId,
512 recipient_user_id: UserId,
513 initial_project_id: Option<ProjectId>,
514 from_connection_id: ConnectionId,
515 ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
516 let caller_user_id = self.user_id_for_connection(from_connection_id)?;
517
518 let recipient_connection_ids = self
519 .connection_ids_for_user(recipient_user_id)
520 .collect::<Vec<_>>();
521 let mut recipient = self
522 .connected_users
523 .get_mut(&recipient_user_id)
524 .ok_or_else(|| anyhow!("no such connection"))?;
525 anyhow::ensure!(
526 recipient.active_call.is_none(),
527 "recipient is already on another call"
528 );
529
530 let room = self
531 .rooms
532 .get_mut(&room_id)
533 .ok_or_else(|| anyhow!("no such room"))?;
534 anyhow::ensure!(
535 room.participants
536 .iter()
537 .any(|participant| participant.peer_id == from_connection_id.0),
538 "no such room"
539 );
540 anyhow::ensure!(
541 room.pending_participant_user_ids
542 .iter()
543 .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
544 "cannot call the same user more than once"
545 );
546 room.pending_participant_user_ids
547 .push(recipient_user_id.to_proto());
548
549 if let Some(initial_project_id) = initial_project_id {
550 let project = self
551 .projects
552 .get(&initial_project_id)
553 .ok_or_else(|| anyhow!("no such project"))?;
554 anyhow::ensure!(project.room_id == room_id, "no such project");
555 }
556
557 recipient.active_call = Some(Call {
558 caller_user_id,
559 room_id,
560 connection_id: None,
561 initial_project_id,
562 });
563
564 Ok((
565 room,
566 recipient_connection_ids,
567 proto::IncomingCall {
568 room_id,
569 caller_user_id: caller_user_id.to_proto(),
570 participant_user_ids: room
571 .participants
572 .iter()
573 .map(|participant| participant.user_id)
574 .collect(),
575 initial_project: initial_project_id
576 .and_then(|id| Self::build_participant_project(id, &self.projects)),
577 },
578 ))
579 }
580
581 pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
582 let mut recipient = self
583 .connected_users
584 .get_mut(&to_user_id)
585 .ok_or_else(|| anyhow!("no such connection"))?;
586 anyhow::ensure!(recipient
587 .active_call
588 .map_or(false, |call| call.room_id == room_id
589 && call.connection_id.is_none()));
590 recipient.active_call = None;
591 let room = self
592 .rooms
593 .get_mut(&room_id)
594 .ok_or_else(|| anyhow!("no such room"))?;
595 room.pending_participant_user_ids
596 .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
597 Ok(room)
598 }
599
600 pub fn cancel_call(
601 &mut self,
602 room_id: RoomId,
603 recipient_user_id: UserId,
604 canceller_connection_id: ConnectionId,
605 ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
606 let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
607 let canceller = self
608 .connected_users
609 .get(&canceller_user_id)
610 .ok_or_else(|| anyhow!("no such connection"))?;
611 let recipient = self
612 .connected_users
613 .get(&recipient_user_id)
614 .ok_or_else(|| anyhow!("no such connection"))?;
615 let canceller_active_call = canceller
616 .active_call
617 .as_ref()
618 .ok_or_else(|| anyhow!("no active call"))?;
619 let recipient_active_call = recipient
620 .active_call
621 .as_ref()
622 .ok_or_else(|| anyhow!("no active call for recipient"))?;
623
624 anyhow::ensure!(
625 canceller_active_call.room_id == room_id,
626 "users are on different calls"
627 );
628 anyhow::ensure!(
629 recipient_active_call.room_id == room_id,
630 "users are on different calls"
631 );
632 anyhow::ensure!(
633 recipient_active_call.connection_id.is_none(),
634 "recipient has already answered"
635 );
636 let room_id = recipient_active_call.room_id;
637 let room = self
638 .rooms
639 .get_mut(&room_id)
640 .ok_or_else(|| anyhow!("no such room"))?;
641 room.pending_participant_user_ids
642 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
643
644 let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
645 recipient.active_call.take();
646
647 Ok((room, recipient.connection_ids.clone()))
648 }
649
650 pub fn decline_call(
651 &mut self,
652 room_id: RoomId,
653 recipient_connection_id: ConnectionId,
654 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
655 let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
656 let recipient = self
657 .connected_users
658 .get_mut(&recipient_user_id)
659 .ok_or_else(|| anyhow!("no such connection"))?;
660 if let Some(active_call) = recipient.active_call.take() {
661 anyhow::ensure!(active_call.room_id == room_id, "no such room");
662 let recipient_connection_ids = self
663 .connection_ids_for_user(recipient_user_id)
664 .collect::<Vec<_>>();
665 let room = self
666 .rooms
667 .get_mut(&active_call.room_id)
668 .ok_or_else(|| anyhow!("no such room"))?;
669 room.pending_participant_user_ids
670 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
671 Ok((room, recipient_connection_ids))
672 } else {
673 Err(anyhow!("user is not being called"))
674 }
675 }
676
677 pub fn update_participant_location(
678 &mut self,
679 room_id: RoomId,
680 location: proto::ParticipantLocation,
681 connection_id: ConnectionId,
682 ) -> Result<&proto::Room> {
683 let room = self
684 .rooms
685 .get_mut(&room_id)
686 .ok_or_else(|| anyhow!("no such room"))?;
687 if let Some(proto::participant_location::Variant::SharedProject(project)) =
688 location.variant.as_ref()
689 {
690 anyhow::ensure!(
691 room.participants
692 .iter()
693 .flat_map(|participant| &participant.projects)
694 .any(|participant_project| participant_project.id == project.id),
695 "no such project"
696 );
697 }
698
699 let participant = room
700 .participants
701 .iter_mut()
702 .find(|participant| participant.peer_id == connection_id.0)
703 .ok_or_else(|| anyhow!("no such room"))?;
704 participant.location = Some(location);
705
706 Ok(room)
707 }
708
709 pub fn share_project(
710 &mut self,
711 room_id: RoomId,
712 project_id: ProjectId,
713 worktrees: Vec<proto::WorktreeMetadata>,
714 host_connection_id: ConnectionId,
715 ) -> Result<&proto::Room> {
716 let connection = self
717 .connections
718 .get_mut(&host_connection_id)
719 .ok_or_else(|| anyhow!("no such connection"))?;
720
721 let room = self
722 .rooms
723 .get_mut(&room_id)
724 .ok_or_else(|| anyhow!("no such room"))?;
725 let participant = room
726 .participants
727 .iter_mut()
728 .find(|participant| participant.peer_id == host_connection_id.0)
729 .ok_or_else(|| anyhow!("no such room"))?;
730
731 connection.projects.insert(project_id);
732 self.projects.insert(
733 project_id,
734 Project {
735 id: project_id,
736 room_id,
737 host_connection_id,
738 host: Collaborator {
739 user_id: connection.user_id,
740 replica_id: 0,
741 last_activity: None,
742 admin: connection.admin,
743 },
744 guests: Default::default(),
745 active_replica_ids: Default::default(),
746 worktrees: worktrees
747 .into_iter()
748 .map(|worktree| {
749 (
750 worktree.id,
751 Worktree {
752 root_name: worktree.root_name,
753 visible: worktree.visible,
754 ..Default::default()
755 },
756 )
757 })
758 .collect(),
759 language_servers: Default::default(),
760 },
761 );
762
763 participant
764 .projects
765 .extend(Self::build_participant_project(project_id, &self.projects));
766
767 Ok(room)
768 }
769
770 pub fn unshare_project(
771 &mut self,
772 project_id: ProjectId,
773 connection_id: ConnectionId,
774 ) -> Result<(&proto::Room, Project)> {
775 match self.projects.entry(project_id) {
776 btree_map::Entry::Occupied(e) => {
777 if e.get().host_connection_id == connection_id {
778 let project = e.remove();
779
780 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
781 host_connection.projects.remove(&project_id);
782 }
783
784 for guest_connection in project.guests.keys() {
785 if let Some(connection) = self.connections.get_mut(guest_connection) {
786 connection.projects.remove(&project_id);
787 }
788 }
789
790 let room = self
791 .rooms
792 .get_mut(&project.room_id)
793 .ok_or_else(|| anyhow!("no such room"))?;
794 let participant = room
795 .participants
796 .iter_mut()
797 .find(|participant| participant.peer_id == connection_id.0)
798 .ok_or_else(|| anyhow!("no such room"))?;
799 participant
800 .projects
801 .retain(|project| project.id != project_id.to_proto());
802
803 Ok((room, project))
804 } else {
805 Err(anyhow!("no such project"))?
806 }
807 }
808 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
809 }
810 }
811
812 pub fn update_project(
813 &mut self,
814 project_id: ProjectId,
815 worktrees: &[proto::WorktreeMetadata],
816 connection_id: ConnectionId,
817 ) -> Result<&proto::Room> {
818 let project = self
819 .projects
820 .get_mut(&project_id)
821 .ok_or_else(|| anyhow!("no such project"))?;
822 if project.host_connection_id == connection_id {
823 let mut old_worktrees = mem::take(&mut project.worktrees);
824 for worktree in worktrees {
825 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
826 project.worktrees.insert(worktree.id, old_worktree);
827 } else {
828 project.worktrees.insert(
829 worktree.id,
830 Worktree {
831 root_name: worktree.root_name.clone(),
832 visible: worktree.visible,
833 ..Default::default()
834 },
835 );
836 }
837 }
838
839 let room = self
840 .rooms
841 .get_mut(&project.room_id)
842 .ok_or_else(|| anyhow!("no such room"))?;
843 let participant_project = room
844 .participants
845 .iter_mut()
846 .flat_map(|participant| &mut participant.projects)
847 .find(|project| project.id == project_id.to_proto())
848 .ok_or_else(|| anyhow!("no such project"))?;
849 participant_project.worktree_root_names = worktrees
850 .iter()
851 .filter(|worktree| worktree.visible)
852 .map(|worktree| worktree.root_name.clone())
853 .collect();
854
855 Ok(room)
856 } else {
857 Err(anyhow!("no such project"))?
858 }
859 }
860
861 pub fn update_diagnostic_summary(
862 &mut self,
863 project_id: ProjectId,
864 worktree_id: u64,
865 connection_id: ConnectionId,
866 summary: proto::DiagnosticSummary,
867 ) -> Result<Vec<ConnectionId>> {
868 let project = self
869 .projects
870 .get_mut(&project_id)
871 .ok_or_else(|| anyhow!("no such project"))?;
872 if project.host_connection_id == connection_id {
873 let worktree = project
874 .worktrees
875 .get_mut(&worktree_id)
876 .ok_or_else(|| anyhow!("no such worktree"))?;
877 worktree
878 .diagnostic_summaries
879 .insert(summary.path.clone().into(), summary);
880 return Ok(project.connection_ids());
881 }
882
883 Err(anyhow!("no such worktree"))?
884 }
885
886 pub fn start_language_server(
887 &mut self,
888 project_id: ProjectId,
889 connection_id: ConnectionId,
890 language_server: proto::LanguageServer,
891 ) -> Result<Vec<ConnectionId>> {
892 let project = self
893 .projects
894 .get_mut(&project_id)
895 .ok_or_else(|| anyhow!("no such project"))?;
896 if project.host_connection_id == connection_id {
897 project.language_servers.push(language_server);
898 return Ok(project.connection_ids());
899 }
900
901 Err(anyhow!("no such project"))?
902 }
903
904 pub fn join_project(
905 &mut self,
906 requester_connection_id: ConnectionId,
907 project_id: ProjectId,
908 ) -> Result<(&Project, ReplicaId)> {
909 let connection = self
910 .connections
911 .get_mut(&requester_connection_id)
912 .ok_or_else(|| anyhow!("no such connection"))?;
913 let user = self
914 .connected_users
915 .get(&connection.user_id)
916 .ok_or_else(|| anyhow!("no such connection"))?;
917 let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
918 anyhow::ensure!(
919 active_call.connection_id == Some(requester_connection_id),
920 "no such project"
921 );
922
923 let project = self
924 .projects
925 .get_mut(&project_id)
926 .ok_or_else(|| anyhow!("no such project"))?;
927 anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
928
929 connection.projects.insert(project_id);
930 let mut replica_id = 1;
931 while project.active_replica_ids.contains(&replica_id) {
932 replica_id += 1;
933 }
934 project.active_replica_ids.insert(replica_id);
935 project.guests.insert(
936 requester_connection_id,
937 Collaborator {
938 replica_id,
939 user_id: connection.user_id,
940 last_activity: Some(OffsetDateTime::now_utc()),
941 admin: connection.admin,
942 },
943 );
944
945 project.host.last_activity = Some(OffsetDateTime::now_utc());
946 Ok((project, replica_id))
947 }
948
949 pub fn leave_project(
950 &mut self,
951 project_id: ProjectId,
952 connection_id: ConnectionId,
953 ) -> Result<LeftProject> {
954 let project = self
955 .projects
956 .get_mut(&project_id)
957 .ok_or_else(|| anyhow!("no such project"))?;
958
959 // If the connection leaving the project is a collaborator, remove it.
960 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
961 project.active_replica_ids.remove(&guest.replica_id);
962 true
963 } else {
964 false
965 };
966
967 if let Some(connection) = self.connections.get_mut(&connection_id) {
968 connection.projects.remove(&project_id);
969 }
970
971 Ok(LeftProject {
972 id: project.id,
973 host_connection_id: project.host_connection_id,
974 host_user_id: project.host.user_id,
975 connection_ids: project.connection_ids(),
976 remove_collaborator,
977 })
978 }
979
980 #[allow(clippy::too_many_arguments)]
981 pub fn update_worktree(
982 &mut self,
983 connection_id: ConnectionId,
984 project_id: ProjectId,
985 worktree_id: u64,
986 worktree_root_name: &str,
987 removed_entries: &[u64],
988 updated_entries: &[proto::Entry],
989 scan_id: u64,
990 is_last_update: bool,
991 ) -> Result<Vec<ConnectionId>> {
992 let project = self.write_project(project_id, connection_id)?;
993
994 let connection_ids = project.connection_ids();
995 let mut worktree = project.worktrees.entry(worktree_id).or_default();
996 worktree.root_name = worktree_root_name.to_string();
997
998 for entry_id in removed_entries {
999 worktree.entries.remove(entry_id);
1000 }
1001
1002 for entry in updated_entries {
1003 worktree.entries.insert(entry.id, entry.clone());
1004 }
1005
1006 worktree.scan_id = scan_id;
1007 worktree.is_complete = is_last_update;
1008 Ok(connection_ids)
1009 }
1010
1011 fn build_participant_project(
1012 project_id: ProjectId,
1013 projects: &BTreeMap<ProjectId, Project>,
1014 ) -> Option<proto::ParticipantProject> {
1015 Some(proto::ParticipantProject {
1016 id: project_id.to_proto(),
1017 worktree_root_names: projects
1018 .get(&project_id)?
1019 .worktrees
1020 .values()
1021 .filter(|worktree| worktree.visible)
1022 .map(|worktree| worktree.root_name.clone())
1023 .collect(),
1024 })
1025 }
1026
1027 pub fn project_connection_ids(
1028 &self,
1029 project_id: ProjectId,
1030 acting_connection_id: ConnectionId,
1031 ) -> Result<Vec<ConnectionId>> {
1032 Ok(self
1033 .read_project(project_id, acting_connection_id)?
1034 .connection_ids())
1035 }
1036
1037 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1038 Ok(self
1039 .channels
1040 .get(&channel_id)
1041 .ok_or_else(|| anyhow!("no such channel"))?
1042 .connection_ids())
1043 }
1044
1045 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1046 self.projects
1047 .get(&project_id)
1048 .ok_or_else(|| anyhow!("no such project"))
1049 }
1050
1051 pub fn register_project_activity(
1052 &mut self,
1053 project_id: ProjectId,
1054 connection_id: ConnectionId,
1055 ) -> Result<()> {
1056 let project = self
1057 .projects
1058 .get_mut(&project_id)
1059 .ok_or_else(|| anyhow!("no such project"))?;
1060 let collaborator = if connection_id == project.host_connection_id {
1061 &mut project.host
1062 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1063 guest
1064 } else {
1065 return Err(anyhow!("no such project"))?;
1066 };
1067 collaborator.last_activity = Some(OffsetDateTime::now_utc());
1068 Ok(())
1069 }
1070
1071 pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1072 self.projects.iter()
1073 }
1074
1075 pub fn read_project(
1076 &self,
1077 project_id: ProjectId,
1078 connection_id: ConnectionId,
1079 ) -> Result<&Project> {
1080 let project = self
1081 .projects
1082 .get(&project_id)
1083 .ok_or_else(|| anyhow!("no such project"))?;
1084 if project.host_connection_id == connection_id
1085 || project.guests.contains_key(&connection_id)
1086 {
1087 Ok(project)
1088 } else {
1089 Err(anyhow!("no such project"))?
1090 }
1091 }
1092
1093 fn write_project(
1094 &mut self,
1095 project_id: ProjectId,
1096 connection_id: ConnectionId,
1097 ) -> Result<&mut Project> {
1098 let project = self
1099 .projects
1100 .get_mut(&project_id)
1101 .ok_or_else(|| anyhow!("no such project"))?;
1102 if project.host_connection_id == connection_id
1103 || project.guests.contains_key(&connection_id)
1104 {
1105 Ok(project)
1106 } else {
1107 Err(anyhow!("no such project"))?
1108 }
1109 }
1110
1111 #[cfg(test)]
1112 pub fn check_invariants(&self) {
1113 for (connection_id, connection) in &self.connections {
1114 for project_id in &connection.projects {
1115 let project = &self.projects.get(project_id).unwrap();
1116 if project.host_connection_id != *connection_id {
1117 assert!(project.guests.contains_key(connection_id));
1118 }
1119
1120 for (worktree_id, worktree) in project.worktrees.iter() {
1121 let mut paths = HashMap::default();
1122 for entry in worktree.entries.values() {
1123 let prev_entry = paths.insert(&entry.path, entry);
1124 assert_eq!(
1125 prev_entry,
1126 None,
1127 "worktree {:?}, duplicate path for entries {:?} and {:?}",
1128 worktree_id,
1129 prev_entry.unwrap(),
1130 entry
1131 );
1132 }
1133 }
1134 }
1135 for channel_id in &connection.channels {
1136 let channel = self.channels.get(channel_id).unwrap();
1137 assert!(channel.connection_ids.contains(connection_id));
1138 }
1139 assert!(self
1140 .connected_users
1141 .get(&connection.user_id)
1142 .unwrap()
1143 .connection_ids
1144 .contains(connection_id));
1145 }
1146
1147 for (user_id, state) in &self.connected_users {
1148 for connection_id in &state.connection_ids {
1149 assert_eq!(
1150 self.connections.get(connection_id).unwrap().user_id,
1151 *user_id
1152 );
1153 }
1154
1155 if let Some(active_call) = state.active_call.as_ref() {
1156 if let Some(active_call_connection_id) = active_call.connection_id {
1157 assert!(
1158 state.connection_ids.contains(&active_call_connection_id),
1159 "call is active on a dead connection"
1160 );
1161 assert!(
1162 state.connection_ids.contains(&active_call_connection_id),
1163 "call is active on a dead connection"
1164 );
1165 }
1166 }
1167 }
1168
1169 for (room_id, room) in &self.rooms {
1170 for pending_user_id in &room.pending_participant_user_ids {
1171 assert!(
1172 self.connected_users
1173 .contains_key(&UserId::from_proto(*pending_user_id)),
1174 "call is active on a user that has disconnected"
1175 );
1176 }
1177
1178 for participant in &room.participants {
1179 assert!(
1180 self.connections
1181 .contains_key(&ConnectionId(participant.peer_id)),
1182 "room contains participant that has disconnected"
1183 );
1184
1185 for participant_project in &participant.projects {
1186 let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1187 assert_eq!(
1188 project.room_id, *room_id,
1189 "project was shared on a different room"
1190 );
1191 }
1192 }
1193
1194 assert!(
1195 !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1196 "room can't be empty"
1197 );
1198 }
1199
1200 for (project_id, project) in &self.projects {
1201 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1202 assert!(host_connection.projects.contains(project_id));
1203
1204 for guest_connection_id in project.guests.keys() {
1205 let guest_connection = self.connections.get(guest_connection_id).unwrap();
1206 assert!(guest_connection.projects.contains(project_id));
1207 }
1208 assert_eq!(project.active_replica_ids.len(), project.guests.len());
1209 assert_eq!(
1210 project.active_replica_ids,
1211 project
1212 .guests
1213 .values()
1214 .map(|guest| guest.replica_id)
1215 .collect::<HashSet<_>>(),
1216 );
1217
1218 let room = &self.rooms[&project.room_id];
1219 let room_participant = room
1220 .participants
1221 .iter()
1222 .find(|participant| participant.peer_id == project.host_connection_id.0)
1223 .unwrap();
1224 assert!(
1225 room_participant
1226 .projects
1227 .iter()
1228 .any(|project| project.id == project_id.to_proto()),
1229 "project was not shared in room"
1230 );
1231 }
1232
1233 for (channel_id, channel) in &self.channels {
1234 for connection_id in &channel.connection_ids {
1235 let connection = self.connections.get(connection_id).unwrap();
1236 assert!(connection.channels.contains(channel_id));
1237 }
1238 }
1239 }
1240}
1241
1242impl Project {
1243 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1244 self.guests
1245 .values()
1246 .chain([&self.host])
1247 .any(|collaborator| {
1248 collaborator
1249 .last_activity
1250 .map_or(false, |active_time| active_time > start_time)
1251 })
1252 }
1253
1254 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1255 self.guests.keys().copied().collect()
1256 }
1257
1258 pub fn connection_ids(&self) -> Vec<ConnectionId> {
1259 self.guests
1260 .keys()
1261 .copied()
1262 .chain(Some(self.host_connection_id))
1263 .collect()
1264 }
1265}
1266
1267impl Channel {
1268 fn connection_ids(&self) -> Vec<ConnectionId> {
1269 self.connection_ids.iter().copied().collect()
1270 }
1271}