store.rs

   1use crate::db::{self, ChannelId, ProjectId, UserId};
   2use anyhow::{anyhow, Result};
   3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
   4use rpc::{proto, ConnectionId};
   5use serde::Serialize;
   6use std::{mem, path::PathBuf, str, time::Duration};
   7use time::OffsetDateTime;
   8use tracing::instrument;
   9use util::post_inc;
  10
/// Identifier of a room; `Store::create_room` allocates these from `next_room_id`.
  11pub type RoomId = u64;
  12
/// In-memory registry of connections, connected users, rooms, shared projects
/// and channels.
  13#[derive(Default, Serialize)]
  14pub struct Store {
    /// State of every open connection, keyed by connection id.
  15    connections: BTreeMap<ConnectionId, ConnectionState>,
    /// Per-user state (open connections and current call), keyed by user id.
  16    connected_users: BTreeMap<UserId, ConnectedUser>,
    /// Id that `create_room` hands to the next room it creates.
  17    next_room_id: RoomId,
    /// Rooms that currently exist, keyed by room id.
  18    rooms: BTreeMap<RoomId, proto::Room>,
    /// Projects currently shared into a room, keyed by project id.
  19    projects: BTreeMap<ProjectId, Project>,
    /// Channel membership; skipped when the store is serialized.
  20    #[serde(skip)]
  21    channels: BTreeMap<ChannelId, Channel>,
  22}
  23
/// Per-user state shared by all of a user's connections.
  24#[derive(Default, Serialize)]
  25struct ConnectedUser {
    /// Connections currently open for this user.
  26    connection_ids: HashSet<ConnectionId>,
    /// The call this user created, joined, or is currently being invited to.
  27    active_call: Option<Call>,
  28}
  29
/// State tracked for a single connection.
  30#[derive(Serialize)]
  31struct ConnectionState {
    /// User that owns this connection.
  32    user_id: UserId,
    /// Admin connections (and the projects they host) are excluded from `Store::metrics`.
  33    admin: bool,
    /// Ids of projects associated with this connection (`share_project` adds to it,
    /// `unshare_project` removes from it).
  34    projects: BTreeSet<ProjectId>,
    /// Channels this connection has joined via `join_channel`.
  35    channels: HashSet<ChannelId>,
  36}
  37
/// A user's participation in (or pending invitation to) a room.
  38#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
  39pub struct Call {
    /// User that started the call.
  40    pub caller_user_id: UserId,
    /// Room the call belongs to.
  41    pub room_id: RoomId,
    /// `None` while the call is still ringing; `join_room` (or `create_room` for
    /// the creator) sets it to the connection that is in the room.
  42    pub connection_id: Option<ConnectionId>,
    /// Project announced together with the incoming call, if any.
  43    pub initial_project_id: Option<ProjectId>,
  44}
  45
/// A project that has been shared into a room.
  46#[derive(Serialize)]
  47pub struct Project {
  48    pub id: ProjectId,
    /// Room the project was shared into.
  49    pub room_id: RoomId,
    /// Connection that shared the project; only this connection may update or unshare it.
  50    pub host_connection_id: ConnectionId,
    /// Collaborator entry for the host (`share_project` gives it replica id 0).
  51    pub host: Collaborator,
    /// Guests of the project, keyed by their connection id.
  52    pub guests: HashMap<ConnectionId, Collaborator>,
  53    pub active_replica_ids: HashSet<ReplicaId>,
    /// Worktrees of the project, keyed by worktree id.
  54    pub worktrees: BTreeMap<u64, Worktree>,
  55    pub language_servers: Vec<proto::LanguageServer>,
  56}
  57
/// A participant (host or guest) of a shared project.
  58#[derive(Serialize)]
  59pub struct Collaborator {
  60    pub replica_id: ReplicaId,
  61    pub user_id: UserId,
    /// Skipped during serialization. Presumably the time of the collaborator's
    /// most recent activity — TODO confirm against `Project::is_active_since`.
  62    #[serde(skip)]
  63    pub last_activity: Option<OffsetDateTime>,
    /// Copied from the connection's `admin` flag when the project is shared.
  64    pub admin: bool,
  65}
  66
/// Server-side record of one worktree of a shared project.
  67#[derive(Default, Serialize)]
  68pub struct Worktree {
  69    pub abs_path: PathBuf,
  70    pub root_name: String,
    /// Only visible worktrees contribute to a room's `worktree_root_names`
    /// (see `update_project`).
  71    pub visible: bool,
    /// Skipped during serialization.
  72    #[serde(skip)]
  73    pub entries: BTreeMap<u64, proto::Entry>,
    /// Latest diagnostic summary per path; skipped during serialization.
  74    #[serde(skip)]
  75    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
  76    pub scan_id: u64,
  77    pub is_complete: bool,
  78}
  79
/// Membership of a single channel.
  80#[derive(Default)]
  81pub struct Channel {
    /// Connections currently joined to the channel.
  82    pub connection_ids: HashSet<ConnectionId>,
  83}
  84
/// Replica identifier of a project collaborator; `share_project` assigns 0 to the host.
  85pub type ReplicaId = u16;
  86
/// Summary returned by `Store::remove_connection` describing what was torn down.
  87#[derive(Default)]
  88pub struct RemovedConnectionState {
    /// User that owned the removed connection.
  89    pub user_id: UserId,
    /// Projects that were unshared because the connection hosted them.
  90    pub hosted_projects: Vec<Project>,
    /// Projects the connection left as a guest.
  91    pub guest_projects: Vec<LeftProject>,
    /// Left at its default by `remove_connection`; presumably filled in by the
    /// caller — TODO confirm.
  92    pub contact_ids: HashSet<UserId>,
    /// Room the connection was in, if it had to leave one.
  93    pub room_id: Option<RoomId>,
    /// Connections whose pending incoming call was canceled as a result.
  94    pub canceled_call_connection_ids: Vec<ConnectionId>,
  95}
  96
/// Describes a project that a connection has left (the value produced by
/// `leave_project`, as collected in `leave_room`).
  97pub struct LeftProject {
  98    pub id: ProjectId,
  99    pub host_user_id: UserId,
 100    pub host_connection_id: ConnectionId,
 101    pub connection_ids: Vec<ConnectionId>,
 102    pub remove_collaborator: bool,
 103}
 104
/// Result of `Store::leave_room`.
 105pub struct LeftRoom<'a> {
    /// The room after the participant left, or `None` if it was removed because
    /// it had no participants and no pending participants left.
 106    pub room: Option<&'a proto::Room>,
    /// Projects hosted by the leaving connection that were unshared.
 107    pub unshared_projects: Vec<Project>,
    /// Projects the leaving connection left.
 108    pub left_projects: Vec<LeftProject>,
    /// Connections of pending participants whose call from the leaving user was canceled.
 109    pub canceled_call_connection_ids: Vec<ConnectionId>,
 110}
 111
/// Counters produced by `Store::metrics`; admin connections are excluded throughout.
 112#[derive(Copy, Clone)]
 113pub struct Metrics {
    /// Number of non-admin connections.
 114    pub connections: usize,
    /// Projects whose host connection is known and not an admin.
 115    pub registered_projects: usize,
    /// Registered projects for which `is_active_since` returned true.
 116    pub active_projects: usize,
    /// Active projects that have at least one guest.
 117    pub shared_projects: usize,
 118}
 119
 120impl Store {
 121    pub fn metrics(&self) -> Metrics {
 122        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
 123        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
 124
 125        let connections = self.connections.values().filter(|c| !c.admin).count();
 126        let mut registered_projects = 0;
 127        let mut active_projects = 0;
 128        let mut shared_projects = 0;
 129        for project in self.projects.values() {
 130            if let Some(connection) = self.connections.get(&project.host_connection_id) {
 131                if !connection.admin {
 132                    registered_projects += 1;
 133                    if project.is_active_since(active_window_start) {
 134                        active_projects += 1;
 135                        if !project.guests.is_empty() {
 136                            shared_projects += 1;
 137                        }
 138                    }
 139                }
 140            }
 141        }
 142
 143        Metrics {
 144            connections,
 145            registered_projects,
 146            active_projects,
 147            shared_projects,
 148        }
 149    }
 150
 151    #[instrument(skip(self))]
 152    pub fn add_connection(
 153        &mut self,
 154        connection_id: ConnectionId,
 155        user_id: UserId,
 156        admin: bool,
 157    ) -> Option<proto::IncomingCall> {
 158        self.connections.insert(
 159            connection_id,
 160            ConnectionState {
 161                user_id,
 162                admin,
 163                projects: Default::default(),
 164                channels: Default::default(),
 165            },
 166        );
 167        let connected_user = self.connected_users.entry(user_id).or_default();
 168        connected_user.connection_ids.insert(connection_id);
 169        if let Some(active_call) = connected_user.active_call {
 170            if active_call.connection_id.is_some() {
 171                None
 172            } else {
 173                let room = self.room(active_call.room_id)?;
 174                Some(proto::IncomingCall {
 175                    room_id: active_call.room_id,
 176                    caller_user_id: active_call.caller_user_id.to_proto(),
 177                    participant_user_ids: room
 178                        .participants
 179                        .iter()
 180                        .map(|participant| participant.user_id)
 181                        .collect(),
 182                    initial_project: active_call
 183                        .initial_project_id
 184                        .and_then(|id| Self::build_participant_project(id, &self.projects)),
 185                })
 186            }
 187        } else {
 188            None
 189        }
 190    }
 191
 192    #[instrument(skip(self))]
 193    pub fn remove_connection(
 194        &mut self,
 195        connection_id: ConnectionId,
 196    ) -> Result<RemovedConnectionState> {
 197        let connection = self
 198            .connections
 199            .get_mut(&connection_id)
 200            .ok_or_else(|| anyhow!("no such connection"))?;
 201
 202        let user_id = connection.user_id;
 203        let connection_channels = mem::take(&mut connection.channels);
 204
 205        let mut result = RemovedConnectionState {
 206            user_id,
 207            ..Default::default()
 208        };
 209
 210        // Leave all channels.
 211        for channel_id in connection_channels {
 212            self.leave_channel(connection_id, channel_id);
 213        }
 214
 215        let connected_user = self.connected_users.get(&user_id).unwrap();
 216        if let Some(active_call) = connected_user.active_call.as_ref() {
 217            let room_id = active_call.room_id;
 218            if active_call.connection_id == Some(connection_id) {
 219                let left_room = self.leave_room(room_id, connection_id)?;
 220                result.hosted_projects = left_room.unshared_projects;
 221                result.guest_projects = left_room.left_projects;
 222                result.room_id = Some(room_id);
 223                result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
 224            }
 225        }
 226
 227        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
 228        connected_user.connection_ids.remove(&connection_id);
 229        if connected_user.connection_ids.is_empty() {
 230            self.connected_users.remove(&user_id);
 231        }
 232        self.connections.remove(&connection_id).unwrap();
 233
 234        Ok(result)
 235    }
 236
    /// Test-only accessor: returns the channel with the given id, if any
    /// connection is currently joined to it.
 237    #[cfg(test)]
 238    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
 239        self.channels.get(&id)
 240    }
 241
 242    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 243        if let Some(connection) = self.connections.get_mut(&connection_id) {
 244            connection.channels.insert(channel_id);
 245            self.channels
 246                .entry(channel_id)
 247                .or_default()
 248                .connection_ids
 249                .insert(connection_id);
 250        }
 251    }
 252
 253    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 254        if let Some(connection) = self.connections.get_mut(&connection_id) {
 255            connection.channels.remove(&channel_id);
 256            if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
 257                entry.get_mut().connection_ids.remove(&connection_id);
 258                if entry.get_mut().connection_ids.is_empty() {
 259                    entry.remove();
 260                }
 261            }
 262        }
 263    }
 264
 265    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
 266        Ok(self
 267            .connections
 268            .get(&connection_id)
 269            .ok_or_else(|| anyhow!("unknown connection"))?
 270            .user_id)
 271    }
 272
 273    pub fn connection_ids_for_user(
 274        &self,
 275        user_id: UserId,
 276    ) -> impl Iterator<Item = ConnectionId> + '_ {
 277        self.connected_users
 278            .get(&user_id)
 279            .into_iter()
 280            .map(|state| &state.connection_ids)
 281            .flatten()
 282            .copied()
 283    }
 284
 285    pub fn is_user_online(&self, user_id: UserId) -> bool {
 286        !self
 287            .connected_users
 288            .get(&user_id)
 289            .unwrap_or(&Default::default())
 290            .connection_ids
 291            .is_empty()
 292    }
 293
 294    fn is_user_busy(&self, user_id: UserId) -> bool {
 295        self.connected_users
 296            .get(&user_id)
 297            .unwrap_or(&Default::default())
 298            .active_call
 299            .is_some()
 300    }
 301
 302    pub fn build_initial_contacts_update(
 303        &self,
 304        contacts: Vec<db::Contact>,
 305    ) -> proto::UpdateContacts {
 306        let mut update = proto::UpdateContacts::default();
 307
 308        for contact in contacts {
 309            match contact {
 310                db::Contact::Accepted {
 311                    user_id,
 312                    should_notify,
 313                } => {
 314                    update
 315                        .contacts
 316                        .push(self.contact_for_user(user_id, should_notify));
 317                }
 318                db::Contact::Outgoing { user_id } => {
 319                    update.outgoing_requests.push(user_id.to_proto())
 320                }
 321                db::Contact::Incoming {
 322                    user_id,
 323                    should_notify,
 324                } => update
 325                    .incoming_requests
 326                    .push(proto::IncomingContactRequest {
 327                        requester_id: user_id.to_proto(),
 328                        should_notify,
 329                    }),
 330            }
 331        }
 332
 333        update
 334    }
 335
 336    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
 337        proto::Contact {
 338            user_id: user_id.to_proto(),
 339            online: self.is_user_online(user_id),
 340            busy: self.is_user_busy(user_id),
 341            should_notify,
 342        }
 343    }
 344
 345    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
 346        let connection = self
 347            .connections
 348            .get_mut(&creator_connection_id)
 349            .ok_or_else(|| anyhow!("no such connection"))?;
 350        let connected_user = self
 351            .connected_users
 352            .get_mut(&connection.user_id)
 353            .ok_or_else(|| anyhow!("no such connection"))?;
 354        anyhow::ensure!(
 355            connected_user.active_call.is_none(),
 356            "can't create a room with an active call"
 357        );
 358
 359        let mut room = proto::Room::default();
 360        room.participants.push(proto::Participant {
 361            user_id: connection.user_id.to_proto(),
 362            peer_id: creator_connection_id.0,
 363            projects: Default::default(),
 364            location: Some(proto::ParticipantLocation {
 365                variant: Some(proto::participant_location::Variant::External(
 366                    proto::participant_location::External {},
 367                )),
 368            }),
 369        });
 370
 371        let room_id = post_inc(&mut self.next_room_id);
 372        self.rooms.insert(room_id, room);
 373        connected_user.active_call = Some(Call {
 374            caller_user_id: connection.user_id,
 375            room_id,
 376            connection_id: Some(creator_connection_id),
 377            initial_project_id: None,
 378        });
 379        Ok(room_id)
 380    }
 381
 382    pub fn join_room(
 383        &mut self,
 384        room_id: RoomId,
 385        connection_id: ConnectionId,
 386    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 387        let connection = self
 388            .connections
 389            .get_mut(&connection_id)
 390            .ok_or_else(|| anyhow!("no such connection"))?;
 391        let user_id = connection.user_id;
 392        let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
 393
 394        let connected_user = self
 395            .connected_users
 396            .get_mut(&user_id)
 397            .ok_or_else(|| anyhow!("no such connection"))?;
 398        let active_call = connected_user
 399            .active_call
 400            .as_mut()
 401            .ok_or_else(|| anyhow!("not being called"))?;
 402        anyhow::ensure!(
 403            active_call.room_id == room_id && active_call.connection_id.is_none(),
 404            "not being called on this room"
 405        );
 406
 407        let room = self
 408            .rooms
 409            .get_mut(&room_id)
 410            .ok_or_else(|| anyhow!("no such room"))?;
 411        anyhow::ensure!(
 412            room.pending_participant_user_ids
 413                .contains(&user_id.to_proto()),
 414            anyhow!("no such room")
 415        );
 416        room.pending_participant_user_ids
 417            .retain(|pending| *pending != user_id.to_proto());
 418        room.participants.push(proto::Participant {
 419            user_id: user_id.to_proto(),
 420            peer_id: connection_id.0,
 421            projects: Default::default(),
 422            location: Some(proto::ParticipantLocation {
 423                variant: Some(proto::participant_location::Variant::External(
 424                    proto::participant_location::External {},
 425                )),
 426            }),
 427        });
 428        active_call.connection_id = Some(connection_id);
 429
 430        Ok((room, recipient_connection_ids))
 431    }
 432
 433    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
 434        let connection = self
 435            .connections
 436            .get_mut(&connection_id)
 437            .ok_or_else(|| anyhow!("no such connection"))?;
 438        let user_id = connection.user_id;
 439
 440        let connected_user = self
 441            .connected_users
 442            .get(&user_id)
 443            .ok_or_else(|| anyhow!("no such connection"))?;
 444        anyhow::ensure!(
 445            connected_user
 446                .active_call
 447                .map_or(false, |call| call.room_id == room_id
 448                    && call.connection_id == Some(connection_id)),
 449            "cannot leave a room before joining it"
 450        );
 451
 452        // Given that users can only join one room at a time, we can safely unshare
 453        // and leave all projects associated with the connection.
 454        let mut unshared_projects = Vec::new();
 455        let mut left_projects = Vec::new();
 456        for project_id in connection.projects.clone() {
 457            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
 458                unshared_projects.push(project);
 459            } else if let Ok(project) = self.leave_project(project_id, connection_id) {
 460                left_projects.push(project);
 461            }
 462        }
 463        self.connected_users.get_mut(&user_id).unwrap().active_call = None;
 464
 465        let room = self
 466            .rooms
 467            .get_mut(&room_id)
 468            .ok_or_else(|| anyhow!("no such room"))?;
 469        room.participants
 470            .retain(|participant| participant.peer_id != connection_id.0);
 471
 472        let mut canceled_call_connection_ids = Vec::new();
 473        room.pending_participant_user_ids
 474            .retain(|pending_participant_user_id| {
 475                if let Some(connected_user) = self
 476                    .connected_users
 477                    .get_mut(&UserId::from_proto(*pending_participant_user_id))
 478                {
 479                    if let Some(call) = connected_user.active_call.as_ref() {
 480                        if call.caller_user_id == user_id {
 481                            connected_user.active_call.take();
 482                            canceled_call_connection_ids
 483                                .extend(connected_user.connection_ids.iter().copied());
 484                            false
 485                        } else {
 486                            true
 487                        }
 488                    } else {
 489                        true
 490                    }
 491                } else {
 492                    true
 493                }
 494            });
 495
 496        if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
 497            self.rooms.remove(&room_id);
 498        }
 499
 500        Ok(LeftRoom {
 501            room: self.rooms.get(&room_id),
 502            unshared_projects,
 503            left_projects,
 504            canceled_call_connection_ids,
 505        })
 506    }
 507
    /// Returns the room with the given id, or `None` if no such room exists.
 508    pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
 509        self.rooms.get(&room_id)
 510    }
 511
 512    pub fn call(
 513        &mut self,
 514        room_id: RoomId,
 515        recipient_user_id: UserId,
 516        initial_project_id: Option<ProjectId>,
 517        from_connection_id: ConnectionId,
 518    ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
 519        let caller_user_id = self.user_id_for_connection(from_connection_id)?;
 520
 521        let recipient_connection_ids = self
 522            .connection_ids_for_user(recipient_user_id)
 523            .collect::<Vec<_>>();
 524        let mut recipient = self
 525            .connected_users
 526            .get_mut(&recipient_user_id)
 527            .ok_or_else(|| anyhow!("no such connection"))?;
 528        anyhow::ensure!(
 529            recipient.active_call.is_none(),
 530            "recipient is already on another call"
 531        );
 532
 533        let room = self
 534            .rooms
 535            .get_mut(&room_id)
 536            .ok_or_else(|| anyhow!("no such room"))?;
 537        anyhow::ensure!(
 538            room.participants
 539                .iter()
 540                .any(|participant| participant.peer_id == from_connection_id.0),
 541            "no such room"
 542        );
 543        anyhow::ensure!(
 544            room.pending_participant_user_ids
 545                .iter()
 546                .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
 547            "cannot call the same user more than once"
 548        );
 549        room.pending_participant_user_ids
 550            .push(recipient_user_id.to_proto());
 551
 552        if let Some(initial_project_id) = initial_project_id {
 553            let project = self
 554                .projects
 555                .get(&initial_project_id)
 556                .ok_or_else(|| anyhow!("no such project"))?;
 557            anyhow::ensure!(project.room_id == room_id, "no such project");
 558        }
 559
 560        recipient.active_call = Some(Call {
 561            caller_user_id,
 562            room_id,
 563            connection_id: None,
 564            initial_project_id,
 565        });
 566
 567        Ok((
 568            room,
 569            recipient_connection_ids,
 570            proto::IncomingCall {
 571                room_id,
 572                caller_user_id: caller_user_id.to_proto(),
 573                participant_user_ids: room
 574                    .participants
 575                    .iter()
 576                    .map(|participant| participant.user_id)
 577                    .collect(),
 578                initial_project: initial_project_id
 579                    .and_then(|id| Self::build_participant_project(id, &self.projects)),
 580            },
 581        ))
 582    }
 583
    /// Rolls back a ringing call to `to_user_id` in `room_id`: clears the user's
    /// active call and removes them from the room's pending participants.
    ///
    /// Returns the updated room.
    ///
    /// # Errors
    /// Fails with "no such connection" when the user has no entry, with the
    /// `ensure!` condition failure when the user is not ringing for this room,
    /// and with "no such room" when the room does not exist.
 584    pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
 585        let mut recipient = self
 586            .connected_users
 587            .get_mut(&to_user_id)
 588            .ok_or_else(|| anyhow!("no such connection"))?;
        // Only a call for this room that has not been answered yet may be failed.
        // NOTE: this `ensure!` has no message, so the error text is derived from
        // the condition expression itself.
 589        anyhow::ensure!(recipient
 590            .active_call
 591            .map_or(false, |call| call.room_id == room_id
 592                && call.connection_id.is_none()));
 593        recipient.active_call = None;
 594        let room = self
 595            .rooms
 596            .get_mut(&room_id)
 597            .ok_or_else(|| anyhow!("no such room"))?;
 598        room.pending_participant_user_ids
 599            .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
 600        Ok(room)
 601    }
 602
 603    pub fn cancel_call(
 604        &mut self,
 605        room_id: RoomId,
 606        recipient_user_id: UserId,
 607        canceller_connection_id: ConnectionId,
 608    ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
 609        let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
 610        let canceller = self
 611            .connected_users
 612            .get(&canceller_user_id)
 613            .ok_or_else(|| anyhow!("no such connection"))?;
 614        let recipient = self
 615            .connected_users
 616            .get(&recipient_user_id)
 617            .ok_or_else(|| anyhow!("no such connection"))?;
 618        let canceller_active_call = canceller
 619            .active_call
 620            .as_ref()
 621            .ok_or_else(|| anyhow!("no active call"))?;
 622        let recipient_active_call = recipient
 623            .active_call
 624            .as_ref()
 625            .ok_or_else(|| anyhow!("no active call for recipient"))?;
 626
 627        anyhow::ensure!(
 628            canceller_active_call.room_id == room_id,
 629            "users are on different calls"
 630        );
 631        anyhow::ensure!(
 632            recipient_active_call.room_id == room_id,
 633            "users are on different calls"
 634        );
 635        anyhow::ensure!(
 636            recipient_active_call.connection_id.is_none(),
 637            "recipient has already answered"
 638        );
 639        let room_id = recipient_active_call.room_id;
 640        let room = self
 641            .rooms
 642            .get_mut(&room_id)
 643            .ok_or_else(|| anyhow!("no such room"))?;
 644        room.pending_participant_user_ids
 645            .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 646
 647        let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
 648        recipient.active_call.take();
 649
 650        Ok((room, recipient.connection_ids.clone()))
 651    }
 652
 653    pub fn decline_call(
 654        &mut self,
 655        room_id: RoomId,
 656        recipient_connection_id: ConnectionId,
 657    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 658        let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
 659        let recipient = self
 660            .connected_users
 661            .get_mut(&recipient_user_id)
 662            .ok_or_else(|| anyhow!("no such connection"))?;
 663        if let Some(active_call) = recipient.active_call.take() {
 664            anyhow::ensure!(active_call.room_id == room_id, "no such room");
 665            let recipient_connection_ids = self
 666                .connection_ids_for_user(recipient_user_id)
 667                .collect::<Vec<_>>();
 668            let room = self
 669                .rooms
 670                .get_mut(&active_call.room_id)
 671                .ok_or_else(|| anyhow!("no such room"))?;
 672            room.pending_participant_user_ids
 673                .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 674            Ok((room, recipient_connection_ids))
 675        } else {
 676            Err(anyhow!("user is not being called"))
 677        }
 678    }
 679
 680    pub fn update_participant_location(
 681        &mut self,
 682        room_id: RoomId,
 683        location: proto::ParticipantLocation,
 684        connection_id: ConnectionId,
 685    ) -> Result<&proto::Room> {
 686        let room = self
 687            .rooms
 688            .get_mut(&room_id)
 689            .ok_or_else(|| anyhow!("no such room"))?;
 690        if let Some(proto::participant_location::Variant::SharedProject(project)) =
 691            location.variant.as_ref()
 692        {
 693            anyhow::ensure!(
 694                room.participants
 695                    .iter()
 696                    .flat_map(|participant| &participant.projects)
 697                    .any(|participant_project| participant_project.id == project.id),
 698                "no such project"
 699            );
 700        }
 701
 702        let participant = room
 703            .participants
 704            .iter_mut()
 705            .find(|participant| participant.peer_id == connection_id.0)
 706            .ok_or_else(|| anyhow!("no such room"))?;
 707        participant.location = Some(location);
 708
 709        Ok(room)
 710    }
 711
 712    pub fn share_project(
 713        &mut self,
 714        room_id: RoomId,
 715        project_id: ProjectId,
 716        worktrees: Vec<proto::WorktreeMetadata>,
 717        host_connection_id: ConnectionId,
 718    ) -> Result<&proto::Room> {
 719        let connection = self
 720            .connections
 721            .get_mut(&host_connection_id)
 722            .ok_or_else(|| anyhow!("no such connection"))?;
 723
 724        let room = self
 725            .rooms
 726            .get_mut(&room_id)
 727            .ok_or_else(|| anyhow!("no such room"))?;
 728        let participant = room
 729            .participants
 730            .iter_mut()
 731            .find(|participant| participant.peer_id == host_connection_id.0)
 732            .ok_or_else(|| anyhow!("no such room"))?;
 733
 734        connection.projects.insert(project_id);
 735        self.projects.insert(
 736            project_id,
 737            Project {
 738                id: project_id,
 739                room_id,
 740                host_connection_id,
 741                host: Collaborator {
 742                    user_id: connection.user_id,
 743                    replica_id: 0,
 744                    last_activity: None,
 745                    admin: connection.admin,
 746                },
 747                guests: Default::default(),
 748                active_replica_ids: Default::default(),
 749                worktrees: worktrees
 750                    .into_iter()
 751                    .map(|worktree| {
 752                        (
 753                            worktree.id,
 754                            Worktree {
 755                                root_name: worktree.root_name,
 756                                visible: worktree.visible,
 757                                ..Default::default()
 758                            },
 759                        )
 760                    })
 761                    .collect(),
 762                language_servers: Default::default(),
 763            },
 764        );
 765
 766        participant
 767            .projects
 768            .extend(Self::build_participant_project(project_id, &self.projects));
 769
 770        Ok(room)
 771    }
 772
 773    pub fn unshare_project(
 774        &mut self,
 775        project_id: ProjectId,
 776        connection_id: ConnectionId,
 777    ) -> Result<(&proto::Room, Project)> {
 778        match self.projects.entry(project_id) {
 779            btree_map::Entry::Occupied(e) => {
 780                if e.get().host_connection_id == connection_id {
 781                    let project = e.remove();
 782
 783                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
 784                        host_connection.projects.remove(&project_id);
 785                    }
 786
 787                    for guest_connection in project.guests.keys() {
 788                        if let Some(connection) = self.connections.get_mut(guest_connection) {
 789                            connection.projects.remove(&project_id);
 790                        }
 791                    }
 792
 793                    let room = self
 794                        .rooms
 795                        .get_mut(&project.room_id)
 796                        .ok_or_else(|| anyhow!("no such room"))?;
 797                    let participant = room
 798                        .participants
 799                        .iter_mut()
 800                        .find(|participant| participant.peer_id == connection_id.0)
 801                        .ok_or_else(|| anyhow!("no such room"))?;
 802                    participant
 803                        .projects
 804                        .retain(|project| project.id != project_id.to_proto());
 805
 806                    Ok((room, project))
 807                } else {
 808                    Err(anyhow!("no such project"))?
 809                }
 810            }
 811            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
 812        }
 813    }
 814
 815    pub fn update_project(
 816        &mut self,
 817        project_id: ProjectId,
 818        worktrees: &[proto::WorktreeMetadata],
 819        connection_id: ConnectionId,
 820    ) -> Result<&proto::Room> {
 821        let project = self
 822            .projects
 823            .get_mut(&project_id)
 824            .ok_or_else(|| anyhow!("no such project"))?;
 825        if project.host_connection_id == connection_id {
 826            let mut old_worktrees = mem::take(&mut project.worktrees);
 827            for worktree in worktrees {
 828                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
 829                    project.worktrees.insert(worktree.id, old_worktree);
 830                } else {
 831                    project.worktrees.insert(
 832                        worktree.id,
 833                        Worktree {
 834                            root_name: worktree.root_name.clone(),
 835                            visible: worktree.visible,
 836                            ..Default::default()
 837                        },
 838                    );
 839                }
 840            }
 841
 842            let room = self
 843                .rooms
 844                .get_mut(&project.room_id)
 845                .ok_or_else(|| anyhow!("no such room"))?;
 846            let participant_project = room
 847                .participants
 848                .iter_mut()
 849                .flat_map(|participant| &mut participant.projects)
 850                .find(|project| project.id == project_id.to_proto())
 851                .ok_or_else(|| anyhow!("no such project"))?;
 852            participant_project.worktree_root_names = worktrees
 853                .iter()
 854                .filter(|worktree| worktree.visible)
 855                .map(|worktree| worktree.root_name.clone())
 856                .collect();
 857
 858            Ok(room)
 859        } else {
 860            Err(anyhow!("no such project"))?
 861        }
 862    }
 863
 864    pub fn update_diagnostic_summary(
 865        &mut self,
 866        project_id: ProjectId,
 867        worktree_id: u64,
 868        connection_id: ConnectionId,
 869        summary: proto::DiagnosticSummary,
 870    ) -> Result<Vec<ConnectionId>> {
 871        let project = self
 872            .projects
 873            .get_mut(&project_id)
 874            .ok_or_else(|| anyhow!("no such project"))?;
 875        if project.host_connection_id == connection_id {
 876            let worktree = project
 877                .worktrees
 878                .get_mut(&worktree_id)
 879                .ok_or_else(|| anyhow!("no such worktree"))?;
 880            worktree
 881                .diagnostic_summaries
 882                .insert(summary.path.clone().into(), summary);
 883            return Ok(project.connection_ids());
 884        }
 885
 886        Err(anyhow!("no such worktree"))?
 887    }
 888
 889    pub fn start_language_server(
 890        &mut self,
 891        project_id: ProjectId,
 892        connection_id: ConnectionId,
 893        language_server: proto::LanguageServer,
 894    ) -> Result<Vec<ConnectionId>> {
 895        let project = self
 896            .projects
 897            .get_mut(&project_id)
 898            .ok_or_else(|| anyhow!("no such project"))?;
 899        if project.host_connection_id == connection_id {
 900            project.language_servers.push(language_server);
 901            return Ok(project.connection_ids());
 902        }
 903
 904        Err(anyhow!("no such project"))?
 905    }
 906
 907    pub fn join_project(
 908        &mut self,
 909        requester_connection_id: ConnectionId,
 910        project_id: ProjectId,
 911    ) -> Result<(&Project, ReplicaId)> {
 912        let connection = self
 913            .connections
 914            .get_mut(&requester_connection_id)
 915            .ok_or_else(|| anyhow!("no such connection"))?;
 916        let user = self
 917            .connected_users
 918            .get(&connection.user_id)
 919            .ok_or_else(|| anyhow!("no such connection"))?;
 920        let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
 921        anyhow::ensure!(
 922            active_call.connection_id == Some(requester_connection_id),
 923            "no such project"
 924        );
 925
 926        let project = self
 927            .projects
 928            .get_mut(&project_id)
 929            .ok_or_else(|| anyhow!("no such project"))?;
 930        anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
 931
 932        connection.projects.insert(project_id);
 933        let mut replica_id = 1;
 934        while project.active_replica_ids.contains(&replica_id) {
 935            replica_id += 1;
 936        }
 937        project.active_replica_ids.insert(replica_id);
 938        project.guests.insert(
 939            requester_connection_id,
 940            Collaborator {
 941                replica_id,
 942                user_id: connection.user_id,
 943                last_activity: Some(OffsetDateTime::now_utc()),
 944                admin: connection.admin,
 945            },
 946        );
 947
 948        project.host.last_activity = Some(OffsetDateTime::now_utc());
 949        Ok((project, replica_id))
 950    }
 951
 952    pub fn leave_project(
 953        &mut self,
 954        project_id: ProjectId,
 955        connection_id: ConnectionId,
 956    ) -> Result<LeftProject> {
 957        let project = self
 958            .projects
 959            .get_mut(&project_id)
 960            .ok_or_else(|| anyhow!("no such project"))?;
 961
 962        // If the connection leaving the project is a collaborator, remove it.
 963        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
 964            project.active_replica_ids.remove(&guest.replica_id);
 965            true
 966        } else {
 967            false
 968        };
 969
 970        if let Some(connection) = self.connections.get_mut(&connection_id) {
 971            connection.projects.remove(&project_id);
 972        }
 973
 974        Ok(LeftProject {
 975            id: project.id,
 976            host_connection_id: project.host_connection_id,
 977            host_user_id: project.host.user_id,
 978            connection_ids: project.connection_ids(),
 979            remove_collaborator,
 980        })
 981    }
 982
 983    #[allow(clippy::too_many_arguments)]
 984    pub fn update_worktree(
 985        &mut self,
 986        connection_id: ConnectionId,
 987        project_id: ProjectId,
 988        worktree_id: u64,
 989        worktree_root_name: &str,
 990        removed_entries: &[u64],
 991        updated_entries: &[proto::Entry],
 992        scan_id: u64,
 993        is_last_update: bool,
 994    ) -> Result<Vec<ConnectionId>> {
 995        let project = self.write_project(project_id, connection_id)?;
 996
 997        let connection_ids = project.connection_ids();
 998        let mut worktree = project.worktrees.entry(worktree_id).or_default();
 999        worktree.root_name = worktree_root_name.to_string();
1000
1001        for entry_id in removed_entries {
1002            worktree.entries.remove(entry_id);
1003        }
1004
1005        for entry in updated_entries {
1006            worktree.entries.insert(entry.id, entry.clone());
1007        }
1008
1009        worktree.scan_id = scan_id;
1010        worktree.is_complete = is_last_update;
1011        Ok(connection_ids)
1012    }
1013
1014    fn build_participant_project(
1015        project_id: ProjectId,
1016        projects: &BTreeMap<ProjectId, Project>,
1017    ) -> Option<proto::ParticipantProject> {
1018        Some(proto::ParticipantProject {
1019            id: project_id.to_proto(),
1020            worktree_root_names: projects
1021                .get(&project_id)?
1022                .worktrees
1023                .values()
1024                .filter(|worktree| worktree.visible)
1025                .map(|worktree| worktree.root_name.clone())
1026                .collect(),
1027        })
1028    }
1029
1030    pub fn project_connection_ids(
1031        &self,
1032        project_id: ProjectId,
1033        acting_connection_id: ConnectionId,
1034    ) -> Result<Vec<ConnectionId>> {
1035        Ok(self
1036            .read_project(project_id, acting_connection_id)?
1037            .connection_ids())
1038    }
1039
1040    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1041        Ok(self
1042            .channels
1043            .get(&channel_id)
1044            .ok_or_else(|| anyhow!("no such channel"))?
1045            .connection_ids())
1046    }
1047
1048    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1049        self.projects
1050            .get(&project_id)
1051            .ok_or_else(|| anyhow!("no such project"))
1052    }
1053
1054    pub fn register_project_activity(
1055        &mut self,
1056        project_id: ProjectId,
1057        connection_id: ConnectionId,
1058    ) -> Result<()> {
1059        let project = self
1060            .projects
1061            .get_mut(&project_id)
1062            .ok_or_else(|| anyhow!("no such project"))?;
1063        let collaborator = if connection_id == project.host_connection_id {
1064            &mut project.host
1065        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1066            guest
1067        } else {
1068            return Err(anyhow!("no such project"))?;
1069        };
1070        collaborator.last_activity = Some(OffsetDateTime::now_utc());
1071        Ok(())
1072    }
1073
1074    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1075        self.projects.iter()
1076    }
1077
1078    pub fn read_project(
1079        &self,
1080        project_id: ProjectId,
1081        connection_id: ConnectionId,
1082    ) -> Result<&Project> {
1083        let project = self
1084            .projects
1085            .get(&project_id)
1086            .ok_or_else(|| anyhow!("no such project"))?;
1087        if project.host_connection_id == connection_id
1088            || project.guests.contains_key(&connection_id)
1089        {
1090            Ok(project)
1091        } else {
1092            Err(anyhow!("no such project"))?
1093        }
1094    }
1095
1096    fn write_project(
1097        &mut self,
1098        project_id: ProjectId,
1099        connection_id: ConnectionId,
1100    ) -> Result<&mut Project> {
1101        let project = self
1102            .projects
1103            .get_mut(&project_id)
1104            .ok_or_else(|| anyhow!("no such project"))?;
1105        if project.host_connection_id == connection_id
1106            || project.guests.contains_key(&connection_id)
1107        {
1108            Ok(project)
1109        } else {
1110            Err(anyhow!("no such project"))?
1111        }
1112    }
1113
1114    #[cfg(test)]
1115    pub fn check_invariants(&self) {
1116        for (connection_id, connection) in &self.connections {
1117            for project_id in &connection.projects {
1118                let project = &self.projects.get(project_id).unwrap();
1119                if project.host_connection_id != *connection_id {
1120                    assert!(project.guests.contains_key(connection_id));
1121                }
1122
1123                for (worktree_id, worktree) in project.worktrees.iter() {
1124                    let mut paths = HashMap::default();
1125                    for entry in worktree.entries.values() {
1126                        let prev_entry = paths.insert(&entry.path, entry);
1127                        assert_eq!(
1128                            prev_entry,
1129                            None,
1130                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
1131                            worktree_id,
1132                            prev_entry.unwrap(),
1133                            entry
1134                        );
1135                    }
1136                }
1137            }
1138            for channel_id in &connection.channels {
1139                let channel = self.channels.get(channel_id).unwrap();
1140                assert!(channel.connection_ids.contains(connection_id));
1141            }
1142            assert!(self
1143                .connected_users
1144                .get(&connection.user_id)
1145                .unwrap()
1146                .connection_ids
1147                .contains(connection_id));
1148        }
1149
1150        for (user_id, state) in &self.connected_users {
1151            for connection_id in &state.connection_ids {
1152                assert_eq!(
1153                    self.connections.get(connection_id).unwrap().user_id,
1154                    *user_id
1155                );
1156            }
1157
1158            if let Some(active_call) = state.active_call.as_ref() {
1159                if let Some(active_call_connection_id) = active_call.connection_id {
1160                    assert!(
1161                        state.connection_ids.contains(&active_call_connection_id),
1162                        "call is active on a dead connection"
1163                    );
1164                    assert!(
1165                        state.connection_ids.contains(&active_call_connection_id),
1166                        "call is active on a dead connection"
1167                    );
1168                }
1169            }
1170        }
1171
1172        for (room_id, room) in &self.rooms {
1173            for pending_user_id in &room.pending_participant_user_ids {
1174                assert!(
1175                    self.connected_users
1176                        .contains_key(&UserId::from_proto(*pending_user_id)),
1177                    "call is active on a user that has disconnected"
1178                );
1179            }
1180
1181            for participant in &room.participants {
1182                assert!(
1183                    self.connections
1184                        .contains_key(&ConnectionId(participant.peer_id)),
1185                    "room contains participant that has disconnected"
1186                );
1187
1188                for participant_project in &participant.projects {
1189                    let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1190                    assert_eq!(
1191                        project.room_id, *room_id,
1192                        "project was shared on a different room"
1193                    );
1194                }
1195            }
1196
1197            assert!(
1198                !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1199                "room can't be empty"
1200            );
1201        }
1202
1203        for (project_id, project) in &self.projects {
1204            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1205            assert!(host_connection.projects.contains(project_id));
1206
1207            for guest_connection_id in project.guests.keys() {
1208                let guest_connection = self.connections.get(guest_connection_id).unwrap();
1209                assert!(guest_connection.projects.contains(project_id));
1210            }
1211            assert_eq!(project.active_replica_ids.len(), project.guests.len());
1212            assert_eq!(
1213                project.active_replica_ids,
1214                project
1215                    .guests
1216                    .values()
1217                    .map(|guest| guest.replica_id)
1218                    .collect::<HashSet<_>>(),
1219            );
1220
1221            let room = &self.rooms[&project.room_id];
1222            let room_participant = room
1223                .participants
1224                .iter()
1225                .find(|participant| participant.peer_id == project.host_connection_id.0)
1226                .unwrap();
1227            assert!(
1228                room_participant
1229                    .projects
1230                    .iter()
1231                    .any(|project| project.id == project_id.to_proto()),
1232                "project was not shared in room"
1233            );
1234        }
1235
1236        for (channel_id, channel) in &self.channels {
1237            for connection_id in &channel.connection_ids {
1238                let connection = self.connections.get(connection_id).unwrap();
1239                assert!(connection.channels.contains(channel_id));
1240            }
1241        }
1242    }
1243}
1244
1245impl Project {
1246    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1247        self.guests
1248            .values()
1249            .chain([&self.host])
1250            .any(|collaborator| {
1251                collaborator
1252                    .last_activity
1253                    .map_or(false, |active_time| active_time > start_time)
1254            })
1255    }
1256
1257    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1258        self.guests.keys().copied().collect()
1259    }
1260
1261    pub fn connection_ids(&self) -> Vec<ConnectionId> {
1262        self.guests
1263            .keys()
1264            .copied()
1265            .chain(Some(self.host_connection_id))
1266            .collect()
1267    }
1268}
1269
1270impl Channel {
1271    fn connection_ids(&self) -> Vec<ConnectionId> {
1272        self.connection_ids.iter().copied().collect()
1273    }
1274}