1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
4use rpc::{proto, ConnectionId};
5use serde::Serialize;
6use std::{mem, path::PathBuf, str, time::Duration};
7use time::OffsetDateTime;
8use tracing::instrument;
9use util::post_inc;
10
/// Identifier of a room held in `Store::rooms`. New ids are allocated from
/// `Store::next_room_id` when a room is created.
pub type RoomId = u64;
12
/// In-memory bookkeeping for live connections, connected users, rooms,
/// shared projects and channels.
#[derive(Default, Serialize)]
pub struct Store {
    /// Per-connection state, keyed by connection id.
    connections: BTreeMap<ConnectionId, ConnectionState>,
    /// Per-user state (set of connections and active call). An entry is
    /// removed once the user's last connection is removed.
    connected_users: BTreeMap<UserId, ConnectedUser>,
    /// The id that the next created room will receive.
    next_room_id: RoomId,
    /// All rooms that currently have at least one participant or pending user.
    rooms: BTreeMap<RoomId, proto::Room>,
    /// All currently shared projects, keyed by project id.
    projects: BTreeMap<ProjectId, Project>,
    /// Channel membership; excluded from serialization.
    #[serde(skip)]
    channels: BTreeMap<ChannelId, Channel>,
}
23
/// State shared by all connections that belong to one user.
#[derive(Default, Serialize)]
struct ConnectedUser {
    /// Every live connection of this user.
    connection_ids: HashSet<ConnectionId>,
    /// The call the user is ringing for or participating in, if any.
    active_call: Option<Call>,
}
29
/// State tracked for a single connection.
#[derive(Serialize)]
struct ConnectionState {
    /// The user that owns this connection.
    user_id: UserId,
    /// Whether the connection was registered as an admin connection.
    /// Admin connections (and projects they host) are excluded from `Metrics`.
    admin: bool,
    /// Projects this connection hosts or has joined as a guest.
    projects: BTreeSet<ProjectId>,
    /// Channels this connection has joined.
    channels: HashSet<ChannelId>,
}
37
/// A call that a user is either being invited to or is taking part in.
#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
pub struct Call {
    /// The user that started the call (the room creator for a freshly
    /// created room, otherwise the user that invited the recipient).
    pub caller_user_id: UserId,
    /// The room the call takes place in.
    pub room_id: RoomId,
    /// `None` while the call has not been answered; set to the answering
    /// (or room-creating) connection once the user is in the room.
    pub connection_id: Option<ConnectionId>,
    /// Project passed along with the invitation, if any.
    pub initial_project_id: Option<ProjectId>,
}
45
/// A project that a host connection has shared into a room.
#[derive(Serialize)]
pub struct Project {
    pub id: ProjectId,
    /// The room the project was shared in.
    pub room_id: RoomId,
    /// The connection that shared the project.
    pub host_connection_id: ConnectionId,
    /// Collaborator record for the host (created with replica id 0).
    pub host: Collaborator,
    /// Collaborator records for guests, keyed by their connection id.
    pub guests: HashMap<ConnectionId, Collaborator>,
    /// Replica ids currently assigned to guests.
    pub active_replica_ids: HashSet<ReplicaId>,
    /// Worktrees of the project, keyed by worktree id.
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Language servers reported via `Store::start_language_server`.
    pub language_servers: Vec<proto::LanguageServer>,
}
57
/// A participant (host or guest) of a shared project.
#[derive(Serialize)]
pub struct Collaborator {
    pub replica_id: ReplicaId,
    pub user_id: UserId,
    /// Time of the most recently registered activity, if any; excluded from
    /// serialization.
    #[serde(skip)]
    pub last_activity: Option<OffsetDateTime>,
    /// Copied from the collaborator's connection when the record is created.
    pub admin: bool,
}
66
/// Server-side view of one worktree of a shared project.
#[derive(Default, Serialize)]
pub struct Worktree {
    pub root_name: String,
    pub visible: bool,
    /// Entries keyed by entry id; excluded from serialization.
    #[serde(skip)]
    pub entries: BTreeMap<u64, proto::Entry>,
    /// Latest diagnostic summary per path; excluded from serialization.
    #[serde(skip)]
    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
    /// Scan id supplied with the most recent `Store::update_worktree` call.
    pub scan_id: u64,
    /// Whether the most recent `Store::update_worktree` call was flagged as
    /// the last update.
    pub is_complete: bool,
}
78
/// The set of connections that have joined a channel.
#[derive(Default)]
pub struct Channel {
    pub connection_ids: HashSet<ConnectionId>,
}
83
/// Identifier of a collaborator within a project. The host is created with
/// replica id 0; guests receive the lowest unused id starting at 1.
pub type ReplicaId = u16;
85
/// Summary of what was torn down by `Store::remove_connection`.
#[derive(Default)]
pub struct RemovedConnectionState {
    /// The user that owned the removed connection.
    pub user_id: UserId,
    /// Projects the connection hosted, which were unshared.
    pub hosted_projects: HashMap<ProjectId, Project>,
    /// Projects the connection left.
    pub guest_project_ids: HashSet<ProjectId>,
    /// NOTE(review): never populated in this file; presumably filled in by
    /// the caller — confirm.
    pub contact_ids: HashSet<UserId>,
    /// The room whose state changed because of the disconnect, if any.
    pub room_id: Option<RoomId>,
}
94
/// Result of a connection leaving a project via `Store::leave_project`.
pub struct LeftProject {
    pub id: ProjectId,
    pub host_user_id: UserId,
    pub host_connection_id: ConnectionId,
    /// Connections still attached to the project (remaining guests plus the host).
    pub connection_ids: Vec<ConnectionId>,
    /// `true` when the leaving connection was registered as a guest.
    pub remove_collaborator: bool,
}
102
/// Result of a connection leaving a room via `Store::leave_room`.
pub struct LeftRoom<'a> {
    /// The room after the participant left, or `None` if the room was
    /// removed because it became empty.
    pub room: Option<&'a proto::Room>,
    /// Projects the leaving connection hosted, which were unshared.
    pub unshared_projects: Vec<Project>,
    /// Projects the leaving connection left.
    pub left_projects: Vec<LeftProject>,
}
108
/// Counters computed by `Store::metrics`.
#[derive(Copy, Clone)]
pub struct Metrics {
    /// Number of non-admin connections.
    pub connections: usize,
    /// Number of projects whose host connection exists and is not an admin.
    pub registered_projects: usize,
    /// Registered projects with collaborator activity inside the activity window.
    pub active_projects: usize,
    /// Active projects that have at least one guest.
    pub shared_projects: usize,
}
116
117impl Store {
118 pub fn metrics(&self) -> Metrics {
119 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
120 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
121
122 let connections = self.connections.values().filter(|c| !c.admin).count();
123 let mut registered_projects = 0;
124 let mut active_projects = 0;
125 let mut shared_projects = 0;
126 for project in self.projects.values() {
127 if let Some(connection) = self.connections.get(&project.host_connection_id) {
128 if !connection.admin {
129 registered_projects += 1;
130 if project.is_active_since(active_window_start) {
131 active_projects += 1;
132 if !project.guests.is_empty() {
133 shared_projects += 1;
134 }
135 }
136 }
137 }
138 }
139
140 Metrics {
141 connections,
142 registered_projects,
143 active_projects,
144 shared_projects,
145 }
146 }
147
148 #[instrument(skip(self))]
149 pub fn add_connection(
150 &mut self,
151 connection_id: ConnectionId,
152 user_id: UserId,
153 admin: bool,
154 ) -> Option<proto::IncomingCall> {
155 self.connections.insert(
156 connection_id,
157 ConnectionState {
158 user_id,
159 admin,
160 projects: Default::default(),
161 channels: Default::default(),
162 },
163 );
164 let connected_user = self.connected_users.entry(user_id).or_default();
165 connected_user.connection_ids.insert(connection_id);
166 if let Some(active_call) = connected_user.active_call {
167 if active_call.connection_id.is_some() {
168 None
169 } else {
170 let room = self.room(active_call.room_id)?;
171 Some(proto::IncomingCall {
172 room_id: active_call.room_id,
173 caller_user_id: active_call.caller_user_id.to_proto(),
174 participant_user_ids: room
175 .participants
176 .iter()
177 .map(|participant| participant.user_id)
178 .collect(),
179 initial_project_id: active_call
180 .initial_project_id
181 .map(|project_id| project_id.to_proto()),
182 })
183 }
184 } else {
185 None
186 }
187 }
188
    /// Removes `connection_id` and cleans up everything that referenced it:
    /// channel memberships, hosted and joined projects, and the user's room
    /// and call state.
    ///
    /// Returns a summary of the removed state (`contact_ids` is left at its
    /// default).
    ///
    /// # Errors
    /// Fails with "no such connection" when the connection is unknown.
    ///
    /// # Panics
    /// Panics if the connection's user has no entry in `connected_users`.
    #[instrument(skip(self))]
    pub fn remove_connection(
        &mut self,
        connection_id: ConnectionId,
    ) -> Result<RemovedConnectionState> {
        let connection = self
            .connections
            .get_mut(&connection_id)
            .ok_or_else(|| anyhow!("no such connection"))?;

        let user_id = connection.user_id;
        // Take ownership of the membership sets so they can be iterated while
        // `self` is mutated below.
        let connection_projects = mem::take(&mut connection.projects);
        let connection_channels = mem::take(&mut connection.channels);

        let mut result = RemovedConnectionState {
            user_id,
            ..Default::default()
        };

        // Leave all channels.
        for channel_id in connection_channels {
            self.leave_channel(connection_id, channel_id);
        }

        // Unshare and leave all projects.
        for project_id in connection_projects {
            // Unsharing only succeeds for the host; otherwise fall back to leaving.
            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
                result.hosted_projects.insert(project_id, project);
            } else if self.leave_project(project_id, connection_id).is_ok() {
                result.guest_project_ids.insert(project_id);
            }
        }

        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
        connected_user.connection_ids.remove(&connection_id);
        if let Some(active_call) = connected_user.active_call.as_ref() {
            let room_id = active_call.room_id;
            if let Some(room) = self.rooms.get_mut(&room_id) {
                let prev_participant_count = room.participants.len();
                room.participants
                    .retain(|participant| participant.peer_id != connection_id.0);
                if prev_participant_count == room.participants.len() {
                    // This connection was not a participant. The call is only
                    // dropped once the user has no connections left.
                    if connected_user.connection_ids.is_empty() {
                        room.pending_user_ids
                            .retain(|pending_user_id| *pending_user_id != user_id.to_proto());
                        result.room_id = Some(room_id);
                        connected_user.active_call = None;
                    }
                } else {
                    // This connection was the participant, so the call ends for the user.
                    result.room_id = Some(room_id);
                    connected_user.active_call = None;
                }

                // A room with no participants and no pending users is discarded.
                if room.participants.is_empty() && room.pending_user_ids.is_empty() {
                    self.rooms.remove(&room_id);
                }
            } else {
                tracing::error!("disconnected user claims to be in a room that does not exist");
                connected_user.active_call = None;
            }
        }

        // Forget the user entirely once their last connection is gone.
        if connected_user.connection_ids.is_empty() {
            self.connected_users.remove(&user_id);
        }

        self.connections.remove(&connection_id).unwrap();

        Ok(result)
    }
259
260 #[cfg(test)]
261 pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
262 self.channels.get(&id)
263 }
264
265 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
266 if let Some(connection) = self.connections.get_mut(&connection_id) {
267 connection.channels.insert(channel_id);
268 self.channels
269 .entry(channel_id)
270 .or_default()
271 .connection_ids
272 .insert(connection_id);
273 }
274 }
275
276 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
277 if let Some(connection) = self.connections.get_mut(&connection_id) {
278 connection.channels.remove(&channel_id);
279 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
280 entry.get_mut().connection_ids.remove(&connection_id);
281 if entry.get_mut().connection_ids.is_empty() {
282 entry.remove();
283 }
284 }
285 }
286 }
287
288 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
289 Ok(self
290 .connections
291 .get(&connection_id)
292 .ok_or_else(|| anyhow!("unknown connection"))?
293 .user_id)
294 }
295
296 pub fn connection_ids_for_user(
297 &self,
298 user_id: UserId,
299 ) -> impl Iterator<Item = ConnectionId> + '_ {
300 self.connected_users
301 .get(&user_id)
302 .into_iter()
303 .map(|state| &state.connection_ids)
304 .flatten()
305 .copied()
306 }
307
308 pub fn is_user_online(&self, user_id: UserId) -> bool {
309 !self
310 .connected_users
311 .get(&user_id)
312 .unwrap_or(&Default::default())
313 .connection_ids
314 .is_empty()
315 }
316
317 pub fn build_initial_contacts_update(
318 &self,
319 contacts: Vec<db::Contact>,
320 ) -> proto::UpdateContacts {
321 let mut update = proto::UpdateContacts::default();
322
323 for contact in contacts {
324 match contact {
325 db::Contact::Accepted {
326 user_id,
327 should_notify,
328 } => {
329 update
330 .contacts
331 .push(self.contact_for_user(user_id, should_notify));
332 }
333 db::Contact::Outgoing { user_id } => {
334 update.outgoing_requests.push(user_id.to_proto())
335 }
336 db::Contact::Incoming {
337 user_id,
338 should_notify,
339 } => update
340 .incoming_requests
341 .push(proto::IncomingContactRequest {
342 requester_id: user_id.to_proto(),
343 should_notify,
344 }),
345 }
346 }
347
348 update
349 }
350
351 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
352 proto::Contact {
353 user_id: user_id.to_proto(),
354 online: self.is_user_online(user_id),
355 should_notify,
356 }
357 }
358
359 pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
360 let connection = self
361 .connections
362 .get_mut(&creator_connection_id)
363 .ok_or_else(|| anyhow!("no such connection"))?;
364 let connected_user = self
365 .connected_users
366 .get_mut(&connection.user_id)
367 .ok_or_else(|| anyhow!("no such connection"))?;
368 anyhow::ensure!(
369 connected_user.active_call.is_none(),
370 "can't create a room with an active call"
371 );
372
373 let mut room = proto::Room::default();
374 room.participants.push(proto::Participant {
375 user_id: connection.user_id.to_proto(),
376 peer_id: creator_connection_id.0,
377 project_ids: Default::default(),
378 location: Some(proto::ParticipantLocation {
379 variant: Some(proto::participant_location::Variant::External(
380 proto::participant_location::External {},
381 )),
382 }),
383 });
384
385 let room_id = post_inc(&mut self.next_room_id);
386 self.rooms.insert(room_id, room);
387 connected_user.active_call = Some(Call {
388 caller_user_id: connection.user_id,
389 room_id,
390 connection_id: Some(creator_connection_id),
391 initial_project_id: None,
392 });
393 Ok(room_id)
394 }
395
396 pub fn join_room(
397 &mut self,
398 room_id: RoomId,
399 connection_id: ConnectionId,
400 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
401 let connection = self
402 .connections
403 .get_mut(&connection_id)
404 .ok_or_else(|| anyhow!("no such connection"))?;
405 let user_id = connection.user_id;
406 let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
407
408 let connected_user = self
409 .connected_users
410 .get_mut(&user_id)
411 .ok_or_else(|| anyhow!("no such connection"))?;
412 let active_call = connected_user
413 .active_call
414 .as_mut()
415 .ok_or_else(|| anyhow!("not being called"))?;
416 anyhow::ensure!(
417 active_call.room_id == room_id && active_call.connection_id.is_none(),
418 "not being called on this room"
419 );
420
421 let room = self
422 .rooms
423 .get_mut(&room_id)
424 .ok_or_else(|| anyhow!("no such room"))?;
425 anyhow::ensure!(
426 room.pending_user_ids.contains(&user_id.to_proto()),
427 anyhow!("no such room")
428 );
429 room.pending_user_ids
430 .retain(|pending| *pending != user_id.to_proto());
431 room.participants.push(proto::Participant {
432 user_id: user_id.to_proto(),
433 peer_id: connection_id.0,
434 project_ids: Default::default(),
435 location: Some(proto::ParticipantLocation {
436 variant: Some(proto::participant_location::Variant::External(
437 proto::participant_location::External {},
438 )),
439 }),
440 });
441 active_call.connection_id = Some(connection_id);
442
443 Ok((room, recipient_connection_ids))
444 }
445
446 pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
447 let connection = self
448 .connections
449 .get_mut(&connection_id)
450 .ok_or_else(|| anyhow!("no such connection"))?;
451 let user_id = connection.user_id;
452
453 let connected_user = self
454 .connected_users
455 .get(&user_id)
456 .ok_or_else(|| anyhow!("no such connection"))?;
457 anyhow::ensure!(
458 connected_user
459 .active_call
460 .map_or(false, |call| call.room_id == room_id
461 && call.connection_id == Some(connection_id)),
462 "cannot leave a room before joining it"
463 );
464
465 // Given that users can only join one room at a time, we can safely unshare
466 // and leave all projects associated with the connection.
467 let mut unshared_projects = Vec::new();
468 let mut left_projects = Vec::new();
469 for project_id in connection.projects.clone() {
470 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
471 unshared_projects.push(project);
472 } else if let Ok(project) = self.leave_project(project_id, connection_id) {
473 left_projects.push(project);
474 }
475 }
476 self.connected_users.get_mut(&user_id).unwrap().active_call = None;
477
478 let room = self
479 .rooms
480 .get_mut(&room_id)
481 .ok_or_else(|| anyhow!("no such room"))?;
482 room.participants
483 .retain(|participant| participant.peer_id != connection_id.0);
484 if room.participants.is_empty() && room.pending_user_ids.is_empty() {
485 self.rooms.remove(&room_id);
486 }
487
488 Ok(LeftRoom {
489 room: self.rooms.get(&room_id),
490 unshared_projects,
491 left_projects,
492 })
493 }
494
495 pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
496 self.rooms.get(&room_id)
497 }
498
499 pub fn call(
500 &mut self,
501 room_id: RoomId,
502 recipient_user_id: UserId,
503 initial_project_id: Option<ProjectId>,
504 from_connection_id: ConnectionId,
505 ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
506 let caller_user_id = self.user_id_for_connection(from_connection_id)?;
507
508 let recipient_connection_ids = self
509 .connection_ids_for_user(recipient_user_id)
510 .collect::<Vec<_>>();
511 let mut recipient = self
512 .connected_users
513 .get_mut(&recipient_user_id)
514 .ok_or_else(|| anyhow!("no such connection"))?;
515 anyhow::ensure!(
516 recipient.active_call.is_none(),
517 "recipient is already on another call"
518 );
519
520 let room = self
521 .rooms
522 .get_mut(&room_id)
523 .ok_or_else(|| anyhow!("no such room"))?;
524 anyhow::ensure!(
525 room.participants
526 .iter()
527 .any(|participant| participant.peer_id == from_connection_id.0),
528 "no such room"
529 );
530 anyhow::ensure!(
531 room.pending_user_ids
532 .iter()
533 .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
534 "cannot call the same user more than once"
535 );
536 room.pending_user_ids.push(recipient_user_id.to_proto());
537
538 if let Some(initial_project_id) = initial_project_id {
539 let project = self
540 .projects
541 .get(&initial_project_id)
542 .ok_or_else(|| anyhow!("no such project"))?;
543 anyhow::ensure!(project.room_id == room_id, "no such project");
544 }
545
546 recipient.active_call = Some(Call {
547 caller_user_id,
548 room_id,
549 connection_id: None,
550 initial_project_id,
551 });
552
553 Ok((
554 room,
555 recipient_connection_ids,
556 proto::IncomingCall {
557 room_id,
558 caller_user_id: caller_user_id.to_proto(),
559 participant_user_ids: room
560 .participants
561 .iter()
562 .map(|participant| participant.user_id)
563 .collect(),
564 initial_project_id: initial_project_id.map(|project_id| project_id.to_proto()),
565 },
566 ))
567 }
568
569 pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
570 let mut recipient = self
571 .connected_users
572 .get_mut(&to_user_id)
573 .ok_or_else(|| anyhow!("no such connection"))?;
574 anyhow::ensure!(recipient
575 .active_call
576 .map_or(false, |call| call.room_id == room_id
577 && call.connection_id.is_none()));
578 recipient.active_call = None;
579 let room = self
580 .rooms
581 .get_mut(&room_id)
582 .ok_or_else(|| anyhow!("no such room"))?;
583 room.pending_user_ids
584 .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
585 Ok(room)
586 }
587
588 pub fn cancel_call(
589 &mut self,
590 room_id: RoomId,
591 recipient_user_id: UserId,
592 canceller_connection_id: ConnectionId,
593 ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
594 let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
595 let canceller = self
596 .connected_users
597 .get(&canceller_user_id)
598 .ok_or_else(|| anyhow!("no such connection"))?;
599 let recipient = self
600 .connected_users
601 .get(&recipient_user_id)
602 .ok_or_else(|| anyhow!("no such connection"))?;
603 let canceller_active_call = canceller
604 .active_call
605 .as_ref()
606 .ok_or_else(|| anyhow!("no active call"))?;
607 let recipient_active_call = recipient
608 .active_call
609 .as_ref()
610 .ok_or_else(|| anyhow!("no active call for recipient"))?;
611
612 anyhow::ensure!(
613 canceller_active_call.room_id == room_id,
614 "users are on different calls"
615 );
616 anyhow::ensure!(
617 recipient_active_call.room_id == room_id,
618 "users are on different calls"
619 );
620 anyhow::ensure!(
621 recipient_active_call.connection_id.is_none(),
622 "recipient has already answered"
623 );
624 let room_id = recipient_active_call.room_id;
625 let room = self
626 .rooms
627 .get_mut(&room_id)
628 .ok_or_else(|| anyhow!("no such room"))?;
629 room.pending_user_ids
630 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
631
632 let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
633 recipient.active_call.take();
634
635 Ok((room, recipient.connection_ids.clone()))
636 }
637
638 pub fn decline_call(
639 &mut self,
640 room_id: RoomId,
641 recipient_connection_id: ConnectionId,
642 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
643 let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
644 let recipient = self
645 .connected_users
646 .get_mut(&recipient_user_id)
647 .ok_or_else(|| anyhow!("no such connection"))?;
648 if let Some(active_call) = recipient.active_call.take() {
649 anyhow::ensure!(active_call.room_id == room_id, "no such room");
650 let recipient_connection_ids = self
651 .connection_ids_for_user(recipient_user_id)
652 .collect::<Vec<_>>();
653 let room = self
654 .rooms
655 .get_mut(&active_call.room_id)
656 .ok_or_else(|| anyhow!("no such room"))?;
657 room.pending_user_ids
658 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
659 Ok((room, recipient_connection_ids))
660 } else {
661 Err(anyhow!("user is not being called"))
662 }
663 }
664
665 pub fn update_participant_location(
666 &mut self,
667 room_id: RoomId,
668 location: proto::ParticipantLocation,
669 connection_id: ConnectionId,
670 ) -> Result<&proto::Room> {
671 let room = self
672 .rooms
673 .get_mut(&room_id)
674 .ok_or_else(|| anyhow!("no such room"))?;
675 if let Some(proto::participant_location::Variant::Project(project)) =
676 location.variant.as_ref()
677 {
678 anyhow::ensure!(
679 room.participants
680 .iter()
681 .any(|participant| participant.project_ids.contains(&project.id)),
682 "no such project"
683 );
684 }
685
686 let participant = room
687 .participants
688 .iter_mut()
689 .find(|participant| participant.peer_id == connection_id.0)
690 .ok_or_else(|| anyhow!("no such room"))?;
691 participant.location = Some(location);
692
693 Ok(room)
694 }
695
696 pub fn share_project(
697 &mut self,
698 room_id: RoomId,
699 project_id: ProjectId,
700 host_connection_id: ConnectionId,
701 ) -> Result<&proto::Room> {
702 let connection = self
703 .connections
704 .get_mut(&host_connection_id)
705 .ok_or_else(|| anyhow!("no such connection"))?;
706
707 let room = self
708 .rooms
709 .get_mut(&room_id)
710 .ok_or_else(|| anyhow!("no such room"))?;
711 let participant = room
712 .participants
713 .iter_mut()
714 .find(|participant| participant.peer_id == host_connection_id.0)
715 .ok_or_else(|| anyhow!("no such room"))?;
716 participant.project_ids.push(project_id.to_proto());
717
718 connection.projects.insert(project_id);
719 self.projects.insert(
720 project_id,
721 Project {
722 id: project_id,
723 room_id,
724 host_connection_id,
725 host: Collaborator {
726 user_id: connection.user_id,
727 replica_id: 0,
728 last_activity: None,
729 admin: connection.admin,
730 },
731 guests: Default::default(),
732 active_replica_ids: Default::default(),
733 worktrees: Default::default(),
734 language_servers: Default::default(),
735 },
736 );
737
738 Ok(room)
739 }
740
741 pub fn unshare_project(
742 &mut self,
743 project_id: ProjectId,
744 connection_id: ConnectionId,
745 ) -> Result<(&proto::Room, Project)> {
746 match self.projects.entry(project_id) {
747 btree_map::Entry::Occupied(e) => {
748 if e.get().host_connection_id == connection_id {
749 let project = e.remove();
750
751 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
752 host_connection.projects.remove(&project_id);
753 }
754
755 for guest_connection in project.guests.keys() {
756 if let Some(connection) = self.connections.get_mut(guest_connection) {
757 connection.projects.remove(&project_id);
758 }
759 }
760
761 let room = self
762 .rooms
763 .get_mut(&project.room_id)
764 .ok_or_else(|| anyhow!("no such room"))?;
765 let participant = room
766 .participants
767 .iter_mut()
768 .find(|participant| participant.peer_id == connection_id.0)
769 .ok_or_else(|| anyhow!("no such room"))?;
770 participant
771 .project_ids
772 .retain(|id| *id != project_id.to_proto());
773
774 Ok((room, project))
775 } else {
776 Err(anyhow!("no such project"))?
777 }
778 }
779 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
780 }
781 }
782
783 pub fn update_project(
784 &mut self,
785 project_id: ProjectId,
786 worktrees: &[proto::WorktreeMetadata],
787 connection_id: ConnectionId,
788 ) -> Result<()> {
789 let project = self
790 .projects
791 .get_mut(&project_id)
792 .ok_or_else(|| anyhow!("no such project"))?;
793 if project.host_connection_id == connection_id {
794 let mut old_worktrees = mem::take(&mut project.worktrees);
795 for worktree in worktrees {
796 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
797 project.worktrees.insert(worktree.id, old_worktree);
798 } else {
799 project.worktrees.insert(
800 worktree.id,
801 Worktree {
802 root_name: worktree.root_name.clone(),
803 visible: worktree.visible,
804 ..Default::default()
805 },
806 );
807 }
808 }
809
810 Ok(())
811 } else {
812 Err(anyhow!("no such project"))?
813 }
814 }
815
816 pub fn update_diagnostic_summary(
817 &mut self,
818 project_id: ProjectId,
819 worktree_id: u64,
820 connection_id: ConnectionId,
821 summary: proto::DiagnosticSummary,
822 ) -> Result<Vec<ConnectionId>> {
823 let project = self
824 .projects
825 .get_mut(&project_id)
826 .ok_or_else(|| anyhow!("no such project"))?;
827 if project.host_connection_id == connection_id {
828 let worktree = project
829 .worktrees
830 .get_mut(&worktree_id)
831 .ok_or_else(|| anyhow!("no such worktree"))?;
832 worktree
833 .diagnostic_summaries
834 .insert(summary.path.clone().into(), summary);
835 return Ok(project.connection_ids());
836 }
837
838 Err(anyhow!("no such worktree"))?
839 }
840
841 pub fn start_language_server(
842 &mut self,
843 project_id: ProjectId,
844 connection_id: ConnectionId,
845 language_server: proto::LanguageServer,
846 ) -> Result<Vec<ConnectionId>> {
847 let project = self
848 .projects
849 .get_mut(&project_id)
850 .ok_or_else(|| anyhow!("no such project"))?;
851 if project.host_connection_id == connection_id {
852 project.language_servers.push(language_server);
853 return Ok(project.connection_ids());
854 }
855
856 Err(anyhow!("no such project"))?
857 }
858
859 pub fn join_project(
860 &mut self,
861 requester_connection_id: ConnectionId,
862 project_id: ProjectId,
863 ) -> Result<(&Project, ReplicaId)> {
864 let connection = self
865 .connections
866 .get_mut(&requester_connection_id)
867 .ok_or_else(|| anyhow!("no such connection"))?;
868 let user = self
869 .connected_users
870 .get(&connection.user_id)
871 .ok_or_else(|| anyhow!("no such connection"))?;
872 let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
873 anyhow::ensure!(
874 active_call.connection_id == Some(requester_connection_id),
875 "no such project"
876 );
877
878 let project = self
879 .projects
880 .get_mut(&project_id)
881 .ok_or_else(|| anyhow!("no such project"))?;
882 anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
883
884 connection.projects.insert(project_id);
885 let mut replica_id = 1;
886 while project.active_replica_ids.contains(&replica_id) {
887 replica_id += 1;
888 }
889 project.active_replica_ids.insert(replica_id);
890 project.guests.insert(
891 requester_connection_id,
892 Collaborator {
893 replica_id,
894 user_id: connection.user_id,
895 last_activity: Some(OffsetDateTime::now_utc()),
896 admin: connection.admin,
897 },
898 );
899
900 project.host.last_activity = Some(OffsetDateTime::now_utc());
901 Ok((project, replica_id))
902 }
903
904 pub fn leave_project(
905 &mut self,
906 project_id: ProjectId,
907 connection_id: ConnectionId,
908 ) -> Result<LeftProject> {
909 let project = self
910 .projects
911 .get_mut(&project_id)
912 .ok_or_else(|| anyhow!("no such project"))?;
913
914 // If the connection leaving the project is a collaborator, remove it.
915 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
916 project.active_replica_ids.remove(&guest.replica_id);
917 true
918 } else {
919 false
920 };
921
922 if let Some(connection) = self.connections.get_mut(&connection_id) {
923 connection.projects.remove(&project_id);
924 }
925
926 Ok(LeftProject {
927 id: project.id,
928 host_connection_id: project.host_connection_id,
929 host_user_id: project.host.user_id,
930 connection_ids: project.connection_ids(),
931 remove_collaborator,
932 })
933 }
934
935 #[allow(clippy::too_many_arguments)]
936 pub fn update_worktree(
937 &mut self,
938 connection_id: ConnectionId,
939 project_id: ProjectId,
940 worktree_id: u64,
941 worktree_root_name: &str,
942 removed_entries: &[u64],
943 updated_entries: &[proto::Entry],
944 scan_id: u64,
945 is_last_update: bool,
946 ) -> Result<Vec<ConnectionId>> {
947 let project = self.write_project(project_id, connection_id)?;
948
949 let connection_ids = project.connection_ids();
950 let mut worktree = project.worktrees.entry(worktree_id).or_default();
951 worktree.root_name = worktree_root_name.to_string();
952
953 for entry_id in removed_entries {
954 worktree.entries.remove(entry_id);
955 }
956
957 for entry in updated_entries {
958 worktree.entries.insert(entry.id, entry.clone());
959 }
960
961 worktree.scan_id = scan_id;
962 worktree.is_complete = is_last_update;
963 Ok(connection_ids)
964 }
965
966 pub fn project_connection_ids(
967 &self,
968 project_id: ProjectId,
969 acting_connection_id: ConnectionId,
970 ) -> Result<Vec<ConnectionId>> {
971 Ok(self
972 .read_project(project_id, acting_connection_id)?
973 .connection_ids())
974 }
975
976 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
977 Ok(self
978 .channels
979 .get(&channel_id)
980 .ok_or_else(|| anyhow!("no such channel"))?
981 .connection_ids())
982 }
983
984 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
985 self.projects
986 .get(&project_id)
987 .ok_or_else(|| anyhow!("no such project"))
988 }
989
990 pub fn register_project_activity(
991 &mut self,
992 project_id: ProjectId,
993 connection_id: ConnectionId,
994 ) -> Result<()> {
995 let project = self
996 .projects
997 .get_mut(&project_id)
998 .ok_or_else(|| anyhow!("no such project"))?;
999 let collaborator = if connection_id == project.host_connection_id {
1000 &mut project.host
1001 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1002 guest
1003 } else {
1004 return Err(anyhow!("no such project"))?;
1005 };
1006 collaborator.last_activity = Some(OffsetDateTime::now_utc());
1007 Ok(())
1008 }
1009
1010 pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1011 self.projects.iter()
1012 }
1013
1014 pub fn read_project(
1015 &self,
1016 project_id: ProjectId,
1017 connection_id: ConnectionId,
1018 ) -> Result<&Project> {
1019 let project = self
1020 .projects
1021 .get(&project_id)
1022 .ok_or_else(|| anyhow!("no such project"))?;
1023 if project.host_connection_id == connection_id
1024 || project.guests.contains_key(&connection_id)
1025 {
1026 Ok(project)
1027 } else {
1028 Err(anyhow!("no such project"))?
1029 }
1030 }
1031
1032 fn write_project(
1033 &mut self,
1034 project_id: ProjectId,
1035 connection_id: ConnectionId,
1036 ) -> Result<&mut Project> {
1037 let project = self
1038 .projects
1039 .get_mut(&project_id)
1040 .ok_or_else(|| anyhow!("no such project"))?;
1041 if project.host_connection_id == connection_id
1042 || project.guests.contains_key(&connection_id)
1043 {
1044 Ok(project)
1045 } else {
1046 Err(anyhow!("no such project"))?
1047 }
1048 }
1049
1050 #[cfg(test)]
1051 pub fn check_invariants(&self) {
1052 for (connection_id, connection) in &self.connections {
1053 for project_id in &connection.projects {
1054 let project = &self.projects.get(project_id).unwrap();
1055 if project.host_connection_id != *connection_id {
1056 assert!(project.guests.contains_key(connection_id));
1057 }
1058
1059 for (worktree_id, worktree) in project.worktrees.iter() {
1060 let mut paths = HashMap::default();
1061 for entry in worktree.entries.values() {
1062 let prev_entry = paths.insert(&entry.path, entry);
1063 assert_eq!(
1064 prev_entry,
1065 None,
1066 "worktree {:?}, duplicate path for entries {:?} and {:?}",
1067 worktree_id,
1068 prev_entry.unwrap(),
1069 entry
1070 );
1071 }
1072 }
1073 }
1074 for channel_id in &connection.channels {
1075 let channel = self.channels.get(channel_id).unwrap();
1076 assert!(channel.connection_ids.contains(connection_id));
1077 }
1078 assert!(self
1079 .connected_users
1080 .get(&connection.user_id)
1081 .unwrap()
1082 .connection_ids
1083 .contains(connection_id));
1084 }
1085
1086 for (user_id, state) in &self.connected_users {
1087 for connection_id in &state.connection_ids {
1088 assert_eq!(
1089 self.connections.get(connection_id).unwrap().user_id,
1090 *user_id
1091 );
1092 }
1093
1094 if let Some(active_call) = state.active_call.as_ref() {
1095 if let Some(active_call_connection_id) = active_call.connection_id {
1096 assert!(
1097 state.connection_ids.contains(&active_call_connection_id),
1098 "call is active on a dead connection"
1099 );
1100 assert!(
1101 state.connection_ids.contains(&active_call_connection_id),
1102 "call is active on a dead connection"
1103 );
1104 }
1105 }
1106 }
1107
1108 for (room_id, room) in &self.rooms {
1109 for pending_user_id in &room.pending_user_ids {
1110 assert!(
1111 self.connected_users
1112 .contains_key(&UserId::from_proto(*pending_user_id)),
1113 "call is active on a user that has disconnected"
1114 );
1115 }
1116
1117 for participant in &room.participants {
1118 assert!(
1119 self.connections
1120 .contains_key(&ConnectionId(participant.peer_id)),
1121 "room contains participant that has disconnected"
1122 );
1123
1124 for project_id in &participant.project_ids {
1125 let project = &self.projects[&ProjectId::from_proto(*project_id)];
1126 assert_eq!(
1127 project.room_id, *room_id,
1128 "project was shared on a different room"
1129 );
1130 }
1131 }
1132
1133 assert!(
1134 !room.pending_user_ids.is_empty() || !room.participants.is_empty(),
1135 "room can't be empty"
1136 );
1137 }
1138
1139 for (project_id, project) in &self.projects {
1140 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1141 assert!(host_connection.projects.contains(project_id));
1142
1143 for guest_connection_id in project.guests.keys() {
1144 let guest_connection = self.connections.get(guest_connection_id).unwrap();
1145 assert!(guest_connection.projects.contains(project_id));
1146 }
1147 assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
1148 assert_eq!(
1149 project.active_replica_ids,
1150 project
1151 .guests
1152 .values()
1153 .map(|guest| guest.replica_id)
1154 .collect::<HashSet<_>>(),
1155 );
1156
1157 let room = &self.rooms[&project.room_id];
1158 let room_participant = room
1159 .participants
1160 .iter()
1161 .find(|participant| participant.peer_id == project.host_connection_id.0)
1162 .unwrap();
1163 assert!(
1164 room_participant
1165 .project_ids
1166 .contains(&project_id.to_proto()),
1167 "project was not shared in room"
1168 );
1169 }
1170
1171 for (channel_id, channel) in &self.channels {
1172 for connection_id in &channel.connection_ids {
1173 let connection = self.connections.get(connection_id).unwrap();
1174 assert!(connection.channels.contains(channel_id));
1175 }
1176 }
1177 }
1178}
1179
1180impl Project {
1181 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1182 self.guests
1183 .values()
1184 .chain([&self.host])
1185 .any(|collaborator| {
1186 collaborator
1187 .last_activity
1188 .map_or(false, |active_time| active_time > start_time)
1189 })
1190 }
1191
1192 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1193 self.guests.keys().copied().collect()
1194 }
1195
1196 pub fn connection_ids(&self) -> Vec<ConnectionId> {
1197 self.guests
1198 .keys()
1199 .copied()
1200 .chain(Some(self.host_connection_id))
1201 .collect()
1202 }
1203}
1204
1205impl Channel {
1206 fn connection_ids(&self) -> Vec<ConnectionId> {
1207 self.connection_ids.iter().copied().collect()
1208 }
1209}