db.rs

   1mod access_token;
   2mod contact;
   3mod follower;
   4mod language_server;
   5mod project;
   6mod project_collaborator;
   7mod room;
   8mod room_participant;
   9mod server;
  10mod signup;
  11#[cfg(test)]
  12mod tests;
  13mod user;
  14mod worktree;
  15mod worktree_diagnostic_summary;
  16mod worktree_entry;
  17mod worktree_repository;
  18mod worktree_repository_statuses;
  19
  20use crate::executor::Executor;
  21use crate::{Error, Result};
  22use anyhow::anyhow;
  23use collections::{BTreeMap, HashMap, HashSet};
  24pub use contact::Contact;
  25use dashmap::DashMap;
  26use futures::StreamExt;
  27use hyper::StatusCode;
  28use rand::prelude::StdRng;
  29use rand::{Rng, SeedableRng};
  30use rpc::{proto, ConnectionId};
  31use sea_orm::Condition;
  32pub use sea_orm::ConnectOptions;
  33use sea_orm::{
  34    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  35    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  36    Statement, TransactionTrait,
  37};
  38use sea_query::{Alias, Expr, OnConflict, Query};
  39use serde::{Deserialize, Serialize};
  40pub use signup::{Invite, NewSignup, WaitlistSummary};
  41use sqlx::migrate::{Migrate, Migration, MigrationSource};
  42use sqlx::Connection;
  43use std::ops::{Deref, DerefMut};
  44use std::path::Path;
  45use std::time::Duration;
  46use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  47use tokio::sync::{Mutex, OwnedMutexGuard};
  48pub use user::Model as User;
  49
/// Database handle bundling the sea-orm connection with the in-process state
/// used by the query methods in this file.
pub struct Database {
    /// Connection options; `migrate` reads the URL from here to open its own connection.
    options: ConnectOptions,
    /// sea-orm connection created from `options` in `new`.
    pool: DatabaseConnection,
    /// One async mutex per room id.
    /// NOTE(review): presumably used to serialize per-room transactions — confirm
    /// against `room_transaction`, which is not visible here.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Random number generator; `new` seeds it with a fixed value.
    rng: Mutex<StdRng>,
    /// Executor handed to `new` by the caller.
    executor: Executor,
    /// Only present in test builds; `new` leaves it as `None`.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
  59
  60impl Database {
  61    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  62        Ok(Self {
  63            options: options.clone(),
  64            pool: sea_orm::Database::connect(options).await?,
  65            rooms: DashMap::with_capacity(16384),
  66            rng: Mutex::new(StdRng::seed_from_u64(0)),
  67            executor,
  68            #[cfg(test)]
  69            runtime: None,
  70        })
  71    }
  72
  73    #[cfg(test)]
  74    pub fn reset(&self) {
  75        self.rooms.clear();
  76    }
  77
  78    pub async fn migrate(
  79        &self,
  80        migrations_path: &Path,
  81        ignore_checksum_mismatch: bool,
  82    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  83        let migrations = MigrationSource::resolve(migrations_path)
  84            .await
  85            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  86
  87        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  88
  89        connection.ensure_migrations_table().await?;
  90        let applied_migrations: HashMap<_, _> = connection
  91            .list_applied_migrations()
  92            .await?
  93            .into_iter()
  94            .map(|m| (m.version, m))
  95            .collect();
  96
  97        let mut new_migrations = Vec::new();
  98        for migration in migrations {
  99            match applied_migrations.get(&migration.version) {
 100                Some(applied_migration) => {
 101                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 102                    {
 103                        Err(anyhow!(
 104                            "checksum mismatch for applied migration {}",
 105                            migration.description
 106                        ))?;
 107                    }
 108                }
 109                None => {
 110                    let elapsed = connection.apply(&migration).await?;
 111                    new_migrations.push((migration, elapsed));
 112                }
 113            }
 114        }
 115
 116        Ok(new_migrations)
 117    }
 118
 119    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 120        self.transaction(|tx| async move {
 121            let server = server::ActiveModel {
 122                environment: ActiveValue::set(environment.into()),
 123                ..Default::default()
 124            }
 125            .insert(&*tx)
 126            .await?;
 127            Ok(server.id)
 128        })
 129        .await
 130    }
 131
 132    pub async fn stale_room_ids(
 133        &self,
 134        environment: &str,
 135        new_server_id: ServerId,
 136    ) -> Result<Vec<RoomId>> {
 137        self.transaction(|tx| async move {
 138            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 139            enum QueryAs {
 140                RoomId,
 141            }
 142
 143            let stale_server_epochs = self
 144                .stale_server_ids(environment, new_server_id, &tx)
 145                .await?;
 146            Ok(room_participant::Entity::find()
 147                .select_only()
 148                .column(room_participant::Column::RoomId)
 149                .distinct()
 150                .filter(
 151                    room_participant::Column::AnsweringConnectionServerId
 152                        .is_in(stale_server_epochs),
 153                )
 154                .into_values::<_, QueryAs>()
 155                .all(&*tx)
 156                .await?)
 157        })
 158        .await
 159    }
 160
 161    pub async fn refresh_room(
 162        &self,
 163        room_id: RoomId,
 164        new_server_id: ServerId,
 165    ) -> Result<RoomGuard<RefreshedRoom>> {
 166        self.room_transaction(room_id, |tx| async move {
 167            let stale_participant_filter = Condition::all()
 168                .add(room_participant::Column::RoomId.eq(room_id))
 169                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 170                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 171
 172            let stale_participant_user_ids = room_participant::Entity::find()
 173                .filter(stale_participant_filter.clone())
 174                .all(&*tx)
 175                .await?
 176                .into_iter()
 177                .map(|participant| participant.user_id)
 178                .collect::<Vec<_>>();
 179
 180            // Delete participants who failed to reconnect and cancel their calls.
 181            let mut canceled_calls_to_user_ids = Vec::new();
 182            room_participant::Entity::delete_many()
 183                .filter(stale_participant_filter)
 184                .exec(&*tx)
 185                .await?;
 186            let called_participants = room_participant::Entity::find()
 187                .filter(
 188                    Condition::all()
 189                        .add(
 190                            room_participant::Column::CallingUserId
 191                                .is_in(stale_participant_user_ids.iter().copied()),
 192                        )
 193                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 194                )
 195                .all(&*tx)
 196                .await?;
 197            room_participant::Entity::delete_many()
 198                .filter(
 199                    room_participant::Column::Id
 200                        .is_in(called_participants.iter().map(|participant| participant.id)),
 201                )
 202                .exec(&*tx)
 203                .await?;
 204            canceled_calls_to_user_ids.extend(
 205                called_participants
 206                    .into_iter()
 207                    .map(|participant| participant.user_id),
 208            );
 209
 210            let room = self.get_room(room_id, &tx).await?;
 211            // Delete the room if it becomes empty.
 212            if room.participants.is_empty() {
 213                project::Entity::delete_many()
 214                    .filter(project::Column::RoomId.eq(room_id))
 215                    .exec(&*tx)
 216                    .await?;
 217                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 218            }
 219
 220            Ok(RefreshedRoom {
 221                room,
 222                stale_participant_user_ids,
 223                canceled_calls_to_user_ids,
 224            })
 225        })
 226        .await
 227    }
 228
 229    pub async fn delete_stale_servers(
 230        &self,
 231        environment: &str,
 232        new_server_id: ServerId,
 233    ) -> Result<()> {
 234        self.transaction(|tx| async move {
 235            server::Entity::delete_many()
 236                .filter(
 237                    Condition::all()
 238                        .add(server::Column::Environment.eq(environment))
 239                        .add(server::Column::Id.ne(new_server_id)),
 240                )
 241                .exec(&*tx)
 242                .await?;
 243            Ok(())
 244        })
 245        .await
 246    }
 247
 248    async fn stale_server_ids(
 249        &self,
 250        environment: &str,
 251        new_server_id: ServerId,
 252        tx: &DatabaseTransaction,
 253    ) -> Result<Vec<ServerId>> {
 254        let stale_servers = server::Entity::find()
 255            .filter(
 256                Condition::all()
 257                    .add(server::Column::Environment.eq(environment))
 258                    .add(server::Column::Id.ne(new_server_id)),
 259            )
 260            .all(&*tx)
 261            .await?;
 262        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 263    }
 264
 265    // users
 266
 267    pub async fn create_user(
 268        &self,
 269        email_address: &str,
 270        admin: bool,
 271        params: NewUserParams,
 272    ) -> Result<NewUserResult> {
 273        self.transaction(|tx| async {
 274            let tx = tx;
 275            let user = user::Entity::insert(user::ActiveModel {
 276                email_address: ActiveValue::set(Some(email_address.into())),
 277                github_login: ActiveValue::set(params.github_login.clone()),
 278                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 279                admin: ActiveValue::set(admin),
 280                metrics_id: ActiveValue::set(Uuid::new_v4()),
 281                ..Default::default()
 282            })
 283            .on_conflict(
 284                OnConflict::column(user::Column::GithubLogin)
 285                    .update_column(user::Column::GithubLogin)
 286                    .to_owned(),
 287            )
 288            .exec_with_returning(&*tx)
 289            .await?;
 290
 291            Ok(NewUserResult {
 292                user_id: user.id,
 293                metrics_id: user.metrics_id.to_string(),
 294                signup_device_id: None,
 295                inviting_user_id: None,
 296            })
 297        })
 298        .await
 299    }
 300
 301    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 302        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 303            .await
 304    }
 305
 306    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 307        self.transaction(|tx| async {
 308            let tx = tx;
 309            Ok(user::Entity::find()
 310                .filter(user::Column::Id.is_in(ids.iter().copied()))
 311                .all(&*tx)
 312                .await?)
 313        })
 314        .await
 315    }
 316
 317    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 318        self.transaction(|tx| async move {
 319            Ok(user::Entity::find()
 320                .filter(user::Column::GithubLogin.eq(github_login))
 321                .one(&*tx)
 322                .await?)
 323        })
 324        .await
 325    }
 326
 327    pub async fn get_or_create_user_by_github_account(
 328        &self,
 329        github_login: &str,
 330        github_user_id: Option<i32>,
 331        github_email: Option<&str>,
 332    ) -> Result<Option<User>> {
 333        self.transaction(|tx| async move {
 334            let tx = &*tx;
 335            if let Some(github_user_id) = github_user_id {
 336                if let Some(user_by_github_user_id) = user::Entity::find()
 337                    .filter(user::Column::GithubUserId.eq(github_user_id))
 338                    .one(tx)
 339                    .await?
 340                {
 341                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 342                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 343                    Ok(Some(user_by_github_user_id.update(tx).await?))
 344                } else if let Some(user_by_github_login) = user::Entity::find()
 345                    .filter(user::Column::GithubLogin.eq(github_login))
 346                    .one(tx)
 347                    .await?
 348                {
 349                    let mut user_by_github_login = user_by_github_login.into_active_model();
 350                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 351                    Ok(Some(user_by_github_login.update(tx).await?))
 352                } else {
 353                    let user = user::Entity::insert(user::ActiveModel {
 354                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 355                        github_login: ActiveValue::set(github_login.into()),
 356                        github_user_id: ActiveValue::set(Some(github_user_id)),
 357                        admin: ActiveValue::set(false),
 358                        invite_count: ActiveValue::set(0),
 359                        invite_code: ActiveValue::set(None),
 360                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 361                        ..Default::default()
 362                    })
 363                    .exec_with_returning(&*tx)
 364                    .await?;
 365                    Ok(Some(user))
 366                }
 367            } else {
 368                Ok(user::Entity::find()
 369                    .filter(user::Column::GithubLogin.eq(github_login))
 370                    .one(tx)
 371                    .await?)
 372            }
 373        })
 374        .await
 375    }
 376
 377    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 378        self.transaction(|tx| async move {
 379            Ok(user::Entity::find()
 380                .order_by_asc(user::Column::GithubLogin)
 381                .limit(limit as u64)
 382                .offset(page as u64 * limit as u64)
 383                .all(&*tx)
 384                .await?)
 385        })
 386        .await
 387    }
 388
 389    pub async fn get_users_with_no_invites(
 390        &self,
 391        invited_by_another_user: bool,
 392    ) -> Result<Vec<User>> {
 393        self.transaction(|tx| async move {
 394            Ok(user::Entity::find()
 395                .filter(
 396                    user::Column::InviteCount
 397                        .eq(0)
 398                        .and(if invited_by_another_user {
 399                            user::Column::InviterId.is_not_null()
 400                        } else {
 401                            user::Column::InviterId.is_null()
 402                        }),
 403                )
 404                .all(&*tx)
 405                .await?)
 406        })
 407        .await
 408    }
 409
 410    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 411        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 412        enum QueryAs {
 413            MetricsId,
 414        }
 415
 416        self.transaction(|tx| async move {
 417            let metrics_id: Uuid = user::Entity::find_by_id(id)
 418                .select_only()
 419                .column(user::Column::MetricsId)
 420                .into_values::<_, QueryAs>()
 421                .one(&*tx)
 422                .await?
 423                .ok_or_else(|| anyhow!("could not find user"))?;
 424            Ok(metrics_id.to_string())
 425        })
 426        .await
 427    }
 428
 429    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 430        self.transaction(|tx| async move {
 431            user::Entity::update_many()
 432                .filter(user::Column::Id.eq(id))
 433                .set(user::ActiveModel {
 434                    admin: ActiveValue::set(is_admin),
 435                    ..Default::default()
 436                })
 437                .exec(&*tx)
 438                .await?;
 439            Ok(())
 440        })
 441        .await
 442    }
 443
 444    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 445        self.transaction(|tx| async move {
 446            user::Entity::update_many()
 447                .filter(user::Column::Id.eq(id))
 448                .set(user::ActiveModel {
 449                    connected_once: ActiveValue::set(connected_once),
 450                    ..Default::default()
 451                })
 452                .exec(&*tx)
 453                .await?;
 454            Ok(())
 455        })
 456        .await
 457    }
 458
 459    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 460        self.transaction(|tx| async move {
 461            access_token::Entity::delete_many()
 462                .filter(access_token::Column::UserId.eq(id))
 463                .exec(&*tx)
 464                .await?;
 465            user::Entity::delete_by_id(id).exec(&*tx).await?;
 466            Ok(())
 467        })
 468        .await
 469    }
 470
 471    // contacts
 472
 473    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 474        #[derive(Debug, FromQueryResult)]
 475        struct ContactWithUserBusyStatuses {
 476            user_id_a: UserId,
 477            user_id_b: UserId,
 478            a_to_b: bool,
 479            accepted: bool,
 480            should_notify: bool,
 481            user_a_busy: bool,
 482            user_b_busy: bool,
 483        }
 484
 485        self.transaction(|tx| async move {
 486            let user_a_participant = Alias::new("user_a_participant");
 487            let user_b_participant = Alias::new("user_b_participant");
 488            let mut db_contacts = contact::Entity::find()
 489                .column_as(
 490                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 491                        .is_not_null(),
 492                    "user_a_busy",
 493                )
 494                .column_as(
 495                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 496                        .is_not_null(),
 497                    "user_b_busy",
 498                )
 499                .filter(
 500                    contact::Column::UserIdA
 501                        .eq(user_id)
 502                        .or(contact::Column::UserIdB.eq(user_id)),
 503                )
 504                .join_as(
 505                    JoinType::LeftJoin,
 506                    contact::Relation::UserARoomParticipant.def(),
 507                    user_a_participant,
 508                )
 509                .join_as(
 510                    JoinType::LeftJoin,
 511                    contact::Relation::UserBRoomParticipant.def(),
 512                    user_b_participant,
 513                )
 514                .into_model::<ContactWithUserBusyStatuses>()
 515                .stream(&*tx)
 516                .await?;
 517
 518            let mut contacts = Vec::new();
 519            while let Some(db_contact) = db_contacts.next().await {
 520                let db_contact = db_contact?;
 521                if db_contact.user_id_a == user_id {
 522                    if db_contact.accepted {
 523                        contacts.push(Contact::Accepted {
 524                            user_id: db_contact.user_id_b,
 525                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 526                            busy: db_contact.user_b_busy,
 527                        });
 528                    } else if db_contact.a_to_b {
 529                        contacts.push(Contact::Outgoing {
 530                            user_id: db_contact.user_id_b,
 531                        })
 532                    } else {
 533                        contacts.push(Contact::Incoming {
 534                            user_id: db_contact.user_id_b,
 535                            should_notify: db_contact.should_notify,
 536                        });
 537                    }
 538                } else if db_contact.accepted {
 539                    contacts.push(Contact::Accepted {
 540                        user_id: db_contact.user_id_a,
 541                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 542                        busy: db_contact.user_a_busy,
 543                    });
 544                } else if db_contact.a_to_b {
 545                    contacts.push(Contact::Incoming {
 546                        user_id: db_contact.user_id_a,
 547                        should_notify: db_contact.should_notify,
 548                    });
 549                } else {
 550                    contacts.push(Contact::Outgoing {
 551                        user_id: db_contact.user_id_a,
 552                    });
 553                }
 554            }
 555
 556            contacts.sort_unstable_by_key(|contact| contact.user_id());
 557
 558            Ok(contacts)
 559        })
 560        .await
 561    }
 562
 563    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 564        self.transaction(|tx| async move {
 565            let participant = room_participant::Entity::find()
 566                .filter(room_participant::Column::UserId.eq(user_id))
 567                .one(&*tx)
 568                .await?;
 569            Ok(participant.is_some())
 570        })
 571        .await
 572    }
 573
 574    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 575        self.transaction(|tx| async move {
 576            let (id_a, id_b) = if user_id_1 < user_id_2 {
 577                (user_id_1, user_id_2)
 578            } else {
 579                (user_id_2, user_id_1)
 580            };
 581
 582            Ok(contact::Entity::find()
 583                .filter(
 584                    contact::Column::UserIdA
 585                        .eq(id_a)
 586                        .and(contact::Column::UserIdB.eq(id_b))
 587                        .and(contact::Column::Accepted.eq(true)),
 588                )
 589                .one(&*tx)
 590                .await?
 591                .is_some())
 592        })
 593        .await
 594    }
 595
 596    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 597        self.transaction(|tx| async move {
 598            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 599                (sender_id, receiver_id, true)
 600            } else {
 601                (receiver_id, sender_id, false)
 602            };
 603
 604            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 605                user_id_a: ActiveValue::set(id_a),
 606                user_id_b: ActiveValue::set(id_b),
 607                a_to_b: ActiveValue::set(a_to_b),
 608                accepted: ActiveValue::set(false),
 609                should_notify: ActiveValue::set(true),
 610                ..Default::default()
 611            })
 612            .on_conflict(
 613                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 614                    .values([
 615                        (contact::Column::Accepted, true.into()),
 616                        (contact::Column::ShouldNotify, false.into()),
 617                    ])
 618                    .action_and_where(
 619                        contact::Column::Accepted.eq(false).and(
 620                            contact::Column::AToB
 621                                .eq(a_to_b)
 622                                .and(contact::Column::UserIdA.eq(id_b))
 623                                .or(contact::Column::AToB
 624                                    .ne(a_to_b)
 625                                    .and(contact::Column::UserIdA.eq(id_a))),
 626                        ),
 627                    )
 628                    .to_owned(),
 629            )
 630            .exec_without_returning(&*tx)
 631            .await?;
 632
 633            if rows_affected == 1 {
 634                Ok(())
 635            } else {
 636                Err(anyhow!("contact already requested"))?
 637            }
 638        })
 639        .await
 640    }
 641
 642    /// Returns a bool indicating whether the removed contact had originally accepted or not
 643    ///
 644    /// Deletes the contact identified by the requester and responder ids, and then returns
 645    /// whether the deleted contact had originally accepted or was a pending contact request.
 646    ///
 647    /// # Arguments
 648    ///
 649    /// * `requester_id` - The user that initiates this request
 650    /// * `responder_id` - The user that will be removed
 651    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 652        self.transaction(|tx| async move {
 653            let (id_a, id_b) = if responder_id < requester_id {
 654                (responder_id, requester_id)
 655            } else {
 656                (requester_id, responder_id)
 657            };
 658
 659            let contact = contact::Entity::find()
 660                .filter(
 661                    contact::Column::UserIdA
 662                        .eq(id_a)
 663                        .and(contact::Column::UserIdB.eq(id_b)),
 664                )
 665                .one(&*tx)
 666                .await?
 667                .ok_or_else(|| anyhow!("no such contact"))?;
 668
 669            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 670            Ok(contact.accepted)
 671        })
 672        .await
 673    }
 674
 675    pub async fn dismiss_contact_notification(
 676        &self,
 677        user_id: UserId,
 678        contact_user_id: UserId,
 679    ) -> Result<()> {
 680        self.transaction(|tx| async move {
 681            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 682                (user_id, contact_user_id, true)
 683            } else {
 684                (contact_user_id, user_id, false)
 685            };
 686
 687            let result = contact::Entity::update_many()
 688                .set(contact::ActiveModel {
 689                    should_notify: ActiveValue::set(false),
 690                    ..Default::default()
 691                })
 692                .filter(
 693                    contact::Column::UserIdA
 694                        .eq(id_a)
 695                        .and(contact::Column::UserIdB.eq(id_b))
 696                        .and(
 697                            contact::Column::AToB
 698                                .eq(a_to_b)
 699                                .and(contact::Column::Accepted.eq(true))
 700                                .or(contact::Column::AToB
 701                                    .ne(a_to_b)
 702                                    .and(contact::Column::Accepted.eq(false))),
 703                        ),
 704                )
 705                .exec(&*tx)
 706                .await?;
 707            if result.rows_affected == 0 {
 708                Err(anyhow!("no such contact request"))?
 709            } else {
 710                Ok(())
 711            }
 712        })
 713        .await
 714    }
 715
 716    pub async fn respond_to_contact_request(
 717        &self,
 718        responder_id: UserId,
 719        requester_id: UserId,
 720        accept: bool,
 721    ) -> Result<()> {
 722        self.transaction(|tx| async move {
 723            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 724                (responder_id, requester_id, false)
 725            } else {
 726                (requester_id, responder_id, true)
 727            };
 728            let rows_affected = if accept {
 729                let result = contact::Entity::update_many()
 730                    .set(contact::ActiveModel {
 731                        accepted: ActiveValue::set(true),
 732                        should_notify: ActiveValue::set(true),
 733                        ..Default::default()
 734                    })
 735                    .filter(
 736                        contact::Column::UserIdA
 737                            .eq(id_a)
 738                            .and(contact::Column::UserIdB.eq(id_b))
 739                            .and(contact::Column::AToB.eq(a_to_b)),
 740                    )
 741                    .exec(&*tx)
 742                    .await?;
 743                result.rows_affected
 744            } else {
 745                let result = contact::Entity::delete_many()
 746                    .filter(
 747                        contact::Column::UserIdA
 748                            .eq(id_a)
 749                            .and(contact::Column::UserIdB.eq(id_b))
 750                            .and(contact::Column::AToB.eq(a_to_b))
 751                            .and(contact::Column::Accepted.eq(false)),
 752                    )
 753                    .exec(&*tx)
 754                    .await?;
 755
 756                result.rows_affected
 757            };
 758
 759            if rows_affected == 1 {
 760                Ok(())
 761            } else {
 762                Err(anyhow!("no such contact request"))?
 763            }
 764        })
 765        .await
 766    }
 767
 768    pub fn fuzzy_like_string(string: &str) -> String {
 769        let mut result = String::with_capacity(string.len() * 2 + 1);
 770        for c in string.chars() {
 771            if c.is_alphanumeric() {
 772                result.push('%');
 773                result.push(c);
 774            }
 775        }
 776        result.push('%');
 777        result
 778    }
 779
 780    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 781        self.transaction(|tx| async {
 782            let tx = tx;
 783            let like_string = Self::fuzzy_like_string(name_query);
 784            let query = "
 785                SELECT users.*
 786                FROM users
 787                WHERE github_login ILIKE $1
 788                ORDER BY github_login <-> $2
 789                LIMIT $3
 790            ";
 791
 792            Ok(user::Entity::find()
 793                .from_raw_sql(Statement::from_sql_and_values(
 794                    self.pool.get_database_backend(),
 795                    query.into(),
 796                    vec![like_string.into(), name_query.into(), limit.into()],
 797                ))
 798                .all(&*tx)
 799                .await?)
 800        })
 801        .await
 802    }
 803
 804    // signups
 805
 806    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 807        self.transaction(|tx| async move {
 808            signup::Entity::insert(signup::ActiveModel {
 809                email_address: ActiveValue::set(signup.email_address.clone()),
 810                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 811                email_confirmation_sent: ActiveValue::set(false),
 812                platform_mac: ActiveValue::set(signup.platform_mac),
 813                platform_windows: ActiveValue::set(signup.platform_windows),
 814                platform_linux: ActiveValue::set(signup.platform_linux),
 815                platform_unknown: ActiveValue::set(false),
 816                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 817                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 818                device_id: ActiveValue::set(signup.device_id.clone()),
 819                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 820                ..Default::default()
 821            })
 822            .on_conflict(
 823                OnConflict::column(signup::Column::EmailAddress)
 824                    .update_columns([
 825                        signup::Column::PlatformMac,
 826                        signup::Column::PlatformWindows,
 827                        signup::Column::PlatformLinux,
 828                        signup::Column::EditorFeatures,
 829                        signup::Column::ProgrammingLanguages,
 830                        signup::Column::DeviceId,
 831                        signup::Column::AddedToMailingList,
 832                    ])
 833                    .to_owned(),
 834            )
 835            .exec(&*tx)
 836            .await?;
 837            Ok(())
 838        })
 839        .await
 840    }
 841
 842    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 843        self.transaction(|tx| async move {
 844            let signup = signup::Entity::find()
 845                .filter(signup::Column::EmailAddress.eq(email_address))
 846                .one(&*tx)
 847                .await?
 848                .ok_or_else(|| {
 849                    anyhow!("signup with email address {} doesn't exist", email_address)
 850                })?;
 851
 852            Ok(signup)
 853        })
 854        .await
 855    }
 856
 857    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 858        self.transaction(|tx| async move {
 859            let query = "
 860                SELECT
 861                    COUNT(*) as count,
 862                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 863                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 864                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 865                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 866                FROM (
 867                    SELECT *
 868                    FROM signups
 869                    WHERE
 870                        NOT email_confirmation_sent
 871                ) AS unsent
 872            ";
 873            Ok(
 874                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 875                    self.pool.get_database_backend(),
 876                    query.into(),
 877                    vec![],
 878                ))
 879                .one(&*tx)
 880                .await?
 881                .ok_or_else(|| anyhow!("invalid result"))?,
 882            )
 883        })
 884        .await
 885    }
 886
 887    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 888        let emails = invites
 889            .iter()
 890            .map(|s| s.email_address.as_str())
 891            .collect::<Vec<_>>();
 892        self.transaction(|tx| async {
 893            let tx = tx;
 894            signup::Entity::update_many()
 895                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 896                .set(signup::ActiveModel {
 897                    email_confirmation_sent: ActiveValue::set(true),
 898                    ..Default::default()
 899                })
 900                .exec(&*tx)
 901                .await?;
 902            Ok(())
 903        })
 904        .await
 905    }
 906
 907    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 908        self.transaction(|tx| async move {
 909            Ok(signup::Entity::find()
 910                .select_only()
 911                .column(signup::Column::EmailAddress)
 912                .column(signup::Column::EmailConfirmationCode)
 913                .filter(
 914                    signup::Column::EmailConfirmationSent.eq(false).and(
 915                        signup::Column::PlatformMac
 916                            .eq(true)
 917                            .or(signup::Column::PlatformUnknown.eq(true)),
 918                    ),
 919                )
 920                .order_by_asc(signup::Column::CreatedAt)
 921                .limit(count as u64)
 922                .into_model()
 923                .all(&*tx)
 924                .await?)
 925        })
 926        .await
 927    }
 928
 929    // invite codes
 930
 931    pub async fn create_invite_from_code(
 932        &self,
 933        code: &str,
 934        email_address: &str,
 935        device_id: Option<&str>,
 936        added_to_mailing_list: bool,
 937    ) -> Result<Invite> {
 938        self.transaction(|tx| async move {
 939            let existing_user = user::Entity::find()
 940                .filter(user::Column::EmailAddress.eq(email_address))
 941                .one(&*tx)
 942                .await?;
 943
 944            if existing_user.is_some() {
 945                Err(anyhow!("email address is already in use"))?;
 946            }
 947
 948            let inviting_user_with_invites = match user::Entity::find()
 949                .filter(
 950                    user::Column::InviteCode
 951                        .eq(code)
 952                        .and(user::Column::InviteCount.gt(0)),
 953                )
 954                .one(&*tx)
 955                .await?
 956            {
 957                Some(inviting_user) => inviting_user,
 958                None => {
 959                    return Err(Error::Http(
 960                        StatusCode::UNAUTHORIZED,
 961                        "unable to find an invite code with invites remaining".to_string(),
 962                    ))?
 963                }
 964            };
 965            user::Entity::update_many()
 966                .filter(
 967                    user::Column::Id
 968                        .eq(inviting_user_with_invites.id)
 969                        .and(user::Column::InviteCount.gt(0)),
 970                )
 971                .col_expr(
 972                    user::Column::InviteCount,
 973                    Expr::col(user::Column::InviteCount).sub(1),
 974                )
 975                .exec(&*tx)
 976                .await?;
 977
 978            let signup = signup::Entity::insert(signup::ActiveModel {
 979                email_address: ActiveValue::set(email_address.into()),
 980                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 981                email_confirmation_sent: ActiveValue::set(false),
 982                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 983                platform_linux: ActiveValue::set(false),
 984                platform_mac: ActiveValue::set(false),
 985                platform_windows: ActiveValue::set(false),
 986                platform_unknown: ActiveValue::set(true),
 987                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 988                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 989                ..Default::default()
 990            })
 991            .on_conflict(
 992                OnConflict::column(signup::Column::EmailAddress)
 993                    .update_column(signup::Column::InvitingUserId)
 994                    .to_owned(),
 995            )
 996            .exec_with_returning(&*tx)
 997            .await?;
 998
 999            Ok(Invite {
1000                email_address: signup.email_address,
1001                email_confirmation_code: signup.email_confirmation_code,
1002            })
1003        })
1004        .await
1005    }
1006
1007    pub async fn create_user_from_invite(
1008        &self,
1009        invite: &Invite,
1010        user: NewUserParams,
1011    ) -> Result<Option<NewUserResult>> {
1012        self.transaction(|tx| async {
1013            let tx = tx;
1014            let signup = signup::Entity::find()
1015                .filter(
1016                    signup::Column::EmailAddress
1017                        .eq(invite.email_address.as_str())
1018                        .and(
1019                            signup::Column::EmailConfirmationCode
1020                                .eq(invite.email_confirmation_code.as_str()),
1021                        ),
1022                )
1023                .one(&*tx)
1024                .await?
1025                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1026
1027            if signup.user_id.is_some() {
1028                return Ok(None);
1029            }
1030
1031            let user = user::Entity::insert(user::ActiveModel {
1032                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1033                github_login: ActiveValue::set(user.github_login.clone()),
1034                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1035                admin: ActiveValue::set(false),
1036                invite_count: ActiveValue::set(user.invite_count),
1037                invite_code: ActiveValue::set(Some(random_invite_code())),
1038                metrics_id: ActiveValue::set(Uuid::new_v4()),
1039                ..Default::default()
1040            })
1041            .on_conflict(
1042                OnConflict::column(user::Column::GithubLogin)
1043                    .update_columns([
1044                        user::Column::EmailAddress,
1045                        user::Column::GithubUserId,
1046                        user::Column::Admin,
1047                    ])
1048                    .to_owned(),
1049            )
1050            .exec_with_returning(&*tx)
1051            .await?;
1052
1053            let mut signup = signup.into_active_model();
1054            signup.user_id = ActiveValue::set(Some(user.id));
1055            let signup = signup.update(&*tx).await?;
1056
1057            if let Some(inviting_user_id) = signup.inviting_user_id {
1058                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1059                    (inviting_user_id, user.id, true)
1060                } else {
1061                    (user.id, inviting_user_id, false)
1062                };
1063
1064                contact::Entity::insert(contact::ActiveModel {
1065                    user_id_a: ActiveValue::set(user_id_a),
1066                    user_id_b: ActiveValue::set(user_id_b),
1067                    a_to_b: ActiveValue::set(a_to_b),
1068                    should_notify: ActiveValue::set(true),
1069                    accepted: ActiveValue::set(true),
1070                    ..Default::default()
1071                })
1072                .on_conflict(OnConflict::new().do_nothing().to_owned())
1073                .exec_without_returning(&*tx)
1074                .await?;
1075            }
1076
1077            Ok(Some(NewUserResult {
1078                user_id: user.id,
1079                metrics_id: user.metrics_id.to_string(),
1080                inviting_user_id: signup.inviting_user_id,
1081                signup_device_id: signup.device_id,
1082            }))
1083        })
1084        .await
1085    }
1086
1087    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1088        self.transaction(|tx| async move {
1089            if count > 0 {
1090                user::Entity::update_many()
1091                    .filter(
1092                        user::Column::Id
1093                            .eq(id)
1094                            .and(user::Column::InviteCode.is_null()),
1095                    )
1096                    .set(user::ActiveModel {
1097                        invite_code: ActiveValue::set(Some(random_invite_code())),
1098                        ..Default::default()
1099                    })
1100                    .exec(&*tx)
1101                    .await?;
1102            }
1103
1104            user::Entity::update_many()
1105                .filter(user::Column::Id.eq(id))
1106                .set(user::ActiveModel {
1107                    invite_count: ActiveValue::set(count),
1108                    ..Default::default()
1109                })
1110                .exec(&*tx)
1111                .await?;
1112            Ok(())
1113        })
1114        .await
1115    }
1116
1117    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1118        self.transaction(|tx| async move {
1119            match user::Entity::find_by_id(id).one(&*tx).await? {
1120                Some(user) if user.invite_code.is_some() => {
1121                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1122                }
1123                _ => Ok(None),
1124            }
1125        })
1126        .await
1127    }
1128
1129    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1130        self.transaction(|tx| async move {
1131            user::Entity::find()
1132                .filter(user::Column::InviteCode.eq(code))
1133                .one(&*tx)
1134                .await?
1135                .ok_or_else(|| {
1136                    Error::Http(
1137                        StatusCode::NOT_FOUND,
1138                        "that invite code does not exist".to_string(),
1139                    )
1140                })
1141        })
1142        .await
1143    }
1144
1145    // rooms
1146
1147    pub async fn incoming_call_for_user(
1148        &self,
1149        user_id: UserId,
1150    ) -> Result<Option<proto::IncomingCall>> {
1151        self.transaction(|tx| async move {
1152            let pending_participant = room_participant::Entity::find()
1153                .filter(
1154                    room_participant::Column::UserId
1155                        .eq(user_id)
1156                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1157                )
1158                .one(&*tx)
1159                .await?;
1160
1161            if let Some(pending_participant) = pending_participant {
1162                let room = self.get_room(pending_participant.room_id, &tx).await?;
1163                Ok(Self::build_incoming_call(&room, user_id))
1164            } else {
1165                Ok(None)
1166            }
1167        })
1168        .await
1169    }
1170
1171    pub async fn create_room(
1172        &self,
1173        user_id: UserId,
1174        connection: ConnectionId,
1175        live_kit_room: &str,
1176    ) -> Result<proto::Room> {
1177        self.transaction(|tx| async move {
1178            let room = room::ActiveModel {
1179                live_kit_room: ActiveValue::set(live_kit_room.into()),
1180                ..Default::default()
1181            }
1182            .insert(&*tx)
1183            .await?;
1184            room_participant::ActiveModel {
1185                room_id: ActiveValue::set(room.id),
1186                user_id: ActiveValue::set(user_id),
1187                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1188                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1189                    connection.owner_id as i32,
1190                ))),
1191                answering_connection_lost: ActiveValue::set(false),
1192                calling_user_id: ActiveValue::set(user_id),
1193                calling_connection_id: ActiveValue::set(connection.id as i32),
1194                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1195                    connection.owner_id as i32,
1196                ))),
1197                ..Default::default()
1198            }
1199            .insert(&*tx)
1200            .await?;
1201
1202            let room = self.get_room(room.id, &tx).await?;
1203            Ok(room)
1204        })
1205        .await
1206    }
1207
1208    pub async fn call(
1209        &self,
1210        room_id: RoomId,
1211        calling_user_id: UserId,
1212        calling_connection: ConnectionId,
1213        called_user_id: UserId,
1214        initial_project_id: Option<ProjectId>,
1215    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1216        self.room_transaction(room_id, |tx| async move {
1217            room_participant::ActiveModel {
1218                room_id: ActiveValue::set(room_id),
1219                user_id: ActiveValue::set(called_user_id),
1220                answering_connection_lost: ActiveValue::set(false),
1221                calling_user_id: ActiveValue::set(calling_user_id),
1222                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1223                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1224                    calling_connection.owner_id as i32,
1225                ))),
1226                initial_project_id: ActiveValue::set(initial_project_id),
1227                ..Default::default()
1228            }
1229            .insert(&*tx)
1230            .await?;
1231
1232            let room = self.get_room(room_id, &tx).await?;
1233            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1234                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1235            Ok((room, incoming_call))
1236        })
1237        .await
1238    }
1239
1240    pub async fn call_failed(
1241        &self,
1242        room_id: RoomId,
1243        called_user_id: UserId,
1244    ) -> Result<RoomGuard<proto::Room>> {
1245        self.room_transaction(room_id, |tx| async move {
1246            room_participant::Entity::delete_many()
1247                .filter(
1248                    room_participant::Column::RoomId
1249                        .eq(room_id)
1250                        .and(room_participant::Column::UserId.eq(called_user_id)),
1251                )
1252                .exec(&*tx)
1253                .await?;
1254            let room = self.get_room(room_id, &tx).await?;
1255            Ok(room)
1256        })
1257        .await
1258    }
1259
1260    pub async fn decline_call(
1261        &self,
1262        expected_room_id: Option<RoomId>,
1263        user_id: UserId,
1264    ) -> Result<Option<RoomGuard<proto::Room>>> {
1265        self.optional_room_transaction(|tx| async move {
1266            let mut filter = Condition::all()
1267                .add(room_participant::Column::UserId.eq(user_id))
1268                .add(room_participant::Column::AnsweringConnectionId.is_null());
1269            if let Some(room_id) = expected_room_id {
1270                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1271            }
1272            let participant = room_participant::Entity::find()
1273                .filter(filter)
1274                .one(&*tx)
1275                .await?;
1276
1277            let participant = if let Some(participant) = participant {
1278                participant
1279            } else if expected_room_id.is_some() {
1280                return Err(anyhow!("could not find call to decline"))?;
1281            } else {
1282                return Ok(None);
1283            };
1284
1285            let room_id = participant.room_id;
1286            room_participant::Entity::delete(participant.into_active_model())
1287                .exec(&*tx)
1288                .await?;
1289
1290            let room = self.get_room(room_id, &tx).await?;
1291            Ok(Some((room_id, room)))
1292        })
1293        .await
1294    }
1295
1296    pub async fn cancel_call(
1297        &self,
1298        room_id: RoomId,
1299        calling_connection: ConnectionId,
1300        called_user_id: UserId,
1301    ) -> Result<RoomGuard<proto::Room>> {
1302        self.room_transaction(room_id, |tx| async move {
1303            let participant = room_participant::Entity::find()
1304                .filter(
1305                    Condition::all()
1306                        .add(room_participant::Column::UserId.eq(called_user_id))
1307                        .add(room_participant::Column::RoomId.eq(room_id))
1308                        .add(
1309                            room_participant::Column::CallingConnectionId
1310                                .eq(calling_connection.id as i32),
1311                        )
1312                        .add(
1313                            room_participant::Column::CallingConnectionServerId
1314                                .eq(calling_connection.owner_id as i32),
1315                        )
1316                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1317                )
1318                .one(&*tx)
1319                .await?
1320                .ok_or_else(|| anyhow!("no call to cancel"))?;
1321
1322            room_participant::Entity::delete(participant.into_active_model())
1323                .exec(&*tx)
1324                .await?;
1325
1326            let room = self.get_room(room_id, &tx).await?;
1327            Ok(room)
1328        })
1329        .await
1330    }
1331
1332    pub async fn join_room(
1333        &self,
1334        room_id: RoomId,
1335        user_id: UserId,
1336        connection: ConnectionId,
1337    ) -> Result<RoomGuard<proto::Room>> {
1338        self.room_transaction(room_id, |tx| async move {
1339            let result = room_participant::Entity::update_many()
1340                .filter(
1341                    Condition::all()
1342                        .add(room_participant::Column::RoomId.eq(room_id))
1343                        .add(room_participant::Column::UserId.eq(user_id))
1344                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1345                )
1346                .set(room_participant::ActiveModel {
1347                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1348                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1349                        connection.owner_id as i32,
1350                    ))),
1351                    answering_connection_lost: ActiveValue::set(false),
1352                    ..Default::default()
1353                })
1354                .exec(&*tx)
1355                .await?;
1356            if result.rows_affected == 0 {
1357                Err(anyhow!("room does not exist or was already joined"))?
1358            } else {
1359                let room = self.get_room(room_id, &tx).await?;
1360                Ok(room)
1361            }
1362        })
1363        .await
1364    }
1365
1366    pub async fn rejoin_room(
1367        &self,
1368        rejoin_room: proto::RejoinRoom,
1369        user_id: UserId,
1370        connection: ConnectionId,
1371    ) -> Result<RoomGuard<RejoinedRoom>> {
1372        let room_id = RoomId::from_proto(rejoin_room.id);
1373        self.room_transaction(room_id, |tx| async {
1374            let tx = tx;
1375            let participant_update = room_participant::Entity::update_many()
1376                .filter(
1377                    Condition::all()
1378                        .add(room_participant::Column::RoomId.eq(room_id))
1379                        .add(room_participant::Column::UserId.eq(user_id))
1380                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1381                        .add(
1382                            Condition::any()
1383                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1384                                .add(
1385                                    room_participant::Column::AnsweringConnectionServerId
1386                                        .ne(connection.owner_id as i32),
1387                                ),
1388                        ),
1389                )
1390                .set(room_participant::ActiveModel {
1391                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1392                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1393                        connection.owner_id as i32,
1394                    ))),
1395                    answering_connection_lost: ActiveValue::set(false),
1396                    ..Default::default()
1397                })
1398                .exec(&*tx)
1399                .await?;
1400            if participant_update.rows_affected == 0 {
1401                return Err(anyhow!("room does not exist or was already joined"))?;
1402            }
1403
1404            let mut reshared_projects = Vec::new();
1405            for reshared_project in &rejoin_room.reshared_projects {
1406                let project_id = ProjectId::from_proto(reshared_project.project_id);
1407                let project = project::Entity::find_by_id(project_id)
1408                    .one(&*tx)
1409                    .await?
1410                    .ok_or_else(|| anyhow!("project does not exist"))?;
1411                if project.host_user_id != user_id {
1412                    return Err(anyhow!("no such project"))?;
1413                }
1414
1415                let mut collaborators = project
1416                    .find_related(project_collaborator::Entity)
1417                    .all(&*tx)
1418                    .await?;
1419                let host_ix = collaborators
1420                    .iter()
1421                    .position(|collaborator| {
1422                        collaborator.user_id == user_id && collaborator.is_host
1423                    })
1424                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1425                let host = collaborators.swap_remove(host_ix);
1426                let old_connection_id = host.connection();
1427
1428                project::Entity::update(project::ActiveModel {
1429                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1430                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1431                        connection.owner_id as i32,
1432                    ))),
1433                    ..project.into_active_model()
1434                })
1435                .exec(&*tx)
1436                .await?;
1437                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1438                    connection_id: ActiveValue::set(connection.id as i32),
1439                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1440                    ..host.into_active_model()
1441                })
1442                .exec(&*tx)
1443                .await?;
1444
1445                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1446                    .await?;
1447
1448                reshared_projects.push(ResharedProject {
1449                    id: project_id,
1450                    old_connection_id,
1451                    collaborators: collaborators
1452                        .iter()
1453                        .map(|collaborator| ProjectCollaborator {
1454                            connection_id: collaborator.connection(),
1455                            user_id: collaborator.user_id,
1456                            replica_id: collaborator.replica_id,
1457                            is_host: collaborator.is_host,
1458                        })
1459                        .collect(),
1460                    worktrees: reshared_project.worktrees.clone(),
1461                });
1462            }
1463
1464            project::Entity::delete_many()
1465                .filter(
1466                    Condition::all()
1467                        .add(project::Column::RoomId.eq(room_id))
1468                        .add(project::Column::HostUserId.eq(user_id))
1469                        .add(
1470                            project::Column::Id
1471                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1472                        ),
1473                )
1474                .exec(&*tx)
1475                .await?;
1476
1477            let mut rejoined_projects = Vec::new();
1478            for rejoined_project in &rejoin_room.rejoined_projects {
1479                let project_id = ProjectId::from_proto(rejoined_project.id);
1480                let Some(project) = project::Entity::find_by_id(project_id)
1481                    .one(&*tx)
1482                    .await? else { continue };
1483
1484                let mut worktrees = Vec::new();
1485                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1486                for db_worktree in db_worktrees {
1487                    let mut worktree = RejoinedWorktree {
1488                        id: db_worktree.id as u64,
1489                        abs_path: db_worktree.abs_path,
1490                        root_name: db_worktree.root_name,
1491                        visible: db_worktree.visible,
1492                        updated_entries: Default::default(),
1493                        removed_entries: Default::default(),
1494                        updated_repositories: Default::default(),
1495                        removed_repositories: Default::default(),
1496                        diagnostic_summaries: Default::default(),
1497                        scan_id: db_worktree.scan_id as u64,
1498                        completed_scan_id: db_worktree.completed_scan_id as u64,
1499                    };
1500
1501                    let rejoined_worktree = rejoined_project
1502                        .worktrees
1503                        .iter()
1504                        .find(|worktree| worktree.id == db_worktree.id as u64);
1505
1506                    // File entries
1507                    {
1508                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1509                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1510                        } else {
1511                            worktree_entry::Column::IsDeleted.eq(false)
1512                        };
1513
1514                        let mut db_entries = worktree_entry::Entity::find()
1515                            .filter(
1516                                Condition::all()
1517                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1518                                    .add(entry_filter),
1519                            )
1520                            .stream(&*tx)
1521                            .await?;
1522
1523                        while let Some(db_entry) = db_entries.next().await {
1524                            let db_entry = db_entry?;
1525                            if db_entry.is_deleted {
1526                                worktree.removed_entries.push(db_entry.id as u64);
1527                            } else {
1528                                worktree.updated_entries.push(proto::Entry {
1529                                    id: db_entry.id as u64,
1530                                    is_dir: db_entry.is_dir,
1531                                    path: db_entry.path,
1532                                    inode: db_entry.inode as u64,
1533                                    mtime: Some(proto::Timestamp {
1534                                        seconds: db_entry.mtime_seconds as u64,
1535                                        nanos: db_entry.mtime_nanos as u32,
1536                                    }),
1537                                    is_symlink: db_entry.is_symlink,
1538                                    is_ignored: db_entry.is_ignored,
1539                                });
1540                            }
1541                        }
1542                    }
1543
1544                    // Repository Entries
1545                    {
1546                        let repository_entry_filter =
1547                            if let Some(rejoined_worktree) = rejoined_worktree {
1548                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1549                            } else {
1550                                worktree_repository::Column::IsDeleted.eq(false)
1551                            };
1552
1553                        let mut db_repositories = worktree_repository::Entity::find()
1554                            .filter(
1555                                Condition::all()
1556                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1557                                    .add(repository_entry_filter),
1558                            )
1559                            .stream(&*tx)
1560                            .await?;
1561
1562                        while let Some(db_repository) = db_repositories.next().await {
1563                            let db_repository = db_repository?;
1564                            if db_repository.is_deleted {
1565                                worktree
1566                                    .removed_repositories
1567                                    .push(db_repository.work_directory_id as u64);
1568                            } else {
1569                                worktree.updated_repositories.push(proto::RepositoryEntry {
1570                                    work_directory_id: db_repository.work_directory_id as u64,
1571                                    branch: db_repository.branch,
1572                                    removed_worktree_repo_paths: Default::default(),
1573                                    updated_worktree_statuses: Default::default(),
1574                                });
1575                            }
1576                        }
1577                    }
1578
1579                    // Repository Status Entries
1580                    for repository in worktree.updated_repositories.iter_mut() {
1581                        let repository_status_entry_filter =
1582                            if let Some(rejoined_worktree) = rejoined_worktree {
1583                                worktree_repository_statuses::Column::ScanId
1584                                    .gt(rejoined_worktree.scan_id)
1585                            } else {
1586                                worktree_repository_statuses::Column::IsDeleted.eq(false)
1587                            };
1588
1589                        let mut db_repository_statuses =
1590                            worktree_repository_statuses::Entity::find()
1591                                .filter(
1592                                    Condition::all()
1593                                        .add(
1594                                            worktree_repository_statuses::Column::WorktreeId
1595                                                .eq(worktree.id),
1596                                        )
1597                                        .add(
1598                                            worktree_repository_statuses::Column::WorkDirectoryId
1599                                                .eq(repository.work_directory_id),
1600                                        )
1601                                        .add(repository_status_entry_filter),
1602                                )
1603                                .stream(&*tx)
1604                                .await?;
1605
1606                        while let Some(db_status_entry) = db_repository_statuses.next().await {
1607                            let db_status_entry = db_status_entry?;
1608                            if db_status_entry.is_deleted {
1609                                repository
1610                                    .removed_worktree_repo_paths
1611                                    .push(db_status_entry.repo_path);
1612                            } else {
1613                                repository
1614                                    .updated_worktree_statuses
1615                                    .push(proto::StatusEntry {
1616                                        repo_path: db_status_entry.repo_path,
1617                                        status: db_status_entry.status as i32,
1618                                    });
1619                            }
1620                        }
1621                    }
1622
1623                    worktrees.push(worktree);
1624                }
1625
1626                let language_servers = project
1627                    .find_related(language_server::Entity)
1628                    .all(&*tx)
1629                    .await?
1630                    .into_iter()
1631                    .map(|language_server| proto::LanguageServer {
1632                        id: language_server.id as u64,
1633                        name: language_server.name,
1634                    })
1635                    .collect::<Vec<_>>();
1636
1637                let mut collaborators = project
1638                    .find_related(project_collaborator::Entity)
1639                    .all(&*tx)
1640                    .await?;
1641                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1642                    .iter()
1643                    .position(|collaborator| collaborator.user_id == user_id)
1644                {
1645                    collaborators.swap_remove(self_collaborator_ix)
1646                } else {
1647                    continue;
1648                };
1649                let old_connection_id = self_collaborator.connection();
1650                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1651                    connection_id: ActiveValue::set(connection.id as i32),
1652                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1653                    ..self_collaborator.into_active_model()
1654                })
1655                .exec(&*tx)
1656                .await?;
1657
1658                let collaborators = collaborators
1659                    .into_iter()
1660                    .map(|collaborator| ProjectCollaborator {
1661                        connection_id: collaborator.connection(),
1662                        user_id: collaborator.user_id,
1663                        replica_id: collaborator.replica_id,
1664                        is_host: collaborator.is_host,
1665                    })
1666                    .collect::<Vec<_>>();
1667
1668                rejoined_projects.push(RejoinedProject {
1669                    id: project_id,
1670                    old_connection_id,
1671                    collaborators,
1672                    worktrees,
1673                    language_servers,
1674                });
1675            }
1676
1677            let room = self.get_room(room_id, &tx).await?;
1678            Ok(RejoinedRoom {
1679                room,
1680                rejoined_projects,
1681                reshared_projects,
1682            })
1683        })
1684        .await
1685    }
1686
    /// Removes the participant answering on `connection` from its room and cleans up
    /// everything that connection owned there.
    ///
    /// All statements run inside one `optional_room_transaction`. In order, it:
    /// 1. looks up the `room_participant` row whose answering connection is `connection`;
    /// 2. deletes that row;
    /// 3. deletes pending calls (rows without an answering connection) placed by the
    ///    leaving user and remembers the called user ids;
    /// 4. collects, for every project the connection collaborated on, the other
    ///    collaborators' connections and the host's user/connection;
    /// 5. deletes the connection's `project_collaborator` rows;
    /// 6. deletes the projects in this room that were hosted by the connection;
    /// 7. reloads the room and, when it has no participants left, deletes the room row and
    ///    removes its entry from `self.rooms`.
    ///
    /// The closure yields `Ok(None)` when no participant row matches `connection`, and
    /// `Ok(Some((room_id, LeftRoom { .. })))` otherwise.
    /// NOTE(review): wrapping into `RoomGuard` is presumably done by
    /// `optional_room_transaction` — confirm against its definition.
    ///
    /// # Errors
    /// Propagates any database error, as well as errors from `get_room`.
    pub async fn leave_room(
        &self,
        connection: ConnectionId,
    ) -> Result<Option<RoomGuard<LeftRoom>>> {
        self.optional_room_transaction(|tx| async move {
            // Find the participant row that is answering on this exact (server, connection) pair.
            let leaving_participant = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::AnsweringConnectionId
                                .eq(connection.id as i32),
                        )
                        .add(
                            room_participant::Column::AnsweringConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?;

            if let Some(leaving_participant) = leaving_participant {
                // Leave room.
                let room_id = leaving_participant.room_id;
                room_participant::Entity::delete_by_id(leaving_participant.id)
                    .exec(&*tx)
                    .await?;

                // Cancel pending calls initiated by the leaving user.
                // A call is "pending" when its row has no answering connection yet.
                let called_participants = room_participant::Entity::find()
                    .filter(
                        Condition::all()
                            .add(
                                room_participant::Column::CallingUserId
                                    .eq(leaving_participant.user_id),
                            )
                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
                    )
                    .all(&*tx)
                    .await?;
                room_participant::Entity::delete_many()
                    .filter(
                        room_participant::Column::Id
                            .is_in(called_participants.iter().map(|participant| participant.id)),
                    )
                    .exec(&*tx)
                    .await?;
                // The users whose incoming call was just canceled; reported back in `LeftRoom`.
                let canceled_calls_to_user_ids = called_participants
                    .into_iter()
                    .map(|participant| participant.user_id)
                    .collect();

                // Detect left projects.
                // First select only the ids of projects this connection collaborates on...
                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
                enum QueryProjectIds {
                    ProjectId,
                }
                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                    .select_only()
                    .column_as(
                        project_collaborator::Column::ProjectId,
                        QueryProjectIds::ProjectId,
                    )
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .into_values::<_, QueryProjectIds>()
                    .all(&*tx)
                    .await?;
                // ...then stream every collaborator of those projects and group them per project.
                let mut left_projects = HashMap::default();
                let mut collaborators = project_collaborator::Entity::find()
                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                    .stream(&*tx)
                    .await?;
                while let Some(collaborator) = collaborators.next().await {
                    let collaborator = collaborator?;
                    // Host fields start out as defaults and are filled in once the host's
                    // collaborator row is seen below.
                    let left_project =
                        left_projects
                            .entry(collaborator.project_id)
                            .or_insert(LeftProject {
                                id: collaborator.project_id,
                                host_user_id: Default::default(),
                                connection_ids: Default::default(),
                                host_connection_id: Default::default(),
                            });

                    // Only the *other* collaborators are listed; the leaving connection is skipped.
                    let collaborator_connection_id = collaborator.connection();
                    if collaborator_connection_id != connection {
                        left_project.connection_ids.push(collaborator_connection_id);
                    }

                    if collaborator.is_host {
                        left_project.host_user_id = collaborator.user_id;
                        left_project.host_connection_id = collaborator_connection_id;
                    }
                }
                // NOTE(review): presumably the stream has to be released before `tx` can run
                // further statements — confirm before moving or removing this drop.
                drop(collaborators);

                // Leave projects.
                project_collaborator::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // Unshare projects.
                // Only projects in this room whose host connection is the leaving one are deleted.
                project::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(project::Column::RoomId.eq(room_id))
                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
                            .add(
                                project::Column::HostConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // Reload the room after all deletions. `participants` as built by `get_room`
                // holds only rows with an answering connection, so a room with nothing but
                // pending participants counts as empty here and its row is deleted.
                let room = self.get_room(room_id, &tx).await?;
                if room.participants.is_empty() {
                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
                }

                let left_room = LeftRoom {
                    room,
                    left_projects,
                    canceled_calls_to_user_ids,
                };

                // Also forget the emptied room's entry in the `rooms` map.
                if left_room.room.participants.is_empty() {
                    self.rooms.remove(&room_id);
                }

                Ok(Some((room_id, left_room)))
            } else {
                // The connection is not answering in any room: nothing to leave.
                Ok(None)
            }
        })
        .await
    }
1842
1843    pub async fn follow(
1844        &self,
1845        project_id: ProjectId,
1846        leader_connection: ConnectionId,
1847        follower_connection: ConnectionId,
1848    ) -> Result<RoomGuard<proto::Room>> {
1849        let room_id = self.room_id_for_project(project_id).await?;
1850        self.room_transaction(room_id, |tx| async move {
1851            follower::ActiveModel {
1852                room_id: ActiveValue::set(room_id),
1853                project_id: ActiveValue::set(project_id),
1854                leader_connection_server_id: ActiveValue::set(ServerId(
1855                    leader_connection.owner_id as i32,
1856                )),
1857                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1858                follower_connection_server_id: ActiveValue::set(ServerId(
1859                    follower_connection.owner_id as i32,
1860                )),
1861                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1862                ..Default::default()
1863            }
1864            .insert(&*tx)
1865            .await?;
1866
1867            let room = self.get_room(room_id, &*tx).await?;
1868            Ok(room)
1869        })
1870        .await
1871    }
1872
1873    pub async fn unfollow(
1874        &self,
1875        project_id: ProjectId,
1876        leader_connection: ConnectionId,
1877        follower_connection: ConnectionId,
1878    ) -> Result<RoomGuard<proto::Room>> {
1879        let room_id = self.room_id_for_project(project_id).await?;
1880        self.room_transaction(room_id, |tx| async move {
1881            follower::Entity::delete_many()
1882                .filter(
1883                    Condition::all()
1884                        .add(follower::Column::ProjectId.eq(project_id))
1885                        .add(
1886                            follower::Column::LeaderConnectionServerId
1887                                .eq(leader_connection.owner_id),
1888                        )
1889                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1890                        .add(
1891                            follower::Column::FollowerConnectionServerId
1892                                .eq(follower_connection.owner_id),
1893                        )
1894                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1895                )
1896                .exec(&*tx)
1897                .await?;
1898
1899            let room = self.get_room(room_id, &*tx).await?;
1900            Ok(room)
1901        })
1902        .await
1903    }
1904
1905    pub async fn update_room_participant_location(
1906        &self,
1907        room_id: RoomId,
1908        connection: ConnectionId,
1909        location: proto::ParticipantLocation,
1910    ) -> Result<RoomGuard<proto::Room>> {
1911        self.room_transaction(room_id, |tx| async {
1912            let tx = tx;
1913            let location_kind;
1914            let location_project_id;
1915            match location
1916                .variant
1917                .as_ref()
1918                .ok_or_else(|| anyhow!("invalid location"))?
1919            {
1920                proto::participant_location::Variant::SharedProject(project) => {
1921                    location_kind = 0;
1922                    location_project_id = Some(ProjectId::from_proto(project.id));
1923                }
1924                proto::participant_location::Variant::UnsharedProject(_) => {
1925                    location_kind = 1;
1926                    location_project_id = None;
1927                }
1928                proto::participant_location::Variant::External(_) => {
1929                    location_kind = 2;
1930                    location_project_id = None;
1931                }
1932            }
1933
1934            let result = room_participant::Entity::update_many()
1935                .filter(
1936                    Condition::all()
1937                        .add(room_participant::Column::RoomId.eq(room_id))
1938                        .add(
1939                            room_participant::Column::AnsweringConnectionId
1940                                .eq(connection.id as i32),
1941                        )
1942                        .add(
1943                            room_participant::Column::AnsweringConnectionServerId
1944                                .eq(connection.owner_id as i32),
1945                        ),
1946                )
1947                .set(room_participant::ActiveModel {
1948                    location_kind: ActiveValue::set(Some(location_kind)),
1949                    location_project_id: ActiveValue::set(location_project_id),
1950                    ..Default::default()
1951                })
1952                .exec(&*tx)
1953                .await?;
1954
1955            if result.rows_affected == 1 {
1956                let room = self.get_room(room_id, &tx).await?;
1957                Ok(room)
1958            } else {
1959                Err(anyhow!("could not update room participant location"))?
1960            }
1961        })
1962        .await
1963    }
1964
1965    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1966        self.transaction(|tx| async move {
1967            let participant = room_participant::Entity::find()
1968                .filter(
1969                    Condition::all()
1970                        .add(
1971                            room_participant::Column::AnsweringConnectionId
1972                                .eq(connection.id as i32),
1973                        )
1974                        .add(
1975                            room_participant::Column::AnsweringConnectionServerId
1976                                .eq(connection.owner_id as i32),
1977                        ),
1978                )
1979                .one(&*tx)
1980                .await?
1981                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1982
1983            room_participant::Entity::update(room_participant::ActiveModel {
1984                answering_connection_lost: ActiveValue::set(true),
1985                ..participant.into_active_model()
1986            })
1987            .exec(&*tx)
1988            .await?;
1989
1990            Ok(())
1991        })
1992        .await
1993    }
1994
1995    fn build_incoming_call(
1996        room: &proto::Room,
1997        called_user_id: UserId,
1998    ) -> Option<proto::IncomingCall> {
1999        let pending_participant = room
2000            .pending_participants
2001            .iter()
2002            .find(|participant| participant.user_id == called_user_id.to_proto())?;
2003
2004        Some(proto::IncomingCall {
2005            room_id: room.id,
2006            calling_user_id: pending_participant.calling_user_id,
2007            participant_user_ids: room
2008                .participants
2009                .iter()
2010                .map(|participant| participant.user_id)
2011                .collect(),
2012            initial_project: room.participants.iter().find_map(|participant| {
2013                let initial_project_id = pending_participant.initial_project_id?;
2014                participant
2015                    .projects
2016                    .iter()
2017                    .find(|project| project.id == initial_project_id)
2018                    .cloned()
2019            }),
2020        })
2021    }
2022
2023    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2024        let db_room = room::Entity::find_by_id(room_id)
2025            .one(tx)
2026            .await?
2027            .ok_or_else(|| anyhow!("could not find room"))?;
2028
2029        let mut db_participants = db_room
2030            .find_related(room_participant::Entity)
2031            .stream(tx)
2032            .await?;
2033        let mut participants = HashMap::default();
2034        let mut pending_participants = Vec::new();
2035        while let Some(db_participant) = db_participants.next().await {
2036            let db_participant = db_participant?;
2037            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2038                .answering_connection_id
2039                .zip(db_participant.answering_connection_server_id)
2040            {
2041                let location = match (
2042                    db_participant.location_kind,
2043                    db_participant.location_project_id,
2044                ) {
2045                    (Some(0), Some(project_id)) => {
2046                        Some(proto::participant_location::Variant::SharedProject(
2047                            proto::participant_location::SharedProject {
2048                                id: project_id.to_proto(),
2049                            },
2050                        ))
2051                    }
2052                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2053                        Default::default(),
2054                    )),
2055                    _ => Some(proto::participant_location::Variant::External(
2056                        Default::default(),
2057                    )),
2058                };
2059
2060                let answering_connection = ConnectionId {
2061                    owner_id: answering_connection_server_id.0 as u32,
2062                    id: answering_connection_id as u32,
2063                };
2064                participants.insert(
2065                    answering_connection,
2066                    proto::Participant {
2067                        user_id: db_participant.user_id.to_proto(),
2068                        peer_id: Some(answering_connection.into()),
2069                        projects: Default::default(),
2070                        location: Some(proto::ParticipantLocation { variant: location }),
2071                    },
2072                );
2073            } else {
2074                pending_participants.push(proto::PendingParticipant {
2075                    user_id: db_participant.user_id.to_proto(),
2076                    calling_user_id: db_participant.calling_user_id.to_proto(),
2077                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2078                });
2079            }
2080        }
2081        drop(db_participants);
2082
2083        let mut db_projects = db_room
2084            .find_related(project::Entity)
2085            .find_with_related(worktree::Entity)
2086            .stream(tx)
2087            .await?;
2088
2089        while let Some(row) = db_projects.next().await {
2090            let (db_project, db_worktree) = row?;
2091            let host_connection = db_project.host_connection()?;
2092            if let Some(participant) = participants.get_mut(&host_connection) {
2093                let project = if let Some(project) = participant
2094                    .projects
2095                    .iter_mut()
2096                    .find(|project| project.id == db_project.id.to_proto())
2097                {
2098                    project
2099                } else {
2100                    participant.projects.push(proto::ParticipantProject {
2101                        id: db_project.id.to_proto(),
2102                        worktree_root_names: Default::default(),
2103                    });
2104                    participant.projects.last_mut().unwrap()
2105                };
2106
2107                if let Some(db_worktree) = db_worktree {
2108                    if db_worktree.visible {
2109                        project.worktree_root_names.push(db_worktree.root_name);
2110                    }
2111                }
2112            }
2113        }
2114        drop(db_projects);
2115
2116        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2117        let mut followers = Vec::new();
2118        while let Some(db_follower) = db_followers.next().await {
2119            let db_follower = db_follower?;
2120            followers.push(proto::Follower {
2121                leader_id: Some(db_follower.leader_connection().into()),
2122                follower_id: Some(db_follower.follower_connection().into()),
2123                project_id: db_follower.project_id.to_proto(),
2124            });
2125        }
2126
2127        Ok(proto::Room {
2128            id: db_room.id.to_proto(),
2129            live_kit_room: db_room.live_kit_room,
2130            participants: participants.into_values().collect(),
2131            pending_participants,
2132            followers,
2133        })
2134    }
2135
2136    // projects
2137
2138    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2139        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2140        enum QueryAs {
2141            Count,
2142        }
2143
2144        self.transaction(|tx| async move {
2145            Ok(project::Entity::find()
2146                .select_only()
2147                .column_as(project::Column::Id.count(), QueryAs::Count)
2148                .inner_join(user::Entity)
2149                .filter(user::Column::Admin.eq(false))
2150                .into_values::<_, QueryAs>()
2151                .one(&*tx)
2152                .await?
2153                .unwrap_or(0i64) as usize)
2154        })
2155        .await
2156    }
2157
2158    pub async fn share_project(
2159        &self,
2160        room_id: RoomId,
2161        connection: ConnectionId,
2162        worktrees: &[proto::WorktreeMetadata],
2163    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2164        self.room_transaction(room_id, |tx| async move {
2165            let participant = room_participant::Entity::find()
2166                .filter(
2167                    Condition::all()
2168                        .add(
2169                            room_participant::Column::AnsweringConnectionId
2170                                .eq(connection.id as i32),
2171                        )
2172                        .add(
2173                            room_participant::Column::AnsweringConnectionServerId
2174                                .eq(connection.owner_id as i32),
2175                        ),
2176                )
2177                .one(&*tx)
2178                .await?
2179                .ok_or_else(|| anyhow!("could not find participant"))?;
2180            if participant.room_id != room_id {
2181                return Err(anyhow!("shared project on unexpected room"))?;
2182            }
2183
2184            let project = project::ActiveModel {
2185                room_id: ActiveValue::set(participant.room_id),
2186                host_user_id: ActiveValue::set(participant.user_id),
2187                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2188                host_connection_server_id: ActiveValue::set(Some(ServerId(
2189                    connection.owner_id as i32,
2190                ))),
2191                ..Default::default()
2192            }
2193            .insert(&*tx)
2194            .await?;
2195
2196            if !worktrees.is_empty() {
2197                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2198                    worktree::ActiveModel {
2199                        id: ActiveValue::set(worktree.id as i64),
2200                        project_id: ActiveValue::set(project.id),
2201                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2202                        root_name: ActiveValue::set(worktree.root_name.clone()),
2203                        visible: ActiveValue::set(worktree.visible),
2204                        scan_id: ActiveValue::set(0),
2205                        completed_scan_id: ActiveValue::set(0),
2206                    }
2207                }))
2208                .exec(&*tx)
2209                .await?;
2210            }
2211
2212            project_collaborator::ActiveModel {
2213                project_id: ActiveValue::set(project.id),
2214                connection_id: ActiveValue::set(connection.id as i32),
2215                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2216                user_id: ActiveValue::set(participant.user_id),
2217                replica_id: ActiveValue::set(ReplicaId(0)),
2218                is_host: ActiveValue::set(true),
2219                ..Default::default()
2220            }
2221            .insert(&*tx)
2222            .await?;
2223
2224            let room = self.get_room(room_id, &tx).await?;
2225            Ok((project.id, room))
2226        })
2227        .await
2228    }
2229
2230    pub async fn unshare_project(
2231        &self,
2232        project_id: ProjectId,
2233        connection: ConnectionId,
2234    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2235        let room_id = self.room_id_for_project(project_id).await?;
2236        self.room_transaction(room_id, |tx| async move {
2237            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2238
2239            let project = project::Entity::find_by_id(project_id)
2240                .one(&*tx)
2241                .await?
2242                .ok_or_else(|| anyhow!("project not found"))?;
2243            if project.host_connection()? == connection {
2244                project::Entity::delete(project.into_active_model())
2245                    .exec(&*tx)
2246                    .await?;
2247                let room = self.get_room(room_id, &tx).await?;
2248                Ok((room, guest_connection_ids))
2249            } else {
2250                Err(anyhow!("cannot unshare a project hosted by another user"))?
2251            }
2252        })
2253        .await
2254    }
2255
2256    pub async fn update_project(
2257        &self,
2258        project_id: ProjectId,
2259        connection: ConnectionId,
2260        worktrees: &[proto::WorktreeMetadata],
2261    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2262        let room_id = self.room_id_for_project(project_id).await?;
2263        self.room_transaction(room_id, |tx| async move {
2264            let project = project::Entity::find_by_id(project_id)
2265                .filter(
2266                    Condition::all()
2267                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2268                        .add(
2269                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2270                        ),
2271                )
2272                .one(&*tx)
2273                .await?
2274                .ok_or_else(|| anyhow!("no such project"))?;
2275
2276            self.update_project_worktrees(project.id, worktrees, &tx)
2277                .await?;
2278
2279            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2280            let room = self.get_room(project.room_id, &tx).await?;
2281            Ok((room, guest_connection_ids))
2282        })
2283        .await
2284    }
2285
2286    async fn update_project_worktrees(
2287        &self,
2288        project_id: ProjectId,
2289        worktrees: &[proto::WorktreeMetadata],
2290        tx: &DatabaseTransaction,
2291    ) -> Result<()> {
2292        if !worktrees.is_empty() {
2293            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2294                id: ActiveValue::set(worktree.id as i64),
2295                project_id: ActiveValue::set(project_id),
2296                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2297                root_name: ActiveValue::set(worktree.root_name.clone()),
2298                visible: ActiveValue::set(worktree.visible),
2299                scan_id: ActiveValue::set(0),
2300                completed_scan_id: ActiveValue::set(0),
2301            }))
2302            .on_conflict(
2303                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2304                    .update_column(worktree::Column::RootName)
2305                    .to_owned(),
2306            )
2307            .exec(&*tx)
2308            .await?;
2309        }
2310
2311        worktree::Entity::delete_many()
2312            .filter(worktree::Column::ProjectId.eq(project_id).and(
2313                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2314            ))
2315            .exec(&*tx)
2316            .await?;
2317
2318        Ok(())
2319    }
2320
2321    pub async fn update_worktree(
2322        &self,
2323        update: &proto::UpdateWorktree,
2324        connection: ConnectionId,
2325    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2326        let project_id = ProjectId::from_proto(update.project_id);
2327        let worktree_id = update.worktree_id as i64;
2328        let room_id = self.room_id_for_project(project_id).await?;
2329        self.room_transaction(room_id, |tx| async move {
2330            // Ensure the update comes from the host.
2331            let _project = project::Entity::find_by_id(project_id)
2332                .filter(
2333                    Condition::all()
2334                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2335                        .add(
2336                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2337                        ),
2338                )
2339                .one(&*tx)
2340                .await?
2341                .ok_or_else(|| anyhow!("no such project"))?;
2342
2343            // Update metadata.
2344            worktree::Entity::update(worktree::ActiveModel {
2345                id: ActiveValue::set(worktree_id),
2346                project_id: ActiveValue::set(project_id),
2347                root_name: ActiveValue::set(update.root_name.clone()),
2348                scan_id: ActiveValue::set(update.scan_id as i64),
2349                completed_scan_id: if update.is_last_update {
2350                    ActiveValue::set(update.scan_id as i64)
2351                } else {
2352                    ActiveValue::default()
2353                },
2354                abs_path: ActiveValue::set(update.abs_path.clone()),
2355                ..Default::default()
2356            })
2357            .exec(&*tx)
2358            .await?;
2359
2360            if !update.updated_entries.is_empty() {
2361                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2362                    let mtime = entry.mtime.clone().unwrap_or_default();
2363                    worktree_entry::ActiveModel {
2364                        project_id: ActiveValue::set(project_id),
2365                        worktree_id: ActiveValue::set(worktree_id),
2366                        id: ActiveValue::set(entry.id as i64),
2367                        is_dir: ActiveValue::set(entry.is_dir),
2368                        path: ActiveValue::set(entry.path.clone()),
2369                        inode: ActiveValue::set(entry.inode as i64),
2370                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2371                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2372                        is_symlink: ActiveValue::set(entry.is_symlink),
2373                        is_ignored: ActiveValue::set(entry.is_ignored),
2374                        is_deleted: ActiveValue::set(false),
2375                        scan_id: ActiveValue::set(update.scan_id as i64),
2376                    }
2377                }))
2378                .on_conflict(
2379                    OnConflict::columns([
2380                        worktree_entry::Column::ProjectId,
2381                        worktree_entry::Column::WorktreeId,
2382                        worktree_entry::Column::Id,
2383                    ])
2384                    .update_columns([
2385                        worktree_entry::Column::IsDir,
2386                        worktree_entry::Column::Path,
2387                        worktree_entry::Column::Inode,
2388                        worktree_entry::Column::MtimeSeconds,
2389                        worktree_entry::Column::MtimeNanos,
2390                        worktree_entry::Column::IsSymlink,
2391                        worktree_entry::Column::IsIgnored,
2392                        worktree_entry::Column::ScanId,
2393                    ])
2394                    .to_owned(),
2395                )
2396                .exec(&*tx)
2397                .await?;
2398            }
2399
2400            if !update.removed_entries.is_empty() {
2401                worktree_entry::Entity::update_many()
2402                    .filter(
2403                        worktree_entry::Column::ProjectId
2404                            .eq(project_id)
2405                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2406                            .and(
2407                                worktree_entry::Column::Id
2408                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2409                            ),
2410                    )
2411                    .set(worktree_entry::ActiveModel {
2412                        is_deleted: ActiveValue::Set(true),
2413                        scan_id: ActiveValue::Set(update.scan_id as i64),
2414                        ..Default::default()
2415                    })
2416                    .exec(&*tx)
2417                    .await?;
2418            }
2419
2420            if !update.updated_repositories.is_empty() {
2421                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2422                    |repository| worktree_repository::ActiveModel {
2423                        project_id: ActiveValue::set(project_id),
2424                        worktree_id: ActiveValue::set(worktree_id),
2425                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2426                        scan_id: ActiveValue::set(update.scan_id as i64),
2427                        branch: ActiveValue::set(repository.branch.clone()),
2428                        is_deleted: ActiveValue::set(false),
2429                    },
2430                ))
2431                .on_conflict(
2432                    OnConflict::columns([
2433                        worktree_repository::Column::ProjectId,
2434                        worktree_repository::Column::WorktreeId,
2435                        worktree_repository::Column::WorkDirectoryId,
2436                    ])
2437                    .update_columns([
2438                        worktree_repository::Column::ScanId,
2439                        worktree_repository::Column::Branch,
2440                    ])
2441                    .to_owned(),
2442                )
2443                .exec(&*tx)
2444                .await?;
2445
2446                for repository in update.updated_repositories.iter() {
2447                    if !repository.updated_worktree_statuses.is_empty() {
2448                        worktree_repository_statuses::Entity::insert_many(
2449                            repository
2450                                .updated_worktree_statuses
2451                                .iter()
2452                                .map(|status_entry| worktree_repository_statuses::ActiveModel {
2453                                    project_id: ActiveValue::set(project_id),
2454                                    worktree_id: ActiveValue::set(worktree_id),
2455                                    work_directory_id: ActiveValue::set(
2456                                        repository.work_directory_id as i64,
2457                                    ),
2458                                    repo_path: ActiveValue::set(status_entry.repo_path.clone()),
2459                                    status: ActiveValue::set(status_entry.status as i64),
2460                                    scan_id: ActiveValue::set(update.scan_id as i64),
2461                                    is_deleted: ActiveValue::set(false),
2462                                }),
2463                        )
2464                        .on_conflict(
2465                            OnConflict::columns([
2466                                worktree_repository_statuses::Column::ProjectId,
2467                                worktree_repository_statuses::Column::WorktreeId,
2468                                worktree_repository_statuses::Column::WorkDirectoryId,
2469                                worktree_repository_statuses::Column::RepoPath,
2470                            ])
2471                            .update_columns([
2472                                worktree_repository_statuses::Column::ScanId,
2473                                worktree_repository_statuses::Column::Status,
2474                                worktree_repository_statuses::Column::IsDeleted,
2475                            ])
2476                            .to_owned(),
2477                        )
2478                        .exec(&*tx)
2479                        .await?;
2480                    }
2481
2482                    if !repository.removed_worktree_repo_paths.is_empty() {
2483                        worktree_repository_statuses::Entity::update_many()
2484                            .filter(
2485                                worktree_repository_statuses::Column::ProjectId
2486                                    .eq(project_id)
2487                                    .and(
2488                                        worktree_repository_statuses::Column::WorktreeId
2489                                            .eq(worktree_id),
2490                                    )
2491                                    .and(
2492                                        worktree_repository_statuses::Column::WorkDirectoryId
2493                                            .eq(repository.work_directory_id as i64),
2494                                    )
2495                                    .and(
2496                                        worktree_repository_statuses::Column::RepoPath.is_in(
2497                                            repository
2498                                                .removed_worktree_repo_paths
2499                                                .iter()
2500                                                .map(String::as_str),
2501                                        ),
2502                                    ),
2503                            )
2504                            .set(worktree_repository_statuses::ActiveModel {
2505                                is_deleted: ActiveValue::Set(true),
2506                                scan_id: ActiveValue::Set(update.scan_id as i64),
2507                                ..Default::default()
2508                            })
2509                            .exec(&*tx)
2510                            .await?;
2511                    }
2512                }
2513            }
2514
2515            if !update.removed_repositories.is_empty() {
2516                worktree_repository::Entity::update_many()
2517                    .filter(
2518                        worktree_repository::Column::ProjectId
2519                            .eq(project_id)
2520                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2521                            .and(
2522                                worktree_repository::Column::WorkDirectoryId
2523                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2524                            ),
2525                    )
2526                    .set(worktree_repository::ActiveModel {
2527                        is_deleted: ActiveValue::Set(true),
2528                        scan_id: ActiveValue::Set(update.scan_id as i64),
2529                        ..Default::default()
2530                    })
2531                    .exec(&*tx)
2532                    .await?;
2533            }
2534
2535            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2536            Ok(connection_ids)
2537        })
2538        .await
2539    }
2540
2541    pub async fn update_diagnostic_summary(
2542        &self,
2543        update: &proto::UpdateDiagnosticSummary,
2544        connection: ConnectionId,
2545    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2546        let project_id = ProjectId::from_proto(update.project_id);
2547        let worktree_id = update.worktree_id as i64;
2548        let room_id = self.room_id_for_project(project_id).await?;
2549        self.room_transaction(room_id, |tx| async move {
2550            let summary = update
2551                .summary
2552                .as_ref()
2553                .ok_or_else(|| anyhow!("invalid summary"))?;
2554
2555            // Ensure the update comes from the host.
2556            let project = project::Entity::find_by_id(project_id)
2557                .one(&*tx)
2558                .await?
2559                .ok_or_else(|| anyhow!("no such project"))?;
2560            if project.host_connection()? != connection {
2561                return Err(anyhow!("can't update a project hosted by someone else"))?;
2562            }
2563
2564            // Update summary.
2565            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2566                project_id: ActiveValue::set(project_id),
2567                worktree_id: ActiveValue::set(worktree_id),
2568                path: ActiveValue::set(summary.path.clone()),
2569                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2570                error_count: ActiveValue::set(summary.error_count as i32),
2571                warning_count: ActiveValue::set(summary.warning_count as i32),
2572                ..Default::default()
2573            })
2574            .on_conflict(
2575                OnConflict::columns([
2576                    worktree_diagnostic_summary::Column::ProjectId,
2577                    worktree_diagnostic_summary::Column::WorktreeId,
2578                    worktree_diagnostic_summary::Column::Path,
2579                ])
2580                .update_columns([
2581                    worktree_diagnostic_summary::Column::LanguageServerId,
2582                    worktree_diagnostic_summary::Column::ErrorCount,
2583                    worktree_diagnostic_summary::Column::WarningCount,
2584                ])
2585                .to_owned(),
2586            )
2587            .exec(&*tx)
2588            .await?;
2589
2590            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2591            Ok(connection_ids)
2592        })
2593        .await
2594    }
2595
2596    pub async fn start_language_server(
2597        &self,
2598        update: &proto::StartLanguageServer,
2599        connection: ConnectionId,
2600    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2601        let project_id = ProjectId::from_proto(update.project_id);
2602        let room_id = self.room_id_for_project(project_id).await?;
2603        self.room_transaction(room_id, |tx| async move {
2604            let server = update
2605                .server
2606                .as_ref()
2607                .ok_or_else(|| anyhow!("invalid language server"))?;
2608
2609            // Ensure the update comes from the host.
2610            let project = project::Entity::find_by_id(project_id)
2611                .one(&*tx)
2612                .await?
2613                .ok_or_else(|| anyhow!("no such project"))?;
2614            if project.host_connection()? != connection {
2615                return Err(anyhow!("can't update a project hosted by someone else"))?;
2616            }
2617
2618            // Add the newly-started language server.
2619            language_server::Entity::insert(language_server::ActiveModel {
2620                project_id: ActiveValue::set(project_id),
2621                id: ActiveValue::set(server.id as i64),
2622                name: ActiveValue::set(server.name.clone()),
2623                ..Default::default()
2624            })
2625            .on_conflict(
2626                OnConflict::columns([
2627                    language_server::Column::ProjectId,
2628                    language_server::Column::Id,
2629                ])
2630                .update_column(language_server::Column::Name)
2631                .to_owned(),
2632            )
2633            .exec(&*tx)
2634            .await?;
2635
2636            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2637            Ok(connection_ids)
2638        })
2639        .await
2640    }
2641
2642    pub async fn join_project(
2643        &self,
2644        project_id: ProjectId,
2645        connection: ConnectionId,
2646    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2647        let room_id = self.room_id_for_project(project_id).await?;
2648        self.room_transaction(room_id, |tx| async move {
2649            let participant = room_participant::Entity::find()
2650                .filter(
2651                    Condition::all()
2652                        .add(
2653                            room_participant::Column::AnsweringConnectionId
2654                                .eq(connection.id as i32),
2655                        )
2656                        .add(
2657                            room_participant::Column::AnsweringConnectionServerId
2658                                .eq(connection.owner_id as i32),
2659                        ),
2660                )
2661                .one(&*tx)
2662                .await?
2663                .ok_or_else(|| anyhow!("must join a room first"))?;
2664
2665            let project = project::Entity::find_by_id(project_id)
2666                .one(&*tx)
2667                .await?
2668                .ok_or_else(|| anyhow!("no such project"))?;
2669            if project.room_id != participant.room_id {
2670                return Err(anyhow!("no such project"))?;
2671            }
2672
2673            let mut collaborators = project
2674                .find_related(project_collaborator::Entity)
2675                .all(&*tx)
2676                .await?;
2677            let replica_ids = collaborators
2678                .iter()
2679                .map(|c| c.replica_id)
2680                .collect::<HashSet<_>>();
2681            let mut replica_id = ReplicaId(1);
2682            while replica_ids.contains(&replica_id) {
2683                replica_id.0 += 1;
2684            }
2685            let new_collaborator = project_collaborator::ActiveModel {
2686                project_id: ActiveValue::set(project_id),
2687                connection_id: ActiveValue::set(connection.id as i32),
2688                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2689                user_id: ActiveValue::set(participant.user_id),
2690                replica_id: ActiveValue::set(replica_id),
2691                is_host: ActiveValue::set(false),
2692                ..Default::default()
2693            }
2694            .insert(&*tx)
2695            .await?;
2696            collaborators.push(new_collaborator);
2697
2698            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2699            let mut worktrees = db_worktrees
2700                .into_iter()
2701                .map(|db_worktree| {
2702                    (
2703                        db_worktree.id as u64,
2704                        Worktree {
2705                            id: db_worktree.id as u64,
2706                            abs_path: db_worktree.abs_path,
2707                            root_name: db_worktree.root_name,
2708                            visible: db_worktree.visible,
2709                            entries: Default::default(),
2710                            repository_entries: Default::default(),
2711                            diagnostic_summaries: Default::default(),
2712                            scan_id: db_worktree.scan_id as u64,
2713                            completed_scan_id: db_worktree.completed_scan_id as u64,
2714                        },
2715                    )
2716                })
2717                .collect::<BTreeMap<_, _>>();
2718
2719            // Populate worktree entries.
2720            {
2721                let mut db_entries = worktree_entry::Entity::find()
2722                    .filter(
2723                        Condition::all()
2724                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2725                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2726                    )
2727                    .stream(&*tx)
2728                    .await?;
2729                while let Some(db_entry) = db_entries.next().await {
2730                    let db_entry = db_entry?;
2731                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2732                        worktree.entries.push(proto::Entry {
2733                            id: db_entry.id as u64,
2734                            is_dir: db_entry.is_dir,
2735                            path: db_entry.path,
2736                            inode: db_entry.inode as u64,
2737                            mtime: Some(proto::Timestamp {
2738                                seconds: db_entry.mtime_seconds as u64,
2739                                nanos: db_entry.mtime_nanos as u32,
2740                            }),
2741                            is_symlink: db_entry.is_symlink,
2742                            is_ignored: db_entry.is_ignored,
2743                        });
2744                    }
2745                }
2746            }
2747
2748            // Populate repository entries.
2749            {
2750                let mut db_repository_entries = worktree_repository::Entity::find()
2751                    .filter(
2752                        Condition::all()
2753                            .add(worktree_repository::Column::ProjectId.eq(project_id))
2754                            .add(worktree_repository::Column::IsDeleted.eq(false)),
2755                    )
2756                    .stream(&*tx)
2757                    .await?;
2758                while let Some(db_repository_entry) = db_repository_entries.next().await {
2759                    let db_repository_entry = db_repository_entry?;
2760                    if let Some(worktree) =
2761                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2762                    {
2763                        worktree.repository_entries.insert(
2764                            db_repository_entry.work_directory_id as u64,
2765                            proto::RepositoryEntry {
2766                                work_directory_id: db_repository_entry.work_directory_id as u64,
2767                                branch: db_repository_entry.branch,
2768                                removed_worktree_repo_paths: Default::default(),
2769                                updated_worktree_statuses: Default::default(),
2770                            },
2771                        );
2772                    }
2773                }
2774            }
2775
2776            {
2777                let mut db_status_entries = worktree_repository_statuses::Entity::find()
2778                    .filter(
2779                        Condition::all()
2780                            .add(worktree_repository_statuses::Column::ProjectId.eq(project_id))
2781                            .add(worktree_repository_statuses::Column::IsDeleted.eq(false)),
2782                    )
2783                    .stream(&*tx)
2784                    .await?;
2785
2786                while let Some(db_status_entry) = db_status_entries.next().await {
2787                    let db_status_entry = db_status_entry?;
2788                    if let Some(worktree) = worktrees.get_mut(&(db_status_entry.worktree_id as u64))
2789                    {
2790                        if let Some(repository_entry) = worktree
2791                            .repository_entries
2792                            .get_mut(&(db_status_entry.work_directory_id as u64))
2793                        {
2794                            repository_entry
2795                                .updated_worktree_statuses
2796                                .push(proto::StatusEntry {
2797                                    repo_path: db_status_entry.repo_path,
2798                                    status: db_status_entry.status as i32,
2799                                });
2800                        }
2801                    }
2802                }
2803            }
2804
2805            // Populate worktree diagnostic summaries.
2806            {
2807                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2808                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2809                    .stream(&*tx)
2810                    .await?;
2811                while let Some(db_summary) = db_summaries.next().await {
2812                    let db_summary = db_summary?;
2813                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2814                        worktree
2815                            .diagnostic_summaries
2816                            .push(proto::DiagnosticSummary {
2817                                path: db_summary.path,
2818                                language_server_id: db_summary.language_server_id as u64,
2819                                error_count: db_summary.error_count as u32,
2820                                warning_count: db_summary.warning_count as u32,
2821                            });
2822                    }
2823                }
2824            }
2825
2826            // Populate language servers.
2827            let language_servers = project
2828                .find_related(language_server::Entity)
2829                .all(&*tx)
2830                .await?;
2831
2832            let project = Project {
2833                collaborators: collaborators
2834                    .into_iter()
2835                    .map(|collaborator| ProjectCollaborator {
2836                        connection_id: collaborator.connection(),
2837                        user_id: collaborator.user_id,
2838                        replica_id: collaborator.replica_id,
2839                        is_host: collaborator.is_host,
2840                    })
2841                    .collect(),
2842                worktrees,
2843                language_servers: language_servers
2844                    .into_iter()
2845                    .map(|language_server| proto::LanguageServer {
2846                        id: language_server.id as u64,
2847                        name: language_server.name,
2848                    })
2849                    .collect(),
2850            };
2851            Ok((project, replica_id as ReplicaId))
2852        })
2853        .await
2854    }
2855
2856    pub async fn leave_project(
2857        &self,
2858        project_id: ProjectId,
2859        connection: ConnectionId,
2860    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2861        let room_id = self.room_id_for_project(project_id).await?;
2862        self.room_transaction(room_id, |tx| async move {
2863            let result = project_collaborator::Entity::delete_many()
2864                .filter(
2865                    Condition::all()
2866                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2867                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2868                        .add(
2869                            project_collaborator::Column::ConnectionServerId
2870                                .eq(connection.owner_id as i32),
2871                        ),
2872                )
2873                .exec(&*tx)
2874                .await?;
2875            if result.rows_affected == 0 {
2876                Err(anyhow!("not a collaborator on this project"))?;
2877            }
2878
2879            let project = project::Entity::find_by_id(project_id)
2880                .one(&*tx)
2881                .await?
2882                .ok_or_else(|| anyhow!("no such project"))?;
2883            let collaborators = project
2884                .find_related(project_collaborator::Entity)
2885                .all(&*tx)
2886                .await?;
2887            let connection_ids = collaborators
2888                .into_iter()
2889                .map(|collaborator| collaborator.connection())
2890                .collect();
2891
2892            follower::Entity::delete_many()
2893                .filter(
2894                    Condition::any()
2895                        .add(
2896                            Condition::all()
2897                                .add(follower::Column::ProjectId.eq(project_id))
2898                                .add(
2899                                    follower::Column::LeaderConnectionServerId
2900                                        .eq(connection.owner_id),
2901                                )
2902                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2903                        )
2904                        .add(
2905                            Condition::all()
2906                                .add(follower::Column::ProjectId.eq(project_id))
2907                                .add(
2908                                    follower::Column::FollowerConnectionServerId
2909                                        .eq(connection.owner_id),
2910                                )
2911                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2912                        ),
2913                )
2914                .exec(&*tx)
2915                .await?;
2916
2917            let room = self.get_room(project.room_id, &tx).await?;
2918            let left_project = LeftProject {
2919                id: project_id,
2920                host_user_id: project.host_user_id,
2921                host_connection_id: project.host_connection()?,
2922                connection_ids,
2923            };
2924            Ok((room, left_project))
2925        })
2926        .await
2927    }
2928
2929    pub async fn project_collaborators(
2930        &self,
2931        project_id: ProjectId,
2932        connection_id: ConnectionId,
2933    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2934        let room_id = self.room_id_for_project(project_id).await?;
2935        self.room_transaction(room_id, |tx| async move {
2936            let collaborators = project_collaborator::Entity::find()
2937                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2938                .all(&*tx)
2939                .await?
2940                .into_iter()
2941                .map(|collaborator| ProjectCollaborator {
2942                    connection_id: collaborator.connection(),
2943                    user_id: collaborator.user_id,
2944                    replica_id: collaborator.replica_id,
2945                    is_host: collaborator.is_host,
2946                })
2947                .collect::<Vec<_>>();
2948
2949            if collaborators
2950                .iter()
2951                .any(|collaborator| collaborator.connection_id == connection_id)
2952            {
2953                Ok(collaborators)
2954            } else {
2955                Err(anyhow!("no such project"))?
2956            }
2957        })
2958        .await
2959    }
2960
2961    pub async fn project_connection_ids(
2962        &self,
2963        project_id: ProjectId,
2964        connection_id: ConnectionId,
2965    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2966        let room_id = self.room_id_for_project(project_id).await?;
2967        self.room_transaction(room_id, |tx| async move {
2968            let mut collaborators = project_collaborator::Entity::find()
2969                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2970                .stream(&*tx)
2971                .await?;
2972
2973            let mut connection_ids = HashSet::default();
2974            while let Some(collaborator) = collaborators.next().await {
2975                let collaborator = collaborator?;
2976                connection_ids.insert(collaborator.connection());
2977            }
2978
2979            if connection_ids.contains(&connection_id) {
2980                Ok(connection_ids)
2981            } else {
2982                Err(anyhow!("no such project"))?
2983            }
2984        })
2985        .await
2986    }
2987
2988    async fn project_guest_connection_ids(
2989        &self,
2990        project_id: ProjectId,
2991        tx: &DatabaseTransaction,
2992    ) -> Result<Vec<ConnectionId>> {
2993        let mut collaborators = project_collaborator::Entity::find()
2994            .filter(
2995                project_collaborator::Column::ProjectId
2996                    .eq(project_id)
2997                    .and(project_collaborator::Column::IsHost.eq(false)),
2998            )
2999            .stream(tx)
3000            .await?;
3001
3002        let mut guest_connection_ids = Vec::new();
3003        while let Some(collaborator) = collaborators.next().await {
3004            let collaborator = collaborator?;
3005            guest_connection_ids.push(collaborator.connection());
3006        }
3007        Ok(guest_connection_ids)
3008    }
3009
3010    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3011        self.transaction(|tx| async move {
3012            let project = project::Entity::find_by_id(project_id)
3013                .one(&*tx)
3014                .await?
3015                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3016            Ok(project.room_id)
3017        })
3018        .await
3019    }
3020
3021    // access tokens
3022
3023    pub async fn create_access_token(
3024        &self,
3025        user_id: UserId,
3026        access_token_hash: &str,
3027        max_access_token_count: usize,
3028    ) -> Result<AccessTokenId> {
3029        self.transaction(|tx| async {
3030            let tx = tx;
3031
3032            let token = access_token::ActiveModel {
3033                user_id: ActiveValue::set(user_id),
3034                hash: ActiveValue::set(access_token_hash.into()),
3035                ..Default::default()
3036            }
3037            .insert(&*tx)
3038            .await?;
3039
3040            access_token::Entity::delete_many()
3041                .filter(
3042                    access_token::Column::Id.in_subquery(
3043                        Query::select()
3044                            .column(access_token::Column::Id)
3045                            .from(access_token::Entity)
3046                            .and_where(access_token::Column::UserId.eq(user_id))
3047                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3048                            .limit(10000)
3049                            .offset(max_access_token_count as u64)
3050                            .to_owned(),
3051                    ),
3052                )
3053                .exec(&*tx)
3054                .await?;
3055            Ok(token.id)
3056        })
3057        .await
3058    }
3059
3060    pub async fn get_access_token(
3061        &self,
3062        access_token_id: AccessTokenId,
3063    ) -> Result<access_token::Model> {
3064        self.transaction(|tx| async move {
3065            Ok(access_token::Entity::find_by_id(access_token_id)
3066                .one(&*tx)
3067                .await?
3068                .ok_or_else(|| anyhow!("no such access token"))?)
3069        })
3070        .await
3071    }
3072
    /// Runs `f` inside a database transaction, retrying on serialization failures.
    ///
    /// Every attempt opens a fresh transaction through `with_transaction` and passes `f` a
    /// handle to it, so `f` may be invoked more than once. If `f` succeeds, the transaction is
    /// committed; if `f` fails, it is rolled back. Whenever the commit or `f` produces an error
    /// for which `retry_on_serialization_error` returns `true`, another attempt is made; any
    /// other error is returned to the caller.
    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; passed along to scale the retry delay.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(result) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(result),
                        Err(error) => {
                            // The commit itself failed; give up unless the error is retryable.
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // Undo whatever `f` did before deciding whether to try again.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3104
    /// Runs `f` inside a retried transaction whose result may or may not refer to a room.
    ///
    /// When `f` yields `Some((room_id, data))`, the mutex registered for `room_id` in
    /// `self.rooms` (created on demand) is locked before the transaction is committed, and
    /// on a successful commit `data` is returned in a [`RoomGuard`] that keeps holding that
    /// lock. When `f` yields `None`, the transaction is committed and `None` is returned.
    ///
    /// As in `transaction`, a failing `f` causes a rollback, and errors accepted by
    /// `retry_on_serialization_error` lead to a new attempt with a fresh transaction, so `f`
    /// may be invoked more than once.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; passed along to scale the retry delay.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // The room is only known once `f` has run, so its lock is taken here,
                        // just before committing.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3154
    /// Runs `f` inside a retried transaction while holding the lock of `room_id`.
    ///
    /// On every attempt the mutex registered for `room_id` in `self.rooms` (created on
    /// demand) is locked before the transaction is opened. After a successful commit the
    /// value produced by `f` is returned in a [`RoomGuard`] that keeps holding that lock.
    ///
    /// As in `transaction`, a failing `f` causes a rollback, and errors accepted by
    /// `retry_on_serialization_error` lead to a new attempt, so `f` may be invoked more
    /// than once.
    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; passed along to scale the retry delay.
            let mut i = 0;
            loop {
                // The guard is scoped to this loop iteration, so a retry locks the room again.
                let lock = self.rooms.entry(room_id).or_default().clone();
                let _guard = lock.lock_owned().await;
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(data) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => {
                            return Ok(RoomGuard {
                                data,
                                _guard,
                                _not_send: PhantomData,
                            });
                        }
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3194
3195    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3196    where
3197        F: Send + Fn(TransactionHandle) -> Fut,
3198        Fut: Send + Future<Output = Result<T>>,
3199    {
3200        let tx = self
3201            .pool
3202            .begin_with_config(Some(IsolationLevel::Serializable), None)
3203            .await?;
3204
3205        let mut tx = Arc::new(Some(tx));
3206        let result = f(TransactionHandle(tx.clone())).await;
3207        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3208            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3209        };
3210
3211        Ok((tx, result))
3212    }
3213
    /// Drives `future` to completion and returns its output.
    ///
    /// In test builds, a random delay is simulated first when the executor is
    /// `Executor::Deterministic`, and the future is then blocked on using the runtime stored
    /// in `self.runtime` (this panics if no runtime has been set). In all other builds the
    /// future is simply awaited.
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        #[cfg(test)]
        {
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            // Tests execute database futures on the dedicated runtime owned by this `Database`.
            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
3232
3233    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3234        // If the error is due to a failure to serialize concurrent transactions, then retry
3235        // this transaction after a delay. With each subsequent retry, double the delay duration.
3236        // Also vary the delay randomly in order to ensure different database connections retry
3237        // at different times.
3238        if is_serialization_error(error) {
3239            let base_delay = 4_u64 << prev_attempt_count.min(16);
3240            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3241            log::info!(
3242                "retrying transaction after serialization error. delay: {} ms.",
3243                randomized_delay
3244            );
3245            self.executor
3246                .sleep(Duration::from_millis(randomized_delay as u64))
3247                .await;
3248            true
3249        } else {
3250            false
3251        }
3252    }
3253}
3254
3255fn is_serialization_error(error: &Error) -> bool {
3256    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3257    match error {
3258        Error::Database(
3259            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3260            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3261        ) if error
3262            .as_database_error()
3263            .and_then(|error| error.code())
3264            .as_deref()
3265            == Some(SERIALIZATION_FAILURE_CODE) =>
3266        {
3267            true
3268        }
3269        _ => false,
3270    }
3271}
3272
3273struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3274
3275impl Deref for TransactionHandle {
3276    type Target = DatabaseTransaction;
3277
3278    fn deref(&self) -> &Self::Target {
3279        self.0.as_ref().as_ref().unwrap()
3280    }
3281}
3282
/// A value of type `T` bundled with an owned guard of a room mutex.
///
/// The mutex stays locked for as long as this value is alive. The wrapped data is
/// reachable through `Deref` and `DerefMut`.
pub struct RoomGuard<T> {
    // The payload produced while the room lock was held.
    data: T,
    // Keeps the room's mutex locked until the `RoomGuard` is dropped.
    _guard: OwnedMutexGuard<()>,
    // `Rc` is not `Send`, so this marker makes `RoomGuard` non-`Send` as well.
    _not_send: PhantomData<Rc<()>>,
}

impl<T> Deref for RoomGuard<T> {
    type Target = T;

    /// Borrows the guarded data.
    fn deref(&self) -> &T {
        &self.data
    }
}

impl<T> DerefMut for RoomGuard<T> {
    /// Mutably borrows the guarded data.
    fn deref_mut(&mut self) -> &mut T {
        &mut self.data
    }
}
3302
/// Serializable parameters describing a user to be created.
/// NOTE(review): the consumer of this struct is outside this view; field meanings below
/// are taken from the field names.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    // GitHub login name of the user.
    pub github_login: String,
    // Numeric GitHub id of the user.
    pub github_user_id: i32,
    // Presumably the number of invites initially granted to the user; verify against caller.
    pub invite_count: i32,
}
3309
/// Outcome of creating a user.
/// NOTE(review): the producer of this struct is outside this view; field meanings below
/// are taken from the field names.
#[derive(Debug)]
pub struct NewUserResult {
    // Id of the created user.
    pub user_id: UserId,
    // Metrics identifier associated with the user.
    pub metrics_id: String,
    // The user who sent the invite, if there was one.
    pub inviting_user_id: Option<UserId>,
    // Device id recorded at signup, if any.
    pub signup_device_id: Option<String>,
}
3317
/// Generates a random invite code via `nanoid!` with a size argument of 16
/// (presumably the length of the resulting string; see the nanoid crate).
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
3321
/// Generates a random email confirmation code via `nanoid!` with a size argument of 64
/// (presumably the length of the resulting string; see the nanoid crate).
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
3325
// Defines a public newtype around `i32` named `$name`, intended for database ids.
//
// The generated type is `Copy`, ordered, hashable, serializes transparently as its inner
// integer, displays as that integer, and implements the sea-query / sea-orm conversion
// traits needed to use it as a column value and as a primary key.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Converts an id received over the wire. The value is narrowed with `as`,
            /// so anything that does not fit into an `i32` is truncated.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts the id into its wire representation using an `as` cast.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer value that fits into an `i32`; everything else,
            // including null integers and out-of-range values, is a type error.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `DbErr::ConvertFromU64` carries the name of the type that could not be
                // built from the `u64`; the error's own message supplies the rest.
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(stringify!($name))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
3444
// One distinct `i32`-backed id type per kind of entity, generated by `id_type!`, so that
// ids of different kinds cannot be mixed up at compile time.
id_type!(AccessTokenId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
3456
/// A room together with the projects that were rejoined and reshared.
/// NOTE(review): presumably the result of rejoining a room; the producer is outside
/// this view.
pub struct RejoinedRoom {
    pub room: proto::Room,
    pub rejoined_projects: Vec<RejoinedProject>,
    pub reshared_projects: Vec<ResharedProject>,
}
3462
/// Description of a reshared project: its id, a previous connection id, its collaborators
/// and the metadata of its worktrees.
/// NOTE(review): exact semantics depend on the producer, which is outside this view.
pub struct ResharedProject {
    pub id: ProjectId,
    // Presumably the connection used before resharing; verify against the producer.
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
3469
/// Description of a rejoined project: its id, a previous connection id, its collaborators,
/// its worktrees and its language servers.
/// NOTE(review): exact semantics depend on the producer, which is outside this view.
pub struct RejoinedProject {
    pub id: ProjectId,
    // Presumably the connection used before rejoining; verify against the producer.
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<RejoinedWorktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
3477
/// State of a single worktree inside a [`RejoinedProject`], expressed as updated and
/// removed entries and repositories plus diagnostic summaries and scan ids.
/// NOTE(review): presumably a delta relative to what the rejoining client already has;
/// confirm against the producer, which is outside this view.
#[derive(Debug)]
pub struct RejoinedWorktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub updated_entries: Vec<proto::Entry>,
    // Ids of removed entries (presumably entry ids; verify against the producer).
    pub removed_entries: Vec<u64>,
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    // Ids of removed repositories (presumably repository entry ids; verify).
    pub removed_repositories: Vec<u64>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
3492
/// A room together with the projects that were left, keyed by project id, and the ids of
/// users whose calls were canceled.
/// NOTE(review): presumably the result of leaving a room; the producer is outside this view.
pub struct LeftRoom {
    pub room: proto::Room,
    pub left_projects: HashMap<ProjectId, LeftProject>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3498
/// A room together with the ids of stale participants and of users whose calls were
/// canceled.
/// NOTE(review): presumably the result of refreshing a room; the producer is outside
/// this view.
pub struct RefreshedRoom {
    pub room: proto::Room,
    pub stale_participant_user_ids: Vec<UserId>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3504
/// Snapshot of a shared project: its collaborators, its worktrees and its language servers.
pub struct Project {
    pub collaborators: Vec<ProjectCollaborator>,
    // Worktrees in ascending key order (presumably keyed by worktree id; verify).
    pub worktrees: BTreeMap<u64, Worktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
3510
/// A participant of a project, identified by connection.
pub struct ProjectCollaborator {
    // Connection through which the collaborator takes part in the project.
    pub connection_id: ConnectionId,
    // User behind that connection.
    pub user_id: UserId,
    // Replica id assigned to the collaborator within the project.
    pub replica_id: ReplicaId,
    // Whether this collaborator is the project's host.
    pub is_host: bool,
}
3517
3518impl ProjectCollaborator {
3519    pub fn to_proto(&self) -> proto::Collaborator {
3520        proto::Collaborator {
3521            peer_id: Some(self.connection_id.into()),
3522            replica_id: self.replica_id.0 as u32,
3523            user_id: self.user_id.to_proto(),
3524        }
3525    }
3526}
3527
/// Information about a project that a connection has left.
#[derive(Debug)]
pub struct LeftProject {
    // Id of the project that was left.
    pub id: ProjectId,
    // User hosting the project.
    pub host_user_id: UserId,
    // Connection of the project's host.
    pub host_connection_id: ConnectionId,
    // Connections of the project's collaborators. NOTE(review): in `leave_project` these
    // are the collaborators remaining after the departure; other producers are outside
    // this view.
    pub connection_ids: Vec<ConnectionId>,
}
3535
/// Snapshot of a worktree of a shared project: its location, entries, repository entries,
/// diagnostic summaries and scan ids.
pub struct Worktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    // Repository entries in ascending key order (presumably keyed by the id of the
    // repository's work directory entry; verify against the producer).
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
3547
3548#[cfg(test)]
3549pub use test::*;
3550
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use lazy_static::lazy_static;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A throwaway database for tests, backed either by in-memory SQLite or by a freshly
    /// created Postgres database that is dropped again when the `TestDb` is dropped.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    /// Builds the single-threaded tokio runtime (with I/O and timers enabled) on which a
    /// test database executes its futures.
    fn new_current_thread_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build()
            .unwrap()
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and loads the test schema into it.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = String::from("sqlite::memory:");
            let runtime = new_current_thread_runtime();

            let mut db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            // The database keeps the runtime so that later queries run on it as well.
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a uniquely named Postgres database on localhost and migrates it.
        pub fn postgres(background: Arc<Background>) -> Self {
            lazy_static! {
                static ref LOCK: Mutex<()> = Mutex::new(());
            }

            // Only one test database is set up at a time within this process.
            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = new_current_thread_runtime();

            let mut db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            // The database keeps the runtime so that later queries run on it as well.
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Returns the wrapped database; panics if it has already been taken.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        /// For Postgres-backed instances, terminates the other backends connected to the
        /// test database and then drops the database. Failures are logged, not propagated.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}