store.rs

   1use crate::db::{self, ChannelId, ProjectId, UserId};
   2use anyhow::{anyhow, Result};
   3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
   4use rpc::{proto, ConnectionId};
   5use serde::Serialize;
   6use std::{mem, path::PathBuf, str, time::Duration};
   7use time::OffsetDateTime;
   8use tracing::instrument;
   9use util::post_inc;
  10
  11pub type RoomId = u64;
  12
  13#[derive(Default, Serialize)]
  14pub struct Store {
  15    connections: BTreeMap<ConnectionId, ConnectionState>,
  16    connected_users: BTreeMap<UserId, ConnectedUser>,
  17    next_room_id: RoomId,
  18    rooms: BTreeMap<RoomId, proto::Room>,
  19    projects: BTreeMap<ProjectId, Project>,
  20    #[serde(skip)]
  21    channels: BTreeMap<ChannelId, Channel>,
  22}
  23
  24#[derive(Default, Serialize)]
  25struct ConnectedUser {
  26    connection_ids: HashSet<ConnectionId>,
  27    active_call: Option<Call>,
  28}
  29
  30#[derive(Serialize)]
  31struct ConnectionState {
  32    user_id: UserId,
  33    admin: bool,
  34    projects: BTreeSet<ProjectId>,
  35    channels: HashSet<ChannelId>,
  36}
  37
  38#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
  39pub struct Call {
  40    pub caller_user_id: UserId,
  41    pub room_id: RoomId,
  42    pub connection_id: Option<ConnectionId>,
  43    pub initial_project_id: Option<ProjectId>,
  44}
  45
  46#[derive(Serialize)]
  47pub struct Project {
  48    pub id: ProjectId,
  49    pub room_id: RoomId,
  50    pub host_connection_id: ConnectionId,
  51    pub host: Collaborator,
  52    pub guests: HashMap<ConnectionId, Collaborator>,
  53    pub active_replica_ids: HashSet<ReplicaId>,
  54    pub worktrees: BTreeMap<u64, Worktree>,
  55    pub language_servers: Vec<proto::LanguageServer>,
  56}
  57
  58#[derive(Serialize)]
  59pub struct Collaborator {
  60    pub replica_id: ReplicaId,
  61    pub user_id: UserId,
  62    #[serde(skip)]
  63    pub last_activity: Option<OffsetDateTime>,
  64    pub admin: bool,
  65}
  66
  67#[derive(Default, Serialize)]
  68pub struct Worktree {
  69    pub abs_path: PathBuf,
  70    pub root_name: String,
  71    pub visible: bool,
  72    #[serde(skip)]
  73    pub entries: BTreeMap<u64, proto::Entry>,
  74    #[serde(skip)]
  75    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
  76    pub scan_id: u64,
  77    pub is_complete: bool,
  78}
  79
  80#[derive(Default)]
  81pub struct Channel {
  82    pub connection_ids: HashSet<ConnectionId>,
  83}
  84
  85pub type ReplicaId = u16;
  86
  87#[derive(Default)]
  88pub struct RemovedConnectionState {
  89    pub user_id: UserId,
  90    pub hosted_projects: Vec<Project>,
  91    pub guest_projects: Vec<LeftProject>,
  92    pub contact_ids: HashSet<UserId>,
  93    pub room_id: Option<RoomId>,
  94    pub canceled_call_connection_ids: Vec<ConnectionId>,
  95}
  96
  97pub struct LeftProject {
  98    pub id: ProjectId,
  99    pub host_user_id: UserId,
 100    pub host_connection_id: ConnectionId,
 101    pub connection_ids: Vec<ConnectionId>,
 102    pub remove_collaborator: bool,
 103}
 104
 105pub struct LeftRoom<'a> {
 106    pub room: Option<&'a proto::Room>,
 107    pub unshared_projects: Vec<Project>,
 108    pub left_projects: Vec<LeftProject>,
 109    pub canceled_call_connection_ids: Vec<ConnectionId>,
 110}
 111
 112#[derive(Copy, Clone)]
 113pub struct Metrics {
 114    pub connections: usize,
 115    pub registered_projects: usize,
 116    pub active_projects: usize,
 117    pub shared_projects: usize,
 118}
 119
 120impl Store {
 121    pub fn metrics(&self) -> Metrics {
 122        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
 123        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
 124
 125        let connections = self.connections.values().filter(|c| !c.admin).count();
 126        let mut registered_projects = 0;
 127        let mut active_projects = 0;
 128        let mut shared_projects = 0;
 129        for project in self.projects.values() {
 130            if let Some(connection) = self.connections.get(&project.host_connection_id) {
 131                if !connection.admin {
 132                    registered_projects += 1;
 133                    if project.is_active_since(active_window_start) {
 134                        active_projects += 1;
 135                        if !project.guests.is_empty() {
 136                            shared_projects += 1;
 137                        }
 138                    }
 139                }
 140            }
 141        }
 142
 143        Metrics {
 144            connections,
 145            registered_projects,
 146            active_projects,
 147            shared_projects,
 148        }
 149    }
 150
 151    #[instrument(skip(self))]
 152    pub fn add_connection(
 153        &mut self,
 154        connection_id: ConnectionId,
 155        user_id: UserId,
 156        admin: bool,
 157    ) -> Option<proto::IncomingCall> {
 158        self.connections.insert(
 159            connection_id,
 160            ConnectionState {
 161                user_id,
 162                admin,
 163                projects: Default::default(),
 164                channels: Default::default(),
 165            },
 166        );
 167        let connected_user = self.connected_users.entry(user_id).or_default();
 168        connected_user.connection_ids.insert(connection_id);
 169        if let Some(active_call) = connected_user.active_call {
 170            if active_call.connection_id.is_some() {
 171                None
 172            } else {
 173                let room = self.room(active_call.room_id)?;
 174                Some(proto::IncomingCall {
 175                    room_id: active_call.room_id,
 176                    caller_user_id: active_call.caller_user_id.to_proto(),
 177                    participant_user_ids: room
 178                        .participants
 179                        .iter()
 180                        .map(|participant| participant.user_id)
 181                        .collect(),
 182                    initial_project: active_call
 183                        .initial_project_id
 184                        .and_then(|id| Self::build_participant_project(id, &self.projects)),
 185                })
 186            }
 187        } else {
 188            None
 189        }
 190    }
 191
 192    #[instrument(skip(self))]
 193    pub fn remove_connection(
 194        &mut self,
 195        connection_id: ConnectionId,
 196    ) -> Result<RemovedConnectionState> {
 197        let connection = self
 198            .connections
 199            .get_mut(&connection_id)
 200            .ok_or_else(|| anyhow!("no such connection"))?;
 201
 202        let user_id = connection.user_id;
 203        let connection_channels = mem::take(&mut connection.channels);
 204
 205        let mut result = RemovedConnectionState {
 206            user_id,
 207            ..Default::default()
 208        };
 209
 210        // Leave all channels.
 211        for channel_id in connection_channels {
 212            self.leave_channel(connection_id, channel_id);
 213        }
 214
 215        let connected_user = self.connected_users.get(&user_id).unwrap();
 216        if let Some(active_call) = connected_user.active_call.as_ref() {
 217            let room_id = active_call.room_id;
 218            if active_call.connection_id == Some(connection_id) {
 219                let left_room = self.leave_room(room_id, connection_id)?;
 220                result.hosted_projects = left_room.unshared_projects;
 221                result.guest_projects = left_room.left_projects;
 222                result.room_id = Some(room_id);
 223                result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
 224            } else if connected_user.connection_ids.len() == 1 {
 225                self.decline_call(room_id, connection_id)?;
 226                result.room_id = Some(room_id);
 227            }
 228        }
 229
 230        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
 231        connected_user.connection_ids.remove(&connection_id);
 232        if connected_user.connection_ids.is_empty() {
 233            self.connected_users.remove(&user_id);
 234        }
 235        self.connections.remove(&connection_id).unwrap();
 236
 237        Ok(result)
 238    }
 239
 240    #[cfg(test)]
 241    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
 242        self.channels.get(&id)
 243    }
 244
 245    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 246        if let Some(connection) = self.connections.get_mut(&connection_id) {
 247            connection.channels.insert(channel_id);
 248            self.channels
 249                .entry(channel_id)
 250                .or_default()
 251                .connection_ids
 252                .insert(connection_id);
 253        }
 254    }
 255
 256    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 257        if let Some(connection) = self.connections.get_mut(&connection_id) {
 258            connection.channels.remove(&channel_id);
 259            if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
 260                entry.get_mut().connection_ids.remove(&connection_id);
 261                if entry.get_mut().connection_ids.is_empty() {
 262                    entry.remove();
 263                }
 264            }
 265        }
 266    }
 267
 268    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
 269        Ok(self
 270            .connections
 271            .get(&connection_id)
 272            .ok_or_else(|| anyhow!("unknown connection"))?
 273            .user_id)
 274    }
 275
 276    pub fn connection_ids_for_user(
 277        &self,
 278        user_id: UserId,
 279    ) -> impl Iterator<Item = ConnectionId> + '_ {
 280        self.connected_users
 281            .get(&user_id)
 282            .into_iter()
 283            .map(|state| &state.connection_ids)
 284            .flatten()
 285            .copied()
 286    }
 287
 288    pub fn is_user_online(&self, user_id: UserId) -> bool {
 289        !self
 290            .connected_users
 291            .get(&user_id)
 292            .unwrap_or(&Default::default())
 293            .connection_ids
 294            .is_empty()
 295    }
 296
 297    fn is_user_busy(&self, user_id: UserId) -> bool {
 298        self.connected_users
 299            .get(&user_id)
 300            .unwrap_or(&Default::default())
 301            .active_call
 302            .is_some()
 303    }
 304
 305    pub fn build_initial_contacts_update(
 306        &self,
 307        contacts: Vec<db::Contact>,
 308    ) -> proto::UpdateContacts {
 309        let mut update = proto::UpdateContacts::default();
 310
 311        for contact in contacts {
 312            match contact {
 313                db::Contact::Accepted {
 314                    user_id,
 315                    should_notify,
 316                } => {
 317                    update
 318                        .contacts
 319                        .push(self.contact_for_user(user_id, should_notify));
 320                }
 321                db::Contact::Outgoing { user_id } => {
 322                    update.outgoing_requests.push(user_id.to_proto())
 323                }
 324                db::Contact::Incoming {
 325                    user_id,
 326                    should_notify,
 327                } => update
 328                    .incoming_requests
 329                    .push(proto::IncomingContactRequest {
 330                        requester_id: user_id.to_proto(),
 331                        should_notify,
 332                    }),
 333            }
 334        }
 335
 336        update
 337    }
 338
 339    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
 340        proto::Contact {
 341            user_id: user_id.to_proto(),
 342            online: self.is_user_online(user_id),
 343            busy: self.is_user_busy(user_id),
 344            should_notify,
 345        }
 346    }
 347
 348    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
 349        let connection = self
 350            .connections
 351            .get_mut(&creator_connection_id)
 352            .ok_or_else(|| anyhow!("no such connection"))?;
 353        let connected_user = self
 354            .connected_users
 355            .get_mut(&connection.user_id)
 356            .ok_or_else(|| anyhow!("no such connection"))?;
 357        anyhow::ensure!(
 358            connected_user.active_call.is_none(),
 359            "can't create a room with an active call"
 360        );
 361
 362        let mut room = proto::Room::default();
 363        room.participants.push(proto::Participant {
 364            user_id: connection.user_id.to_proto(),
 365            peer_id: creator_connection_id.0,
 366            projects: Default::default(),
 367            location: Some(proto::ParticipantLocation {
 368                variant: Some(proto::participant_location::Variant::External(
 369                    proto::participant_location::External {},
 370                )),
 371            }),
 372        });
 373
 374        let room_id = post_inc(&mut self.next_room_id);
 375        self.rooms.insert(room_id, room);
 376        connected_user.active_call = Some(Call {
 377            caller_user_id: connection.user_id,
 378            room_id,
 379            connection_id: Some(creator_connection_id),
 380            initial_project_id: None,
 381        });
 382        Ok(room_id)
 383    }
 384
 385    pub fn join_room(
 386        &mut self,
 387        room_id: RoomId,
 388        connection_id: ConnectionId,
 389    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 390        let connection = self
 391            .connections
 392            .get_mut(&connection_id)
 393            .ok_or_else(|| anyhow!("no such connection"))?;
 394        let user_id = connection.user_id;
 395        let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
 396
 397        let connected_user = self
 398            .connected_users
 399            .get_mut(&user_id)
 400            .ok_or_else(|| anyhow!("no such connection"))?;
 401        let active_call = connected_user
 402            .active_call
 403            .as_mut()
 404            .ok_or_else(|| anyhow!("not being called"))?;
 405        anyhow::ensure!(
 406            active_call.room_id == room_id && active_call.connection_id.is_none(),
 407            "not being called on this room"
 408        );
 409
 410        let room = self
 411            .rooms
 412            .get_mut(&room_id)
 413            .ok_or_else(|| anyhow!("no such room"))?;
 414        anyhow::ensure!(
 415            room.pending_participant_user_ids
 416                .contains(&user_id.to_proto()),
 417            anyhow!("no such room")
 418        );
 419        room.pending_participant_user_ids
 420            .retain(|pending| *pending != user_id.to_proto());
 421        room.participants.push(proto::Participant {
 422            user_id: user_id.to_proto(),
 423            peer_id: connection_id.0,
 424            projects: Default::default(),
 425            location: Some(proto::ParticipantLocation {
 426                variant: Some(proto::participant_location::Variant::External(
 427                    proto::participant_location::External {},
 428                )),
 429            }),
 430        });
 431        active_call.connection_id = Some(connection_id);
 432
 433        Ok((room, recipient_connection_ids))
 434    }
 435
 436    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
 437        let connection = self
 438            .connections
 439            .get_mut(&connection_id)
 440            .ok_or_else(|| anyhow!("no such connection"))?;
 441        let user_id = connection.user_id;
 442
 443        let connected_user = self
 444            .connected_users
 445            .get(&user_id)
 446            .ok_or_else(|| anyhow!("no such connection"))?;
 447        anyhow::ensure!(
 448            connected_user
 449                .active_call
 450                .map_or(false, |call| call.room_id == room_id
 451                    && call.connection_id == Some(connection_id)),
 452            "cannot leave a room before joining it"
 453        );
 454
 455        // Given that users can only join one room at a time, we can safely unshare
 456        // and leave all projects associated with the connection.
 457        let mut unshared_projects = Vec::new();
 458        let mut left_projects = Vec::new();
 459        for project_id in connection.projects.clone() {
 460            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
 461                unshared_projects.push(project);
 462            } else if let Ok(project) = self.leave_project(project_id, connection_id) {
 463                left_projects.push(project);
 464            }
 465        }
 466        self.connected_users.get_mut(&user_id).unwrap().active_call = None;
 467
 468        let room = self
 469            .rooms
 470            .get_mut(&room_id)
 471            .ok_or_else(|| anyhow!("no such room"))?;
 472        room.participants
 473            .retain(|participant| participant.peer_id != connection_id.0);
 474
 475        let mut canceled_call_connection_ids = Vec::new();
 476        room.pending_participant_user_ids
 477            .retain(|pending_participant_user_id| {
 478                if let Some(connected_user) = self
 479                    .connected_users
 480                    .get_mut(&UserId::from_proto(*pending_participant_user_id))
 481                {
 482                    if let Some(call) = connected_user.active_call.as_ref() {
 483                        if call.caller_user_id == user_id {
 484                            connected_user.active_call.take();
 485                            canceled_call_connection_ids
 486                                .extend(connected_user.connection_ids.iter().copied());
 487                            false
 488                        } else {
 489                            true
 490                        }
 491                    } else {
 492                        true
 493                    }
 494                } else {
 495                    true
 496                }
 497            });
 498
 499        if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
 500            self.rooms.remove(&room_id);
 501        }
 502
 503        Ok(LeftRoom {
 504            room: self.rooms.get(&room_id),
 505            unshared_projects,
 506            left_projects,
 507            canceled_call_connection_ids,
 508        })
 509    }
 510
 511    pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
 512        self.rooms.get(&room_id)
 513    }
 514
 515    pub fn call(
 516        &mut self,
 517        room_id: RoomId,
 518        recipient_user_id: UserId,
 519        initial_project_id: Option<ProjectId>,
 520        from_connection_id: ConnectionId,
 521    ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
 522        let caller_user_id = self.user_id_for_connection(from_connection_id)?;
 523
 524        let recipient_connection_ids = self
 525            .connection_ids_for_user(recipient_user_id)
 526            .collect::<Vec<_>>();
 527        let mut recipient = self
 528            .connected_users
 529            .get_mut(&recipient_user_id)
 530            .ok_or_else(|| anyhow!("no such connection"))?;
 531        anyhow::ensure!(
 532            recipient.active_call.is_none(),
 533            "recipient is already on another call"
 534        );
 535
 536        let room = self
 537            .rooms
 538            .get_mut(&room_id)
 539            .ok_or_else(|| anyhow!("no such room"))?;
 540        anyhow::ensure!(
 541            room.participants
 542                .iter()
 543                .any(|participant| participant.peer_id == from_connection_id.0),
 544            "no such room"
 545        );
 546        anyhow::ensure!(
 547            room.pending_participant_user_ids
 548                .iter()
 549                .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
 550            "cannot call the same user more than once"
 551        );
 552        room.pending_participant_user_ids
 553            .push(recipient_user_id.to_proto());
 554
 555        if let Some(initial_project_id) = initial_project_id {
 556            let project = self
 557                .projects
 558                .get(&initial_project_id)
 559                .ok_or_else(|| anyhow!("no such project"))?;
 560            anyhow::ensure!(project.room_id == room_id, "no such project");
 561        }
 562
 563        recipient.active_call = Some(Call {
 564            caller_user_id,
 565            room_id,
 566            connection_id: None,
 567            initial_project_id,
 568        });
 569
 570        Ok((
 571            room,
 572            recipient_connection_ids,
 573            proto::IncomingCall {
 574                room_id,
 575                caller_user_id: caller_user_id.to_proto(),
 576                participant_user_ids: room
 577                    .participants
 578                    .iter()
 579                    .map(|participant| participant.user_id)
 580                    .collect(),
 581                initial_project: initial_project_id
 582                    .and_then(|id| Self::build_participant_project(id, &self.projects)),
 583            },
 584        ))
 585    }
 586
 587    pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
 588        let mut recipient = self
 589            .connected_users
 590            .get_mut(&to_user_id)
 591            .ok_or_else(|| anyhow!("no such connection"))?;
 592        anyhow::ensure!(recipient
 593            .active_call
 594            .map_or(false, |call| call.room_id == room_id
 595                && call.connection_id.is_none()));
 596        recipient.active_call = None;
 597        let room = self
 598            .rooms
 599            .get_mut(&room_id)
 600            .ok_or_else(|| anyhow!("no such room"))?;
 601        room.pending_participant_user_ids
 602            .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
 603        Ok(room)
 604    }
 605
 606    pub fn cancel_call(
 607        &mut self,
 608        room_id: RoomId,
 609        recipient_user_id: UserId,
 610        canceller_connection_id: ConnectionId,
 611    ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
 612        let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
 613        let canceller = self
 614            .connected_users
 615            .get(&canceller_user_id)
 616            .ok_or_else(|| anyhow!("no such connection"))?;
 617        let recipient = self
 618            .connected_users
 619            .get(&recipient_user_id)
 620            .ok_or_else(|| anyhow!("no such connection"))?;
 621        let canceller_active_call = canceller
 622            .active_call
 623            .as_ref()
 624            .ok_or_else(|| anyhow!("no active call"))?;
 625        let recipient_active_call = recipient
 626            .active_call
 627            .as_ref()
 628            .ok_or_else(|| anyhow!("no active call for recipient"))?;
 629
 630        anyhow::ensure!(
 631            canceller_active_call.room_id == room_id,
 632            "users are on different calls"
 633        );
 634        anyhow::ensure!(
 635            recipient_active_call.room_id == room_id,
 636            "users are on different calls"
 637        );
 638        anyhow::ensure!(
 639            recipient_active_call.connection_id.is_none(),
 640            "recipient has already answered"
 641        );
 642        let room_id = recipient_active_call.room_id;
 643        let room = self
 644            .rooms
 645            .get_mut(&room_id)
 646            .ok_or_else(|| anyhow!("no such room"))?;
 647        room.pending_participant_user_ids
 648            .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 649
 650        let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
 651        recipient.active_call.take();
 652
 653        Ok((room, recipient.connection_ids.clone()))
 654    }
 655
 656    pub fn decline_call(
 657        &mut self,
 658        room_id: RoomId,
 659        recipient_connection_id: ConnectionId,
 660    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 661        let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
 662        let recipient = self
 663            .connected_users
 664            .get_mut(&recipient_user_id)
 665            .ok_or_else(|| anyhow!("no such connection"))?;
 666        if let Some(active_call) = recipient.active_call.take() {
 667            anyhow::ensure!(active_call.room_id == room_id, "no such room");
 668            let recipient_connection_ids = self
 669                .connection_ids_for_user(recipient_user_id)
 670                .collect::<Vec<_>>();
 671            let room = self
 672                .rooms
 673                .get_mut(&active_call.room_id)
 674                .ok_or_else(|| anyhow!("no such room"))?;
 675            room.pending_participant_user_ids
 676                .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 677            Ok((room, recipient_connection_ids))
 678        } else {
 679            Err(anyhow!("user is not being called"))
 680        }
 681    }
 682
 683    pub fn update_participant_location(
 684        &mut self,
 685        room_id: RoomId,
 686        location: proto::ParticipantLocation,
 687        connection_id: ConnectionId,
 688    ) -> Result<&proto::Room> {
 689        let room = self
 690            .rooms
 691            .get_mut(&room_id)
 692            .ok_or_else(|| anyhow!("no such room"))?;
 693        if let Some(proto::participant_location::Variant::SharedProject(project)) =
 694            location.variant.as_ref()
 695        {
 696            anyhow::ensure!(
 697                room.participants
 698                    .iter()
 699                    .flat_map(|participant| &participant.projects)
 700                    .any(|participant_project| participant_project.id == project.id),
 701                "no such project"
 702            );
 703        }
 704
 705        let participant = room
 706            .participants
 707            .iter_mut()
 708            .find(|participant| participant.peer_id == connection_id.0)
 709            .ok_or_else(|| anyhow!("no such room"))?;
 710        participant.location = Some(location);
 711
 712        Ok(room)
 713    }
 714
 715    pub fn share_project(
 716        &mut self,
 717        room_id: RoomId,
 718        project_id: ProjectId,
 719        worktrees: Vec<proto::WorktreeMetadata>,
 720        host_connection_id: ConnectionId,
 721    ) -> Result<&proto::Room> {
 722        let connection = self
 723            .connections
 724            .get_mut(&host_connection_id)
 725            .ok_or_else(|| anyhow!("no such connection"))?;
 726
 727        let room = self
 728            .rooms
 729            .get_mut(&room_id)
 730            .ok_or_else(|| anyhow!("no such room"))?;
 731        let participant = room
 732            .participants
 733            .iter_mut()
 734            .find(|participant| participant.peer_id == host_connection_id.0)
 735            .ok_or_else(|| anyhow!("no such room"))?;
 736
 737        connection.projects.insert(project_id);
 738        self.projects.insert(
 739            project_id,
 740            Project {
 741                id: project_id,
 742                room_id,
 743                host_connection_id,
 744                host: Collaborator {
 745                    user_id: connection.user_id,
 746                    replica_id: 0,
 747                    last_activity: None,
 748                    admin: connection.admin,
 749                },
 750                guests: Default::default(),
 751                active_replica_ids: Default::default(),
 752                worktrees: worktrees
 753                    .into_iter()
 754                    .map(|worktree| {
 755                        (
 756                            worktree.id,
 757                            Worktree {
 758                                root_name: worktree.root_name,
 759                                visible: worktree.visible,
 760                                ..Default::default()
 761                            },
 762                        )
 763                    })
 764                    .collect(),
 765                language_servers: Default::default(),
 766            },
 767        );
 768
 769        participant
 770            .projects
 771            .extend(Self::build_participant_project(project_id, &self.projects));
 772
 773        Ok(room)
 774    }
 775
 776    pub fn unshare_project(
 777        &mut self,
 778        project_id: ProjectId,
 779        connection_id: ConnectionId,
 780    ) -> Result<(&proto::Room, Project)> {
 781        match self.projects.entry(project_id) {
 782            btree_map::Entry::Occupied(e) => {
 783                if e.get().host_connection_id == connection_id {
 784                    let project = e.remove();
 785
 786                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
 787                        host_connection.projects.remove(&project_id);
 788                    }
 789
 790                    for guest_connection in project.guests.keys() {
 791                        if let Some(connection) = self.connections.get_mut(guest_connection) {
 792                            connection.projects.remove(&project_id);
 793                        }
 794                    }
 795
 796                    let room = self
 797                        .rooms
 798                        .get_mut(&project.room_id)
 799                        .ok_or_else(|| anyhow!("no such room"))?;
 800                    let participant = room
 801                        .participants
 802                        .iter_mut()
 803                        .find(|participant| participant.peer_id == connection_id.0)
 804                        .ok_or_else(|| anyhow!("no such room"))?;
 805                    participant
 806                        .projects
 807                        .retain(|project| project.id != project_id.to_proto());
 808
 809                    Ok((room, project))
 810                } else {
 811                    Err(anyhow!("no such project"))?
 812                }
 813            }
 814            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
 815        }
 816    }
 817
 818    pub fn update_project(
 819        &mut self,
 820        project_id: ProjectId,
 821        worktrees: &[proto::WorktreeMetadata],
 822        connection_id: ConnectionId,
 823    ) -> Result<&proto::Room> {
 824        let project = self
 825            .projects
 826            .get_mut(&project_id)
 827            .ok_or_else(|| anyhow!("no such project"))?;
 828        if project.host_connection_id == connection_id {
 829            let mut old_worktrees = mem::take(&mut project.worktrees);
 830            for worktree in worktrees {
 831                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
 832                    project.worktrees.insert(worktree.id, old_worktree);
 833                } else {
 834                    project.worktrees.insert(
 835                        worktree.id,
 836                        Worktree {
 837                            root_name: worktree.root_name.clone(),
 838                            visible: worktree.visible,
 839                            ..Default::default()
 840                        },
 841                    );
 842                }
 843            }
 844
 845            let room = self
 846                .rooms
 847                .get_mut(&project.room_id)
 848                .ok_or_else(|| anyhow!("no such room"))?;
 849            let participant_project = room
 850                .participants
 851                .iter_mut()
 852                .flat_map(|participant| &mut participant.projects)
 853                .find(|project| project.id == project_id.to_proto())
 854                .ok_or_else(|| anyhow!("no such project"))?;
 855            participant_project.worktree_root_names = worktrees
 856                .iter()
 857                .filter(|worktree| worktree.visible)
 858                .map(|worktree| worktree.root_name.clone())
 859                .collect();
 860
 861            Ok(room)
 862        } else {
 863            Err(anyhow!("no such project"))?
 864        }
 865    }
 866
 867    pub fn update_diagnostic_summary(
 868        &mut self,
 869        project_id: ProjectId,
 870        worktree_id: u64,
 871        connection_id: ConnectionId,
 872        summary: proto::DiagnosticSummary,
 873    ) -> Result<Vec<ConnectionId>> {
 874        let project = self
 875            .projects
 876            .get_mut(&project_id)
 877            .ok_or_else(|| anyhow!("no such project"))?;
 878        if project.host_connection_id == connection_id {
 879            let worktree = project
 880                .worktrees
 881                .get_mut(&worktree_id)
 882                .ok_or_else(|| anyhow!("no such worktree"))?;
 883            worktree
 884                .diagnostic_summaries
 885                .insert(summary.path.clone().into(), summary);
 886            return Ok(project.connection_ids());
 887        }
 888
 889        Err(anyhow!("no such worktree"))?
 890    }
 891
 892    pub fn start_language_server(
 893        &mut self,
 894        project_id: ProjectId,
 895        connection_id: ConnectionId,
 896        language_server: proto::LanguageServer,
 897    ) -> Result<Vec<ConnectionId>> {
 898        let project = self
 899            .projects
 900            .get_mut(&project_id)
 901            .ok_or_else(|| anyhow!("no such project"))?;
 902        if project.host_connection_id == connection_id {
 903            project.language_servers.push(language_server);
 904            return Ok(project.connection_ids());
 905        }
 906
 907        Err(anyhow!("no such project"))?
 908    }
 909
 910    pub fn join_project(
 911        &mut self,
 912        requester_connection_id: ConnectionId,
 913        project_id: ProjectId,
 914    ) -> Result<(&Project, ReplicaId)> {
 915        let connection = self
 916            .connections
 917            .get_mut(&requester_connection_id)
 918            .ok_or_else(|| anyhow!("no such connection"))?;
 919        let user = self
 920            .connected_users
 921            .get(&connection.user_id)
 922            .ok_or_else(|| anyhow!("no such connection"))?;
 923        let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
 924        anyhow::ensure!(
 925            active_call.connection_id == Some(requester_connection_id),
 926            "no such project"
 927        );
 928
 929        let project = self
 930            .projects
 931            .get_mut(&project_id)
 932            .ok_or_else(|| anyhow!("no such project"))?;
 933        anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
 934
 935        connection.projects.insert(project_id);
 936        let mut replica_id = 1;
 937        while project.active_replica_ids.contains(&replica_id) {
 938            replica_id += 1;
 939        }
 940        project.active_replica_ids.insert(replica_id);
 941        project.guests.insert(
 942            requester_connection_id,
 943            Collaborator {
 944                replica_id,
 945                user_id: connection.user_id,
 946                last_activity: Some(OffsetDateTime::now_utc()),
 947                admin: connection.admin,
 948            },
 949        );
 950
 951        project.host.last_activity = Some(OffsetDateTime::now_utc());
 952        Ok((project, replica_id))
 953    }
 954
 955    pub fn leave_project(
 956        &mut self,
 957        project_id: ProjectId,
 958        connection_id: ConnectionId,
 959    ) -> Result<LeftProject> {
 960        let project = self
 961            .projects
 962            .get_mut(&project_id)
 963            .ok_or_else(|| anyhow!("no such project"))?;
 964
 965        // If the connection leaving the project is a collaborator, remove it.
 966        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
 967            project.active_replica_ids.remove(&guest.replica_id);
 968            true
 969        } else {
 970            false
 971        };
 972
 973        if let Some(connection) = self.connections.get_mut(&connection_id) {
 974            connection.projects.remove(&project_id);
 975        }
 976
 977        Ok(LeftProject {
 978            id: project.id,
 979            host_connection_id: project.host_connection_id,
 980            host_user_id: project.host.user_id,
 981            connection_ids: project.connection_ids(),
 982            remove_collaborator,
 983        })
 984    }
 985
 986    #[allow(clippy::too_many_arguments)]
 987    pub fn update_worktree(
 988        &mut self,
 989        connection_id: ConnectionId,
 990        project_id: ProjectId,
 991        worktree_id: u64,
 992        worktree_root_name: &str,
 993        removed_entries: &[u64],
 994        updated_entries: &[proto::Entry],
 995        scan_id: u64,
 996        is_last_update: bool,
 997    ) -> Result<Vec<ConnectionId>> {
 998        let project = self.write_project(project_id, connection_id)?;
 999
1000        let connection_ids = project.connection_ids();
1001        let mut worktree = project.worktrees.entry(worktree_id).or_default();
1002        worktree.root_name = worktree_root_name.to_string();
1003
1004        for entry_id in removed_entries {
1005            worktree.entries.remove(entry_id);
1006        }
1007
1008        for entry in updated_entries {
1009            worktree.entries.insert(entry.id, entry.clone());
1010        }
1011
1012        worktree.scan_id = scan_id;
1013        worktree.is_complete = is_last_update;
1014        Ok(connection_ids)
1015    }
1016
1017    fn build_participant_project(
1018        project_id: ProjectId,
1019        projects: &BTreeMap<ProjectId, Project>,
1020    ) -> Option<proto::ParticipantProject> {
1021        Some(proto::ParticipantProject {
1022            id: project_id.to_proto(),
1023            worktree_root_names: projects
1024                .get(&project_id)?
1025                .worktrees
1026                .values()
1027                .filter(|worktree| worktree.visible)
1028                .map(|worktree| worktree.root_name.clone())
1029                .collect(),
1030        })
1031    }
1032
1033    pub fn project_connection_ids(
1034        &self,
1035        project_id: ProjectId,
1036        acting_connection_id: ConnectionId,
1037    ) -> Result<Vec<ConnectionId>> {
1038        Ok(self
1039            .read_project(project_id, acting_connection_id)?
1040            .connection_ids())
1041    }
1042
1043    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1044        Ok(self
1045            .channels
1046            .get(&channel_id)
1047            .ok_or_else(|| anyhow!("no such channel"))?
1048            .connection_ids())
1049    }
1050
1051    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1052        self.projects
1053            .get(&project_id)
1054            .ok_or_else(|| anyhow!("no such project"))
1055    }
1056
1057    pub fn register_project_activity(
1058        &mut self,
1059        project_id: ProjectId,
1060        connection_id: ConnectionId,
1061    ) -> Result<()> {
1062        let project = self
1063            .projects
1064            .get_mut(&project_id)
1065            .ok_or_else(|| anyhow!("no such project"))?;
1066        let collaborator = if connection_id == project.host_connection_id {
1067            &mut project.host
1068        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1069            guest
1070        } else {
1071            return Err(anyhow!("no such project"))?;
1072        };
1073        collaborator.last_activity = Some(OffsetDateTime::now_utc());
1074        Ok(())
1075    }
1076
1077    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1078        self.projects.iter()
1079    }
1080
1081    pub fn read_project(
1082        &self,
1083        project_id: ProjectId,
1084        connection_id: ConnectionId,
1085    ) -> Result<&Project> {
1086        let project = self
1087            .projects
1088            .get(&project_id)
1089            .ok_or_else(|| anyhow!("no such project"))?;
1090        if project.host_connection_id == connection_id
1091            || project.guests.contains_key(&connection_id)
1092        {
1093            Ok(project)
1094        } else {
1095            Err(anyhow!("no such project"))?
1096        }
1097    }
1098
1099    fn write_project(
1100        &mut self,
1101        project_id: ProjectId,
1102        connection_id: ConnectionId,
1103    ) -> Result<&mut Project> {
1104        let project = self
1105            .projects
1106            .get_mut(&project_id)
1107            .ok_or_else(|| anyhow!("no such project"))?;
1108        if project.host_connection_id == connection_id
1109            || project.guests.contains_key(&connection_id)
1110        {
1111            Ok(project)
1112        } else {
1113            Err(anyhow!("no such project"))?
1114        }
1115    }
1116
1117    #[cfg(test)]
1118    pub fn check_invariants(&self) {
1119        for (connection_id, connection) in &self.connections {
1120            for project_id in &connection.projects {
1121                let project = &self.projects.get(project_id).unwrap();
1122                if project.host_connection_id != *connection_id {
1123                    assert!(project.guests.contains_key(connection_id));
1124                }
1125
1126                for (worktree_id, worktree) in project.worktrees.iter() {
1127                    let mut paths = HashMap::default();
1128                    for entry in worktree.entries.values() {
1129                        let prev_entry = paths.insert(&entry.path, entry);
1130                        assert_eq!(
1131                            prev_entry,
1132                            None,
1133                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
1134                            worktree_id,
1135                            prev_entry.unwrap(),
1136                            entry
1137                        );
1138                    }
1139                }
1140            }
1141            for channel_id in &connection.channels {
1142                let channel = self.channels.get(channel_id).unwrap();
1143                assert!(channel.connection_ids.contains(connection_id));
1144            }
1145            assert!(self
1146                .connected_users
1147                .get(&connection.user_id)
1148                .unwrap()
1149                .connection_ids
1150                .contains(connection_id));
1151        }
1152
1153        for (user_id, state) in &self.connected_users {
1154            for connection_id in &state.connection_ids {
1155                assert_eq!(
1156                    self.connections.get(connection_id).unwrap().user_id,
1157                    *user_id
1158                );
1159            }
1160
1161            if let Some(active_call) = state.active_call.as_ref() {
1162                if let Some(active_call_connection_id) = active_call.connection_id {
1163                    assert!(
1164                        state.connection_ids.contains(&active_call_connection_id),
1165                        "call is active on a dead connection"
1166                    );
1167                    assert!(
1168                        state.connection_ids.contains(&active_call_connection_id),
1169                        "call is active on a dead connection"
1170                    );
1171                }
1172            }
1173        }
1174
1175        for (room_id, room) in &self.rooms {
1176            for pending_user_id in &room.pending_participant_user_ids {
1177                assert!(
1178                    self.connected_users
1179                        .contains_key(&UserId::from_proto(*pending_user_id)),
1180                    "call is active on a user that has disconnected"
1181                );
1182            }
1183
1184            for participant in &room.participants {
1185                assert!(
1186                    self.connections
1187                        .contains_key(&ConnectionId(participant.peer_id)),
1188                    "room contains participant that has disconnected"
1189                );
1190
1191                for participant_project in &participant.projects {
1192                    let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1193                    assert_eq!(
1194                        project.room_id, *room_id,
1195                        "project was shared on a different room"
1196                    );
1197                }
1198            }
1199
1200            assert!(
1201                !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1202                "room can't be empty"
1203            );
1204        }
1205
1206        for (project_id, project) in &self.projects {
1207            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1208            assert!(host_connection.projects.contains(project_id));
1209
1210            for guest_connection_id in project.guests.keys() {
1211                let guest_connection = self.connections.get(guest_connection_id).unwrap();
1212                assert!(guest_connection.projects.contains(project_id));
1213            }
1214            assert_eq!(project.active_replica_ids.len(), project.guests.len());
1215            assert_eq!(
1216                project.active_replica_ids,
1217                project
1218                    .guests
1219                    .values()
1220                    .map(|guest| guest.replica_id)
1221                    .collect::<HashSet<_>>(),
1222            );
1223
1224            let room = &self.rooms[&project.room_id];
1225            let room_participant = room
1226                .participants
1227                .iter()
1228                .find(|participant| participant.peer_id == project.host_connection_id.0)
1229                .unwrap();
1230            assert!(
1231                room_participant
1232                    .projects
1233                    .iter()
1234                    .any(|project| project.id == project_id.to_proto()),
1235                "project was not shared in room"
1236            );
1237        }
1238
1239        for (channel_id, channel) in &self.channels {
1240            for connection_id in &channel.connection_ids {
1241                let connection = self.connections.get(connection_id).unwrap();
1242                assert!(connection.channels.contains(channel_id));
1243            }
1244        }
1245    }
1246}
1247
1248impl Project {
1249    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1250        self.guests
1251            .values()
1252            .chain([&self.host])
1253            .any(|collaborator| {
1254                collaborator
1255                    .last_activity
1256                    .map_or(false, |active_time| active_time > start_time)
1257            })
1258    }
1259
1260    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1261        self.guests.keys().copied().collect()
1262    }
1263
1264    pub fn connection_ids(&self) -> Vec<ConnectionId> {
1265        self.guests
1266            .keys()
1267            .copied()
1268            .chain(Some(self.host_connection_id))
1269            .collect()
1270    }
1271}
1272
1273impl Channel {
1274    fn connection_ids(&self) -> Vec<ConnectionId> {
1275        self.connection_ids.iter().copied().collect()
1276    }
1277}