store.rs

   1use crate::db::{self, ChannelId, ProjectId, UserId};
   2use anyhow::{anyhow, Result};
   3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
   4use nanoid::nanoid;
   5use rpc::{proto, ConnectionId};
   6use serde::Serialize;
   7use std::{borrow::Cow, mem, path::PathBuf, str, time::Duration};
   8use time::OffsetDateTime;
   9use tracing::instrument;
  10use util::post_inc;
  11
  /// Identifier of a call room. `Store::create_room` allocates these from `next_room_id`.
  12pub type RoomId = u64;
  13
  /// In-memory registry of connections, connected users, rooms, shared projects and channels.
  14#[derive(Default, Serialize)]
  15pub struct Store {
        /// Per-connection state, keyed by connection id.
  16    connections: BTreeMap<ConnectionId, ConnectionState>,
        /// Per-user state (connection set and current call), keyed by user id.
  17    connected_users: BTreeMap<UserId, ConnectedUser>,
        /// Next id that `create_room` will hand out.
  18    next_room_id: RoomId,
        /// Rooms, keyed by room id.
  19    rooms: BTreeMap<RoomId, proto::Room>,
        /// Shared projects, keyed by project id.
  20    projects: BTreeMap<ProjectId, Project>,
        /// Channel membership, keyed by channel id; left out of the serialized form.
  21    #[serde(skip)]
  22    channels: BTreeMap<ChannelId, Channel>,
  23}
  24
  /// State tracked per user in `Store::connected_users`.
  25#[derive(Default, Serialize)]
  26struct ConnectedUser {
        /// Connections currently registered for this user.
  27    connection_ids: HashSet<ConnectionId>,
        /// The call this user is being rung for or has joined, if any.
  28    active_call: Option<Call>,
  29}
  30
  /// State tracked per connection in `Store::connections`.
  31#[derive(Serialize)]
  32struct ConnectionState {
        /// The user that owns this connection.
  33    user_id: UserId,
        /// Admin connections are excluded from `Store::metrics`.
  34    admin: bool,
        /// Ids of projects associated with this connection; `share_project` adds the hosted
        /// project and `unshare_project` removes it from the host and from every guest.
  35    projects: BTreeSet<ProjectId>,
        /// Channels joined through this connection.
  36    channels: HashSet<ChannelId>,
  37}
  38
  /// A user's current call: either still ringing or already joined.
  39#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
  40pub struct Call {
        /// The user that placed the call (the creator, for a room made by `create_room`).
  41    pub caller_user_id: UserId,
        /// The room the call belongs to.
  42    pub room_id: RoomId,
        /// `None` while the call is unanswered; `join_room` sets it to the joining connection.
  43    pub connection_id: Option<ConnectionId>,
        /// Optional project offered together with the call.
  44    pub initial_project_id: Option<ProjectId>,
  45}
  46
  /// A project shared into a room by its host connection.
  47#[derive(Serialize)]
  48pub struct Project {
  49    pub id: ProjectId,
        /// The room the project was shared in.
  50    pub room_id: RoomId,
        /// Connection that shared the project; only this connection may unshare or update it.
  51    pub host_connection_id: ConnectionId,
  52    pub host: Collaborator,
        /// Guest collaborators, keyed by their connection id.
  53    pub guests: HashMap<ConnectionId, Collaborator>,
        /// Starts empty in `share_project`; presumably replica ids handed to guests — verify.
  54    pub active_replica_ids: HashSet<ReplicaId>,
        /// Worktrees, keyed by the id from the `proto::WorktreeMetadata` they were created from.
  55    pub worktrees: BTreeMap<u64, Worktree>,
  56    pub language_servers: Vec<proto::LanguageServer>,
  57}
  58
  /// A participant of a shared project (the host or a guest).
  59#[derive(Serialize)]
  60pub struct Collaborator {
        /// `share_project` assigns replica id 0 to the host.
  61    pub replica_id: ReplicaId,
  62    pub user_id: UserId,
        /// Not serialized; initialised to `None` for the host in `share_project`.
  63    #[serde(skip)]
  64    pub last_activity: Option<OffsetDateTime>,
        /// Copied from the collaborator's connection state.
  65    pub admin: bool,
  66}
  67
  /// Server-side record of one worktree of a shared project.
  68#[derive(Default, Serialize)]
  69pub struct Worktree {
        /// Left at its default by `share_project` / `update_project`.
  70    pub abs_path: PathBuf,
  71    pub root_name: String,
        /// Only visible worktrees contribute to a room project's `worktree_root_names`.
  72    pub visible: bool,
        /// Not serialized.
  73    #[serde(skip)]
  74    pub entries: BTreeMap<u64, proto::Entry>,
        /// Not serialized.
  75    #[serde(skip)]
  76    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
  77    pub scan_id: u64,
  78    pub is_complete: bool,
  79}
  80
  /// Membership of one channel; `leave_channel` drops the entry once no connection remains.
  81#[derive(Default)]
  82pub struct Channel {
        /// Connections that have joined the channel.
  83    pub connection_ids: HashSet<ConnectionId>,
  84}
  85
  /// Replica identifier of a project collaborator; `share_project` gives the host id 0.
  86pub type ReplicaId = u16;
  87
  /// Result of `Store::remove_connection`: what the removed connection left behind.
  88#[derive(Default)]
  89pub struct RemovedConnectionState<'a> {
        /// Owner of the removed connection.
  90    pub user_id: UserId,
        /// Projects that were unshared because the connection hosted them.
  91    pub hosted_projects: Vec<Project>,
        /// Projects the connection left.
  92    pub guest_projects: Vec<LeftProject>,
        /// Not filled in by `remove_connection`; presumably set by the caller — verify.
  93    pub contact_ids: HashSet<UserId>,
        /// The room that was left, or whose call was declined, if any.
  94    pub room: Option<Cow<'a, proto::Room>>,
        /// Connections of users whose pending call was canceled because the caller left.
  95    pub canceled_call_connection_ids: Vec<ConnectionId>,
  96}
  97
  /// Describes a project that a connection has left. The producer (`leave_project`) is outside
  /// this excerpt, so the field notes below are inferred from names — confirm against it.
  98pub struct LeftProject {
  99    pub id: ProjectId,
 100    pub host_user_id: UserId,
 101    pub host_connection_id: ConnectionId,
        // Presumably the connections that must be told about the departure — TODO confirm.
 102    pub connection_ids: Vec<ConnectionId>,
 103    pub remove_collaborator: bool,
 104}
 105
 /// Result of `Store::leave_room`.
 106pub struct LeftRoom<'a> {
        /// The room after the departure: owned if it became empty and was removed from the
        /// store, borrowed otherwise.
 107    pub room: Cow<'a, proto::Room>,
        /// Projects hosted by the leaving connection, now unshared.
 108    pub unshared_projects: Vec<Project>,
        /// Projects the leaving connection left.
 109    pub left_projects: Vec<LeftProject>,
        /// Connections of pending callees whose call was canceled because their caller left.
 110    pub canceled_call_connection_ids: Vec<ConnectionId>,
 111}
 112
 /// Counters produced by `Store::metrics`; admin connections and their projects are excluded.
 113#[derive(Copy, Clone)]
 114pub struct Metrics {
        /// Number of non-admin connections.
 115    pub connections: usize,
        /// Projects whose host connection exists and is not an admin.
 116    pub registered_projects: usize,
        /// Registered projects reported active within the last 60 seconds.
 117    pub active_projects: usize,
        /// Active projects that have at least one guest.
 118    pub shared_projects: usize,
 119}
 120
 121impl Store {
 122    pub fn metrics(&self) -> Metrics {
 123        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
 124        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
 125
 126        let connections = self.connections.values().filter(|c| !c.admin).count();
 127        let mut registered_projects = 0;
 128        let mut active_projects = 0;
 129        let mut shared_projects = 0;
 130        for project in self.projects.values() {
 131            if let Some(connection) = self.connections.get(&project.host_connection_id) {
 132                if !connection.admin {
 133                    registered_projects += 1;
 134                    if project.is_active_since(active_window_start) {
 135                        active_projects += 1;
 136                        if !project.guests.is_empty() {
 137                            shared_projects += 1;
 138                        }
 139                    }
 140                }
 141            }
 142        }
 143
 144        Metrics {
 145            connections,
 146            registered_projects,
 147            active_projects,
 148            shared_projects,
 149        }
 150    }
 151
 152    #[instrument(skip(self))]
 153    pub fn add_connection(
 154        &mut self,
 155        connection_id: ConnectionId,
 156        user_id: UserId,
 157        admin: bool,
 158    ) -> Option<proto::IncomingCall> {
 159        self.connections.insert(
 160            connection_id,
 161            ConnectionState {
 162                user_id,
 163                admin,
 164                projects: Default::default(),
 165                channels: Default::default(),
 166            },
 167        );
 168        let connected_user = self.connected_users.entry(user_id).or_default();
 169        connected_user.connection_ids.insert(connection_id);
 170        if let Some(active_call) = connected_user.active_call {
 171            if active_call.connection_id.is_some() {
 172                None
 173            } else {
 174                let room = self.room(active_call.room_id)?;
 175                Some(proto::IncomingCall {
 176                    room_id: active_call.room_id,
 177                    caller_user_id: active_call.caller_user_id.to_proto(),
 178                    participant_user_ids: room
 179                        .participants
 180                        .iter()
 181                        .map(|participant| participant.user_id)
 182                        .collect(),
 183                    initial_project: active_call
 184                        .initial_project_id
 185                        .and_then(|id| Self::build_participant_project(id, &self.projects)),
 186                })
 187            }
 188        } else {
 189            None
 190        }
 191    }
 192
 /// Unregisters `connection_id` and tears down the state that depended on it.
 ///
 /// The connection leaves all of its channels. If it had joined the user's active call it
 /// leaves that room; if the call was still unanswered and this was the user's only
 /// connection, the call is declined. The user entry is dropped once no connection remains.
 ///
 /// Fails with "no such connection" for an unknown id, and propagates errors from
 /// `leave_room` / `decline_call`; in the latter case the connection is NOT removed.
 /// Panics if the store's connection/user maps are out of sync (the `unwrap` calls).
 193    #[instrument(skip(self))]
 194    pub fn remove_connection(
 195        &mut self,
 196        connection_id: ConnectionId,
 197    ) -> Result<RemovedConnectionState> {
 198        let connection = self
 199            .connections
 200            .get_mut(&connection_id)
 201            .ok_or_else(|| anyhow!("no such connection"))?;
 202
 203        let user_id = connection.user_id;
            // Take the set out so `self` can be borrowed again while iterating it below.
 204        let connection_channels = mem::take(&mut connection.channels);
 205
 206        let mut result = RemovedConnectionState {
 207            user_id,
 208            ..Default::default()
 209        };
 210
 211        // Leave all channels.
 212        for channel_id in connection_channels {
 213            self.leave_channel(connection_id, channel_id);
 214        }
 215
 216        let connected_user = self.connected_users.get(&user_id).unwrap();
 217        if let Some(active_call) = connected_user.active_call.as_ref() {
 218            let room_id = active_call.room_id;
 219            if active_call.connection_id == Some(connection_id) {
                    // This connection is the one in the call: leave the room for good.
 220                let left_room = self.leave_room(room_id, connection_id)?;
 221                result.hosted_projects = left_room.unshared_projects;
 222                result.guest_projects = left_room.left_projects;
                    // Detach the room from the `&mut self` borrow so `self` is usable below.
 223                result.room = Some(Cow::Owned(left_room.room.into_owned()));
 224                result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
 225            } else if connected_user.connection_ids.len() == 1 {
                    // Last connection of this user is going away: decline on its behalf.
 226                let (room, _) = self.decline_call(room_id, connection_id)?;
 227                result.room = Some(Cow::Owned(room.clone()));
 228            }
 229        }
 230
 231        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
 232        connected_user.connection_ids.remove(&connection_id);
 233        if connected_user.connection_ids.is_empty() {
 234            self.connected_users.remove(&user_id);
 235        }
 236        self.connections.remove(&connection_id).unwrap();
 237
 238        Ok(result)
 239    }
 240
 241    #[cfg(test)]
 242    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
 243        self.channels.get(&id)
 244    }
 245
 246    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 247        if let Some(connection) = self.connections.get_mut(&connection_id) {
 248            connection.channels.insert(channel_id);
 249            self.channels
 250                .entry(channel_id)
 251                .or_default()
 252                .connection_ids
 253                .insert(connection_id);
 254        }
 255    }
 256
 257    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 258        if let Some(connection) = self.connections.get_mut(&connection_id) {
 259            connection.channels.remove(&channel_id);
 260            if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
 261                entry.get_mut().connection_ids.remove(&connection_id);
 262                if entry.get_mut().connection_ids.is_empty() {
 263                    entry.remove();
 264                }
 265            }
 266        }
 267    }
 268
 269    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
 270        Ok(self
 271            .connections
 272            .get(&connection_id)
 273            .ok_or_else(|| anyhow!("unknown connection"))?
 274            .user_id)
 275    }
 276
 277    pub fn connection_ids_for_user(
 278        &self,
 279        user_id: UserId,
 280    ) -> impl Iterator<Item = ConnectionId> + '_ {
 281        self.connected_users
 282            .get(&user_id)
 283            .into_iter()
 284            .map(|state| &state.connection_ids)
 285            .flatten()
 286            .copied()
 287    }
 288
 289    pub fn is_user_online(&self, user_id: UserId) -> bool {
 290        !self
 291            .connected_users
 292            .get(&user_id)
 293            .unwrap_or(&Default::default())
 294            .connection_ids
 295            .is_empty()
 296    }
 297
 298    fn is_user_busy(&self, user_id: UserId) -> bool {
 299        self.connected_users
 300            .get(&user_id)
 301            .unwrap_or(&Default::default())
 302            .active_call
 303            .is_some()
 304    }
 305
 306    pub fn build_initial_contacts_update(
 307        &self,
 308        contacts: Vec<db::Contact>,
 309    ) -> proto::UpdateContacts {
 310        let mut update = proto::UpdateContacts::default();
 311
 312        for contact in contacts {
 313            match contact {
 314                db::Contact::Accepted {
 315                    user_id,
 316                    should_notify,
 317                } => {
 318                    update
 319                        .contacts
 320                        .push(self.contact_for_user(user_id, should_notify));
 321                }
 322                db::Contact::Outgoing { user_id } => {
 323                    update.outgoing_requests.push(user_id.to_proto())
 324                }
 325                db::Contact::Incoming {
 326                    user_id,
 327                    should_notify,
 328                } => update
 329                    .incoming_requests
 330                    .push(proto::IncomingContactRequest {
 331                        requester_id: user_id.to_proto(),
 332                        should_notify,
 333                    }),
 334            }
 335        }
 336
 337        update
 338    }
 339
 340    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
 341        proto::Contact {
 342            user_id: user_id.to_proto(),
 343            online: self.is_user_online(user_id),
 344            busy: self.is_user_busy(user_id),
 345            should_notify,
 346        }
 347    }
 348
 349    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
 350        let connection = self
 351            .connections
 352            .get_mut(&creator_connection_id)
 353            .ok_or_else(|| anyhow!("no such connection"))?;
 354        let connected_user = self
 355            .connected_users
 356            .get_mut(&connection.user_id)
 357            .ok_or_else(|| anyhow!("no such connection"))?;
 358        anyhow::ensure!(
 359            connected_user.active_call.is_none(),
 360            "can't create a room with an active call"
 361        );
 362
 363        let room_id = post_inc(&mut self.next_room_id);
 364        let room = proto::Room {
 365            id: room_id,
 366            participants: vec![proto::Participant {
 367                user_id: connection.user_id.to_proto(),
 368                peer_id: creator_connection_id.0,
 369                projects: Default::default(),
 370                location: Some(proto::ParticipantLocation {
 371                    variant: Some(proto::participant_location::Variant::External(
 372                        proto::participant_location::External {},
 373                    )),
 374                }),
 375            }],
 376            pending_participant_user_ids: Default::default(),
 377            live_kit_room: nanoid!(30),
 378        };
 379
 380        self.rooms.insert(room_id, room);
 381        connected_user.active_call = Some(Call {
 382            caller_user_id: connection.user_id,
 383            room_id,
 384            connection_id: Some(creator_connection_id),
 385            initial_project_id: None,
 386        });
 387        Ok(self.rooms.get(&room_id).unwrap())
 388    }
 389
 390    pub fn join_room(
 391        &mut self,
 392        room_id: RoomId,
 393        connection_id: ConnectionId,
 394    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 395        let connection = self
 396            .connections
 397            .get_mut(&connection_id)
 398            .ok_or_else(|| anyhow!("no such connection"))?;
 399        let user_id = connection.user_id;
 400        let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
 401
 402        let connected_user = self
 403            .connected_users
 404            .get_mut(&user_id)
 405            .ok_or_else(|| anyhow!("no such connection"))?;
 406        let active_call = connected_user
 407            .active_call
 408            .as_mut()
 409            .ok_or_else(|| anyhow!("not being called"))?;
 410        anyhow::ensure!(
 411            active_call.room_id == room_id && active_call.connection_id.is_none(),
 412            "not being called on this room"
 413        );
 414
 415        let room = self
 416            .rooms
 417            .get_mut(&room_id)
 418            .ok_or_else(|| anyhow!("no such room"))?;
 419        anyhow::ensure!(
 420            room.pending_participant_user_ids
 421                .contains(&user_id.to_proto()),
 422            anyhow!("no such room")
 423        );
 424        room.pending_participant_user_ids
 425            .retain(|pending| *pending != user_id.to_proto());
 426        room.participants.push(proto::Participant {
 427            user_id: user_id.to_proto(),
 428            peer_id: connection_id.0,
 429            projects: Default::default(),
 430            location: Some(proto::ParticipantLocation {
 431                variant: Some(proto::participant_location::Variant::External(
 432                    proto::participant_location::External {},
 433                )),
 434            }),
 435        });
 436        active_call.connection_id = Some(connection_id);
 437
 438        Ok((room, recipient_connection_ids))
 439    }
 440
 /// Removes `connection_id` from `room_id`.
 ///
 /// Projects hosted by the connection are unshared and the others are left, the user's
 /// active call is cleared, and pending calls placed by the leaving user are canceled.
 /// The room is removed from the store once it has no participants left.
 ///
 /// Fails with "no such connection" when the connection or user is unknown, with
 /// "cannot leave a room before joining it" when the user's call is not for this room on
 /// this connection, and with "no such room" when the room is missing.
 /// NOTE(review): the "no such room" error is raised after projects were already unshared
 /// and the active call cleared.
 441    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
 442        let connection = self
 443            .connections
 444            .get_mut(&connection_id)
 445            .ok_or_else(|| anyhow!("no such connection"))?;
 446        let user_id = connection.user_id;
 447
 448        let connected_user = self
 449            .connected_users
 450            .get(&user_id)
 451            .ok_or_else(|| anyhow!("no such connection"))?;
 452        anyhow::ensure!(
 453            connected_user
 454                .active_call
 455                .map_or(false, |call| call.room_id == room_id
 456                    && call.connection_id == Some(connection_id)),
 457            "cannot leave a room before joining it"
 458        );
 459
 460        // Given that users can only join one room at a time, we can safely unshare
 461        // and leave all projects associated with the connection.
 462        let mut unshared_projects = Vec::new();
 463        let mut left_projects = Vec::new();
            // The id set is cloned because both calls below need `&mut self`.
 464        for project_id in connection.projects.clone() {
                // `unshare_project` only succeeds for the host; otherwise try leaving as a guest.
 465            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
 466                unshared_projects.push(project);
 467            } else if let Ok(project) = self.leave_project(project_id, connection_id) {
 468                left_projects.push(project);
 469            }
 470        }
 471        self.connected_users.get_mut(&user_id).unwrap().active_call = None;
 472
 473        let room = self
 474            .rooms
 475            .get_mut(&room_id)
 476            .ok_or_else(|| anyhow!("no such room"))?;
 477        room.participants
 478            .retain(|participant| participant.peer_id != connection_id.0);
 479
            // Cancel the calls this user placed that are still unanswered: clear the callee's
            // active call, remember their connections, and drop them from the pending list.
 480        let mut canceled_call_connection_ids = Vec::new();
 481        room.pending_participant_user_ids
 482            .retain(|pending_participant_user_id| {
 483                if let Some(connected_user) = self
 484                    .connected_users
 485                    .get_mut(&UserId::from_proto(*pending_participant_user_id))
 486                {
 487                    if let Some(call) = connected_user.active_call.as_ref() {
 488                        if call.caller_user_id == user_id {
 489                            connected_user.active_call.take();
 490                            canceled_call_connection_ids
 491                                .extend(connected_user.connection_ids.iter().copied());
 492                            false
 493                        } else {
 494                            true
 495                        }
 496                    } else {
 497                        true
 498                    }
 499                } else {
 500                    true
 501                }
 502            });
 503
            // An empty room is removed and handed back by value; otherwise it is borrowed.
 504        let room = if room.participants.is_empty() {
 505            Cow::Owned(self.rooms.remove(&room_id).unwrap())
 506        } else {
 507            Cow::Borrowed(self.rooms.get(&room_id).unwrap())
 508        };
 509
 510        Ok(LeftRoom {
 511            room,
 512            unshared_projects,
 513            left_projects,
 514            canceled_call_connection_ids,
 515        })
 516    }
 517
 518    pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
 519        self.rooms.get(&room_id)
 520    }
 521
 522    pub fn rooms(&self) -> &BTreeMap<RoomId, proto::Room> {
 523        &self.rooms
 524    }
 525
 526    pub fn call(
 527        &mut self,
 528        room_id: RoomId,
 529        recipient_user_id: UserId,
 530        initial_project_id: Option<ProjectId>,
 531        from_connection_id: ConnectionId,
 532    ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
 533        let caller_user_id = self.user_id_for_connection(from_connection_id)?;
 534
 535        let recipient_connection_ids = self
 536            .connection_ids_for_user(recipient_user_id)
 537            .collect::<Vec<_>>();
 538        let mut recipient = self
 539            .connected_users
 540            .get_mut(&recipient_user_id)
 541            .ok_or_else(|| anyhow!("no such connection"))?;
 542        anyhow::ensure!(
 543            recipient.active_call.is_none(),
 544            "recipient is already on another call"
 545        );
 546
 547        let room = self
 548            .rooms
 549            .get_mut(&room_id)
 550            .ok_or_else(|| anyhow!("no such room"))?;
 551        anyhow::ensure!(
 552            room.participants
 553                .iter()
 554                .any(|participant| participant.peer_id == from_connection_id.0),
 555            "no such room"
 556        );
 557        anyhow::ensure!(
 558            room.pending_participant_user_ids
 559                .iter()
 560                .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
 561            "cannot call the same user more than once"
 562        );
 563        room.pending_participant_user_ids
 564            .push(recipient_user_id.to_proto());
 565
 566        if let Some(initial_project_id) = initial_project_id {
 567            let project = self
 568                .projects
 569                .get(&initial_project_id)
 570                .ok_or_else(|| anyhow!("no such project"))?;
 571            anyhow::ensure!(project.room_id == room_id, "no such project");
 572        }
 573
 574        recipient.active_call = Some(Call {
 575            caller_user_id,
 576            room_id,
 577            connection_id: None,
 578            initial_project_id,
 579        });
 580
 581        Ok((
 582            room,
 583            recipient_connection_ids,
 584            proto::IncomingCall {
 585                room_id,
 586                caller_user_id: caller_user_id.to_proto(),
 587                participant_user_ids: room
 588                    .participants
 589                    .iter()
 590                    .map(|participant| participant.user_id)
 591                    .collect(),
 592                initial_project: initial_project_id
 593                    .and_then(|id| Self::build_participant_project(id, &self.projects)),
 594            },
 595        ))
 596    }
 597
 598    pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
 599        let mut recipient = self
 600            .connected_users
 601            .get_mut(&to_user_id)
 602            .ok_or_else(|| anyhow!("no such connection"))?;
 603        anyhow::ensure!(recipient
 604            .active_call
 605            .map_or(false, |call| call.room_id == room_id
 606                && call.connection_id.is_none()));
 607        recipient.active_call = None;
 608        let room = self
 609            .rooms
 610            .get_mut(&room_id)
 611            .ok_or_else(|| anyhow!("no such room"))?;
 612        room.pending_participant_user_ids
 613            .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
 614        Ok(room)
 615    }
 616
 617    pub fn cancel_call(
 618        &mut self,
 619        room_id: RoomId,
 620        recipient_user_id: UserId,
 621        canceller_connection_id: ConnectionId,
 622    ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
 623        let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
 624        let canceller = self
 625            .connected_users
 626            .get(&canceller_user_id)
 627            .ok_or_else(|| anyhow!("no such connection"))?;
 628        let recipient = self
 629            .connected_users
 630            .get(&recipient_user_id)
 631            .ok_or_else(|| anyhow!("no such connection"))?;
 632        let canceller_active_call = canceller
 633            .active_call
 634            .as_ref()
 635            .ok_or_else(|| anyhow!("no active call"))?;
 636        let recipient_active_call = recipient
 637            .active_call
 638            .as_ref()
 639            .ok_or_else(|| anyhow!("no active call for recipient"))?;
 640
 641        anyhow::ensure!(
 642            canceller_active_call.room_id == room_id,
 643            "users are on different calls"
 644        );
 645        anyhow::ensure!(
 646            recipient_active_call.room_id == room_id,
 647            "users are on different calls"
 648        );
 649        anyhow::ensure!(
 650            recipient_active_call.connection_id.is_none(),
 651            "recipient has already answered"
 652        );
 653        let room_id = recipient_active_call.room_id;
 654        let room = self
 655            .rooms
 656            .get_mut(&room_id)
 657            .ok_or_else(|| anyhow!("no such room"))?;
 658        room.pending_participant_user_ids
 659            .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 660
 661        let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
 662        recipient.active_call.take();
 663
 664        Ok((room, recipient.connection_ids.clone()))
 665    }
 666
 667    pub fn decline_call(
 668        &mut self,
 669        room_id: RoomId,
 670        recipient_connection_id: ConnectionId,
 671    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 672        let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
 673        let recipient = self
 674            .connected_users
 675            .get_mut(&recipient_user_id)
 676            .ok_or_else(|| anyhow!("no such connection"))?;
 677        if let Some(active_call) = recipient.active_call {
 678            anyhow::ensure!(active_call.room_id == room_id, "no such room");
 679            anyhow::ensure!(
 680                active_call.connection_id.is_none(),
 681                "cannot decline a call after joining room"
 682            );
 683            recipient.active_call.take();
 684            let recipient_connection_ids = self
 685                .connection_ids_for_user(recipient_user_id)
 686                .collect::<Vec<_>>();
 687            let room = self
 688                .rooms
 689                .get_mut(&active_call.room_id)
 690                .ok_or_else(|| anyhow!("no such room"))?;
 691            room.pending_participant_user_ids
 692                .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 693            Ok((room, recipient_connection_ids))
 694        } else {
 695            Err(anyhow!("user is not being called"))
 696        }
 697    }
 698
 699    pub fn update_participant_location(
 700        &mut self,
 701        room_id: RoomId,
 702        location: proto::ParticipantLocation,
 703        connection_id: ConnectionId,
 704    ) -> Result<&proto::Room> {
 705        let room = self
 706            .rooms
 707            .get_mut(&room_id)
 708            .ok_or_else(|| anyhow!("no such room"))?;
 709        if let Some(proto::participant_location::Variant::SharedProject(project)) =
 710            location.variant.as_ref()
 711        {
 712            anyhow::ensure!(
 713                room.participants
 714                    .iter()
 715                    .flat_map(|participant| &participant.projects)
 716                    .any(|participant_project| participant_project.id == project.id),
 717                "no such project"
 718            );
 719        }
 720
 721        let participant = room
 722            .participants
 723            .iter_mut()
 724            .find(|participant| participant.peer_id == connection_id.0)
 725            .ok_or_else(|| anyhow!("no such room"))?;
 726        participant.location = Some(location);
 727
 728        Ok(room)
 729    }
 730
 731    pub fn share_project(
 732        &mut self,
 733        room_id: RoomId,
 734        project_id: ProjectId,
 735        worktrees: Vec<proto::WorktreeMetadata>,
 736        host_connection_id: ConnectionId,
 737    ) -> Result<&proto::Room> {
 738        let connection = self
 739            .connections
 740            .get_mut(&host_connection_id)
 741            .ok_or_else(|| anyhow!("no such connection"))?;
 742
 743        let room = self
 744            .rooms
 745            .get_mut(&room_id)
 746            .ok_or_else(|| anyhow!("no such room"))?;
 747        let participant = room
 748            .participants
 749            .iter_mut()
 750            .find(|participant| participant.peer_id == host_connection_id.0)
 751            .ok_or_else(|| anyhow!("no such room"))?;
 752
 753        connection.projects.insert(project_id);
 754        self.projects.insert(
 755            project_id,
 756            Project {
 757                id: project_id,
 758                room_id,
 759                host_connection_id,
 760                host: Collaborator {
 761                    user_id: connection.user_id,
 762                    replica_id: 0,
 763                    last_activity: None,
 764                    admin: connection.admin,
 765                },
 766                guests: Default::default(),
 767                active_replica_ids: Default::default(),
 768                worktrees: worktrees
 769                    .into_iter()
 770                    .map(|worktree| {
 771                        (
 772                            worktree.id,
 773                            Worktree {
 774                                root_name: worktree.root_name,
 775                                visible: worktree.visible,
 776                                ..Default::default()
 777                            },
 778                        )
 779                    })
 780                    .collect(),
 781                language_servers: Default::default(),
 782            },
 783        );
 784
 785        participant
 786            .projects
 787            .extend(Self::build_participant_project(project_id, &self.projects));
 788
 789        Ok(room)
 790    }
 791
 792    pub fn unshare_project(
 793        &mut self,
 794        project_id: ProjectId,
 795        connection_id: ConnectionId,
 796    ) -> Result<(&proto::Room, Project)> {
 797        match self.projects.entry(project_id) {
 798            btree_map::Entry::Occupied(e) => {
 799                if e.get().host_connection_id == connection_id {
 800                    let project = e.remove();
 801
 802                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
 803                        host_connection.projects.remove(&project_id);
 804                    }
 805
 806                    for guest_connection in project.guests.keys() {
 807                        if let Some(connection) = self.connections.get_mut(guest_connection) {
 808                            connection.projects.remove(&project_id);
 809                        }
 810                    }
 811
 812                    let room = self
 813                        .rooms
 814                        .get_mut(&project.room_id)
 815                        .ok_or_else(|| anyhow!("no such room"))?;
 816                    let participant = room
 817                        .participants
 818                        .iter_mut()
 819                        .find(|participant| participant.peer_id == connection_id.0)
 820                        .ok_or_else(|| anyhow!("no such room"))?;
 821                    participant
 822                        .projects
 823                        .retain(|project| project.id != project_id.to_proto());
 824
 825                    Ok((room, project))
 826                } else {
 827                    Err(anyhow!("no such project"))?
 828                }
 829            }
 830            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
 831        }
 832    }
 833
 834    pub fn update_project(
 835        &mut self,
 836        project_id: ProjectId,
 837        worktrees: &[proto::WorktreeMetadata],
 838        connection_id: ConnectionId,
 839    ) -> Result<&proto::Room> {
 840        let project = self
 841            .projects
 842            .get_mut(&project_id)
 843            .ok_or_else(|| anyhow!("no such project"))?;
 844        if project.host_connection_id == connection_id {
 845            let mut old_worktrees = mem::take(&mut project.worktrees);
 846            for worktree in worktrees {
 847                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
 848                    project.worktrees.insert(worktree.id, old_worktree);
 849                } else {
 850                    project.worktrees.insert(
 851                        worktree.id,
 852                        Worktree {
 853                            root_name: worktree.root_name.clone(),
 854                            visible: worktree.visible,
 855                            ..Default::default()
 856                        },
 857                    );
 858                }
 859            }
 860
 861            let room = self
 862                .rooms
 863                .get_mut(&project.room_id)
 864                .ok_or_else(|| anyhow!("no such room"))?;
 865            let participant_project = room
 866                .participants
 867                .iter_mut()
 868                .flat_map(|participant| &mut participant.projects)
 869                .find(|project| project.id == project_id.to_proto())
 870                .ok_or_else(|| anyhow!("no such project"))?;
 871            participant_project.worktree_root_names = worktrees
 872                .iter()
 873                .filter(|worktree| worktree.visible)
 874                .map(|worktree| worktree.root_name.clone())
 875                .collect();
 876
 877            Ok(room)
 878        } else {
 879            Err(anyhow!("no such project"))?
 880        }
 881    }
 882
 883    pub fn update_diagnostic_summary(
 884        &mut self,
 885        project_id: ProjectId,
 886        worktree_id: u64,
 887        connection_id: ConnectionId,
 888        summary: proto::DiagnosticSummary,
 889    ) -> Result<Vec<ConnectionId>> {
 890        let project = self
 891            .projects
 892            .get_mut(&project_id)
 893            .ok_or_else(|| anyhow!("no such project"))?;
 894        if project.host_connection_id == connection_id {
 895            let worktree = project
 896                .worktrees
 897                .get_mut(&worktree_id)
 898                .ok_or_else(|| anyhow!("no such worktree"))?;
 899            worktree
 900                .diagnostic_summaries
 901                .insert(summary.path.clone().into(), summary);
 902            return Ok(project.connection_ids());
 903        }
 904
 905        Err(anyhow!("no such worktree"))?
 906    }
 907
 908    pub fn start_language_server(
 909        &mut self,
 910        project_id: ProjectId,
 911        connection_id: ConnectionId,
 912        language_server: proto::LanguageServer,
 913    ) -> Result<Vec<ConnectionId>> {
 914        let project = self
 915            .projects
 916            .get_mut(&project_id)
 917            .ok_or_else(|| anyhow!("no such project"))?;
 918        if project.host_connection_id == connection_id {
 919            project.language_servers.push(language_server);
 920            return Ok(project.connection_ids());
 921        }
 922
 923        Err(anyhow!("no such project"))?
 924    }
 925
 926    pub fn join_project(
 927        &mut self,
 928        requester_connection_id: ConnectionId,
 929        project_id: ProjectId,
 930    ) -> Result<(&Project, ReplicaId)> {
 931        let connection = self
 932            .connections
 933            .get_mut(&requester_connection_id)
 934            .ok_or_else(|| anyhow!("no such connection"))?;
 935        let user = self
 936            .connected_users
 937            .get(&connection.user_id)
 938            .ok_or_else(|| anyhow!("no such connection"))?;
 939        let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
 940        anyhow::ensure!(
 941            active_call.connection_id == Some(requester_connection_id),
 942            "no such project"
 943        );
 944
 945        let project = self
 946            .projects
 947            .get_mut(&project_id)
 948            .ok_or_else(|| anyhow!("no such project"))?;
 949        anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
 950
 951        connection.projects.insert(project_id);
 952        let mut replica_id = 1;
 953        while project.active_replica_ids.contains(&replica_id) {
 954            replica_id += 1;
 955        }
 956        project.active_replica_ids.insert(replica_id);
 957        project.guests.insert(
 958            requester_connection_id,
 959            Collaborator {
 960                replica_id,
 961                user_id: connection.user_id,
 962                last_activity: Some(OffsetDateTime::now_utc()),
 963                admin: connection.admin,
 964            },
 965        );
 966
 967        project.host.last_activity = Some(OffsetDateTime::now_utc());
 968        Ok((project, replica_id))
 969    }
 970
 971    pub fn leave_project(
 972        &mut self,
 973        project_id: ProjectId,
 974        connection_id: ConnectionId,
 975    ) -> Result<LeftProject> {
 976        let project = self
 977            .projects
 978            .get_mut(&project_id)
 979            .ok_or_else(|| anyhow!("no such project"))?;
 980
 981        // If the connection leaving the project is a collaborator, remove it.
 982        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
 983            project.active_replica_ids.remove(&guest.replica_id);
 984            true
 985        } else {
 986            false
 987        };
 988
 989        if let Some(connection) = self.connections.get_mut(&connection_id) {
 990            connection.projects.remove(&project_id);
 991        }
 992
 993        Ok(LeftProject {
 994            id: project.id,
 995            host_connection_id: project.host_connection_id,
 996            host_user_id: project.host.user_id,
 997            connection_ids: project.connection_ids(),
 998            remove_collaborator,
 999        })
1000    }
1001
1002    #[allow(clippy::too_many_arguments)]
1003    pub fn update_worktree(
1004        &mut self,
1005        connection_id: ConnectionId,
1006        project_id: ProjectId,
1007        worktree_id: u64,
1008        worktree_root_name: &str,
1009        removed_entries: &[u64],
1010        updated_entries: &[proto::Entry],
1011        scan_id: u64,
1012        is_last_update: bool,
1013    ) -> Result<Vec<ConnectionId>> {
1014        let project = self.write_project(project_id, connection_id)?;
1015
1016        let connection_ids = project.connection_ids();
1017        let mut worktree = project.worktrees.entry(worktree_id).or_default();
1018        worktree.root_name = worktree_root_name.to_string();
1019
1020        for entry_id in removed_entries {
1021            worktree.entries.remove(entry_id);
1022        }
1023
1024        for entry in updated_entries {
1025            worktree.entries.insert(entry.id, entry.clone());
1026        }
1027
1028        worktree.scan_id = scan_id;
1029        worktree.is_complete = is_last_update;
1030        Ok(connection_ids)
1031    }
1032
1033    fn build_participant_project(
1034        project_id: ProjectId,
1035        projects: &BTreeMap<ProjectId, Project>,
1036    ) -> Option<proto::ParticipantProject> {
1037        Some(proto::ParticipantProject {
1038            id: project_id.to_proto(),
1039            worktree_root_names: projects
1040                .get(&project_id)?
1041                .worktrees
1042                .values()
1043                .filter(|worktree| worktree.visible)
1044                .map(|worktree| worktree.root_name.clone())
1045                .collect(),
1046        })
1047    }
1048
1049    pub fn project_connection_ids(
1050        &self,
1051        project_id: ProjectId,
1052        acting_connection_id: ConnectionId,
1053    ) -> Result<Vec<ConnectionId>> {
1054        Ok(self
1055            .read_project(project_id, acting_connection_id)?
1056            .connection_ids())
1057    }
1058
1059    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1060        Ok(self
1061            .channels
1062            .get(&channel_id)
1063            .ok_or_else(|| anyhow!("no such channel"))?
1064            .connection_ids())
1065    }
1066
1067    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1068        self.projects
1069            .get(&project_id)
1070            .ok_or_else(|| anyhow!("no such project"))
1071    }
1072
1073    pub fn register_project_activity(
1074        &mut self,
1075        project_id: ProjectId,
1076        connection_id: ConnectionId,
1077    ) -> Result<()> {
1078        let project = self
1079            .projects
1080            .get_mut(&project_id)
1081            .ok_or_else(|| anyhow!("no such project"))?;
1082        let collaborator = if connection_id == project.host_connection_id {
1083            &mut project.host
1084        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1085            guest
1086        } else {
1087            return Err(anyhow!("no such project"))?;
1088        };
1089        collaborator.last_activity = Some(OffsetDateTime::now_utc());
1090        Ok(())
1091    }
1092
1093    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1094        self.projects.iter()
1095    }
1096
1097    pub fn read_project(
1098        &self,
1099        project_id: ProjectId,
1100        connection_id: ConnectionId,
1101    ) -> Result<&Project> {
1102        let project = self
1103            .projects
1104            .get(&project_id)
1105            .ok_or_else(|| anyhow!("no such project"))?;
1106        if project.host_connection_id == connection_id
1107            || project.guests.contains_key(&connection_id)
1108        {
1109            Ok(project)
1110        } else {
1111            Err(anyhow!("no such project"))?
1112        }
1113    }
1114
1115    fn write_project(
1116        &mut self,
1117        project_id: ProjectId,
1118        connection_id: ConnectionId,
1119    ) -> Result<&mut Project> {
1120        let project = self
1121            .projects
1122            .get_mut(&project_id)
1123            .ok_or_else(|| anyhow!("no such project"))?;
1124        if project.host_connection_id == connection_id
1125            || project.guests.contains_key(&connection_id)
1126        {
1127            Ok(project)
1128        } else {
1129            Err(anyhow!("no such project"))?
1130        }
1131    }
1132
1133    #[cfg(test)]
1134    pub fn check_invariants(&self) {
1135        for (connection_id, connection) in &self.connections {
1136            for project_id in &connection.projects {
1137                let project = &self.projects.get(project_id).unwrap();
1138                if project.host_connection_id != *connection_id {
1139                    assert!(project.guests.contains_key(connection_id));
1140                }
1141
1142                for (worktree_id, worktree) in project.worktrees.iter() {
1143                    let mut paths = HashMap::default();
1144                    for entry in worktree.entries.values() {
1145                        let prev_entry = paths.insert(&entry.path, entry);
1146                        assert_eq!(
1147                            prev_entry,
1148                            None,
1149                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
1150                            worktree_id,
1151                            prev_entry.unwrap(),
1152                            entry
1153                        );
1154                    }
1155                }
1156            }
1157            for channel_id in &connection.channels {
1158                let channel = self.channels.get(channel_id).unwrap();
1159                assert!(channel.connection_ids.contains(connection_id));
1160            }
1161            assert!(self
1162                .connected_users
1163                .get(&connection.user_id)
1164                .unwrap()
1165                .connection_ids
1166                .contains(connection_id));
1167        }
1168
1169        for (user_id, state) in &self.connected_users {
1170            for connection_id in &state.connection_ids {
1171                assert_eq!(
1172                    self.connections.get(connection_id).unwrap().user_id,
1173                    *user_id
1174                );
1175            }
1176
1177            if let Some(active_call) = state.active_call.as_ref() {
1178                if let Some(active_call_connection_id) = active_call.connection_id {
1179                    assert!(
1180                        state.connection_ids.contains(&active_call_connection_id),
1181                        "call is active on a dead connection"
1182                    );
1183                    assert!(
1184                        state.connection_ids.contains(&active_call_connection_id),
1185                        "call is active on a dead connection"
1186                    );
1187                }
1188            }
1189        }
1190
1191        for (room_id, room) in &self.rooms {
1192            for pending_user_id in &room.pending_participant_user_ids {
1193                assert!(
1194                    self.connected_users
1195                        .contains_key(&UserId::from_proto(*pending_user_id)),
1196                    "call is active on a user that has disconnected"
1197                );
1198            }
1199
1200            for participant in &room.participants {
1201                assert!(
1202                    self.connections
1203                        .contains_key(&ConnectionId(participant.peer_id)),
1204                    "room {} contains participant {:?} that has disconnected",
1205                    room_id,
1206                    participant
1207                );
1208
1209                for participant_project in &participant.projects {
1210                    let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1211                    assert_eq!(
1212                        project.room_id, *room_id,
1213                        "project was shared on a different room"
1214                    );
1215                }
1216            }
1217
1218            assert!(
1219                !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1220                "room can't be empty"
1221            );
1222        }
1223
1224        for (project_id, project) in &self.projects {
1225            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1226            assert!(host_connection.projects.contains(project_id));
1227
1228            for guest_connection_id in project.guests.keys() {
1229                let guest_connection = self.connections.get(guest_connection_id).unwrap();
1230                assert!(guest_connection.projects.contains(project_id));
1231            }
1232            assert_eq!(project.active_replica_ids.len(), project.guests.len());
1233            assert_eq!(
1234                project.active_replica_ids,
1235                project
1236                    .guests
1237                    .values()
1238                    .map(|guest| guest.replica_id)
1239                    .collect::<HashSet<_>>(),
1240            );
1241
1242            let room = &self.rooms[&project.room_id];
1243            let room_participant = room
1244                .participants
1245                .iter()
1246                .find(|participant| participant.peer_id == project.host_connection_id.0)
1247                .unwrap();
1248            assert!(
1249                room_participant
1250                    .projects
1251                    .iter()
1252                    .any(|project| project.id == project_id.to_proto()),
1253                "project was not shared in room"
1254            );
1255        }
1256
1257        for (channel_id, channel) in &self.channels {
1258            for connection_id in &channel.connection_ids {
1259                let connection = self.connections.get(connection_id).unwrap();
1260                assert!(connection.channels.contains(channel_id));
1261            }
1262        }
1263    }
1264}
1265
1266impl Project {
1267    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1268        self.guests
1269            .values()
1270            .chain([&self.host])
1271            .any(|collaborator| {
1272                collaborator
1273                    .last_activity
1274                    .map_or(false, |active_time| active_time > start_time)
1275            })
1276    }
1277
1278    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1279        self.guests.keys().copied().collect()
1280    }
1281
1282    pub fn connection_ids(&self) -> Vec<ConnectionId> {
1283        self.guests
1284            .keys()
1285            .copied()
1286            .chain(Some(self.host_connection_id))
1287            .collect()
1288    }
1289}
1290
1291impl Channel {
1292    fn connection_ids(&self) -> Vec<ConnectionId> {
1293        self.connection_ids.iter().copied().collect()
1294    }
1295}