store.rs

   1use crate::db::{self, ChannelId, ProjectId, UserId};
   2use anyhow::{anyhow, Result};
   3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
   4use rpc::{proto, ConnectionId};
   5use serde::Serialize;
   6use std::{mem, path::PathBuf, str, time::Duration};
   7use time::OffsetDateTime;
   8use tracing::instrument;
   9use util::post_inc;
  10
  11pub type RoomId = u64;
  12
  13#[derive(Default, Serialize)]
  14pub struct Store {
  15    connections: BTreeMap<ConnectionId, ConnectionState>,
  16    connected_users: BTreeMap<UserId, ConnectedUser>,
  17    next_room_id: RoomId,
  18    rooms: BTreeMap<RoomId, proto::Room>,
  19    projects: BTreeMap<ProjectId, Project>,
  20    #[serde(skip)]
  21    channels: BTreeMap<ChannelId, Channel>,
  22}
  23
  24#[derive(Default, Serialize)]
  25struct ConnectedUser {
  26    connection_ids: HashSet<ConnectionId>,
  27    active_call: Option<Call>,
  28}
  29
  30#[derive(Serialize)]
  31struct ConnectionState {
  32    user_id: UserId,
  33    admin: bool,
  34    projects: BTreeSet<ProjectId>,
  35    channels: HashSet<ChannelId>,
  36}
  37
  38#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
  39pub struct Call {
  40    pub caller_user_id: UserId,
  41    pub room_id: RoomId,
  42    pub connection_id: Option<ConnectionId>,
  43    pub initial_project_id: Option<ProjectId>,
  44}
  45
  46#[derive(Serialize)]
  47pub struct Project {
  48    pub id: ProjectId,
  49    pub room_id: RoomId,
  50    pub host_connection_id: ConnectionId,
  51    pub host: Collaborator,
  52    pub guests: HashMap<ConnectionId, Collaborator>,
  53    pub active_replica_ids: HashSet<ReplicaId>,
  54    pub worktrees: BTreeMap<u64, Worktree>,
  55    pub language_servers: Vec<proto::LanguageServer>,
  56}
  57
  58#[derive(Serialize)]
  59pub struct Collaborator {
  60    pub replica_id: ReplicaId,
  61    pub user_id: UserId,
  62    #[serde(skip)]
  63    pub last_activity: Option<OffsetDateTime>,
  64    pub admin: bool,
  65}
  66
  67#[derive(Default, Serialize)]
  68pub struct Worktree {
  69    pub root_name: String,
  70    pub visible: bool,
  71    #[serde(skip)]
  72    pub entries: BTreeMap<u64, proto::Entry>,
  73    #[serde(skip)]
  74    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
  75    pub scan_id: u64,
  76    pub is_complete: bool,
  77}
  78
  79#[derive(Default)]
  80pub struct Channel {
  81    pub connection_ids: HashSet<ConnectionId>,
  82}
  83
  84pub type ReplicaId = u16;
  85
  86#[derive(Default)]
  87pub struct RemovedConnectionState {
  88    pub user_id: UserId,
  89    pub hosted_projects: HashMap<ProjectId, Project>,
  90    pub guest_project_ids: HashSet<ProjectId>,
  91    pub contact_ids: HashSet<UserId>,
  92    pub room_id: Option<RoomId>,
  93}
  94
  95pub struct LeftProject {
  96    pub id: ProjectId,
  97    pub host_user_id: UserId,
  98    pub host_connection_id: ConnectionId,
  99    pub connection_ids: Vec<ConnectionId>,
 100    pub remove_collaborator: bool,
 101}
 102
 103pub struct LeftRoom<'a> {
 104    pub room: Option<&'a proto::Room>,
 105    pub unshared_projects: Vec<Project>,
 106    pub left_projects: Vec<LeftProject>,
 107}
 108
 109#[derive(Copy, Clone)]
 110pub struct Metrics {
 111    pub connections: usize,
 112    pub registered_projects: usize,
 113    pub active_projects: usize,
 114    pub shared_projects: usize,
 115}
 116
 117impl Store {
 118    pub fn metrics(&self) -> Metrics {
 119        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
 120        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
 121
 122        let connections = self.connections.values().filter(|c| !c.admin).count();
 123        let mut registered_projects = 0;
 124        let mut active_projects = 0;
 125        let mut shared_projects = 0;
 126        for project in self.projects.values() {
 127            if let Some(connection) = self.connections.get(&project.host_connection_id) {
 128                if !connection.admin {
 129                    registered_projects += 1;
 130                    if project.is_active_since(active_window_start) {
 131                        active_projects += 1;
 132                        if !project.guests.is_empty() {
 133                            shared_projects += 1;
 134                        }
 135                    }
 136                }
 137            }
 138        }
 139
 140        Metrics {
 141            connections,
 142            registered_projects,
 143            active_projects,
 144            shared_projects,
 145        }
 146    }
 147
 148    #[instrument(skip(self))]
 149    pub fn add_connection(
 150        &mut self,
 151        connection_id: ConnectionId,
 152        user_id: UserId,
 153        admin: bool,
 154    ) -> Option<proto::IncomingCall> {
 155        self.connections.insert(
 156            connection_id,
 157            ConnectionState {
 158                user_id,
 159                admin,
 160                projects: Default::default(),
 161                channels: Default::default(),
 162            },
 163        );
 164        let connected_user = self.connected_users.entry(user_id).or_default();
 165        connected_user.connection_ids.insert(connection_id);
 166        if let Some(active_call) = connected_user.active_call {
 167            if active_call.connection_id.is_some() {
 168                None
 169            } else {
 170                let room = self.room(active_call.room_id)?;
 171                Some(proto::IncomingCall {
 172                    room_id: active_call.room_id,
 173                    caller_user_id: active_call.caller_user_id.to_proto(),
 174                    participant_user_ids: room
 175                        .participants
 176                        .iter()
 177                        .map(|participant| participant.user_id)
 178                        .collect(),
 179                    initial_project_id: active_call
 180                        .initial_project_id
 181                        .map(|project_id| project_id.to_proto()),
 182                })
 183            }
 184        } else {
 185            None
 186        }
 187    }
 188
 189    #[instrument(skip(self))]
 190    pub fn remove_connection(
 191        &mut self,
 192        connection_id: ConnectionId,
 193    ) -> Result<RemovedConnectionState> {
 194        let connection = self
 195            .connections
 196            .get_mut(&connection_id)
 197            .ok_or_else(|| anyhow!("no such connection"))?;
 198
 199        let user_id = connection.user_id;
 200        let connection_projects = mem::take(&mut connection.projects);
 201        let connection_channels = mem::take(&mut connection.channels);
 202
 203        let mut result = RemovedConnectionState {
 204            user_id,
 205            ..Default::default()
 206        };
 207
 208        // Leave all channels.
 209        for channel_id in connection_channels {
 210            self.leave_channel(connection_id, channel_id);
 211        }
 212
 213        // Unshare and leave all projects.
 214        for project_id in connection_projects {
 215            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
 216                result.hosted_projects.insert(project_id, project);
 217            } else if self.leave_project(project_id, connection_id).is_ok() {
 218                result.guest_project_ids.insert(project_id);
 219            }
 220        }
 221
 222        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
 223        connected_user.connection_ids.remove(&connection_id);
 224        if let Some(active_call) = connected_user.active_call.as_ref() {
 225            let room_id = active_call.room_id;
 226            if let Some(room) = self.rooms.get_mut(&room_id) {
 227                let prev_participant_count = room.participants.len();
 228                room.participants
 229                    .retain(|participant| participant.peer_id != connection_id.0);
 230                if prev_participant_count == room.participants.len() {
 231                    if connected_user.connection_ids.is_empty() {
 232                        room.pending_user_ids
 233                            .retain(|pending_user_id| *pending_user_id != user_id.to_proto());
 234                        result.room_id = Some(room_id);
 235                        connected_user.active_call = None;
 236                    }
 237                } else {
 238                    result.room_id = Some(room_id);
 239                    connected_user.active_call = None;
 240                }
 241
 242                if room.participants.is_empty() && room.pending_user_ids.is_empty() {
 243                    self.rooms.remove(&room_id);
 244                }
 245            } else {
 246                tracing::error!("disconnected user claims to be in a room that does not exist");
 247                connected_user.active_call = None;
 248            }
 249        }
 250
 251        if connected_user.connection_ids.is_empty() {
 252            self.connected_users.remove(&user_id);
 253        }
 254
 255        self.connections.remove(&connection_id).unwrap();
 256
 257        Ok(result)
 258    }
 259
 260    #[cfg(test)]
 261    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
 262        self.channels.get(&id)
 263    }
 264
 265    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 266        if let Some(connection) = self.connections.get_mut(&connection_id) {
 267            connection.channels.insert(channel_id);
 268            self.channels
 269                .entry(channel_id)
 270                .or_default()
 271                .connection_ids
 272                .insert(connection_id);
 273        }
 274    }
 275
 276    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 277        if let Some(connection) = self.connections.get_mut(&connection_id) {
 278            connection.channels.remove(&channel_id);
 279            if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
 280                entry.get_mut().connection_ids.remove(&connection_id);
 281                if entry.get_mut().connection_ids.is_empty() {
 282                    entry.remove();
 283                }
 284            }
 285        }
 286    }
 287
 288    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
 289        Ok(self
 290            .connections
 291            .get(&connection_id)
 292            .ok_or_else(|| anyhow!("unknown connection"))?
 293            .user_id)
 294    }
 295
 296    pub fn connection_ids_for_user(
 297        &self,
 298        user_id: UserId,
 299    ) -> impl Iterator<Item = ConnectionId> + '_ {
 300        self.connected_users
 301            .get(&user_id)
 302            .into_iter()
 303            .map(|state| &state.connection_ids)
 304            .flatten()
 305            .copied()
 306    }
 307
 308    pub fn is_user_online(&self, user_id: UserId) -> bool {
 309        !self
 310            .connected_users
 311            .get(&user_id)
 312            .unwrap_or(&Default::default())
 313            .connection_ids
 314            .is_empty()
 315    }
 316
 317    pub fn build_initial_contacts_update(
 318        &self,
 319        contacts: Vec<db::Contact>,
 320    ) -> proto::UpdateContacts {
 321        let mut update = proto::UpdateContacts::default();
 322
 323        for contact in contacts {
 324            match contact {
 325                db::Contact::Accepted {
 326                    user_id,
 327                    should_notify,
 328                } => {
 329                    update
 330                        .contacts
 331                        .push(self.contact_for_user(user_id, should_notify));
 332                }
 333                db::Contact::Outgoing { user_id } => {
 334                    update.outgoing_requests.push(user_id.to_proto())
 335                }
 336                db::Contact::Incoming {
 337                    user_id,
 338                    should_notify,
 339                } => update
 340                    .incoming_requests
 341                    .push(proto::IncomingContactRequest {
 342                        requester_id: user_id.to_proto(),
 343                        should_notify,
 344                    }),
 345            }
 346        }
 347
 348        update
 349    }
 350
 351    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
 352        proto::Contact {
 353            user_id: user_id.to_proto(),
 354            online: self.is_user_online(user_id),
 355            should_notify,
 356        }
 357    }
 358
 359    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
 360        let connection = self
 361            .connections
 362            .get_mut(&creator_connection_id)
 363            .ok_or_else(|| anyhow!("no such connection"))?;
 364        let connected_user = self
 365            .connected_users
 366            .get_mut(&connection.user_id)
 367            .ok_or_else(|| anyhow!("no such connection"))?;
 368        anyhow::ensure!(
 369            connected_user.active_call.is_none(),
 370            "can't create a room with an active call"
 371        );
 372
 373        let mut room = proto::Room::default();
 374        room.participants.push(proto::Participant {
 375            user_id: connection.user_id.to_proto(),
 376            peer_id: creator_connection_id.0,
 377            project_ids: Default::default(),
 378            location: Some(proto::ParticipantLocation {
 379                variant: Some(proto::participant_location::Variant::External(
 380                    proto::participant_location::External {},
 381                )),
 382            }),
 383        });
 384
 385        let room_id = post_inc(&mut self.next_room_id);
 386        self.rooms.insert(room_id, room);
 387        connected_user.active_call = Some(Call {
 388            caller_user_id: connection.user_id,
 389            room_id,
 390            connection_id: Some(creator_connection_id),
 391            initial_project_id: None,
 392        });
 393        Ok(room_id)
 394    }
 395
 396    pub fn join_room(
 397        &mut self,
 398        room_id: RoomId,
 399        connection_id: ConnectionId,
 400    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 401        let connection = self
 402            .connections
 403            .get_mut(&connection_id)
 404            .ok_or_else(|| anyhow!("no such connection"))?;
 405        let user_id = connection.user_id;
 406        let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
 407
 408        let connected_user = self
 409            .connected_users
 410            .get_mut(&user_id)
 411            .ok_or_else(|| anyhow!("no such connection"))?;
 412        let active_call = connected_user
 413            .active_call
 414            .as_mut()
 415            .ok_or_else(|| anyhow!("not being called"))?;
 416        anyhow::ensure!(
 417            active_call.room_id == room_id && active_call.connection_id.is_none(),
 418            "not being called on this room"
 419        );
 420
 421        let room = self
 422            .rooms
 423            .get_mut(&room_id)
 424            .ok_or_else(|| anyhow!("no such room"))?;
 425        anyhow::ensure!(
 426            room.pending_user_ids.contains(&user_id.to_proto()),
 427            anyhow!("no such room")
 428        );
 429        room.pending_user_ids
 430            .retain(|pending| *pending != user_id.to_proto());
 431        room.participants.push(proto::Participant {
 432            user_id: user_id.to_proto(),
 433            peer_id: connection_id.0,
 434            project_ids: Default::default(),
 435            location: Some(proto::ParticipantLocation {
 436                variant: Some(proto::participant_location::Variant::External(
 437                    proto::participant_location::External {},
 438                )),
 439            }),
 440        });
 441        active_call.connection_id = Some(connection_id);
 442
 443        Ok((room, recipient_connection_ids))
 444    }
 445
 446    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
 447        let connection = self
 448            .connections
 449            .get_mut(&connection_id)
 450            .ok_or_else(|| anyhow!("no such connection"))?;
 451        let user_id = connection.user_id;
 452
 453        let connected_user = self
 454            .connected_users
 455            .get(&user_id)
 456            .ok_or_else(|| anyhow!("no such connection"))?;
 457        anyhow::ensure!(
 458            connected_user
 459                .active_call
 460                .map_or(false, |call| call.room_id == room_id
 461                    && call.connection_id == Some(connection_id)),
 462            "cannot leave a room before joining it"
 463        );
 464
 465        // Given that users can only join one room at a time, we can safely unshare
 466        // and leave all projects associated with the connection.
 467        let mut unshared_projects = Vec::new();
 468        let mut left_projects = Vec::new();
 469        for project_id in connection.projects.clone() {
 470            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
 471                unshared_projects.push(project);
 472            } else if let Ok(project) = self.leave_project(project_id, connection_id) {
 473                left_projects.push(project);
 474            }
 475        }
 476        self.connected_users.get_mut(&user_id).unwrap().active_call = None;
 477
 478        let room = self
 479            .rooms
 480            .get_mut(&room_id)
 481            .ok_or_else(|| anyhow!("no such room"))?;
 482        room.participants
 483            .retain(|participant| participant.peer_id != connection_id.0);
 484        if room.participants.is_empty() && room.pending_user_ids.is_empty() {
 485            self.rooms.remove(&room_id);
 486        }
 487
 488        Ok(LeftRoom {
 489            room: self.rooms.get(&room_id),
 490            unshared_projects,
 491            left_projects,
 492        })
 493    }
 494
 495    pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
 496        self.rooms.get(&room_id)
 497    }
 498
 499    pub fn call(
 500        &mut self,
 501        room_id: RoomId,
 502        recipient_user_id: UserId,
 503        initial_project_id: Option<ProjectId>,
 504        from_connection_id: ConnectionId,
 505    ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
 506        let caller_user_id = self.user_id_for_connection(from_connection_id)?;
 507
 508        let recipient_connection_ids = self
 509            .connection_ids_for_user(recipient_user_id)
 510            .collect::<Vec<_>>();
 511        let mut recipient = self
 512            .connected_users
 513            .get_mut(&recipient_user_id)
 514            .ok_or_else(|| anyhow!("no such connection"))?;
 515        anyhow::ensure!(
 516            recipient.active_call.is_none(),
 517            "recipient is already on another call"
 518        );
 519
 520        let room = self
 521            .rooms
 522            .get_mut(&room_id)
 523            .ok_or_else(|| anyhow!("no such room"))?;
 524        anyhow::ensure!(
 525            room.participants
 526                .iter()
 527                .any(|participant| participant.peer_id == from_connection_id.0),
 528            "no such room"
 529        );
 530        anyhow::ensure!(
 531            room.pending_user_ids
 532                .iter()
 533                .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
 534            "cannot call the same user more than once"
 535        );
 536        room.pending_user_ids.push(recipient_user_id.to_proto());
 537
 538        if let Some(initial_project_id) = initial_project_id {
 539            let project = self
 540                .projects
 541                .get(&initial_project_id)
 542                .ok_or_else(|| anyhow!("no such project"))?;
 543            anyhow::ensure!(project.room_id == room_id, "no such project");
 544        }
 545
 546        recipient.active_call = Some(Call {
 547            caller_user_id,
 548            room_id,
 549            connection_id: None,
 550            initial_project_id,
 551        });
 552
 553        Ok((
 554            room,
 555            recipient_connection_ids,
 556            proto::IncomingCall {
 557                room_id,
 558                caller_user_id: caller_user_id.to_proto(),
 559                participant_user_ids: room
 560                    .participants
 561                    .iter()
 562                    .map(|participant| participant.user_id)
 563                    .collect(),
 564                initial_project_id: initial_project_id.map(|project_id| project_id.to_proto()),
 565            },
 566        ))
 567    }
 568
 569    pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
 570        let mut recipient = self
 571            .connected_users
 572            .get_mut(&to_user_id)
 573            .ok_or_else(|| anyhow!("no such connection"))?;
 574        anyhow::ensure!(recipient
 575            .active_call
 576            .map_or(false, |call| call.room_id == room_id
 577                && call.connection_id.is_none()));
 578        recipient.active_call = None;
 579        let room = self
 580            .rooms
 581            .get_mut(&room_id)
 582            .ok_or_else(|| anyhow!("no such room"))?;
 583        room.pending_user_ids
 584            .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
 585        Ok(room)
 586    }
 587
 588    pub fn cancel_call(
 589        &mut self,
 590        room_id: RoomId,
 591        recipient_user_id: UserId,
 592        canceller_connection_id: ConnectionId,
 593    ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
 594        let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
 595        let canceller = self
 596            .connected_users
 597            .get(&canceller_user_id)
 598            .ok_or_else(|| anyhow!("no such connection"))?;
 599        let recipient = self
 600            .connected_users
 601            .get(&recipient_user_id)
 602            .ok_or_else(|| anyhow!("no such connection"))?;
 603        let canceller_active_call = canceller
 604            .active_call
 605            .as_ref()
 606            .ok_or_else(|| anyhow!("no active call"))?;
 607        let recipient_active_call = recipient
 608            .active_call
 609            .as_ref()
 610            .ok_or_else(|| anyhow!("no active call for recipient"))?;
 611
 612        anyhow::ensure!(
 613            canceller_active_call.room_id == room_id,
 614            "users are on different calls"
 615        );
 616        anyhow::ensure!(
 617            recipient_active_call.room_id == room_id,
 618            "users are on different calls"
 619        );
 620        anyhow::ensure!(
 621            recipient_active_call.connection_id.is_none(),
 622            "recipient has already answered"
 623        );
 624        let room_id = recipient_active_call.room_id;
 625        let room = self
 626            .rooms
 627            .get_mut(&room_id)
 628            .ok_or_else(|| anyhow!("no such room"))?;
 629        room.pending_user_ids
 630            .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 631
 632        let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
 633        recipient.active_call.take();
 634
 635        Ok((room, recipient.connection_ids.clone()))
 636    }
 637
 638    pub fn decline_call(
 639        &mut self,
 640        room_id: RoomId,
 641        recipient_connection_id: ConnectionId,
 642    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 643        let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
 644        let recipient = self
 645            .connected_users
 646            .get_mut(&recipient_user_id)
 647            .ok_or_else(|| anyhow!("no such connection"))?;
 648        if let Some(active_call) = recipient.active_call.take() {
 649            anyhow::ensure!(active_call.room_id == room_id, "no such room");
 650            let recipient_connection_ids = self
 651                .connection_ids_for_user(recipient_user_id)
 652                .collect::<Vec<_>>();
 653            let room = self
 654                .rooms
 655                .get_mut(&active_call.room_id)
 656                .ok_or_else(|| anyhow!("no such room"))?;
 657            room.pending_user_ids
 658                .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 659            Ok((room, recipient_connection_ids))
 660        } else {
 661            Err(anyhow!("user is not being called"))
 662        }
 663    }
 664
 665    pub fn update_participant_location(
 666        &mut self,
 667        room_id: RoomId,
 668        location: proto::ParticipantLocation,
 669        connection_id: ConnectionId,
 670    ) -> Result<&proto::Room> {
 671        let room = self
 672            .rooms
 673            .get_mut(&room_id)
 674            .ok_or_else(|| anyhow!("no such room"))?;
 675        if let Some(proto::participant_location::Variant::Project(project)) =
 676            location.variant.as_ref()
 677        {
 678            anyhow::ensure!(
 679                room.participants
 680                    .iter()
 681                    .any(|participant| participant.project_ids.contains(&project.id)),
 682                "no such project"
 683            );
 684        }
 685
 686        let participant = room
 687            .participants
 688            .iter_mut()
 689            .find(|participant| participant.peer_id == connection_id.0)
 690            .ok_or_else(|| anyhow!("no such room"))?;
 691        participant.location = Some(location);
 692
 693        Ok(room)
 694    }
 695
 696    pub fn share_project(
 697        &mut self,
 698        room_id: RoomId,
 699        project_id: ProjectId,
 700        host_connection_id: ConnectionId,
 701    ) -> Result<&proto::Room> {
 702        let connection = self
 703            .connections
 704            .get_mut(&host_connection_id)
 705            .ok_or_else(|| anyhow!("no such connection"))?;
 706
 707        let room = self
 708            .rooms
 709            .get_mut(&room_id)
 710            .ok_or_else(|| anyhow!("no such room"))?;
 711        let participant = room
 712            .participants
 713            .iter_mut()
 714            .find(|participant| participant.peer_id == host_connection_id.0)
 715            .ok_or_else(|| anyhow!("no such room"))?;
 716        participant.project_ids.push(project_id.to_proto());
 717
 718        connection.projects.insert(project_id);
 719        self.projects.insert(
 720            project_id,
 721            Project {
 722                id: project_id,
 723                room_id,
 724                host_connection_id,
 725                host: Collaborator {
 726                    user_id: connection.user_id,
 727                    replica_id: 0,
 728                    last_activity: None,
 729                    admin: connection.admin,
 730                },
 731                guests: Default::default(),
 732                active_replica_ids: Default::default(),
 733                worktrees: Default::default(),
 734                language_servers: Default::default(),
 735            },
 736        );
 737
 738        Ok(room)
 739    }
 740
 741    pub fn unshare_project(
 742        &mut self,
 743        project_id: ProjectId,
 744        connection_id: ConnectionId,
 745    ) -> Result<(&proto::Room, Project)> {
 746        match self.projects.entry(project_id) {
 747            btree_map::Entry::Occupied(e) => {
 748                if e.get().host_connection_id == connection_id {
 749                    let project = e.remove();
 750
 751                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
 752                        host_connection.projects.remove(&project_id);
 753                    }
 754
 755                    for guest_connection in project.guests.keys() {
 756                        if let Some(connection) = self.connections.get_mut(guest_connection) {
 757                            connection.projects.remove(&project_id);
 758                        }
 759                    }
 760
 761                    let room = self
 762                        .rooms
 763                        .get_mut(&project.room_id)
 764                        .ok_or_else(|| anyhow!("no such room"))?;
 765                    let participant = room
 766                        .participants
 767                        .iter_mut()
 768                        .find(|participant| participant.peer_id == connection_id.0)
 769                        .ok_or_else(|| anyhow!("no such room"))?;
 770                    participant
 771                        .project_ids
 772                        .retain(|id| *id != project_id.to_proto());
 773
 774                    Ok((room, project))
 775                } else {
 776                    Err(anyhow!("no such project"))?
 777                }
 778            }
 779            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
 780        }
 781    }
 782
 783    pub fn update_project(
 784        &mut self,
 785        project_id: ProjectId,
 786        worktrees: &[proto::WorktreeMetadata],
 787        connection_id: ConnectionId,
 788    ) -> Result<()> {
 789        let project = self
 790            .projects
 791            .get_mut(&project_id)
 792            .ok_or_else(|| anyhow!("no such project"))?;
 793        if project.host_connection_id == connection_id {
 794            let mut old_worktrees = mem::take(&mut project.worktrees);
 795            for worktree in worktrees {
 796                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
 797                    project.worktrees.insert(worktree.id, old_worktree);
 798                } else {
 799                    project.worktrees.insert(
 800                        worktree.id,
 801                        Worktree {
 802                            root_name: worktree.root_name.clone(),
 803                            visible: worktree.visible,
 804                            ..Default::default()
 805                        },
 806                    );
 807                }
 808            }
 809
 810            Ok(())
 811        } else {
 812            Err(anyhow!("no such project"))?
 813        }
 814    }
 815
 816    pub fn update_diagnostic_summary(
 817        &mut self,
 818        project_id: ProjectId,
 819        worktree_id: u64,
 820        connection_id: ConnectionId,
 821        summary: proto::DiagnosticSummary,
 822    ) -> Result<Vec<ConnectionId>> {
 823        let project = self
 824            .projects
 825            .get_mut(&project_id)
 826            .ok_or_else(|| anyhow!("no such project"))?;
 827        if project.host_connection_id == connection_id {
 828            let worktree = project
 829                .worktrees
 830                .get_mut(&worktree_id)
 831                .ok_or_else(|| anyhow!("no such worktree"))?;
 832            worktree
 833                .diagnostic_summaries
 834                .insert(summary.path.clone().into(), summary);
 835            return Ok(project.connection_ids());
 836        }
 837
 838        Err(anyhow!("no such worktree"))?
 839    }
 840
 841    pub fn start_language_server(
 842        &mut self,
 843        project_id: ProjectId,
 844        connection_id: ConnectionId,
 845        language_server: proto::LanguageServer,
 846    ) -> Result<Vec<ConnectionId>> {
 847        let project = self
 848            .projects
 849            .get_mut(&project_id)
 850            .ok_or_else(|| anyhow!("no such project"))?;
 851        if project.host_connection_id == connection_id {
 852            project.language_servers.push(language_server);
 853            return Ok(project.connection_ids());
 854        }
 855
 856        Err(anyhow!("no such project"))?
 857    }
 858
 859    pub fn join_project(
 860        &mut self,
 861        requester_connection_id: ConnectionId,
 862        project_id: ProjectId,
 863    ) -> Result<(&Project, ReplicaId)> {
 864        let connection = self
 865            .connections
 866            .get_mut(&requester_connection_id)
 867            .ok_or_else(|| anyhow!("no such connection"))?;
 868        let user = self
 869            .connected_users
 870            .get(&connection.user_id)
 871            .ok_or_else(|| anyhow!("no such connection"))?;
 872        let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
 873        anyhow::ensure!(
 874            active_call.connection_id == Some(requester_connection_id),
 875            "no such project"
 876        );
 877
 878        let project = self
 879            .projects
 880            .get_mut(&project_id)
 881            .ok_or_else(|| anyhow!("no such project"))?;
 882        anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
 883
 884        connection.projects.insert(project_id);
 885        let mut replica_id = 1;
 886        while project.active_replica_ids.contains(&replica_id) {
 887            replica_id += 1;
 888        }
 889        project.active_replica_ids.insert(replica_id);
 890        project.guests.insert(
 891            requester_connection_id,
 892            Collaborator {
 893                replica_id,
 894                user_id: connection.user_id,
 895                last_activity: Some(OffsetDateTime::now_utc()),
 896                admin: connection.admin,
 897            },
 898        );
 899
 900        project.host.last_activity = Some(OffsetDateTime::now_utc());
 901        Ok((project, replica_id))
 902    }
 903
 904    pub fn leave_project(
 905        &mut self,
 906        project_id: ProjectId,
 907        connection_id: ConnectionId,
 908    ) -> Result<LeftProject> {
 909        let project = self
 910            .projects
 911            .get_mut(&project_id)
 912            .ok_or_else(|| anyhow!("no such project"))?;
 913
 914        // If the connection leaving the project is a collaborator, remove it.
 915        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
 916            project.active_replica_ids.remove(&guest.replica_id);
 917            true
 918        } else {
 919            false
 920        };
 921
 922        if let Some(connection) = self.connections.get_mut(&connection_id) {
 923            connection.projects.remove(&project_id);
 924        }
 925
 926        Ok(LeftProject {
 927            id: project.id,
 928            host_connection_id: project.host_connection_id,
 929            host_user_id: project.host.user_id,
 930            connection_ids: project.connection_ids(),
 931            remove_collaborator,
 932        })
 933    }
 934
 935    #[allow(clippy::too_many_arguments)]
 936    pub fn update_worktree(
 937        &mut self,
 938        connection_id: ConnectionId,
 939        project_id: ProjectId,
 940        worktree_id: u64,
 941        worktree_root_name: &str,
 942        removed_entries: &[u64],
 943        updated_entries: &[proto::Entry],
 944        scan_id: u64,
 945        is_last_update: bool,
 946    ) -> Result<Vec<ConnectionId>> {
 947        let project = self.write_project(project_id, connection_id)?;
 948
 949        let connection_ids = project.connection_ids();
 950        let mut worktree = project.worktrees.entry(worktree_id).or_default();
 951        worktree.root_name = worktree_root_name.to_string();
 952
 953        for entry_id in removed_entries {
 954            worktree.entries.remove(entry_id);
 955        }
 956
 957        for entry in updated_entries {
 958            worktree.entries.insert(entry.id, entry.clone());
 959        }
 960
 961        worktree.scan_id = scan_id;
 962        worktree.is_complete = is_last_update;
 963        Ok(connection_ids)
 964    }
 965
 966    pub fn project_connection_ids(
 967        &self,
 968        project_id: ProjectId,
 969        acting_connection_id: ConnectionId,
 970    ) -> Result<Vec<ConnectionId>> {
 971        Ok(self
 972            .read_project(project_id, acting_connection_id)?
 973            .connection_ids())
 974    }
 975
 976    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
 977        Ok(self
 978            .channels
 979            .get(&channel_id)
 980            .ok_or_else(|| anyhow!("no such channel"))?
 981            .connection_ids())
 982    }
 983
 984    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
 985        self.projects
 986            .get(&project_id)
 987            .ok_or_else(|| anyhow!("no such project"))
 988    }
 989
 990    pub fn register_project_activity(
 991        &mut self,
 992        project_id: ProjectId,
 993        connection_id: ConnectionId,
 994    ) -> Result<()> {
 995        let project = self
 996            .projects
 997            .get_mut(&project_id)
 998            .ok_or_else(|| anyhow!("no such project"))?;
 999        let collaborator = if connection_id == project.host_connection_id {
1000            &mut project.host
1001        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1002            guest
1003        } else {
1004            return Err(anyhow!("no such project"))?;
1005        };
1006        collaborator.last_activity = Some(OffsetDateTime::now_utc());
1007        Ok(())
1008    }
1009
1010    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1011        self.projects.iter()
1012    }
1013
1014    pub fn read_project(
1015        &self,
1016        project_id: ProjectId,
1017        connection_id: ConnectionId,
1018    ) -> Result<&Project> {
1019        let project = self
1020            .projects
1021            .get(&project_id)
1022            .ok_or_else(|| anyhow!("no such project"))?;
1023        if project.host_connection_id == connection_id
1024            || project.guests.contains_key(&connection_id)
1025        {
1026            Ok(project)
1027        } else {
1028            Err(anyhow!("no such project"))?
1029        }
1030    }
1031
1032    fn write_project(
1033        &mut self,
1034        project_id: ProjectId,
1035        connection_id: ConnectionId,
1036    ) -> Result<&mut Project> {
1037        let project = self
1038            .projects
1039            .get_mut(&project_id)
1040            .ok_or_else(|| anyhow!("no such project"))?;
1041        if project.host_connection_id == connection_id
1042            || project.guests.contains_key(&connection_id)
1043        {
1044            Ok(project)
1045        } else {
1046            Err(anyhow!("no such project"))?
1047        }
1048    }
1049
1050    #[cfg(test)]
1051    pub fn check_invariants(&self) {
1052        for (connection_id, connection) in &self.connections {
1053            for project_id in &connection.projects {
1054                let project = &self.projects.get(project_id).unwrap();
1055                if project.host_connection_id != *connection_id {
1056                    assert!(project.guests.contains_key(connection_id));
1057                }
1058
1059                for (worktree_id, worktree) in project.worktrees.iter() {
1060                    let mut paths = HashMap::default();
1061                    for entry in worktree.entries.values() {
1062                        let prev_entry = paths.insert(&entry.path, entry);
1063                        assert_eq!(
1064                            prev_entry,
1065                            None,
1066                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
1067                            worktree_id,
1068                            prev_entry.unwrap(),
1069                            entry
1070                        );
1071                    }
1072                }
1073            }
1074            for channel_id in &connection.channels {
1075                let channel = self.channels.get(channel_id).unwrap();
1076                assert!(channel.connection_ids.contains(connection_id));
1077            }
1078            assert!(self
1079                .connected_users
1080                .get(&connection.user_id)
1081                .unwrap()
1082                .connection_ids
1083                .contains(connection_id));
1084        }
1085
1086        for (user_id, state) in &self.connected_users {
1087            for connection_id in &state.connection_ids {
1088                assert_eq!(
1089                    self.connections.get(connection_id).unwrap().user_id,
1090                    *user_id
1091                );
1092            }
1093
1094            if let Some(active_call) = state.active_call.as_ref() {
1095                if let Some(active_call_connection_id) = active_call.connection_id {
1096                    assert!(
1097                        state.connection_ids.contains(&active_call_connection_id),
1098                        "call is active on a dead connection"
1099                    );
1100                    assert!(
1101                        state.connection_ids.contains(&active_call_connection_id),
1102                        "call is active on a dead connection"
1103                    );
1104                }
1105            }
1106        }
1107
1108        for (room_id, room) in &self.rooms {
1109            for pending_user_id in &room.pending_user_ids {
1110                assert!(
1111                    self.connected_users
1112                        .contains_key(&UserId::from_proto(*pending_user_id)),
1113                    "call is active on a user that has disconnected"
1114                );
1115            }
1116
1117            for participant in &room.participants {
1118                assert!(
1119                    self.connections
1120                        .contains_key(&ConnectionId(participant.peer_id)),
1121                    "room contains participant that has disconnected"
1122                );
1123
1124                for project_id in &participant.project_ids {
1125                    let project = &self.projects[&ProjectId::from_proto(*project_id)];
1126                    assert_eq!(
1127                        project.room_id, *room_id,
1128                        "project was shared on a different room"
1129                    );
1130                }
1131            }
1132
1133            assert!(
1134                !room.pending_user_ids.is_empty() || !room.participants.is_empty(),
1135                "room can't be empty"
1136            );
1137        }
1138
1139        for (project_id, project) in &self.projects {
1140            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1141            assert!(host_connection.projects.contains(project_id));
1142
1143            for guest_connection_id in project.guests.keys() {
1144                let guest_connection = self.connections.get(guest_connection_id).unwrap();
1145                assert!(guest_connection.projects.contains(project_id));
1146            }
1147            assert_eq!(project.active_replica_ids.len(), project.guests.len(),);
1148            assert_eq!(
1149                project.active_replica_ids,
1150                project
1151                    .guests
1152                    .values()
1153                    .map(|guest| guest.replica_id)
1154                    .collect::<HashSet<_>>(),
1155            );
1156
1157            let room = &self.rooms[&project.room_id];
1158            let room_participant = room
1159                .participants
1160                .iter()
1161                .find(|participant| participant.peer_id == project.host_connection_id.0)
1162                .unwrap();
1163            assert!(
1164                room_participant
1165                    .project_ids
1166                    .contains(&project_id.to_proto()),
1167                "project was not shared in room"
1168            );
1169        }
1170
1171        for (channel_id, channel) in &self.channels {
1172            for connection_id in &channel.connection_ids {
1173                let connection = self.connections.get(connection_id).unwrap();
1174                assert!(connection.channels.contains(channel_id));
1175            }
1176        }
1177    }
1178}
1179
1180impl Project {
1181    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1182        self.guests
1183            .values()
1184            .chain([&self.host])
1185            .any(|collaborator| {
1186                collaborator
1187                    .last_activity
1188                    .map_or(false, |active_time| active_time > start_time)
1189            })
1190    }
1191
1192    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1193        self.guests.keys().copied().collect()
1194    }
1195
1196    pub fn connection_ids(&self) -> Vec<ConnectionId> {
1197        self.guests
1198            .keys()
1199            .copied()
1200            .chain(Some(self.host_connection_id))
1201            .collect()
1202    }
1203}
1204
1205impl Channel {
1206    fn connection_ids(&self) -> Vec<ConnectionId> {
1207        self.connection_ids.iter().copied().collect()
1208    }
1209}