1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
4use rpc::{proto, ConnectionId};
5use serde::Serialize;
6use std::{mem, path::PathBuf, str, time::Duration};
7use time::OffsetDateTime;
8use tracing::instrument;
9use util::post_inc;
10
11pub type RoomId = u64;
12
13#[derive(Default, Serialize)]
14pub struct Store {
15 connections: BTreeMap<ConnectionId, ConnectionState>,
16 connected_users: BTreeMap<UserId, ConnectedUser>,
17 next_room_id: RoomId,
18 rooms: BTreeMap<RoomId, proto::Room>,
19 projects: BTreeMap<ProjectId, Project>,
20 #[serde(skip)]
21 channels: BTreeMap<ChannelId, Channel>,
22}
23
24#[derive(Default, Serialize)]
25struct ConnectedUser {
26 connection_ids: HashSet<ConnectionId>,
27 active_call: Option<Call>,
28}
29
30#[derive(Serialize)]
31struct ConnectionState {
32 user_id: UserId,
33 admin: bool,
34 projects: BTreeSet<ProjectId>,
35 channels: HashSet<ChannelId>,
36}
37
38#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
39pub struct Call {
40 pub caller_user_id: UserId,
41 pub room_id: RoomId,
42 pub connection_id: Option<ConnectionId>,
43 pub initial_project_id: Option<ProjectId>,
44}
45
46#[derive(Serialize)]
47pub struct Project {
48 pub id: ProjectId,
49 pub room_id: RoomId,
50 pub host_connection_id: ConnectionId,
51 pub host: Collaborator,
52 pub guests: HashMap<ConnectionId, Collaborator>,
53 pub active_replica_ids: HashSet<ReplicaId>,
54 pub worktrees: BTreeMap<u64, Worktree>,
55 pub language_servers: Vec<proto::LanguageServer>,
56}
57
58#[derive(Serialize)]
59pub struct Collaborator {
60 pub replica_id: ReplicaId,
61 pub user_id: UserId,
62 #[serde(skip)]
63 pub last_activity: Option<OffsetDateTime>,
64 pub admin: bool,
65}
66
67#[derive(Default, Serialize)]
68pub struct Worktree {
69 pub root_name: String,
70 pub visible: bool,
71 #[serde(skip)]
72 pub entries: BTreeMap<u64, proto::Entry>,
73 #[serde(skip)]
74 pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
75 pub scan_id: u64,
76 pub is_complete: bool,
77}
78
79#[derive(Default)]
80pub struct Channel {
81 pub connection_ids: HashSet<ConnectionId>,
82}
83
84pub type ReplicaId = u16;
85
86#[derive(Default)]
87pub struct RemovedConnectionState {
88 pub user_id: UserId,
89 pub hosted_projects: HashMap<ProjectId, Project>,
90 pub guest_project_ids: HashSet<ProjectId>,
91 pub contact_ids: HashSet<UserId>,
92 pub room_id: Option<RoomId>,
93}
94
95pub struct LeftProject {
96 pub id: ProjectId,
97 pub host_user_id: UserId,
98 pub host_connection_id: ConnectionId,
99 pub connection_ids: Vec<ConnectionId>,
100 pub remove_collaborator: bool,
101}
102
103pub struct LeftRoom<'a> {
104 pub room: Option<&'a proto::Room>,
105 pub unshared_projects: Vec<Project>,
106 pub left_projects: Vec<LeftProject>,
107}
108
109#[derive(Copy, Clone)]
110pub struct Metrics {
111 pub connections: usize,
112 pub registered_projects: usize,
113 pub active_projects: usize,
114 pub shared_projects: usize,
115}
116
117impl Store {
118 pub fn metrics(&self) -> Metrics {
119 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
120 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
121
122 let connections = self.connections.values().filter(|c| !c.admin).count();
123 let mut registered_projects = 0;
124 let mut active_projects = 0;
125 let mut shared_projects = 0;
126 for project in self.projects.values() {
127 if let Some(connection) = self.connections.get(&project.host_connection_id) {
128 if !connection.admin {
129 registered_projects += 1;
130 if project.is_active_since(active_window_start) {
131 active_projects += 1;
132 if !project.guests.is_empty() {
133 shared_projects += 1;
134 }
135 }
136 }
137 }
138 }
139
140 Metrics {
141 connections,
142 registered_projects,
143 active_projects,
144 shared_projects,
145 }
146 }
147
148 #[instrument(skip(self))]
149 pub fn add_connection(
150 &mut self,
151 connection_id: ConnectionId,
152 user_id: UserId,
153 admin: bool,
154 ) -> Option<proto::IncomingCall> {
155 self.connections.insert(
156 connection_id,
157 ConnectionState {
158 user_id,
159 admin,
160 projects: Default::default(),
161 channels: Default::default(),
162 },
163 );
164 let connected_user = self.connected_users.entry(user_id).or_default();
165 connected_user.connection_ids.insert(connection_id);
166 if let Some(active_call) = connected_user.active_call {
167 if active_call.connection_id.is_some() {
168 None
169 } else {
170 let room = self.room(active_call.room_id)?;
171 Some(proto::IncomingCall {
172 room_id: active_call.room_id,
173 caller_user_id: active_call.caller_user_id.to_proto(),
174 participant_user_ids: room
175 .participants
176 .iter()
177 .map(|participant| participant.user_id)
178 .collect(),
179 initial_project_id: active_call
180 .initial_project_id
181 .map(|project_id| project_id.to_proto()),
182 })
183 }
184 } else {
185 None
186 }
187 }
188
189 #[instrument(skip(self))]
190 pub fn remove_connection(
191 &mut self,
192 connection_id: ConnectionId,
193 ) -> Result<RemovedConnectionState> {
194 let connection = self
195 .connections
196 .get_mut(&connection_id)
197 .ok_or_else(|| anyhow!("no such connection"))?;
198
199 let user_id = connection.user_id;
200 let connection_projects = mem::take(&mut connection.projects);
201 let connection_channels = mem::take(&mut connection.channels);
202
203 let mut result = RemovedConnectionState {
204 user_id,
205 ..Default::default()
206 };
207
208 // Leave all channels.
209 for channel_id in connection_channels {
210 self.leave_channel(connection_id, channel_id);
211 }
212
213 // Unshare and leave all projects.
214 for project_id in connection_projects {
215 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
216 result.hosted_projects.insert(project_id, project);
217 } else if self.leave_project(project_id, connection_id).is_ok() {
218 result.guest_project_ids.insert(project_id);
219 }
220 }
221
222 let connected_user = self.connected_users.get_mut(&user_id).unwrap();
223 connected_user.connection_ids.remove(&connection_id);
224 if let Some(active_call) = connected_user.active_call.as_ref() {
225 let room_id = active_call.room_id;
226 if let Some(room) = self.rooms.get_mut(&room_id) {
227 let prev_participant_count = room.participants.len();
228 room.participants
229 .retain(|participant| participant.peer_id != connection_id.0);
230 if prev_participant_count == room.participants.len() {
231 if connected_user.connection_ids.is_empty() {
232 room.pending_participant_user_ids
233 .retain(|pending_user_id| *pending_user_id != user_id.to_proto());
234 result.room_id = Some(room_id);
235 connected_user.active_call = None;
236 }
237 } else {
238 result.room_id = Some(room_id);
239 connected_user.active_call = None;
240 }
241
242 if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
243 self.rooms.remove(&room_id);
244 }
245 } else {
246 tracing::error!("disconnected user claims to be in a room that does not exist");
247 connected_user.active_call = None;
248 }
249 }
250
251 if connected_user.connection_ids.is_empty() {
252 self.connected_users.remove(&user_id);
253 }
254
255 self.connections.remove(&connection_id).unwrap();
256
257 Ok(result)
258 }
259
260 #[cfg(test)]
261 pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
262 self.channels.get(&id)
263 }
264
265 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
266 if let Some(connection) = self.connections.get_mut(&connection_id) {
267 connection.channels.insert(channel_id);
268 self.channels
269 .entry(channel_id)
270 .or_default()
271 .connection_ids
272 .insert(connection_id);
273 }
274 }
275
276 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
277 if let Some(connection) = self.connections.get_mut(&connection_id) {
278 connection.channels.remove(&channel_id);
279 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
280 entry.get_mut().connection_ids.remove(&connection_id);
281 if entry.get_mut().connection_ids.is_empty() {
282 entry.remove();
283 }
284 }
285 }
286 }
287
288 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
289 Ok(self
290 .connections
291 .get(&connection_id)
292 .ok_or_else(|| anyhow!("unknown connection"))?
293 .user_id)
294 }
295
296 pub fn connection_ids_for_user(
297 &self,
298 user_id: UserId,
299 ) -> impl Iterator<Item = ConnectionId> + '_ {
300 self.connected_users
301 .get(&user_id)
302 .into_iter()
303 .map(|state| &state.connection_ids)
304 .flatten()
305 .copied()
306 }
307
308 pub fn is_user_online(&self, user_id: UserId) -> bool {
309 !self
310 .connected_users
311 .get(&user_id)
312 .unwrap_or(&Default::default())
313 .connection_ids
314 .is_empty()
315 }
316
317 fn is_user_busy(&self, user_id: UserId) -> bool {
318 self.connected_users
319 .get(&user_id)
320 .unwrap_or(&Default::default())
321 .active_call
322 .is_some()
323 }
324
325 pub fn build_initial_contacts_update(
326 &self,
327 contacts: Vec<db::Contact>,
328 ) -> proto::UpdateContacts {
329 let mut update = proto::UpdateContacts::default();
330
331 for contact in contacts {
332 match contact {
333 db::Contact::Accepted {
334 user_id,
335 should_notify,
336 } => {
337 update
338 .contacts
339 .push(self.contact_for_user(user_id, should_notify));
340 }
341 db::Contact::Outgoing { user_id } => {
342 update.outgoing_requests.push(user_id.to_proto())
343 }
344 db::Contact::Incoming {
345 user_id,
346 should_notify,
347 } => update
348 .incoming_requests
349 .push(proto::IncomingContactRequest {
350 requester_id: user_id.to_proto(),
351 should_notify,
352 }),
353 }
354 }
355
356 update
357 }
358
359 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
360 proto::Contact {
361 user_id: user_id.to_proto(),
362 online: self.is_user_online(user_id),
363 busy: self.is_user_busy(user_id),
364 should_notify,
365 }
366 }
367
368 pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
369 let connection = self
370 .connections
371 .get_mut(&creator_connection_id)
372 .ok_or_else(|| anyhow!("no such connection"))?;
373 let connected_user = self
374 .connected_users
375 .get_mut(&connection.user_id)
376 .ok_or_else(|| anyhow!("no such connection"))?;
377 anyhow::ensure!(
378 connected_user.active_call.is_none(),
379 "can't create a room with an active call"
380 );
381
382 let mut room = proto::Room::default();
383 room.participants.push(proto::Participant {
384 user_id: connection.user_id.to_proto(),
385 peer_id: creator_connection_id.0,
386 projects: Default::default(),
387 location: Some(proto::ParticipantLocation {
388 variant: Some(proto::participant_location::Variant::External(
389 proto::participant_location::External {},
390 )),
391 }),
392 });
393
394 let room_id = post_inc(&mut self.next_room_id);
395 self.rooms.insert(room_id, room);
396 connected_user.active_call = Some(Call {
397 caller_user_id: connection.user_id,
398 room_id,
399 connection_id: Some(creator_connection_id),
400 initial_project_id: None,
401 });
402 Ok(room_id)
403 }
404
405 pub fn join_room(
406 &mut self,
407 room_id: RoomId,
408 connection_id: ConnectionId,
409 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
410 let connection = self
411 .connections
412 .get_mut(&connection_id)
413 .ok_or_else(|| anyhow!("no such connection"))?;
414 let user_id = connection.user_id;
415 let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
416
417 let connected_user = self
418 .connected_users
419 .get_mut(&user_id)
420 .ok_or_else(|| anyhow!("no such connection"))?;
421 let active_call = connected_user
422 .active_call
423 .as_mut()
424 .ok_or_else(|| anyhow!("not being called"))?;
425 anyhow::ensure!(
426 active_call.room_id == room_id && active_call.connection_id.is_none(),
427 "not being called on this room"
428 );
429
430 let room = self
431 .rooms
432 .get_mut(&room_id)
433 .ok_or_else(|| anyhow!("no such room"))?;
434 anyhow::ensure!(
435 room.pending_participant_user_ids
436 .contains(&user_id.to_proto()),
437 anyhow!("no such room")
438 );
439 room.pending_participant_user_ids
440 .retain(|pending| *pending != user_id.to_proto());
441 room.participants.push(proto::Participant {
442 user_id: user_id.to_proto(),
443 peer_id: connection_id.0,
444 projects: Default::default(),
445 location: Some(proto::ParticipantLocation {
446 variant: Some(proto::participant_location::Variant::External(
447 proto::participant_location::External {},
448 )),
449 }),
450 });
451 active_call.connection_id = Some(connection_id);
452
453 Ok((room, recipient_connection_ids))
454 }
455
456 pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
457 let connection = self
458 .connections
459 .get_mut(&connection_id)
460 .ok_or_else(|| anyhow!("no such connection"))?;
461 let user_id = connection.user_id;
462
463 let connected_user = self
464 .connected_users
465 .get(&user_id)
466 .ok_or_else(|| anyhow!("no such connection"))?;
467 anyhow::ensure!(
468 connected_user
469 .active_call
470 .map_or(false, |call| call.room_id == room_id
471 && call.connection_id == Some(connection_id)),
472 "cannot leave a room before joining it"
473 );
474
475 // Given that users can only join one room at a time, we can safely unshare
476 // and leave all projects associated with the connection.
477 let mut unshared_projects = Vec::new();
478 let mut left_projects = Vec::new();
479 for project_id in connection.projects.clone() {
480 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
481 unshared_projects.push(project);
482 } else if let Ok(project) = self.leave_project(project_id, connection_id) {
483 left_projects.push(project);
484 }
485 }
486 self.connected_users.get_mut(&user_id).unwrap().active_call = None;
487
488 let room = self
489 .rooms
490 .get_mut(&room_id)
491 .ok_or_else(|| anyhow!("no such room"))?;
492 room.participants
493 .retain(|participant| participant.peer_id != connection_id.0);
494 if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
495 self.rooms.remove(&room_id);
496 }
497
498 Ok(LeftRoom {
499 room: self.rooms.get(&room_id),
500 unshared_projects,
501 left_projects,
502 })
503 }
504
505 pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
506 self.rooms.get(&room_id)
507 }
508
509 pub fn call(
510 &mut self,
511 room_id: RoomId,
512 recipient_user_id: UserId,
513 initial_project_id: Option<ProjectId>,
514 from_connection_id: ConnectionId,
515 ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
516 let caller_user_id = self.user_id_for_connection(from_connection_id)?;
517
518 let recipient_connection_ids = self
519 .connection_ids_for_user(recipient_user_id)
520 .collect::<Vec<_>>();
521 let mut recipient = self
522 .connected_users
523 .get_mut(&recipient_user_id)
524 .ok_or_else(|| anyhow!("no such connection"))?;
525 anyhow::ensure!(
526 recipient.active_call.is_none(),
527 "recipient is already on another call"
528 );
529
530 let room = self
531 .rooms
532 .get_mut(&room_id)
533 .ok_or_else(|| anyhow!("no such room"))?;
534 anyhow::ensure!(
535 room.participants
536 .iter()
537 .any(|participant| participant.peer_id == from_connection_id.0),
538 "no such room"
539 );
540 anyhow::ensure!(
541 room.pending_participant_user_ids
542 .iter()
543 .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
544 "cannot call the same user more than once"
545 );
546 room.pending_participant_user_ids
547 .push(recipient_user_id.to_proto());
548
549 if let Some(initial_project_id) = initial_project_id {
550 let project = self
551 .projects
552 .get(&initial_project_id)
553 .ok_or_else(|| anyhow!("no such project"))?;
554 anyhow::ensure!(project.room_id == room_id, "no such project");
555 }
556
557 recipient.active_call = Some(Call {
558 caller_user_id,
559 room_id,
560 connection_id: None,
561 initial_project_id,
562 });
563
564 Ok((
565 room,
566 recipient_connection_ids,
567 proto::IncomingCall {
568 room_id,
569 caller_user_id: caller_user_id.to_proto(),
570 participant_user_ids: room
571 .participants
572 .iter()
573 .map(|participant| participant.user_id)
574 .collect(),
575 initial_project_id: initial_project_id.map(|project_id| project_id.to_proto()),
576 },
577 ))
578 }
579
580 pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
581 let mut recipient = self
582 .connected_users
583 .get_mut(&to_user_id)
584 .ok_or_else(|| anyhow!("no such connection"))?;
585 anyhow::ensure!(recipient
586 .active_call
587 .map_or(false, |call| call.room_id == room_id
588 && call.connection_id.is_none()));
589 recipient.active_call = None;
590 let room = self
591 .rooms
592 .get_mut(&room_id)
593 .ok_or_else(|| anyhow!("no such room"))?;
594 room.pending_participant_user_ids
595 .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
596 Ok(room)
597 }
598
599 pub fn cancel_call(
600 &mut self,
601 room_id: RoomId,
602 recipient_user_id: UserId,
603 canceller_connection_id: ConnectionId,
604 ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
605 let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
606 let canceller = self
607 .connected_users
608 .get(&canceller_user_id)
609 .ok_or_else(|| anyhow!("no such connection"))?;
610 let recipient = self
611 .connected_users
612 .get(&recipient_user_id)
613 .ok_or_else(|| anyhow!("no such connection"))?;
614 let canceller_active_call = canceller
615 .active_call
616 .as_ref()
617 .ok_or_else(|| anyhow!("no active call"))?;
618 let recipient_active_call = recipient
619 .active_call
620 .as_ref()
621 .ok_or_else(|| anyhow!("no active call for recipient"))?;
622
623 anyhow::ensure!(
624 canceller_active_call.room_id == room_id,
625 "users are on different calls"
626 );
627 anyhow::ensure!(
628 recipient_active_call.room_id == room_id,
629 "users are on different calls"
630 );
631 anyhow::ensure!(
632 recipient_active_call.connection_id.is_none(),
633 "recipient has already answered"
634 );
635 let room_id = recipient_active_call.room_id;
636 let room = self
637 .rooms
638 .get_mut(&room_id)
639 .ok_or_else(|| anyhow!("no such room"))?;
640 room.pending_participant_user_ids
641 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
642
643 let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
644 recipient.active_call.take();
645
646 Ok((room, recipient.connection_ids.clone()))
647 }
648
649 pub fn decline_call(
650 &mut self,
651 room_id: RoomId,
652 recipient_connection_id: ConnectionId,
653 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
654 let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
655 let recipient = self
656 .connected_users
657 .get_mut(&recipient_user_id)
658 .ok_or_else(|| anyhow!("no such connection"))?;
659 if let Some(active_call) = recipient.active_call.take() {
660 anyhow::ensure!(active_call.room_id == room_id, "no such room");
661 let recipient_connection_ids = self
662 .connection_ids_for_user(recipient_user_id)
663 .collect::<Vec<_>>();
664 let room = self
665 .rooms
666 .get_mut(&active_call.room_id)
667 .ok_or_else(|| anyhow!("no such room"))?;
668 room.pending_participant_user_ids
669 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
670 Ok((room, recipient_connection_ids))
671 } else {
672 Err(anyhow!("user is not being called"))
673 }
674 }
675
676 pub fn update_participant_location(
677 &mut self,
678 room_id: RoomId,
679 location: proto::ParticipantLocation,
680 connection_id: ConnectionId,
681 ) -> Result<&proto::Room> {
682 let room = self
683 .rooms
684 .get_mut(&room_id)
685 .ok_or_else(|| anyhow!("no such room"))?;
686 if let Some(proto::participant_location::Variant::Project(project)) =
687 location.variant.as_ref()
688 {
689 anyhow::ensure!(
690 room.participants
691 .iter()
692 .flat_map(|participant| &participant.projects)
693 .any(|participant_project| participant_project.id == project.id),
694 "no such project"
695 );
696 }
697
698 let participant = room
699 .participants
700 .iter_mut()
701 .find(|participant| participant.peer_id == connection_id.0)
702 .ok_or_else(|| anyhow!("no such room"))?;
703 participant.location = Some(location);
704
705 Ok(room)
706 }
707
708 pub fn share_project(
709 &mut self,
710 room_id: RoomId,
711 project_id: ProjectId,
712 worktrees: Vec<proto::WorktreeMetadata>,
713 host_connection_id: ConnectionId,
714 ) -> Result<&proto::Room> {
715 let connection = self
716 .connections
717 .get_mut(&host_connection_id)
718 .ok_or_else(|| anyhow!("no such connection"))?;
719
720 let room = self
721 .rooms
722 .get_mut(&room_id)
723 .ok_or_else(|| anyhow!("no such room"))?;
724 let participant = room
725 .participants
726 .iter_mut()
727 .find(|participant| participant.peer_id == host_connection_id.0)
728 .ok_or_else(|| anyhow!("no such room"))?;
729 participant.projects.push(proto::ParticipantProject {
730 id: project_id.to_proto(),
731 worktree_root_names: worktrees
732 .iter()
733 .filter(|worktree| worktree.visible)
734 .map(|worktree| worktree.root_name.clone())
735 .collect(),
736 });
737
738 connection.projects.insert(project_id);
739 self.projects.insert(
740 project_id,
741 Project {
742 id: project_id,
743 room_id,
744 host_connection_id,
745 host: Collaborator {
746 user_id: connection.user_id,
747 replica_id: 0,
748 last_activity: None,
749 admin: connection.admin,
750 },
751 guests: Default::default(),
752 active_replica_ids: Default::default(),
753 worktrees: worktrees
754 .into_iter()
755 .map(|worktree| {
756 (
757 worktree.id,
758 Worktree {
759 root_name: worktree.root_name,
760 visible: worktree.visible,
761 ..Default::default()
762 },
763 )
764 })
765 .collect(),
766 language_servers: Default::default(),
767 },
768 );
769
770 Ok(room)
771 }
772
773 pub fn unshare_project(
774 &mut self,
775 project_id: ProjectId,
776 connection_id: ConnectionId,
777 ) -> Result<(&proto::Room, Project)> {
778 match self.projects.entry(project_id) {
779 btree_map::Entry::Occupied(e) => {
780 if e.get().host_connection_id == connection_id {
781 let project = e.remove();
782
783 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
784 host_connection.projects.remove(&project_id);
785 }
786
787 for guest_connection in project.guests.keys() {
788 if let Some(connection) = self.connections.get_mut(guest_connection) {
789 connection.projects.remove(&project_id);
790 }
791 }
792
793 let room = self
794 .rooms
795 .get_mut(&project.room_id)
796 .ok_or_else(|| anyhow!("no such room"))?;
797 let participant = room
798 .participants
799 .iter_mut()
800 .find(|participant| participant.peer_id == connection_id.0)
801 .ok_or_else(|| anyhow!("no such room"))?;
802 participant
803 .projects
804 .retain(|project| project.id != project_id.to_proto());
805
806 Ok((room, project))
807 } else {
808 Err(anyhow!("no such project"))?
809 }
810 }
811 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
812 }
813 }
814
815 pub fn update_project(
816 &mut self,
817 project_id: ProjectId,
818 worktrees: &[proto::WorktreeMetadata],
819 connection_id: ConnectionId,
820 ) -> Result<&proto::Room> {
821 let project = self
822 .projects
823 .get_mut(&project_id)
824 .ok_or_else(|| anyhow!("no such project"))?;
825 if project.host_connection_id == connection_id {
826 let mut old_worktrees = mem::take(&mut project.worktrees);
827 for worktree in worktrees {
828 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
829 project.worktrees.insert(worktree.id, old_worktree);
830 } else {
831 project.worktrees.insert(
832 worktree.id,
833 Worktree {
834 root_name: worktree.root_name.clone(),
835 visible: worktree.visible,
836 ..Default::default()
837 },
838 );
839 }
840 }
841
842 let room = self
843 .rooms
844 .get_mut(&project.room_id)
845 .ok_or_else(|| anyhow!("no such room"))?;
846 let participant_project = room
847 .participants
848 .iter_mut()
849 .flat_map(|participant| &mut participant.projects)
850 .find(|project| project.id == project_id.to_proto())
851 .ok_or_else(|| anyhow!("no such project"))?;
852 participant_project.worktree_root_names = worktrees
853 .iter()
854 .filter(|worktree| worktree.visible)
855 .map(|worktree| worktree.root_name.clone())
856 .collect();
857
858 Ok(room)
859 } else {
860 Err(anyhow!("no such project"))?
861 }
862 }
863
864 pub fn update_diagnostic_summary(
865 &mut self,
866 project_id: ProjectId,
867 worktree_id: u64,
868 connection_id: ConnectionId,
869 summary: proto::DiagnosticSummary,
870 ) -> Result<Vec<ConnectionId>> {
871 let project = self
872 .projects
873 .get_mut(&project_id)
874 .ok_or_else(|| anyhow!("no such project"))?;
875 if project.host_connection_id == connection_id {
876 let worktree = project
877 .worktrees
878 .get_mut(&worktree_id)
879 .ok_or_else(|| anyhow!("no such worktree"))?;
880 worktree
881 .diagnostic_summaries
882 .insert(summary.path.clone().into(), summary);
883 return Ok(project.connection_ids());
884 }
885
886 Err(anyhow!("no such worktree"))?
887 }
888
889 pub fn start_language_server(
890 &mut self,
891 project_id: ProjectId,
892 connection_id: ConnectionId,
893 language_server: proto::LanguageServer,
894 ) -> Result<Vec<ConnectionId>> {
895 let project = self
896 .projects
897 .get_mut(&project_id)
898 .ok_or_else(|| anyhow!("no such project"))?;
899 if project.host_connection_id == connection_id {
900 project.language_servers.push(language_server);
901 return Ok(project.connection_ids());
902 }
903
904 Err(anyhow!("no such project"))?
905 }
906
907 pub fn join_project(
908 &mut self,
909 requester_connection_id: ConnectionId,
910 project_id: ProjectId,
911 ) -> Result<(&Project, ReplicaId)> {
912 let connection = self
913 .connections
914 .get_mut(&requester_connection_id)
915 .ok_or_else(|| anyhow!("no such connection"))?;
916 let user = self
917 .connected_users
918 .get(&connection.user_id)
919 .ok_or_else(|| anyhow!("no such connection"))?;
920 let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
921 anyhow::ensure!(
922 active_call.connection_id == Some(requester_connection_id),
923 "no such project"
924 );
925
926 let project = self
927 .projects
928 .get_mut(&project_id)
929 .ok_or_else(|| anyhow!("no such project"))?;
930 anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
931
932 connection.projects.insert(project_id);
933 let mut replica_id = 1;
934 while project.active_replica_ids.contains(&replica_id) {
935 replica_id += 1;
936 }
937 project.active_replica_ids.insert(replica_id);
938 project.guests.insert(
939 requester_connection_id,
940 Collaborator {
941 replica_id,
942 user_id: connection.user_id,
943 last_activity: Some(OffsetDateTime::now_utc()),
944 admin: connection.admin,
945 },
946 );
947
948 project.host.last_activity = Some(OffsetDateTime::now_utc());
949 Ok((project, replica_id))
950 }
951
952 pub fn leave_project(
953 &mut self,
954 project_id: ProjectId,
955 connection_id: ConnectionId,
956 ) -> Result<LeftProject> {
957 let project = self
958 .projects
959 .get_mut(&project_id)
960 .ok_or_else(|| anyhow!("no such project"))?;
961
962 // If the connection leaving the project is a collaborator, remove it.
963 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
964 project.active_replica_ids.remove(&guest.replica_id);
965 true
966 } else {
967 false
968 };
969
970 if let Some(connection) = self.connections.get_mut(&connection_id) {
971 connection.projects.remove(&project_id);
972 }
973
974 Ok(LeftProject {
975 id: project.id,
976 host_connection_id: project.host_connection_id,
977 host_user_id: project.host.user_id,
978 connection_ids: project.connection_ids(),
979 remove_collaborator,
980 })
981 }
982
983 #[allow(clippy::too_many_arguments)]
984 pub fn update_worktree(
985 &mut self,
986 connection_id: ConnectionId,
987 project_id: ProjectId,
988 worktree_id: u64,
989 worktree_root_name: &str,
990 removed_entries: &[u64],
991 updated_entries: &[proto::Entry],
992 scan_id: u64,
993 is_last_update: bool,
994 ) -> Result<Vec<ConnectionId>> {
995 let project = self.write_project(project_id, connection_id)?;
996
997 let connection_ids = project.connection_ids();
998 let mut worktree = project.worktrees.entry(worktree_id).or_default();
999 worktree.root_name = worktree_root_name.to_string();
1000
1001 for entry_id in removed_entries {
1002 worktree.entries.remove(entry_id);
1003 }
1004
1005 for entry in updated_entries {
1006 worktree.entries.insert(entry.id, entry.clone());
1007 }
1008
1009 worktree.scan_id = scan_id;
1010 worktree.is_complete = is_last_update;
1011 Ok(connection_ids)
1012 }
1013
1014 pub fn project_connection_ids(
1015 &self,
1016 project_id: ProjectId,
1017 acting_connection_id: ConnectionId,
1018 ) -> Result<Vec<ConnectionId>> {
1019 Ok(self
1020 .read_project(project_id, acting_connection_id)?
1021 .connection_ids())
1022 }
1023
1024 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1025 Ok(self
1026 .channels
1027 .get(&channel_id)
1028 .ok_or_else(|| anyhow!("no such channel"))?
1029 .connection_ids())
1030 }
1031
1032 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1033 self.projects
1034 .get(&project_id)
1035 .ok_or_else(|| anyhow!("no such project"))
1036 }
1037
1038 pub fn register_project_activity(
1039 &mut self,
1040 project_id: ProjectId,
1041 connection_id: ConnectionId,
1042 ) -> Result<()> {
1043 let project = self
1044 .projects
1045 .get_mut(&project_id)
1046 .ok_or_else(|| anyhow!("no such project"))?;
1047 let collaborator = if connection_id == project.host_connection_id {
1048 &mut project.host
1049 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1050 guest
1051 } else {
1052 return Err(anyhow!("no such project"))?;
1053 };
1054 collaborator.last_activity = Some(OffsetDateTime::now_utc());
1055 Ok(())
1056 }
1057
1058 pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1059 self.projects.iter()
1060 }
1061
1062 pub fn read_project(
1063 &self,
1064 project_id: ProjectId,
1065 connection_id: ConnectionId,
1066 ) -> Result<&Project> {
1067 let project = self
1068 .projects
1069 .get(&project_id)
1070 .ok_or_else(|| anyhow!("no such project"))?;
1071 if project.host_connection_id == connection_id
1072 || project.guests.contains_key(&connection_id)
1073 {
1074 Ok(project)
1075 } else {
1076 Err(anyhow!("no such project"))?
1077 }
1078 }
1079
1080 fn write_project(
1081 &mut self,
1082 project_id: ProjectId,
1083 connection_id: ConnectionId,
1084 ) -> Result<&mut Project> {
1085 let project = self
1086 .projects
1087 .get_mut(&project_id)
1088 .ok_or_else(|| anyhow!("no such project"))?;
1089 if project.host_connection_id == connection_id
1090 || project.guests.contains_key(&connection_id)
1091 {
1092 Ok(project)
1093 } else {
1094 Err(anyhow!("no such project"))?
1095 }
1096 }
1097
1098 #[cfg(test)]
1099 pub fn check_invariants(&self) {
1100 for (connection_id, connection) in &self.connections {
1101 for project_id in &connection.projects {
1102 let project = &self.projects.get(project_id).unwrap();
1103 if project.host_connection_id != *connection_id {
1104 assert!(project.guests.contains_key(connection_id));
1105 }
1106
1107 for (worktree_id, worktree) in project.worktrees.iter() {
1108 let mut paths = HashMap::default();
1109 for entry in worktree.entries.values() {
1110 let prev_entry = paths.insert(&entry.path, entry);
1111 assert_eq!(
1112 prev_entry,
1113 None,
1114 "worktree {:?}, duplicate path for entries {:?} and {:?}",
1115 worktree_id,
1116 prev_entry.unwrap(),
1117 entry
1118 );
1119 }
1120 }
1121 }
1122 for channel_id in &connection.channels {
1123 let channel = self.channels.get(channel_id).unwrap();
1124 assert!(channel.connection_ids.contains(connection_id));
1125 }
1126 assert!(self
1127 .connected_users
1128 .get(&connection.user_id)
1129 .unwrap()
1130 .connection_ids
1131 .contains(connection_id));
1132 }
1133
1134 for (user_id, state) in &self.connected_users {
1135 for connection_id in &state.connection_ids {
1136 assert_eq!(
1137 self.connections.get(connection_id).unwrap().user_id,
1138 *user_id
1139 );
1140 }
1141
1142 if let Some(active_call) = state.active_call.as_ref() {
1143 if let Some(active_call_connection_id) = active_call.connection_id {
1144 assert!(
1145 state.connection_ids.contains(&active_call_connection_id),
1146 "call is active on a dead connection"
1147 );
1148 assert!(
1149 state.connection_ids.contains(&active_call_connection_id),
1150 "call is active on a dead connection"
1151 );
1152 }
1153 }
1154 }
1155
1156 for (room_id, room) in &self.rooms {
1157 for pending_user_id in &room.pending_participant_user_ids {
1158 assert!(
1159 self.connected_users
1160 .contains_key(&UserId::from_proto(*pending_user_id)),
1161 "call is active on a user that has disconnected"
1162 );
1163 }
1164
1165 for participant in &room.participants {
1166 assert!(
1167 self.connections
1168 .contains_key(&ConnectionId(participant.peer_id)),
1169 "room contains participant that has disconnected"
1170 );
1171
1172 for participant_project in &participant.projects {
1173 let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1174 assert_eq!(
1175 project.room_id, *room_id,
1176 "project was shared on a different room"
1177 );
1178 }
1179 }
1180
1181 assert!(
1182 !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1183 "room can't be empty"
1184 );
1185 }
1186
1187 for (project_id, project) in &self.projects {
1188 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1189 assert!(host_connection.projects.contains(project_id));
1190
1191 for guest_connection_id in project.guests.keys() {
1192 let guest_connection = self.connections.get(guest_connection_id).unwrap();
1193 assert!(guest_connection.projects.contains(project_id));
1194 }
1195 assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
1196 assert_eq!(
1197 project.active_replica_ids,
1198 project
1199 .guests
1200 .values()
1201 .map(|guest| guest.replica_id)
1202 .collect::<HashSet<_>>(),
1203 );
1204
1205 let room = &self.rooms[&project.room_id];
1206 let room_participant = room
1207 .participants
1208 .iter()
1209 .find(|participant| participant.peer_id == project.host_connection_id.0)
1210 .unwrap();
1211 assert!(
1212 room_participant
1213 .projects
1214 .iter()
1215 .any(|project| project.id == project_id.to_proto()),
1216 "project was not shared in room"
1217 );
1218 }
1219
1220 for (channel_id, channel) in &self.channels {
1221 for connection_id in &channel.connection_ids {
1222 let connection = self.connections.get(connection_id).unwrap();
1223 assert!(connection.channels.contains(channel_id));
1224 }
1225 }
1226 }
1227}
1228
1229impl Project {
1230 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1231 self.guests
1232 .values()
1233 .chain([&self.host])
1234 .any(|collaborator| {
1235 collaborator
1236 .last_activity
1237 .map_or(false, |active_time| active_time > start_time)
1238 })
1239 }
1240
1241 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1242 self.guests.keys().copied().collect()
1243 }
1244
1245 pub fn connection_ids(&self) -> Vec<ConnectionId> {
1246 self.guests
1247 .keys()
1248 .copied()
1249 .chain(Some(self.host_connection_id))
1250 .collect()
1251 }
1252}
1253
1254impl Channel {
1255 fn connection_ids(&self) -> Vec<ConnectionId> {
1256 self.connection_ids.iter().copied().collect()
1257 }
1258}