store.rs

   1use crate::db::{self, ProjectId, UserId};
   2use anyhow::{anyhow, Result};
   3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
   4use nanoid::nanoid;
   5use rpc::{proto, ConnectionId};
   6use serde::Serialize;
   7use std::{borrow::Cow, mem, path::PathBuf, str};
   8use tracing::instrument;
   9use util::post_inc;
  10
  11pub type RoomId = u64;
  12
  13#[derive(Default, Serialize)]
  14pub struct Store {
  15    connections: BTreeMap<ConnectionId, ConnectionState>,
  16    connected_users: BTreeMap<UserId, ConnectedUser>,
  17    next_room_id: RoomId,
  18    rooms: BTreeMap<RoomId, proto::Room>,
  19    projects: BTreeMap<ProjectId, Project>,
  20}
  21
  22#[derive(Default, Serialize)]
  23struct ConnectedUser {
  24    connection_ids: HashSet<ConnectionId>,
  25    active_call: Option<Call>,
  26}
  27
  28#[derive(Serialize)]
  29struct ConnectionState {
  30    user_id: UserId,
  31    admin: bool,
  32    projects: BTreeSet<ProjectId>,
  33}
  34
  35#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
  36pub struct Call {
  37    pub caller_user_id: UserId,
  38    pub room_id: RoomId,
  39    pub connection_id: Option<ConnectionId>,
  40    pub initial_project_id: Option<ProjectId>,
  41}
  42
  43#[derive(Serialize)]
  44pub struct Project {
  45    pub id: ProjectId,
  46    pub room_id: RoomId,
  47    pub host_connection_id: ConnectionId,
  48    pub host: Collaborator,
  49    pub guests: HashMap<ConnectionId, Collaborator>,
  50    pub active_replica_ids: HashSet<ReplicaId>,
  51    pub worktrees: BTreeMap<u64, Worktree>,
  52    pub language_servers: Vec<proto::LanguageServer>,
  53}
  54
  55#[derive(Serialize)]
  56pub struct Collaborator {
  57    pub replica_id: ReplicaId,
  58    pub user_id: UserId,
  59    pub admin: bool,
  60}
  61
  62#[derive(Default, Serialize)]
  63pub struct Worktree {
  64    pub abs_path: PathBuf,
  65    pub root_name: String,
  66    pub visible: bool,
  67    #[serde(skip)]
  68    pub entries: BTreeMap<u64, proto::Entry>,
  69    #[serde(skip)]
  70    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
  71    pub scan_id: u64,
  72    pub is_complete: bool,
  73}
  74
  75pub type ReplicaId = u16;
  76
  77#[derive(Default)]
  78pub struct RemovedConnectionState<'a> {
  79    pub user_id: UserId,
  80    pub hosted_projects: Vec<Project>,
  81    pub guest_projects: Vec<LeftProject>,
  82    pub contact_ids: HashSet<UserId>,
  83    pub room: Option<Cow<'a, proto::Room>>,
  84    pub canceled_call_connection_ids: Vec<ConnectionId>,
  85}
  86
  87pub struct LeftProject {
  88    pub id: ProjectId,
  89    pub host_user_id: UserId,
  90    pub host_connection_id: ConnectionId,
  91    pub connection_ids: Vec<ConnectionId>,
  92    pub remove_collaborator: bool,
  93}
  94
  95pub struct LeftRoom<'a> {
  96    pub room: Cow<'a, proto::Room>,
  97    pub unshared_projects: Vec<Project>,
  98    pub left_projects: Vec<LeftProject>,
  99    pub canceled_call_connection_ids: Vec<ConnectionId>,
 100}
 101
 102#[derive(Copy, Clone)]
 103pub struct Metrics {
 104    pub connections: usize,
 105    pub shared_projects: usize,
 106}
 107
 108impl Store {
 109    pub fn metrics(&self) -> Metrics {
 110        let connections = self.connections.values().filter(|c| !c.admin).count();
 111        let mut shared_projects = 0;
 112        for project in self.projects.values() {
 113            if let Some(connection) = self.connections.get(&project.host_connection_id) {
 114                if !connection.admin {
 115                    shared_projects += 1;
 116                }
 117            }
 118        }
 119
 120        Metrics {
 121            connections,
 122            shared_projects,
 123        }
 124    }
 125
 126    #[instrument(skip(self))]
 127    pub fn add_connection(
 128        &mut self,
 129        connection_id: ConnectionId,
 130        user_id: UserId,
 131        admin: bool,
 132    ) -> Option<proto::IncomingCall> {
 133        self.connections.insert(
 134            connection_id,
 135            ConnectionState {
 136                user_id,
 137                admin,
 138                projects: Default::default(),
 139            },
 140        );
 141        let connected_user = self.connected_users.entry(user_id).or_default();
 142        connected_user.connection_ids.insert(connection_id);
 143        if let Some(active_call) = connected_user.active_call {
 144            if active_call.connection_id.is_some() {
 145                None
 146            } else {
 147                let room = self.room(active_call.room_id)?;
 148                Some(proto::IncomingCall {
 149                    room_id: active_call.room_id,
 150                    caller_user_id: active_call.caller_user_id.to_proto(),
 151                    participant_user_ids: room
 152                        .participants
 153                        .iter()
 154                        .map(|participant| participant.user_id)
 155                        .collect(),
 156                    initial_project: active_call
 157                        .initial_project_id
 158                        .and_then(|id| Self::build_participant_project(id, &self.projects)),
 159                })
 160            }
 161        } else {
 162            None
 163        }
 164    }
 165
 166    #[instrument(skip(self))]
 167    pub fn remove_connection(
 168        &mut self,
 169        connection_id: ConnectionId,
 170    ) -> Result<RemovedConnectionState> {
 171        let connection = self
 172            .connections
 173            .get_mut(&connection_id)
 174            .ok_or_else(|| anyhow!("no such connection"))?;
 175
 176        let user_id = connection.user_id;
 177
 178        let mut result = RemovedConnectionState {
 179            user_id,
 180            ..Default::default()
 181        };
 182
 183        let connected_user = self.connected_users.get(&user_id).unwrap();
 184        if let Some(active_call) = connected_user.active_call.as_ref() {
 185            let room_id = active_call.room_id;
 186            if active_call.connection_id == Some(connection_id) {
 187                let left_room = self.leave_room(room_id, connection_id)?;
 188                result.hosted_projects = left_room.unshared_projects;
 189                result.guest_projects = left_room.left_projects;
 190                result.room = Some(Cow::Owned(left_room.room.into_owned()));
 191                result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
 192            } else if connected_user.connection_ids.len() == 1 {
 193                let (room, _) = self.decline_call(room_id, connection_id)?;
 194                result.room = Some(Cow::Owned(room.clone()));
 195            }
 196        }
 197
 198        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
 199        connected_user.connection_ids.remove(&connection_id);
 200        if connected_user.connection_ids.is_empty() {
 201            self.connected_users.remove(&user_id);
 202        }
 203        self.connections.remove(&connection_id).unwrap();
 204
 205        Ok(result)
 206    }
 207
 208    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
 209        Ok(self
 210            .connections
 211            .get(&connection_id)
 212            .ok_or_else(|| anyhow!("unknown connection"))?
 213            .user_id)
 214    }
 215
 216    pub fn connection_ids_for_user(
 217        &self,
 218        user_id: UserId,
 219    ) -> impl Iterator<Item = ConnectionId> + '_ {
 220        self.connected_users
 221            .get(&user_id)
 222            .into_iter()
 223            .map(|state| &state.connection_ids)
 224            .flatten()
 225            .copied()
 226    }
 227
 228    pub fn is_user_online(&self, user_id: UserId) -> bool {
 229        !self
 230            .connected_users
 231            .get(&user_id)
 232            .unwrap_or(&Default::default())
 233            .connection_ids
 234            .is_empty()
 235    }
 236
 237    fn is_user_busy(&self, user_id: UserId) -> bool {
 238        self.connected_users
 239            .get(&user_id)
 240            .unwrap_or(&Default::default())
 241            .active_call
 242            .is_some()
 243    }
 244
 245    pub fn build_initial_contacts_update(
 246        &self,
 247        contacts: Vec<db::Contact>,
 248    ) -> proto::UpdateContacts {
 249        let mut update = proto::UpdateContacts::default();
 250
 251        for contact in contacts {
 252            match contact {
 253                db::Contact::Accepted {
 254                    user_id,
 255                    should_notify,
 256                } => {
 257                    update
 258                        .contacts
 259                        .push(self.contact_for_user(user_id, should_notify));
 260                }
 261                db::Contact::Outgoing { user_id } => {
 262                    update.outgoing_requests.push(user_id.to_proto())
 263                }
 264                db::Contact::Incoming {
 265                    user_id,
 266                    should_notify,
 267                } => update
 268                    .incoming_requests
 269                    .push(proto::IncomingContactRequest {
 270                        requester_id: user_id.to_proto(),
 271                        should_notify,
 272                    }),
 273            }
 274        }
 275
 276        update
 277    }
 278
 279    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
 280        proto::Contact {
 281            user_id: user_id.to_proto(),
 282            online: self.is_user_online(user_id),
 283            busy: self.is_user_busy(user_id),
 284            should_notify,
 285        }
 286    }
 287
 288    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
 289        let connection = self
 290            .connections
 291            .get_mut(&creator_connection_id)
 292            .ok_or_else(|| anyhow!("no such connection"))?;
 293        let connected_user = self
 294            .connected_users
 295            .get_mut(&connection.user_id)
 296            .ok_or_else(|| anyhow!("no such connection"))?;
 297        anyhow::ensure!(
 298            connected_user.active_call.is_none(),
 299            "can't create a room with an active call"
 300        );
 301
 302        let room_id = post_inc(&mut self.next_room_id);
 303        let room = proto::Room {
 304            id: room_id,
 305            participants: vec![proto::Participant {
 306                user_id: connection.user_id.to_proto(),
 307                peer_id: creator_connection_id.0,
 308                projects: Default::default(),
 309                location: Some(proto::ParticipantLocation {
 310                    variant: Some(proto::participant_location::Variant::External(
 311                        proto::participant_location::External {},
 312                    )),
 313                }),
 314            }],
 315            pending_participant_user_ids: Default::default(),
 316            live_kit_room: nanoid!(30),
 317        };
 318
 319        self.rooms.insert(room_id, room);
 320        connected_user.active_call = Some(Call {
 321            caller_user_id: connection.user_id,
 322            room_id,
 323            connection_id: Some(creator_connection_id),
 324            initial_project_id: None,
 325        });
 326        Ok(self.rooms.get(&room_id).unwrap())
 327    }
 328
 329    pub fn join_room(
 330        &mut self,
 331        room_id: RoomId,
 332        connection_id: ConnectionId,
 333    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 334        let connection = self
 335            .connections
 336            .get_mut(&connection_id)
 337            .ok_or_else(|| anyhow!("no such connection"))?;
 338        let user_id = connection.user_id;
 339        let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
 340
 341        let connected_user = self
 342            .connected_users
 343            .get_mut(&user_id)
 344            .ok_or_else(|| anyhow!("no such connection"))?;
 345        let active_call = connected_user
 346            .active_call
 347            .as_mut()
 348            .ok_or_else(|| anyhow!("not being called"))?;
 349        anyhow::ensure!(
 350            active_call.room_id == room_id && active_call.connection_id.is_none(),
 351            "not being called on this room"
 352        );
 353
 354        let room = self
 355            .rooms
 356            .get_mut(&room_id)
 357            .ok_or_else(|| anyhow!("no such room"))?;
 358        anyhow::ensure!(
 359            room.pending_participant_user_ids
 360                .contains(&user_id.to_proto()),
 361            anyhow!("no such room")
 362        );
 363        room.pending_participant_user_ids
 364            .retain(|pending| *pending != user_id.to_proto());
 365        room.participants.push(proto::Participant {
 366            user_id: user_id.to_proto(),
 367            peer_id: connection_id.0,
 368            projects: Default::default(),
 369            location: Some(proto::ParticipantLocation {
 370                variant: Some(proto::participant_location::Variant::External(
 371                    proto::participant_location::External {},
 372                )),
 373            }),
 374        });
 375        active_call.connection_id = Some(connection_id);
 376
 377        Ok((room, recipient_connection_ids))
 378    }
 379
 380    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
 381        let connection = self
 382            .connections
 383            .get_mut(&connection_id)
 384            .ok_or_else(|| anyhow!("no such connection"))?;
 385        let user_id = connection.user_id;
 386
 387        let connected_user = self
 388            .connected_users
 389            .get(&user_id)
 390            .ok_or_else(|| anyhow!("no such connection"))?;
 391        anyhow::ensure!(
 392            connected_user
 393                .active_call
 394                .map_or(false, |call| call.room_id == room_id
 395                    && call.connection_id == Some(connection_id)),
 396            "cannot leave a room before joining it"
 397        );
 398
 399        // Given that users can only join one room at a time, we can safely unshare
 400        // and leave all projects associated with the connection.
 401        let mut unshared_projects = Vec::new();
 402        let mut left_projects = Vec::new();
 403        for project_id in connection.projects.clone() {
 404            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
 405                unshared_projects.push(project);
 406            } else if let Ok(project) = self.leave_project(project_id, connection_id) {
 407                left_projects.push(project);
 408            }
 409        }
 410        self.connected_users.get_mut(&user_id).unwrap().active_call = None;
 411
 412        let room = self
 413            .rooms
 414            .get_mut(&room_id)
 415            .ok_or_else(|| anyhow!("no such room"))?;
 416        room.participants
 417            .retain(|participant| participant.peer_id != connection_id.0);
 418
 419        let mut canceled_call_connection_ids = Vec::new();
 420        room.pending_participant_user_ids
 421            .retain(|pending_participant_user_id| {
 422                if let Some(connected_user) = self
 423                    .connected_users
 424                    .get_mut(&UserId::from_proto(*pending_participant_user_id))
 425                {
 426                    if let Some(call) = connected_user.active_call.as_ref() {
 427                        if call.caller_user_id == user_id {
 428                            connected_user.active_call.take();
 429                            canceled_call_connection_ids
 430                                .extend(connected_user.connection_ids.iter().copied());
 431                            false
 432                        } else {
 433                            true
 434                        }
 435                    } else {
 436                        true
 437                    }
 438                } else {
 439                    true
 440                }
 441            });
 442
 443        let room = if room.participants.is_empty() {
 444            Cow::Owned(self.rooms.remove(&room_id).unwrap())
 445        } else {
 446            Cow::Borrowed(self.rooms.get(&room_id).unwrap())
 447        };
 448
 449        Ok(LeftRoom {
 450            room,
 451            unshared_projects,
 452            left_projects,
 453            canceled_call_connection_ids,
 454        })
 455    }
 456
 457    pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
 458        self.rooms.get(&room_id)
 459    }
 460
 461    pub fn rooms(&self) -> &BTreeMap<RoomId, proto::Room> {
 462        &self.rooms
 463    }
 464
 465    pub fn call(
 466        &mut self,
 467        room_id: RoomId,
 468        recipient_user_id: UserId,
 469        initial_project_id: Option<ProjectId>,
 470        from_connection_id: ConnectionId,
 471    ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
 472        let caller_user_id = self.user_id_for_connection(from_connection_id)?;
 473
 474        let recipient_connection_ids = self
 475            .connection_ids_for_user(recipient_user_id)
 476            .collect::<Vec<_>>();
 477        let mut recipient = self
 478            .connected_users
 479            .get_mut(&recipient_user_id)
 480            .ok_or_else(|| anyhow!("no such connection"))?;
 481        anyhow::ensure!(
 482            recipient.active_call.is_none(),
 483            "recipient is already on another call"
 484        );
 485
 486        let room = self
 487            .rooms
 488            .get_mut(&room_id)
 489            .ok_or_else(|| anyhow!("no such room"))?;
 490        anyhow::ensure!(
 491            room.participants
 492                .iter()
 493                .any(|participant| participant.peer_id == from_connection_id.0),
 494            "no such room"
 495        );
 496        anyhow::ensure!(
 497            room.pending_participant_user_ids
 498                .iter()
 499                .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
 500            "cannot call the same user more than once"
 501        );
 502        room.pending_participant_user_ids
 503            .push(recipient_user_id.to_proto());
 504
 505        if let Some(initial_project_id) = initial_project_id {
 506            let project = self
 507                .projects
 508                .get(&initial_project_id)
 509                .ok_or_else(|| anyhow!("no such project"))?;
 510            anyhow::ensure!(project.room_id == room_id, "no such project");
 511        }
 512
 513        recipient.active_call = Some(Call {
 514            caller_user_id,
 515            room_id,
 516            connection_id: None,
 517            initial_project_id,
 518        });
 519
 520        Ok((
 521            room,
 522            recipient_connection_ids,
 523            proto::IncomingCall {
 524                room_id,
 525                caller_user_id: caller_user_id.to_proto(),
 526                participant_user_ids: room
 527                    .participants
 528                    .iter()
 529                    .map(|participant| participant.user_id)
 530                    .collect(),
 531                initial_project: initial_project_id
 532                    .and_then(|id| Self::build_participant_project(id, &self.projects)),
 533            },
 534        ))
 535    }
 536
 537    pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
 538        let mut recipient = self
 539            .connected_users
 540            .get_mut(&to_user_id)
 541            .ok_or_else(|| anyhow!("no such connection"))?;
 542        anyhow::ensure!(recipient
 543            .active_call
 544            .map_or(false, |call| call.room_id == room_id
 545                && call.connection_id.is_none()));
 546        recipient.active_call = None;
 547        let room = self
 548            .rooms
 549            .get_mut(&room_id)
 550            .ok_or_else(|| anyhow!("no such room"))?;
 551        room.pending_participant_user_ids
 552            .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
 553        Ok(room)
 554    }
 555
 556    pub fn cancel_call(
 557        &mut self,
 558        room_id: RoomId,
 559        recipient_user_id: UserId,
 560        canceller_connection_id: ConnectionId,
 561    ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
 562        let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
 563        let canceller = self
 564            .connected_users
 565            .get(&canceller_user_id)
 566            .ok_or_else(|| anyhow!("no such connection"))?;
 567        let recipient = self
 568            .connected_users
 569            .get(&recipient_user_id)
 570            .ok_or_else(|| anyhow!("no such connection"))?;
 571        let canceller_active_call = canceller
 572            .active_call
 573            .as_ref()
 574            .ok_or_else(|| anyhow!("no active call"))?;
 575        let recipient_active_call = recipient
 576            .active_call
 577            .as_ref()
 578            .ok_or_else(|| anyhow!("no active call for recipient"))?;
 579
 580        anyhow::ensure!(
 581            canceller_active_call.room_id == room_id,
 582            "users are on different calls"
 583        );
 584        anyhow::ensure!(
 585            recipient_active_call.room_id == room_id,
 586            "users are on different calls"
 587        );
 588        anyhow::ensure!(
 589            recipient_active_call.connection_id.is_none(),
 590            "recipient has already answered"
 591        );
 592        let room_id = recipient_active_call.room_id;
 593        let room = self
 594            .rooms
 595            .get_mut(&room_id)
 596            .ok_or_else(|| anyhow!("no such room"))?;
 597        room.pending_participant_user_ids
 598            .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 599
 600        let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
 601        recipient.active_call.take();
 602
 603        Ok((room, recipient.connection_ids.clone()))
 604    }
 605
 606    pub fn decline_call(
 607        &mut self,
 608        room_id: RoomId,
 609        recipient_connection_id: ConnectionId,
 610    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 611        let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
 612        let recipient = self
 613            .connected_users
 614            .get_mut(&recipient_user_id)
 615            .ok_or_else(|| anyhow!("no such connection"))?;
 616        if let Some(active_call) = recipient.active_call {
 617            anyhow::ensure!(active_call.room_id == room_id, "no such room");
 618            anyhow::ensure!(
 619                active_call.connection_id.is_none(),
 620                "cannot decline a call after joining room"
 621            );
 622            recipient.active_call.take();
 623            let recipient_connection_ids = self
 624                .connection_ids_for_user(recipient_user_id)
 625                .collect::<Vec<_>>();
 626            let room = self
 627                .rooms
 628                .get_mut(&active_call.room_id)
 629                .ok_or_else(|| anyhow!("no such room"))?;
 630            room.pending_participant_user_ids
 631                .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 632            Ok((room, recipient_connection_ids))
 633        } else {
 634            Err(anyhow!("user is not being called"))
 635        }
 636    }
 637
 638    pub fn update_participant_location(
 639        &mut self,
 640        room_id: RoomId,
 641        location: proto::ParticipantLocation,
 642        connection_id: ConnectionId,
 643    ) -> Result<&proto::Room> {
 644        let room = self
 645            .rooms
 646            .get_mut(&room_id)
 647            .ok_or_else(|| anyhow!("no such room"))?;
 648        if let Some(proto::participant_location::Variant::SharedProject(project)) =
 649            location.variant.as_ref()
 650        {
 651            anyhow::ensure!(
 652                room.participants
 653                    .iter()
 654                    .flat_map(|participant| &participant.projects)
 655                    .any(|participant_project| participant_project.id == project.id),
 656                "no such project"
 657            );
 658        }
 659
 660        let participant = room
 661            .participants
 662            .iter_mut()
 663            .find(|participant| participant.peer_id == connection_id.0)
 664            .ok_or_else(|| anyhow!("no such room"))?;
 665        participant.location = Some(location);
 666
 667        Ok(room)
 668    }
 669
 670    pub fn share_project(
 671        &mut self,
 672        room_id: RoomId,
 673        project_id: ProjectId,
 674        worktrees: Vec<proto::WorktreeMetadata>,
 675        host_connection_id: ConnectionId,
 676    ) -> Result<&proto::Room> {
 677        let connection = self
 678            .connections
 679            .get_mut(&host_connection_id)
 680            .ok_or_else(|| anyhow!("no such connection"))?;
 681
 682        let room = self
 683            .rooms
 684            .get_mut(&room_id)
 685            .ok_or_else(|| anyhow!("no such room"))?;
 686        let participant = room
 687            .participants
 688            .iter_mut()
 689            .find(|participant| participant.peer_id == host_connection_id.0)
 690            .ok_or_else(|| anyhow!("no such room"))?;
 691
 692        connection.projects.insert(project_id);
 693        self.projects.insert(
 694            project_id,
 695            Project {
 696                id: project_id,
 697                room_id,
 698                host_connection_id,
 699                host: Collaborator {
 700                    user_id: connection.user_id,
 701                    replica_id: 0,
 702                    admin: connection.admin,
 703                },
 704                guests: Default::default(),
 705                active_replica_ids: Default::default(),
 706                worktrees: worktrees
 707                    .into_iter()
 708                    .map(|worktree| {
 709                        (
 710                            worktree.id,
 711                            Worktree {
 712                                root_name: worktree.root_name,
 713                                visible: worktree.visible,
 714                                ..Default::default()
 715                            },
 716                        )
 717                    })
 718                    .collect(),
 719                language_servers: Default::default(),
 720            },
 721        );
 722
 723        participant
 724            .projects
 725            .extend(Self::build_participant_project(project_id, &self.projects));
 726
 727        Ok(room)
 728    }
 729
 730    pub fn unshare_project(
 731        &mut self,
 732        project_id: ProjectId,
 733        connection_id: ConnectionId,
 734    ) -> Result<(&proto::Room, Project)> {
 735        match self.projects.entry(project_id) {
 736            btree_map::Entry::Occupied(e) => {
 737                if e.get().host_connection_id == connection_id {
 738                    let project = e.remove();
 739
 740                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
 741                        host_connection.projects.remove(&project_id);
 742                    }
 743
 744                    for guest_connection in project.guests.keys() {
 745                        if let Some(connection) = self.connections.get_mut(guest_connection) {
 746                            connection.projects.remove(&project_id);
 747                        }
 748                    }
 749
 750                    let room = self
 751                        .rooms
 752                        .get_mut(&project.room_id)
 753                        .ok_or_else(|| anyhow!("no such room"))?;
 754                    let participant = room
 755                        .participants
 756                        .iter_mut()
 757                        .find(|participant| participant.peer_id == connection_id.0)
 758                        .ok_or_else(|| anyhow!("no such room"))?;
 759                    participant
 760                        .projects
 761                        .retain(|project| project.id != project_id.to_proto());
 762
 763                    Ok((room, project))
 764                } else {
 765                    Err(anyhow!("no such project"))?
 766                }
 767            }
 768            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
 769        }
 770    }
 771
 772    pub fn update_project(
 773        &mut self,
 774        project_id: ProjectId,
 775        worktrees: &[proto::WorktreeMetadata],
 776        connection_id: ConnectionId,
 777    ) -> Result<&proto::Room> {
 778        let project = self
 779            .projects
 780            .get_mut(&project_id)
 781            .ok_or_else(|| anyhow!("no such project"))?;
 782        if project.host_connection_id == connection_id {
 783            let mut old_worktrees = mem::take(&mut project.worktrees);
 784            for worktree in worktrees {
 785                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
 786                    project.worktrees.insert(worktree.id, old_worktree);
 787                } else {
 788                    project.worktrees.insert(
 789                        worktree.id,
 790                        Worktree {
 791                            root_name: worktree.root_name.clone(),
 792                            visible: worktree.visible,
 793                            ..Default::default()
 794                        },
 795                    );
 796                }
 797            }
 798
 799            let room = self
 800                .rooms
 801                .get_mut(&project.room_id)
 802                .ok_or_else(|| anyhow!("no such room"))?;
 803            let participant_project = room
 804                .participants
 805                .iter_mut()
 806                .flat_map(|participant| &mut participant.projects)
 807                .find(|project| project.id == project_id.to_proto())
 808                .ok_or_else(|| anyhow!("no such project"))?;
 809            participant_project.worktree_root_names = worktrees
 810                .iter()
 811                .filter(|worktree| worktree.visible)
 812                .map(|worktree| worktree.root_name.clone())
 813                .collect();
 814
 815            Ok(room)
 816        } else {
 817            Err(anyhow!("no such project"))?
 818        }
 819    }
 820
 821    pub fn update_diagnostic_summary(
 822        &mut self,
 823        project_id: ProjectId,
 824        worktree_id: u64,
 825        connection_id: ConnectionId,
 826        summary: proto::DiagnosticSummary,
 827    ) -> Result<Vec<ConnectionId>> {
 828        let project = self
 829            .projects
 830            .get_mut(&project_id)
 831            .ok_or_else(|| anyhow!("no such project"))?;
 832        if project.host_connection_id == connection_id {
 833            let worktree = project
 834                .worktrees
 835                .get_mut(&worktree_id)
 836                .ok_or_else(|| anyhow!("no such worktree"))?;
 837            worktree
 838                .diagnostic_summaries
 839                .insert(summary.path.clone().into(), summary);
 840            return Ok(project.connection_ids());
 841        }
 842
 843        Err(anyhow!("no such worktree"))?
 844    }
 845
 846    pub fn start_language_server(
 847        &mut self,
 848        project_id: ProjectId,
 849        connection_id: ConnectionId,
 850        language_server: proto::LanguageServer,
 851    ) -> Result<Vec<ConnectionId>> {
 852        let project = self
 853            .projects
 854            .get_mut(&project_id)
 855            .ok_or_else(|| anyhow!("no such project"))?;
 856        if project.host_connection_id == connection_id {
 857            project.language_servers.push(language_server);
 858            return Ok(project.connection_ids());
 859        }
 860
 861        Err(anyhow!("no such project"))?
 862    }
 863
 864    pub fn join_project(
 865        &mut self,
 866        requester_connection_id: ConnectionId,
 867        project_id: ProjectId,
 868    ) -> Result<(&Project, ReplicaId)> {
 869        let connection = self
 870            .connections
 871            .get_mut(&requester_connection_id)
 872            .ok_or_else(|| anyhow!("no such connection"))?;
 873        let user = self
 874            .connected_users
 875            .get(&connection.user_id)
 876            .ok_or_else(|| anyhow!("no such connection"))?;
 877        let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
 878        anyhow::ensure!(
 879            active_call.connection_id == Some(requester_connection_id),
 880            "no such project"
 881        );
 882
 883        let project = self
 884            .projects
 885            .get_mut(&project_id)
 886            .ok_or_else(|| anyhow!("no such project"))?;
 887        anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
 888
 889        connection.projects.insert(project_id);
 890        let mut replica_id = 1;
 891        while project.active_replica_ids.contains(&replica_id) {
 892            replica_id += 1;
 893        }
 894        project.active_replica_ids.insert(replica_id);
 895        project.guests.insert(
 896            requester_connection_id,
 897            Collaborator {
 898                replica_id,
 899                user_id: connection.user_id,
 900                admin: connection.admin,
 901            },
 902        );
 903
 904        Ok((project, replica_id))
 905    }
 906
 907    pub fn leave_project(
 908        &mut self,
 909        project_id: ProjectId,
 910        connection_id: ConnectionId,
 911    ) -> Result<LeftProject> {
 912        let project = self
 913            .projects
 914            .get_mut(&project_id)
 915            .ok_or_else(|| anyhow!("no such project"))?;
 916
 917        // If the connection leaving the project is a collaborator, remove it.
 918        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
 919            project.active_replica_ids.remove(&guest.replica_id);
 920            true
 921        } else {
 922            false
 923        };
 924
 925        if let Some(connection) = self.connections.get_mut(&connection_id) {
 926            connection.projects.remove(&project_id);
 927        }
 928
 929        Ok(LeftProject {
 930            id: project.id,
 931            host_connection_id: project.host_connection_id,
 932            host_user_id: project.host.user_id,
 933            connection_ids: project.connection_ids(),
 934            remove_collaborator,
 935        })
 936    }
 937
 938    #[allow(clippy::too_many_arguments)]
 939    pub fn update_worktree(
 940        &mut self,
 941        connection_id: ConnectionId,
 942        project_id: ProjectId,
 943        worktree_id: u64,
 944        worktree_root_name: &str,
 945        removed_entries: &[u64],
 946        updated_entries: &[proto::Entry],
 947        scan_id: u64,
 948        is_last_update: bool,
 949    ) -> Result<Vec<ConnectionId>> {
 950        let project = self.write_project(project_id, connection_id)?;
 951
 952        let connection_ids = project.connection_ids();
 953        let mut worktree = project.worktrees.entry(worktree_id).or_default();
 954        worktree.root_name = worktree_root_name.to_string();
 955
 956        for entry_id in removed_entries {
 957            worktree.entries.remove(entry_id);
 958        }
 959
 960        for entry in updated_entries {
 961            worktree.entries.insert(entry.id, entry.clone());
 962        }
 963
 964        worktree.scan_id = scan_id;
 965        worktree.is_complete = is_last_update;
 966        Ok(connection_ids)
 967    }
 968
 969    fn build_participant_project(
 970        project_id: ProjectId,
 971        projects: &BTreeMap<ProjectId, Project>,
 972    ) -> Option<proto::ParticipantProject> {
 973        Some(proto::ParticipantProject {
 974            id: project_id.to_proto(),
 975            worktree_root_names: projects
 976                .get(&project_id)?
 977                .worktrees
 978                .values()
 979                .filter(|worktree| worktree.visible)
 980                .map(|worktree| worktree.root_name.clone())
 981                .collect(),
 982        })
 983    }
 984
 985    pub fn project_connection_ids(
 986        &self,
 987        project_id: ProjectId,
 988        acting_connection_id: ConnectionId,
 989    ) -> Result<Vec<ConnectionId>> {
 990        Ok(self
 991            .read_project(project_id, acting_connection_id)?
 992            .connection_ids())
 993    }
 994
 995    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
 996        self.projects
 997            .get(&project_id)
 998            .ok_or_else(|| anyhow!("no such project"))
 999    }
1000
1001    pub fn read_project(
1002        &self,
1003        project_id: ProjectId,
1004        connection_id: ConnectionId,
1005    ) -> Result<&Project> {
1006        let project = self
1007            .projects
1008            .get(&project_id)
1009            .ok_or_else(|| anyhow!("no such project"))?;
1010        if project.host_connection_id == connection_id
1011            || project.guests.contains_key(&connection_id)
1012        {
1013            Ok(project)
1014        } else {
1015            Err(anyhow!("no such project"))?
1016        }
1017    }
1018
1019    fn write_project(
1020        &mut self,
1021        project_id: ProjectId,
1022        connection_id: ConnectionId,
1023    ) -> Result<&mut Project> {
1024        let project = self
1025            .projects
1026            .get_mut(&project_id)
1027            .ok_or_else(|| anyhow!("no such project"))?;
1028        if project.host_connection_id == connection_id
1029            || project.guests.contains_key(&connection_id)
1030        {
1031            Ok(project)
1032        } else {
1033            Err(anyhow!("no such project"))?
1034        }
1035    }
1036
1037    #[cfg(test)]
1038    pub fn check_invariants(&self) {
1039        for (connection_id, connection) in &self.connections {
1040            for project_id in &connection.projects {
1041                let project = &self.projects.get(project_id).unwrap();
1042                if project.host_connection_id != *connection_id {
1043                    assert!(project.guests.contains_key(connection_id));
1044                }
1045
1046                for (worktree_id, worktree) in project.worktrees.iter() {
1047                    let mut paths = HashMap::default();
1048                    for entry in worktree.entries.values() {
1049                        let prev_entry = paths.insert(&entry.path, entry);
1050                        assert_eq!(
1051                            prev_entry,
1052                            None,
1053                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
1054                            worktree_id,
1055                            prev_entry.unwrap(),
1056                            entry
1057                        );
1058                    }
1059                }
1060            }
1061
1062            assert!(self
1063                .connected_users
1064                .get(&connection.user_id)
1065                .unwrap()
1066                .connection_ids
1067                .contains(connection_id));
1068        }
1069
1070        for (user_id, state) in &self.connected_users {
1071            for connection_id in &state.connection_ids {
1072                assert_eq!(
1073                    self.connections.get(connection_id).unwrap().user_id,
1074                    *user_id
1075                );
1076            }
1077
1078            if let Some(active_call) = state.active_call.as_ref() {
1079                if let Some(active_call_connection_id) = active_call.connection_id {
1080                    assert!(
1081                        state.connection_ids.contains(&active_call_connection_id),
1082                        "call is active on a dead connection"
1083                    );
1084                    assert!(
1085                        state.connection_ids.contains(&active_call_connection_id),
1086                        "call is active on a dead connection"
1087                    );
1088                }
1089            }
1090        }
1091
1092        for (room_id, room) in &self.rooms {
1093            for pending_user_id in &room.pending_participant_user_ids {
1094                assert!(
1095                    self.connected_users
1096                        .contains_key(&UserId::from_proto(*pending_user_id)),
1097                    "call is active on a user that has disconnected"
1098                );
1099            }
1100
1101            for participant in &room.participants {
1102                assert!(
1103                    self.connections
1104                        .contains_key(&ConnectionId(participant.peer_id)),
1105                    "room {} contains participant {:?} that has disconnected",
1106                    room_id,
1107                    participant
1108                );
1109
1110                for participant_project in &participant.projects {
1111                    let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1112                    assert_eq!(
1113                        project.room_id, *room_id,
1114                        "project was shared on a different room"
1115                    );
1116                }
1117            }
1118
1119            assert!(
1120                !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1121                "room can't be empty"
1122            );
1123        }
1124
1125        for (project_id, project) in &self.projects {
1126            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1127            assert!(host_connection.projects.contains(project_id));
1128
1129            for guest_connection_id in project.guests.keys() {
1130                let guest_connection = self.connections.get(guest_connection_id).unwrap();
1131                assert!(guest_connection.projects.contains(project_id));
1132            }
1133            assert_eq!(project.active_replica_ids.len(), project.guests.len());
1134            assert_eq!(
1135                project.active_replica_ids,
1136                project
1137                    .guests
1138                    .values()
1139                    .map(|guest| guest.replica_id)
1140                    .collect::<HashSet<_>>(),
1141            );
1142
1143            let room = &self.rooms[&project.room_id];
1144            let room_participant = room
1145                .participants
1146                .iter()
1147                .find(|participant| participant.peer_id == project.host_connection_id.0)
1148                .unwrap();
1149            assert!(
1150                room_participant
1151                    .projects
1152                    .iter()
1153                    .any(|project| project.id == project_id.to_proto()),
1154                "project was not shared in room"
1155            );
1156        }
1157    }
1158}
1159
1160impl Project {
1161    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1162        self.guests.keys().copied().collect()
1163    }
1164
1165    pub fn connection_ids(&self) -> Vec<ConnectionId> {
1166        self.guests
1167            .keys()
1168            .copied()
1169            .chain(Some(self.host_connection_id))
1170            .collect()
1171    }
1172}