1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
4use rpc::{proto, ConnectionId};
5use serde::Serialize;
6use std::{mem, path::PathBuf, str, time::Duration};
7use time::OffsetDateTime;
8use tracing::instrument;
9use util::post_inc;
10
11pub type RoomId = u64;
12
13#[derive(Default, Serialize)]
14pub struct Store {
15 connections: BTreeMap<ConnectionId, ConnectionState>,
16 connected_users: BTreeMap<UserId, ConnectedUser>,
17 next_room_id: RoomId,
18 rooms: BTreeMap<RoomId, proto::Room>,
19 projects: BTreeMap<ProjectId, Project>,
20 #[serde(skip)]
21 channels: BTreeMap<ChannelId, Channel>,
22}
23
24#[derive(Default, Serialize)]
25struct ConnectedUser {
26 connection_ids: HashSet<ConnectionId>,
27 active_call: Option<Call>,
28}
29
30#[derive(Serialize)]
31struct ConnectionState {
32 user_id: UserId,
33 admin: bool,
34 projects: BTreeSet<ProjectId>,
35 channels: HashSet<ChannelId>,
36}
37
38#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
39pub struct Call {
40 pub caller_user_id: UserId,
41 pub room_id: RoomId,
42 pub connection_id: Option<ConnectionId>,
43 pub initial_project_id: Option<ProjectId>,
44}
45
46#[derive(Serialize)]
47pub struct Project {
48 pub id: ProjectId,
49 pub room_id: RoomId,
50 pub host_connection_id: ConnectionId,
51 pub host: Collaborator,
52 pub guests: HashMap<ConnectionId, Collaborator>,
53 pub active_replica_ids: HashSet<ReplicaId>,
54 pub worktrees: BTreeMap<u64, Worktree>,
55 pub language_servers: Vec<proto::LanguageServer>,
56}
57
58#[derive(Serialize)]
59pub struct Collaborator {
60 pub replica_id: ReplicaId,
61 pub user_id: UserId,
62 #[serde(skip)]
63 pub last_activity: Option<OffsetDateTime>,
64 pub admin: bool,
65}
66
67#[derive(Default, Serialize)]
68pub struct Worktree {
69 pub root_name: String,
70 pub visible: bool,
71 #[serde(skip)]
72 pub entries: BTreeMap<u64, proto::Entry>,
73 #[serde(skip)]
74 pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
75 pub scan_id: u64,
76 pub is_complete: bool,
77}
78
79#[derive(Default)]
80pub struct Channel {
81 pub connection_ids: HashSet<ConnectionId>,
82}
83
84pub type ReplicaId = u16;
85
86#[derive(Default)]
87pub struct RemovedConnectionState {
88 pub user_id: UserId,
89 pub hosted_projects: HashMap<ProjectId, Project>,
90 pub guest_project_ids: HashSet<ProjectId>,
91 pub contact_ids: HashSet<UserId>,
92 pub room_id: Option<RoomId>,
93}
94
95pub struct LeftProject {
96 pub id: ProjectId,
97 pub host_user_id: UserId,
98 pub host_connection_id: ConnectionId,
99 pub connection_ids: Vec<ConnectionId>,
100 pub remove_collaborator: bool,
101}
102
103pub struct LeftRoom<'a> {
104 pub room: Option<&'a proto::Room>,
105 pub unshared_projects: Vec<Project>,
106 pub left_projects: Vec<LeftProject>,
107}
108
109#[derive(Copy, Clone)]
110pub struct Metrics {
111 pub connections: usize,
112 pub registered_projects: usize,
113 pub active_projects: usize,
114 pub shared_projects: usize,
115}
116
117impl Store {
118 pub fn metrics(&self) -> Metrics {
119 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
120 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
121
122 let connections = self.connections.values().filter(|c| !c.admin).count();
123 let mut registered_projects = 0;
124 let mut active_projects = 0;
125 let mut shared_projects = 0;
126 for project in self.projects.values() {
127 if let Some(connection) = self.connections.get(&project.host_connection_id) {
128 if !connection.admin {
129 registered_projects += 1;
130 if project.is_active_since(active_window_start) {
131 active_projects += 1;
132 if !project.guests.is_empty() {
133 shared_projects += 1;
134 }
135 }
136 }
137 }
138 }
139
140 Metrics {
141 connections,
142 registered_projects,
143 active_projects,
144 shared_projects,
145 }
146 }
147
148 #[instrument(skip(self))]
149 pub fn add_connection(
150 &mut self,
151 connection_id: ConnectionId,
152 user_id: UserId,
153 admin: bool,
154 ) -> Option<proto::IncomingCall> {
155 self.connections.insert(
156 connection_id,
157 ConnectionState {
158 user_id,
159 admin,
160 projects: Default::default(),
161 channels: Default::default(),
162 },
163 );
164 let connected_user = self.connected_users.entry(user_id).or_default();
165 connected_user.connection_ids.insert(connection_id);
166 if let Some(active_call) = connected_user.active_call {
167 if active_call.connection_id.is_some() {
168 None
169 } else {
170 let room = self.room(active_call.room_id)?;
171 Some(proto::IncomingCall {
172 room_id: active_call.room_id,
173 caller_user_id: active_call.caller_user_id.to_proto(),
174 participant_user_ids: room
175 .participants
176 .iter()
177 .map(|participant| participant.user_id)
178 .collect(),
179 initial_project_id: active_call
180 .initial_project_id
181 .map(|project_id| project_id.to_proto()),
182 })
183 }
184 } else {
185 None
186 }
187 }
188
189 #[instrument(skip(self))]
190 pub fn remove_connection(
191 &mut self,
192 connection_id: ConnectionId,
193 ) -> Result<RemovedConnectionState> {
194 let connection = self
195 .connections
196 .get_mut(&connection_id)
197 .ok_or_else(|| anyhow!("no such connection"))?;
198
199 let user_id = connection.user_id;
200 let connection_projects = mem::take(&mut connection.projects);
201 let connection_channels = mem::take(&mut connection.channels);
202
203 let mut result = RemovedConnectionState {
204 user_id,
205 ..Default::default()
206 };
207
208 // Leave all channels.
209 for channel_id in connection_channels {
210 self.leave_channel(connection_id, channel_id);
211 }
212
213 // Unshare and leave all projects.
214 for project_id in connection_projects {
215 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
216 result.hosted_projects.insert(project_id, project);
217 } else if self.leave_project(project_id, connection_id).is_ok() {
218 result.guest_project_ids.insert(project_id);
219 }
220 }
221
222 let connected_user = self.connected_users.get_mut(&user_id).unwrap();
223 connected_user.connection_ids.remove(&connection_id);
224 if let Some(active_call) = connected_user.active_call.as_ref() {
225 let room_id = active_call.room_id;
226 if let Some(room) = self.rooms.get_mut(&room_id) {
227 let prev_participant_count = room.participants.len();
228 room.participants
229 .retain(|participant| participant.peer_id != connection_id.0);
230 if prev_participant_count == room.participants.len() {
231 if connected_user.connection_ids.is_empty() {
232 room.pending_user_ids
233 .retain(|pending_user_id| *pending_user_id != user_id.to_proto());
234 result.room_id = Some(room_id);
235 connected_user.active_call = None;
236 }
237 } else {
238 result.room_id = Some(room_id);
239 connected_user.active_call = None;
240 }
241
242 if room.participants.is_empty() && room.pending_user_ids.is_empty() {
243 self.rooms.remove(&room_id);
244 }
245 } else {
246 tracing::error!("disconnected user claims to be in a room that does not exist");
247 connected_user.active_call = None;
248 }
249 }
250
251 if connected_user.connection_ids.is_empty() {
252 self.connected_users.remove(&user_id);
253 }
254
255 self.connections.remove(&connection_id).unwrap();
256
257 Ok(result)
258 }
259
260 #[cfg(test)]
261 pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
262 self.channels.get(&id)
263 }
264
265 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
266 if let Some(connection) = self.connections.get_mut(&connection_id) {
267 connection.channels.insert(channel_id);
268 self.channels
269 .entry(channel_id)
270 .or_default()
271 .connection_ids
272 .insert(connection_id);
273 }
274 }
275
276 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
277 if let Some(connection) = self.connections.get_mut(&connection_id) {
278 connection.channels.remove(&channel_id);
279 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
280 entry.get_mut().connection_ids.remove(&connection_id);
281 if entry.get_mut().connection_ids.is_empty() {
282 entry.remove();
283 }
284 }
285 }
286 }
287
288 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
289 Ok(self
290 .connections
291 .get(&connection_id)
292 .ok_or_else(|| anyhow!("unknown connection"))?
293 .user_id)
294 }
295
296 pub fn connection_ids_for_user(
297 &self,
298 user_id: UserId,
299 ) -> impl Iterator<Item = ConnectionId> + '_ {
300 self.connected_users
301 .get(&user_id)
302 .into_iter()
303 .map(|state| &state.connection_ids)
304 .flatten()
305 .copied()
306 }
307
308 pub fn is_user_online(&self, user_id: UserId) -> bool {
309 !self
310 .connected_users
311 .get(&user_id)
312 .unwrap_or(&Default::default())
313 .connection_ids
314 .is_empty()
315 }
316
317 fn is_user_busy(&self, user_id: UserId) -> bool {
318 self.connected_users
319 .get(&user_id)
320 .unwrap_or(&Default::default())
321 .active_call
322 .is_some()
323 }
324
325 pub fn build_initial_contacts_update(
326 &self,
327 contacts: Vec<db::Contact>,
328 ) -> proto::UpdateContacts {
329 let mut update = proto::UpdateContacts::default();
330
331 for contact in contacts {
332 match contact {
333 db::Contact::Accepted {
334 user_id,
335 should_notify,
336 } => {
337 update
338 .contacts
339 .push(self.contact_for_user(user_id, should_notify));
340 }
341 db::Contact::Outgoing { user_id } => {
342 update.outgoing_requests.push(user_id.to_proto())
343 }
344 db::Contact::Incoming {
345 user_id,
346 should_notify,
347 } => update
348 .incoming_requests
349 .push(proto::IncomingContactRequest {
350 requester_id: user_id.to_proto(),
351 should_notify,
352 }),
353 }
354 }
355
356 update
357 }
358
359 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
360 proto::Contact {
361 user_id: user_id.to_proto(),
362 online: self.is_user_online(user_id),
363 busy: self.is_user_busy(user_id),
364 should_notify,
365 }
366 }
367
368 pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
369 let connection = self
370 .connections
371 .get_mut(&creator_connection_id)
372 .ok_or_else(|| anyhow!("no such connection"))?;
373 let connected_user = self
374 .connected_users
375 .get_mut(&connection.user_id)
376 .ok_or_else(|| anyhow!("no such connection"))?;
377 anyhow::ensure!(
378 connected_user.active_call.is_none(),
379 "can't create a room with an active call"
380 );
381
382 let mut room = proto::Room::default();
383 room.participants.push(proto::Participant {
384 user_id: connection.user_id.to_proto(),
385 peer_id: creator_connection_id.0,
386 project_ids: Default::default(),
387 location: Some(proto::ParticipantLocation {
388 variant: Some(proto::participant_location::Variant::External(
389 proto::participant_location::External {},
390 )),
391 }),
392 });
393
394 let room_id = post_inc(&mut self.next_room_id);
395 self.rooms.insert(room_id, room);
396 connected_user.active_call = Some(Call {
397 caller_user_id: connection.user_id,
398 room_id,
399 connection_id: Some(creator_connection_id),
400 initial_project_id: None,
401 });
402 Ok(room_id)
403 }
404
405 pub fn join_room(
406 &mut self,
407 room_id: RoomId,
408 connection_id: ConnectionId,
409 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
410 let connection = self
411 .connections
412 .get_mut(&connection_id)
413 .ok_or_else(|| anyhow!("no such connection"))?;
414 let user_id = connection.user_id;
415 let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
416
417 let connected_user = self
418 .connected_users
419 .get_mut(&user_id)
420 .ok_or_else(|| anyhow!("no such connection"))?;
421 let active_call = connected_user
422 .active_call
423 .as_mut()
424 .ok_or_else(|| anyhow!("not being called"))?;
425 anyhow::ensure!(
426 active_call.room_id == room_id && active_call.connection_id.is_none(),
427 "not being called on this room"
428 );
429
430 let room = self
431 .rooms
432 .get_mut(&room_id)
433 .ok_or_else(|| anyhow!("no such room"))?;
434 anyhow::ensure!(
435 room.pending_user_ids.contains(&user_id.to_proto()),
436 anyhow!("no such room")
437 );
438 room.pending_user_ids
439 .retain(|pending| *pending != user_id.to_proto());
440 room.participants.push(proto::Participant {
441 user_id: user_id.to_proto(),
442 peer_id: connection_id.0,
443 project_ids: Default::default(),
444 location: Some(proto::ParticipantLocation {
445 variant: Some(proto::participant_location::Variant::External(
446 proto::participant_location::External {},
447 )),
448 }),
449 });
450 active_call.connection_id = Some(connection_id);
451
452 Ok((room, recipient_connection_ids))
453 }
454
455 pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
456 let connection = self
457 .connections
458 .get_mut(&connection_id)
459 .ok_or_else(|| anyhow!("no such connection"))?;
460 let user_id = connection.user_id;
461
462 let connected_user = self
463 .connected_users
464 .get(&user_id)
465 .ok_or_else(|| anyhow!("no such connection"))?;
466 anyhow::ensure!(
467 connected_user
468 .active_call
469 .map_or(false, |call| call.room_id == room_id
470 && call.connection_id == Some(connection_id)),
471 "cannot leave a room before joining it"
472 );
473
474 // Given that users can only join one room at a time, we can safely unshare
475 // and leave all projects associated with the connection.
476 let mut unshared_projects = Vec::new();
477 let mut left_projects = Vec::new();
478 for project_id in connection.projects.clone() {
479 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
480 unshared_projects.push(project);
481 } else if let Ok(project) = self.leave_project(project_id, connection_id) {
482 left_projects.push(project);
483 }
484 }
485 self.connected_users.get_mut(&user_id).unwrap().active_call = None;
486
487 let room = self
488 .rooms
489 .get_mut(&room_id)
490 .ok_or_else(|| anyhow!("no such room"))?;
491 room.participants
492 .retain(|participant| participant.peer_id != connection_id.0);
493 if room.participants.is_empty() && room.pending_user_ids.is_empty() {
494 self.rooms.remove(&room_id);
495 }
496
497 Ok(LeftRoom {
498 room: self.rooms.get(&room_id),
499 unshared_projects,
500 left_projects,
501 })
502 }
503
504 pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
505 self.rooms.get(&room_id)
506 }
507
508 pub fn call(
509 &mut self,
510 room_id: RoomId,
511 recipient_user_id: UserId,
512 initial_project_id: Option<ProjectId>,
513 from_connection_id: ConnectionId,
514 ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
515 let caller_user_id = self.user_id_for_connection(from_connection_id)?;
516
517 let recipient_connection_ids = self
518 .connection_ids_for_user(recipient_user_id)
519 .collect::<Vec<_>>();
520 let mut recipient = self
521 .connected_users
522 .get_mut(&recipient_user_id)
523 .ok_or_else(|| anyhow!("no such connection"))?;
524 anyhow::ensure!(
525 recipient.active_call.is_none(),
526 "recipient is already on another call"
527 );
528
529 let room = self
530 .rooms
531 .get_mut(&room_id)
532 .ok_or_else(|| anyhow!("no such room"))?;
533 anyhow::ensure!(
534 room.participants
535 .iter()
536 .any(|participant| participant.peer_id == from_connection_id.0),
537 "no such room"
538 );
539 anyhow::ensure!(
540 room.pending_user_ids
541 .iter()
542 .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
543 "cannot call the same user more than once"
544 );
545 room.pending_user_ids.push(recipient_user_id.to_proto());
546
547 if let Some(initial_project_id) = initial_project_id {
548 let project = self
549 .projects
550 .get(&initial_project_id)
551 .ok_or_else(|| anyhow!("no such project"))?;
552 anyhow::ensure!(project.room_id == room_id, "no such project");
553 }
554
555 recipient.active_call = Some(Call {
556 caller_user_id,
557 room_id,
558 connection_id: None,
559 initial_project_id,
560 });
561
562 Ok((
563 room,
564 recipient_connection_ids,
565 proto::IncomingCall {
566 room_id,
567 caller_user_id: caller_user_id.to_proto(),
568 participant_user_ids: room
569 .participants
570 .iter()
571 .map(|participant| participant.user_id)
572 .collect(),
573 initial_project_id: initial_project_id.map(|project_id| project_id.to_proto()),
574 },
575 ))
576 }
577
578 pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
579 let mut recipient = self
580 .connected_users
581 .get_mut(&to_user_id)
582 .ok_or_else(|| anyhow!("no such connection"))?;
583 anyhow::ensure!(recipient
584 .active_call
585 .map_or(false, |call| call.room_id == room_id
586 && call.connection_id.is_none()));
587 recipient.active_call = None;
588 let room = self
589 .rooms
590 .get_mut(&room_id)
591 .ok_or_else(|| anyhow!("no such room"))?;
592 room.pending_user_ids
593 .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
594 Ok(room)
595 }
596
597 pub fn cancel_call(
598 &mut self,
599 room_id: RoomId,
600 recipient_user_id: UserId,
601 canceller_connection_id: ConnectionId,
602 ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
603 let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
604 let canceller = self
605 .connected_users
606 .get(&canceller_user_id)
607 .ok_or_else(|| anyhow!("no such connection"))?;
608 let recipient = self
609 .connected_users
610 .get(&recipient_user_id)
611 .ok_or_else(|| anyhow!("no such connection"))?;
612 let canceller_active_call = canceller
613 .active_call
614 .as_ref()
615 .ok_or_else(|| anyhow!("no active call"))?;
616 let recipient_active_call = recipient
617 .active_call
618 .as_ref()
619 .ok_or_else(|| anyhow!("no active call for recipient"))?;
620
621 anyhow::ensure!(
622 canceller_active_call.room_id == room_id,
623 "users are on different calls"
624 );
625 anyhow::ensure!(
626 recipient_active_call.room_id == room_id,
627 "users are on different calls"
628 );
629 anyhow::ensure!(
630 recipient_active_call.connection_id.is_none(),
631 "recipient has already answered"
632 );
633 let room_id = recipient_active_call.room_id;
634 let room = self
635 .rooms
636 .get_mut(&room_id)
637 .ok_or_else(|| anyhow!("no such room"))?;
638 room.pending_user_ids
639 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
640
641 let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
642 recipient.active_call.take();
643
644 Ok((room, recipient.connection_ids.clone()))
645 }
646
647 pub fn decline_call(
648 &mut self,
649 room_id: RoomId,
650 recipient_connection_id: ConnectionId,
651 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
652 let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
653 let recipient = self
654 .connected_users
655 .get_mut(&recipient_user_id)
656 .ok_or_else(|| anyhow!("no such connection"))?;
657 if let Some(active_call) = recipient.active_call.take() {
658 anyhow::ensure!(active_call.room_id == room_id, "no such room");
659 let recipient_connection_ids = self
660 .connection_ids_for_user(recipient_user_id)
661 .collect::<Vec<_>>();
662 let room = self
663 .rooms
664 .get_mut(&active_call.room_id)
665 .ok_or_else(|| anyhow!("no such room"))?;
666 room.pending_user_ids
667 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
668 Ok((room, recipient_connection_ids))
669 } else {
670 Err(anyhow!("user is not being called"))
671 }
672 }
673
674 pub fn update_participant_location(
675 &mut self,
676 room_id: RoomId,
677 location: proto::ParticipantLocation,
678 connection_id: ConnectionId,
679 ) -> Result<&proto::Room> {
680 let room = self
681 .rooms
682 .get_mut(&room_id)
683 .ok_or_else(|| anyhow!("no such room"))?;
684 if let Some(proto::participant_location::Variant::Project(project)) =
685 location.variant.as_ref()
686 {
687 anyhow::ensure!(
688 room.participants
689 .iter()
690 .any(|participant| participant.project_ids.contains(&project.id)),
691 "no such project"
692 );
693 }
694
695 let participant = room
696 .participants
697 .iter_mut()
698 .find(|participant| participant.peer_id == connection_id.0)
699 .ok_or_else(|| anyhow!("no such room"))?;
700 participant.location = Some(location);
701
702 Ok(room)
703 }
704
705 pub fn share_project(
706 &mut self,
707 room_id: RoomId,
708 project_id: ProjectId,
709 host_connection_id: ConnectionId,
710 ) -> Result<&proto::Room> {
711 let connection = self
712 .connections
713 .get_mut(&host_connection_id)
714 .ok_or_else(|| anyhow!("no such connection"))?;
715
716 let room = self
717 .rooms
718 .get_mut(&room_id)
719 .ok_or_else(|| anyhow!("no such room"))?;
720 let participant = room
721 .participants
722 .iter_mut()
723 .find(|participant| participant.peer_id == host_connection_id.0)
724 .ok_or_else(|| anyhow!("no such room"))?;
725 participant.project_ids.push(project_id.to_proto());
726
727 connection.projects.insert(project_id);
728 self.projects.insert(
729 project_id,
730 Project {
731 id: project_id,
732 room_id,
733 host_connection_id,
734 host: Collaborator {
735 user_id: connection.user_id,
736 replica_id: 0,
737 last_activity: None,
738 admin: connection.admin,
739 },
740 guests: Default::default(),
741 active_replica_ids: Default::default(),
742 worktrees: Default::default(),
743 language_servers: Default::default(),
744 },
745 );
746
747 Ok(room)
748 }
749
750 pub fn unshare_project(
751 &mut self,
752 project_id: ProjectId,
753 connection_id: ConnectionId,
754 ) -> Result<(&proto::Room, Project)> {
755 match self.projects.entry(project_id) {
756 btree_map::Entry::Occupied(e) => {
757 if e.get().host_connection_id == connection_id {
758 let project = e.remove();
759
760 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
761 host_connection.projects.remove(&project_id);
762 }
763
764 for guest_connection in project.guests.keys() {
765 if let Some(connection) = self.connections.get_mut(guest_connection) {
766 connection.projects.remove(&project_id);
767 }
768 }
769
770 let room = self
771 .rooms
772 .get_mut(&project.room_id)
773 .ok_or_else(|| anyhow!("no such room"))?;
774 let participant = room
775 .participants
776 .iter_mut()
777 .find(|participant| participant.peer_id == connection_id.0)
778 .ok_or_else(|| anyhow!("no such room"))?;
779 participant
780 .project_ids
781 .retain(|id| *id != project_id.to_proto());
782
783 Ok((room, project))
784 } else {
785 Err(anyhow!("no such project"))?
786 }
787 }
788 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
789 }
790 }
791
792 pub fn update_project(
793 &mut self,
794 project_id: ProjectId,
795 worktrees: &[proto::WorktreeMetadata],
796 connection_id: ConnectionId,
797 ) -> Result<()> {
798 let project = self
799 .projects
800 .get_mut(&project_id)
801 .ok_or_else(|| anyhow!("no such project"))?;
802 if project.host_connection_id == connection_id {
803 let mut old_worktrees = mem::take(&mut project.worktrees);
804 for worktree in worktrees {
805 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
806 project.worktrees.insert(worktree.id, old_worktree);
807 } else {
808 project.worktrees.insert(
809 worktree.id,
810 Worktree {
811 root_name: worktree.root_name.clone(),
812 visible: worktree.visible,
813 ..Default::default()
814 },
815 );
816 }
817 }
818
819 Ok(())
820 } else {
821 Err(anyhow!("no such project"))?
822 }
823 }
824
825 pub fn update_diagnostic_summary(
826 &mut self,
827 project_id: ProjectId,
828 worktree_id: u64,
829 connection_id: ConnectionId,
830 summary: proto::DiagnosticSummary,
831 ) -> Result<Vec<ConnectionId>> {
832 let project = self
833 .projects
834 .get_mut(&project_id)
835 .ok_or_else(|| anyhow!("no such project"))?;
836 if project.host_connection_id == connection_id {
837 let worktree = project
838 .worktrees
839 .get_mut(&worktree_id)
840 .ok_or_else(|| anyhow!("no such worktree"))?;
841 worktree
842 .diagnostic_summaries
843 .insert(summary.path.clone().into(), summary);
844 return Ok(project.connection_ids());
845 }
846
847 Err(anyhow!("no such worktree"))?
848 }
849
850 pub fn start_language_server(
851 &mut self,
852 project_id: ProjectId,
853 connection_id: ConnectionId,
854 language_server: proto::LanguageServer,
855 ) -> Result<Vec<ConnectionId>> {
856 let project = self
857 .projects
858 .get_mut(&project_id)
859 .ok_or_else(|| anyhow!("no such project"))?;
860 if project.host_connection_id == connection_id {
861 project.language_servers.push(language_server);
862 return Ok(project.connection_ids());
863 }
864
865 Err(anyhow!("no such project"))?
866 }
867
868 pub fn join_project(
869 &mut self,
870 requester_connection_id: ConnectionId,
871 project_id: ProjectId,
872 ) -> Result<(&Project, ReplicaId)> {
873 let connection = self
874 .connections
875 .get_mut(&requester_connection_id)
876 .ok_or_else(|| anyhow!("no such connection"))?;
877 let user = self
878 .connected_users
879 .get(&connection.user_id)
880 .ok_or_else(|| anyhow!("no such connection"))?;
881 let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
882 anyhow::ensure!(
883 active_call.connection_id == Some(requester_connection_id),
884 "no such project"
885 );
886
887 let project = self
888 .projects
889 .get_mut(&project_id)
890 .ok_or_else(|| anyhow!("no such project"))?;
891 anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
892
893 connection.projects.insert(project_id);
894 let mut replica_id = 1;
895 while project.active_replica_ids.contains(&replica_id) {
896 replica_id += 1;
897 }
898 project.active_replica_ids.insert(replica_id);
899 project.guests.insert(
900 requester_connection_id,
901 Collaborator {
902 replica_id,
903 user_id: connection.user_id,
904 last_activity: Some(OffsetDateTime::now_utc()),
905 admin: connection.admin,
906 },
907 );
908
909 project.host.last_activity = Some(OffsetDateTime::now_utc());
910 Ok((project, replica_id))
911 }
912
913 pub fn leave_project(
914 &mut self,
915 project_id: ProjectId,
916 connection_id: ConnectionId,
917 ) -> Result<LeftProject> {
918 let project = self
919 .projects
920 .get_mut(&project_id)
921 .ok_or_else(|| anyhow!("no such project"))?;
922
923 // If the connection leaving the project is a collaborator, remove it.
924 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
925 project.active_replica_ids.remove(&guest.replica_id);
926 true
927 } else {
928 false
929 };
930
931 if let Some(connection) = self.connections.get_mut(&connection_id) {
932 connection.projects.remove(&project_id);
933 }
934
935 Ok(LeftProject {
936 id: project.id,
937 host_connection_id: project.host_connection_id,
938 host_user_id: project.host.user_id,
939 connection_ids: project.connection_ids(),
940 remove_collaborator,
941 })
942 }
943
944 #[allow(clippy::too_many_arguments)]
945 pub fn update_worktree(
946 &mut self,
947 connection_id: ConnectionId,
948 project_id: ProjectId,
949 worktree_id: u64,
950 worktree_root_name: &str,
951 removed_entries: &[u64],
952 updated_entries: &[proto::Entry],
953 scan_id: u64,
954 is_last_update: bool,
955 ) -> Result<Vec<ConnectionId>> {
956 let project = self.write_project(project_id, connection_id)?;
957
958 let connection_ids = project.connection_ids();
959 let mut worktree = project.worktrees.entry(worktree_id).or_default();
960 worktree.root_name = worktree_root_name.to_string();
961
962 for entry_id in removed_entries {
963 worktree.entries.remove(entry_id);
964 }
965
966 for entry in updated_entries {
967 worktree.entries.insert(entry.id, entry.clone());
968 }
969
970 worktree.scan_id = scan_id;
971 worktree.is_complete = is_last_update;
972 Ok(connection_ids)
973 }
974
975 pub fn project_connection_ids(
976 &self,
977 project_id: ProjectId,
978 acting_connection_id: ConnectionId,
979 ) -> Result<Vec<ConnectionId>> {
980 Ok(self
981 .read_project(project_id, acting_connection_id)?
982 .connection_ids())
983 }
984
985 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
986 Ok(self
987 .channels
988 .get(&channel_id)
989 .ok_or_else(|| anyhow!("no such channel"))?
990 .connection_ids())
991 }
992
993 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
994 self.projects
995 .get(&project_id)
996 .ok_or_else(|| anyhow!("no such project"))
997 }
998
999 pub fn register_project_activity(
1000 &mut self,
1001 project_id: ProjectId,
1002 connection_id: ConnectionId,
1003 ) -> Result<()> {
1004 let project = self
1005 .projects
1006 .get_mut(&project_id)
1007 .ok_or_else(|| anyhow!("no such project"))?;
1008 let collaborator = if connection_id == project.host_connection_id {
1009 &mut project.host
1010 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1011 guest
1012 } else {
1013 return Err(anyhow!("no such project"))?;
1014 };
1015 collaborator.last_activity = Some(OffsetDateTime::now_utc());
1016 Ok(())
1017 }
1018
1019 pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1020 self.projects.iter()
1021 }
1022
1023 pub fn read_project(
1024 &self,
1025 project_id: ProjectId,
1026 connection_id: ConnectionId,
1027 ) -> Result<&Project> {
1028 let project = self
1029 .projects
1030 .get(&project_id)
1031 .ok_or_else(|| anyhow!("no such project"))?;
1032 if project.host_connection_id == connection_id
1033 || project.guests.contains_key(&connection_id)
1034 {
1035 Ok(project)
1036 } else {
1037 Err(anyhow!("no such project"))?
1038 }
1039 }
1040
1041 fn write_project(
1042 &mut self,
1043 project_id: ProjectId,
1044 connection_id: ConnectionId,
1045 ) -> Result<&mut Project> {
1046 let project = self
1047 .projects
1048 .get_mut(&project_id)
1049 .ok_or_else(|| anyhow!("no such project"))?;
1050 if project.host_connection_id == connection_id
1051 || project.guests.contains_key(&connection_id)
1052 {
1053 Ok(project)
1054 } else {
1055 Err(anyhow!("no such project"))?
1056 }
1057 }
1058
1059 #[cfg(test)]
1060 pub fn check_invariants(&self) {
1061 for (connection_id, connection) in &self.connections {
1062 for project_id in &connection.projects {
1063 let project = &self.projects.get(project_id).unwrap();
1064 if project.host_connection_id != *connection_id {
1065 assert!(project.guests.contains_key(connection_id));
1066 }
1067
1068 for (worktree_id, worktree) in project.worktrees.iter() {
1069 let mut paths = HashMap::default();
1070 for entry in worktree.entries.values() {
1071 let prev_entry = paths.insert(&entry.path, entry);
1072 assert_eq!(
1073 prev_entry,
1074 None,
1075 "worktree {:?}, duplicate path for entries {:?} and {:?}",
1076 worktree_id,
1077 prev_entry.unwrap(),
1078 entry
1079 );
1080 }
1081 }
1082 }
1083 for channel_id in &connection.channels {
1084 let channel = self.channels.get(channel_id).unwrap();
1085 assert!(channel.connection_ids.contains(connection_id));
1086 }
1087 assert!(self
1088 .connected_users
1089 .get(&connection.user_id)
1090 .unwrap()
1091 .connection_ids
1092 .contains(connection_id));
1093 }
1094
1095 for (user_id, state) in &self.connected_users {
1096 for connection_id in &state.connection_ids {
1097 assert_eq!(
1098 self.connections.get(connection_id).unwrap().user_id,
1099 *user_id
1100 );
1101 }
1102
1103 if let Some(active_call) = state.active_call.as_ref() {
1104 if let Some(active_call_connection_id) = active_call.connection_id {
1105 assert!(
1106 state.connection_ids.contains(&active_call_connection_id),
1107 "call is active on a dead connection"
1108 );
1109 assert!(
1110 state.connection_ids.contains(&active_call_connection_id),
1111 "call is active on a dead connection"
1112 );
1113 }
1114 }
1115 }
1116
1117 for (room_id, room) in &self.rooms {
1118 for pending_user_id in &room.pending_user_ids {
1119 assert!(
1120 self.connected_users
1121 .contains_key(&UserId::from_proto(*pending_user_id)),
1122 "call is active on a user that has disconnected"
1123 );
1124 }
1125
1126 for participant in &room.participants {
1127 assert!(
1128 self.connections
1129 .contains_key(&ConnectionId(participant.peer_id)),
1130 "room contains participant that has disconnected"
1131 );
1132
1133 for project_id in &participant.project_ids {
1134 let project = &self.projects[&ProjectId::from_proto(*project_id)];
1135 assert_eq!(
1136 project.room_id, *room_id,
1137 "project was shared on a different room"
1138 );
1139 }
1140 }
1141
1142 assert!(
1143 !room.pending_user_ids.is_empty() || !room.participants.is_empty(),
1144 "room can't be empty"
1145 );
1146 }
1147
1148 for (project_id, project) in &self.projects {
1149 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1150 assert!(host_connection.projects.contains(project_id));
1151
1152 for guest_connection_id in project.guests.keys() {
1153 let guest_connection = self.connections.get(guest_connection_id).unwrap();
1154 assert!(guest_connection.projects.contains(project_id));
1155 }
1156 assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
1157 assert_eq!(
1158 project.active_replica_ids,
1159 project
1160 .guests
1161 .values()
1162 .map(|guest| guest.replica_id)
1163 .collect::<HashSet<_>>(),
1164 );
1165
1166 let room = &self.rooms[&project.room_id];
1167 let room_participant = room
1168 .participants
1169 .iter()
1170 .find(|participant| participant.peer_id == project.host_connection_id.0)
1171 .unwrap();
1172 assert!(
1173 room_participant
1174 .project_ids
1175 .contains(&project_id.to_proto()),
1176 "project was not shared in room"
1177 );
1178 }
1179
1180 for (channel_id, channel) in &self.channels {
1181 for connection_id in &channel.connection_ids {
1182 let connection = self.connections.get(connection_id).unwrap();
1183 assert!(connection.channels.contains(channel_id));
1184 }
1185 }
1186 }
1187}
1188
1189impl Project {
1190 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1191 self.guests
1192 .values()
1193 .chain([&self.host])
1194 .any(|collaborator| {
1195 collaborator
1196 .last_activity
1197 .map_or(false, |active_time| active_time > start_time)
1198 })
1199 }
1200
1201 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1202 self.guests.keys().copied().collect()
1203 }
1204
1205 pub fn connection_ids(&self) -> Vec<ConnectionId> {
1206 self.guests
1207 .keys()
1208 .copied()
1209 .chain(Some(self.host_connection_id))
1210 .collect()
1211 }
1212}
1213
1214impl Channel {
1215 fn connection_ids(&self) -> Vec<ConnectionId> {
1216 self.connection_ids.iter().copied().collect()
1217 }
1218}