store.rs

   1use crate::db::{self, ChannelId, ProjectId, UserId};
   2use anyhow::{anyhow, Result};
   3use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
   4use nanoid::nanoid;
   5use rpc::{proto, ConnectionId};
   6use serde::Serialize;
   7use std::{borrow::Cow, mem, path::PathBuf, str, time::Duration};
   8use time::OffsetDateTime;
   9use tracing::instrument;
  10use util::post_inc;
  11
/// Identifier of a call room. `Store::create_room` allocates ids by
/// post-incrementing `Store::next_room_id`.
pub type RoomId = u64;
  13
/// Bookkeeping for connections, the users behind them, call rooms, shared
/// projects and channels.
#[derive(Default, Serialize)]
pub struct Store {
    /// Per-connection state, keyed by connection id.
    connections: BTreeMap<ConnectionId, ConnectionState>,
    /// Per-user state. An entry is created by `add_connection` and dropped by
    /// `remove_connection` once the user's last connection is gone.
    connected_users: BTreeMap<UserId, ConnectedUser>,
    /// Id that will be assigned to the next room created by `create_room`.
    next_room_id: RoomId,
    /// Rooms that currently exist, keyed by room id.
    rooms: BTreeMap<RoomId, proto::Room>,
    /// Projects currently shared into a room, keyed by project id.
    projects: BTreeMap<ProjectId, Project>,
    /// Channels that have at least one joined connection (`leave_channel`
    /// removes a channel once it is empty). Excluded from serialization.
    #[serde(skip)]
    channels: BTreeMap<ChannelId, Channel>,
}
  24
/// State shared by all connections of a single user.
#[derive(Default, Serialize)]
struct ConnectedUser {
    /// Every connection currently registered for this user.
    connection_ids: HashSet<ConnectionId>,
    /// The call this user is being rung for or has joined, if any. A user has
    /// at most one such call at a time.
    active_call: Option<Call>,
}
  30
/// State tracked for a single connection.
#[derive(Serialize)]
struct ConnectionState {
    /// User that owns this connection.
    user_id: UserId,
    /// Admin flag supplied to `add_connection`; admin connections and the
    /// projects they host are left out of `Store::metrics`.
    admin: bool,
    /// Ids of projects associated with this connection. `share_project` adds
    /// hosted projects; `unshare_project` also clears the id from guest
    /// connections, so guests are tracked here as well.
    projects: BTreeSet<ProjectId>,
    /// Channels this connection joined through `join_channel`.
    channels: HashSet<ChannelId>,
}
  38
/// A user's involvement in a room: either an unanswered (ringing) call or a
/// room the user has joined.
#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
pub struct Call {
    /// User who placed the call. For the creator of a room this is the creator
    /// themself (see `create_room`).
    pub caller_user_id: UserId,
    /// Room the call belongs to.
    pub room_id: RoomId,
    /// Connection on which the user joined the room; `None` while the call has
    /// not been answered yet.
    pub connection_id: Option<ConnectionId>,
    /// Project the caller attached to the call, if any. It is surfaced as
    /// `initial_project` in the `IncomingCall` message.
    pub initial_project_id: Option<ProjectId>,
}
  46
/// A project that a host connection has shared into a room.
#[derive(Serialize)]
pub struct Project {
    /// Id under which the project is stored in `Store::projects`.
    pub id: ProjectId,
    /// Room the project was shared into.
    pub room_id: RoomId,
    /// Connection that shared the project. `unshare_project` and
    /// `update_project` reject requests from any other connection.
    pub host_connection_id: ConnectionId,
    /// Collaborator record for the host (`share_project` creates it with
    /// replica id 0).
    pub host: Collaborator,
    /// Guest collaborators, keyed by their connection id.
    pub guests: HashMap<ConnectionId, Collaborator>,
    /// Replica ids tracked for this project; empty when the project is first
    /// shared.
    /// NOTE(review): presumably the ids currently handed out to guests —
    /// confirm against the code that joins guests (outside this view).
    pub active_replica_ids: HashSet<ReplicaId>,
    /// Worktrees of the project, keyed by worktree id.
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Language servers recorded for the project; empty when the project is
    /// first shared.
    pub language_servers: Vec<proto::LanguageServer>,
}
  58
/// A participant (host or guest) of a shared project.
#[derive(Serialize)]
pub struct Collaborator {
    /// Replica id of this collaborator within the project; the host is created
    /// with 0.
    pub replica_id: ReplicaId,
    /// User behind the collaborator.
    pub user_id: UserId,
    /// Time of the collaborator's last recorded activity, if any. Not
    /// serialized.
    /// NOTE(review): presumably what `Project::is_active_since` consults —
    /// confirm, that method is outside this view.
    #[serde(skip)]
    pub last_activity: Option<OffsetDateTime>,
    /// Admin flag copied from the collaborator's connection.
    pub admin: bool,
}
  67
/// Record of one worktree belonging to a shared project.
#[derive(Default, Serialize)]
pub struct Worktree {
    /// Root name taken from the worktree metadata sent by the host.
    pub root_name: String,
    /// Visibility flag from the worktree metadata. `update_project` publishes
    /// only the root names of visible worktrees to the room.
    pub visible: bool,
    /// Entries of the worktree keyed by a `u64` (presumably the entry id —
    /// TODO confirm). Not serialized.
    #[serde(skip)]
    pub entries: BTreeMap<u64, proto::Entry>,
    /// Diagnostic summaries keyed by path. Not serialized.
    #[serde(skip)]
    pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
    /// NOTE(review): scan counter whose exact semantics are not visible in
    /// this part of the file; defaults to 0 for a new worktree.
    pub scan_id: u64,
    /// NOTE(review): presumably whether the initial scan has finished —
    /// confirm; defaults to `false` for a new worktree.
    pub is_complete: bool,
}
  79
/// Membership of a single channel.
#[derive(Default)]
pub struct Channel {
    /// Connections that joined the channel via `Store::join_channel` and have
    /// not left it yet.
    pub connection_ids: HashSet<ConnectionId>,
}
  84
/// Identifier of a collaborator's replica within a project; the host's
/// collaborator record is created with replica id 0.
pub type ReplicaId = u16;
  86
/// Summary returned by `Store::remove_connection`, describing what was torn
/// down together with the connection.
#[derive(Default)]
pub struct RemovedConnectionState<'a> {
    /// User that owned the removed connection.
    pub user_id: UserId,
    /// Projects the connection hosted, which were unshared while leaving the
    /// room.
    pub hosted_projects: Vec<Project>,
    /// Projects the connection left while leaving the room.
    pub guest_projects: Vec<LeftProject>,
    /// NOTE(review): not populated by the visible part of
    /// `remove_connection` (stays at its default, empty) — presumably filled
    /// in by the caller; confirm.
    pub contact_ids: HashSet<UserId>,
    /// Room affected by the removal, if the user had an active call.
    pub room: Option<Cow<'a, proto::Room>>,
    /// Connections of users whose pending calls were cancelled because the
    /// caller left the room.
    pub canceled_call_connection_ids: Vec<ConnectionId>,
}
  96
/// Describes a project that a connection left. Values are produced by
/// `Store::leave_project`, which lies outside the visible part of the file, so
/// the field notes below are hedged.
pub struct LeftProject {
    /// Id of the project that was left.
    pub id: ProjectId,
    /// User hosting the project.
    pub host_user_id: UserId,
    /// Connection hosting the project.
    pub host_connection_id: ConnectionId,
    /// NOTE(review): presumably the connections that should be told about the
    /// departure — confirm against `leave_project`.
    pub connection_ids: Vec<ConnectionId>,
    /// NOTE(review): presumably whether peers should drop the collaborator —
    /// confirm against `leave_project`.
    pub remove_collaborator: bool,
}
 104
/// Result of `Store::leave_room`.
pub struct LeftRoom<'a> {
    /// The room after the connection left. It is `Cow::Owned` when the room
    /// became empty and was removed from the store, and `Cow::Borrowed`
    /// otherwise.
    pub room: Cow<'a, proto::Room>,
    /// Projects hosted by the leaving connection, which were unshared.
    pub unshared_projects: Vec<Project>,
    /// Projects the leaving connection was part of without hosting them.
    pub left_projects: Vec<LeftProject>,
    /// Connections of pending participants whose call was cancelled because
    /// their caller left.
    pub canceled_call_connection_ids: Vec<ConnectionId>,
}
 111
/// Usage counters computed by `Store::metrics`. Admin connections and the
/// projects they host are excluded from every counter.
#[derive(Copy, Clone)]
pub struct Metrics {
    /// Number of non-admin connections.
    pub connections: usize,
    /// Number of projects whose host connection is known and not an admin.
    pub registered_projects: usize,
    /// Registered projects that were active within the last 60 seconds.
    pub active_projects: usize,
    /// Active projects that have at least one guest.
    pub shared_projects: usize,
}
 119
 120impl Store {
 121    pub fn metrics(&self) -> Metrics {
 122        const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60);
 123        let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT;
 124
 125        let connections = self.connections.values().filter(|c| !c.admin).count();
 126        let mut registered_projects = 0;
 127        let mut active_projects = 0;
 128        let mut shared_projects = 0;
 129        for project in self.projects.values() {
 130            if let Some(connection) = self.connections.get(&project.host_connection_id) {
 131                if !connection.admin {
 132                    registered_projects += 1;
 133                    if project.is_active_since(active_window_start) {
 134                        active_projects += 1;
 135                        if !project.guests.is_empty() {
 136                            shared_projects += 1;
 137                        }
 138                    }
 139                }
 140            }
 141        }
 142
 143        Metrics {
 144            connections,
 145            registered_projects,
 146            active_projects,
 147            shared_projects,
 148        }
 149    }
 150
 151    #[instrument(skip(self))]
 152    pub fn add_connection(
 153        &mut self,
 154        connection_id: ConnectionId,
 155        user_id: UserId,
 156        admin: bool,
 157    ) -> Option<proto::IncomingCall> {
 158        self.connections.insert(
 159            connection_id,
 160            ConnectionState {
 161                user_id,
 162                admin,
 163                projects: Default::default(),
 164                channels: Default::default(),
 165            },
 166        );
 167        let connected_user = self.connected_users.entry(user_id).or_default();
 168        connected_user.connection_ids.insert(connection_id);
 169        if let Some(active_call) = connected_user.active_call {
 170            if active_call.connection_id.is_some() {
 171                None
 172            } else {
 173                let room = self.room(active_call.room_id)?;
 174                Some(proto::IncomingCall {
 175                    room_id: active_call.room_id,
 176                    caller_user_id: active_call.caller_user_id.to_proto(),
 177                    participant_user_ids: room
 178                        .participants
 179                        .iter()
 180                        .map(|participant| participant.user_id)
 181                        .collect(),
 182                    initial_project: active_call
 183                        .initial_project_id
 184                        .and_then(|id| Self::build_participant_project(id, &self.projects)),
 185                })
 186            }
 187        } else {
 188            None
 189        }
 190    }
 191
 192    #[instrument(skip(self))]
 193    pub fn remove_connection(
 194        &mut self,
 195        connection_id: ConnectionId,
 196    ) -> Result<RemovedConnectionState> {
 197        let connection = self
 198            .connections
 199            .get_mut(&connection_id)
 200            .ok_or_else(|| anyhow!("no such connection"))?;
 201
 202        let user_id = connection.user_id;
 203        let connection_channels = mem::take(&mut connection.channels);
 204
 205        let mut result = RemovedConnectionState {
 206            user_id,
 207            ..Default::default()
 208        };
 209
 210        // Leave all channels.
 211        for channel_id in connection_channels {
 212            self.leave_channel(connection_id, channel_id);
 213        }
 214
 215        let connected_user = self.connected_users.get(&user_id).unwrap();
 216        if let Some(active_call) = connected_user.active_call.as_ref() {
 217            let room_id = active_call.room_id;
 218            let left_room = self.leave_room(room_id, connection_id)?;
 219            result.hosted_projects = left_room.unshared_projects;
 220            result.guest_projects = left_room.left_projects;
 221            result.room = Some(Cow::Owned(left_room.room.into_owned()));
 222            result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
 223        }
 224
 225        let connected_user = self.connected_users.get_mut(&user_id).unwrap();
 226        connected_user.connection_ids.remove(&connection_id);
 227        if connected_user.connection_ids.is_empty() {
 228            self.connected_users.remove(&user_id);
 229        }
 230        self.connections.remove(&connection_id).unwrap();
 231
 232        Ok(result)
 233    }
 234
    /// Test-only accessor returning the channel registered under `id`, if any.
    #[cfg(test)]
    pub fn channel(&self, id: ChannelId) -> Option<&Channel> {
        self.channels.get(&id)
    }
 239
 240    pub fn join_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 241        if let Some(connection) = self.connections.get_mut(&connection_id) {
 242            connection.channels.insert(channel_id);
 243            self.channels
 244                .entry(channel_id)
 245                .or_default()
 246                .connection_ids
 247                .insert(connection_id);
 248        }
 249    }
 250
 251    pub fn leave_channel(&mut self, connection_id: ConnectionId, channel_id: ChannelId) {
 252        if let Some(connection) = self.connections.get_mut(&connection_id) {
 253            connection.channels.remove(&channel_id);
 254            if let btree_map::Entry::Occupied(mut entry) = self.channels.entry(channel_id) {
 255                entry.get_mut().connection_ids.remove(&connection_id);
 256                if entry.get_mut().connection_ids.is_empty() {
 257                    entry.remove();
 258                }
 259            }
 260        }
 261    }
 262
 263    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
 264        Ok(self
 265            .connections
 266            .get(&connection_id)
 267            .ok_or_else(|| anyhow!("unknown connection"))?
 268            .user_id)
 269    }
 270
 271    pub fn connection_ids_for_user(
 272        &self,
 273        user_id: UserId,
 274    ) -> impl Iterator<Item = ConnectionId> + '_ {
 275        self.connected_users
 276            .get(&user_id)
 277            .into_iter()
 278            .map(|state| &state.connection_ids)
 279            .flatten()
 280            .copied()
 281    }
 282
 283    pub fn is_user_online(&self, user_id: UserId) -> bool {
 284        !self
 285            .connected_users
 286            .get(&user_id)
 287            .unwrap_or(&Default::default())
 288            .connection_ids
 289            .is_empty()
 290    }
 291
 292    fn is_user_busy(&self, user_id: UserId) -> bool {
 293        self.connected_users
 294            .get(&user_id)
 295            .unwrap_or(&Default::default())
 296            .active_call
 297            .is_some()
 298    }
 299
 300    pub fn build_initial_contacts_update(
 301        &self,
 302        contacts: Vec<db::Contact>,
 303    ) -> proto::UpdateContacts {
 304        let mut update = proto::UpdateContacts::default();
 305
 306        for contact in contacts {
 307            match contact {
 308                db::Contact::Accepted {
 309                    user_id,
 310                    should_notify,
 311                } => {
 312                    update
 313                        .contacts
 314                        .push(self.contact_for_user(user_id, should_notify));
 315                }
 316                db::Contact::Outgoing { user_id } => {
 317                    update.outgoing_requests.push(user_id.to_proto())
 318                }
 319                db::Contact::Incoming {
 320                    user_id,
 321                    should_notify,
 322                } => update
 323                    .incoming_requests
 324                    .push(proto::IncomingContactRequest {
 325                        requester_id: user_id.to_proto(),
 326                        should_notify,
 327                    }),
 328            }
 329        }
 330
 331        update
 332    }
 333
 334    pub fn contact_for_user(&self, user_id: UserId, should_notify: bool) -> proto::Contact {
 335        proto::Contact {
 336            user_id: user_id.to_proto(),
 337            online: self.is_user_online(user_id),
 338            busy: self.is_user_busy(user_id),
 339            should_notify,
 340        }
 341    }
 342
 343    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
 344        let connection = self
 345            .connections
 346            .get_mut(&creator_connection_id)
 347            .ok_or_else(|| anyhow!("no such connection"))?;
 348        let connected_user = self
 349            .connected_users
 350            .get_mut(&connection.user_id)
 351            .ok_or_else(|| anyhow!("no such connection"))?;
 352        anyhow::ensure!(
 353            connected_user.active_call.is_none(),
 354            "can't create a room with an active call"
 355        );
 356
 357        let room_id = post_inc(&mut self.next_room_id);
 358        let room = proto::Room {
 359            id: room_id,
 360            participants: vec![proto::Participant {
 361                user_id: connection.user_id.to_proto(),
 362                peer_id: creator_connection_id.0,
 363                projects: Default::default(),
 364                location: Some(proto::ParticipantLocation {
 365                    variant: Some(proto::participant_location::Variant::External(
 366                        proto::participant_location::External {},
 367                    )),
 368                }),
 369            }],
 370            pending_participant_user_ids: Default::default(),
 371            live_kit_room: nanoid!(30),
 372        };
 373
 374        self.rooms.insert(room_id, room);
 375        connected_user.active_call = Some(Call {
 376            caller_user_id: connection.user_id,
 377            room_id,
 378            connection_id: Some(creator_connection_id),
 379            initial_project_id: None,
 380        });
 381        Ok(self.rooms.get(&room_id).unwrap())
 382    }
 383
 384    pub fn join_room(
 385        &mut self,
 386        room_id: RoomId,
 387        connection_id: ConnectionId,
 388    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 389        let connection = self
 390            .connections
 391            .get_mut(&connection_id)
 392            .ok_or_else(|| anyhow!("no such connection"))?;
 393        let user_id = connection.user_id;
 394        let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
 395
 396        let connected_user = self
 397            .connected_users
 398            .get_mut(&user_id)
 399            .ok_or_else(|| anyhow!("no such connection"))?;
 400        let active_call = connected_user
 401            .active_call
 402            .as_mut()
 403            .ok_or_else(|| anyhow!("not being called"))?;
 404        anyhow::ensure!(
 405            active_call.room_id == room_id && active_call.connection_id.is_none(),
 406            "not being called on this room"
 407        );
 408
 409        let room = self
 410            .rooms
 411            .get_mut(&room_id)
 412            .ok_or_else(|| anyhow!("no such room"))?;
 413        anyhow::ensure!(
 414            room.pending_participant_user_ids
 415                .contains(&user_id.to_proto()),
 416            anyhow!("no such room")
 417        );
 418        room.pending_participant_user_ids
 419            .retain(|pending| *pending != user_id.to_proto());
 420        room.participants.push(proto::Participant {
 421            user_id: user_id.to_proto(),
 422            peer_id: connection_id.0,
 423            projects: Default::default(),
 424            location: Some(proto::ParticipantLocation {
 425                variant: Some(proto::participant_location::Variant::External(
 426                    proto::participant_location::External {},
 427                )),
 428            }),
 429        });
 430        active_call.connection_id = Some(connection_id);
 431
 432        Ok((room, recipient_connection_ids))
 433    }
 434
 435    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
 436        let connection = self
 437            .connections
 438            .get_mut(&connection_id)
 439            .ok_or_else(|| anyhow!("no such connection"))?;
 440        let user_id = connection.user_id;
 441
 442        let connected_user = self
 443            .connected_users
 444            .get(&user_id)
 445            .ok_or_else(|| anyhow!("no such connection"))?;
 446        anyhow::ensure!(
 447            connected_user
 448                .active_call
 449                .map_or(false, |call| call.room_id == room_id
 450                    && call.connection_id == Some(connection_id)),
 451            "cannot leave a room before joining it"
 452        );
 453
 454        // Given that users can only join one room at a time, we can safely unshare
 455        // and leave all projects associated with the connection.
 456        let mut unshared_projects = Vec::new();
 457        let mut left_projects = Vec::new();
 458        for project_id in connection.projects.clone() {
 459            if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
 460                unshared_projects.push(project);
 461            } else if let Ok(project) = self.leave_project(project_id, connection_id) {
 462                left_projects.push(project);
 463            }
 464        }
 465        self.connected_users.get_mut(&user_id).unwrap().active_call = None;
 466
 467        let room = self
 468            .rooms
 469            .get_mut(&room_id)
 470            .ok_or_else(|| anyhow!("no such room"))?;
 471        room.participants
 472            .retain(|participant| participant.peer_id != connection_id.0);
 473
 474        let mut canceled_call_connection_ids = Vec::new();
 475        room.pending_participant_user_ids
 476            .retain(|pending_participant_user_id| {
 477                if let Some(connected_user) = self
 478                    .connected_users
 479                    .get_mut(&UserId::from_proto(*pending_participant_user_id))
 480                {
 481                    if let Some(call) = connected_user.active_call.as_ref() {
 482                        if call.caller_user_id == user_id {
 483                            connected_user.active_call.take();
 484                            canceled_call_connection_ids
 485                                .extend(connected_user.connection_ids.iter().copied());
 486                            false
 487                        } else {
 488                            true
 489                        }
 490                    } else {
 491                        true
 492                    }
 493                } else {
 494                    true
 495                }
 496            });
 497
 498        let room = if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
 499            Cow::Owned(self.rooms.remove(&room_id).unwrap())
 500        } else {
 501            Cow::Borrowed(self.rooms.get(&room_id).unwrap())
 502        };
 503
 504        Ok(LeftRoom {
 505            room,
 506            unshared_projects,
 507            left_projects,
 508            canceled_call_connection_ids,
 509        })
 510    }
 511
    /// Returns the room registered under `room_id`, if it exists.
    pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
        self.rooms.get(&room_id)
    }
 515
 516    pub fn call(
 517        &mut self,
 518        room_id: RoomId,
 519        recipient_user_id: UserId,
 520        initial_project_id: Option<ProjectId>,
 521        from_connection_id: ConnectionId,
 522    ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
 523        let caller_user_id = self.user_id_for_connection(from_connection_id)?;
 524
 525        let recipient_connection_ids = self
 526            .connection_ids_for_user(recipient_user_id)
 527            .collect::<Vec<_>>();
 528        let mut recipient = self
 529            .connected_users
 530            .get_mut(&recipient_user_id)
 531            .ok_or_else(|| anyhow!("no such connection"))?;
 532        anyhow::ensure!(
 533            recipient.active_call.is_none(),
 534            "recipient is already on another call"
 535        );
 536
 537        let room = self
 538            .rooms
 539            .get_mut(&room_id)
 540            .ok_or_else(|| anyhow!("no such room"))?;
 541        anyhow::ensure!(
 542            room.participants
 543                .iter()
 544                .any(|participant| participant.peer_id == from_connection_id.0),
 545            "no such room"
 546        );
 547        anyhow::ensure!(
 548            room.pending_participant_user_ids
 549                .iter()
 550                .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
 551            "cannot call the same user more than once"
 552        );
 553        room.pending_participant_user_ids
 554            .push(recipient_user_id.to_proto());
 555
 556        if let Some(initial_project_id) = initial_project_id {
 557            let project = self
 558                .projects
 559                .get(&initial_project_id)
 560                .ok_or_else(|| anyhow!("no such project"))?;
 561            anyhow::ensure!(project.room_id == room_id, "no such project");
 562        }
 563
 564        recipient.active_call = Some(Call {
 565            caller_user_id,
 566            room_id,
 567            connection_id: None,
 568            initial_project_id,
 569        });
 570
 571        Ok((
 572            room,
 573            recipient_connection_ids,
 574            proto::IncomingCall {
 575                room_id,
 576                caller_user_id: caller_user_id.to_proto(),
 577                participant_user_ids: room
 578                    .participants
 579                    .iter()
 580                    .map(|participant| participant.user_id)
 581                    .collect(),
 582                initial_project: initial_project_id
 583                    .and_then(|id| Self::build_participant_project(id, &self.projects)),
 584            },
 585        ))
 586    }
 587
 588    pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
 589        let mut recipient = self
 590            .connected_users
 591            .get_mut(&to_user_id)
 592            .ok_or_else(|| anyhow!("no such connection"))?;
 593        anyhow::ensure!(recipient
 594            .active_call
 595            .map_or(false, |call| call.room_id == room_id
 596                && call.connection_id.is_none()));
 597        recipient.active_call = None;
 598        let room = self
 599            .rooms
 600            .get_mut(&room_id)
 601            .ok_or_else(|| anyhow!("no such room"))?;
 602        room.pending_participant_user_ids
 603            .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
 604        Ok(room)
 605    }
 606
 607    pub fn cancel_call(
 608        &mut self,
 609        room_id: RoomId,
 610        recipient_user_id: UserId,
 611        canceller_connection_id: ConnectionId,
 612    ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
 613        let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
 614        let canceller = self
 615            .connected_users
 616            .get(&canceller_user_id)
 617            .ok_or_else(|| anyhow!("no such connection"))?;
 618        let recipient = self
 619            .connected_users
 620            .get(&recipient_user_id)
 621            .ok_or_else(|| anyhow!("no such connection"))?;
 622        let canceller_active_call = canceller
 623            .active_call
 624            .as_ref()
 625            .ok_or_else(|| anyhow!("no active call"))?;
 626        let recipient_active_call = recipient
 627            .active_call
 628            .as_ref()
 629            .ok_or_else(|| anyhow!("no active call for recipient"))?;
 630
 631        anyhow::ensure!(
 632            canceller_active_call.room_id == room_id,
 633            "users are on different calls"
 634        );
 635        anyhow::ensure!(
 636            recipient_active_call.room_id == room_id,
 637            "users are on different calls"
 638        );
 639        anyhow::ensure!(
 640            recipient_active_call.connection_id.is_none(),
 641            "recipient has already answered"
 642        );
 643        let room_id = recipient_active_call.room_id;
 644        let room = self
 645            .rooms
 646            .get_mut(&room_id)
 647            .ok_or_else(|| anyhow!("no such room"))?;
 648        room.pending_participant_user_ids
 649            .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 650
 651        let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
 652        recipient.active_call.take();
 653
 654        Ok((room, recipient.connection_ids.clone()))
 655    }
 656
 657    pub fn decline_call(
 658        &mut self,
 659        room_id: RoomId,
 660        recipient_connection_id: ConnectionId,
 661    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
 662        let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
 663        let recipient = self
 664            .connected_users
 665            .get_mut(&recipient_user_id)
 666            .ok_or_else(|| anyhow!("no such connection"))?;
 667        if let Some(active_call) = recipient.active_call.take() {
 668            anyhow::ensure!(active_call.room_id == room_id, "no such room");
 669            let recipient_connection_ids = self
 670                .connection_ids_for_user(recipient_user_id)
 671                .collect::<Vec<_>>();
 672            let room = self
 673                .rooms
 674                .get_mut(&active_call.room_id)
 675                .ok_or_else(|| anyhow!("no such room"))?;
 676            room.pending_participant_user_ids
 677                .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
 678            Ok((room, recipient_connection_ids))
 679        } else {
 680            Err(anyhow!("user is not being called"))
 681        }
 682    }
 683
 684    pub fn update_participant_location(
 685        &mut self,
 686        room_id: RoomId,
 687        location: proto::ParticipantLocation,
 688        connection_id: ConnectionId,
 689    ) -> Result<&proto::Room> {
 690        let room = self
 691            .rooms
 692            .get_mut(&room_id)
 693            .ok_or_else(|| anyhow!("no such room"))?;
 694        if let Some(proto::participant_location::Variant::SharedProject(project)) =
 695            location.variant.as_ref()
 696        {
 697            anyhow::ensure!(
 698                room.participants
 699                    .iter()
 700                    .flat_map(|participant| &participant.projects)
 701                    .any(|participant_project| participant_project.id == project.id),
 702                "no such project"
 703            );
 704        }
 705
 706        let participant = room
 707            .participants
 708            .iter_mut()
 709            .find(|participant| participant.peer_id == connection_id.0)
 710            .ok_or_else(|| anyhow!("no such room"))?;
 711        participant.location = Some(location);
 712
 713        Ok(room)
 714    }
 715
 716    pub fn share_project(
 717        &mut self,
 718        room_id: RoomId,
 719        project_id: ProjectId,
 720        worktrees: Vec<proto::WorktreeMetadata>,
 721        host_connection_id: ConnectionId,
 722    ) -> Result<&proto::Room> {
 723        let connection = self
 724            .connections
 725            .get_mut(&host_connection_id)
 726            .ok_or_else(|| anyhow!("no such connection"))?;
 727
 728        let room = self
 729            .rooms
 730            .get_mut(&room_id)
 731            .ok_or_else(|| anyhow!("no such room"))?;
 732        let participant = room
 733            .participants
 734            .iter_mut()
 735            .find(|participant| participant.peer_id == host_connection_id.0)
 736            .ok_or_else(|| anyhow!("no such room"))?;
 737
 738        connection.projects.insert(project_id);
 739        self.projects.insert(
 740            project_id,
 741            Project {
 742                id: project_id,
 743                room_id,
 744                host_connection_id,
 745                host: Collaborator {
 746                    user_id: connection.user_id,
 747                    replica_id: 0,
 748                    last_activity: None,
 749                    admin: connection.admin,
 750                },
 751                guests: Default::default(),
 752                active_replica_ids: Default::default(),
 753                worktrees: worktrees
 754                    .into_iter()
 755                    .map(|worktree| {
 756                        (
 757                            worktree.id,
 758                            Worktree {
 759                                root_name: worktree.root_name,
 760                                visible: worktree.visible,
 761                                ..Default::default()
 762                            },
 763                        )
 764                    })
 765                    .collect(),
 766                language_servers: Default::default(),
 767            },
 768        );
 769
 770        participant
 771            .projects
 772            .extend(Self::build_participant_project(project_id, &self.projects));
 773
 774        Ok(room)
 775    }
 776
 777    pub fn unshare_project(
 778        &mut self,
 779        project_id: ProjectId,
 780        connection_id: ConnectionId,
 781    ) -> Result<(&proto::Room, Project)> {
 782        match self.projects.entry(project_id) {
 783            btree_map::Entry::Occupied(e) => {
 784                if e.get().host_connection_id == connection_id {
 785                    let project = e.remove();
 786
 787                    if let Some(host_connection) = self.connections.get_mut(&connection_id) {
 788                        host_connection.projects.remove(&project_id);
 789                    }
 790
 791                    for guest_connection in project.guests.keys() {
 792                        if let Some(connection) = self.connections.get_mut(guest_connection) {
 793                            connection.projects.remove(&project_id);
 794                        }
 795                    }
 796
 797                    let room = self
 798                        .rooms
 799                        .get_mut(&project.room_id)
 800                        .ok_or_else(|| anyhow!("no such room"))?;
 801                    let participant = room
 802                        .participants
 803                        .iter_mut()
 804                        .find(|participant| participant.peer_id == connection_id.0)
 805                        .ok_or_else(|| anyhow!("no such room"))?;
 806                    participant
 807                        .projects
 808                        .retain(|project| project.id != project_id.to_proto());
 809
 810                    Ok((room, project))
 811                } else {
 812                    Err(anyhow!("no such project"))?
 813                }
 814            }
 815            btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?,
 816        }
 817    }
 818
 819    pub fn update_project(
 820        &mut self,
 821        project_id: ProjectId,
 822        worktrees: &[proto::WorktreeMetadata],
 823        connection_id: ConnectionId,
 824    ) -> Result<&proto::Room> {
 825        let project = self
 826            .projects
 827            .get_mut(&project_id)
 828            .ok_or_else(|| anyhow!("no such project"))?;
 829        if project.host_connection_id == connection_id {
 830            let mut old_worktrees = mem::take(&mut project.worktrees);
 831            for worktree in worktrees {
 832                if let Some(old_worktree) = old_worktrees.remove(&worktree.id) {
 833                    project.worktrees.insert(worktree.id, old_worktree);
 834                } else {
 835                    project.worktrees.insert(
 836                        worktree.id,
 837                        Worktree {
 838                            root_name: worktree.root_name.clone(),
 839                            visible: worktree.visible,
 840                            ..Default::default()
 841                        },
 842                    );
 843                }
 844            }
 845
 846            let room = self
 847                .rooms
 848                .get_mut(&project.room_id)
 849                .ok_or_else(|| anyhow!("no such room"))?;
 850            let participant_project = room
 851                .participants
 852                .iter_mut()
 853                .flat_map(|participant| &mut participant.projects)
 854                .find(|project| project.id == project_id.to_proto())
 855                .ok_or_else(|| anyhow!("no such project"))?;
 856            participant_project.worktree_root_names = worktrees
 857                .iter()
 858                .filter(|worktree| worktree.visible)
 859                .map(|worktree| worktree.root_name.clone())
 860                .collect();
 861
 862            Ok(room)
 863        } else {
 864            Err(anyhow!("no such project"))?
 865        }
 866    }
 867
 868    pub fn update_diagnostic_summary(
 869        &mut self,
 870        project_id: ProjectId,
 871        worktree_id: u64,
 872        connection_id: ConnectionId,
 873        summary: proto::DiagnosticSummary,
 874    ) -> Result<Vec<ConnectionId>> {
 875        let project = self
 876            .projects
 877            .get_mut(&project_id)
 878            .ok_or_else(|| anyhow!("no such project"))?;
 879        if project.host_connection_id == connection_id {
 880            let worktree = project
 881                .worktrees
 882                .get_mut(&worktree_id)
 883                .ok_or_else(|| anyhow!("no such worktree"))?;
 884            worktree
 885                .diagnostic_summaries
 886                .insert(summary.path.clone().into(), summary);
 887            return Ok(project.connection_ids());
 888        }
 889
 890        Err(anyhow!("no such worktree"))?
 891    }
 892
 893    pub fn start_language_server(
 894        &mut self,
 895        project_id: ProjectId,
 896        connection_id: ConnectionId,
 897        language_server: proto::LanguageServer,
 898    ) -> Result<Vec<ConnectionId>> {
 899        let project = self
 900            .projects
 901            .get_mut(&project_id)
 902            .ok_or_else(|| anyhow!("no such project"))?;
 903        if project.host_connection_id == connection_id {
 904            project.language_servers.push(language_server);
 905            return Ok(project.connection_ids());
 906        }
 907
 908        Err(anyhow!("no such project"))?
 909    }
 910
 911    pub fn join_project(
 912        &mut self,
 913        requester_connection_id: ConnectionId,
 914        project_id: ProjectId,
 915    ) -> Result<(&Project, ReplicaId)> {
 916        let connection = self
 917            .connections
 918            .get_mut(&requester_connection_id)
 919            .ok_or_else(|| anyhow!("no such connection"))?;
 920        let user = self
 921            .connected_users
 922            .get(&connection.user_id)
 923            .ok_or_else(|| anyhow!("no such connection"))?;
 924        let active_call = user.active_call.ok_or_else(|| anyhow!("no such project"))?;
 925        anyhow::ensure!(
 926            active_call.connection_id == Some(requester_connection_id),
 927            "no such project"
 928        );
 929
 930        let project = self
 931            .projects
 932            .get_mut(&project_id)
 933            .ok_or_else(|| anyhow!("no such project"))?;
 934        anyhow::ensure!(project.room_id == active_call.room_id, "no such project");
 935
 936        connection.projects.insert(project_id);
 937        let mut replica_id = 1;
 938        while project.active_replica_ids.contains(&replica_id) {
 939            replica_id += 1;
 940        }
 941        project.active_replica_ids.insert(replica_id);
 942        project.guests.insert(
 943            requester_connection_id,
 944            Collaborator {
 945                replica_id,
 946                user_id: connection.user_id,
 947                last_activity: Some(OffsetDateTime::now_utc()),
 948                admin: connection.admin,
 949            },
 950        );
 951
 952        project.host.last_activity = Some(OffsetDateTime::now_utc());
 953        Ok((project, replica_id))
 954    }
 955
 956    pub fn leave_project(
 957        &mut self,
 958        project_id: ProjectId,
 959        connection_id: ConnectionId,
 960    ) -> Result<LeftProject> {
 961        let project = self
 962            .projects
 963            .get_mut(&project_id)
 964            .ok_or_else(|| anyhow!("no such project"))?;
 965
 966        // If the connection leaving the project is a collaborator, remove it.
 967        let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) {
 968            project.active_replica_ids.remove(&guest.replica_id);
 969            true
 970        } else {
 971            false
 972        };
 973
 974        if let Some(connection) = self.connections.get_mut(&connection_id) {
 975            connection.projects.remove(&project_id);
 976        }
 977
 978        Ok(LeftProject {
 979            id: project.id,
 980            host_connection_id: project.host_connection_id,
 981            host_user_id: project.host.user_id,
 982            connection_ids: project.connection_ids(),
 983            remove_collaborator,
 984        })
 985    }
 986
 987    #[allow(clippy::too_many_arguments)]
 988    pub fn update_worktree(
 989        &mut self,
 990        connection_id: ConnectionId,
 991        project_id: ProjectId,
 992        worktree_id: u64,
 993        worktree_root_name: &str,
 994        removed_entries: &[u64],
 995        updated_entries: &[proto::Entry],
 996        scan_id: u64,
 997        is_last_update: bool,
 998    ) -> Result<Vec<ConnectionId>> {
 999        let project = self.write_project(project_id, connection_id)?;
1000
1001        let connection_ids = project.connection_ids();
1002        let mut worktree = project.worktrees.entry(worktree_id).or_default();
1003        worktree.root_name = worktree_root_name.to_string();
1004
1005        for entry_id in removed_entries {
1006            worktree.entries.remove(entry_id);
1007        }
1008
1009        for entry in updated_entries {
1010            worktree.entries.insert(entry.id, entry.clone());
1011        }
1012
1013        worktree.scan_id = scan_id;
1014        worktree.is_complete = is_last_update;
1015        Ok(connection_ids)
1016    }
1017
1018    fn build_participant_project(
1019        project_id: ProjectId,
1020        projects: &BTreeMap<ProjectId, Project>,
1021    ) -> Option<proto::ParticipantProject> {
1022        Some(proto::ParticipantProject {
1023            id: project_id.to_proto(),
1024            worktree_root_names: projects
1025                .get(&project_id)?
1026                .worktrees
1027                .values()
1028                .filter(|worktree| worktree.visible)
1029                .map(|worktree| worktree.root_name.clone())
1030                .collect(),
1031        })
1032    }
1033
1034    pub fn project_connection_ids(
1035        &self,
1036        project_id: ProjectId,
1037        acting_connection_id: ConnectionId,
1038    ) -> Result<Vec<ConnectionId>> {
1039        Ok(self
1040            .read_project(project_id, acting_connection_id)?
1041            .connection_ids())
1042    }
1043
1044    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
1045        Ok(self
1046            .channels
1047            .get(&channel_id)
1048            .ok_or_else(|| anyhow!("no such channel"))?
1049            .connection_ids())
1050    }
1051
1052    pub fn project(&self, project_id: ProjectId) -> Result<&Project> {
1053        self.projects
1054            .get(&project_id)
1055            .ok_or_else(|| anyhow!("no such project"))
1056    }
1057
1058    pub fn register_project_activity(
1059        &mut self,
1060        project_id: ProjectId,
1061        connection_id: ConnectionId,
1062    ) -> Result<()> {
1063        let project = self
1064            .projects
1065            .get_mut(&project_id)
1066            .ok_or_else(|| anyhow!("no such project"))?;
1067        let collaborator = if connection_id == project.host_connection_id {
1068            &mut project.host
1069        } else if let Some(guest) = project.guests.get_mut(&connection_id) {
1070            guest
1071        } else {
1072            return Err(anyhow!("no such project"))?;
1073        };
1074        collaborator.last_activity = Some(OffsetDateTime::now_utc());
1075        Ok(())
1076    }
1077
1078    pub fn projects(&self) -> impl Iterator<Item = (&ProjectId, &Project)> {
1079        self.projects.iter()
1080    }
1081
1082    pub fn read_project(
1083        &self,
1084        project_id: ProjectId,
1085        connection_id: ConnectionId,
1086    ) -> Result<&Project> {
1087        let project = self
1088            .projects
1089            .get(&project_id)
1090            .ok_or_else(|| anyhow!("no such project"))?;
1091        if project.host_connection_id == connection_id
1092            || project.guests.contains_key(&connection_id)
1093        {
1094            Ok(project)
1095        } else {
1096            Err(anyhow!("no such project"))?
1097        }
1098    }
1099
1100    fn write_project(
1101        &mut self,
1102        project_id: ProjectId,
1103        connection_id: ConnectionId,
1104    ) -> Result<&mut Project> {
1105        let project = self
1106            .projects
1107            .get_mut(&project_id)
1108            .ok_or_else(|| anyhow!("no such project"))?;
1109        if project.host_connection_id == connection_id
1110            || project.guests.contains_key(&connection_id)
1111        {
1112            Ok(project)
1113        } else {
1114            Err(anyhow!("no such project"))?
1115        }
1116    }
1117
1118    #[cfg(test)]
1119    pub fn check_invariants(&self) {
1120        for (connection_id, connection) in &self.connections {
1121            for project_id in &connection.projects {
1122                let project = &self.projects.get(project_id).unwrap();
1123                if project.host_connection_id != *connection_id {
1124                    assert!(project.guests.contains_key(connection_id));
1125                }
1126
1127                for (worktree_id, worktree) in project.worktrees.iter() {
1128                    let mut paths = HashMap::default();
1129                    for entry in worktree.entries.values() {
1130                        let prev_entry = paths.insert(&entry.path, entry);
1131                        assert_eq!(
1132                            prev_entry,
1133                            None,
1134                            "worktree {:?}, duplicate path for entries {:?} and {:?}",
1135                            worktree_id,
1136                            prev_entry.unwrap(),
1137                            entry
1138                        );
1139                    }
1140                }
1141            }
1142            for channel_id in &connection.channels {
1143                let channel = self.channels.get(channel_id).unwrap();
1144                assert!(channel.connection_ids.contains(connection_id));
1145            }
1146            assert!(self
1147                .connected_users
1148                .get(&connection.user_id)
1149                .unwrap()
1150                .connection_ids
1151                .contains(connection_id));
1152        }
1153
1154        for (user_id, state) in &self.connected_users {
1155            for connection_id in &state.connection_ids {
1156                assert_eq!(
1157                    self.connections.get(connection_id).unwrap().user_id,
1158                    *user_id
1159                );
1160            }
1161
1162            if let Some(active_call) = state.active_call.as_ref() {
1163                if let Some(active_call_connection_id) = active_call.connection_id {
1164                    assert!(
1165                        state.connection_ids.contains(&active_call_connection_id),
1166                        "call is active on a dead connection"
1167                    );
1168                    assert!(
1169                        state.connection_ids.contains(&active_call_connection_id),
1170                        "call is active on a dead connection"
1171                    );
1172                }
1173            }
1174        }
1175
1176        for (room_id, room) in &self.rooms {
1177            for pending_user_id in &room.pending_participant_user_ids {
1178                assert!(
1179                    self.connected_users
1180                        .contains_key(&UserId::from_proto(*pending_user_id)),
1181                    "call is active on a user that has disconnected"
1182                );
1183            }
1184
1185            for participant in &room.participants {
1186                assert!(
1187                    self.connections
1188                        .contains_key(&ConnectionId(participant.peer_id)),
1189                    "room contains participant that has disconnected"
1190                );
1191
1192                for participant_project in &participant.projects {
1193                    let project = &self.projects[&ProjectId::from_proto(participant_project.id)];
1194                    assert_eq!(
1195                        project.room_id, *room_id,
1196                        "project was shared on a different room"
1197                    );
1198                }
1199            }
1200
1201            assert!(
1202                !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(),
1203                "room can't be empty"
1204            );
1205        }
1206
1207        for (project_id, project) in &self.projects {
1208            let host_connection = self.connections.get(&project.host_connection_id).unwrap();
1209            assert!(host_connection.projects.contains(project_id));
1210
1211            for guest_connection_id in project.guests.keys() {
1212                let guest_connection = self.connections.get(guest_connection_id).unwrap();
1213                assert!(guest_connection.projects.contains(project_id));
1214            }
1215            assert_eq!(project.active_replica_ids.len(), project.guests.len());
1216            assert_eq!(
1217                project.active_replica_ids,
1218                project
1219                    .guests
1220                    .values()
1221                    .map(|guest| guest.replica_id)
1222                    .collect::<HashSet<_>>(),
1223            );
1224
1225            let room = &self.rooms[&project.room_id];
1226            let room_participant = room
1227                .participants
1228                .iter()
1229                .find(|participant| participant.peer_id == project.host_connection_id.0)
1230                .unwrap();
1231            assert!(
1232                room_participant
1233                    .projects
1234                    .iter()
1235                    .any(|project| project.id == project_id.to_proto()),
1236                "project was not shared in room"
1237            );
1238        }
1239
1240        for (channel_id, channel) in &self.channels {
1241            for connection_id in &channel.connection_ids {
1242                let connection = self.connections.get(connection_id).unwrap();
1243                assert!(connection.channels.contains(channel_id));
1244            }
1245        }
1246    }
1247}
1248
1249impl Project {
1250    fn is_active_since(&self, start_time: OffsetDateTime) -> bool {
1251        self.guests
1252            .values()
1253            .chain([&self.host])
1254            .any(|collaborator| {
1255                collaborator
1256                    .last_activity
1257                    .map_or(false, |active_time| active_time > start_time)
1258            })
1259    }
1260
1261    pub fn guest_connection_ids(&self) -> Vec<ConnectionId> {
1262        self.guests.keys().copied().collect()
1263    }
1264
1265    pub fn connection_ids(&self) -> Vec<ConnectionId> {
1266        self.guests
1267            .keys()
1268            .copied()
1269            .chain(Some(self.host_connection_id))
1270            .collect()
1271    }
1272}
1273
1274impl Channel {
1275    fn connection_ids(&self) -> Vec<ConnectionId> {
1276        self.connection_ids.iter().copied().collect()
1277    }
1278}