store.rs

   1use crate::db::{self, ProjectId, UserId};
   2use anyhow::{anyhow, Result};
   3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
   4use nanoid::nanoid;
   5use rpc::{proto, ConnectionId};
   6use serde::Serialize;
   7use std::{borrow::Cow, mem, path::PathBuf, str};
   8use tracing::instrument;
   9use util::post_inc;
  10
  11pub type RoomId = u64;
  12
  13#[derive(Default, Serialize)]
  14pub struct Store {
  15    connections: BTreeMap<ConnectionId, ConnectionState>,
  16    connected_users: BTreeMap<UserId, ConnectedUser>,
  17    next_room_id: RoomId,
  18    rooms: BTreeMap<RoomId, proto::Room>,
  19    projects: BTreeMap<ProjectId, Project>,
  20}
  21
  22#[derive(Default, Serialize)]
  23struct ConnectedUser {
  24    connection_ids: HashSet<ConnectionId>,
  25    active_call: Option<Call>,
  26}
  27
  28#[derive(Serialize)]
  29struct ConnectionState {
  30    user_id: UserId,
  31    admin: bool,
  32    projects: BTreeSet<ProjectId>,
  33}
  34
  35#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
  36pub struct Call {
  37    pub caller_user_id: UserId,
  38    pub room_id: RoomId,
  39    pub connection_id: Option<ConnectionId>,
  40    pub initial_project_id: Option<ProjectId>,
  41}
  42
  43#[derive(Serialize)]
  44pub struct Project {
  45    pub id: ProjectId,
  46    pub room_id: RoomId,
  47    pub host_connection_id: ConnectionId,
  48    pub host: Collaborator,
  49    pub guests: HashMap<ConnectionId, Collaborator>,
  50    pub active_replica_ids: HashSet<ReplicaId>,
  51    pub worktrees: BTreeMap<u64, Worktree>,
  52    pub language_servers: Vec<proto::LanguageServer>,
  53}
  54
  55#[derive(Serialize)]
  56pub struct Collaborator {
  57    pub replica_id: ReplicaId,
  58    pub user_id: UserId,
  59    pub admin: bool,
  60}
  61
  62#[derive(Default, Serialize)]
  63pub struct Worktree {
  64    pub abs_path: Vec<u8>,
  65    pub root_name: String,
  66    pub visible: bool,
  67    #[serde(skip)]
  68    pub entries: BTreeMap<u64, proto::Entry>,
  69    #[serde(skip)]
  70    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
  71    pub scan_id: u64,
  72    pub is_complete: bool,
  73}
  74
  75pub type ReplicaId = u16;
  76
  77#[derive(Default)]
  78pub struct RemovedConnectionState<'a> {
  79    pub user_id: UserId,
  80    pub hosted_projects: Vec<Project>,
  81    pub guest_projects: Vec<LeftProject>,
  82    pub contact_ids: HashSet<UserId>,
  83    pub room: Option<Cow<'a, proto::Room>>,
  84    pub canceled_call_connection_ids: Vec<ConnectionId>,
  85}
  86
  87pub struct LeftProject {
  88    pub id: ProjectId,
  89    pub host_user_id: UserId,
  90    pub host_connection_id: ConnectionId,
  91    pub connection_ids: Vec<ConnectionId>,
  92    pub remove_collaborator: bool,
  93}
  94
  95pub struct LeftRoom<'a> {
  96    pub room: Cow<'a, proto::Room>,
  97    pub unshared_projects: Vec<Project>,
  98    pub left_projects: Vec<LeftProject>,
  99    pub canceled_call_connection_ids: Vec<ConnectionId>,
 100}
 101
 102#[derive(Copy, Clone)]
 103pub struct Metrics {
 104    pub connections: usize,
 105    pub shared_projects: usize,
 106}
 107
 108impl Store {
 109    pub fn metrics(&self) -> Metrics {
 110        let connections = self.connections.values().filter(|c| !c.admin).count();
 111        let mut shared_projects = 0;
 112        for project in self.projects.values() {
 113            if let Some(connection) = self.connections.get(&project.host_connection_id) {
 114                if !connection.admin {
 115                    shared_projects += 1;
 116                }
 117            }
 118        }
 119
 120        Metrics {
 121            connections,
 122            shared_projects,
 123        }
 124    }
 125
 126    #[instrument(skip(self))]
 127    pub fn add_connection(
 128        &mut self,
 129        connection_id: ConnectionId,
 130        user_id: UserId,
 131        admin: bool,
 132    ) -> Option<proto::IncomingCall> {
 133        self.connections.insert(
 134            connection_id,
 135            ConnectionState {
 136                user_id,
 137                admin,
 138                projects: Default::default(),
 139            },
 140        );
 141        let connected_user = self.connected_users.entry(user_id).or_default();
 142        connected_user.connection_ids.insert(connection_id);
 143        if let Some(active_call) = connected_user.active_call {
 144            if active_call.connection_id.is_some() {
 145                None
 146            } else {
 147                let room = self.room(active_call.room_id)?;
 148                Some(proto::IncomingCall {
 149                    room_id: active_call.room_id,
 150                    caller_user_id: active_call.caller_user_id.to_proto(),
 151                    participant_user_ids: room
 152                        .participants
 153                        .iter()
 154                        .map(|participant| participant.user_id)
 155                        .collect(),
 156                    initial_project: active_call
 157                        .initial_project_id
 158                        .and_then(|id| Self::build_participant_project(id, &self.projects)),
 159                })
 160            }
 161        } else {
 162            None
 163        }
 164    }
 165
 166    #[instrument(skip(self))]
 167    pub fn remove_connection(
 168        &mut self,
 169        connection_id: ConnectionId,
 170    ) -> Result<RemovedConnectionState> {
 171        let connection = self
 172            .connections
 173            .get_mut(&connection_id)
 174            .ok_or_else(|| anyhow!("no such connection"))?;
 175
 176        let user_id = connection.user_id;
 177
 178        let mut result = RemovedConnectionState {
 179            user_id,
 180            ..Default::default()
 181        };
 182
 183        let connected_user = self.connected_users.get(&user_id).unwrap();
 184        if let Some(active_call) = connected_user.active_call.as_ref() {
 185            let room_id = active_call.room_id;
 186            if active_call.connection_id == Some(connection_id) {
 187                let left_room = self.leave_room(room_id, connection_id)?;
 188                result.hosted_projects = left_room.unshared_projects;
 189                result.guest_projects = left_room.left_projects;
 190                result.room = Some(Cow::Owned(left_room.room.into_owned()));
 191                result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
 192            } else if connected_user.connection_ids.len() == 1 {
 193                let (room, _) = self.decline_call(room_id, connection_id)?;
 194                result.room = Some(Cow::Owned(room.clone()));
 195            }
 196        }
 197
 198        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
 199        connected_user.connection_ids.remove(&connection_id);
 200        if connected_user.connection_ids.is_empty() {
 201            self.connected_users.remove(&user_id);
 202        }
 203        self.connections.remove(&connection_id).unwrap();
 204
 205        Ok(result)
 206    }
 207
 208    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
 209        Ok(self
 210            .connections
 211            .get(&connection_id)
 212            .ok_or_else(|| anyhow!("unknown connection"))?
 213            .user_id)
 214    }
 215
 216    pub fn connection_ids_for_user(
 217        &self,
 218        user_id: UserId,
 219    ) -> impl Iterator<Item = ConnectionId> + '_ {
 220        self.connected_users
 221            .get(&user_id)
 222            .into_iter()
 223            .map(|state| &state.connection_ids)
 224            .flatten()
 225            .copied()
 226    }
 227
 228    pub fn is_user_online(&self, user_id: UserId) -> bool {
 229        !self
 230            .connected_users
 231            .get(&user_id)
 232            .unwrap_or(&Default::default())
 233            .connection_ids
 234            .is_empty()
 235    }
 236
 237    fn is_user_busy(&self, user_id: UserId) -> bool {
 238        self.connected_users
 239            .get(&user_id)
 240            .unwrap_or(&Default::default())
 241            .active_call
 242            .is_some()
 243    }
 244
 245    pub fn build_initial_contacts_update(
 246        &self,
 247        contacts: Vec<db::Contact>,
 248    ) -> proto::UpdateContacts {
 249        let mut update = proto::UpdateContacts::default();
 250
 251        for contact in contacts {
 252            match contact {
 253                db::Contact::Accepted {
 254                    user_id,
 255                    should_notify,
 256                } => {
 257                    update
 258                        .contacts
 259                        .push(self.contact_for_user(user_id, should_notify));
 260                }
 261                db::Contact::Outgoing { user_id } => {
 262                    update.outgoing_requests.push(user_id.to_proto())
 263                }
 264                db::Contact::Incoming {
 265                    user_id,
 266                    should_notify,
 267                } => update
 268                    .incoming_requests
 269                    .push(proto::IncomingContactRequest {
 270                        requester_id: user_id.to_proto(),
 271                        should_notify,
 272                    }),
 273            }
 274        }
 275
 276        update
 277    }
 278
 279    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
 280        proto::Contact {
 281            user_id: user_id.to_proto(),
 282            online: self.is_user_online(user_id),
 283            busy: self.is_user_busy(user_id),
 284            should_notify,
 285        }
 286    }
 287
 288    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
 289        let connection = self
 290            .connections
 291            .get_mut(&creator_connection_id)
 292            .ok_or_else(|| anyhow!("no such connection"))?;
 293        let connected_user = self
 294            .connected_users
 295            .get_mut(&connection.user_id)
 296            .ok_or_else(|| anyhow!("no such connection"))?;
 297        anyhow::ensure!(
 298            connected_user.active_call.is_none(),
 299            "can't create a room with an active call"
 300        );
 301
 302        let room_id = post_inc(&mut self.next_room_id);
 303        let room = proto::Room {
 304            id: room_id,
 305            participants: vec![proto::Participant {
 306                user_id: connection.user_id.to_proto(),
 307                peer_id: creator_connection_id.0,
 308                projects: Default::default(),
 309                location: Some(proto::ParticipantLocation {
 310                    variant: Some(proto::participant_location::Variant::External(
 311                        proto::participant_location::External {},
 312                    )),
 313                }),
 314            }],
 315            pending_participant_user_ids: Default::default(),
 316            live_kit_room: nanoid!(30),
 317        };
 318
 319        self.rooms.insert(room_id, room);
 320        connected_user.active_call = Some(Call {
 321            caller_user_id: connection.user_id,
 322            room_id,
 323            connection_id: Some(creator_connection_id),
 324            initial_project_id: None,
 325        });
 326        Ok(self.rooms.get(&room_id).unwrap())
 327    }
 328
 329    pub fn join_room(
 330        &mut self,
 331        room_id: RoomId,
 332        connection_id: ConnectionId,
 333    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 334        let connection = self
 335            .connections
 336            .get_mut(&connection_id)
 337            .ok_or_else(|| anyhow!("no such connection"))?;
 338        let user_id = connection.user_id;
 339        let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
 340
 341        let connected_user = self
 342            .connected_users
 343            .get_mut(&user_id)
 344            .ok_or_else(|| anyhow!("no such connection"))?;
 345        let active_call = connected_user
 346            .active_call
 347            .as_mut()
 348            .ok_or_else(|| anyhow!("not being called"))?;
 349        anyhow::ensure!(
 350            active_call.room_id == room_id && active_call.connection_id.is_none(),
 351            "not being called on this room"
 352        );
 353
 354        let room = self
 355            .rooms
 356            .get_mut(&room_id)
 357            .ok_or_else(|| anyhow!("no such room"))?;
 358        anyhow::ensure!(
 359            room.pending_participant_user_ids
 360                .contains(&user_id.to_proto()),
 361            anyhow!("no such room")
 362        );
 363        room.pending_participant_user_ids
 364            .retain(|pending| *pending != user_id.to_proto());
 365        room.participants.push(proto::Participant {
 366            user_id: user_id.to_proto(),
 367            peer_id: connection_id.0,
 368            projects: Default::default(),
 369            location: Some(proto::ParticipantLocation {
 370                variant: Some(proto::participant_location::Variant::External(
 371                    proto::participant_location::External {},
 372                )),
 373            }),
 374        });
 375        active_call.connection_id = Some(connection_id);
 376
 377        Ok((room, recipient_connection_ids))
 378    }
 379
 380    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
 381        let connection = self
 382            .connections
 383            .get_mut(&connection_id)
 384            .ok_or_else(|| anyhow!("no such connection"))?;
 385        let user_id = connection.user_id;
 386
 387        let connected_user = self
 388            .connected_users
 389            .get(&user_id)
 390            .ok_or_else(|| anyhow!("no such connection"))?;
 391        anyhow::ensure!(
 392            connected_user
 393                .active_call
 394                .map_or(false, |call| call.room_id == room_id
 395                    && call.connection_id == Some(connection_id)),
 396            "cannot leave a room before joining it"
 397        );
 398
 399        // Given that users can only join one room at a time, we can safely unshare
 400        // and leave all projects associated with the connection.
 401        let mut unshared_projects = Vec::new();
 402        let mut left_projects = Vec::new();
 403        for project_id in connection.projects.clone() {
 404            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
 405                unshared_projects.push(project);
 406            } else if let Ok(project) = self.leave_project(project_id, connection_id) {
 407                left_projects.push(project);
 408            }
 409        }
 410        self.connected_users.get_mut(&user_id).unwrap().active_call = None;
 411
 412        let room = self
 413            .rooms
 414            .get_mut(&room_id)
 415            .ok_or_else(|| anyhow!("no such room"))?;
 416        room.participants
 417            .retain(|participant| participant.peer_id != connection_id.0);
 418
 419        let mut canceled_call_connection_ids = Vec::new();
 420        room.pending_participant_user_ids
 421            .retain(|pending_participant_user_id| {
 422                if let Some(connected_user) = self
 423                    .connected_users
 424                    .get_mut(&UserId::from_proto(*pending_participant_user_id))
 425                {
 426                    if let Some(call) = connected_user.active_call.as_ref() {
 427                        if call.caller_user_id == user_id {
 428                            connected_user.active_call.take();
 429                            canceled_call_connection_ids
 430                                .extend(connected_user.connection_ids.iter().copied());
 431                            false
 432                        } else {
 433                            true
 434                        }
 435                    } else {
 436                        true
 437                    }
 438                } else {
 439                    true
 440                }
 441            });
 442
 443        let room = if room.participants.is_empty() {
 444            Cow::Owned(self.rooms.remove(&room_id).unwrap())
 445        } else {
 446            Cow::Borrowed(self.rooms.get(&room_id).unwrap())
 447        };
 448
 449        Ok(LeftRoom {
 450            room,
 451            unshared_projects,
 452            left_projects,
 453            canceled_call_connection_ids,
 454        })
 455    }
 456
 457    pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
 458        self.rooms.get(&room_id)
 459    }
 460
 461    pub fn rooms(&self) -> &BTreeMap<RoomId, proto::Room> {
 462        &self.rooms
 463    }
 464
 465    pub fn call(
 466        &mut self,
 467        room_id: RoomId,
 468        recipient_user_id: UserId,
 469        initial_project_id: Option<ProjectId>,
 470        from_connection_id: ConnectionId,
 471    ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
 472        let caller_user_id = self.user_id_for_connection(from_connection_id)?;
 473
 474        let recipient_connection_ids = self
 475            .connection_ids_for_user(recipient_user_id)
 476            .collect::<Vec<_>>();
 477        let mut recipient = self
 478            .connected_users
 479            .get_mut(&recipient_user_id)
 480            .ok_or_else(|| anyhow!("no such connection"))?;
 481        anyhow::ensure!(
 482            recipient.active_call.is_none(),
 483            "recipient is already on another call"
 484        );
 485
 486        let room = self
 487            .rooms
 488            .get_mut(&room_id)
 489            .ok_or_else(|| anyhow!("no such room"))?;
 490        anyhow::ensure!(
 491            room.participants
 492                .iter()
 493                .any(|participant| participant.peer_id == from_connection_id.0),
 494            "no such room"
 495        );
 496        anyhow::ensure!(
 497            room.pending_participant_user_ids
 498                .iter()
 499                .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
 500            "cannot call the same user more than once"
 501        );
 502        room.pending_participant_user_ids
 503            .push(recipient_user_id.to_proto());
 504
 505        if let Some(initial_project_id) = initial_project_id {
 506            let project = self
 507                .projects
 508                .get(&initial_project_id)
 509                .ok_or_else(|| anyhow!("no such project"))?;
 510            anyhow::ensure!(project.room_id == room_id, "no such project");
 511        }
 512
 513        recipient.active_call = Some(Call {
 514            caller_user_id,
 515            room_id,
 516            connection_id: None,
 517            initial_project_id,
 518        });
 519
 520        Ok((
 521            room,
 522            recipient_connection_ids,
 523            proto::IncomingCall {
 524                room_id,
 525                caller_user_id: caller_user_id.to_proto(),
 526                participant_user_ids: room
 527                    .participants
 528                    .iter()
 529                    .map(|participant| participant.user_id)
 530                    .collect(),
 531                initial_project: initial_project_id
 532                    .and_then(|id| Self::build_participant_project(id, &self.projects)),
 533            },
 534        ))
 535    }
 536
 537    pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
 538        let mut recipient = self
 539            .connected_users
 540            .get_mut(&to_user_id)
 541            .ok_or_else(|| anyhow!("no such connection"))?;
 542        anyhow::ensure!(recipient
 543            .active_call
 544            .map_or(false, |call| call.room_id == room_id
 545                && call.connection_id.is_none()));
 546        recipient.active_call = None;
 547        let room = self
 548            .rooms
 549            .get_mut(&room_id)
 550            .ok_or_else(|| anyhow!("no such room"))?;
 551        room.pending_participant_user_ids
 552            .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
 553        Ok(room)
 554    }
 555
 556    pub fn cancel_call(
 557        &mut self,
 558        room_id: RoomId,
 559        recipient_user_id: UserId,
 560        canceller_connection_id: ConnectionId,
 561    ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
 562        let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
 563        let canceller = self
 564            .connected_users
 565            .get(&canceller_user_id)
 566            .ok_or_else(|| anyhow!("no such connection"))?;
 567        let recipient = self
 568            .connected_users
 569            .get(&recipient_user_id)
 570            .ok_or_else(|| anyhow!("no such connection"))?;
 571        let canceller_active_call = canceller
 572            .active_call
 573            .as_ref()
 574            .ok_or_else(|| anyhow!("no active call"))?;
 575        let recipient_active_call = recipient
 576            .active_call
 577            .as_ref()
 578            .ok_or_else(|| anyhow!("no active call for recipient"))?;
 579
 580        anyhow::ensure!(
 581            canceller_active_call.room_id == room_id,
 582            "users are on different calls"
 583        );
 584        anyhow::ensure!(
 585            recipient_active_call.room_id == room_id,
 586            "users are on different calls"
 587        );
 588        anyhow::ensure!(
 589            recipient_active_call.connection_id.is_none(),
 590            "recipient has already answered"
 591        );
 592        let room_id = recipient_active_call.room_id;
 593        let room = self
 594            .rooms
 595            .get_mut(&room_id)
 596            .ok_or_else(|| anyhow!("no such room"))?;
 597        room.pending_participant_user_ids
 598            .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 599
 600        let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
 601        recipient.active_call.take();
 602
 603        Ok((room, recipient.connection_ids.clone()))
 604    }
 605
 606    pub fn decline_call(
 607        &mut self,
 608        room_id: RoomId,
 609        recipient_connection_id: ConnectionId,
 610    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 611        let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
 612        let recipient = self
 613            .connected_users
 614            .get_mut(&recipient_user_id)
 615            .ok_or_else(|| anyhow!("no such connection"))?;
 616        if let Some(active_call) = recipient.active_call {
 617            anyhow::ensure!(active_call.room_id == room_id, "no such room");
 618            anyhow::ensure!(
 619                active_call.connection_id.is_none(),
 620                "cannot decline a call after joining room"
 621            );
 622            recipient.active_call.take();
 623            let recipient_connection_ids = self
 624                .connection_ids_for_user(recipient_user_id)
 625                .collect::<Vec<_>>();
 626            let room = self
 627                .rooms
 628                .get_mut(&active_call.room_id)
 629                .ok_or_else(|| anyhow!("no such room"))?;
 630            room.pending_participant_user_ids
 631                .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 632            Ok((room, recipient_connection_ids))
 633        } else {
 634            Err(anyhow!("user is not being called"))
 635        }
 636    }
 637
 638    pub fn update_participant_location(
 639        &mut self,
 640        room_id: RoomId,
 641        location: proto::ParticipantLocation,
 642        connection_id: ConnectionId,
 643    ) -> Result<&proto::Room> {
 644        let room = self
 645            .rooms
 646            .get_mut(&room_id)
 647            .ok_or_else(|| anyhow!("no such room"))?;
 648        if let Some(proto::participant_location::Variant::SharedProject(project)) =
 649            location.variant.as_ref()
 650        {
 651            anyhow::ensure!(
 652                room.participants
 653                    .iter()
 654                    .flat_map(|participant| &participant.projects)
 655                    .any(|participant_project| participant_project.id == project.id),
 656                "no such project"
 657            );
 658        }
 659
 660        let participant = room
 661            .participants
 662            .iter_mut()
 663            .find(|participant| participant.peer_id == connection_id.0)
 664            .ok_or_else(|| anyhow!("no such room"))?;
 665        participant.location = Some(location);
 666
 667        Ok(room)
 668    }
 669
 670    pub fn share_project(
 671        &mut self,
 672        room_id: RoomId,
 673        project_id: ProjectId,
 674        worktrees: Vec<proto::WorktreeMetadata>,
 675        host_connection_id: ConnectionId,
 676    ) -> Result<&proto::Room> {
 677        let connection = self
 678            .connections
 679            .get_mut(&host_connection_id)
 680            .ok_or_else(|| anyhow!("no such connection"))?;
 681
 682        let room = self
 683            .rooms
 684            .get_mut(&room_id)
 685            .ok_or_else(|| anyhow!("no such room"))?;
 686        let participant = room
 687            .participants
 688            .iter_mut()
 689            .find(|participant| participant.peer_id == host_connection_id.0)
 690            .ok_or_else(|| anyhow!("no such room"))?;
 691
 692        connection.projects.insert(project_id);
 693        self.projects.insert(
 694            project_id,
 695            Project {
 696                id: project_id,
 697                room_id,
 698                host_connection_id,
 699                host: Collaborator {
 700                    user_id: connection.user_id,
 701                    replica_id: 0,
 702                    admin: connection.admin,
 703                },
 704                guests: Default::default(),
 705                active_replica_ids: Default::default(),
 706                worktrees: worktrees
 707                    .into_iter()
 708                    .map(|worktree| {
 709                        (
 710                            worktree.id,
 711                            Worktree {
 712                                root_name: worktree.root_name,
 713                                visible: worktree.visible,
 714                                abs_path: worktree.abs_path.clone(),
 715                                entries: Default::default(),
 716                                diagnostic_summaries: Default::default(),
 717                                scan_id: Default::default(),
 718                                is_complete: Default::default(),
 719                            },
 720                        )
 721                    })
 722                    .collect(),
 723                language_servers: Default::default(),
 724            },
 725        );
 726
 727        participant
 728            .projects
 729            .extend(Self::build_participant_project(project_id, &self.projects));
 730
 731        Ok(room)
 732    }
 733
 734    pub fn unshare_project(
 735        &mut self,
 736        project_id: ProjectId,
 737        connection_id: ConnectionId,
 738    ) -> Result<(&proto::Room, Project)> {
 739        match self.projects.entry(project_id) {
 740            btree_map::Entry::Occupied(e) => {
 741                if e.get().host_connection_id == connection_id {
 742                    let project = e.remove();
 743
 744                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
 745                        host_connection.projects.remove(&project_id);
 746                    }
 747
 748                    for guest_connection in project.guests.keys() {
 749                        if let Some(connection) = self.connections.get_mut(guest_connection) {
 750                            connection.projects.remove(&project_id);
 751                        }
 752                    }
 753
 754                    let room = self
 755                        .rooms
 756                        .get_mut(&project.room_id)
 757                        .ok_or_else(|| anyhow!("no such room"))?;
 758                    let participant = room
 759                        .participants
 760                        .iter_mut()
 761                        .find(|participant| participant.peer_id == connection_id.0)
 762                        .ok_or_else(|| anyhow!("no such room"))?;
 763                    participant
 764                        .projects
 765                        .retain(|project| project.id != project_id.to_proto());
 766
 767                    Ok((room, project))
 768                } else {
 769                    Err(anyhow!("no such project"))?
 770                }
 771            }
 772            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
 773        }
 774    }
 775
 776    pub fn update_project(
 777        &mut self,
 778        project_id: ProjectId,
 779        worktrees: &[proto::WorktreeMetadata],
 780        connection_id: ConnectionId,
 781    ) -> Result<&proto::Room> {
 782        let project = self
 783            .projects
 784            .get_mut(&project_id)
 785            .ok_or_else(|| anyhow!("no such project"))?;
 786        if project.host_connection_id == connection_id {
 787            let mut old_worktrees = mem::take(&mut project.worktrees);
 788            for worktree in worktrees {
 789                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
 790                    project.worktrees.insert(worktree.id, old_worktree);
 791                } else {
 792                    project.worktrees.insert(
 793                        worktree.id,
 794                        Worktree {
 795                            root_name: worktree.root_name.clone(),
 796                            visible: worktree.visible,
 797                            abs_path: worktree.abs_path.clone(),
 798                            entries: Default::default(),
 799                            diagnostic_summaries: Default::default(),
 800                            scan_id: Default::default(),
 801                            is_complete: false,
 802                        },
 803                    );
 804                }
 805            }
 806
 807            let room = self
 808                .rooms
 809                .get_mut(&project.room_id)
 810                .ok_or_else(|| anyhow!("no such room"))?;
 811            let participant_project = room
 812                .participants
 813                .iter_mut()
 814                .flat_map(|participant| &mut participant.projects)
 815                .find(|project| project.id == project_id.to_proto())
 816                .ok_or_else(|| anyhow!("no such project"))?;
 817            participant_project.worktree_root_names = worktrees
 818                .iter()
 819                .filter(|worktree| worktree.visible)
 820                .map(|worktree| worktree.root_name.clone())
 821                .collect();
 822
 823            Ok(room)
 824        } else {
 825            Err(anyhow!("no such project"))?
 826        }
 827    }
 828
 829    pub fn update_diagnostic_summary(
 830        &mut self,
 831        project_id: ProjectId,
 832        worktree_id: u64,
 833        connection_id: ConnectionId,
 834        summary: proto::DiagnosticSummary,
 835    ) -> Result<Vec<ConnectionId>> {
 836        let project = self
 837            .projects
 838            .get_mut(&project_id)
 839            .ok_or_else(|| anyhow!("no such project"))?;
 840        if project.host_connection_id == connection_id {
 841            let worktree = project
 842                .worktrees
 843                .get_mut(&worktree_id)
 844                .ok_or_else(|| anyhow!("no such worktree"))?;
 845            worktree
 846                .diagnostic_summaries
 847                .insert(summary.path.clone().into(), summary);
 848            return Ok(project.connection_ids());
 849        }
 850
 851        Err(anyhow!("no such worktree"))?
 852    }
 853
 854    pub fn start_language_server(
 855        &mut self,
 856        project_id: ProjectId,
 857        connection_id: ConnectionId,
 858        language_server: proto::LanguageServer,
 859    ) -> Result<Vec<ConnectionId>> {
 860        let project = self
 861            .projects
 862            .get_mut(&project_id)
 863            .ok_or_else(|| anyhow!("no such project"))?;
 864        if project.host_connection_id == connection_id {
 865            project.language_servers.push(language_server);
 866            return Ok(project.connection_ids());
 867        }
 868
 869        Err(anyhow!("no such project"))?
 870    }
 871
 872    pub fn join_project(
 873        &mut self,
 874        requester_connection_id: ConnectionId,
 875        project_id: ProjectId,
 876    ) -> Result<(&Project, ReplicaId)> {
 877        let connection = self
 878            .connections
 879            .get_mut(&requester_connection_id)
 880            .ok_or_else(|| anyhow!("no such connection"))?;
 881        let user = self
 882            .connected_users
 883            .get(&connection.user_id)
 884            .ok_or_else(|| anyhow!("no such connection"))?;
 885        let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
 886        anyhow::ensure!(
 887            active_call.connection_id == Some(requester_connection_id),
 888            "no such project"
 889        );
 890
 891        let project = self
 892            .projects
 893            .get_mut(&project_id)
 894            .ok_or_else(|| anyhow!("no such project"))?;
 895        anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
 896
 897        connection.projects.insert(project_id);
 898        let mut replica_id = 1;
 899        while project.active_replica_ids.contains(&replica_id) {
 900            replica_id += 1;
 901        }
 902        project.active_replica_ids.insert(replica_id);
 903        project.guests.insert(
 904            requester_connection_id,
 905            Collaborator {
 906                replica_id,
 907                user_id: connection.user_id,
 908                admin: connection.admin,
 909            },
 910        );
 911
 912        Ok((project, replica_id))
 913    }
 914
 915    pub fn leave_project(
 916        &mut self,
 917        project_id: ProjectId,
 918        connection_id: ConnectionId,
 919    ) -> Result<LeftProject> {
 920        let project = self
 921            .projects
 922            .get_mut(&project_id)
 923            .ok_or_else(|| anyhow!("no such project"))?;
 924
 925        // If the connection leaving the project is a collaborator, remove it.
 926        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
 927            project.active_replica_ids.remove(&guest.replica_id);
 928            true
 929        } else {
 930            false
 931        };
 932
 933        if let Some(connection) = self.connections.get_mut(&connection_id) {
 934            connection.projects.remove(&project_id);
 935        }
 936
 937        Ok(LeftProject {
 938            id: project.id,
 939            host_connection_id: project.host_connection_id,
 940            host_user_id: project.host.user_id,
 941            connection_ids: project.connection_ids(),
 942            remove_collaborator,
 943        })
 944    }
 945
 946    #[allow(clippy::too_many_arguments)]
 947    pub fn update_worktree(
 948        &mut self,
 949        connection_id: ConnectionId,
 950        project_id: ProjectId,
 951        worktree_id: u64,
 952        worktree_root_name: &str,
 953        worktree_abs_path: &[u8],
 954        removed_entries: &[u64],
 955        updated_entries: &[proto::Entry],
 956        scan_id: u64,
 957        is_last_update: bool,
 958    ) -> Result<Vec<ConnectionId>> {
 959        let project = self.write_project(project_id, connection_id)?;
 960
 961        let connection_ids = project.connection_ids();
 962        let mut worktree = project.worktrees.entry(worktree_id).or_default();
 963        worktree.root_name = worktree_root_name.to_string();
 964        worktree.abs_path = worktree_abs_path.to_vec();
 965
 966        for entry_id in removed_entries {
 967            worktree.entries.remove(entry_id);
 968        }
 969
 970        for entry in updated_entries {
 971            worktree.entries.insert(entry.id, entry.clone());
 972        }
 973
 974        worktree.scan_id = scan_id;
 975        worktree.is_complete = is_last_update;
 976        Ok(connection_ids)
 977    }
 978
 979    fn build_participant_project(
 980        project_id: ProjectId,
 981        projects: &BTreeMap<ProjectId, Project>,
 982    ) -> Option<proto::ParticipantProject> {
 983        Some(proto::ParticipantProject {
 984            id: project_id.to_proto(),
 985            worktree_root_names: projects
 986                .get(&project_id)?
 987                .worktrees
 988                .values()
 989                .filter(|worktree| worktree.visible)
 990                .map(|worktree| worktree.root_name.clone())
 991                .collect(),
 992        })
 993    }
 994
 995    pub fn project_connection_ids(
 996        &self,
 997        project_id: ProjectId,
 998        acting_connection_id: ConnectionId,
 999    ) -> Result<Vec<ConnectionId>> {
1000        Ok(self
1001            .read_project(project_id, acting_connection_id)?
1002            .connection_ids())
1003    }
1004
1005    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1006        self.projects
1007            .get(&project_id)
1008            .ok_or_else(|| anyhow!("no such project"))
1009    }
1010
1011    pub fn read_project(
1012        &self,
1013        project_id: ProjectId,
1014        connection_id: ConnectionId,
1015    ) -> Result<&Project> {
1016        let project = self
1017            .projects
1018            .get(&project_id)
1019            .ok_or_else(|| anyhow!("no such project"))?;
1020        if project.host_connection_id == connection_id
1021            || project.guests.contains_key(&connection_id)
1022        {
1023            Ok(project)
1024        } else {
1025            Err(anyhow!("no such project"))?
1026        }
1027    }
1028
1029    fn write_project(
1030        &mut self,
1031        project_id: ProjectId,
1032        connection_id: ConnectionId,
1033    ) -> Result<&mut Project> {
1034        let project = self
1035            .projects
1036            .get_mut(&project_id)
1037            .ok_or_else(|| anyhow!("no such project"))?;
1038        if project.host_connection_id == connection_id
1039            || project.guests.contains_key(&connection_id)
1040        {
1041            Ok(project)
1042        } else {
1043            Err(anyhow!("no such project"))?
1044        }
1045    }
1046
1047    #[cfg(test)]
1048    pub fn check_invariants(&self) {
1049        for (connection_id, connection) in &self.connections {
1050            for project_id in &connection.projects {
1051                let project = &self.projects.get(project_id).unwrap();
1052                if project.host_connection_id != *connection_id {
1053                    assert!(project.guests.contains_key(connection_id));
1054                }
1055
1056                for (worktree_id, worktree) in project.worktrees.iter() {
1057                    let mut paths = HashMap::default();
1058                    for entry in worktree.entries.values() {
1059                        let prev_entry = paths.insert(&entry.path, entry);
1060                        assert_eq!(
1061                            prev_entry,
1062                            None,
1063                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
1064                            worktree_id,
1065                            prev_entry.unwrap(),
1066                            entry
1067                        );
1068                    }
1069                }
1070            }
1071
1072            assert!(self
1073                .connected_users
1074                .get(&connection.user_id)
1075                .unwrap()
1076                .connection_ids
1077                .contains(connection_id));
1078        }
1079
1080        for (user_id, state) in &self.connected_users {
1081            for connection_id in &state.connection_ids {
1082                assert_eq!(
1083                    self.connections.get(connection_id).unwrap().user_id,
1084                    *user_id
1085                );
1086            }
1087
1088            if let Some(active_call) = state.active_call.as_ref() {
1089                if let Some(active_call_connection_id) = active_call.connection_id {
1090                    assert!(
1091                        state.connection_ids.contains(&active_call_connection_id),
1092                        "call is active on a dead connection"
1093                    );
1094                    assert!(
1095                        state.connection_ids.contains(&active_call_connection_id),
1096                        "call is active on a dead connection"
1097                    );
1098                }
1099            }
1100        }
1101
1102        for (room_id, room) in &self.rooms {
1103            for pending_user_id in &room.pending_participant_user_ids {
1104                assert!(
1105                    self.connected_users
1106                        .contains_key(&UserId::from_proto(*pending_user_id)),
1107                    "call is active on a user that has disconnected"
1108                );
1109            }
1110
1111            for participant in &room.participants {
1112                assert!(
1113                    self.connections
1114                        .contains_key(&ConnectionId(participant.peer_id)),
1115                    "room {} contains participant {:?} that has disconnected",
1116                    room_id,
1117                    participant
1118                );
1119
1120                for participant_project in &participant.projects {
1121                    let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1122                    assert_eq!(
1123                        project.room_id, *room_id,
1124                        "project was shared on a different room"
1125                    );
1126                }
1127            }
1128
1129            assert!(
1130                !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1131                "room can't be empty"
1132            );
1133        }
1134
1135        for (project_id, project) in &self.projects {
1136            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1137            assert!(host_connection.projects.contains(project_id));
1138
1139            for guest_connection_id in project.guests.keys() {
1140                let guest_connection = self.connections.get(guest_connection_id).unwrap();
1141                assert!(guest_connection.projects.contains(project_id));
1142            }
1143            assert_eq!(project.active_replica_ids.len(), project.guests.len());
1144            assert_eq!(
1145                project.active_replica_ids,
1146                project
1147                    .guests
1148                    .values()
1149                    .map(|guest| guest.replica_id)
1150                    .collect::<HashSet<_>>(),
1151            );
1152
1153            let room = &self.rooms[&project.room_id];
1154            let room_participant = room
1155                .participants
1156                .iter()
1157                .find(|participant| participant.peer_id == project.host_connection_id.0)
1158                .unwrap();
1159            assert!(
1160                room_participant
1161                    .projects
1162                    .iter()
1163                    .any(|project| project.id == project_id.to_proto()),
1164                "project was not shared in room"
1165            );
1166        }
1167    }
1168}
1169
1170impl Project {
1171    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1172        self.guests.keys().copied().collect()
1173    }
1174
1175    pub fn connection_ids(&self) -> Vec<ConnectionId> {
1176        self.guests
1177            .keys()
1178            .copied()
1179            .chain(Some(self.host_connection_id))
1180            .collect()
1181    }
1182}