store.rs

   1use crate::db::{self, ChannelId, ProjectId, UserId};
   2use anyhow::{anyhow, Result};
   3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
   4use rpc::{proto, ConnectionId};
   5use serde::Serialize;
   6use std::{mem, path::PathBuf, str, time::Duration};
   7use time::OffsetDateTime;
   8use tracing::instrument;
   9use util::post_inc;
  10
  11pub type RoomId = u64;
  12
  13#[derive(Default, Serialize)]
  14pub struct Store {
  15    connections: BTreeMap<ConnectionId, ConnectionState>,
  16    connected_users: BTreeMap<UserId, ConnectedUser>,
  17    next_room_id: RoomId,
  18    rooms: BTreeMap<RoomId, proto::Room>,
  19    projects: BTreeMap<ProjectId, Project>,
  20    #[serde(skip)]
  21    channels: BTreeMap<ChannelId, Channel>,
  22}
  23
  24#[derive(Default, Serialize)]
  25struct ConnectedUser {
  26    connection_ids: HashSet<ConnectionId>,
  27    active_call: Option<Call>,
  28}
  29
  30#[derive(Serialize)]
  31struct ConnectionState {
  32    user_id: UserId,
  33    admin: bool,
  34    projects: BTreeSet<ProjectId>,
  35    channels: HashSet<ChannelId>,
  36}
  37
  38#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
  39pub struct Call {
  40    pub caller_user_id: UserId,
  41    pub room_id: RoomId,
  42    pub connection_id: Option<ConnectionId>,
  43    pub initial_project_id: Option<ProjectId>,
  44}
  45
  46#[derive(Serialize)]
  47pub struct Project {
  48    pub id: ProjectId,
  49    pub room_id: RoomId,
  50    pub host_connection_id: ConnectionId,
  51    pub host: Collaborator,
  52    pub guests: HashMap<ConnectionId, Collaborator>,
  53    pub active_replica_ids: HashSet<ReplicaId>,
  54    pub worktrees: BTreeMap<u64, Worktree>,
  55    pub language_servers: Vec<proto::LanguageServer>,
  56}
  57
  58#[derive(Serialize)]
  59pub struct Collaborator {
  60    pub replica_id: ReplicaId,
  61    pub user_id: UserId,
  62    #[serde(skip)]
  63    pub last_activity: Option<OffsetDateTime>,
  64    pub admin: bool,
  65}
  66
  67#[derive(Default, Serialize)]
  68pub struct Worktree {
  69    pub root_name: String,
  70    pub visible: bool,
  71    #[serde(skip)]
  72    pub entries: BTreeMap<u64, proto::Entry>,
  73    #[serde(skip)]
  74    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
  75    pub scan_id: u64,
  76    pub is_complete: bool,
  77}
  78
  79#[derive(Default)]
  80pub struct Channel {
  81    pub connection_ids: HashSet<ConnectionId>,
  82}
  83
  84pub type ReplicaId = u16;
  85
  86#[derive(Default)]
  87pub struct RemovedConnectionState {
  88    pub user_id: UserId,
  89    pub hosted_projects: Vec<Project>,
  90    pub guest_projects: Vec<LeftProject>,
  91    pub contact_ids: HashSet<UserId>,
  92    pub room_id: Option<RoomId>,
  93    pub canceled_call_connection_ids: Vec<ConnectionId>,
  94}
  95
  96pub struct LeftProject {
  97    pub id: ProjectId,
  98    pub host_user_id: UserId,
  99    pub host_connection_id: ConnectionId,
 100    pub connection_ids: Vec<ConnectionId>,
 101    pub remove_collaborator: bool,
 102}
 103
 104pub struct LeftRoom<'a> {
 105    pub room: Option<&'a proto::Room>,
 106    pub unshared_projects: Vec<Project>,
 107    pub left_projects: Vec<LeftProject>,
 108    pub canceled_call_connection_ids: Vec<ConnectionId>,
 109}
 110
 111#[derive(Copy, Clone)]
 112pub struct Metrics {
 113    pub connections: usize,
 114    pub registered_projects: usize,
 115    pub active_projects: usize,
 116    pub shared_projects: usize,
 117}
 118
 119impl Store {
 120    pub fn metrics(&self) -> Metrics {
 121        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
 122        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
 123
 124        let connections = self.connections.values().filter(|c| !c.admin).count();
 125        let mut registered_projects = 0;
 126        let mut active_projects = 0;
 127        let mut shared_projects = 0;
 128        for project in self.projects.values() {
 129            if let Some(connection) = self.connections.get(&project.host_connection_id) {
 130                if !connection.admin {
 131                    registered_projects += 1;
 132                    if project.is_active_since(active_window_start) {
 133                        active_projects += 1;
 134                        if !project.guests.is_empty() {
 135                            shared_projects += 1;
 136                        }
 137                    }
 138                }
 139            }
 140        }
 141
 142        Metrics {
 143            connections,
 144            registered_projects,
 145            active_projects,
 146            shared_projects,
 147        }
 148    }
 149
 150    #[instrument(skip(self))]
 151    pub fn add_connection(
 152        &mut self,
 153        connection_id: ConnectionId,
 154        user_id: UserId,
 155        admin: bool,
 156    ) -> Option<proto::IncomingCall> {
 157        self.connections.insert(
 158            connection_id,
 159            ConnectionState {
 160                user_id,
 161                admin,
 162                projects: Default::default(),
 163                channels: Default::default(),
 164            },
 165        );
 166        let connected_user = self.connected_users.entry(user_id).or_default();
 167        connected_user.connection_ids.insert(connection_id);
 168        if let Some(active_call) = connected_user.active_call {
 169            if active_call.connection_id.is_some() {
 170                None
 171            } else {
 172                let room = self.room(active_call.room_id)?;
 173                Some(proto::IncomingCall {
 174                    room_id: active_call.room_id,
 175                    caller_user_id: active_call.caller_user_id.to_proto(),
 176                    participant_user_ids: room
 177                        .participants
 178                        .iter()
 179                        .map(|participant| participant.user_id)
 180                        .collect(),
 181                    initial_project: active_call
 182                        .initial_project_id
 183                        .and_then(|id| Self::build_participant_project(id, &self.projects)),
 184                })
 185            }
 186        } else {
 187            None
 188        }
 189    }
 190
 191    #[instrument(skip(self))]
 192    pub fn remove_connection(
 193        &mut self,
 194        connection_id: ConnectionId,
 195    ) -> Result<RemovedConnectionState> {
 196        let connection = self
 197            .connections
 198            .get_mut(&connection_id)
 199            .ok_or_else(|| anyhow!("no such connection"))?;
 200
 201        let user_id = connection.user_id;
 202        let connection_channels = mem::take(&mut connection.channels);
 203
 204        let mut result = RemovedConnectionState {
 205            user_id,
 206            ..Default::default()
 207        };
 208
 209        // Leave all channels.
 210        for channel_id in connection_channels {
 211            self.leave_channel(connection_id, channel_id);
 212        }
 213
 214        let connected_user = self.connected_users.get(&user_id).unwrap();
 215        if let Some(active_call) = connected_user.active_call.as_ref() {
 216            let room_id = active_call.room_id;
 217            let left_room = self.leave_room(room_id, connection_id)?;
 218            result.hosted_projects = left_room.unshared_projects;
 219            result.guest_projects = left_room.left_projects;
 220            result.room_id = Some(room_id);
 221            result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
 222        }
 223
 224        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
 225        connected_user.connection_ids.remove(&connection_id);
 226        if connected_user.connection_ids.is_empty() {
 227            self.connected_users.remove(&user_id);
 228        }
 229        self.connections.remove(&connection_id).unwrap();
 230
 231        Ok(result)
 232    }
 233
 234    #[cfg(test)]
 235    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
 236        self.channels.get(&id)
 237    }
 238
 239    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 240        if let Some(connection) = self.connections.get_mut(&connection_id) {
 241            connection.channels.insert(channel_id);
 242            self.channels
 243                .entry(channel_id)
 244                .or_default()
 245                .connection_ids
 246                .insert(connection_id);
 247        }
 248    }
 249
 250    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 251        if let Some(connection) = self.connections.get_mut(&connection_id) {
 252            connection.channels.remove(&channel_id);
 253            if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
 254                entry.get_mut().connection_ids.remove(&connection_id);
 255                if entry.get_mut().connection_ids.is_empty() {
 256                    entry.remove();
 257                }
 258            }
 259        }
 260    }
 261
 262    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
 263        Ok(self
 264            .connections
 265            .get(&connection_id)
 266            .ok_or_else(|| anyhow!("unknown connection"))?
 267            .user_id)
 268    }
 269
 270    pub fn connection_ids_for_user(
 271        &self,
 272        user_id: UserId,
 273    ) -> impl Iterator<Item = ConnectionId> + '_ {
 274        self.connected_users
 275            .get(&user_id)
 276            .into_iter()
 277            .map(|state| &state.connection_ids)
 278            .flatten()
 279            .copied()
 280    }
 281
 282    pub fn is_user_online(&self, user_id: UserId) -> bool {
 283        !self
 284            .connected_users
 285            .get(&user_id)
 286            .unwrap_or(&Default::default())
 287            .connection_ids
 288            .is_empty()
 289    }
 290
 291    fn is_user_busy(&self, user_id: UserId) -> bool {
 292        self.connected_users
 293            .get(&user_id)
 294            .unwrap_or(&Default::default())
 295            .active_call
 296            .is_some()
 297    }
 298
 299    pub fn build_initial_contacts_update(
 300        &self,
 301        contacts: Vec<db::Contact>,
 302    ) -> proto::UpdateContacts {
 303        let mut update = proto::UpdateContacts::default();
 304
 305        for contact in contacts {
 306            match contact {
 307                db::Contact::Accepted {
 308                    user_id,
 309                    should_notify,
 310                } => {
 311                    update
 312                        .contacts
 313                        .push(self.contact_for_user(user_id, should_notify));
 314                }
 315                db::Contact::Outgoing { user_id } => {
 316                    update.outgoing_requests.push(user_id.to_proto())
 317                }
 318                db::Contact::Incoming {
 319                    user_id,
 320                    should_notify,
 321                } => update
 322                    .incoming_requests
 323                    .push(proto::IncomingContactRequest {
 324                        requester_id: user_id.to_proto(),
 325                        should_notify,
 326                    }),
 327            }
 328        }
 329
 330        update
 331    }
 332
 333    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
 334        proto::Contact {
 335            user_id: user_id.to_proto(),
 336            online: self.is_user_online(user_id),
 337            busy: self.is_user_busy(user_id),
 338            should_notify,
 339        }
 340    }
 341
 342    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
 343        let connection = self
 344            .connections
 345            .get_mut(&creator_connection_id)
 346            .ok_or_else(|| anyhow!("no such connection"))?;
 347        let connected_user = self
 348            .connected_users
 349            .get_mut(&connection.user_id)
 350            .ok_or_else(|| anyhow!("no such connection"))?;
 351        anyhow::ensure!(
 352            connected_user.active_call.is_none(),
 353            "can't create a room with an active call"
 354        );
 355
 356        let mut room = proto::Room::default();
 357        room.participants.push(proto::Participant {
 358            user_id: connection.user_id.to_proto(),
 359            peer_id: creator_connection_id.0,
 360            projects: Default::default(),
 361            location: Some(proto::ParticipantLocation {
 362                variant: Some(proto::participant_location::Variant::External(
 363                    proto::participant_location::External {},
 364                )),
 365            }),
 366        });
 367
 368        let room_id = post_inc(&mut self.next_room_id);
 369        self.rooms.insert(room_id, room);
 370        connected_user.active_call = Some(Call {
 371            caller_user_id: connection.user_id,
 372            room_id,
 373            connection_id: Some(creator_connection_id),
 374            initial_project_id: None,
 375        });
 376        Ok(room_id)
 377    }
 378
 379    pub fn join_room(
 380        &mut self,
 381        room_id: RoomId,
 382        connection_id: ConnectionId,
 383    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 384        let connection = self
 385            .connections
 386            .get_mut(&connection_id)
 387            .ok_or_else(|| anyhow!("no such connection"))?;
 388        let user_id = connection.user_id;
 389        let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
 390
 391        let connected_user = self
 392            .connected_users
 393            .get_mut(&user_id)
 394            .ok_or_else(|| anyhow!("no such connection"))?;
 395        let active_call = connected_user
 396            .active_call
 397            .as_mut()
 398            .ok_or_else(|| anyhow!("not being called"))?;
 399        anyhow::ensure!(
 400            active_call.room_id == room_id && active_call.connection_id.is_none(),
 401            "not being called on this room"
 402        );
 403
 404        let room = self
 405            .rooms
 406            .get_mut(&room_id)
 407            .ok_or_else(|| anyhow!("no such room"))?;
 408        anyhow::ensure!(
 409            room.pending_participant_user_ids
 410                .contains(&user_id.to_proto()),
 411            anyhow!("no such room")
 412        );
 413        room.pending_participant_user_ids
 414            .retain(|pending| *pending != user_id.to_proto());
 415        room.participants.push(proto::Participant {
 416            user_id: user_id.to_proto(),
 417            peer_id: connection_id.0,
 418            projects: Default::default(),
 419            location: Some(proto::ParticipantLocation {
 420                variant: Some(proto::participant_location::Variant::External(
 421                    proto::participant_location::External {},
 422                )),
 423            }),
 424        });
 425        active_call.connection_id = Some(connection_id);
 426
 427        Ok((room, recipient_connection_ids))
 428    }
 429
 430    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
 431        let connection = self
 432            .connections
 433            .get_mut(&connection_id)
 434            .ok_or_else(|| anyhow!("no such connection"))?;
 435        let user_id = connection.user_id;
 436
 437        let connected_user = self
 438            .connected_users
 439            .get(&user_id)
 440            .ok_or_else(|| anyhow!("no such connection"))?;
 441        anyhow::ensure!(
 442            connected_user
 443                .active_call
 444                .map_or(false, |call| call.room_id == room_id
 445                    && call.connection_id == Some(connection_id)),
 446            "cannot leave a room before joining it"
 447        );
 448
 449        // Given that users can only join one room at a time, we can safely unshare
 450        // and leave all projects associated with the connection.
 451        let mut unshared_projects = Vec::new();
 452        let mut left_projects = Vec::new();
 453        for project_id in connection.projects.clone() {
 454            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
 455                unshared_projects.push(project);
 456            } else if let Ok(project) = self.leave_project(project_id, connection_id) {
 457                left_projects.push(project);
 458            }
 459        }
 460        self.connected_users.get_mut(&user_id).unwrap().active_call = None;
 461
 462        let room = self
 463            .rooms
 464            .get_mut(&room_id)
 465            .ok_or_else(|| anyhow!("no such room"))?;
 466        room.participants
 467            .retain(|participant| participant.peer_id != connection_id.0);
 468
 469        let mut canceled_call_connection_ids = Vec::new();
 470        room.pending_participant_user_ids
 471            .retain(|pending_participant_user_id| {
 472                if let Some(connected_user) = self
 473                    .connected_users
 474                    .get_mut(&UserId::from_proto(*pending_participant_user_id))
 475                {
 476                    if let Some(call) = connected_user.active_call.as_ref() {
 477                        if call.caller_user_id == user_id {
 478                            connected_user.active_call.take();
 479                            canceled_call_connection_ids
 480                                .extend(connected_user.connection_ids.iter().copied());
 481                            false
 482                        } else {
 483                            true
 484                        }
 485                    } else {
 486                        true
 487                    }
 488                } else {
 489                    true
 490                }
 491            });
 492
 493        if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
 494            self.rooms.remove(&room_id);
 495        }
 496
 497        Ok(LeftRoom {
 498            room: self.rooms.get(&room_id),
 499            unshared_projects,
 500            left_projects,
 501            canceled_call_connection_ids,
 502        })
 503    }
 504
 505    pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
 506        self.rooms.get(&room_id)
 507    }
 508
 509    pub fn call(
 510        &mut self,
 511        room_id: RoomId,
 512        recipient_user_id: UserId,
 513        initial_project_id: Option<ProjectId>,
 514        from_connection_id: ConnectionId,
 515    ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
 516        let caller_user_id = self.user_id_for_connection(from_connection_id)?;
 517
 518        let recipient_connection_ids = self
 519            .connection_ids_for_user(recipient_user_id)
 520            .collect::<Vec<_>>();
 521        let mut recipient = self
 522            .connected_users
 523            .get_mut(&recipient_user_id)
 524            .ok_or_else(|| anyhow!("no such connection"))?;
 525        anyhow::ensure!(
 526            recipient.active_call.is_none(),
 527            "recipient is already on another call"
 528        );
 529
 530        let room = self
 531            .rooms
 532            .get_mut(&room_id)
 533            .ok_or_else(|| anyhow!("no such room"))?;
 534        anyhow::ensure!(
 535            room.participants
 536                .iter()
 537                .any(|participant| participant.peer_id == from_connection_id.0),
 538            "no such room"
 539        );
 540        anyhow::ensure!(
 541            room.pending_participant_user_ids
 542                .iter()
 543                .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
 544            "cannot call the same user more than once"
 545        );
 546        room.pending_participant_user_ids
 547            .push(recipient_user_id.to_proto());
 548
 549        if let Some(initial_project_id) = initial_project_id {
 550            let project = self
 551                .projects
 552                .get(&initial_project_id)
 553                .ok_or_else(|| anyhow!("no such project"))?;
 554            anyhow::ensure!(project.room_id == room_id, "no such project");
 555        }
 556
 557        recipient.active_call = Some(Call {
 558            caller_user_id,
 559            room_id,
 560            connection_id: None,
 561            initial_project_id,
 562        });
 563
 564        Ok((
 565            room,
 566            recipient_connection_ids,
 567            proto::IncomingCall {
 568                room_id,
 569                caller_user_id: caller_user_id.to_proto(),
 570                participant_user_ids: room
 571                    .participants
 572                    .iter()
 573                    .map(|participant| participant.user_id)
 574                    .collect(),
 575                initial_project: initial_project_id
 576                    .and_then(|id| Self::build_participant_project(id, &self.projects)),
 577            },
 578        ))
 579    }
 580
 581    pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
 582        let mut recipient = self
 583            .connected_users
 584            .get_mut(&to_user_id)
 585            .ok_or_else(|| anyhow!("no such connection"))?;
 586        anyhow::ensure!(recipient
 587            .active_call
 588            .map_or(false, |call| call.room_id == room_id
 589                && call.connection_id.is_none()));
 590        recipient.active_call = None;
 591        let room = self
 592            .rooms
 593            .get_mut(&room_id)
 594            .ok_or_else(|| anyhow!("no such room"))?;
 595        room.pending_participant_user_ids
 596            .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
 597        Ok(room)
 598    }
 599
 600    pub fn cancel_call(
 601        &mut self,
 602        room_id: RoomId,
 603        recipient_user_id: UserId,
 604        canceller_connection_id: ConnectionId,
 605    ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
 606        let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
 607        let canceller = self
 608            .connected_users
 609            .get(&canceller_user_id)
 610            .ok_or_else(|| anyhow!("no such connection"))?;
 611        let recipient = self
 612            .connected_users
 613            .get(&recipient_user_id)
 614            .ok_or_else(|| anyhow!("no such connection"))?;
 615        let canceller_active_call = canceller
 616            .active_call
 617            .as_ref()
 618            .ok_or_else(|| anyhow!("no active call"))?;
 619        let recipient_active_call = recipient
 620            .active_call
 621            .as_ref()
 622            .ok_or_else(|| anyhow!("no active call for recipient"))?;
 623
 624        anyhow::ensure!(
 625            canceller_active_call.room_id == room_id,
 626            "users are on different calls"
 627        );
 628        anyhow::ensure!(
 629            recipient_active_call.room_id == room_id,
 630            "users are on different calls"
 631        );
 632        anyhow::ensure!(
 633            recipient_active_call.connection_id.is_none(),
 634            "recipient has already answered"
 635        );
 636        let room_id = recipient_active_call.room_id;
 637        let room = self
 638            .rooms
 639            .get_mut(&room_id)
 640            .ok_or_else(|| anyhow!("no such room"))?;
 641        room.pending_participant_user_ids
 642            .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 643
 644        let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
 645        recipient.active_call.take();
 646
 647        Ok((room, recipient.connection_ids.clone()))
 648    }
 649
 650    pub fn decline_call(
 651        &mut self,
 652        room_id: RoomId,
 653        recipient_connection_id: ConnectionId,
 654    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 655        let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
 656        let recipient = self
 657            .connected_users
 658            .get_mut(&recipient_user_id)
 659            .ok_or_else(|| anyhow!("no such connection"))?;
 660        if let Some(active_call) = recipient.active_call.take() {
 661            anyhow::ensure!(active_call.room_id == room_id, "no such room");
 662            let recipient_connection_ids = self
 663                .connection_ids_for_user(recipient_user_id)
 664                .collect::<Vec<_>>();
 665            let room = self
 666                .rooms
 667                .get_mut(&active_call.room_id)
 668                .ok_or_else(|| anyhow!("no such room"))?;
 669            room.pending_participant_user_ids
 670                .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 671            Ok((room, recipient_connection_ids))
 672        } else {
 673            Err(anyhow!("user is not being called"))
 674        }
 675    }
 676
 677    pub fn update_participant_location(
 678        &mut self,
 679        room_id: RoomId,
 680        location: proto::ParticipantLocation,
 681        connection_id: ConnectionId,
 682    ) -> Result<&proto::Room> {
 683        let room = self
 684            .rooms
 685            .get_mut(&room_id)
 686            .ok_or_else(|| anyhow!("no such room"))?;
 687        if let Some(proto::participant_location::Variant::SharedProject(project)) =
 688            location.variant.as_ref()
 689        {
 690            anyhow::ensure!(
 691                room.participants
 692                    .iter()
 693                    .flat_map(|participant| &participant.projects)
 694                    .any(|participant_project| participant_project.id == project.id),
 695                "no such project"
 696            );
 697        }
 698
 699        let participant = room
 700            .participants
 701            .iter_mut()
 702            .find(|participant| participant.peer_id == connection_id.0)
 703            .ok_or_else(|| anyhow!("no such room"))?;
 704        participant.location = Some(location);
 705
 706        Ok(room)
 707    }
 708
 709    pub fn share_project(
 710        &mut self,
 711        room_id: RoomId,
 712        project_id: ProjectId,
 713        worktrees: Vec<proto::WorktreeMetadata>,
 714        host_connection_id: ConnectionId,
 715    ) -> Result<&proto::Room> {
 716        let connection = self
 717            .connections
 718            .get_mut(&host_connection_id)
 719            .ok_or_else(|| anyhow!("no such connection"))?;
 720
 721        let room = self
 722            .rooms
 723            .get_mut(&room_id)
 724            .ok_or_else(|| anyhow!("no such room"))?;
 725        let participant = room
 726            .participants
 727            .iter_mut()
 728            .find(|participant| participant.peer_id == host_connection_id.0)
 729            .ok_or_else(|| anyhow!("no such room"))?;
 730
 731        connection.projects.insert(project_id);
 732        self.projects.insert(
 733            project_id,
 734            Project {
 735                id: project_id,
 736                room_id,
 737                host_connection_id,
 738                host: Collaborator {
 739                    user_id: connection.user_id,
 740                    replica_id: 0,
 741                    last_activity: None,
 742                    admin: connection.admin,
 743                },
 744                guests: Default::default(),
 745                active_replica_ids: Default::default(),
 746                worktrees: worktrees
 747                    .into_iter()
 748                    .map(|worktree| {
 749                        (
 750                            worktree.id,
 751                            Worktree {
 752                                root_name: worktree.root_name,
 753                                visible: worktree.visible,
 754                                ..Default::default()
 755                            },
 756                        )
 757                    })
 758                    .collect(),
 759                language_servers: Default::default(),
 760            },
 761        );
 762
 763        participant
 764            .projects
 765            .extend(Self::build_participant_project(project_id, &self.projects));
 766
 767        Ok(room)
 768    }
 769
 770    pub fn unshare_project(
 771        &mut self,
 772        project_id: ProjectId,
 773        connection_id: ConnectionId,
 774    ) -> Result<(&proto::Room, Project)> {
 775        match self.projects.entry(project_id) {
 776            btree_map::Entry::Occupied(e) => {
 777                if e.get().host_connection_id == connection_id {
 778                    let project = e.remove();
 779
 780                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
 781                        host_connection.projects.remove(&project_id);
 782                    }
 783
 784                    for guest_connection in project.guests.keys() {
 785                        if let Some(connection) = self.connections.get_mut(guest_connection) {
 786                            connection.projects.remove(&project_id);
 787                        }
 788                    }
 789
 790                    let room = self
 791                        .rooms
 792                        .get_mut(&project.room_id)
 793                        .ok_or_else(|| anyhow!("no such room"))?;
 794                    let participant = room
 795                        .participants
 796                        .iter_mut()
 797                        .find(|participant| participant.peer_id == connection_id.0)
 798                        .ok_or_else(|| anyhow!("no such room"))?;
 799                    participant
 800                        .projects
 801                        .retain(|project| project.id != project_id.to_proto());
 802
 803                    Ok((room, project))
 804                } else {
 805                    Err(anyhow!("no such project"))?
 806                }
 807            }
 808            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
 809        }
 810    }
 811
 812    pub fn update_project(
 813        &mut self,
 814        project_id: ProjectId,
 815        worktrees: &[proto::WorktreeMetadata],
 816        connection_id: ConnectionId,
 817    ) -> Result<&proto::Room> {
 818        let project = self
 819            .projects
 820            .get_mut(&project_id)
 821            .ok_or_else(|| anyhow!("no such project"))?;
 822        if project.host_connection_id == connection_id {
 823            let mut old_worktrees = mem::take(&mut project.worktrees);
 824            for worktree in worktrees {
 825                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
 826                    project.worktrees.insert(worktree.id, old_worktree);
 827                } else {
 828                    project.worktrees.insert(
 829                        worktree.id,
 830                        Worktree {
 831                            root_name: worktree.root_name.clone(),
 832                            visible: worktree.visible,
 833                            ..Default::default()
 834                        },
 835                    );
 836                }
 837            }
 838
 839            let room = self
 840                .rooms
 841                .get_mut(&project.room_id)
 842                .ok_or_else(|| anyhow!("no such room"))?;
 843            let participant_project = room
 844                .participants
 845                .iter_mut()
 846                .flat_map(|participant| &mut participant.projects)
 847                .find(|project| project.id == project_id.to_proto())
 848                .ok_or_else(|| anyhow!("no such project"))?;
 849            participant_project.worktree_root_names = worktrees
 850                .iter()
 851                .filter(|worktree| worktree.visible)
 852                .map(|worktree| worktree.root_name.clone())
 853                .collect();
 854
 855            Ok(room)
 856        } else {
 857            Err(anyhow!("no such project"))?
 858        }
 859    }
 860
 861    pub fn update_diagnostic_summary(
 862        &mut self,
 863        project_id: ProjectId,
 864        worktree_id: u64,
 865        connection_id: ConnectionId,
 866        summary: proto::DiagnosticSummary,
 867    ) -> Result<Vec<ConnectionId>> {
 868        let project = self
 869            .projects
 870            .get_mut(&project_id)
 871            .ok_or_else(|| anyhow!("no such project"))?;
 872        if project.host_connection_id == connection_id {
 873            let worktree = project
 874                .worktrees
 875                .get_mut(&worktree_id)
 876                .ok_or_else(|| anyhow!("no such worktree"))?;
 877            worktree
 878                .diagnostic_summaries
 879                .insert(summary.path.clone().into(), summary);
 880            return Ok(project.connection_ids());
 881        }
 882
 883        Err(anyhow!("no such worktree"))?
 884    }
 885
 886    pub fn start_language_server(
 887        &mut self,
 888        project_id: ProjectId,
 889        connection_id: ConnectionId,
 890        language_server: proto::LanguageServer,
 891    ) -> Result<Vec<ConnectionId>> {
 892        let project = self
 893            .projects
 894            .get_mut(&project_id)
 895            .ok_or_else(|| anyhow!("no such project"))?;
 896        if project.host_connection_id == connection_id {
 897            project.language_servers.push(language_server);
 898            return Ok(project.connection_ids());
 899        }
 900
 901        Err(anyhow!("no such project"))?
 902    }
 903
 904    pub fn join_project(
 905        &mut self,
 906        requester_connection_id: ConnectionId,
 907        project_id: ProjectId,
 908    ) -> Result<(&Project, ReplicaId)> {
 909        let connection = self
 910            .connections
 911            .get_mut(&requester_connection_id)
 912            .ok_or_else(|| anyhow!("no such connection"))?;
 913        let user = self
 914            .connected_users
 915            .get(&connection.user_id)
 916            .ok_or_else(|| anyhow!("no such connection"))?;
 917        let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
 918        anyhow::ensure!(
 919            active_call.connection_id == Some(requester_connection_id),
 920            "no such project"
 921        );
 922
 923        let project = self
 924            .projects
 925            .get_mut(&project_id)
 926            .ok_or_else(|| anyhow!("no such project"))?;
 927        anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
 928
 929        connection.projects.insert(project_id);
 930        let mut replica_id = 1;
 931        while project.active_replica_ids.contains(&replica_id) {
 932            replica_id += 1;
 933        }
 934        project.active_replica_ids.insert(replica_id);
 935        project.guests.insert(
 936            requester_connection_id,
 937            Collaborator {
 938                replica_id,
 939                user_id: connection.user_id,
 940                last_activity: Some(OffsetDateTime::now_utc()),
 941                admin: connection.admin,
 942            },
 943        );
 944
 945        project.host.last_activity = Some(OffsetDateTime::now_utc());
 946        Ok((project, replica_id))
 947    }
 948
 949    pub fn leave_project(
 950        &mut self,
 951        project_id: ProjectId,
 952        connection_id: ConnectionId,
 953    ) -> Result<LeftProject> {
 954        let project = self
 955            .projects
 956            .get_mut(&project_id)
 957            .ok_or_else(|| anyhow!("no such project"))?;
 958
 959        // If the connection leaving the project is a collaborator, remove it.
 960        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
 961            project.active_replica_ids.remove(&guest.replica_id);
 962            true
 963        } else {
 964            false
 965        };
 966
 967        if let Some(connection) = self.connections.get_mut(&connection_id) {
 968            connection.projects.remove(&project_id);
 969        }
 970
 971        Ok(LeftProject {
 972            id: project.id,
 973            host_connection_id: project.host_connection_id,
 974            host_user_id: project.host.user_id,
 975            connection_ids: project.connection_ids(),
 976            remove_collaborator,
 977        })
 978    }
 979
 980    #[allow(clippy::too_many_arguments)]
 981    pub fn update_worktree(
 982        &mut self,
 983        connection_id: ConnectionId,
 984        project_id: ProjectId,
 985        worktree_id: u64,
 986        worktree_root_name: &str,
 987        removed_entries: &[u64],
 988        updated_entries: &[proto::Entry],
 989        scan_id: u64,
 990        is_last_update: bool,
 991    ) -> Result<Vec<ConnectionId>> {
 992        let project = self.write_project(project_id, connection_id)?;
 993
 994        let connection_ids = project.connection_ids();
 995        let mut worktree = project.worktrees.entry(worktree_id).or_default();
 996        worktree.root_name = worktree_root_name.to_string();
 997
 998        for entry_id in removed_entries {
 999            worktree.entries.remove(entry_id);
1000        }
1001
1002        for entry in updated_entries {
1003            worktree.entries.insert(entry.id, entry.clone());
1004        }
1005
1006        worktree.scan_id = scan_id;
1007        worktree.is_complete = is_last_update;
1008        Ok(connection_ids)
1009    }
1010
1011    fn build_participant_project(
1012        project_id: ProjectId,
1013        projects: &BTreeMap<ProjectId, Project>,
1014    ) -> Option<proto::ParticipantProject> {
1015        Some(proto::ParticipantProject {
1016            id: project_id.to_proto(),
1017            worktree_root_names: projects
1018                .get(&project_id)?
1019                .worktrees
1020                .values()
1021                .filter(|worktree| worktree.visible)
1022                .map(|worktree| worktree.root_name.clone())
1023                .collect(),
1024        })
1025    }
1026
1027    pub fn project_connection_ids(
1028        &self,
1029        project_id: ProjectId,
1030        acting_connection_id: ConnectionId,
1031    ) -> Result<Vec<ConnectionId>> {
1032        Ok(self
1033            .read_project(project_id, acting_connection_id)?
1034            .connection_ids())
1035    }
1036
1037    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1038        Ok(self
1039            .channels
1040            .get(&channel_id)
1041            .ok_or_else(|| anyhow!("no such channel"))?
1042            .connection_ids())
1043    }
1044
1045    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1046        self.projects
1047            .get(&project_id)
1048            .ok_or_else(|| anyhow!("no such project"))
1049    }
1050
1051    pub fn register_project_activity(
1052        &mut self,
1053        project_id: ProjectId,
1054        connection_id: ConnectionId,
1055    ) -> Result<()> {
1056        let project = self
1057            .projects
1058            .get_mut(&project_id)
1059            .ok_or_else(|| anyhow!("no such project"))?;
1060        let collaborator = if connection_id == project.host_connection_id {
1061            &mut project.host
1062        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1063            guest
1064        } else {
1065            return Err(anyhow!("no such project"))?;
1066        };
1067        collaborator.last_activity = Some(OffsetDateTime::now_utc());
1068        Ok(())
1069    }
1070
1071    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1072        self.projects.iter()
1073    }
1074
1075    pub fn read_project(
1076        &self,
1077        project_id: ProjectId,
1078        connection_id: ConnectionId,
1079    ) -> Result<&Project> {
1080        let project = self
1081            .projects
1082            .get(&project_id)
1083            .ok_or_else(|| anyhow!("no such project"))?;
1084        if project.host_connection_id == connection_id
1085            || project.guests.contains_key(&connection_id)
1086        {
1087            Ok(project)
1088        } else {
1089            Err(anyhow!("no such project"))?
1090        }
1091    }
1092
1093    fn write_project(
1094        &mut self,
1095        project_id: ProjectId,
1096        connection_id: ConnectionId,
1097    ) -> Result<&mut Project> {
1098        let project = self
1099            .projects
1100            .get_mut(&project_id)
1101            .ok_or_else(|| anyhow!("no such project"))?;
1102        if project.host_connection_id == connection_id
1103            || project.guests.contains_key(&connection_id)
1104        {
1105            Ok(project)
1106        } else {
1107            Err(anyhow!("no such project"))?
1108        }
1109    }
1110
1111    #[cfg(test)]
1112    pub fn check_invariants(&self) {
1113        for (connection_id, connection) in &self.connections {
1114            for project_id in &connection.projects {
1115                let project = &self.projects.get(project_id).unwrap();
1116                if project.host_connection_id != *connection_id {
1117                    assert!(project.guests.contains_key(connection_id));
1118                }
1119
1120                for (worktree_id, worktree) in project.worktrees.iter() {
1121                    let mut paths = HashMap::default();
1122                    for entry in worktree.entries.values() {
1123                        let prev_entry = paths.insert(&entry.path, entry);
1124                        assert_eq!(
1125                            prev_entry,
1126                            None,
1127                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
1128                            worktree_id,
1129                            prev_entry.unwrap(),
1130                            entry
1131                        );
1132                    }
1133                }
1134            }
1135            for channel_id in &connection.channels {
1136                let channel = self.channels.get(channel_id).unwrap();
1137                assert!(channel.connection_ids.contains(connection_id));
1138            }
1139            assert!(self
1140                .connected_users
1141                .get(&connection.user_id)
1142                .unwrap()
1143                .connection_ids
1144                .contains(connection_id));
1145        }
1146
1147        for (user_id, state) in &self.connected_users {
1148            for connection_id in &state.connection_ids {
1149                assert_eq!(
1150                    self.connections.get(connection_id).unwrap().user_id,
1151                    *user_id
1152                );
1153            }
1154
1155            if let Some(active_call) = state.active_call.as_ref() {
1156                if let Some(active_call_connection_id) = active_call.connection_id {
1157                    assert!(
1158                        state.connection_ids.contains(&active_call_connection_id),
1159                        "call is active on a dead connection"
1160                    );
1161                    assert!(
1162                        state.connection_ids.contains(&active_call_connection_id),
1163                        "call is active on a dead connection"
1164                    );
1165                }
1166            }
1167        }
1168
1169        for (room_id, room) in &self.rooms {
1170            for pending_user_id in &room.pending_participant_user_ids {
1171                assert!(
1172                    self.connected_users
1173                        .contains_key(&UserId::from_proto(*pending_user_id)),
1174                    "call is active on a user that has disconnected"
1175                );
1176            }
1177
1178            for participant in &room.participants {
1179                assert!(
1180                    self.connections
1181                        .contains_key(&ConnectionId(participant.peer_id)),
1182                    "room contains participant that has disconnected"
1183                );
1184
1185                for participant_project in &participant.projects {
1186                    let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1187                    assert_eq!(
1188                        project.room_id, *room_id,
1189                        "project was shared on a different room"
1190                    );
1191                }
1192            }
1193
1194            assert!(
1195                !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1196                "room can't be empty"
1197            );
1198        }
1199
1200        for (project_id, project) in &self.projects {
1201            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1202            assert!(host_connection.projects.contains(project_id));
1203
1204            for guest_connection_id in project.guests.keys() {
1205                let guest_connection = self.connections.get(guest_connection_id).unwrap();
1206                assert!(guest_connection.projects.contains(project_id));
1207            }
1208            assert_eq!(project.active_replica_ids.len(), project.guests.len());
1209            assert_eq!(
1210                project.active_replica_ids,
1211                project
1212                    .guests
1213                    .values()
1214                    .map(|guest| guest.replica_id)
1215                    .collect::<HashSet<_>>(),
1216            );
1217
1218            let room = &self.rooms[&project.room_id];
1219            let room_participant = room
1220                .participants
1221                .iter()
1222                .find(|participant| participant.peer_id == project.host_connection_id.0)
1223                .unwrap();
1224            assert!(
1225                room_participant
1226                    .projects
1227                    .iter()
1228                    .any(|project| project.id == project_id.to_proto()),
1229                "project was not shared in room"
1230            );
1231        }
1232
1233        for (channel_id, channel) in &self.channels {
1234            for connection_id in &channel.connection_ids {
1235                let connection = self.connections.get(connection_id).unwrap();
1236                assert!(connection.channels.contains(channel_id));
1237            }
1238        }
1239    }
1240}
1241
1242impl Project {
1243    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1244        self.guests
1245            .values()
1246            .chain([&self.host])
1247            .any(|collaborator| {
1248                collaborator
1249                    .last_activity
1250                    .map_or(false, |active_time| active_time > start_time)
1251            })
1252    }
1253
1254    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1255        self.guests.keys().copied().collect()
1256    }
1257
1258    pub fn connection_ids(&self) -> Vec<ConnectionId> {
1259        self.guests
1260            .keys()
1261            .copied()
1262            .chain(Some(self.host_connection_id))
1263            .collect()
1264    }
1265}
1266
1267impl Channel {
1268    fn connection_ids(&self) -> Vec<ConnectionId> {
1269        self.connection_ids.iter().copied().collect()
1270    }
1271}