1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
4use rpc::{proto, ConnectionId};
5use serde::Serialize;
6use std::{mem, path::PathBuf, str, time::Duration};
7use time::OffsetDateTime;
8use tracing::instrument;
9use util::post_inc;
10
11pub type RoomId = u64;
12
13#[derive(Default, Serialize)]
14pub struct Store {
15 connections: BTreeMap<ConnectionId, ConnectionState>,
16 connected_users: BTreeMap<UserId, ConnectedUser>,
17 next_room_id: RoomId,
18 rooms: BTreeMap<RoomId, proto::Room>,
19 projects: BTreeMap<ProjectId, Project>,
20 #[serde(skip)]
21 channels: BTreeMap<ChannelId, Channel>,
22}
23
24#[derive(Default, Serialize)]
25struct ConnectedUser {
26 connection_ids: HashSet<ConnectionId>,
27 active_call: Option<Call>,
28}
29
30#[derive(Serialize)]
31struct ConnectionState {
32 user_id: UserId,
33 admin: bool,
34 projects: BTreeSet<ProjectId>,
35 channels: HashSet<ChannelId>,
36}
37
38#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
39pub struct Call {
40 pub caller_user_id: UserId,
41 pub room_id: RoomId,
42 pub connection_id: Option<ConnectionId>,
43 pub initial_project_id: Option<ProjectId>,
44}
45
46#[derive(Serialize)]
47pub struct Project {
48 pub id: ProjectId,
49 pub room_id: RoomId,
50 pub host_connection_id: ConnectionId,
51 pub host: Collaborator,
52 pub guests: HashMap<ConnectionId, Collaborator>,
53 pub active_replica_ids: HashSet<ReplicaId>,
54 pub worktrees: BTreeMap<u64, Worktree>,
55 pub language_servers: Vec<proto::LanguageServer>,
56}
57
58#[derive(Serialize)]
59pub struct Collaborator {
60 pub replica_id: ReplicaId,
61 pub user_id: UserId,
62 #[serde(skip)]
63 pub last_activity: Option<OffsetDateTime>,
64 pub admin: bool,
65}
66
67#[derive(Default, Serialize)]
68pub struct Worktree {
69 pub abs_path: PathBuf,
70 pub root_name: String,
71 pub visible: bool,
72 #[serde(skip)]
73 pub entries: BTreeMap<u64, proto::Entry>,
74 #[serde(skip)]
75 pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
76 pub scan_id: u64,
77 pub is_complete: bool,
78}
79
80#[derive(Default)]
81pub struct Channel {
82 pub connection_ids: HashSet<ConnectionId>,
83}
84
85pub type ReplicaId = u16;
86
87#[derive(Default)]
88pub struct RemovedConnectionState {
89 pub user_id: UserId,
90 pub hosted_projects: Vec<Project>,
91 pub guest_projects: Vec<LeftProject>,
92 pub contact_ids: HashSet<UserId>,
93 pub room_id: Option<RoomId>,
94 pub canceled_call_connection_ids: Vec<ConnectionId>,
95}
96
97pub struct LeftProject {
98 pub id: ProjectId,
99 pub host_user_id: UserId,
100 pub host_connection_id: ConnectionId,
101 pub connection_ids: Vec<ConnectionId>,
102 pub remove_collaborator: bool,
103}
104
105pub struct LeftRoom<'a> {
106 pub room: Option<&'a proto::Room>,
107 pub unshared_projects: Vec<Project>,
108 pub left_projects: Vec<LeftProject>,
109 pub canceled_call_connection_ids: Vec<ConnectionId>,
110}
111
112#[derive(Copy, Clone)]
113pub struct Metrics {
114 pub connections: usize,
115 pub registered_projects: usize,
116 pub active_projects: usize,
117 pub shared_projects: usize,
118}
119
120impl Store {
121 pub fn metrics(&self) -> Metrics {
122 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
123 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
124
125 let connections = self.connections.values().filter(|c| !c.admin).count();
126 let mut registered_projects = 0;
127 let mut active_projects = 0;
128 let mut shared_projects = 0;
129 for project in self.projects.values() {
130 if let Some(connection) = self.connections.get(&project.host_connection_id) {
131 if !connection.admin {
132 registered_projects += 1;
133 if project.is_active_since(active_window_start) {
134 active_projects += 1;
135 if !project.guests.is_empty() {
136 shared_projects += 1;
137 }
138 }
139 }
140 }
141 }
142
143 Metrics {
144 connections,
145 registered_projects,
146 active_projects,
147 shared_projects,
148 }
149 }
150
151 #[instrument(skip(self))]
152 pub fn add_connection(
153 &mut self,
154 connection_id: ConnectionId,
155 user_id: UserId,
156 admin: bool,
157 ) -> Option<proto::IncomingCall> {
158 self.connections.insert(
159 connection_id,
160 ConnectionState {
161 user_id,
162 admin,
163 projects: Default::default(),
164 channels: Default::default(),
165 },
166 );
167 let connected_user = self.connected_users.entry(user_id).or_default();
168 connected_user.connection_ids.insert(connection_id);
169 if let Some(active_call) = connected_user.active_call {
170 if active_call.connection_id.is_some() {
171 None
172 } else {
173 let room = self.room(active_call.room_id)?;
174 Some(proto::IncomingCall {
175 room_id: active_call.room_id,
176 caller_user_id: active_call.caller_user_id.to_proto(),
177 participant_user_ids: room
178 .participants
179 .iter()
180 .map(|participant| participant.user_id)
181 .collect(),
182 initial_project: active_call
183 .initial_project_id
184 .and_then(|id| Self::build_participant_project(id, &self.projects)),
185 })
186 }
187 } else {
188 None
189 }
190 }
191
192 #[instrument(skip(self))]
193 pub fn remove_connection(
194 &mut self,
195 connection_id: ConnectionId,
196 ) -> Result<RemovedConnectionState> {
197 let connection = self
198 .connections
199 .get_mut(&connection_id)
200 .ok_or_else(|| anyhow!("no such connection"))?;
201
202 let user_id = connection.user_id;
203 let connection_channels = mem::take(&mut connection.channels);
204
205 let mut result = RemovedConnectionState {
206 user_id,
207 ..Default::default()
208 };
209
210 // Leave all channels.
211 for channel_id in connection_channels {
212 self.leave_channel(connection_id, channel_id);
213 }
214
215 let connected_user = self.connected_users.get(&user_id).unwrap();
216 if let Some(active_call) = connected_user.active_call.as_ref() {
217 let room_id = active_call.room_id;
218 if active_call.connection_id == Some(connection_id) {
219 let left_room = self.leave_room(room_id, connection_id)?;
220 result.hosted_projects = left_room.unshared_projects;
221 result.guest_projects = left_room.left_projects;
222 result.room_id = Some(room_id);
223 result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
224 } else if connected_user.connection_ids.len() == 1 {
225 self.decline_call(room_id, connection_id)?;
226 result.room_id = Some(room_id);
227 }
228 }
229
230 let connected_user = self.connected_users.get_mut(&user_id).unwrap();
231 connected_user.connection_ids.remove(&connection_id);
232 if connected_user.connection_ids.is_empty() {
233 self.connected_users.remove(&user_id);
234 }
235 self.connections.remove(&connection_id).unwrap();
236
237 Ok(result)
238 }
239
240 #[cfg(test)]
241 pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
242 self.channels.get(&id)
243 }
244
245 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
246 if let Some(connection) = self.connections.get_mut(&connection_id) {
247 connection.channels.insert(channel_id);
248 self.channels
249 .entry(channel_id)
250 .or_default()
251 .connection_ids
252 .insert(connection_id);
253 }
254 }
255
256 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
257 if let Some(connection) = self.connections.get_mut(&connection_id) {
258 connection.channels.remove(&channel_id);
259 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
260 entry.get_mut().connection_ids.remove(&connection_id);
261 if entry.get_mut().connection_ids.is_empty() {
262 entry.remove();
263 }
264 }
265 }
266 }
267
268 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
269 Ok(self
270 .connections
271 .get(&connection_id)
272 .ok_or_else(|| anyhow!("unknown connection"))?
273 .user_id)
274 }
275
276 pub fn connection_ids_for_user(
277 &self,
278 user_id: UserId,
279 ) -> impl Iterator<Item = ConnectionId> + '_ {
280 self.connected_users
281 .get(&user_id)
282 .into_iter()
283 .map(|state| &state.connection_ids)
284 .flatten()
285 .copied()
286 }
287
288 pub fn is_user_online(&self, user_id: UserId) -> bool {
289 !self
290 .connected_users
291 .get(&user_id)
292 .unwrap_or(&Default::default())
293 .connection_ids
294 .is_empty()
295 }
296
297 fn is_user_busy(&self, user_id: UserId) -> bool {
298 self.connected_users
299 .get(&user_id)
300 .unwrap_or(&Default::default())
301 .active_call
302 .is_some()
303 }
304
305 pub fn build_initial_contacts_update(
306 &self,
307 contacts: Vec<db::Contact>,
308 ) -> proto::UpdateContacts {
309 let mut update = proto::UpdateContacts::default();
310
311 for contact in contacts {
312 match contact {
313 db::Contact::Accepted {
314 user_id,
315 should_notify,
316 } => {
317 update
318 .contacts
319 .push(self.contact_for_user(user_id, should_notify));
320 }
321 db::Contact::Outgoing { user_id } => {
322 update.outgoing_requests.push(user_id.to_proto())
323 }
324 db::Contact::Incoming {
325 user_id,
326 should_notify,
327 } => update
328 .incoming_requests
329 .push(proto::IncomingContactRequest {
330 requester_id: user_id.to_proto(),
331 should_notify,
332 }),
333 }
334 }
335
336 update
337 }
338
339 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
340 proto::Contact {
341 user_id: user_id.to_proto(),
342 online: self.is_user_online(user_id),
343 busy: self.is_user_busy(user_id),
344 should_notify,
345 }
346 }
347
348 pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
349 let connection = self
350 .connections
351 .get_mut(&creator_connection_id)
352 .ok_or_else(|| anyhow!("no such connection"))?;
353 let connected_user = self
354 .connected_users
355 .get_mut(&connection.user_id)
356 .ok_or_else(|| anyhow!("no such connection"))?;
357 anyhow::ensure!(
358 connected_user.active_call.is_none(),
359 "can't create a room with an active call"
360 );
361
362 let mut room = proto::Room::default();
363 room.participants.push(proto::Participant {
364 user_id: connection.user_id.to_proto(),
365 peer_id: creator_connection_id.0,
366 projects: Default::default(),
367 location: Some(proto::ParticipantLocation {
368 variant: Some(proto::participant_location::Variant::External(
369 proto::participant_location::External {},
370 )),
371 }),
372 });
373
374 let room_id = post_inc(&mut self.next_room_id);
375 self.rooms.insert(room_id, room);
376 connected_user.active_call = Some(Call {
377 caller_user_id: connection.user_id,
378 room_id,
379 connection_id: Some(creator_connection_id),
380 initial_project_id: None,
381 });
382 Ok(room_id)
383 }
384
385 pub fn join_room(
386 &mut self,
387 room_id: RoomId,
388 connection_id: ConnectionId,
389 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
390 let connection = self
391 .connections
392 .get_mut(&connection_id)
393 .ok_or_else(|| anyhow!("no such connection"))?;
394 let user_id = connection.user_id;
395 let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
396
397 let connected_user = self
398 .connected_users
399 .get_mut(&user_id)
400 .ok_or_else(|| anyhow!("no such connection"))?;
401 let active_call = connected_user
402 .active_call
403 .as_mut()
404 .ok_or_else(|| anyhow!("not being called"))?;
405 anyhow::ensure!(
406 active_call.room_id == room_id && active_call.connection_id.is_none(),
407 "not being called on this room"
408 );
409
410 let room = self
411 .rooms
412 .get_mut(&room_id)
413 .ok_or_else(|| anyhow!("no such room"))?;
414 anyhow::ensure!(
415 room.pending_participant_user_ids
416 .contains(&user_id.to_proto()),
417 anyhow!("no such room")
418 );
419 room.pending_participant_user_ids
420 .retain(|pending| *pending != user_id.to_proto());
421 room.participants.push(proto::Participant {
422 user_id: user_id.to_proto(),
423 peer_id: connection_id.0,
424 projects: Default::default(),
425 location: Some(proto::ParticipantLocation {
426 variant: Some(proto::participant_location::Variant::External(
427 proto::participant_location::External {},
428 )),
429 }),
430 });
431 active_call.connection_id = Some(connection_id);
432
433 Ok((room, recipient_connection_ids))
434 }
435
436 pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
437 let connection = self
438 .connections
439 .get_mut(&connection_id)
440 .ok_or_else(|| anyhow!("no such connection"))?;
441 let user_id = connection.user_id;
442
443 let connected_user = self
444 .connected_users
445 .get(&user_id)
446 .ok_or_else(|| anyhow!("no such connection"))?;
447 anyhow::ensure!(
448 connected_user
449 .active_call
450 .map_or(false, |call| call.room_id == room_id
451 && call.connection_id == Some(connection_id)),
452 "cannot leave a room before joining it"
453 );
454
455 // Given that users can only join one room at a time, we can safely unshare
456 // and leave all projects associated with the connection.
457 let mut unshared_projects = Vec::new();
458 let mut left_projects = Vec::new();
459 for project_id in connection.projects.clone() {
460 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
461 unshared_projects.push(project);
462 } else if let Ok(project) = self.leave_project(project_id, connection_id) {
463 left_projects.push(project);
464 }
465 }
466 self.connected_users.get_mut(&user_id).unwrap().active_call = None;
467
468 let room = self
469 .rooms
470 .get_mut(&room_id)
471 .ok_or_else(|| anyhow!("no such room"))?;
472 room.participants
473 .retain(|participant| participant.peer_id != connection_id.0);
474
475 let mut canceled_call_connection_ids = Vec::new();
476 room.pending_participant_user_ids
477 .retain(|pending_participant_user_id| {
478 if let Some(connected_user) = self
479 .connected_users
480 .get_mut(&UserId::from_proto(*pending_participant_user_id))
481 {
482 if let Some(call) = connected_user.active_call.as_ref() {
483 if call.caller_user_id == user_id {
484 connected_user.active_call.take();
485 canceled_call_connection_ids
486 .extend(connected_user.connection_ids.iter().copied());
487 false
488 } else {
489 true
490 }
491 } else {
492 true
493 }
494 } else {
495 true
496 }
497 });
498
499 if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
500 self.rooms.remove(&room_id);
501 }
502
503 Ok(LeftRoom {
504 room: self.rooms.get(&room_id),
505 unshared_projects,
506 left_projects,
507 canceled_call_connection_ids,
508 })
509 }
510
511 pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
512 self.rooms.get(&room_id)
513 }
514
515 pub fn call(
516 &mut self,
517 room_id: RoomId,
518 recipient_user_id: UserId,
519 initial_project_id: Option<ProjectId>,
520 from_connection_id: ConnectionId,
521 ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
522 let caller_user_id = self.user_id_for_connection(from_connection_id)?;
523
524 let recipient_connection_ids = self
525 .connection_ids_for_user(recipient_user_id)
526 .collect::<Vec<_>>();
527 let mut recipient = self
528 .connected_users
529 .get_mut(&recipient_user_id)
530 .ok_or_else(|| anyhow!("no such connection"))?;
531 anyhow::ensure!(
532 recipient.active_call.is_none(),
533 "recipient is already on another call"
534 );
535
536 let room = self
537 .rooms
538 .get_mut(&room_id)
539 .ok_or_else(|| anyhow!("no such room"))?;
540 anyhow::ensure!(
541 room.participants
542 .iter()
543 .any(|participant| participant.peer_id == from_connection_id.0),
544 "no such room"
545 );
546 anyhow::ensure!(
547 room.pending_participant_user_ids
548 .iter()
549 .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
550 "cannot call the same user more than once"
551 );
552 room.pending_participant_user_ids
553 .push(recipient_user_id.to_proto());
554
555 if let Some(initial_project_id) = initial_project_id {
556 let project = self
557 .projects
558 .get(&initial_project_id)
559 .ok_or_else(|| anyhow!("no such project"))?;
560 anyhow::ensure!(project.room_id == room_id, "no such project");
561 }
562
563 recipient.active_call = Some(Call {
564 caller_user_id,
565 room_id,
566 connection_id: None,
567 initial_project_id,
568 });
569
570 Ok((
571 room,
572 recipient_connection_ids,
573 proto::IncomingCall {
574 room_id,
575 caller_user_id: caller_user_id.to_proto(),
576 participant_user_ids: room
577 .participants
578 .iter()
579 .map(|participant| participant.user_id)
580 .collect(),
581 initial_project: initial_project_id
582 .and_then(|id| Self::build_participant_project(id, &self.projects)),
583 },
584 ))
585 }
586
587 pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
588 let mut recipient = self
589 .connected_users
590 .get_mut(&to_user_id)
591 .ok_or_else(|| anyhow!("no such connection"))?;
592 anyhow::ensure!(recipient
593 .active_call
594 .map_or(false, |call| call.room_id == room_id
595 && call.connection_id.is_none()));
596 recipient.active_call = None;
597 let room = self
598 .rooms
599 .get_mut(&room_id)
600 .ok_or_else(|| anyhow!("no such room"))?;
601 room.pending_participant_user_ids
602 .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
603 Ok(room)
604 }
605
606 pub fn cancel_call(
607 &mut self,
608 room_id: RoomId,
609 recipient_user_id: UserId,
610 canceller_connection_id: ConnectionId,
611 ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
612 let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
613 let canceller = self
614 .connected_users
615 .get(&canceller_user_id)
616 .ok_or_else(|| anyhow!("no such connection"))?;
617 let recipient = self
618 .connected_users
619 .get(&recipient_user_id)
620 .ok_or_else(|| anyhow!("no such connection"))?;
621 let canceller_active_call = canceller
622 .active_call
623 .as_ref()
624 .ok_or_else(|| anyhow!("no active call"))?;
625 let recipient_active_call = recipient
626 .active_call
627 .as_ref()
628 .ok_or_else(|| anyhow!("no active call for recipient"))?;
629
630 anyhow::ensure!(
631 canceller_active_call.room_id == room_id,
632 "users are on different calls"
633 );
634 anyhow::ensure!(
635 recipient_active_call.room_id == room_id,
636 "users are on different calls"
637 );
638 anyhow::ensure!(
639 recipient_active_call.connection_id.is_none(),
640 "recipient has already answered"
641 );
642 let room_id = recipient_active_call.room_id;
643 let room = self
644 .rooms
645 .get_mut(&room_id)
646 .ok_or_else(|| anyhow!("no such room"))?;
647 room.pending_participant_user_ids
648 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
649
650 let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
651 recipient.active_call.take();
652
653 Ok((room, recipient.connection_ids.clone()))
654 }
655
656 pub fn decline_call(
657 &mut self,
658 room_id: RoomId,
659 recipient_connection_id: ConnectionId,
660 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
661 let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
662 let recipient = self
663 .connected_users
664 .get_mut(&recipient_user_id)
665 .ok_or_else(|| anyhow!("no such connection"))?;
666 if let Some(active_call) = recipient.active_call.take() {
667 anyhow::ensure!(active_call.room_id == room_id, "no such room");
668 let recipient_connection_ids = self
669 .connection_ids_for_user(recipient_user_id)
670 .collect::<Vec<_>>();
671 let room = self
672 .rooms
673 .get_mut(&active_call.room_id)
674 .ok_or_else(|| anyhow!("no such room"))?;
675 room.pending_participant_user_ids
676 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
677 Ok((room, recipient_connection_ids))
678 } else {
679 Err(anyhow!("user is not being called"))
680 }
681 }
682
683 pub fn update_participant_location(
684 &mut self,
685 room_id: RoomId,
686 location: proto::ParticipantLocation,
687 connection_id: ConnectionId,
688 ) -> Result<&proto::Room> {
689 let room = self
690 .rooms
691 .get_mut(&room_id)
692 .ok_or_else(|| anyhow!("no such room"))?;
693 if let Some(proto::participant_location::Variant::SharedProject(project)) =
694 location.variant.as_ref()
695 {
696 anyhow::ensure!(
697 room.participants
698 .iter()
699 .flat_map(|participant| &participant.projects)
700 .any(|participant_project| participant_project.id == project.id),
701 "no such project"
702 );
703 }
704
705 let participant = room
706 .participants
707 .iter_mut()
708 .find(|participant| participant.peer_id == connection_id.0)
709 .ok_or_else(|| anyhow!("no such room"))?;
710 participant.location = Some(location);
711
712 Ok(room)
713 }
714
715 pub fn share_project(
716 &mut self,
717 room_id: RoomId,
718 project_id: ProjectId,
719 worktrees: Vec<proto::WorktreeMetadata>,
720 host_connection_id: ConnectionId,
721 ) -> Result<&proto::Room> {
722 let connection = self
723 .connections
724 .get_mut(&host_connection_id)
725 .ok_or_else(|| anyhow!("no such connection"))?;
726
727 let room = self
728 .rooms
729 .get_mut(&room_id)
730 .ok_or_else(|| anyhow!("no such room"))?;
731 let participant = room
732 .participants
733 .iter_mut()
734 .find(|participant| participant.peer_id == host_connection_id.0)
735 .ok_or_else(|| anyhow!("no such room"))?;
736
737 connection.projects.insert(project_id);
738 self.projects.insert(
739 project_id,
740 Project {
741 id: project_id,
742 room_id,
743 host_connection_id,
744 host: Collaborator {
745 user_id: connection.user_id,
746 replica_id: 0,
747 last_activity: None,
748 admin: connection.admin,
749 },
750 guests: Default::default(),
751 active_replica_ids: Default::default(),
752 worktrees: worktrees
753 .into_iter()
754 .map(|worktree| {
755 (
756 worktree.id,
757 Worktree {
758 root_name: worktree.root_name,
759 visible: worktree.visible,
760 ..Default::default()
761 },
762 )
763 })
764 .collect(),
765 language_servers: Default::default(),
766 },
767 );
768
769 participant
770 .projects
771 .extend(Self::build_participant_project(project_id, &self.projects));
772
773 Ok(room)
774 }
775
776 pub fn unshare_project(
777 &mut self,
778 project_id: ProjectId,
779 connection_id: ConnectionId,
780 ) -> Result<(&proto::Room, Project)> {
781 match self.projects.entry(project_id) {
782 btree_map::Entry::Occupied(e) => {
783 if e.get().host_connection_id == connection_id {
784 let project = e.remove();
785
786 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
787 host_connection.projects.remove(&project_id);
788 }
789
790 for guest_connection in project.guests.keys() {
791 if let Some(connection) = self.connections.get_mut(guest_connection) {
792 connection.projects.remove(&project_id);
793 }
794 }
795
796 let room = self
797 .rooms
798 .get_mut(&project.room_id)
799 .ok_or_else(|| anyhow!("no such room"))?;
800 let participant = room
801 .participants
802 .iter_mut()
803 .find(|participant| participant.peer_id == connection_id.0)
804 .ok_or_else(|| anyhow!("no such room"))?;
805 participant
806 .projects
807 .retain(|project| project.id != project_id.to_proto());
808
809 Ok((room, project))
810 } else {
811 Err(anyhow!("no such project"))?
812 }
813 }
814 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
815 }
816 }
817
818 pub fn update_project(
819 &mut self,
820 project_id: ProjectId,
821 worktrees: &[proto::WorktreeMetadata],
822 connection_id: ConnectionId,
823 ) -> Result<&proto::Room> {
824 let project = self
825 .projects
826 .get_mut(&project_id)
827 .ok_or_else(|| anyhow!("no such project"))?;
828 if project.host_connection_id == connection_id {
829 let mut old_worktrees = mem::take(&mut project.worktrees);
830 for worktree in worktrees {
831 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
832 project.worktrees.insert(worktree.id, old_worktree);
833 } else {
834 project.worktrees.insert(
835 worktree.id,
836 Worktree {
837 root_name: worktree.root_name.clone(),
838 visible: worktree.visible,
839 ..Default::default()
840 },
841 );
842 }
843 }
844
845 let room = self
846 .rooms
847 .get_mut(&project.room_id)
848 .ok_or_else(|| anyhow!("no such room"))?;
849 let participant_project = room
850 .participants
851 .iter_mut()
852 .flat_map(|participant| &mut participant.projects)
853 .find(|project| project.id == project_id.to_proto())
854 .ok_or_else(|| anyhow!("no such project"))?;
855 participant_project.worktree_root_names = worktrees
856 .iter()
857 .filter(|worktree| worktree.visible)
858 .map(|worktree| worktree.root_name.clone())
859 .collect();
860
861 Ok(room)
862 } else {
863 Err(anyhow!("no such project"))?
864 }
865 }
866
867 pub fn update_diagnostic_summary(
868 &mut self,
869 project_id: ProjectId,
870 worktree_id: u64,
871 connection_id: ConnectionId,
872 summary: proto::DiagnosticSummary,
873 ) -> Result<Vec<ConnectionId>> {
874 let project = self
875 .projects
876 .get_mut(&project_id)
877 .ok_or_else(|| anyhow!("no such project"))?;
878 if project.host_connection_id == connection_id {
879 let worktree = project
880 .worktrees
881 .get_mut(&worktree_id)
882 .ok_or_else(|| anyhow!("no such worktree"))?;
883 worktree
884 .diagnostic_summaries
885 .insert(summary.path.clone().into(), summary);
886 return Ok(project.connection_ids());
887 }
888
889 Err(anyhow!("no such worktree"))?
890 }
891
892 pub fn start_language_server(
893 &mut self,
894 project_id: ProjectId,
895 connection_id: ConnectionId,
896 language_server: proto::LanguageServer,
897 ) -> Result<Vec<ConnectionId>> {
898 let project = self
899 .projects
900 .get_mut(&project_id)
901 .ok_or_else(|| anyhow!("no such project"))?;
902 if project.host_connection_id == connection_id {
903 project.language_servers.push(language_server);
904 return Ok(project.connection_ids());
905 }
906
907 Err(anyhow!("no such project"))?
908 }
909
910 pub fn join_project(
911 &mut self,
912 requester_connection_id: ConnectionId,
913 project_id: ProjectId,
914 ) -> Result<(&Project, ReplicaId)> {
915 let connection = self
916 .connections
917 .get_mut(&requester_connection_id)
918 .ok_or_else(|| anyhow!("no such connection"))?;
919 let user = self
920 .connected_users
921 .get(&connection.user_id)
922 .ok_or_else(|| anyhow!("no such connection"))?;
923 let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
924 anyhow::ensure!(
925 active_call.connection_id == Some(requester_connection_id),
926 "no such project"
927 );
928
929 let project = self
930 .projects
931 .get_mut(&project_id)
932 .ok_or_else(|| anyhow!("no such project"))?;
933 anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
934
935 connection.projects.insert(project_id);
936 let mut replica_id = 1;
937 while project.active_replica_ids.contains(&replica_id) {
938 replica_id += 1;
939 }
940 project.active_replica_ids.insert(replica_id);
941 project.guests.insert(
942 requester_connection_id,
943 Collaborator {
944 replica_id,
945 user_id: connection.user_id,
946 last_activity: Some(OffsetDateTime::now_utc()),
947 admin: connection.admin,
948 },
949 );
950
951 project.host.last_activity = Some(OffsetDateTime::now_utc());
952 Ok((project, replica_id))
953 }
954
955 pub fn leave_project(
956 &mut self,
957 project_id: ProjectId,
958 connection_id: ConnectionId,
959 ) -> Result<LeftProject> {
960 let project = self
961 .projects
962 .get_mut(&project_id)
963 .ok_or_else(|| anyhow!("no such project"))?;
964
965 // If the connection leaving the project is a collaborator, remove it.
966 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
967 project.active_replica_ids.remove(&guest.replica_id);
968 true
969 } else {
970 false
971 };
972
973 if let Some(connection) = self.connections.get_mut(&connection_id) {
974 connection.projects.remove(&project_id);
975 }
976
977 Ok(LeftProject {
978 id: project.id,
979 host_connection_id: project.host_connection_id,
980 host_user_id: project.host.user_id,
981 connection_ids: project.connection_ids(),
982 remove_collaborator,
983 })
984 }
985
986 #[allow(clippy::too_many_arguments)]
987 pub fn update_worktree(
988 &mut self,
989 connection_id: ConnectionId,
990 project_id: ProjectId,
991 worktree_id: u64,
992 worktree_root_name: &str,
993 removed_entries: &[u64],
994 updated_entries: &[proto::Entry],
995 scan_id: u64,
996 is_last_update: bool,
997 ) -> Result<Vec<ConnectionId>> {
998 let project = self.write_project(project_id, connection_id)?;
999
1000 let connection_ids = project.connection_ids();
1001 let mut worktree = project.worktrees.entry(worktree_id).or_default();
1002 worktree.root_name = worktree_root_name.to_string();
1003
1004 for entry_id in removed_entries {
1005 worktree.entries.remove(entry_id);
1006 }
1007
1008 for entry in updated_entries {
1009 worktree.entries.insert(entry.id, entry.clone());
1010 }
1011
1012 worktree.scan_id = scan_id;
1013 worktree.is_complete = is_last_update;
1014 Ok(connection_ids)
1015 }
1016
1017 fn build_participant_project(
1018 project_id: ProjectId,
1019 projects: &BTreeMap<ProjectId, Project>,
1020 ) -> Option<proto::ParticipantProject> {
1021 Some(proto::ParticipantProject {
1022 id: project_id.to_proto(),
1023 worktree_root_names: projects
1024 .get(&project_id)?
1025 .worktrees
1026 .values()
1027 .filter(|worktree| worktree.visible)
1028 .map(|worktree| worktree.root_name.clone())
1029 .collect(),
1030 })
1031 }
1032
1033 pub fn project_connection_ids(
1034 &self,
1035 project_id: ProjectId,
1036 acting_connection_id: ConnectionId,
1037 ) -> Result<Vec<ConnectionId>> {
1038 Ok(self
1039 .read_project(project_id, acting_connection_id)?
1040 .connection_ids())
1041 }
1042
1043 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1044 Ok(self
1045 .channels
1046 .get(&channel_id)
1047 .ok_or_else(|| anyhow!("no such channel"))?
1048 .connection_ids())
1049 }
1050
1051 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1052 self.projects
1053 .get(&project_id)
1054 .ok_or_else(|| anyhow!("no such project"))
1055 }
1056
1057 pub fn register_project_activity(
1058 &mut self,
1059 project_id: ProjectId,
1060 connection_id: ConnectionId,
1061 ) -> Result<()> {
1062 let project = self
1063 .projects
1064 .get_mut(&project_id)
1065 .ok_or_else(|| anyhow!("no such project"))?;
1066 let collaborator = if connection_id == project.host_connection_id {
1067 &mut project.host
1068 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1069 guest
1070 } else {
1071 return Err(anyhow!("no such project"))?;
1072 };
1073 collaborator.last_activity = Some(OffsetDateTime::now_utc());
1074 Ok(())
1075 }
1076
1077 pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1078 self.projects.iter()
1079 }
1080
1081 pub fn read_project(
1082 &self,
1083 project_id: ProjectId,
1084 connection_id: ConnectionId,
1085 ) -> Result<&Project> {
1086 let project = self
1087 .projects
1088 .get(&project_id)
1089 .ok_or_else(|| anyhow!("no such project"))?;
1090 if project.host_connection_id == connection_id
1091 || project.guests.contains_key(&connection_id)
1092 {
1093 Ok(project)
1094 } else {
1095 Err(anyhow!("no such project"))?
1096 }
1097 }
1098
1099 fn write_project(
1100 &mut self,
1101 project_id: ProjectId,
1102 connection_id: ConnectionId,
1103 ) -> Result<&mut Project> {
1104 let project = self
1105 .projects
1106 .get_mut(&project_id)
1107 .ok_or_else(|| anyhow!("no such project"))?;
1108 if project.host_connection_id == connection_id
1109 || project.guests.contains_key(&connection_id)
1110 {
1111 Ok(project)
1112 } else {
1113 Err(anyhow!("no such project"))?
1114 }
1115 }
1116
1117 #[cfg(test)]
1118 pub fn check_invariants(&self) {
1119 for (connection_id, connection) in &self.connections {
1120 for project_id in &connection.projects {
1121 let project = &self.projects.get(project_id).unwrap();
1122 if project.host_connection_id != *connection_id {
1123 assert!(project.guests.contains_key(connection_id));
1124 }
1125
1126 for (worktree_id, worktree) in project.worktrees.iter() {
1127 let mut paths = HashMap::default();
1128 for entry in worktree.entries.values() {
1129 let prev_entry = paths.insert(&entry.path, entry);
1130 assert_eq!(
1131 prev_entry,
1132 None,
1133 "worktree {:?}, duplicate path for entries {:?} and {:?}",
1134 worktree_id,
1135 prev_entry.unwrap(),
1136 entry
1137 );
1138 }
1139 }
1140 }
1141 for channel_id in &connection.channels {
1142 let channel = self.channels.get(channel_id).unwrap();
1143 assert!(channel.connection_ids.contains(connection_id));
1144 }
1145 assert!(self
1146 .connected_users
1147 .get(&connection.user_id)
1148 .unwrap()
1149 .connection_ids
1150 .contains(connection_id));
1151 }
1152
1153 for (user_id, state) in &self.connected_users {
1154 for connection_id in &state.connection_ids {
1155 assert_eq!(
1156 self.connections.get(connection_id).unwrap().user_id,
1157 *user_id
1158 );
1159 }
1160
1161 if let Some(active_call) = state.active_call.as_ref() {
1162 if let Some(active_call_connection_id) = active_call.connection_id {
1163 assert!(
1164 state.connection_ids.contains(&active_call_connection_id),
1165 "call is active on a dead connection"
1166 );
1167 assert!(
1168 state.connection_ids.contains(&active_call_connection_id),
1169 "call is active on a dead connection"
1170 );
1171 }
1172 }
1173 }
1174
1175 for (room_id, room) in &self.rooms {
1176 for pending_user_id in &room.pending_participant_user_ids {
1177 assert!(
1178 self.connected_users
1179 .contains_key(&UserId::from_proto(*pending_user_id)),
1180 "call is active on a user that has disconnected"
1181 );
1182 }
1183
1184 for participant in &room.participants {
1185 assert!(
1186 self.connections
1187 .contains_key(&ConnectionId(participant.peer_id)),
1188 "room contains participant that has disconnected"
1189 );
1190
1191 for participant_project in &participant.projects {
1192 let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1193 assert_eq!(
1194 project.room_id, *room_id,
1195 "project was shared on a different room"
1196 );
1197 }
1198 }
1199
1200 assert!(
1201 !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1202 "room can't be empty"
1203 );
1204 }
1205
1206 for (project_id, project) in &self.projects {
1207 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1208 assert!(host_connection.projects.contains(project_id));
1209
1210 for guest_connection_id in project.guests.keys() {
1211 let guest_connection = self.connections.get(guest_connection_id).unwrap();
1212 assert!(guest_connection.projects.contains(project_id));
1213 }
1214 assert_eq!(project.active_replica_ids.len(), project.guests.len());
1215 assert_eq!(
1216 project.active_replica_ids,
1217 project
1218 .guests
1219 .values()
1220 .map(|guest| guest.replica_id)
1221 .collect::<HashSet<_>>(),
1222 );
1223
1224 let room = &self.rooms[&project.room_id];
1225 let room_participant = room
1226 .participants
1227 .iter()
1228 .find(|participant| participant.peer_id == project.host_connection_id.0)
1229 .unwrap();
1230 assert!(
1231 room_participant
1232 .projects
1233 .iter()
1234 .any(|project| project.id == project_id.to_proto()),
1235 "project was not shared in room"
1236 );
1237 }
1238
1239 for (channel_id, channel) in &self.channels {
1240 for connection_id in &channel.connection_ids {
1241 let connection = self.connections.get(connection_id).unwrap();
1242 assert!(connection.channels.contains(channel_id));
1243 }
1244 }
1245 }
1246}
1247
1248impl Project {
1249 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1250 self.guests
1251 .values()
1252 .chain([&self.host])
1253 .any(|collaborator| {
1254 collaborator
1255 .last_activity
1256 .map_or(false, |active_time| active_time > start_time)
1257 })
1258 }
1259
1260 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1261 self.guests.keys().copied().collect()
1262 }
1263
1264 pub fn connection_ids(&self) -> Vec<ConnectionId> {
1265 self.guests
1266 .keys()
1267 .copied()
1268 .chain(Some(self.host_connection_id))
1269 .collect()
1270 }
1271}
1272
1273impl Channel {
1274 fn connection_ids(&self) -> Vec<ConnectionId> {
1275 self.connection_ids.iter().copied().collect()
1276 }
1277}