store.rs

   1use crate::db::{self, ChannelId, ProjectId, UserId};
   2use anyhow::{anyhow, Result};
   3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
   4use rpc::{proto, ConnectionId};
   5use serde::Serialize;
   6use std::{mem, path::PathBuf, str, time::Duration};
   7use time::OffsetDateTime;
   8use tracing::instrument;
   9use util::post_inc;
  10
  11pub type RoomId = u64;
  12
  13#[derive(Default, Serialize)]
  14pub struct Store {
  15    connections: BTreeMap<ConnectionId, ConnectionState>,
  16    connected_users: BTreeMap<UserId, ConnectedUser>,
  17    next_room_id: RoomId,
  18    rooms: BTreeMap<RoomId, proto::Room>,
  19    projects: BTreeMap<ProjectId, Project>,
  20    #[serde(skip)]
  21    channels: BTreeMap<ChannelId, Channel>,
  22}
  23
  24#[derive(Default, Serialize)]
  25struct ConnectedUser {
  26    connection_ids: HashSet<ConnectionId>,
  27    active_call: Option<Call>,
  28}
  29
  30#[derive(Serialize)]
  31struct ConnectionState {
  32    user_id: UserId,
  33    admin: bool,
  34    projects: BTreeSet<ProjectId>,
  35    channels: HashSet<ChannelId>,
  36}
  37
  38#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
  39pub struct Call {
  40    pub caller_user_id: UserId,
  41    pub room_id: RoomId,
  42    pub connection_id: Option<ConnectionId>,
  43    pub initial_project_id: Option<ProjectId>,
  44}
  45
  46#[derive(Serialize)]
  47pub struct Project {
  48    pub id: ProjectId,
  49    pub room_id: RoomId,
  50    pub host_connection_id: ConnectionId,
  51    pub host: Collaborator,
  52    pub guests: HashMap<ConnectionId, Collaborator>,
  53    pub active_replica_ids: HashSet<ReplicaId>,
  54    pub worktrees: BTreeMap<u64, Worktree>,
  55    pub language_servers: Vec<proto::LanguageServer>,
  56}
  57
  58#[derive(Serialize)]
  59pub struct Collaborator {
  60    pub replica_id: ReplicaId,
  61    pub user_id: UserId,
  62    #[serde(skip)]
  63    pub last_activity: Option<OffsetDateTime>,
  64    pub admin: bool,
  65}
  66
  67#[derive(Default, Serialize)]
  68pub struct Worktree {
  69    pub root_name: String,
  70    pub visible: bool,
  71    #[serde(skip)]
  72    pub entries: BTreeMap<u64, proto::Entry>,
  73    #[serde(skip)]
  74    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
  75    pub scan_id: u64,
  76    pub is_complete: bool,
  77}
  78
  79#[derive(Default)]
  80pub struct Channel {
  81    pub connection_ids: HashSet<ConnectionId>,
  82}
  83
  84pub type ReplicaId = u16;
  85
  86#[derive(Default)]
  87pub struct RemovedConnectionState {
  88    pub user_id: UserId,
  89    pub hosted_projects: HashMap<ProjectId, Project>,
  90    pub guest_project_ids: HashSet<ProjectId>,
  91    pub contact_ids: HashSet<UserId>,
  92    pub room_id: Option<RoomId>,
  93}
  94
  95pub struct LeftProject {
  96    pub id: ProjectId,
  97    pub host_user_id: UserId,
  98    pub host_connection_id: ConnectionId,
  99    pub connection_ids: Vec<ConnectionId>,
 100    pub remove_collaborator: bool,
 101}
 102
 103pub struct LeftRoom<'a> {
 104    pub room: Option<&'a proto::Room>,
 105    pub unshared_projects: Vec<Project>,
 106    pub left_projects: Vec<LeftProject>,
 107}
 108
 109#[derive(Copy, Clone)]
 110pub struct Metrics {
 111    pub connections: usize,
 112    pub registered_projects: usize,
 113    pub active_projects: usize,
 114    pub shared_projects: usize,
 115}
 116
 117impl Store {
 118    pub fn metrics(&self) -> Metrics {
 119        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
 120        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
 121
 122        let connections = self.connections.values().filter(|c| !c.admin).count();
 123        let mut registered_projects = 0;
 124        let mut active_projects = 0;
 125        let mut shared_projects = 0;
 126        for project in self.projects.values() {
 127            if let Some(connection) = self.connections.get(&project.host_connection_id) {
 128                if !connection.admin {
 129                    registered_projects += 1;
 130                    if project.is_active_since(active_window_start) {
 131                        active_projects += 1;
 132                        if !project.guests.is_empty() {
 133                            shared_projects += 1;
 134                        }
 135                    }
 136                }
 137            }
 138        }
 139
 140        Metrics {
 141            connections,
 142            registered_projects,
 143            active_projects,
 144            shared_projects,
 145        }
 146    }
 147
 148    #[instrument(skip(self))]
 149    pub fn add_connection(
 150        &mut self,
 151        connection_id: ConnectionId,
 152        user_id: UserId,
 153        admin: bool,
 154    ) -> Option<proto::IncomingCall> {
 155        self.connections.insert(
 156            connection_id,
 157            ConnectionState {
 158                user_id,
 159                admin,
 160                projects: Default::default(),
 161                channels: Default::default(),
 162            },
 163        );
 164        let connected_user = self.connected_users.entry(user_id).or_default();
 165        connected_user.connection_ids.insert(connection_id);
 166        if let Some(active_call) = connected_user.active_call {
 167            if active_call.connection_id.is_some() {
 168                None
 169            } else {
 170                let room = self.room(active_call.room_id)?;
 171                Some(proto::IncomingCall {
 172                    room_id: active_call.room_id,
 173                    caller_user_id: active_call.caller_user_id.to_proto(),
 174                    participant_user_ids: room
 175                        .participants
 176                        .iter()
 177                        .map(|participant| participant.user_id)
 178                        .collect(),
 179                    initial_project_id: active_call
 180                        .initial_project_id
 181                        .map(|project_id| project_id.to_proto()),
 182                })
 183            }
 184        } else {
 185            None
 186        }
 187    }
 188
 189    #[instrument(skip(self))]
 190    pub fn remove_connection(
 191        &mut self,
 192        connection_id: ConnectionId,
 193    ) -> Result<RemovedConnectionState> {
 194        let connection = self
 195            .connections
 196            .get_mut(&connection_id)
 197            .ok_or_else(|| anyhow!("no such connection"))?;
 198
 199        let user_id = connection.user_id;
 200        let connection_projects = mem::take(&mut connection.projects);
 201        let connection_channels = mem::take(&mut connection.channels);
 202
 203        let mut result = RemovedConnectionState {
 204            user_id,
 205            ..Default::default()
 206        };
 207
 208        // Leave all channels.
 209        for channel_id in connection_channels {
 210            self.leave_channel(connection_id, channel_id);
 211        }
 212
 213        // Unshare and leave all projects.
 214        for project_id in connection_projects {
 215            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
 216                result.hosted_projects.insert(project_id, project);
 217            } else if self.leave_project(project_id, connection_id).is_ok() {
 218                result.guest_project_ids.insert(project_id);
 219            }
 220        }
 221
 222        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
 223        connected_user.connection_ids.remove(&connection_id);
 224        if let Some(active_call) = connected_user.active_call.as_ref() {
 225            let room_id = active_call.room_id;
 226            if let Some(room) = self.rooms.get_mut(&room_id) {
 227                let prev_participant_count = room.participants.len();
 228                room.participants
 229                    .retain(|participant| participant.peer_id != connection_id.0);
 230                if prev_participant_count == room.participants.len() {
 231                    if connected_user.connection_ids.is_empty() {
 232                        room.pending_participant_user_ids
 233                            .retain(|pending_user_id| *pending_user_id != user_id.to_proto());
 234                        result.room_id = Some(room_id);
 235                        connected_user.active_call = None;
 236                    }
 237                } else {
 238                    result.room_id = Some(room_id);
 239                    connected_user.active_call = None;
 240                }
 241
 242                if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
 243                    self.rooms.remove(&room_id);
 244                }
 245            } else {
 246                tracing::error!("disconnected user claims to be in a room that does not exist");
 247                connected_user.active_call = None;
 248            }
 249        }
 250
 251        if connected_user.connection_ids.is_empty() {
 252            self.connected_users.remove(&user_id);
 253        }
 254
 255        self.connections.remove(&connection_id).unwrap();
 256
 257        Ok(result)
 258    }
 259
 260    #[cfg(test)]
 261    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
 262        self.channels.get(&id)
 263    }
 264
 265    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 266        if let Some(connection) = self.connections.get_mut(&connection_id) {
 267            connection.channels.insert(channel_id);
 268            self.channels
 269                .entry(channel_id)
 270                .or_default()
 271                .connection_ids
 272                .insert(connection_id);
 273        }
 274    }
 275
 276    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 277        if let Some(connection) = self.connections.get_mut(&connection_id) {
 278            connection.channels.remove(&channel_id);
 279            if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
 280                entry.get_mut().connection_ids.remove(&connection_id);
 281                if entry.get_mut().connection_ids.is_empty() {
 282                    entry.remove();
 283                }
 284            }
 285        }
 286    }
 287
 288    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
 289        Ok(self
 290            .connections
 291            .get(&connection_id)
 292            .ok_or_else(|| anyhow!("unknown connection"))?
 293            .user_id)
 294    }
 295
 296    pub fn connection_ids_for_user(
 297        &self,
 298        user_id: UserId,
 299    ) -> impl Iterator<Item = ConnectionId> + '_ {
 300        self.connected_users
 301            .get(&user_id)
 302            .into_iter()
 303            .map(|state| &state.connection_ids)
 304            .flatten()
 305            .copied()
 306    }
 307
 308    pub fn is_user_online(&self, user_id: UserId) -> bool {
 309        !self
 310            .connected_users
 311            .get(&user_id)
 312            .unwrap_or(&Default::default())
 313            .connection_ids
 314            .is_empty()
 315    }
 316
 317    fn is_user_busy(&self, user_id: UserId) -> bool {
 318        self.connected_users
 319            .get(&user_id)
 320            .unwrap_or(&Default::default())
 321            .active_call
 322            .is_some()
 323    }
 324
 325    pub fn build_initial_contacts_update(
 326        &self,
 327        contacts: Vec<db::Contact>,
 328    ) -> proto::UpdateContacts {
 329        let mut update = proto::UpdateContacts::default();
 330
 331        for contact in contacts {
 332            match contact {
 333                db::Contact::Accepted {
 334                    user_id,
 335                    should_notify,
 336                } => {
 337                    update
 338                        .contacts
 339                        .push(self.contact_for_user(user_id, should_notify));
 340                }
 341                db::Contact::Outgoing { user_id } => {
 342                    update.outgoing_requests.push(user_id.to_proto())
 343                }
 344                db::Contact::Incoming {
 345                    user_id,
 346                    should_notify,
 347                } => update
 348                    .incoming_requests
 349                    .push(proto::IncomingContactRequest {
 350                        requester_id: user_id.to_proto(),
 351                        should_notify,
 352                    }),
 353            }
 354        }
 355
 356        update
 357    }
 358
 359    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
 360        proto::Contact {
 361            user_id: user_id.to_proto(),
 362            online: self.is_user_online(user_id),
 363            busy: self.is_user_busy(user_id),
 364            should_notify,
 365        }
 366    }
 367
 368    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
 369        let connection = self
 370            .connections
 371            .get_mut(&creator_connection_id)
 372            .ok_or_else(|| anyhow!("no such connection"))?;
 373        let connected_user = self
 374            .connected_users
 375            .get_mut(&connection.user_id)
 376            .ok_or_else(|| anyhow!("no such connection"))?;
 377        anyhow::ensure!(
 378            connected_user.active_call.is_none(),
 379            "can't create a room with an active call"
 380        );
 381
 382        let mut room = proto::Room::default();
 383        room.participants.push(proto::Participant {
 384            user_id: connection.user_id.to_proto(),
 385            peer_id: creator_connection_id.0,
 386            projects: Default::default(),
 387            location: Some(proto::ParticipantLocation {
 388                variant: Some(proto::participant_location::Variant::External(
 389                    proto::participant_location::External {},
 390                )),
 391            }),
 392        });
 393
 394        let room_id = post_inc(&mut self.next_room_id);
 395        self.rooms.insert(room_id, room);
 396        connected_user.active_call = Some(Call {
 397            caller_user_id: connection.user_id,
 398            room_id,
 399            connection_id: Some(creator_connection_id),
 400            initial_project_id: None,
 401        });
 402        Ok(room_id)
 403    }
 404
 405    pub fn join_room(
 406        &mut self,
 407        room_id: RoomId,
 408        connection_id: ConnectionId,
 409    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 410        let connection = self
 411            .connections
 412            .get_mut(&connection_id)
 413            .ok_or_else(|| anyhow!("no such connection"))?;
 414        let user_id = connection.user_id;
 415        let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
 416
 417        let connected_user = self
 418            .connected_users
 419            .get_mut(&user_id)
 420            .ok_or_else(|| anyhow!("no such connection"))?;
 421        let active_call = connected_user
 422            .active_call
 423            .as_mut()
 424            .ok_or_else(|| anyhow!("not being called"))?;
 425        anyhow::ensure!(
 426            active_call.room_id == room_id && active_call.connection_id.is_none(),
 427            "not being called on this room"
 428        );
 429
 430        let room = self
 431            .rooms
 432            .get_mut(&room_id)
 433            .ok_or_else(|| anyhow!("no such room"))?;
 434        anyhow::ensure!(
 435            room.pending_participant_user_ids
 436                .contains(&user_id.to_proto()),
 437            anyhow!("no such room")
 438        );
 439        room.pending_participant_user_ids
 440            .retain(|pending| *pending != user_id.to_proto());
 441        room.participants.push(proto::Participant {
 442            user_id: user_id.to_proto(),
 443            peer_id: connection_id.0,
 444            projects: Default::default(),
 445            location: Some(proto::ParticipantLocation {
 446                variant: Some(proto::participant_location::Variant::External(
 447                    proto::participant_location::External {},
 448                )),
 449            }),
 450        });
 451        active_call.connection_id = Some(connection_id);
 452
 453        Ok((room, recipient_connection_ids))
 454    }
 455
 456    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
 457        let connection = self
 458            .connections
 459            .get_mut(&connection_id)
 460            .ok_or_else(|| anyhow!("no such connection"))?;
 461        let user_id = connection.user_id;
 462
 463        let connected_user = self
 464            .connected_users
 465            .get(&user_id)
 466            .ok_or_else(|| anyhow!("no such connection"))?;
 467        anyhow::ensure!(
 468            connected_user
 469                .active_call
 470                .map_or(false, |call| call.room_id == room_id
 471                    && call.connection_id == Some(connection_id)),
 472            "cannot leave a room before joining it"
 473        );
 474
 475        // Given that users can only join one room at a time, we can safely unshare
 476        // and leave all projects associated with the connection.
 477        let mut unshared_projects = Vec::new();
 478        let mut left_projects = Vec::new();
 479        for project_id in connection.projects.clone() {
 480            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
 481                unshared_projects.push(project);
 482            } else if let Ok(project) = self.leave_project(project_id, connection_id) {
 483                left_projects.push(project);
 484            }
 485        }
 486        self.connected_users.get_mut(&user_id).unwrap().active_call = None;
 487
 488        let room = self
 489            .rooms
 490            .get_mut(&room_id)
 491            .ok_or_else(|| anyhow!("no such room"))?;
 492        room.participants
 493            .retain(|participant| participant.peer_id != connection_id.0);
 494        if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
 495            self.rooms.remove(&room_id);
 496        }
 497
 498        Ok(LeftRoom {
 499            room: self.rooms.get(&room_id),
 500            unshared_projects,
 501            left_projects,
 502        })
 503    }
 504
 505    pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
 506        self.rooms.get(&room_id)
 507    }
 508
 509    pub fn call(
 510        &mut self,
 511        room_id: RoomId,
 512        recipient_user_id: UserId,
 513        initial_project_id: Option<ProjectId>,
 514        from_connection_id: ConnectionId,
 515    ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
 516        let caller_user_id = self.user_id_for_connection(from_connection_id)?;
 517
 518        let recipient_connection_ids = self
 519            .connection_ids_for_user(recipient_user_id)
 520            .collect::<Vec<_>>();
 521        let mut recipient = self
 522            .connected_users
 523            .get_mut(&recipient_user_id)
 524            .ok_or_else(|| anyhow!("no such connection"))?;
 525        anyhow::ensure!(
 526            recipient.active_call.is_none(),
 527            "recipient is already on another call"
 528        );
 529
 530        let room = self
 531            .rooms
 532            .get_mut(&room_id)
 533            .ok_or_else(|| anyhow!("no such room"))?;
 534        anyhow::ensure!(
 535            room.participants
 536                .iter()
 537                .any(|participant| participant.peer_id == from_connection_id.0),
 538            "no such room"
 539        );
 540        anyhow::ensure!(
 541            room.pending_participant_user_ids
 542                .iter()
 543                .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
 544            "cannot call the same user more than once"
 545        );
 546        room.pending_participant_user_ids
 547            .push(recipient_user_id.to_proto());
 548
 549        if let Some(initial_project_id) = initial_project_id {
 550            let project = self
 551                .projects
 552                .get(&initial_project_id)
 553                .ok_or_else(|| anyhow!("no such project"))?;
 554            anyhow::ensure!(project.room_id == room_id, "no such project");
 555        }
 556
 557        recipient.active_call = Some(Call {
 558            caller_user_id,
 559            room_id,
 560            connection_id: None,
 561            initial_project_id,
 562        });
 563
 564        Ok((
 565            room,
 566            recipient_connection_ids,
 567            proto::IncomingCall {
 568                room_id,
 569                caller_user_id: caller_user_id.to_proto(),
 570                participant_user_ids: room
 571                    .participants
 572                    .iter()
 573                    .map(|participant| participant.user_id)
 574                    .collect(),
 575                initial_project_id: initial_project_id.map(|project_id| project_id.to_proto()),
 576            },
 577        ))
 578    }
 579
 580    pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
 581        let mut recipient = self
 582            .connected_users
 583            .get_mut(&to_user_id)
 584            .ok_or_else(|| anyhow!("no such connection"))?;
 585        anyhow::ensure!(recipient
 586            .active_call
 587            .map_or(false, |call| call.room_id == room_id
 588                && call.connection_id.is_none()));
 589        recipient.active_call = None;
 590        let room = self
 591            .rooms
 592            .get_mut(&room_id)
 593            .ok_or_else(|| anyhow!("no such room"))?;
 594        room.pending_participant_user_ids
 595            .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
 596        Ok(room)
 597    }
 598
 599    pub fn cancel_call(
 600        &mut self,
 601        room_id: RoomId,
 602        recipient_user_id: UserId,
 603        canceller_connection_id: ConnectionId,
 604    ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
 605        let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
 606        let canceller = self
 607            .connected_users
 608            .get(&canceller_user_id)
 609            .ok_or_else(|| anyhow!("no such connection"))?;
 610        let recipient = self
 611            .connected_users
 612            .get(&recipient_user_id)
 613            .ok_or_else(|| anyhow!("no such connection"))?;
 614        let canceller_active_call = canceller
 615            .active_call
 616            .as_ref()
 617            .ok_or_else(|| anyhow!("no active call"))?;
 618        let recipient_active_call = recipient
 619            .active_call
 620            .as_ref()
 621            .ok_or_else(|| anyhow!("no active call for recipient"))?;
 622
 623        anyhow::ensure!(
 624            canceller_active_call.room_id == room_id,
 625            "users are on different calls"
 626        );
 627        anyhow::ensure!(
 628            recipient_active_call.room_id == room_id,
 629            "users are on different calls"
 630        );
 631        anyhow::ensure!(
 632            recipient_active_call.connection_id.is_none(),
 633            "recipient has already answered"
 634        );
 635        let room_id = recipient_active_call.room_id;
 636        let room = self
 637            .rooms
 638            .get_mut(&room_id)
 639            .ok_or_else(|| anyhow!("no such room"))?;
 640        room.pending_participant_user_ids
 641            .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 642
 643        let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
 644        recipient.active_call.take();
 645
 646        Ok((room, recipient.connection_ids.clone()))
 647    }
 648
 649    pub fn decline_call(
 650        &mut self,
 651        room_id: RoomId,
 652        recipient_connection_id: ConnectionId,
 653    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 654        let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
 655        let recipient = self
 656            .connected_users
 657            .get_mut(&recipient_user_id)
 658            .ok_or_else(|| anyhow!("no such connection"))?;
 659        if let Some(active_call) = recipient.active_call.take() {
 660            anyhow::ensure!(active_call.room_id == room_id, "no such room");
 661            let recipient_connection_ids = self
 662                .connection_ids_for_user(recipient_user_id)
 663                .collect::<Vec<_>>();
 664            let room = self
 665                .rooms
 666                .get_mut(&active_call.room_id)
 667                .ok_or_else(|| anyhow!("no such room"))?;
 668            room.pending_participant_user_ids
 669                .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 670            Ok((room, recipient_connection_ids))
 671        } else {
 672            Err(anyhow!("user is not being called"))
 673        }
 674    }
 675
 676    pub fn update_participant_location(
 677        &mut self,
 678        room_id: RoomId,
 679        location: proto::ParticipantLocation,
 680        connection_id: ConnectionId,
 681    ) -> Result<&proto::Room> {
 682        let room = self
 683            .rooms
 684            .get_mut(&room_id)
 685            .ok_or_else(|| anyhow!("no such room"))?;
 686        if let Some(proto::participant_location::Variant::Project(project)) =
 687            location.variant.as_ref()
 688        {
 689            anyhow::ensure!(
 690                room.participants
 691                    .iter()
 692                    .flat_map(|participant| &participant.projects)
 693                    .any(|participant_project| participant_project.id == project.id),
 694                "no such project"
 695            );
 696        }
 697
 698        let participant = room
 699            .participants
 700            .iter_mut()
 701            .find(|participant| participant.peer_id == connection_id.0)
 702            .ok_or_else(|| anyhow!("no such room"))?;
 703        participant.location = Some(location);
 704
 705        Ok(room)
 706    }
 707
 708    pub fn share_project(
 709        &mut self,
 710        room_id: RoomId,
 711        project_id: ProjectId,
 712        worktrees: Vec<proto::WorktreeMetadata>,
 713        host_connection_id: ConnectionId,
 714    ) -> Result<&proto::Room> {
 715        let connection = self
 716            .connections
 717            .get_mut(&host_connection_id)
 718            .ok_or_else(|| anyhow!("no such connection"))?;
 719
 720        let room = self
 721            .rooms
 722            .get_mut(&room_id)
 723            .ok_or_else(|| anyhow!("no such room"))?;
 724        let participant = room
 725            .participants
 726            .iter_mut()
 727            .find(|participant| participant.peer_id == host_connection_id.0)
 728            .ok_or_else(|| anyhow!("no such room"))?;
 729        participant.projects.push(proto::ParticipantProject {
 730            id: project_id.to_proto(),
 731            worktree_root_names: worktrees
 732                .iter()
 733                .filter(|worktree| worktree.visible)
 734                .map(|worktree| worktree.root_name.clone())
 735                .collect(),
 736        });
 737
 738        connection.projects.insert(project_id);
 739        self.projects.insert(
 740            project_id,
 741            Project {
 742                id: project_id,
 743                room_id,
 744                host_connection_id,
 745                host: Collaborator {
 746                    user_id: connection.user_id,
 747                    replica_id: 0,
 748                    last_activity: None,
 749                    admin: connection.admin,
 750                },
 751                guests: Default::default(),
 752                active_replica_ids: Default::default(),
 753                worktrees: worktrees
 754                    .into_iter()
 755                    .map(|worktree| {
 756                        (
 757                            worktree.id,
 758                            Worktree {
 759                                root_name: worktree.root_name,
 760                                visible: worktree.visible,
 761                                ..Default::default()
 762                            },
 763                        )
 764                    })
 765                    .collect(),
 766                language_servers: Default::default(),
 767            },
 768        );
 769
 770        Ok(room)
 771    }
 772
 773    pub fn unshare_project(
 774        &mut self,
 775        project_id: ProjectId,
 776        connection_id: ConnectionId,
 777    ) -> Result<(&proto::Room, Project)> {
 778        match self.projects.entry(project_id) {
 779            btree_map::Entry::Occupied(e) => {
 780                if e.get().host_connection_id == connection_id {
 781                    let project = e.remove();
 782
 783                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
 784                        host_connection.projects.remove(&project_id);
 785                    }
 786
 787                    for guest_connection in project.guests.keys() {
 788                        if let Some(connection) = self.connections.get_mut(guest_connection) {
 789                            connection.projects.remove(&project_id);
 790                        }
 791                    }
 792
 793                    let room = self
 794                        .rooms
 795                        .get_mut(&project.room_id)
 796                        .ok_or_else(|| anyhow!("no such room"))?;
 797                    let participant = room
 798                        .participants
 799                        .iter_mut()
 800                        .find(|participant| participant.peer_id == connection_id.0)
 801                        .ok_or_else(|| anyhow!("no such room"))?;
 802                    participant
 803                        .projects
 804                        .retain(|project| project.id != project_id.to_proto());
 805
 806                    Ok((room, project))
 807                } else {
 808                    Err(anyhow!("no such project"))?
 809                }
 810            }
 811            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
 812        }
 813    }
 814
 815    pub fn update_project(
 816        &mut self,
 817        project_id: ProjectId,
 818        worktrees: &[proto::WorktreeMetadata],
 819        connection_id: ConnectionId,
 820    ) -> Result<&proto::Room> {
 821        let project = self
 822            .projects
 823            .get_mut(&project_id)
 824            .ok_or_else(|| anyhow!("no such project"))?;
 825        if project.host_connection_id == connection_id {
 826            let mut old_worktrees = mem::take(&mut project.worktrees);
 827            for worktree in worktrees {
 828                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
 829                    project.worktrees.insert(worktree.id, old_worktree);
 830                } else {
 831                    project.worktrees.insert(
 832                        worktree.id,
 833                        Worktree {
 834                            root_name: worktree.root_name.clone(),
 835                            visible: worktree.visible,
 836                            ..Default::default()
 837                        },
 838                    );
 839                }
 840            }
 841
 842            let room = self
 843                .rooms
 844                .get_mut(&project.room_id)
 845                .ok_or_else(|| anyhow!("no such room"))?;
 846            let participant_project = room
 847                .participants
 848                .iter_mut()
 849                .flat_map(|participant| &mut participant.projects)
 850                .find(|project| project.id == project_id.to_proto())
 851                .ok_or_else(|| anyhow!("no such project"))?;
 852            participant_project.worktree_root_names = worktrees
 853                .iter()
 854                .filter(|worktree| worktree.visible)
 855                .map(|worktree| worktree.root_name.clone())
 856                .collect();
 857
 858            Ok(room)
 859        } else {
 860            Err(anyhow!("no such project"))?
 861        }
 862    }
 863
 864    pub fn update_diagnostic_summary(
 865        &mut self,
 866        project_id: ProjectId,
 867        worktree_id: u64,
 868        connection_id: ConnectionId,
 869        summary: proto::DiagnosticSummary,
 870    ) -> Result<Vec<ConnectionId>> {
 871        let project = self
 872            .projects
 873            .get_mut(&project_id)
 874            .ok_or_else(|| anyhow!("no such project"))?;
 875        if project.host_connection_id == connection_id {
 876            let worktree = project
 877                .worktrees
 878                .get_mut(&worktree_id)
 879                .ok_or_else(|| anyhow!("no such worktree"))?;
 880            worktree
 881                .diagnostic_summaries
 882                .insert(summary.path.clone().into(), summary);
 883            return Ok(project.connection_ids());
 884        }
 885
 886        Err(anyhow!("no such worktree"))?
 887    }
 888
 889    pub fn start_language_server(
 890        &mut self,
 891        project_id: ProjectId,
 892        connection_id: ConnectionId,
 893        language_server: proto::LanguageServer,
 894    ) -> Result<Vec<ConnectionId>> {
 895        let project = self
 896            .projects
 897            .get_mut(&project_id)
 898            .ok_or_else(|| anyhow!("no such project"))?;
 899        if project.host_connection_id == connection_id {
 900            project.language_servers.push(language_server);
 901            return Ok(project.connection_ids());
 902        }
 903
 904        Err(anyhow!("no such project"))?
 905    }
 906
 907    pub fn join_project(
 908        &mut self,
 909        requester_connection_id: ConnectionId,
 910        project_id: ProjectId,
 911    ) -> Result<(&Project, ReplicaId)> {
 912        let connection = self
 913            .connections
 914            .get_mut(&requester_connection_id)
 915            .ok_or_else(|| anyhow!("no such connection"))?;
 916        let user = self
 917            .connected_users
 918            .get(&connection.user_id)
 919            .ok_or_else(|| anyhow!("no such connection"))?;
 920        let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
 921        anyhow::ensure!(
 922            active_call.connection_id == Some(requester_connection_id),
 923            "no such project"
 924        );
 925
 926        let project = self
 927            .projects
 928            .get_mut(&project_id)
 929            .ok_or_else(|| anyhow!("no such project"))?;
 930        anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
 931
 932        connection.projects.insert(project_id);
 933        let mut replica_id = 1;
 934        while project.active_replica_ids.contains(&replica_id) {
 935            replica_id += 1;
 936        }
 937        project.active_replica_ids.insert(replica_id);
 938        project.guests.insert(
 939            requester_connection_id,
 940            Collaborator {
 941                replica_id,
 942                user_id: connection.user_id,
 943                last_activity: Some(OffsetDateTime::now_utc()),
 944                admin: connection.admin,
 945            },
 946        );
 947
 948        project.host.last_activity = Some(OffsetDateTime::now_utc());
 949        Ok((project, replica_id))
 950    }
 951
 952    pub fn leave_project(
 953        &mut self,
 954        project_id: ProjectId,
 955        connection_id: ConnectionId,
 956    ) -> Result<LeftProject> {
 957        let project = self
 958            .projects
 959            .get_mut(&project_id)
 960            .ok_or_else(|| anyhow!("no such project"))?;
 961
 962        // If the connection leaving the project is a collaborator, remove it.
 963        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
 964            project.active_replica_ids.remove(&guest.replica_id);
 965            true
 966        } else {
 967            false
 968        };
 969
 970        if let Some(connection) = self.connections.get_mut(&connection_id) {
 971            connection.projects.remove(&project_id);
 972        }
 973
 974        Ok(LeftProject {
 975            id: project.id,
 976            host_connection_id: project.host_connection_id,
 977            host_user_id: project.host.user_id,
 978            connection_ids: project.connection_ids(),
 979            remove_collaborator,
 980        })
 981    }
 982
 983    #[allow(clippy::too_many_arguments)]
 984    pub fn update_worktree(
 985        &mut self,
 986        connection_id: ConnectionId,
 987        project_id: ProjectId,
 988        worktree_id: u64,
 989        worktree_root_name: &str,
 990        removed_entries: &[u64],
 991        updated_entries: &[proto::Entry],
 992        scan_id: u64,
 993        is_last_update: bool,
 994    ) -> Result<Vec<ConnectionId>> {
 995        let project = self.write_project(project_id, connection_id)?;
 996
 997        let connection_ids = project.connection_ids();
 998        let mut worktree = project.worktrees.entry(worktree_id).or_default();
 999        worktree.root_name = worktree_root_name.to_string();
1000
1001        for entry_id in removed_entries {
1002            worktree.entries.remove(entry_id);
1003        }
1004
1005        for entry in updated_entries {
1006            worktree.entries.insert(entry.id, entry.clone());
1007        }
1008
1009        worktree.scan_id = scan_id;
1010        worktree.is_complete = is_last_update;
1011        Ok(connection_ids)
1012    }
1013
1014    pub fn project_connection_ids(
1015        &self,
1016        project_id: ProjectId,
1017        acting_connection_id: ConnectionId,
1018    ) -> Result<Vec<ConnectionId>> {
1019        Ok(self
1020            .read_project(project_id, acting_connection_id)?
1021            .connection_ids())
1022    }
1023
1024    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1025        Ok(self
1026            .channels
1027            .get(&channel_id)
1028            .ok_or_else(|| anyhow!("no such channel"))?
1029            .connection_ids())
1030    }
1031
1032    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1033        self.projects
1034            .get(&project_id)
1035            .ok_or_else(|| anyhow!("no such project"))
1036    }
1037
1038    pub fn register_project_activity(
1039        &mut self,
1040        project_id: ProjectId,
1041        connection_id: ConnectionId,
1042    ) -> Result<()> {
1043        let project = self
1044            .projects
1045            .get_mut(&project_id)
1046            .ok_or_else(|| anyhow!("no such project"))?;
1047        let collaborator = if connection_id == project.host_connection_id {
1048            &mut project.host
1049        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1050            guest
1051        } else {
1052            return Err(anyhow!("no such project"))?;
1053        };
1054        collaborator.last_activity = Some(OffsetDateTime::now_utc());
1055        Ok(())
1056    }
1057
1058    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1059        self.projects.iter()
1060    }
1061
1062    pub fn read_project(
1063        &self,
1064        project_id: ProjectId,
1065        connection_id: ConnectionId,
1066    ) -> Result<&Project> {
1067        let project = self
1068            .projects
1069            .get(&project_id)
1070            .ok_or_else(|| anyhow!("no such project"))?;
1071        if project.host_connection_id == connection_id
1072            || project.guests.contains_key(&connection_id)
1073        {
1074            Ok(project)
1075        } else {
1076            Err(anyhow!("no such project"))?
1077        }
1078    }
1079
1080    fn write_project(
1081        &mut self,
1082        project_id: ProjectId,
1083        connection_id: ConnectionId,
1084    ) -> Result<&mut Project> {
1085        let project = self
1086            .projects
1087            .get_mut(&project_id)
1088            .ok_or_else(|| anyhow!("no such project"))?;
1089        if project.host_connection_id == connection_id
1090            || project.guests.contains_key(&connection_id)
1091        {
1092            Ok(project)
1093        } else {
1094            Err(anyhow!("no such project"))?
1095        }
1096    }
1097
1098    #[cfg(test)]
1099    pub fn check_invariants(&self) {
1100        for (connection_id, connection) in &self.connections {
1101            for project_id in &connection.projects {
1102                let project = &self.projects.get(project_id).unwrap();
1103                if project.host_connection_id != *connection_id {
1104                    assert!(project.guests.contains_key(connection_id));
1105                }
1106
1107                for (worktree_id, worktree) in project.worktrees.iter() {
1108                    let mut paths = HashMap::default();
1109                    for entry in worktree.entries.values() {
1110                        let prev_entry = paths.insert(&entry.path, entry);
1111                        assert_eq!(
1112                            prev_entry,
1113                            None,
1114                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
1115                            worktree_id,
1116                            prev_entry.unwrap(),
1117                            entry
1118                        );
1119                    }
1120                }
1121            }
1122            for channel_id in &connection.channels {
1123                let channel = self.channels.get(channel_id).unwrap();
1124                assert!(channel.connection_ids.contains(connection_id));
1125            }
1126            assert!(self
1127                .connected_users
1128                .get(&connection.user_id)
1129                .unwrap()
1130                .connection_ids
1131                .contains(connection_id));
1132        }
1133
1134        for (user_id, state) in &self.connected_users {
1135            for connection_id in &state.connection_ids {
1136                assert_eq!(
1137                    self.connections.get(connection_id).unwrap().user_id,
1138                    *user_id
1139                );
1140            }
1141
1142            if let Some(active_call) = state.active_call.as_ref() {
1143                if let Some(active_call_connection_id) = active_call.connection_id {
1144                    assert!(
1145                        state.connection_ids.contains(&active_call_connection_id),
1146                        "call is active on a dead connection"
1147                    );
1148                    assert!(
1149                        state.connection_ids.contains(&active_call_connection_id),
1150                        "call is active on a dead connection"
1151                    );
1152                }
1153            }
1154        }
1155
1156        for (room_id, room) in &self.rooms {
1157            for pending_user_id in &room.pending_participant_user_ids {
1158                assert!(
1159                    self.connected_users
1160                        .contains_key(&UserId::from_proto(*pending_user_id)),
1161                    "call is active on a user that has disconnected"
1162                );
1163            }
1164
1165            for participant in &room.participants {
1166                assert!(
1167                    self.connections
1168                        .contains_key(&ConnectionId(participant.peer_id)),
1169                    "room contains participant that has disconnected"
1170                );
1171
1172                for participant_project in &participant.projects {
1173                    let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1174                    assert_eq!(
1175                        project.room_id, *room_id,
1176                        "project was shared on a different room"
1177                    );
1178                }
1179            }
1180
1181            assert!(
1182                !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1183                "room can't be empty"
1184            );
1185        }
1186
1187        for (project_id, project) in &self.projects {
1188            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1189            assert!(host_connection.projects.contains(project_id));
1190
1191            for guest_connection_id in project.guests.keys() {
1192                let guest_connection = self.connections.get(guest_connection_id).unwrap();
1193                assert!(guest_connection.projects.contains(project_id));
1194            }
1195            assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
1196            assert_eq!(
1197                project.active_replica_ids,
1198                project
1199                    .guests
1200                    .values()
1201                    .map(|guest| guest.replica_id)
1202                    .collect::<HashSet<_>>(),
1203            );
1204
1205            let room = &self.rooms[&project.room_id];
1206            let room_participant = room
1207                .participants
1208                .iter()
1209                .find(|participant| participant.peer_id == project.host_connection_id.0)
1210                .unwrap();
1211            assert!(
1212                room_participant
1213                    .projects
1214                    .iter()
1215                    .any(|project| project.id == project_id.to_proto()),
1216                "project was not shared in room"
1217            );
1218        }
1219
1220        for (channel_id, channel) in &self.channels {
1221            for connection_id in &channel.connection_ids {
1222                let connection = self.connections.get(connection_id).unwrap();
1223                assert!(connection.channels.contains(channel_id));
1224            }
1225        }
1226    }
1227}
1228
1229impl Project {
1230    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1231        self.guests
1232            .values()
1233            .chain([&self.host])
1234            .any(|collaborator| {
1235                collaborator
1236                    .last_activity
1237                    .map_or(false, |active_time| active_time > start_time)
1238            })
1239    }
1240
1241    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1242        self.guests.keys().copied().collect()
1243    }
1244
1245    pub fn connection_ids(&self) -> Vec<ConnectionId> {
1246        self.guests
1247            .keys()
1248            .copied()
1249            .chain(Some(self.host_connection_id))
1250            .collect()
1251    }
1252}
1253
1254impl Channel {
1255    fn connection_ids(&self) -> Vec<ConnectionId> {
1256        self.connection_ids.iter().copied().collect()
1257    }
1258}