1use crate::db::{self, ChannelId, ProjectId, UserId};
2use anyhow::{anyhow, Result};
3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
4use nanoid::nanoid;
5use rpc::{proto, ConnectionId};
6use serde::Serialize;
7use std::{borrow::Cow, mem, path::PathBuf, str, time::Duration};
8use time::OffsetDateTime;
9use tracing::instrument;
10use util::post_inc;
11
12pub type RoomId = u64;
13
14#[derive(Default, Serialize)]
15pub struct Store {
16 connections: BTreeMap<ConnectionId, ConnectionState>,
17 connected_users: BTreeMap<UserId, ConnectedUser>,
18 next_room_id: RoomId,
19 rooms: BTreeMap<RoomId, proto::Room>,
20 projects: BTreeMap<ProjectId, Project>,
21 #[serde(skip)]
22 channels: BTreeMap<ChannelId, Channel>,
23}
24
25#[derive(Default, Serialize)]
26struct ConnectedUser {
27 connection_ids: HashSet<ConnectionId>,
28 active_call: Option<Call>,
29}
30
31#[derive(Serialize)]
32struct ConnectionState {
33 user_id: UserId,
34 admin: bool,
35 projects: BTreeSet<ProjectId>,
36 channels: HashSet<ChannelId>,
37}
38
39#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
40pub struct Call {
41 pub caller_user_id: UserId,
42 pub room_id: RoomId,
43 pub connection_id: Option<ConnectionId>,
44 pub initial_project_id: Option<ProjectId>,
45}
46
47#[derive(Serialize)]
48pub struct Project {
49 pub id: ProjectId,
50 pub room_id: RoomId,
51 pub host_connection_id: ConnectionId,
52 pub host: Collaborator,
53 pub guests: HashMap<ConnectionId, Collaborator>,
54 pub active_replica_ids: HashSet<ReplicaId>,
55 pub worktrees: BTreeMap<u64, Worktree>,
56 pub language_servers: Vec<proto::LanguageServer>,
57}
58
59#[derive(Serialize)]
60pub struct Collaborator {
61 pub replica_id: ReplicaId,
62 pub user_id: UserId,
63 #[serde(skip)]
64 pub last_activity: Option<OffsetDateTime>,
65 pub admin: bool,
66}
67
68#[derive(Default, Serialize)]
69pub struct Worktree {
70 pub abs_path: PathBuf,
71 pub root_name: String,
72 pub visible: bool,
73 #[serde(skip)]
74 pub entries: BTreeMap<u64, proto::Entry>,
75 #[serde(skip)]
76 pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
77 pub scan_id: u64,
78 pub is_complete: bool,
79}
80
81#[derive(Default)]
82pub struct Channel {
83 pub connection_ids: HashSet<ConnectionId>,
84}
85
86pub type ReplicaId = u16;
87
88#[derive(Default)]
89pub struct RemovedConnectionState<'a> {
90 pub user_id: UserId,
91 pub hosted_projects: Vec<Project>,
92 pub guest_projects: Vec<LeftProject>,
93 pub contact_ids: HashSet<UserId>,
94 pub room: Option<Cow<'a, proto::Room>>,
95 pub canceled_call_connection_ids: Vec<ConnectionId>,
96}
97
98pub struct LeftProject {
99 pub id: ProjectId,
100 pub host_user_id: UserId,
101 pub host_connection_id: ConnectionId,
102 pub connection_ids: Vec<ConnectionId>,
103 pub remove_collaborator: bool,
104}
105
106pub struct LeftRoom<'a> {
107 pub room: Cow<'a, proto::Room>,
108 pub unshared_projects: Vec<Project>,
109 pub left_projects: Vec<LeftProject>,
110 pub canceled_call_connection_ids: Vec<ConnectionId>,
111}
112
113#[derive(Copy, Clone)]
114pub struct Metrics {
115 pub connections: usize,
116 pub registered_projects: usize,
117 pub active_projects: usize,
118 pub shared_projects: usize,
119}
120
121impl Store {
122 pub fn metrics(&self) -> Metrics {
123 const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
124 let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
125
126 let connections = self.connections.values().filter(|c| !c.admin).count();
127 let mut registered_projects = 0;
128 let mut active_projects = 0;
129 let mut shared_projects = 0;
130 for project in self.projects.values() {
131 if let Some(connection) = self.connections.get(&project.host_connection_id) {
132 if !connection.admin {
133 registered_projects += 1;
134 if project.is_active_since(active_window_start) {
135 active_projects += 1;
136 if !project.guests.is_empty() {
137 shared_projects += 1;
138 }
139 }
140 }
141 }
142 }
143
144 Metrics {
145 connections,
146 registered_projects,
147 active_projects,
148 shared_projects,
149 }
150 }
151
152 #[instrument(skip(self))]
153 pub fn add_connection(
154 &mut self,
155 connection_id: ConnectionId,
156 user_id: UserId,
157 admin: bool,
158 ) -> Option<proto::IncomingCall> {
159 self.connections.insert(
160 connection_id,
161 ConnectionState {
162 user_id,
163 admin,
164 projects: Default::default(),
165 channels: Default::default(),
166 },
167 );
168 let connected_user = self.connected_users.entry(user_id).or_default();
169 connected_user.connection_ids.insert(connection_id);
170 if let Some(active_call) = connected_user.active_call {
171 if active_call.connection_id.is_some() {
172 None
173 } else {
174 let room = self.room(active_call.room_id)?;
175 Some(proto::IncomingCall {
176 room_id: active_call.room_id,
177 caller_user_id: active_call.caller_user_id.to_proto(),
178 participant_user_ids: room
179 .participants
180 .iter()
181 .map(|participant| participant.user_id)
182 .collect(),
183 initial_project: active_call
184 .initial_project_id
185 .and_then(|id| Self::build_participant_project(id, &self.projects)),
186 })
187 }
188 } else {
189 None
190 }
191 }
192
193 #[instrument(skip(self))]
194 pub fn remove_connection(
195 &mut self,
196 connection_id: ConnectionId,
197 ) -> Result<RemovedConnectionState> {
198 let connection = self
199 .connections
200 .get_mut(&connection_id)
201 .ok_or_else(|| anyhow!("no such connection"))?;
202
203 let user_id = connection.user_id;
204 let connection_channels = mem::take(&mut connection.channels);
205
206 let mut result = RemovedConnectionState {
207 user_id,
208 ..Default::default()
209 };
210
211 // Leave all channels.
212 for channel_id in connection_channels {
213 self.leave_channel(connection_id, channel_id);
214 }
215
216 let connected_user = self.connected_users.get(&user_id).unwrap();
217 if let Some(active_call) = connected_user.active_call.as_ref() {
218 let room_id = active_call.room_id;
219 if active_call.connection_id == Some(connection_id) {
220 let left_room = self.leave_room(room_id, connection_id)?;
221 result.hosted_projects = left_room.unshared_projects;
222 result.guest_projects = left_room.left_projects;
223 result.room = Some(Cow::Owned(left_room.room.into_owned()));
224 result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
225 } else if connected_user.connection_ids.len() == 1 {
226 let (room, _) = self.decline_call(room_id, connection_id)?;
227 result.room = Some(Cow::Owned(room.clone()));
228 }
229 }
230
231 let connected_user = self.connected_users.get_mut(&user_id).unwrap();
232 connected_user.connection_ids.remove(&connection_id);
233 if connected_user.connection_ids.is_empty() {
234 self.connected_users.remove(&user_id);
235 }
236 self.connections.remove(&connection_id).unwrap();
237
238 Ok(result)
239 }
240
241 #[cfg(test)]
242 pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
243 self.channels.get(&id)
244 }
245
246 pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
247 if let Some(connection) = self.connections.get_mut(&connection_id) {
248 connection.channels.insert(channel_id);
249 self.channels
250 .entry(channel_id)
251 .or_default()
252 .connection_ids
253 .insert(connection_id);
254 }
255 }
256
257 pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
258 if let Some(connection) = self.connections.get_mut(&connection_id) {
259 connection.channels.remove(&channel_id);
260 if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
261 entry.get_mut().connection_ids.remove(&connection_id);
262 if entry.get_mut().connection_ids.is_empty() {
263 entry.remove();
264 }
265 }
266 }
267 }
268
269 pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
270 Ok(self
271 .connections
272 .get(&connection_id)
273 .ok_or_else(|| anyhow!("unknown connection"))?
274 .user_id)
275 }
276
277 pub fn connection_ids_for_user(
278 &self,
279 user_id: UserId,
280 ) -> impl Iterator<Item = ConnectionId> + '_ {
281 self.connected_users
282 .get(&user_id)
283 .into_iter()
284 .map(|state| &state.connection_ids)
285 .flatten()
286 .copied()
287 }
288
289 pub fn is_user_online(&self, user_id: UserId) -> bool {
290 !self
291 .connected_users
292 .get(&user_id)
293 .unwrap_or(&Default::default())
294 .connection_ids
295 .is_empty()
296 }
297
298 fn is_user_busy(&self, user_id: UserId) -> bool {
299 self.connected_users
300 .get(&user_id)
301 .unwrap_or(&Default::default())
302 .active_call
303 .is_some()
304 }
305
306 pub fn build_initial_contacts_update(
307 &self,
308 contacts: Vec<db::Contact>,
309 ) -> proto::UpdateContacts {
310 let mut update = proto::UpdateContacts::default();
311
312 for contact in contacts {
313 match contact {
314 db::Contact::Accepted {
315 user_id,
316 should_notify,
317 } => {
318 update
319 .contacts
320 .push(self.contact_for_user(user_id, should_notify));
321 }
322 db::Contact::Outgoing { user_id } => {
323 update.outgoing_requests.push(user_id.to_proto())
324 }
325 db::Contact::Incoming {
326 user_id,
327 should_notify,
328 } => update
329 .incoming_requests
330 .push(proto::IncomingContactRequest {
331 requester_id: user_id.to_proto(),
332 should_notify,
333 }),
334 }
335 }
336
337 update
338 }
339
340 pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
341 proto::Contact {
342 user_id: user_id.to_proto(),
343 online: self.is_user_online(user_id),
344 busy: self.is_user_busy(user_id),
345 should_notify,
346 }
347 }
348
349 pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
350 let connection = self
351 .connections
352 .get_mut(&creator_connection_id)
353 .ok_or_else(|| anyhow!("no such connection"))?;
354 let connected_user = self
355 .connected_users
356 .get_mut(&connection.user_id)
357 .ok_or_else(|| anyhow!("no such connection"))?;
358 anyhow::ensure!(
359 connected_user.active_call.is_none(),
360 "can't create a room with an active call"
361 );
362
363 let room_id = post_inc(&mut self.next_room_id);
364 let room = proto::Room {
365 id: room_id,
366 participants: vec![proto::Participant {
367 user_id: connection.user_id.to_proto(),
368 peer_id: creator_connection_id.0,
369 projects: Default::default(),
370 location: Some(proto::ParticipantLocation {
371 variant: Some(proto::participant_location::Variant::External(
372 proto::participant_location::External {},
373 )),
374 }),
375 }],
376 pending_participant_user_ids: Default::default(),
377 live_kit_room: nanoid!(30),
378 };
379
380 self.rooms.insert(room_id, room);
381 connected_user.active_call = Some(Call {
382 caller_user_id: connection.user_id,
383 room_id,
384 connection_id: Some(creator_connection_id),
385 initial_project_id: None,
386 });
387 Ok(self.rooms.get(&room_id).unwrap())
388 }
389
390 pub fn join_room(
391 &mut self,
392 room_id: RoomId,
393 connection_id: ConnectionId,
394 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
395 let connection = self
396 .connections
397 .get_mut(&connection_id)
398 .ok_or_else(|| anyhow!("no such connection"))?;
399 let user_id = connection.user_id;
400 let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
401
402 let connected_user = self
403 .connected_users
404 .get_mut(&user_id)
405 .ok_or_else(|| anyhow!("no such connection"))?;
406 let active_call = connected_user
407 .active_call
408 .as_mut()
409 .ok_or_else(|| anyhow!("not being called"))?;
410 anyhow::ensure!(
411 active_call.room_id == room_id && active_call.connection_id.is_none(),
412 "not being called on this room"
413 );
414
415 let room = self
416 .rooms
417 .get_mut(&room_id)
418 .ok_or_else(|| anyhow!("no such room"))?;
419 anyhow::ensure!(
420 room.pending_participant_user_ids
421 .contains(&user_id.to_proto()),
422 anyhow!("no such room")
423 );
424 room.pending_participant_user_ids
425 .retain(|pending| *pending != user_id.to_proto());
426 room.participants.push(proto::Participant {
427 user_id: user_id.to_proto(),
428 peer_id: connection_id.0,
429 projects: Default::default(),
430 location: Some(proto::ParticipantLocation {
431 variant: Some(proto::participant_location::Variant::External(
432 proto::participant_location::External {},
433 )),
434 }),
435 });
436 active_call.connection_id = Some(connection_id);
437
438 Ok((room, recipient_connection_ids))
439 }
440
441 pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
442 let connection = self
443 .connections
444 .get_mut(&connection_id)
445 .ok_or_else(|| anyhow!("no such connection"))?;
446 let user_id = connection.user_id;
447
448 let connected_user = self
449 .connected_users
450 .get(&user_id)
451 .ok_or_else(|| anyhow!("no such connection"))?;
452 anyhow::ensure!(
453 connected_user
454 .active_call
455 .map_or(false, |call| call.room_id == room_id
456 && call.connection_id == Some(connection_id)),
457 "cannot leave a room before joining it"
458 );
459
460 // Given that users can only join one room at a time, we can safely unshare
461 // and leave all projects associated with the connection.
462 let mut unshared_projects = Vec::new();
463 let mut left_projects = Vec::new();
464 for project_id in connection.projects.clone() {
465 if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
466 unshared_projects.push(project);
467 } else if let Ok(project) = self.leave_project(project_id, connection_id) {
468 left_projects.push(project);
469 }
470 }
471 self.connected_users.get_mut(&user_id).unwrap().active_call = None;
472
473 let room = self
474 .rooms
475 .get_mut(&room_id)
476 .ok_or_else(|| anyhow!("no such room"))?;
477 room.participants
478 .retain(|participant| participant.peer_id != connection_id.0);
479
480 let mut canceled_call_connection_ids = Vec::new();
481 room.pending_participant_user_ids
482 .retain(|pending_participant_user_id| {
483 if let Some(connected_user) = self
484 .connected_users
485 .get_mut(&UserId::from_proto(*pending_participant_user_id))
486 {
487 if let Some(call) = connected_user.active_call.as_ref() {
488 if call.caller_user_id == user_id {
489 connected_user.active_call.take();
490 canceled_call_connection_ids
491 .extend(connected_user.connection_ids.iter().copied());
492 false
493 } else {
494 true
495 }
496 } else {
497 true
498 }
499 } else {
500 true
501 }
502 });
503
504 let room = if room.participants.is_empty() {
505 Cow::Owned(self.rooms.remove(&room_id).unwrap())
506 } else {
507 Cow::Borrowed(self.rooms.get(&room_id).unwrap())
508 };
509
510 Ok(LeftRoom {
511 room,
512 unshared_projects,
513 left_projects,
514 canceled_call_connection_ids,
515 })
516 }
517
518 pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
519 self.rooms.get(&room_id)
520 }
521
522 pub fn rooms(&self) -> &BTreeMap<RoomId, proto::Room> {
523 &self.rooms
524 }
525
526 pub fn call(
527 &mut self,
528 room_id: RoomId,
529 recipient_user_id: UserId,
530 initial_project_id: Option<ProjectId>,
531 from_connection_id: ConnectionId,
532 ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
533 let caller_user_id = self.user_id_for_connection(from_connection_id)?;
534
535 let recipient_connection_ids = self
536 .connection_ids_for_user(recipient_user_id)
537 .collect::<Vec<_>>();
538 let mut recipient = self
539 .connected_users
540 .get_mut(&recipient_user_id)
541 .ok_or_else(|| anyhow!("no such connection"))?;
542 anyhow::ensure!(
543 recipient.active_call.is_none(),
544 "recipient is already on another call"
545 );
546
547 let room = self
548 .rooms
549 .get_mut(&room_id)
550 .ok_or_else(|| anyhow!("no such room"))?;
551 anyhow::ensure!(
552 room.participants
553 .iter()
554 .any(|participant| participant.peer_id == from_connection_id.0),
555 "no such room"
556 );
557 anyhow::ensure!(
558 room.pending_participant_user_ids
559 .iter()
560 .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
561 "cannot call the same user more than once"
562 );
563 room.pending_participant_user_ids
564 .push(recipient_user_id.to_proto());
565
566 if let Some(initial_project_id) = initial_project_id {
567 let project = self
568 .projects
569 .get(&initial_project_id)
570 .ok_or_else(|| anyhow!("no such project"))?;
571 anyhow::ensure!(project.room_id == room_id, "no such project");
572 }
573
574 recipient.active_call = Some(Call {
575 caller_user_id,
576 room_id,
577 connection_id: None,
578 initial_project_id,
579 });
580
581 Ok((
582 room,
583 recipient_connection_ids,
584 proto::IncomingCall {
585 room_id,
586 caller_user_id: caller_user_id.to_proto(),
587 participant_user_ids: room
588 .participants
589 .iter()
590 .map(|participant| participant.user_id)
591 .collect(),
592 initial_project: initial_project_id
593 .and_then(|id| Self::build_participant_project(id, &self.projects)),
594 },
595 ))
596 }
597
598 pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
599 let mut recipient = self
600 .connected_users
601 .get_mut(&to_user_id)
602 .ok_or_else(|| anyhow!("no such connection"))?;
603 anyhow::ensure!(recipient
604 .active_call
605 .map_or(false, |call| call.room_id == room_id
606 && call.connection_id.is_none()));
607 recipient.active_call = None;
608 let room = self
609 .rooms
610 .get_mut(&room_id)
611 .ok_or_else(|| anyhow!("no such room"))?;
612 room.pending_participant_user_ids
613 .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
614 Ok(room)
615 }
616
617 pub fn cancel_call(
618 &mut self,
619 room_id: RoomId,
620 recipient_user_id: UserId,
621 canceller_connection_id: ConnectionId,
622 ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
623 let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
624 let canceller = self
625 .connected_users
626 .get(&canceller_user_id)
627 .ok_or_else(|| anyhow!("no such connection"))?;
628 let recipient = self
629 .connected_users
630 .get(&recipient_user_id)
631 .ok_or_else(|| anyhow!("no such connection"))?;
632 let canceller_active_call = canceller
633 .active_call
634 .as_ref()
635 .ok_or_else(|| anyhow!("no active call"))?;
636 let recipient_active_call = recipient
637 .active_call
638 .as_ref()
639 .ok_or_else(|| anyhow!("no active call for recipient"))?;
640
641 anyhow::ensure!(
642 canceller_active_call.room_id == room_id,
643 "users are on different calls"
644 );
645 anyhow::ensure!(
646 recipient_active_call.room_id == room_id,
647 "users are on different calls"
648 );
649 anyhow::ensure!(
650 recipient_active_call.connection_id.is_none(),
651 "recipient has already answered"
652 );
653 let room_id = recipient_active_call.room_id;
654 let room = self
655 .rooms
656 .get_mut(&room_id)
657 .ok_or_else(|| anyhow!("no such room"))?;
658 room.pending_participant_user_ids
659 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
660
661 let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
662 recipient.active_call.take();
663
664 Ok((room, recipient.connection_ids.clone()))
665 }
666
667 pub fn decline_call(
668 &mut self,
669 room_id: RoomId,
670 recipient_connection_id: ConnectionId,
671 ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
672 let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
673 let recipient = self
674 .connected_users
675 .get_mut(&recipient_user_id)
676 .ok_or_else(|| anyhow!("no such connection"))?;
677 if let Some(active_call) = recipient.active_call {
678 anyhow::ensure!(active_call.room_id == room_id, "no such room");
679 anyhow::ensure!(
680 active_call.connection_id.is_none(),
681 "cannot decline a call after joining room"
682 );
683 recipient.active_call.take();
684 let recipient_connection_ids = self
685 .connection_ids_for_user(recipient_user_id)
686 .collect::<Vec<_>>();
687 let room = self
688 .rooms
689 .get_mut(&active_call.room_id)
690 .ok_or_else(|| anyhow!("no such room"))?;
691 room.pending_participant_user_ids
692 .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
693 Ok((room, recipient_connection_ids))
694 } else {
695 Err(anyhow!("user is not being called"))
696 }
697 }
698
699 pub fn update_participant_location(
700 &mut self,
701 room_id: RoomId,
702 location: proto::ParticipantLocation,
703 connection_id: ConnectionId,
704 ) -> Result<&proto::Room> {
705 let room = self
706 .rooms
707 .get_mut(&room_id)
708 .ok_or_else(|| anyhow!("no such room"))?;
709 if let Some(proto::participant_location::Variant::SharedProject(project)) =
710 location.variant.as_ref()
711 {
712 anyhow::ensure!(
713 room.participants
714 .iter()
715 .flat_map(|participant| &participant.projects)
716 .any(|participant_project| participant_project.id == project.id),
717 "no such project"
718 );
719 }
720
721 let participant = room
722 .participants
723 .iter_mut()
724 .find(|participant| participant.peer_id == connection_id.0)
725 .ok_or_else(|| anyhow!("no such room"))?;
726 participant.location = Some(location);
727
728 Ok(room)
729 }
730
731 pub fn share_project(
732 &mut self,
733 room_id: RoomId,
734 project_id: ProjectId,
735 worktrees: Vec<proto::WorktreeMetadata>,
736 host_connection_id: ConnectionId,
737 ) -> Result<&proto::Room> {
738 let connection = self
739 .connections
740 .get_mut(&host_connection_id)
741 .ok_or_else(|| anyhow!("no such connection"))?;
742
743 let room = self
744 .rooms
745 .get_mut(&room_id)
746 .ok_or_else(|| anyhow!("no such room"))?;
747 let participant = room
748 .participants
749 .iter_mut()
750 .find(|participant| participant.peer_id == host_connection_id.0)
751 .ok_or_else(|| anyhow!("no such room"))?;
752
753 connection.projects.insert(project_id);
754 self.projects.insert(
755 project_id,
756 Project {
757 id: project_id,
758 room_id,
759 host_connection_id,
760 host: Collaborator {
761 user_id: connection.user_id,
762 replica_id: 0,
763 last_activity: None,
764 admin: connection.admin,
765 },
766 guests: Default::default(),
767 active_replica_ids: Default::default(),
768 worktrees: worktrees
769 .into_iter()
770 .map(|worktree| {
771 (
772 worktree.id,
773 Worktree {
774 root_name: worktree.root_name,
775 visible: worktree.visible,
776 ..Default::default()
777 },
778 )
779 })
780 .collect(),
781 language_servers: Default::default(),
782 },
783 );
784
785 participant
786 .projects
787 .extend(Self::build_participant_project(project_id, &self.projects));
788
789 Ok(room)
790 }
791
792 pub fn unshare_project(
793 &mut self,
794 project_id: ProjectId,
795 connection_id: ConnectionId,
796 ) -> Result<(&proto::Room, Project)> {
797 match self.projects.entry(project_id) {
798 btree_map::Entry::Occupied(e) => {
799 if e.get().host_connection_id == connection_id {
800 let project = e.remove();
801
802 if let Some(host_connection) = self.connections.get_mut(&connection_id) {
803 host_connection.projects.remove(&project_id);
804 }
805
806 for guest_connection in project.guests.keys() {
807 if let Some(connection) = self.connections.get_mut(guest_connection) {
808 connection.projects.remove(&project_id);
809 }
810 }
811
812 let room = self
813 .rooms
814 .get_mut(&project.room_id)
815 .ok_or_else(|| anyhow!("no such room"))?;
816 let participant = room
817 .participants
818 .iter_mut()
819 .find(|participant| participant.peer_id == connection_id.0)
820 .ok_or_else(|| anyhow!("no such room"))?;
821 participant
822 .projects
823 .retain(|project| project.id != project_id.to_proto());
824
825 Ok((room, project))
826 } else {
827 Err(anyhow!("no such project"))?
828 }
829 }
830 btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
831 }
832 }
833
834 pub fn update_project(
835 &mut self,
836 project_id: ProjectId,
837 worktrees: &[proto::WorktreeMetadata],
838 connection_id: ConnectionId,
839 ) -> Result<&proto::Room> {
840 let project = self
841 .projects
842 .get_mut(&project_id)
843 .ok_or_else(|| anyhow!("no such project"))?;
844 if project.host_connection_id == connection_id {
845 let mut old_worktrees = mem::take(&mut project.worktrees);
846 for worktree in worktrees {
847 if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
848 project.worktrees.insert(worktree.id, old_worktree);
849 } else {
850 project.worktrees.insert(
851 worktree.id,
852 Worktree {
853 root_name: worktree.root_name.clone(),
854 visible: worktree.visible,
855 ..Default::default()
856 },
857 );
858 }
859 }
860
861 let room = self
862 .rooms
863 .get_mut(&project.room_id)
864 .ok_or_else(|| anyhow!("no such room"))?;
865 let participant_project = room
866 .participants
867 .iter_mut()
868 .flat_map(|participant| &mut participant.projects)
869 .find(|project| project.id == project_id.to_proto())
870 .ok_or_else(|| anyhow!("no such project"))?;
871 participant_project.worktree_root_names = worktrees
872 .iter()
873 .filter(|worktree| worktree.visible)
874 .map(|worktree| worktree.root_name.clone())
875 .collect();
876
877 Ok(room)
878 } else {
879 Err(anyhow!("no such project"))?
880 }
881 }
882
883 pub fn update_diagnostic_summary(
884 &mut self,
885 project_id: ProjectId,
886 worktree_id: u64,
887 connection_id: ConnectionId,
888 summary: proto::DiagnosticSummary,
889 ) -> Result<Vec<ConnectionId>> {
890 let project = self
891 .projects
892 .get_mut(&project_id)
893 .ok_or_else(|| anyhow!("no such project"))?;
894 if project.host_connection_id == connection_id {
895 let worktree = project
896 .worktrees
897 .get_mut(&worktree_id)
898 .ok_or_else(|| anyhow!("no such worktree"))?;
899 worktree
900 .diagnostic_summaries
901 .insert(summary.path.clone().into(), summary);
902 return Ok(project.connection_ids());
903 }
904
905 Err(anyhow!("no such worktree"))?
906 }
907
908 pub fn start_language_server(
909 &mut self,
910 project_id: ProjectId,
911 connection_id: ConnectionId,
912 language_server: proto::LanguageServer,
913 ) -> Result<Vec<ConnectionId>> {
914 let project = self
915 .projects
916 .get_mut(&project_id)
917 .ok_or_else(|| anyhow!("no such project"))?;
918 if project.host_connection_id == connection_id {
919 project.language_servers.push(language_server);
920 return Ok(project.connection_ids());
921 }
922
923 Err(anyhow!("no such project"))?
924 }
925
926 pub fn join_project(
927 &mut self,
928 requester_connection_id: ConnectionId,
929 project_id: ProjectId,
930 ) -> Result<(&Project, ReplicaId)> {
931 let connection = self
932 .connections
933 .get_mut(&requester_connection_id)
934 .ok_or_else(|| anyhow!("no such connection"))?;
935 let user = self
936 .connected_users
937 .get(&connection.user_id)
938 .ok_or_else(|| anyhow!("no such connection"))?;
939 let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
940 anyhow::ensure!(
941 active_call.connection_id == Some(requester_connection_id),
942 "no such project"
943 );
944
945 let project = self
946 .projects
947 .get_mut(&project_id)
948 .ok_or_else(|| anyhow!("no such project"))?;
949 anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
950
951 connection.projects.insert(project_id);
952 let mut replica_id = 1;
953 while project.active_replica_ids.contains(&replica_id) {
954 replica_id += 1;
955 }
956 project.active_replica_ids.insert(replica_id);
957 project.guests.insert(
958 requester_connection_id,
959 Collaborator {
960 replica_id,
961 user_id: connection.user_id,
962 last_activity: Some(OffsetDateTime::now_utc()),
963 admin: connection.admin,
964 },
965 );
966
967 project.host.last_activity = Some(OffsetDateTime::now_utc());
968 Ok((project, replica_id))
969 }
970
971 pub fn leave_project(
972 &mut self,
973 project_id: ProjectId,
974 connection_id: ConnectionId,
975 ) -> Result<LeftProject> {
976 let project = self
977 .projects
978 .get_mut(&project_id)
979 .ok_or_else(|| anyhow!("no such project"))?;
980
981 // If the connection leaving the project is a collaborator, remove it.
982 let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
983 project.active_replica_ids.remove(&guest.replica_id);
984 true
985 } else {
986 false
987 };
988
989 if let Some(connection) = self.connections.get_mut(&connection_id) {
990 connection.projects.remove(&project_id);
991 }
992
993 Ok(LeftProject {
994 id: project.id,
995 host_connection_id: project.host_connection_id,
996 host_user_id: project.host.user_id,
997 connection_ids: project.connection_ids(),
998 remove_collaborator,
999 })
1000 }
1001
1002 #[allow(clippy::too_many_arguments)]
1003 pub fn update_worktree(
1004 &mut self,
1005 connection_id: ConnectionId,
1006 project_id: ProjectId,
1007 worktree_id: u64,
1008 worktree_root_name: &str,
1009 removed_entries: &[u64],
1010 updated_entries: &[proto::Entry],
1011 scan_id: u64,
1012 is_last_update: bool,
1013 ) -> Result<Vec<ConnectionId>> {
1014 let project = self.write_project(project_id, connection_id)?;
1015
1016 let connection_ids = project.connection_ids();
1017 let mut worktree = project.worktrees.entry(worktree_id).or_default();
1018 worktree.root_name = worktree_root_name.to_string();
1019
1020 for entry_id in removed_entries {
1021 worktree.entries.remove(entry_id);
1022 }
1023
1024 for entry in updated_entries {
1025 worktree.entries.insert(entry.id, entry.clone());
1026 }
1027
1028 worktree.scan_id = scan_id;
1029 worktree.is_complete = is_last_update;
1030 Ok(connection_ids)
1031 }
1032
1033 fn build_participant_project(
1034 project_id: ProjectId,
1035 projects: &BTreeMap<ProjectId, Project>,
1036 ) -> Option<proto::ParticipantProject> {
1037 Some(proto::ParticipantProject {
1038 id: project_id.to_proto(),
1039 worktree_root_names: projects
1040 .get(&project_id)?
1041 .worktrees
1042 .values()
1043 .filter(|worktree| worktree.visible)
1044 .map(|worktree| worktree.root_name.clone())
1045 .collect(),
1046 })
1047 }
1048
1049 pub fn project_connection_ids(
1050 &self,
1051 project_id: ProjectId,
1052 acting_connection_id: ConnectionId,
1053 ) -> Result<Vec<ConnectionId>> {
1054 Ok(self
1055 .read_project(project_id, acting_connection_id)?
1056 .connection_ids())
1057 }
1058
1059 pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1060 Ok(self
1061 .channels
1062 .get(&channel_id)
1063 .ok_or_else(|| anyhow!("no such channel"))?
1064 .connection_ids())
1065 }
1066
1067 pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1068 self.projects
1069 .get(&project_id)
1070 .ok_or_else(|| anyhow!("no such project"))
1071 }
1072
1073 pub fn register_project_activity(
1074 &mut self,
1075 project_id: ProjectId,
1076 connection_id: ConnectionId,
1077 ) -> Result<()> {
1078 let project = self
1079 .projects
1080 .get_mut(&project_id)
1081 .ok_or_else(|| anyhow!("no such project"))?;
1082 let collaborator = if connection_id == project.host_connection_id {
1083 &mut project.host
1084 } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1085 guest
1086 } else {
1087 return Err(anyhow!("no such project"))?;
1088 };
1089 collaborator.last_activity = Some(OffsetDateTime::now_utc());
1090 Ok(())
1091 }
1092
1093 pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1094 self.projects.iter()
1095 }
1096
1097 pub fn read_project(
1098 &self,
1099 project_id: ProjectId,
1100 connection_id: ConnectionId,
1101 ) -> Result<&Project> {
1102 let project = self
1103 .projects
1104 .get(&project_id)
1105 .ok_or_else(|| anyhow!("no such project"))?;
1106 if project.host_connection_id == connection_id
1107 || project.guests.contains_key(&connection_id)
1108 {
1109 Ok(project)
1110 } else {
1111 Err(anyhow!("no such project"))?
1112 }
1113 }
1114
1115 fn write_project(
1116 &mut self,
1117 project_id: ProjectId,
1118 connection_id: ConnectionId,
1119 ) -> Result<&mut Project> {
1120 let project = self
1121 .projects
1122 .get_mut(&project_id)
1123 .ok_or_else(|| anyhow!("no such project"))?;
1124 if project.host_connection_id == connection_id
1125 || project.guests.contains_key(&connection_id)
1126 {
1127 Ok(project)
1128 } else {
1129 Err(anyhow!("no such project"))?
1130 }
1131 }
1132
1133 #[cfg(test)]
1134 pub fn check_invariants(&self) {
1135 for (connection_id, connection) in &self.connections {
1136 for project_id in &connection.projects {
1137 let project = &self.projects.get(project_id).unwrap();
1138 if project.host_connection_id != *connection_id {
1139 assert!(project.guests.contains_key(connection_id));
1140 }
1141
1142 for (worktree_id, worktree) in project.worktrees.iter() {
1143 let mut paths = HashMap::default();
1144 for entry in worktree.entries.values() {
1145 let prev_entry = paths.insert(&entry.path, entry);
1146 assert_eq!(
1147 prev_entry,
1148 None,
1149 "worktree {:?}, duplicate path for entries {:?} and {:?}",
1150 worktree_id,
1151 prev_entry.unwrap(),
1152 entry
1153 );
1154 }
1155 }
1156 }
1157 for channel_id in &connection.channels {
1158 let channel = self.channels.get(channel_id).unwrap();
1159 assert!(channel.connection_ids.contains(connection_id));
1160 }
1161 assert!(self
1162 .connected_users
1163 .get(&connection.user_id)
1164 .unwrap()
1165 .connection_ids
1166 .contains(connection_id));
1167 }
1168
1169 for (user_id, state) in &self.connected_users {
1170 for connection_id in &state.connection_ids {
1171 assert_eq!(
1172 self.connections.get(connection_id).unwrap().user_id,
1173 *user_id
1174 );
1175 }
1176
1177 if let Some(active_call) = state.active_call.as_ref() {
1178 if let Some(active_call_connection_id) = active_call.connection_id {
1179 assert!(
1180 state.connection_ids.contains(&active_call_connection_id),
1181 "call is active on a dead connection"
1182 );
1183 assert!(
1184 state.connection_ids.contains(&active_call_connection_id),
1185 "call is active on a dead connection"
1186 );
1187 }
1188 }
1189 }
1190
1191 for (room_id, room) in &self.rooms {
1192 for pending_user_id in &room.pending_participant_user_ids {
1193 assert!(
1194 self.connected_users
1195 .contains_key(&UserId::from_proto(*pending_user_id)),
1196 "call is active on a user that has disconnected"
1197 );
1198 }
1199
1200 for participant in &room.participants {
1201 assert!(
1202 self.connections
1203 .contains_key(&ConnectionId(participant.peer_id)),
1204 "room {} contains participant {:?} that has disconnected",
1205 room_id,
1206 participant
1207 );
1208
1209 for participant_project in &participant.projects {
1210 let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1211 assert_eq!(
1212 project.room_id, *room_id,
1213 "project was shared on a different room"
1214 );
1215 }
1216 }
1217
1218 assert!(
1219 !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1220 "room can't be empty"
1221 );
1222 }
1223
1224 for (project_id, project) in &self.projects {
1225 let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1226 assert!(host_connection.projects.contains(project_id));
1227
1228 for guest_connection_id in project.guests.keys() {
1229 let guest_connection = self.connections.get(guest_connection_id).unwrap();
1230 assert!(guest_connection.projects.contains(project_id));
1231 }
1232 assert_eq!(project.active_replica_ids.len(), project.guests.len());
1233 assert_eq!(
1234 project.active_replica_ids,
1235 project
1236 .guests
1237 .values()
1238 .map(|guest| guest.replica_id)
1239 .collect::<HashSet<_>>(),
1240 );
1241
1242 let room = &self.rooms[&project.room_id];
1243 let room_participant = room
1244 .participants
1245 .iter()
1246 .find(|participant| participant.peer_id == project.host_connection_id.0)
1247 .unwrap();
1248 assert!(
1249 room_participant
1250 .projects
1251 .iter()
1252 .any(|project| project.id == project_id.to_proto()),
1253 "project was not shared in room"
1254 );
1255 }
1256
1257 for (channel_id, channel) in &self.channels {
1258 for connection_id in &channel.connection_ids {
1259 let connection = self.connections.get(connection_id).unwrap();
1260 assert!(connection.channels.contains(channel_id));
1261 }
1262 }
1263 }
1264}
1265
1266impl Project {
1267 fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1268 self.guests
1269 .values()
1270 .chain([&self.host])
1271 .any(|collaborator| {
1272 collaborator
1273 .last_activity
1274 .map_or(false, |active_time| active_time > start_time)
1275 })
1276 }
1277
1278 pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1279 self.guests.keys().copied().collect()
1280 }
1281
1282 pub fn connection_ids(&self) -> Vec<ConnectionId> {
1283 self.guests
1284 .keys()
1285 .copied()
1286 .chain(Some(self.host_connection_id))
1287 .collect()
1288 }
1289}
1290
1291impl Channel {
1292 fn connection_ids(&self) -> Vec<ConnectionId> {
1293 self.connection_ids.iter().copied().collect()
1294 }
1295}