db.rs

   1mod access_token;
   2mod contact;
   3mod follower;
   4mod language_server;
   5mod project;
   6mod project_collaborator;
   7mod room;
   8mod room_participant;
   9mod server;
  10mod signup;
  11#[cfg(test)]
  12mod tests;
  13mod user;
  14mod worktree;
  15mod worktree_diagnostic_summary;
  16mod worktree_entry;
  17mod worktree_repository;
  18
  19use crate::executor::Executor;
  20use crate::{Error, Result};
  21use anyhow::anyhow;
  22use collections::{BTreeMap, HashMap, HashSet};
  23pub use contact::Contact;
  24use dashmap::DashMap;
  25use futures::StreamExt;
  26use hyper::StatusCode;
  27use rand::prelude::StdRng;
  28use rand::{Rng, SeedableRng};
  29use rpc::{proto, ConnectionId};
  30use sea_orm::Condition;
  31pub use sea_orm::ConnectOptions;
  32use sea_orm::{
  33    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  34    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  35    Statement, TransactionTrait,
  36};
  37use sea_query::{Alias, Expr, OnConflict, Query};
  38use serde::{Deserialize, Serialize};
  39pub use signup::{Invite, NewSignup, WaitlistSummary};
  40use sqlx::migrate::{Migrate, Migration, MigrationSource};
  41use sqlx::Connection;
  42use std::ops::{Deref, DerefMut};
  43use std::path::Path;
  44use std::time::Duration;
  45use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  46use tokio::sync::{Mutex, OwnedMutexGuard};
  47pub use user::Model as User;
  48
  49pub struct Database {
  50    options: ConnectOptions,
  51    pool: DatabaseConnection,
  52    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  53    rng: Mutex<StdRng>,
  54    executor: Executor,
  55    #[cfg(test)]
  56    runtime: Option<tokio::runtime::Runtime>,
  57}
  58
  59impl Database {
  60    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  61        Ok(Self {
  62            options: options.clone(),
  63            pool: sea_orm::Database::connect(options).await?,
  64            rooms: DashMap::with_capacity(16384),
  65            rng: Mutex::new(StdRng::seed_from_u64(0)),
  66            executor,
  67            #[cfg(test)]
  68            runtime: None,
  69        })
  70    }
  71
  72    #[cfg(test)]
  73    pub fn reset(&self) {
  74        self.rooms.clear();
  75    }
  76
  77    pub async fn migrate(
  78        &self,
  79        migrations_path: &Path,
  80        ignore_checksum_mismatch: bool,
  81    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  82        let migrations = MigrationSource::resolve(migrations_path)
  83            .await
  84            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  85
  86        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  87
  88        connection.ensure_migrations_table().await?;
  89        let applied_migrations: HashMap<_, _> = connection
  90            .list_applied_migrations()
  91            .await?
  92            .into_iter()
  93            .map(|m| (m.version, m))
  94            .collect();
  95
  96        let mut new_migrations = Vec::new();
  97        for migration in migrations {
  98            match applied_migrations.get(&migration.version) {
  99                Some(applied_migration) => {
 100                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 101                    {
 102                        Err(anyhow!(
 103                            "checksum mismatch for applied migration {}",
 104                            migration.description
 105                        ))?;
 106                    }
 107                }
 108                None => {
 109                    let elapsed = connection.apply(&migration).await?;
 110                    new_migrations.push((migration, elapsed));
 111                }
 112            }
 113        }
 114
 115        Ok(new_migrations)
 116    }
 117
 118    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 119        self.transaction(|tx| async move {
 120            let server = server::ActiveModel {
 121                environment: ActiveValue::set(environment.into()),
 122                ..Default::default()
 123            }
 124            .insert(&*tx)
 125            .await?;
 126            Ok(server.id)
 127        })
 128        .await
 129    }
 130
 131    pub async fn stale_room_ids(
 132        &self,
 133        environment: &str,
 134        new_server_id: ServerId,
 135    ) -> Result<Vec<RoomId>> {
 136        self.transaction(|tx| async move {
 137            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 138            enum QueryAs {
 139                RoomId,
 140            }
 141
 142            let stale_server_epochs = self
 143                .stale_server_ids(environment, new_server_id, &tx)
 144                .await?;
 145            Ok(room_participant::Entity::find()
 146                .select_only()
 147                .column(room_participant::Column::RoomId)
 148                .distinct()
 149                .filter(
 150                    room_participant::Column::AnsweringConnectionServerId
 151                        .is_in(stale_server_epochs),
 152                )
 153                .into_values::<_, QueryAs>()
 154                .all(&*tx)
 155                .await?)
 156        })
 157        .await
 158    }
 159
 160    pub async fn refresh_room(
 161        &self,
 162        room_id: RoomId,
 163        new_server_id: ServerId,
 164    ) -> Result<RoomGuard<RefreshedRoom>> {
 165        self.room_transaction(room_id, |tx| async move {
 166            let stale_participant_filter = Condition::all()
 167                .add(room_participant::Column::RoomId.eq(room_id))
 168                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 169                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 170
 171            let stale_participant_user_ids = room_participant::Entity::find()
 172                .filter(stale_participant_filter.clone())
 173                .all(&*tx)
 174                .await?
 175                .into_iter()
 176                .map(|participant| participant.user_id)
 177                .collect::<Vec<_>>();
 178
 179            // Delete participants who failed to reconnect and cancel their calls.
 180            let mut canceled_calls_to_user_ids = Vec::new();
 181            room_participant::Entity::delete_many()
 182                .filter(stale_participant_filter)
 183                .exec(&*tx)
 184                .await?;
 185            let called_participants = room_participant::Entity::find()
 186                .filter(
 187                    Condition::all()
 188                        .add(
 189                            room_participant::Column::CallingUserId
 190                                .is_in(stale_participant_user_ids.iter().copied()),
 191                        )
 192                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 193                )
 194                .all(&*tx)
 195                .await?;
 196            room_participant::Entity::delete_many()
 197                .filter(
 198                    room_participant::Column::Id
 199                        .is_in(called_participants.iter().map(|participant| participant.id)),
 200                )
 201                .exec(&*tx)
 202                .await?;
 203            canceled_calls_to_user_ids.extend(
 204                called_participants
 205                    .into_iter()
 206                    .map(|participant| participant.user_id),
 207            );
 208
 209            let room = self.get_room(room_id, &tx).await?;
 210            // Delete the room if it becomes empty.
 211            if room.participants.is_empty() {
 212                project::Entity::delete_many()
 213                    .filter(project::Column::RoomId.eq(room_id))
 214                    .exec(&*tx)
 215                    .await?;
 216                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 217            }
 218
 219            Ok(RefreshedRoom {
 220                room,
 221                stale_participant_user_ids,
 222                canceled_calls_to_user_ids,
 223            })
 224        })
 225        .await
 226    }
 227
 228    pub async fn delete_stale_servers(
 229        &self,
 230        environment: &str,
 231        new_server_id: ServerId,
 232    ) -> Result<()> {
 233        self.transaction(|tx| async move {
 234            server::Entity::delete_many()
 235                .filter(
 236                    Condition::all()
 237                        .add(server::Column::Environment.eq(environment))
 238                        .add(server::Column::Id.ne(new_server_id)),
 239                )
 240                .exec(&*tx)
 241                .await?;
 242            Ok(())
 243        })
 244        .await
 245    }
 246
 247    async fn stale_server_ids(
 248        &self,
 249        environment: &str,
 250        new_server_id: ServerId,
 251        tx: &DatabaseTransaction,
 252    ) -> Result<Vec<ServerId>> {
 253        let stale_servers = server::Entity::find()
 254            .filter(
 255                Condition::all()
 256                    .add(server::Column::Environment.eq(environment))
 257                    .add(server::Column::Id.ne(new_server_id)),
 258            )
 259            .all(&*tx)
 260            .await?;
 261        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 262    }
 263
 264    // users
 265
 266    pub async fn create_user(
 267        &self,
 268        email_address: &str,
 269        admin: bool,
 270        params: NewUserParams,
 271    ) -> Result<NewUserResult> {
 272        self.transaction(|tx| async {
 273            let tx = tx;
 274            let user = user::Entity::insert(user::ActiveModel {
 275                email_address: ActiveValue::set(Some(email_address.into())),
 276                github_login: ActiveValue::set(params.github_login.clone()),
 277                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 278                admin: ActiveValue::set(admin),
 279                metrics_id: ActiveValue::set(Uuid::new_v4()),
 280                ..Default::default()
 281            })
 282            .on_conflict(
 283                OnConflict::column(user::Column::GithubLogin)
 284                    .update_column(user::Column::GithubLogin)
 285                    .to_owned(),
 286            )
 287            .exec_with_returning(&*tx)
 288            .await?;
 289
 290            Ok(NewUserResult {
 291                user_id: user.id,
 292                metrics_id: user.metrics_id.to_string(),
 293                signup_device_id: None,
 294                inviting_user_id: None,
 295            })
 296        })
 297        .await
 298    }
 299
 300    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 301        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 302            .await
 303    }
 304
 305    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 306        self.transaction(|tx| async {
 307            let tx = tx;
 308            Ok(user::Entity::find()
 309                .filter(user::Column::Id.is_in(ids.iter().copied()))
 310                .all(&*tx)
 311                .await?)
 312        })
 313        .await
 314    }
 315
 316    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 317        self.transaction(|tx| async move {
 318            Ok(user::Entity::find()
 319                .filter(user::Column::GithubLogin.eq(github_login))
 320                .one(&*tx)
 321                .await?)
 322        })
 323        .await
 324    }
 325
 326    pub async fn get_or_create_user_by_github_account(
 327        &self,
 328        github_login: &str,
 329        github_user_id: Option<i32>,
 330        github_email: Option<&str>,
 331    ) -> Result<Option<User>> {
 332        self.transaction(|tx| async move {
 333            let tx = &*tx;
 334            if let Some(github_user_id) = github_user_id {
 335                if let Some(user_by_github_user_id) = user::Entity::find()
 336                    .filter(user::Column::GithubUserId.eq(github_user_id))
 337                    .one(tx)
 338                    .await?
 339                {
 340                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 341                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 342                    Ok(Some(user_by_github_user_id.update(tx).await?))
 343                } else if let Some(user_by_github_login) = user::Entity::find()
 344                    .filter(user::Column::GithubLogin.eq(github_login))
 345                    .one(tx)
 346                    .await?
 347                {
 348                    let mut user_by_github_login = user_by_github_login.into_active_model();
 349                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 350                    Ok(Some(user_by_github_login.update(tx).await?))
 351                } else {
 352                    let user = user::Entity::insert(user::ActiveModel {
 353                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 354                        github_login: ActiveValue::set(github_login.into()),
 355                        github_user_id: ActiveValue::set(Some(github_user_id)),
 356                        admin: ActiveValue::set(false),
 357                        invite_count: ActiveValue::set(0),
 358                        invite_code: ActiveValue::set(None),
 359                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 360                        ..Default::default()
 361                    })
 362                    .exec_with_returning(&*tx)
 363                    .await?;
 364                    Ok(Some(user))
 365                }
 366            } else {
 367                Ok(user::Entity::find()
 368                    .filter(user::Column::GithubLogin.eq(github_login))
 369                    .one(tx)
 370                    .await?)
 371            }
 372        })
 373        .await
 374    }
 375
 376    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 377        self.transaction(|tx| async move {
 378            Ok(user::Entity::find()
 379                .order_by_asc(user::Column::GithubLogin)
 380                .limit(limit as u64)
 381                .offset(page as u64 * limit as u64)
 382                .all(&*tx)
 383                .await?)
 384        })
 385        .await
 386    }
 387
 388    pub async fn get_users_with_no_invites(
 389        &self,
 390        invited_by_another_user: bool,
 391    ) -> Result<Vec<User>> {
 392        self.transaction(|tx| async move {
 393            Ok(user::Entity::find()
 394                .filter(
 395                    user::Column::InviteCount
 396                        .eq(0)
 397                        .and(if invited_by_another_user {
 398                            user::Column::InviterId.is_not_null()
 399                        } else {
 400                            user::Column::InviterId.is_null()
 401                        }),
 402                )
 403                .all(&*tx)
 404                .await?)
 405        })
 406        .await
 407    }
 408
 409    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 410        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 411        enum QueryAs {
 412            MetricsId,
 413        }
 414
 415        self.transaction(|tx| async move {
 416            let metrics_id: Uuid = user::Entity::find_by_id(id)
 417                .select_only()
 418                .column(user::Column::MetricsId)
 419                .into_values::<_, QueryAs>()
 420                .one(&*tx)
 421                .await?
 422                .ok_or_else(|| anyhow!("could not find user"))?;
 423            Ok(metrics_id.to_string())
 424        })
 425        .await
 426    }
 427
 428    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 429        self.transaction(|tx| async move {
 430            user::Entity::update_many()
 431                .filter(user::Column::Id.eq(id))
 432                .set(user::ActiveModel {
 433                    admin: ActiveValue::set(is_admin),
 434                    ..Default::default()
 435                })
 436                .exec(&*tx)
 437                .await?;
 438            Ok(())
 439        })
 440        .await
 441    }
 442
 443    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 444        self.transaction(|tx| async move {
 445            user::Entity::update_many()
 446                .filter(user::Column::Id.eq(id))
 447                .set(user::ActiveModel {
 448                    connected_once: ActiveValue::set(connected_once),
 449                    ..Default::default()
 450                })
 451                .exec(&*tx)
 452                .await?;
 453            Ok(())
 454        })
 455        .await
 456    }
 457
 458    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 459        self.transaction(|tx| async move {
 460            access_token::Entity::delete_many()
 461                .filter(access_token::Column::UserId.eq(id))
 462                .exec(&*tx)
 463                .await?;
 464            user::Entity::delete_by_id(id).exec(&*tx).await?;
 465            Ok(())
 466        })
 467        .await
 468    }
 469
 470    // contacts
 471
 472    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 473        #[derive(Debug, FromQueryResult)]
 474        struct ContactWithUserBusyStatuses {
 475            user_id_a: UserId,
 476            user_id_b: UserId,
 477            a_to_b: bool,
 478            accepted: bool,
 479            should_notify: bool,
 480            user_a_busy: bool,
 481            user_b_busy: bool,
 482        }
 483
 484        self.transaction(|tx| async move {
 485            let user_a_participant = Alias::new("user_a_participant");
 486            let user_b_participant = Alias::new("user_b_participant");
 487            let mut db_contacts = contact::Entity::find()
 488                .column_as(
 489                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 490                        .is_not_null(),
 491                    "user_a_busy",
 492                )
 493                .column_as(
 494                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 495                        .is_not_null(),
 496                    "user_b_busy",
 497                )
 498                .filter(
 499                    contact::Column::UserIdA
 500                        .eq(user_id)
 501                        .or(contact::Column::UserIdB.eq(user_id)),
 502                )
 503                .join_as(
 504                    JoinType::LeftJoin,
 505                    contact::Relation::UserARoomParticipant.def(),
 506                    user_a_participant,
 507                )
 508                .join_as(
 509                    JoinType::LeftJoin,
 510                    contact::Relation::UserBRoomParticipant.def(),
 511                    user_b_participant,
 512                )
 513                .into_model::<ContactWithUserBusyStatuses>()
 514                .stream(&*tx)
 515                .await?;
 516
 517            let mut contacts = Vec::new();
 518            while let Some(db_contact) = db_contacts.next().await {
 519                let db_contact = db_contact?;
 520                if db_contact.user_id_a == user_id {
 521                    if db_contact.accepted {
 522                        contacts.push(Contact::Accepted {
 523                            user_id: db_contact.user_id_b,
 524                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 525                            busy: db_contact.user_b_busy,
 526                        });
 527                    } else if db_contact.a_to_b {
 528                        contacts.push(Contact::Outgoing {
 529                            user_id: db_contact.user_id_b,
 530                        })
 531                    } else {
 532                        contacts.push(Contact::Incoming {
 533                            user_id: db_contact.user_id_b,
 534                            should_notify: db_contact.should_notify,
 535                        });
 536                    }
 537                } else if db_contact.accepted {
 538                    contacts.push(Contact::Accepted {
 539                        user_id: db_contact.user_id_a,
 540                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 541                        busy: db_contact.user_a_busy,
 542                    });
 543                } else if db_contact.a_to_b {
 544                    contacts.push(Contact::Incoming {
 545                        user_id: db_contact.user_id_a,
 546                        should_notify: db_contact.should_notify,
 547                    });
 548                } else {
 549                    contacts.push(Contact::Outgoing {
 550                        user_id: db_contact.user_id_a,
 551                    });
 552                }
 553            }
 554
 555            contacts.sort_unstable_by_key(|contact| contact.user_id());
 556
 557            Ok(contacts)
 558        })
 559        .await
 560    }
 561
 562    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 563        self.transaction(|tx| async move {
 564            let participant = room_participant::Entity::find()
 565                .filter(room_participant::Column::UserId.eq(user_id))
 566                .one(&*tx)
 567                .await?;
 568            Ok(participant.is_some())
 569        })
 570        .await
 571    }
 572
 573    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 574        self.transaction(|tx| async move {
 575            let (id_a, id_b) = if user_id_1 < user_id_2 {
 576                (user_id_1, user_id_2)
 577            } else {
 578                (user_id_2, user_id_1)
 579            };
 580
 581            Ok(contact::Entity::find()
 582                .filter(
 583                    contact::Column::UserIdA
 584                        .eq(id_a)
 585                        .and(contact::Column::UserIdB.eq(id_b))
 586                        .and(contact::Column::Accepted.eq(true)),
 587                )
 588                .one(&*tx)
 589                .await?
 590                .is_some())
 591        })
 592        .await
 593    }
 594
 595    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 596        self.transaction(|tx| async move {
 597            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 598                (sender_id, receiver_id, true)
 599            } else {
 600                (receiver_id, sender_id, false)
 601            };
 602
 603            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 604                user_id_a: ActiveValue::set(id_a),
 605                user_id_b: ActiveValue::set(id_b),
 606                a_to_b: ActiveValue::set(a_to_b),
 607                accepted: ActiveValue::set(false),
 608                should_notify: ActiveValue::set(true),
 609                ..Default::default()
 610            })
 611            .on_conflict(
 612                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 613                    .values([
 614                        (contact::Column::Accepted, true.into()),
 615                        (contact::Column::ShouldNotify, false.into()),
 616                    ])
 617                    .action_and_where(
 618                        contact::Column::Accepted.eq(false).and(
 619                            contact::Column::AToB
 620                                .eq(a_to_b)
 621                                .and(contact::Column::UserIdA.eq(id_b))
 622                                .or(contact::Column::AToB
 623                                    .ne(a_to_b)
 624                                    .and(contact::Column::UserIdA.eq(id_a))),
 625                        ),
 626                    )
 627                    .to_owned(),
 628            )
 629            .exec_without_returning(&*tx)
 630            .await?;
 631
 632            if rows_affected == 1 {
 633                Ok(())
 634            } else {
 635                Err(anyhow!("contact already requested"))?
 636            }
 637        })
 638        .await
 639    }
 640
 641    /// Returns a bool indicating whether the removed contact had originally accepted or not
 642    ///
 643    /// Deletes the contact identified by the requester and responder ids, and then returns
 644    /// whether the deleted contact had originally accepted or was a pending contact request.
 645    ///
 646    /// # Arguments
 647    ///
 648    /// * `requester_id` - The user that initiates this request
 649    /// * `responder_id` - The user that will be removed
 650    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 651        self.transaction(|tx| async move {
 652            let (id_a, id_b) = if responder_id < requester_id {
 653                (responder_id, requester_id)
 654            } else {
 655                (requester_id, responder_id)
 656            };
 657
 658            let contact = contact::Entity::find()
 659                .filter(
 660                    contact::Column::UserIdA
 661                        .eq(id_a)
 662                        .and(contact::Column::UserIdB.eq(id_b)),
 663                )
 664                .one(&*tx)
 665                .await?
 666                .ok_or_else(|| anyhow!("no such contact"))?;
 667
 668            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 669            Ok(contact.accepted)
 670        })
 671        .await
 672    }
 673
 674    pub async fn dismiss_contact_notification(
 675        &self,
 676        user_id: UserId,
 677        contact_user_id: UserId,
 678    ) -> Result<()> {
 679        self.transaction(|tx| async move {
 680            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 681                (user_id, contact_user_id, true)
 682            } else {
 683                (contact_user_id, user_id, false)
 684            };
 685
 686            let result = contact::Entity::update_many()
 687                .set(contact::ActiveModel {
 688                    should_notify: ActiveValue::set(false),
 689                    ..Default::default()
 690                })
 691                .filter(
 692                    contact::Column::UserIdA
 693                        .eq(id_a)
 694                        .and(contact::Column::UserIdB.eq(id_b))
 695                        .and(
 696                            contact::Column::AToB
 697                                .eq(a_to_b)
 698                                .and(contact::Column::Accepted.eq(true))
 699                                .or(contact::Column::AToB
 700                                    .ne(a_to_b)
 701                                    .and(contact::Column::Accepted.eq(false))),
 702                        ),
 703                )
 704                .exec(&*tx)
 705                .await?;
 706            if result.rows_affected == 0 {
 707                Err(anyhow!("no such contact request"))?
 708            } else {
 709                Ok(())
 710            }
 711        })
 712        .await
 713    }
 714
 715    pub async fn respond_to_contact_request(
 716        &self,
 717        responder_id: UserId,
 718        requester_id: UserId,
 719        accept: bool,
 720    ) -> Result<()> {
 721        self.transaction(|tx| async move {
 722            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 723                (responder_id, requester_id, false)
 724            } else {
 725                (requester_id, responder_id, true)
 726            };
 727            let rows_affected = if accept {
 728                let result = contact::Entity::update_many()
 729                    .set(contact::ActiveModel {
 730                        accepted: ActiveValue::set(true),
 731                        should_notify: ActiveValue::set(true),
 732                        ..Default::default()
 733                    })
 734                    .filter(
 735                        contact::Column::UserIdA
 736                            .eq(id_a)
 737                            .and(contact::Column::UserIdB.eq(id_b))
 738                            .and(contact::Column::AToB.eq(a_to_b)),
 739                    )
 740                    .exec(&*tx)
 741                    .await?;
 742                result.rows_affected
 743            } else {
 744                let result = contact::Entity::delete_many()
 745                    .filter(
 746                        contact::Column::UserIdA
 747                            .eq(id_a)
 748                            .and(contact::Column::UserIdB.eq(id_b))
 749                            .and(contact::Column::AToB.eq(a_to_b))
 750                            .and(contact::Column::Accepted.eq(false)),
 751                    )
 752                    .exec(&*tx)
 753                    .await?;
 754
 755                result.rows_affected
 756            };
 757
 758            if rows_affected == 1 {
 759                Ok(())
 760            } else {
 761                Err(anyhow!("no such contact request"))?
 762            }
 763        })
 764        .await
 765    }
 766
 767    pub fn fuzzy_like_string(string: &str) -> String {
 768        let mut result = String::with_capacity(string.len() * 2 + 1);
 769        for c in string.chars() {
 770            if c.is_alphanumeric() {
 771                result.push('%');
 772                result.push(c);
 773            }
 774        }
 775        result.push('%');
 776        result
 777    }
 778
 779    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 780        self.transaction(|tx| async {
 781            let tx = tx;
 782            let like_string = Self::fuzzy_like_string(name_query);
 783            let query = "
 784                SELECT users.*
 785                FROM users
 786                WHERE github_login ILIKE $1
 787                ORDER BY github_login <-> $2
 788                LIMIT $3
 789            ";
 790
 791            Ok(user::Entity::find()
 792                .from_raw_sql(Statement::from_sql_and_values(
 793                    self.pool.get_database_backend(),
 794                    query.into(),
 795                    vec![like_string.into(), name_query.into(), limit.into()],
 796                ))
 797                .all(&*tx)
 798                .await?)
 799        })
 800        .await
 801    }
 802
 803    // signups
 804
 805    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 806        self.transaction(|tx| async move {
 807            signup::Entity::insert(signup::ActiveModel {
 808                email_address: ActiveValue::set(signup.email_address.clone()),
 809                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 810                email_confirmation_sent: ActiveValue::set(false),
 811                platform_mac: ActiveValue::set(signup.platform_mac),
 812                platform_windows: ActiveValue::set(signup.platform_windows),
 813                platform_linux: ActiveValue::set(signup.platform_linux),
 814                platform_unknown: ActiveValue::set(false),
 815                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 816                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 817                device_id: ActiveValue::set(signup.device_id.clone()),
 818                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 819                ..Default::default()
 820            })
 821            .on_conflict(
 822                OnConflict::column(signup::Column::EmailAddress)
 823                    .update_columns([
 824                        signup::Column::PlatformMac,
 825                        signup::Column::PlatformWindows,
 826                        signup::Column::PlatformLinux,
 827                        signup::Column::EditorFeatures,
 828                        signup::Column::ProgrammingLanguages,
 829                        signup::Column::DeviceId,
 830                        signup::Column::AddedToMailingList,
 831                    ])
 832                    .to_owned(),
 833            )
 834            .exec(&*tx)
 835            .await?;
 836            Ok(())
 837        })
 838        .await
 839    }
 840
 841    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 842        self.transaction(|tx| async move {
 843            let signup = signup::Entity::find()
 844                .filter(signup::Column::EmailAddress.eq(email_address))
 845                .one(&*tx)
 846                .await?
 847                .ok_or_else(|| {
 848                    anyhow!("signup with email address {} doesn't exist", email_address)
 849                })?;
 850
 851            Ok(signup)
 852        })
 853        .await
 854    }
 855
 856    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 857        self.transaction(|tx| async move {
 858            let query = "
 859                SELECT
 860                    COUNT(*) as count,
 861                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 862                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 863                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 864                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 865                FROM (
 866                    SELECT *
 867                    FROM signups
 868                    WHERE
 869                        NOT email_confirmation_sent
 870                ) AS unsent
 871            ";
 872            Ok(
 873                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 874                    self.pool.get_database_backend(),
 875                    query.into(),
 876                    vec![],
 877                ))
 878                .one(&*tx)
 879                .await?
 880                .ok_or_else(|| anyhow!("invalid result"))?,
 881            )
 882        })
 883        .await
 884    }
 885
 886    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 887        let emails = invites
 888            .iter()
 889            .map(|s| s.email_address.as_str())
 890            .collect::<Vec<_>>();
 891        self.transaction(|tx| async {
 892            let tx = tx;
 893            signup::Entity::update_many()
 894                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 895                .set(signup::ActiveModel {
 896                    email_confirmation_sent: ActiveValue::set(true),
 897                    ..Default::default()
 898                })
 899                .exec(&*tx)
 900                .await?;
 901            Ok(())
 902        })
 903        .await
 904    }
 905
 906    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 907        self.transaction(|tx| async move {
 908            Ok(signup::Entity::find()
 909                .select_only()
 910                .column(signup::Column::EmailAddress)
 911                .column(signup::Column::EmailConfirmationCode)
 912                .filter(
 913                    signup::Column::EmailConfirmationSent.eq(false).and(
 914                        signup::Column::PlatformMac
 915                            .eq(true)
 916                            .or(signup::Column::PlatformUnknown.eq(true)),
 917                    ),
 918                )
 919                .order_by_asc(signup::Column::CreatedAt)
 920                .limit(count as u64)
 921                .into_model()
 922                .all(&*tx)
 923                .await?)
 924        })
 925        .await
 926    }
 927
 928    // invite codes
 929
 930    pub async fn create_invite_from_code(
 931        &self,
 932        code: &str,
 933        email_address: &str,
 934        device_id: Option<&str>,
 935        added_to_mailing_list: bool,
 936    ) -> Result<Invite> {
 937        self.transaction(|tx| async move {
 938            let existing_user = user::Entity::find()
 939                .filter(user::Column::EmailAddress.eq(email_address))
 940                .one(&*tx)
 941                .await?;
 942
 943            if existing_user.is_some() {
 944                Err(anyhow!("email address is already in use"))?;
 945            }
 946
 947            let inviting_user_with_invites = match user::Entity::find()
 948                .filter(
 949                    user::Column::InviteCode
 950                        .eq(code)
 951                        .and(user::Column::InviteCount.gt(0)),
 952                )
 953                .one(&*tx)
 954                .await?
 955            {
 956                Some(inviting_user) => inviting_user,
 957                None => {
 958                    return Err(Error::Http(
 959                        StatusCode::UNAUTHORIZED,
 960                        "unable to find an invite code with invites remaining".to_string(),
 961                    ))?
 962                }
 963            };
 964            user::Entity::update_many()
 965                .filter(
 966                    user::Column::Id
 967                        .eq(inviting_user_with_invites.id)
 968                        .and(user::Column::InviteCount.gt(0)),
 969                )
 970                .col_expr(
 971                    user::Column::InviteCount,
 972                    Expr::col(user::Column::InviteCount).sub(1),
 973                )
 974                .exec(&*tx)
 975                .await?;
 976
 977            let signup = signup::Entity::insert(signup::ActiveModel {
 978                email_address: ActiveValue::set(email_address.into()),
 979                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 980                email_confirmation_sent: ActiveValue::set(false),
 981                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 982                platform_linux: ActiveValue::set(false),
 983                platform_mac: ActiveValue::set(false),
 984                platform_windows: ActiveValue::set(false),
 985                platform_unknown: ActiveValue::set(true),
 986                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 987                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 988                ..Default::default()
 989            })
 990            .on_conflict(
 991                OnConflict::column(signup::Column::EmailAddress)
 992                    .update_column(signup::Column::InvitingUserId)
 993                    .to_owned(),
 994            )
 995            .exec_with_returning(&*tx)
 996            .await?;
 997
 998            Ok(Invite {
 999                email_address: signup.email_address,
1000                email_confirmation_code: signup.email_confirmation_code,
1001            })
1002        })
1003        .await
1004    }
1005
1006    pub async fn create_user_from_invite(
1007        &self,
1008        invite: &Invite,
1009        user: NewUserParams,
1010    ) -> Result<Option<NewUserResult>> {
1011        self.transaction(|tx| async {
1012            let tx = tx;
1013            let signup = signup::Entity::find()
1014                .filter(
1015                    signup::Column::EmailAddress
1016                        .eq(invite.email_address.as_str())
1017                        .and(
1018                            signup::Column::EmailConfirmationCode
1019                                .eq(invite.email_confirmation_code.as_str()),
1020                        ),
1021                )
1022                .one(&*tx)
1023                .await?
1024                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1025
1026            if signup.user_id.is_some() {
1027                return Ok(None);
1028            }
1029
1030            let user = user::Entity::insert(user::ActiveModel {
1031                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1032                github_login: ActiveValue::set(user.github_login.clone()),
1033                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1034                admin: ActiveValue::set(false),
1035                invite_count: ActiveValue::set(user.invite_count),
1036                invite_code: ActiveValue::set(Some(random_invite_code())),
1037                metrics_id: ActiveValue::set(Uuid::new_v4()),
1038                ..Default::default()
1039            })
1040            .on_conflict(
1041                OnConflict::column(user::Column::GithubLogin)
1042                    .update_columns([
1043                        user::Column::EmailAddress,
1044                        user::Column::GithubUserId,
1045                        user::Column::Admin,
1046                    ])
1047                    .to_owned(),
1048            )
1049            .exec_with_returning(&*tx)
1050            .await?;
1051
1052            let mut signup = signup.into_active_model();
1053            signup.user_id = ActiveValue::set(Some(user.id));
1054            let signup = signup.update(&*tx).await?;
1055
1056            if let Some(inviting_user_id) = signup.inviting_user_id {
1057                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1058                    (inviting_user_id, user.id, true)
1059                } else {
1060                    (user.id, inviting_user_id, false)
1061                };
1062
1063                contact::Entity::insert(contact::ActiveModel {
1064                    user_id_a: ActiveValue::set(user_id_a),
1065                    user_id_b: ActiveValue::set(user_id_b),
1066                    a_to_b: ActiveValue::set(a_to_b),
1067                    should_notify: ActiveValue::set(true),
1068                    accepted: ActiveValue::set(true),
1069                    ..Default::default()
1070                })
1071                .on_conflict(OnConflict::new().do_nothing().to_owned())
1072                .exec_without_returning(&*tx)
1073                .await?;
1074            }
1075
1076            Ok(Some(NewUserResult {
1077                user_id: user.id,
1078                metrics_id: user.metrics_id.to_string(),
1079                inviting_user_id: signup.inviting_user_id,
1080                signup_device_id: signup.device_id,
1081            }))
1082        })
1083        .await
1084    }
1085
1086    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1087        self.transaction(|tx| async move {
1088            if count > 0 {
1089                user::Entity::update_many()
1090                    .filter(
1091                        user::Column::Id
1092                            .eq(id)
1093                            .and(user::Column::InviteCode.is_null()),
1094                    )
1095                    .set(user::ActiveModel {
1096                        invite_code: ActiveValue::set(Some(random_invite_code())),
1097                        ..Default::default()
1098                    })
1099                    .exec(&*tx)
1100                    .await?;
1101            }
1102
1103            user::Entity::update_many()
1104                .filter(user::Column::Id.eq(id))
1105                .set(user::ActiveModel {
1106                    invite_count: ActiveValue::set(count),
1107                    ..Default::default()
1108                })
1109                .exec(&*tx)
1110                .await?;
1111            Ok(())
1112        })
1113        .await
1114    }
1115
1116    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1117        self.transaction(|tx| async move {
1118            match user::Entity::find_by_id(id).one(&*tx).await? {
1119                Some(user) if user.invite_code.is_some() => {
1120                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1121                }
1122                _ => Ok(None),
1123            }
1124        })
1125        .await
1126    }
1127
1128    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1129        self.transaction(|tx| async move {
1130            user::Entity::find()
1131                .filter(user::Column::InviteCode.eq(code))
1132                .one(&*tx)
1133                .await?
1134                .ok_or_else(|| {
1135                    Error::Http(
1136                        StatusCode::NOT_FOUND,
1137                        "that invite code does not exist".to_string(),
1138                    )
1139                })
1140        })
1141        .await
1142    }
1143
1144    // rooms
1145
1146    pub async fn incoming_call_for_user(
1147        &self,
1148        user_id: UserId,
1149    ) -> Result<Option<proto::IncomingCall>> {
1150        self.transaction(|tx| async move {
1151            let pending_participant = room_participant::Entity::find()
1152                .filter(
1153                    room_participant::Column::UserId
1154                        .eq(user_id)
1155                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1156                )
1157                .one(&*tx)
1158                .await?;
1159
1160            if let Some(pending_participant) = pending_participant {
1161                let room = self.get_room(pending_participant.room_id, &tx).await?;
1162                Ok(Self::build_incoming_call(&room, user_id))
1163            } else {
1164                Ok(None)
1165            }
1166        })
1167        .await
1168    }
1169
1170    pub async fn create_room(
1171        &self,
1172        user_id: UserId,
1173        connection: ConnectionId,
1174        live_kit_room: &str,
1175    ) -> Result<proto::Room> {
1176        self.transaction(|tx| async move {
1177            let room = room::ActiveModel {
1178                live_kit_room: ActiveValue::set(live_kit_room.into()),
1179                ..Default::default()
1180            }
1181            .insert(&*tx)
1182            .await?;
1183            room_participant::ActiveModel {
1184                room_id: ActiveValue::set(room.id),
1185                user_id: ActiveValue::set(user_id),
1186                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1187                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1188                    connection.owner_id as i32,
1189                ))),
1190                answering_connection_lost: ActiveValue::set(false),
1191                calling_user_id: ActiveValue::set(user_id),
1192                calling_connection_id: ActiveValue::set(connection.id as i32),
1193                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1194                    connection.owner_id as i32,
1195                ))),
1196                ..Default::default()
1197            }
1198            .insert(&*tx)
1199            .await?;
1200
1201            let room = self.get_room(room.id, &tx).await?;
1202            Ok(room)
1203        })
1204        .await
1205    }
1206
1207    pub async fn call(
1208        &self,
1209        room_id: RoomId,
1210        calling_user_id: UserId,
1211        calling_connection: ConnectionId,
1212        called_user_id: UserId,
1213        initial_project_id: Option<ProjectId>,
1214    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1215        self.room_transaction(room_id, |tx| async move {
1216            room_participant::ActiveModel {
1217                room_id: ActiveValue::set(room_id),
1218                user_id: ActiveValue::set(called_user_id),
1219                answering_connection_lost: ActiveValue::set(false),
1220                calling_user_id: ActiveValue::set(calling_user_id),
1221                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1222                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1223                    calling_connection.owner_id as i32,
1224                ))),
1225                initial_project_id: ActiveValue::set(initial_project_id),
1226                ..Default::default()
1227            }
1228            .insert(&*tx)
1229            .await?;
1230
1231            let room = self.get_room(room_id, &tx).await?;
1232            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1233                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1234            Ok((room, incoming_call))
1235        })
1236        .await
1237    }
1238
1239    pub async fn call_failed(
1240        &self,
1241        room_id: RoomId,
1242        called_user_id: UserId,
1243    ) -> Result<RoomGuard<proto::Room>> {
1244        self.room_transaction(room_id, |tx| async move {
1245            room_participant::Entity::delete_many()
1246                .filter(
1247                    room_participant::Column::RoomId
1248                        .eq(room_id)
1249                        .and(room_participant::Column::UserId.eq(called_user_id)),
1250                )
1251                .exec(&*tx)
1252                .await?;
1253            let room = self.get_room(room_id, &tx).await?;
1254            Ok(room)
1255        })
1256        .await
1257    }
1258
1259    pub async fn decline_call(
1260        &self,
1261        expected_room_id: Option<RoomId>,
1262        user_id: UserId,
1263    ) -> Result<Option<RoomGuard<proto::Room>>> {
1264        self.optional_room_transaction(|tx| async move {
1265            let mut filter = Condition::all()
1266                .add(room_participant::Column::UserId.eq(user_id))
1267                .add(room_participant::Column::AnsweringConnectionId.is_null());
1268            if let Some(room_id) = expected_room_id {
1269                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1270            }
1271            let participant = room_participant::Entity::find()
1272                .filter(filter)
1273                .one(&*tx)
1274                .await?;
1275
1276            let participant = if let Some(participant) = participant {
1277                participant
1278            } else if expected_room_id.is_some() {
1279                return Err(anyhow!("could not find call to decline"))?;
1280            } else {
1281                return Ok(None);
1282            };
1283
1284            let room_id = participant.room_id;
1285            room_participant::Entity::delete(participant.into_active_model())
1286                .exec(&*tx)
1287                .await?;
1288
1289            let room = self.get_room(room_id, &tx).await?;
1290            Ok(Some((room_id, room)))
1291        })
1292        .await
1293    }
1294
1295    pub async fn cancel_call(
1296        &self,
1297        room_id: RoomId,
1298        calling_connection: ConnectionId,
1299        called_user_id: UserId,
1300    ) -> Result<RoomGuard<proto::Room>> {
1301        self.room_transaction(room_id, |tx| async move {
1302            let participant = room_participant::Entity::find()
1303                .filter(
1304                    Condition::all()
1305                        .add(room_participant::Column::UserId.eq(called_user_id))
1306                        .add(room_participant::Column::RoomId.eq(room_id))
1307                        .add(
1308                            room_participant::Column::CallingConnectionId
1309                                .eq(calling_connection.id as i32),
1310                        )
1311                        .add(
1312                            room_participant::Column::CallingConnectionServerId
1313                                .eq(calling_connection.owner_id as i32),
1314                        )
1315                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1316                )
1317                .one(&*tx)
1318                .await?
1319                .ok_or_else(|| anyhow!("no call to cancel"))?;
1320
1321            room_participant::Entity::delete(participant.into_active_model())
1322                .exec(&*tx)
1323                .await?;
1324
1325            let room = self.get_room(room_id, &tx).await?;
1326            Ok(room)
1327        })
1328        .await
1329    }
1330
1331    pub async fn join_room(
1332        &self,
1333        room_id: RoomId,
1334        user_id: UserId,
1335        connection: ConnectionId,
1336    ) -> Result<RoomGuard<proto::Room>> {
1337        self.room_transaction(room_id, |tx| async move {
1338            let result = room_participant::Entity::update_many()
1339                .filter(
1340                    Condition::all()
1341                        .add(room_participant::Column::RoomId.eq(room_id))
1342                        .add(room_participant::Column::UserId.eq(user_id))
1343                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1344                )
1345                .set(room_participant::ActiveModel {
1346                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1347                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1348                        connection.owner_id as i32,
1349                    ))),
1350                    answering_connection_lost: ActiveValue::set(false),
1351                    ..Default::default()
1352                })
1353                .exec(&*tx)
1354                .await?;
1355            if result.rows_affected == 0 {
1356                Err(anyhow!("room does not exist or was already joined"))?
1357            } else {
1358                let room = self.get_room(room_id, &tx).await?;
1359                Ok(room)
1360            }
1361        })
1362        .await
1363    }
1364
1365    pub async fn rejoin_room(
1366        &self,
1367        rejoin_room: proto::RejoinRoom,
1368        user_id: UserId,
1369        connection: ConnectionId,
1370    ) -> Result<RoomGuard<RejoinedRoom>> {
1371        let room_id = RoomId::from_proto(rejoin_room.id);
1372        self.room_transaction(room_id, |tx| async {
1373            let tx = tx;
1374            let participant_update = room_participant::Entity::update_many()
1375                .filter(
1376                    Condition::all()
1377                        .add(room_participant::Column::RoomId.eq(room_id))
1378                        .add(room_participant::Column::UserId.eq(user_id))
1379                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1380                        .add(
1381                            Condition::any()
1382                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1383                                .add(
1384                                    room_participant::Column::AnsweringConnectionServerId
1385                                        .ne(connection.owner_id as i32),
1386                                ),
1387                        ),
1388                )
1389                .set(room_participant::ActiveModel {
1390                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1391                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1392                        connection.owner_id as i32,
1393                    ))),
1394                    answering_connection_lost: ActiveValue::set(false),
1395                    ..Default::default()
1396                })
1397                .exec(&*tx)
1398                .await?;
1399            if participant_update.rows_affected == 0 {
1400                return Err(anyhow!("room does not exist or was already joined"))?;
1401            }
1402
1403            let mut reshared_projects = Vec::new();
1404            for reshared_project in &rejoin_room.reshared_projects {
1405                let project_id = ProjectId::from_proto(reshared_project.project_id);
1406                let project = project::Entity::find_by_id(project_id)
1407                    .one(&*tx)
1408                    .await?
1409                    .ok_or_else(|| anyhow!("project does not exist"))?;
1410                if project.host_user_id != user_id {
1411                    return Err(anyhow!("no such project"))?;
1412                }
1413
1414                let mut collaborators = project
1415                    .find_related(project_collaborator::Entity)
1416                    .all(&*tx)
1417                    .await?;
1418                let host_ix = collaborators
1419                    .iter()
1420                    .position(|collaborator| {
1421                        collaborator.user_id == user_id && collaborator.is_host
1422                    })
1423                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1424                let host = collaborators.swap_remove(host_ix);
1425                let old_connection_id = host.connection();
1426
1427                project::Entity::update(project::ActiveModel {
1428                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1429                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1430                        connection.owner_id as i32,
1431                    ))),
1432                    ..project.into_active_model()
1433                })
1434                .exec(&*tx)
1435                .await?;
1436                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1437                    connection_id: ActiveValue::set(connection.id as i32),
1438                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1439                    ..host.into_active_model()
1440                })
1441                .exec(&*tx)
1442                .await?;
1443
1444                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1445                    .await?;
1446
1447                reshared_projects.push(ResharedProject {
1448                    id: project_id,
1449                    old_connection_id,
1450                    collaborators: collaborators
1451                        .iter()
1452                        .map(|collaborator| ProjectCollaborator {
1453                            connection_id: collaborator.connection(),
1454                            user_id: collaborator.user_id,
1455                            replica_id: collaborator.replica_id,
1456                            is_host: collaborator.is_host,
1457                        })
1458                        .collect(),
1459                    worktrees: reshared_project.worktrees.clone(),
1460                });
1461            }
1462
1463            project::Entity::delete_many()
1464                .filter(
1465                    Condition::all()
1466                        .add(project::Column::RoomId.eq(room_id))
1467                        .add(project::Column::HostUserId.eq(user_id))
1468                        .add(
1469                            project::Column::Id
1470                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1471                        ),
1472                )
1473                .exec(&*tx)
1474                .await?;
1475
1476            let mut rejoined_projects = Vec::new();
1477            for rejoined_project in &rejoin_room.rejoined_projects {
1478                let project_id = ProjectId::from_proto(rejoined_project.id);
1479                let Some(project) = project::Entity::find_by_id(project_id)
1480                    .one(&*tx)
1481                    .await? else { continue };
1482
1483                let mut worktrees = Vec::new();
1484                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1485                for db_worktree in db_worktrees {
1486                    let mut worktree = RejoinedWorktree {
1487                        id: db_worktree.id as u64,
1488                        abs_path: db_worktree.abs_path,
1489                        root_name: db_worktree.root_name,
1490                        visible: db_worktree.visible,
1491                        updated_entries: Default::default(),
1492                        removed_entries: Default::default(),
1493                        updated_repositories: Default::default(),
1494                        removed_repositories: Default::default(),
1495                        diagnostic_summaries: Default::default(),
1496                        scan_id: db_worktree.scan_id as u64,
1497                        completed_scan_id: db_worktree.completed_scan_id as u64,
1498                    };
1499
1500                    let rejoined_worktree = rejoined_project
1501                        .worktrees
1502                        .iter()
1503                        .find(|worktree| worktree.id == db_worktree.id as u64);
1504
1505                    // File entries
1506                    {
1507                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1508                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1509                        } else {
1510                            worktree_entry::Column::IsDeleted.eq(false)
1511                        };
1512
1513                        let mut db_entries = worktree_entry::Entity::find()
1514                            .filter(
1515                                Condition::all()
1516                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1517                                    .add(entry_filter),
1518                            )
1519                            .stream(&*tx)
1520                            .await?;
1521
1522                        while let Some(db_entry) = db_entries.next().await {
1523                            let db_entry = db_entry?;
1524                            if db_entry.is_deleted {
1525                                worktree.removed_entries.push(db_entry.id as u64);
1526                            } else {
1527                                worktree.updated_entries.push(proto::Entry {
1528                                    id: db_entry.id as u64,
1529                                    is_dir: db_entry.is_dir,
1530                                    path: db_entry.path,
1531                                    inode: db_entry.inode as u64,
1532                                    mtime: Some(proto::Timestamp {
1533                                        seconds: db_entry.mtime_seconds as u64,
1534                                        nanos: db_entry.mtime_nanos as u32,
1535                                    }),
1536                                    is_symlink: db_entry.is_symlink,
1537                                    is_ignored: db_entry.is_ignored,
1538                                });
1539                            }
1540                        }
1541                    }
1542
1543                    // Repository Entries
1544                    {
1545                        let repository_entry_filter =
1546                            if let Some(rejoined_worktree) = rejoined_worktree {
1547                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1548                            } else {
1549                                worktree_repository::Column::IsDeleted.eq(false)
1550                            };
1551
1552                        let mut db_repositories = worktree_repository::Entity::find()
1553                            .filter(
1554                                Condition::all()
1555                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1556                                    .add(repository_entry_filter),
1557                            )
1558                            .stream(&*tx)
1559                            .await?;
1560
1561                        while let Some(db_repository) = db_repositories.next().await {
1562                            let db_repository = db_repository?;
1563                            if db_repository.is_deleted {
1564                                worktree
1565                                    .removed_repositories
1566                                    .push(db_repository.dot_git_entry_id as u64);
1567                            } else {
1568                                worktree.updated_repositories.push(proto::RepositoryEntry {
1569                                    dot_git_entry_id: db_repository.dot_git_entry_id as u64,
1570                                    scan_id: db_repository.scan_id as u64,
1571                                    work_directory: db_repository.work_directory_path,
1572                                    branch: db_repository.branch,
1573                                });
1574                            }
1575                        }
1576                    }
1577
1578                    worktrees.push(worktree);
1579                }
1580
1581                let language_servers = project
1582                    .find_related(language_server::Entity)
1583                    .all(&*tx)
1584                    .await?
1585                    .into_iter()
1586                    .map(|language_server| proto::LanguageServer {
1587                        id: language_server.id as u64,
1588                        name: language_server.name,
1589                    })
1590                    .collect::<Vec<_>>();
1591
1592                let mut collaborators = project
1593                    .find_related(project_collaborator::Entity)
1594                    .all(&*tx)
1595                    .await?;
1596                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1597                    .iter()
1598                    .position(|collaborator| collaborator.user_id == user_id)
1599                {
1600                    collaborators.swap_remove(self_collaborator_ix)
1601                } else {
1602                    continue;
1603                };
1604                let old_connection_id = self_collaborator.connection();
1605                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1606                    connection_id: ActiveValue::set(connection.id as i32),
1607                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1608                    ..self_collaborator.into_active_model()
1609                })
1610                .exec(&*tx)
1611                .await?;
1612
1613                let collaborators = collaborators
1614                    .into_iter()
1615                    .map(|collaborator| ProjectCollaborator {
1616                        connection_id: collaborator.connection(),
1617                        user_id: collaborator.user_id,
1618                        replica_id: collaborator.replica_id,
1619                        is_host: collaborator.is_host,
1620                    })
1621                    .collect::<Vec<_>>();
1622
1623                rejoined_projects.push(RejoinedProject {
1624                    id: project_id,
1625                    old_connection_id,
1626                    collaborators,
1627                    worktrees,
1628                    language_servers,
1629                });
1630            }
1631
1632            let room = self.get_room(room_id, &tx).await?;
1633            Ok(RejoinedRoom {
1634                room,
1635                rejoined_projects,
1636                reshared_projects,
1637            })
1638        })
1639        .await
1640    }
1641
1642    pub async fn leave_room(
1643        &self,
1644        connection: ConnectionId,
1645    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1646        self.optional_room_transaction(|tx| async move {
1647            let leaving_participant = room_participant::Entity::find()
1648                .filter(
1649                    Condition::all()
1650                        .add(
1651                            room_participant::Column::AnsweringConnectionId
1652                                .eq(connection.id as i32),
1653                        )
1654                        .add(
1655                            room_participant::Column::AnsweringConnectionServerId
1656                                .eq(connection.owner_id as i32),
1657                        ),
1658                )
1659                .one(&*tx)
1660                .await?;
1661
1662            if let Some(leaving_participant) = leaving_participant {
1663                // Leave room.
1664                let room_id = leaving_participant.room_id;
1665                room_participant::Entity::delete_by_id(leaving_participant.id)
1666                    .exec(&*tx)
1667                    .await?;
1668
1669                // Cancel pending calls initiated by the leaving user.
1670                let called_participants = room_participant::Entity::find()
1671                    .filter(
1672                        Condition::all()
1673                            .add(
1674                                room_participant::Column::CallingUserId
1675                                    .eq(leaving_participant.user_id),
1676                            )
1677                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1678                    )
1679                    .all(&*tx)
1680                    .await?;
1681                room_participant::Entity::delete_many()
1682                    .filter(
1683                        room_participant::Column::Id
1684                            .is_in(called_participants.iter().map(|participant| participant.id)),
1685                    )
1686                    .exec(&*tx)
1687                    .await?;
1688                let canceled_calls_to_user_ids = called_participants
1689                    .into_iter()
1690                    .map(|participant| participant.user_id)
1691                    .collect();
1692
1693                // Detect left projects.
1694                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1695                enum QueryProjectIds {
1696                    ProjectId,
1697                }
1698                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1699                    .select_only()
1700                    .column_as(
1701                        project_collaborator::Column::ProjectId,
1702                        QueryProjectIds::ProjectId,
1703                    )
1704                    .filter(
1705                        Condition::all()
1706                            .add(
1707                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1708                            )
1709                            .add(
1710                                project_collaborator::Column::ConnectionServerId
1711                                    .eq(connection.owner_id as i32),
1712                            ),
1713                    )
1714                    .into_values::<_, QueryProjectIds>()
1715                    .all(&*tx)
1716                    .await?;
1717                let mut left_projects = HashMap::default();
1718                let mut collaborators = project_collaborator::Entity::find()
1719                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1720                    .stream(&*tx)
1721                    .await?;
1722                while let Some(collaborator) = collaborators.next().await {
1723                    let collaborator = collaborator?;
1724                    let left_project =
1725                        left_projects
1726                            .entry(collaborator.project_id)
1727                            .or_insert(LeftProject {
1728                                id: collaborator.project_id,
1729                                host_user_id: Default::default(),
1730                                connection_ids: Default::default(),
1731                                host_connection_id: Default::default(),
1732                            });
1733
1734                    let collaborator_connection_id = collaborator.connection();
1735                    if collaborator_connection_id != connection {
1736                        left_project.connection_ids.push(collaborator_connection_id);
1737                    }
1738
1739                    if collaborator.is_host {
1740                        left_project.host_user_id = collaborator.user_id;
1741                        left_project.host_connection_id = collaborator_connection_id;
1742                    }
1743                }
1744                drop(collaborators);
1745
1746                // Leave projects.
1747                project_collaborator::Entity::delete_many()
1748                    .filter(
1749                        Condition::all()
1750                            .add(
1751                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1752                            )
1753                            .add(
1754                                project_collaborator::Column::ConnectionServerId
1755                                    .eq(connection.owner_id as i32),
1756                            ),
1757                    )
1758                    .exec(&*tx)
1759                    .await?;
1760
1761                // Unshare projects.
1762                project::Entity::delete_many()
1763                    .filter(
1764                        Condition::all()
1765                            .add(project::Column::RoomId.eq(room_id))
1766                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1767                            .add(
1768                                project::Column::HostConnectionServerId
1769                                    .eq(connection.owner_id as i32),
1770                            ),
1771                    )
1772                    .exec(&*tx)
1773                    .await?;
1774
1775                let room = self.get_room(room_id, &tx).await?;
1776                if room.participants.is_empty() {
1777                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1778                }
1779
1780                let left_room = LeftRoom {
1781                    room,
1782                    left_projects,
1783                    canceled_calls_to_user_ids,
1784                };
1785
1786                if left_room.room.participants.is_empty() {
1787                    self.rooms.remove(&room_id);
1788                }
1789
1790                Ok(Some((room_id, left_room)))
1791            } else {
1792                Ok(None)
1793            }
1794        })
1795        .await
1796    }
1797
1798    pub async fn follow(
1799        &self,
1800        project_id: ProjectId,
1801        leader_connection: ConnectionId,
1802        follower_connection: ConnectionId,
1803    ) -> Result<RoomGuard<proto::Room>> {
1804        let room_id = self.room_id_for_project(project_id).await?;
1805        self.room_transaction(room_id, |tx| async move {
1806            follower::ActiveModel {
1807                room_id: ActiveValue::set(room_id),
1808                project_id: ActiveValue::set(project_id),
1809                leader_connection_server_id: ActiveValue::set(ServerId(
1810                    leader_connection.owner_id as i32,
1811                )),
1812                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1813                follower_connection_server_id: ActiveValue::set(ServerId(
1814                    follower_connection.owner_id as i32,
1815                )),
1816                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1817                ..Default::default()
1818            }
1819            .insert(&*tx)
1820            .await?;
1821
1822            let room = self.get_room(room_id, &*tx).await?;
1823            Ok(room)
1824        })
1825        .await
1826    }
1827
1828    pub async fn unfollow(
1829        &self,
1830        project_id: ProjectId,
1831        leader_connection: ConnectionId,
1832        follower_connection: ConnectionId,
1833    ) -> Result<RoomGuard<proto::Room>> {
1834        let room_id = self.room_id_for_project(project_id).await?;
1835        self.room_transaction(room_id, |tx| async move {
1836            follower::Entity::delete_many()
1837                .filter(
1838                    Condition::all()
1839                        .add(follower::Column::ProjectId.eq(project_id))
1840                        .add(
1841                            follower::Column::LeaderConnectionServerId
1842                                .eq(leader_connection.owner_id),
1843                        )
1844                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1845                        .add(
1846                            follower::Column::FollowerConnectionServerId
1847                                .eq(follower_connection.owner_id),
1848                        )
1849                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1850                )
1851                .exec(&*tx)
1852                .await?;
1853
1854            let room = self.get_room(room_id, &*tx).await?;
1855            Ok(room)
1856        })
1857        .await
1858    }
1859
1860    pub async fn update_room_participant_location(
1861        &self,
1862        room_id: RoomId,
1863        connection: ConnectionId,
1864        location: proto::ParticipantLocation,
1865    ) -> Result<RoomGuard<proto::Room>> {
1866        self.room_transaction(room_id, |tx| async {
1867            let tx = tx;
1868            let location_kind;
1869            let location_project_id;
1870            match location
1871                .variant
1872                .as_ref()
1873                .ok_or_else(|| anyhow!("invalid location"))?
1874            {
1875                proto::participant_location::Variant::SharedProject(project) => {
1876                    location_kind = 0;
1877                    location_project_id = Some(ProjectId::from_proto(project.id));
1878                }
1879                proto::participant_location::Variant::UnsharedProject(_) => {
1880                    location_kind = 1;
1881                    location_project_id = None;
1882                }
1883                proto::participant_location::Variant::External(_) => {
1884                    location_kind = 2;
1885                    location_project_id = None;
1886                }
1887            }
1888
1889            let result = room_participant::Entity::update_many()
1890                .filter(
1891                    Condition::all()
1892                        .add(room_participant::Column::RoomId.eq(room_id))
1893                        .add(
1894                            room_participant::Column::AnsweringConnectionId
1895                                .eq(connection.id as i32),
1896                        )
1897                        .add(
1898                            room_participant::Column::AnsweringConnectionServerId
1899                                .eq(connection.owner_id as i32),
1900                        ),
1901                )
1902                .set(room_participant::ActiveModel {
1903                    location_kind: ActiveValue::set(Some(location_kind)),
1904                    location_project_id: ActiveValue::set(location_project_id),
1905                    ..Default::default()
1906                })
1907                .exec(&*tx)
1908                .await?;
1909
1910            if result.rows_affected == 1 {
1911                let room = self.get_room(room_id, &tx).await?;
1912                Ok(room)
1913            } else {
1914                Err(anyhow!("could not update room participant location"))?
1915            }
1916        })
1917        .await
1918    }
1919
1920    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1921        self.transaction(|tx| async move {
1922            let participant = room_participant::Entity::find()
1923                .filter(
1924                    Condition::all()
1925                        .add(
1926                            room_participant::Column::AnsweringConnectionId
1927                                .eq(connection.id as i32),
1928                        )
1929                        .add(
1930                            room_participant::Column::AnsweringConnectionServerId
1931                                .eq(connection.owner_id as i32),
1932                        ),
1933                )
1934                .one(&*tx)
1935                .await?
1936                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1937
1938            room_participant::Entity::update(room_participant::ActiveModel {
1939                answering_connection_lost: ActiveValue::set(true),
1940                ..participant.into_active_model()
1941            })
1942            .exec(&*tx)
1943            .await?;
1944
1945            Ok(())
1946        })
1947        .await
1948    }
1949
1950    fn build_incoming_call(
1951        room: &proto::Room,
1952        called_user_id: UserId,
1953    ) -> Option<proto::IncomingCall> {
1954        let pending_participant = room
1955            .pending_participants
1956            .iter()
1957            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1958
1959        Some(proto::IncomingCall {
1960            room_id: room.id,
1961            calling_user_id: pending_participant.calling_user_id,
1962            participant_user_ids: room
1963                .participants
1964                .iter()
1965                .map(|participant| participant.user_id)
1966                .collect(),
1967            initial_project: room.participants.iter().find_map(|participant| {
1968                let initial_project_id = pending_participant.initial_project_id?;
1969                participant
1970                    .projects
1971                    .iter()
1972                    .find(|project| project.id == initial_project_id)
1973                    .cloned()
1974            }),
1975        })
1976    }
1977
1978    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1979        let db_room = room::Entity::find_by_id(room_id)
1980            .one(tx)
1981            .await?
1982            .ok_or_else(|| anyhow!("could not find room"))?;
1983
1984        let mut db_participants = db_room
1985            .find_related(room_participant::Entity)
1986            .stream(tx)
1987            .await?;
1988        let mut participants = HashMap::default();
1989        let mut pending_participants = Vec::new();
1990        while let Some(db_participant) = db_participants.next().await {
1991            let db_participant = db_participant?;
1992            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1993                .answering_connection_id
1994                .zip(db_participant.answering_connection_server_id)
1995            {
1996                let location = match (
1997                    db_participant.location_kind,
1998                    db_participant.location_project_id,
1999                ) {
2000                    (Some(0), Some(project_id)) => {
2001                        Some(proto::participant_location::Variant::SharedProject(
2002                            proto::participant_location::SharedProject {
2003                                id: project_id.to_proto(),
2004                            },
2005                        ))
2006                    }
2007                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2008                        Default::default(),
2009                    )),
2010                    _ => Some(proto::participant_location::Variant::External(
2011                        Default::default(),
2012                    )),
2013                };
2014
2015                let answering_connection = ConnectionId {
2016                    owner_id: answering_connection_server_id.0 as u32,
2017                    id: answering_connection_id as u32,
2018                };
2019                participants.insert(
2020                    answering_connection,
2021                    proto::Participant {
2022                        user_id: db_participant.user_id.to_proto(),
2023                        peer_id: Some(answering_connection.into()),
2024                        projects: Default::default(),
2025                        location: Some(proto::ParticipantLocation { variant: location }),
2026                    },
2027                );
2028            } else {
2029                pending_participants.push(proto::PendingParticipant {
2030                    user_id: db_participant.user_id.to_proto(),
2031                    calling_user_id: db_participant.calling_user_id.to_proto(),
2032                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2033                });
2034            }
2035        }
2036        drop(db_participants);
2037
2038        let mut db_projects = db_room
2039            .find_related(project::Entity)
2040            .find_with_related(worktree::Entity)
2041            .stream(tx)
2042            .await?;
2043
2044        while let Some(row) = db_projects.next().await {
2045            let (db_project, db_worktree) = row?;
2046            let host_connection = db_project.host_connection()?;
2047            if let Some(participant) = participants.get_mut(&host_connection) {
2048                let project = if let Some(project) = participant
2049                    .projects
2050                    .iter_mut()
2051                    .find(|project| project.id == db_project.id.to_proto())
2052                {
2053                    project
2054                } else {
2055                    participant.projects.push(proto::ParticipantProject {
2056                        id: db_project.id.to_proto(),
2057                        worktree_root_names: Default::default(),
2058                    });
2059                    participant.projects.last_mut().unwrap()
2060                };
2061
2062                if let Some(db_worktree) = db_worktree {
2063                    if db_worktree.visible {
2064                        project.worktree_root_names.push(db_worktree.root_name);
2065                    }
2066                }
2067            }
2068        }
2069        drop(db_projects);
2070
2071        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2072        let mut followers = Vec::new();
2073        while let Some(db_follower) = db_followers.next().await {
2074            let db_follower = db_follower?;
2075            followers.push(proto::Follower {
2076                leader_id: Some(db_follower.leader_connection().into()),
2077                follower_id: Some(db_follower.follower_connection().into()),
2078                project_id: db_follower.project_id.to_proto(),
2079            });
2080        }
2081
2082        Ok(proto::Room {
2083            id: db_room.id.to_proto(),
2084            live_kit_room: db_room.live_kit_room,
2085            participants: participants.into_values().collect(),
2086            pending_participants,
2087            followers,
2088        })
2089    }
2090
2091    // projects
2092
2093    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2094        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2095        enum QueryAs {
2096            Count,
2097        }
2098
2099        self.transaction(|tx| async move {
2100            Ok(project::Entity::find()
2101                .select_only()
2102                .column_as(project::Column::Id.count(), QueryAs::Count)
2103                .inner_join(user::Entity)
2104                .filter(user::Column::Admin.eq(false))
2105                .into_values::<_, QueryAs>()
2106                .one(&*tx)
2107                .await?
2108                .unwrap_or(0i64) as usize)
2109        })
2110        .await
2111    }
2112
2113    pub async fn share_project(
2114        &self,
2115        room_id: RoomId,
2116        connection: ConnectionId,
2117        worktrees: &[proto::WorktreeMetadata],
2118    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2119        self.room_transaction(room_id, |tx| async move {
2120            let participant = room_participant::Entity::find()
2121                .filter(
2122                    Condition::all()
2123                        .add(
2124                            room_participant::Column::AnsweringConnectionId
2125                                .eq(connection.id as i32),
2126                        )
2127                        .add(
2128                            room_participant::Column::AnsweringConnectionServerId
2129                                .eq(connection.owner_id as i32),
2130                        ),
2131                )
2132                .one(&*tx)
2133                .await?
2134                .ok_or_else(|| anyhow!("could not find participant"))?;
2135            if participant.room_id != room_id {
2136                return Err(anyhow!("shared project on unexpected room"))?;
2137            }
2138
2139            let project = project::ActiveModel {
2140                room_id: ActiveValue::set(participant.room_id),
2141                host_user_id: ActiveValue::set(participant.user_id),
2142                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2143                host_connection_server_id: ActiveValue::set(Some(ServerId(
2144                    connection.owner_id as i32,
2145                ))),
2146                ..Default::default()
2147            }
2148            .insert(&*tx)
2149            .await?;
2150
2151            if !worktrees.is_empty() {
2152                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2153                    worktree::ActiveModel {
2154                        id: ActiveValue::set(worktree.id as i64),
2155                        project_id: ActiveValue::set(project.id),
2156                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2157                        root_name: ActiveValue::set(worktree.root_name.clone()),
2158                        visible: ActiveValue::set(worktree.visible),
2159                        scan_id: ActiveValue::set(0),
2160                        completed_scan_id: ActiveValue::set(0),
2161                    }
2162                }))
2163                .exec(&*tx)
2164                .await?;
2165            }
2166
2167            project_collaborator::ActiveModel {
2168                project_id: ActiveValue::set(project.id),
2169                connection_id: ActiveValue::set(connection.id as i32),
2170                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2171                user_id: ActiveValue::set(participant.user_id),
2172                replica_id: ActiveValue::set(ReplicaId(0)),
2173                is_host: ActiveValue::set(true),
2174                ..Default::default()
2175            }
2176            .insert(&*tx)
2177            .await?;
2178
2179            let room = self.get_room(room_id, &tx).await?;
2180            Ok((project.id, room))
2181        })
2182        .await
2183    }
2184
2185    pub async fn unshare_project(
2186        &self,
2187        project_id: ProjectId,
2188        connection: ConnectionId,
2189    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2190        let room_id = self.room_id_for_project(project_id).await?;
2191        self.room_transaction(room_id, |tx| async move {
2192            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2193
2194            let project = project::Entity::find_by_id(project_id)
2195                .one(&*tx)
2196                .await?
2197                .ok_or_else(|| anyhow!("project not found"))?;
2198            if project.host_connection()? == connection {
2199                project::Entity::delete(project.into_active_model())
2200                    .exec(&*tx)
2201                    .await?;
2202                let room = self.get_room(room_id, &tx).await?;
2203                Ok((room, guest_connection_ids))
2204            } else {
2205                Err(anyhow!("cannot unshare a project hosted by another user"))?
2206            }
2207        })
2208        .await
2209    }
2210
2211    pub async fn update_project(
2212        &self,
2213        project_id: ProjectId,
2214        connection: ConnectionId,
2215        worktrees: &[proto::WorktreeMetadata],
2216    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2217        let room_id = self.room_id_for_project(project_id).await?;
2218        self.room_transaction(room_id, |tx| async move {
2219            let project = project::Entity::find_by_id(project_id)
2220                .filter(
2221                    Condition::all()
2222                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2223                        .add(
2224                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2225                        ),
2226                )
2227                .one(&*tx)
2228                .await?
2229                .ok_or_else(|| anyhow!("no such project"))?;
2230
2231            self.update_project_worktrees(project.id, worktrees, &tx)
2232                .await?;
2233
2234            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2235            let room = self.get_room(project.room_id, &tx).await?;
2236            Ok((room, guest_connection_ids))
2237        })
2238        .await
2239    }
2240
2241    async fn update_project_worktrees(
2242        &self,
2243        project_id: ProjectId,
2244        worktrees: &[proto::WorktreeMetadata],
2245        tx: &DatabaseTransaction,
2246    ) -> Result<()> {
2247        if !worktrees.is_empty() {
2248            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2249                id: ActiveValue::set(worktree.id as i64),
2250                project_id: ActiveValue::set(project_id),
2251                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2252                root_name: ActiveValue::set(worktree.root_name.clone()),
2253                visible: ActiveValue::set(worktree.visible),
2254                scan_id: ActiveValue::set(0),
2255                completed_scan_id: ActiveValue::set(0),
2256            }))
2257            .on_conflict(
2258                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2259                    .update_column(worktree::Column::RootName)
2260                    .to_owned(),
2261            )
2262            .exec(&*tx)
2263            .await?;
2264        }
2265
2266        worktree::Entity::delete_many()
2267            .filter(worktree::Column::ProjectId.eq(project_id).and(
2268                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2269            ))
2270            .exec(&*tx)
2271            .await?;
2272
2273        Ok(())
2274    }
2275
2276    pub async fn update_worktree(
2277        &self,
2278        update: &proto::UpdateWorktree,
2279        connection: ConnectionId,
2280    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2281        let project_id = ProjectId::from_proto(update.project_id);
2282        let worktree_id = update.worktree_id as i64;
2283        let room_id = self.room_id_for_project(project_id).await?;
2284        self.room_transaction(room_id, |tx| async move {
2285            // Ensure the update comes from the host.
2286            let _project = project::Entity::find_by_id(project_id)
2287                .filter(
2288                    Condition::all()
2289                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2290                        .add(
2291                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2292                        ),
2293                )
2294                .one(&*tx)
2295                .await?
2296                .ok_or_else(|| anyhow!("no such project"))?;
2297
2298            // Update metadata.
2299            worktree::Entity::update(worktree::ActiveModel {
2300                id: ActiveValue::set(worktree_id),
2301                project_id: ActiveValue::set(project_id),
2302                root_name: ActiveValue::set(update.root_name.clone()),
2303                scan_id: ActiveValue::set(update.scan_id as i64),
2304                completed_scan_id: if update.is_last_update {
2305                    ActiveValue::set(update.scan_id as i64)
2306                } else {
2307                    ActiveValue::default()
2308                },
2309                abs_path: ActiveValue::set(update.abs_path.clone()),
2310                ..Default::default()
2311            })
2312            .exec(&*tx)
2313            .await?;
2314
2315            if !update.updated_entries.is_empty() {
2316                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2317                    let mtime = entry.mtime.clone().unwrap_or_default();
2318                    worktree_entry::ActiveModel {
2319                        project_id: ActiveValue::set(project_id),
2320                        worktree_id: ActiveValue::set(worktree_id),
2321                        id: ActiveValue::set(entry.id as i64),
2322                        is_dir: ActiveValue::set(entry.is_dir),
2323                        path: ActiveValue::set(entry.path.clone()),
2324                        inode: ActiveValue::set(entry.inode as i64),
2325                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2326                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2327                        is_symlink: ActiveValue::set(entry.is_symlink),
2328                        is_ignored: ActiveValue::set(entry.is_ignored),
2329                        is_deleted: ActiveValue::set(false),
2330                        scan_id: ActiveValue::set(update.scan_id as i64),
2331                    }
2332                }))
2333                .on_conflict(
2334                    OnConflict::columns([
2335                        worktree_entry::Column::ProjectId,
2336                        worktree_entry::Column::WorktreeId,
2337                        worktree_entry::Column::Id,
2338                    ])
2339                    .update_columns([
2340                        worktree_entry::Column::IsDir,
2341                        worktree_entry::Column::Path,
2342                        worktree_entry::Column::Inode,
2343                        worktree_entry::Column::MtimeSeconds,
2344                        worktree_entry::Column::MtimeNanos,
2345                        worktree_entry::Column::IsSymlink,
2346                        worktree_entry::Column::IsIgnored,
2347                        worktree_entry::Column::ScanId,
2348                    ])
2349                    .to_owned(),
2350                )
2351                .exec(&*tx)
2352                .await?;
2353            }
2354
2355            if !update.removed_entries.is_empty() {
2356                worktree_entry::Entity::update_many()
2357                    .filter(
2358                        worktree_entry::Column::ProjectId
2359                            .eq(project_id)
2360                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2361                            .and(
2362                                worktree_entry::Column::Id
2363                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2364                            ),
2365                    )
2366                    .set(worktree_entry::ActiveModel {
2367                        is_deleted: ActiveValue::Set(true),
2368                        scan_id: ActiveValue::Set(update.scan_id as i64),
2369                        ..Default::default()
2370                    })
2371                    .exec(&*tx)
2372                    .await?;
2373            }
2374
2375            if !update.updated_repositories.is_empty() {
2376                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2377                    |repository| worktree_repository::ActiveModel {
2378                        project_id: ActiveValue::set(project_id),
2379                        worktree_id: ActiveValue::set(worktree_id),
2380                        dot_git_entry_id: ActiveValue::set(repository.dot_git_entry_id as i64),
2381                        work_directory_path: ActiveValue::set(repository.work_directory.clone()),
2382                        scan_id: ActiveValue::set(update.scan_id as i64),
2383                        branch: ActiveValue::set(repository.branch.clone()),
2384                        is_deleted: ActiveValue::set(false),
2385                    },
2386                ))
2387                .on_conflict(
2388                    OnConflict::columns([
2389                        worktree_repository::Column::ProjectId,
2390                        worktree_repository::Column::WorktreeId,
2391                        worktree_repository::Column::DotGitEntryId,
2392                    ])
2393                    .update_columns([
2394                        worktree_repository::Column::ScanId,
2395                        worktree_repository::Column::WorkDirectoryPath,
2396                        worktree_repository::Column::Branch,
2397                    ])
2398                    .to_owned(),
2399                )
2400                .exec(&*tx)
2401                .await?;
2402            }
2403
2404            if !update.removed_repositories.is_empty() {
2405                worktree_repository::Entity::update_many()
2406                    .filter(
2407                        worktree_repository::Column::ProjectId
2408                            .eq(project_id)
2409                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2410                            .and(
2411                                worktree_repository::Column::DotGitEntryId
2412                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2413                            ),
2414                    )
2415                    .set(worktree_repository::ActiveModel {
2416                        is_deleted: ActiveValue::Set(true),
2417                        scan_id: ActiveValue::Set(update.scan_id as i64),
2418                        ..Default::default()
2419                    })
2420                    .exec(&*tx)
2421                    .await?;
2422            }
2423
2424            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2425            Ok(connection_ids)
2426        })
2427        .await
2428    }
2429
2430    pub async fn update_diagnostic_summary(
2431        &self,
2432        update: &proto::UpdateDiagnosticSummary,
2433        connection: ConnectionId,
2434    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2435        let project_id = ProjectId::from_proto(update.project_id);
2436        let worktree_id = update.worktree_id as i64;
2437        let room_id = self.room_id_for_project(project_id).await?;
2438        self.room_transaction(room_id, |tx| async move {
2439            let summary = update
2440                .summary
2441                .as_ref()
2442                .ok_or_else(|| anyhow!("invalid summary"))?;
2443
2444            // Ensure the update comes from the host.
2445            let project = project::Entity::find_by_id(project_id)
2446                .one(&*tx)
2447                .await?
2448                .ok_or_else(|| anyhow!("no such project"))?;
2449            if project.host_connection()? != connection {
2450                return Err(anyhow!("can't update a project hosted by someone else"))?;
2451            }
2452
2453            // Update summary.
2454            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2455                project_id: ActiveValue::set(project_id),
2456                worktree_id: ActiveValue::set(worktree_id),
2457                path: ActiveValue::set(summary.path.clone()),
2458                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2459                error_count: ActiveValue::set(summary.error_count as i32),
2460                warning_count: ActiveValue::set(summary.warning_count as i32),
2461                ..Default::default()
2462            })
2463            .on_conflict(
2464                OnConflict::columns([
2465                    worktree_diagnostic_summary::Column::ProjectId,
2466                    worktree_diagnostic_summary::Column::WorktreeId,
2467                    worktree_diagnostic_summary::Column::Path,
2468                ])
2469                .update_columns([
2470                    worktree_diagnostic_summary::Column::LanguageServerId,
2471                    worktree_diagnostic_summary::Column::ErrorCount,
2472                    worktree_diagnostic_summary::Column::WarningCount,
2473                ])
2474                .to_owned(),
2475            )
2476            .exec(&*tx)
2477            .await?;
2478
2479            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2480            Ok(connection_ids)
2481        })
2482        .await
2483    }
2484
2485    pub async fn start_language_server(
2486        &self,
2487        update: &proto::StartLanguageServer,
2488        connection: ConnectionId,
2489    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2490        let project_id = ProjectId::from_proto(update.project_id);
2491        let room_id = self.room_id_for_project(project_id).await?;
2492        self.room_transaction(room_id, |tx| async move {
2493            let server = update
2494                .server
2495                .as_ref()
2496                .ok_or_else(|| anyhow!("invalid language server"))?;
2497
2498            // Ensure the update comes from the host.
2499            let project = project::Entity::find_by_id(project_id)
2500                .one(&*tx)
2501                .await?
2502                .ok_or_else(|| anyhow!("no such project"))?;
2503            if project.host_connection()? != connection {
2504                return Err(anyhow!("can't update a project hosted by someone else"))?;
2505            }
2506
2507            // Add the newly-started language server.
2508            language_server::Entity::insert(language_server::ActiveModel {
2509                project_id: ActiveValue::set(project_id),
2510                id: ActiveValue::set(server.id as i64),
2511                name: ActiveValue::set(server.name.clone()),
2512                ..Default::default()
2513            })
2514            .on_conflict(
2515                OnConflict::columns([
2516                    language_server::Column::ProjectId,
2517                    language_server::Column::Id,
2518                ])
2519                .update_column(language_server::Column::Name)
2520                .to_owned(),
2521            )
2522            .exec(&*tx)
2523            .await?;
2524
2525            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2526            Ok(connection_ids)
2527        })
2528        .await
2529    }
2530
2531    pub async fn join_project(
2532        &self,
2533        project_id: ProjectId,
2534        connection: ConnectionId,
2535    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2536        let room_id = self.room_id_for_project(project_id).await?;
2537        self.room_transaction(room_id, |tx| async move {
2538            let participant = room_participant::Entity::find()
2539                .filter(
2540                    Condition::all()
2541                        .add(
2542                            room_participant::Column::AnsweringConnectionId
2543                                .eq(connection.id as i32),
2544                        )
2545                        .add(
2546                            room_participant::Column::AnsweringConnectionServerId
2547                                .eq(connection.owner_id as i32),
2548                        ),
2549                )
2550                .one(&*tx)
2551                .await?
2552                .ok_or_else(|| anyhow!("must join a room first"))?;
2553
2554            let project = project::Entity::find_by_id(project_id)
2555                .one(&*tx)
2556                .await?
2557                .ok_or_else(|| anyhow!("no such project"))?;
2558            if project.room_id != participant.room_id {
2559                return Err(anyhow!("no such project"))?;
2560            }
2561
2562            let mut collaborators = project
2563                .find_related(project_collaborator::Entity)
2564                .all(&*tx)
2565                .await?;
2566            let replica_ids = collaborators
2567                .iter()
2568                .map(|c| c.replica_id)
2569                .collect::<HashSet<_>>();
2570            let mut replica_id = ReplicaId(1);
2571            while replica_ids.contains(&replica_id) {
2572                replica_id.0 += 1;
2573            }
2574            let new_collaborator = project_collaborator::ActiveModel {
2575                project_id: ActiveValue::set(project_id),
2576                connection_id: ActiveValue::set(connection.id as i32),
2577                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2578                user_id: ActiveValue::set(participant.user_id),
2579                replica_id: ActiveValue::set(replica_id),
2580                is_host: ActiveValue::set(false),
2581                ..Default::default()
2582            }
2583            .insert(&*tx)
2584            .await?;
2585            collaborators.push(new_collaborator);
2586
2587            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2588            let mut worktrees = db_worktrees
2589                .into_iter()
2590                .map(|db_worktree| {
2591                    (
2592                        db_worktree.id as u64,
2593                        Worktree {
2594                            id: db_worktree.id as u64,
2595                            abs_path: db_worktree.abs_path,
2596                            root_name: db_worktree.root_name,
2597                            visible: db_worktree.visible,
2598                            entries: Default::default(),
2599                            repository_entries: Default::default(),
2600                            diagnostic_summaries: Default::default(),
2601                            scan_id: db_worktree.scan_id as u64,
2602                            completed_scan_id: db_worktree.completed_scan_id as u64,
2603                        },
2604                    )
2605                })
2606                .collect::<BTreeMap<_, _>>();
2607
2608            // Populate worktree entries.
2609            {
2610                let mut db_entries = worktree_entry::Entity::find()
2611                    .filter(
2612                        Condition::all()
2613                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2614                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2615                    )
2616                    .stream(&*tx)
2617                    .await?;
2618                while let Some(db_entry) = db_entries.next().await {
2619                    let db_entry = db_entry?;
2620                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2621                        worktree.entries.push(proto::Entry {
2622                            id: db_entry.id as u64,
2623                            is_dir: db_entry.is_dir,
2624                            path: db_entry.path,
2625                            inode: db_entry.inode as u64,
2626                            mtime: Some(proto::Timestamp {
2627                                seconds: db_entry.mtime_seconds as u64,
2628                                nanos: db_entry.mtime_nanos as u32,
2629                            }),
2630                            is_symlink: db_entry.is_symlink,
2631                            is_ignored: db_entry.is_ignored,
2632                        });
2633                    }
2634                }
2635            }
2636
2637            // Populate repository entries.
2638            {
2639                let mut db_repository_entries = worktree_repository::Entity::find()
2640                    .filter(
2641                        Condition::all()
2642                            .add(worktree_repository::Column::ProjectId.eq(project_id))
2643                            .add(worktree_repository::Column::IsDeleted.eq(false)),
2644                    )
2645                    .stream(&*tx)
2646                    .await?;
2647                while let Some(db_repository_entry) = db_repository_entries.next().await {
2648                    let db_repository_entry = db_repository_entry?;
2649                    if let Some(worktree) =
2650                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2651                    {
2652                        worktree.repository_entries.push(proto::RepositoryEntry {
2653                            dot_git_entry_id: db_repository_entry.dot_git_entry_id as u64,
2654                            scan_id: db_repository_entry.scan_id as u64,
2655                            work_directory: db_repository_entry.work_directory_path,
2656                            branch: db_repository_entry.branch,
2657                        });
2658                    }
2659                }
2660            }
2661
2662            // Populate worktree diagnostic summaries.
2663            {
2664                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2665                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2666                    .stream(&*tx)
2667                    .await?;
2668                while let Some(db_summary) = db_summaries.next().await {
2669                    let db_summary = db_summary?;
2670                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2671                        worktree
2672                            .diagnostic_summaries
2673                            .push(proto::DiagnosticSummary {
2674                                path: db_summary.path,
2675                                language_server_id: db_summary.language_server_id as u64,
2676                                error_count: db_summary.error_count as u32,
2677                                warning_count: db_summary.warning_count as u32,
2678                            });
2679                    }
2680                }
2681            }
2682
2683            // Populate language servers.
2684            let language_servers = project
2685                .find_related(language_server::Entity)
2686                .all(&*tx)
2687                .await?;
2688
2689            let project = Project {
2690                collaborators: collaborators
2691                    .into_iter()
2692                    .map(|collaborator| ProjectCollaborator {
2693                        connection_id: collaborator.connection(),
2694                        user_id: collaborator.user_id,
2695                        replica_id: collaborator.replica_id,
2696                        is_host: collaborator.is_host,
2697                    })
2698                    .collect(),
2699                worktrees,
2700                language_servers: language_servers
2701                    .into_iter()
2702                    .map(|language_server| proto::LanguageServer {
2703                        id: language_server.id as u64,
2704                        name: language_server.name,
2705                    })
2706                    .collect(),
2707            };
2708            Ok((project, replica_id as ReplicaId))
2709        })
2710        .await
2711    }
2712
2713    pub async fn leave_project(
2714        &self,
2715        project_id: ProjectId,
2716        connection: ConnectionId,
2717    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2718        let room_id = self.room_id_for_project(project_id).await?;
2719        self.room_transaction(room_id, |tx| async move {
2720            let result = project_collaborator::Entity::delete_many()
2721                .filter(
2722                    Condition::all()
2723                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2724                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2725                        .add(
2726                            project_collaborator::Column::ConnectionServerId
2727                                .eq(connection.owner_id as i32),
2728                        ),
2729                )
2730                .exec(&*tx)
2731                .await?;
2732            if result.rows_affected == 0 {
2733                Err(anyhow!("not a collaborator on this project"))?;
2734            }
2735
2736            let project = project::Entity::find_by_id(project_id)
2737                .one(&*tx)
2738                .await?
2739                .ok_or_else(|| anyhow!("no such project"))?;
2740            let collaborators = project
2741                .find_related(project_collaborator::Entity)
2742                .all(&*tx)
2743                .await?;
2744            let connection_ids = collaborators
2745                .into_iter()
2746                .map(|collaborator| collaborator.connection())
2747                .collect();
2748
2749            follower::Entity::delete_many()
2750                .filter(
2751                    Condition::any()
2752                        .add(
2753                            Condition::all()
2754                                .add(follower::Column::ProjectId.eq(project_id))
2755                                .add(
2756                                    follower::Column::LeaderConnectionServerId
2757                                        .eq(connection.owner_id),
2758                                )
2759                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2760                        )
2761                        .add(
2762                            Condition::all()
2763                                .add(follower::Column::ProjectId.eq(project_id))
2764                                .add(
2765                                    follower::Column::FollowerConnectionServerId
2766                                        .eq(connection.owner_id),
2767                                )
2768                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2769                        ),
2770                )
2771                .exec(&*tx)
2772                .await?;
2773
2774            let room = self.get_room(project.room_id, &tx).await?;
2775            let left_project = LeftProject {
2776                id: project_id,
2777                host_user_id: project.host_user_id,
2778                host_connection_id: project.host_connection()?,
2779                connection_ids,
2780            };
2781            Ok((room, left_project))
2782        })
2783        .await
2784    }
2785
2786    pub async fn project_collaborators(
2787        &self,
2788        project_id: ProjectId,
2789        connection_id: ConnectionId,
2790    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2791        let room_id = self.room_id_for_project(project_id).await?;
2792        self.room_transaction(room_id, |tx| async move {
2793            let collaborators = project_collaborator::Entity::find()
2794                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2795                .all(&*tx)
2796                .await?
2797                .into_iter()
2798                .map(|collaborator| ProjectCollaborator {
2799                    connection_id: collaborator.connection(),
2800                    user_id: collaborator.user_id,
2801                    replica_id: collaborator.replica_id,
2802                    is_host: collaborator.is_host,
2803                })
2804                .collect::<Vec<_>>();
2805
2806            if collaborators
2807                .iter()
2808                .any(|collaborator| collaborator.connection_id == connection_id)
2809            {
2810                Ok(collaborators)
2811            } else {
2812                Err(anyhow!("no such project"))?
2813            }
2814        })
2815        .await
2816    }
2817
2818    pub async fn project_connection_ids(
2819        &self,
2820        project_id: ProjectId,
2821        connection_id: ConnectionId,
2822    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2823        let room_id = self.room_id_for_project(project_id).await?;
2824        self.room_transaction(room_id, |tx| async move {
2825            let mut collaborators = project_collaborator::Entity::find()
2826                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2827                .stream(&*tx)
2828                .await?;
2829
2830            let mut connection_ids = HashSet::default();
2831            while let Some(collaborator) = collaborators.next().await {
2832                let collaborator = collaborator?;
2833                connection_ids.insert(collaborator.connection());
2834            }
2835
2836            if connection_ids.contains(&connection_id) {
2837                Ok(connection_ids)
2838            } else {
2839                Err(anyhow!("no such project"))?
2840            }
2841        })
2842        .await
2843    }
2844
2845    async fn project_guest_connection_ids(
2846        &self,
2847        project_id: ProjectId,
2848        tx: &DatabaseTransaction,
2849    ) -> Result<Vec<ConnectionId>> {
2850        let mut collaborators = project_collaborator::Entity::find()
2851            .filter(
2852                project_collaborator::Column::ProjectId
2853                    .eq(project_id)
2854                    .and(project_collaborator::Column::IsHost.eq(false)),
2855            )
2856            .stream(tx)
2857            .await?;
2858
2859        let mut guest_connection_ids = Vec::new();
2860        while let Some(collaborator) = collaborators.next().await {
2861            let collaborator = collaborator?;
2862            guest_connection_ids.push(collaborator.connection());
2863        }
2864        Ok(guest_connection_ids)
2865    }
2866
2867    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2868        self.transaction(|tx| async move {
2869            let project = project::Entity::find_by_id(project_id)
2870                .one(&*tx)
2871                .await?
2872                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2873            Ok(project.room_id)
2874        })
2875        .await
2876    }
2877
2878    // access tokens
2879
2880    pub async fn create_access_token(
2881        &self,
2882        user_id: UserId,
2883        access_token_hash: &str,
2884        max_access_token_count: usize,
2885    ) -> Result<AccessTokenId> {
2886        self.transaction(|tx| async {
2887            let tx = tx;
2888
2889            let token = access_token::ActiveModel {
2890                user_id: ActiveValue::set(user_id),
2891                hash: ActiveValue::set(access_token_hash.into()),
2892                ..Default::default()
2893            }
2894            .insert(&*tx)
2895            .await?;
2896
2897            access_token::Entity::delete_many()
2898                .filter(
2899                    access_token::Column::Id.in_subquery(
2900                        Query::select()
2901                            .column(access_token::Column::Id)
2902                            .from(access_token::Entity)
2903                            .and_where(access_token::Column::UserId.eq(user_id))
2904                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2905                            .limit(10000)
2906                            .offset(max_access_token_count as u64)
2907                            .to_owned(),
2908                    ),
2909                )
2910                .exec(&*tx)
2911                .await?;
2912            Ok(token.id)
2913        })
2914        .await
2915    }
2916
2917    pub async fn get_access_token(
2918        &self,
2919        access_token_id: AccessTokenId,
2920    ) -> Result<access_token::Model> {
2921        self.transaction(|tx| async move {
2922            Ok(access_token::Entity::find_by_id(access_token_id)
2923                .one(&*tx)
2924                .await?
2925                .ok_or_else(|| anyhow!("no such access token"))?)
2926        })
2927        .await
2928    }
2929
2930    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2931    where
2932        F: Send + Fn(TransactionHandle) -> Fut,
2933        Fut: Send + Future<Output = Result<T>>,
2934    {
2935        let body = async {
2936            let mut i = 0;
2937            loop {
2938                let (tx, result) = self.with_transaction(&f).await?;
2939                match result {
2940                    Ok(result) => match tx.commit().await.map_err(Into::into) {
2941                        Ok(()) => return Ok(result),
2942                        Err(error) => {
2943                            if !self.retry_on_serialization_error(&error, i).await {
2944                                return Err(error);
2945                            }
2946                        }
2947                    },
2948                    Err(error) => {
2949                        tx.rollback().await?;
2950                        if !self.retry_on_serialization_error(&error, i).await {
2951                            return Err(error);
2952                        }
2953                    }
2954                }
2955                i += 1;
2956            }
2957        };
2958
2959        self.run(body).await
2960    }
2961
2962    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2963    where
2964        F: Send + Fn(TransactionHandle) -> Fut,
2965        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2966    {
2967        let body = async {
2968            let mut i = 0;
2969            loop {
2970                let (tx, result) = self.with_transaction(&f).await?;
2971                match result {
2972                    Ok(Some((room_id, data))) => {
2973                        let lock = self.rooms.entry(room_id).or_default().clone();
2974                        let _guard = lock.lock_owned().await;
2975                        match tx.commit().await.map_err(Into::into) {
2976                            Ok(()) => {
2977                                return Ok(Some(RoomGuard {
2978                                    data,
2979                                    _guard,
2980                                    _not_send: PhantomData,
2981                                }));
2982                            }
2983                            Err(error) => {
2984                                if !self.retry_on_serialization_error(&error, i).await {
2985                                    return Err(error);
2986                                }
2987                            }
2988                        }
2989                    }
2990                    Ok(None) => match tx.commit().await.map_err(Into::into) {
2991                        Ok(()) => return Ok(None),
2992                        Err(error) => {
2993                            if !self.retry_on_serialization_error(&error, i).await {
2994                                return Err(error);
2995                            }
2996                        }
2997                    },
2998                    Err(error) => {
2999                        tx.rollback().await?;
3000                        if !self.retry_on_serialization_error(&error, i).await {
3001                            return Err(error);
3002                        }
3003                    }
3004                }
3005                i += 1;
3006            }
3007        };
3008
3009        self.run(body).await
3010    }
3011
3012    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3013    where
3014        F: Send + Fn(TransactionHandle) -> Fut,
3015        Fut: Send + Future<Output = Result<T>>,
3016    {
3017        let body = async {
3018            let mut i = 0;
3019            loop {
3020                let lock = self.rooms.entry(room_id).or_default().clone();
3021                let _guard = lock.lock_owned().await;
3022                let (tx, result) = self.with_transaction(&f).await?;
3023                match result {
3024                    Ok(data) => match tx.commit().await.map_err(Into::into) {
3025                        Ok(()) => {
3026                            return Ok(RoomGuard {
3027                                data,
3028                                _guard,
3029                                _not_send: PhantomData,
3030                            });
3031                        }
3032                        Err(error) => {
3033                            if !self.retry_on_serialization_error(&error, i).await {
3034                                return Err(error);
3035                            }
3036                        }
3037                    },
3038                    Err(error) => {
3039                        tx.rollback().await?;
3040                        if !self.retry_on_serialization_error(&error, i).await {
3041                            return Err(error);
3042                        }
3043                    }
3044                }
3045                i += 1;
3046            }
3047        };
3048
3049        self.run(body).await
3050    }
3051
3052    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3053    where
3054        F: Send + Fn(TransactionHandle) -> Fut,
3055        Fut: Send + Future<Output = Result<T>>,
3056    {
3057        let tx = self
3058            .pool
3059            .begin_with_config(Some(IsolationLevel::Serializable), None)
3060            .await?;
3061
3062        let mut tx = Arc::new(Some(tx));
3063        let result = f(TransactionHandle(tx.clone())).await;
3064        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3065            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3066        };
3067
3068        Ok((tx, result))
3069    }
3070
3071    async fn run<F, T>(&self, future: F) -> Result<T>
3072    where
3073        F: Future<Output = Result<T>>,
3074    {
3075        #[cfg(test)]
3076        {
3077            if let Executor::Deterministic(executor) = &self.executor {
3078                executor.simulate_random_delay().await;
3079            }
3080
3081            self.runtime.as_ref().unwrap().block_on(future)
3082        }
3083
3084        #[cfg(not(test))]
3085        {
3086            future.await
3087        }
3088    }
3089
3090    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3091        // If the error is due to a failure to serialize concurrent transactions, then retry
3092        // this transaction after a delay. With each subsequent retry, double the delay duration.
3093        // Also vary the delay randomly in order to ensure different database connections retry
3094        // at different times.
3095        if is_serialization_error(error) {
3096            let base_delay = 4_u64 << prev_attempt_count.min(16);
3097            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3098            log::info!(
3099                "retrying transaction after serialization error. delay: {} ms.",
3100                randomized_delay
3101            );
3102            self.executor
3103                .sleep(Duration::from_millis(randomized_delay as u64))
3104                .await;
3105            true
3106        } else {
3107            false
3108        }
3109    }
3110}
3111
3112fn is_serialization_error(error: &Error) -> bool {
3113    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3114    match error {
3115        Error::Database(
3116            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3117            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3118        ) if error
3119            .as_database_error()
3120            .and_then(|error| error.code())
3121            .as_deref()
3122            == Some(SERIALIZATION_FAILURE_CODE) =>
3123        {
3124            true
3125        }
3126        _ => false,
3127    }
3128}
3129
3130struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3131
3132impl Deref for TransactionHandle {
3133    type Target = DatabaseTransaction;
3134
3135    fn deref(&self) -> &Self::Target {
3136        self.0.as_ref().as_ref().unwrap()
3137    }
3138}
3139
3140pub struct RoomGuard<T> {
3141    data: T,
3142    _guard: OwnedMutexGuard<()>,
3143    _not_send: PhantomData<Rc<()>>,
3144}
3145
3146impl<T> Deref for RoomGuard<T> {
3147    type Target = T;
3148
3149    fn deref(&self) -> &T {
3150        &self.data
3151    }
3152}
3153
3154impl<T> DerefMut for RoomGuard<T> {
3155    fn deref_mut(&mut self) -> &mut T {
3156        &mut self.data
3157    }
3158}
3159
3160#[derive(Debug, Serialize, Deserialize)]
3161pub struct NewUserParams {
3162    pub github_login: String,
3163    pub github_user_id: i32,
3164    pub invite_count: i32,
3165}
3166
3167#[derive(Debug)]
3168pub struct NewUserResult {
3169    pub user_id: UserId,
3170    pub metrics_id: String,
3171    pub inviting_user_id: Option<UserId>,
3172    pub signup_device_id: Option<String>,
3173}
3174
3175fn random_invite_code() -> String {
3176    nanoid::nanoid!(16)
3177}
3178
3179fn random_email_confirmation_code() -> String {
3180    nanoid::nanoid!(64)
3181}
3182
3183macro_rules! id_type {
3184    ($name:ident) => {
3185        #[derive(
3186            Clone,
3187            Copy,
3188            Debug,
3189            Default,
3190            PartialEq,
3191            Eq,
3192            PartialOrd,
3193            Ord,
3194            Hash,
3195            Serialize,
3196            Deserialize,
3197        )]
3198        #[serde(transparent)]
3199        pub struct $name(pub i32);
3200
3201        impl $name {
3202            #[allow(unused)]
3203            pub const MAX: Self = Self(i32::MAX);
3204
3205            #[allow(unused)]
3206            pub fn from_proto(value: u64) -> Self {
3207                Self(value as i32)
3208            }
3209
3210            #[allow(unused)]
3211            pub fn to_proto(self) -> u64 {
3212                self.0 as u64
3213            }
3214        }
3215
3216        impl std::fmt::Display for $name {
3217            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3218                self.0.fmt(f)
3219            }
3220        }
3221
3222        impl From<$name> for sea_query::Value {
3223            fn from(value: $name) -> Self {
3224                sea_query::Value::Int(Some(value.0))
3225            }
3226        }
3227
3228        impl sea_orm::TryGetable for $name {
3229            fn try_get(
3230                res: &sea_orm::QueryResult,
3231                pre: &str,
3232                col: &str,
3233            ) -> Result<Self, sea_orm::TryGetError> {
3234                Ok(Self(i32::try_get(res, pre, col)?))
3235            }
3236        }
3237
3238        impl sea_query::ValueType for $name {
3239            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3240                match v {
3241                    Value::TinyInt(Some(int)) => {
3242                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3243                    }
3244                    Value::SmallInt(Some(int)) => {
3245                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3246                    }
3247                    Value::Int(Some(int)) => {
3248                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3249                    }
3250                    Value::BigInt(Some(int)) => {
3251                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3252                    }
3253                    Value::TinyUnsigned(Some(int)) => {
3254                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3255                    }
3256                    Value::SmallUnsigned(Some(int)) => {
3257                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3258                    }
3259                    Value::Unsigned(Some(int)) => {
3260                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3261                    }
3262                    Value::BigUnsigned(Some(int)) => {
3263                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3264                    }
3265                    _ => Err(sea_query::ValueTypeErr),
3266                }
3267            }
3268
3269            fn type_name() -> String {
3270                stringify!($name).into()
3271            }
3272
3273            fn array_type() -> sea_query::ArrayType {
3274                sea_query::ArrayType::Int
3275            }
3276
3277            fn column_type() -> sea_query::ColumnType {
3278                sea_query::ColumnType::Integer(None)
3279            }
3280        }
3281
3282        impl sea_orm::TryFromU64 for $name {
3283            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3284                Ok(Self(n.try_into().map_err(|_| {
3285                    DbErr::ConvertFromU64(concat!(
3286                        "error converting ",
3287                        stringify!($name),
3288                        " to u64"
3289                    ))
3290                })?))
3291            }
3292        }
3293
3294        impl sea_query::Nullable for $name {
3295            fn null() -> Value {
3296                Value::Int(None)
3297            }
3298        }
3299    };
3300}
3301
3302id_type!(AccessTokenId);
3303id_type!(ContactId);
3304id_type!(FollowerId);
3305id_type!(RoomId);
3306id_type!(RoomParticipantId);
3307id_type!(ProjectId);
3308id_type!(ProjectCollaboratorId);
3309id_type!(ReplicaId);
3310id_type!(ServerId);
3311id_type!(SignupId);
3312id_type!(UserId);
3313
3314pub struct RejoinedRoom {
3315    pub room: proto::Room,
3316    pub rejoined_projects: Vec<RejoinedProject>,
3317    pub reshared_projects: Vec<ResharedProject>,
3318}
3319
3320pub struct ResharedProject {
3321    pub id: ProjectId,
3322    pub old_connection_id: ConnectionId,
3323    pub collaborators: Vec<ProjectCollaborator>,
3324    pub worktrees: Vec<proto::WorktreeMetadata>,
3325}
3326
3327pub struct RejoinedProject {
3328    pub id: ProjectId,
3329    pub old_connection_id: ConnectionId,
3330    pub collaborators: Vec<ProjectCollaborator>,
3331    pub worktrees: Vec<RejoinedWorktree>,
3332    pub language_servers: Vec<proto::LanguageServer>,
3333}
3334
3335#[derive(Debug)]
3336pub struct RejoinedWorktree {
3337    pub id: u64,
3338    pub abs_path: String,
3339    pub root_name: String,
3340    pub visible: bool,
3341    pub updated_entries: Vec<proto::Entry>,
3342    pub removed_entries: Vec<u64>,
3343    pub updated_repositories: Vec<proto::RepositoryEntry>,
3344    pub removed_repositories: Vec<u64>,
3345    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3346    pub scan_id: u64,
3347    pub completed_scan_id: u64,
3348}
3349
3350pub struct LeftRoom {
3351    pub room: proto::Room,
3352    pub left_projects: HashMap<ProjectId, LeftProject>,
3353    pub canceled_calls_to_user_ids: Vec<UserId>,
3354}
3355
3356pub struct RefreshedRoom {
3357    pub room: proto::Room,
3358    pub stale_participant_user_ids: Vec<UserId>,
3359    pub canceled_calls_to_user_ids: Vec<UserId>,
3360}
3361
3362pub struct Project {
3363    pub collaborators: Vec<ProjectCollaborator>,
3364    pub worktrees: BTreeMap<u64, Worktree>,
3365    pub language_servers: Vec<proto::LanguageServer>,
3366}
3367
3368pub struct ProjectCollaborator {
3369    pub connection_id: ConnectionId,
3370    pub user_id: UserId,
3371    pub replica_id: ReplicaId,
3372    pub is_host: bool,
3373}
3374
3375impl ProjectCollaborator {
3376    pub fn to_proto(&self) -> proto::Collaborator {
3377        proto::Collaborator {
3378            peer_id: Some(self.connection_id.into()),
3379            replica_id: self.replica_id.0 as u32,
3380            user_id: self.user_id.to_proto(),
3381        }
3382    }
3383}
3384
3385#[derive(Debug)]
3386pub struct LeftProject {
3387    pub id: ProjectId,
3388    pub host_user_id: UserId,
3389    pub host_connection_id: ConnectionId,
3390    pub connection_ids: Vec<ConnectionId>,
3391}
3392
3393pub struct Worktree {
3394    pub id: u64,
3395    pub abs_path: String,
3396    pub root_name: String,
3397    pub visible: bool,
3398    pub entries: Vec<proto::Entry>,
3399    pub repository_entries: Vec<proto::RepositoryEntry>,
3400    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3401    pub scan_id: u64,
3402    pub completed_scan_id: u64,
3403}
3404
3405#[cfg(test)]
3406pub use test::*;
3407
3408#[cfg(test)]
3409mod test {
3410    use super::*;
3411    use gpui::executor::Background;
3412    use lazy_static::lazy_static;
3413    use parking_lot::Mutex;
3414    use sea_orm::ConnectionTrait;
3415    use sqlx::migrate::MigrateDatabase;
3416    use std::sync::Arc;
3417
3418    pub struct TestDb {
3419        pub db: Option<Arc<Database>>,
3420        pub connection: Option<sqlx::AnyConnection>,
3421    }
3422
3423    impl TestDb {
3424        pub fn sqlite(background: Arc<Background>) -> Self {
3425            let url = format!("sqlite::memory:");
3426            let runtime = tokio::runtime::Builder::new_current_thread()
3427                .enable_io()
3428                .enable_time()
3429                .build()
3430                .unwrap();
3431
3432            let mut db = runtime.block_on(async {
3433                let mut options = ConnectOptions::new(url);
3434                options.max_connections(5);
3435                let db = Database::new(options, Executor::Deterministic(background))
3436                    .await
3437                    .unwrap();
3438                let sql = include_str!(concat!(
3439                    env!("CARGO_MANIFEST_DIR"),
3440                    "/migrations.sqlite/20221109000000_test_schema.sql"
3441                ));
3442                db.pool
3443                    .execute(sea_orm::Statement::from_string(
3444                        db.pool.get_database_backend(),
3445                        sql.into(),
3446                    ))
3447                    .await
3448                    .unwrap();
3449                db
3450            });
3451
3452            db.runtime = Some(runtime);
3453
3454            Self {
3455                db: Some(Arc::new(db)),
3456                connection: None,
3457            }
3458        }
3459
3460        pub fn postgres(background: Arc<Background>) -> Self {
3461            lazy_static! {
3462                static ref LOCK: Mutex<()> = Mutex::new(());
3463            }
3464
3465            let _guard = LOCK.lock();
3466            let mut rng = StdRng::from_entropy();
3467            let url = format!(
3468                "postgres://postgres@localhost/zed-test-{}",
3469                rng.gen::<u128>()
3470            );
3471            let runtime = tokio::runtime::Builder::new_current_thread()
3472                .enable_io()
3473                .enable_time()
3474                .build()
3475                .unwrap();
3476
3477            let mut db = runtime.block_on(async {
3478                sqlx::Postgres::create_database(&url)
3479                    .await
3480                    .expect("failed to create test db");
3481                let mut options = ConnectOptions::new(url);
3482                options
3483                    .max_connections(5)
3484                    .idle_timeout(Duration::from_secs(0));
3485                let db = Database::new(options, Executor::Deterministic(background))
3486                    .await
3487                    .unwrap();
3488                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3489                db.migrate(Path::new(migrations_path), false).await.unwrap();
3490                db
3491            });
3492
3493            db.runtime = Some(runtime);
3494
3495            Self {
3496                db: Some(Arc::new(db)),
3497                connection: None,
3498            }
3499        }
3500
3501        pub fn db(&self) -> &Arc<Database> {
3502            self.db.as_ref().unwrap()
3503        }
3504    }
3505
3506    impl Drop for TestDb {
3507        fn drop(&mut self) {
3508            let db = self.db.take().unwrap();
3509            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3510                db.runtime.as_ref().unwrap().block_on(async {
3511                    use util::ResultExt;
3512                    let query = "
3513                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3514                        FROM pg_stat_activity
3515                        WHERE
3516                            pg_stat_activity.datname = current_database() AND
3517                            pid <> pg_backend_pid();
3518                    ";
3519                    db.pool
3520                        .execute(sea_orm::Statement::from_string(
3521                            db.pool.get_database_backend(),
3522                            query.into(),
3523                        ))
3524                        .await
3525                        .log_err();
3526                    sqlx::Postgres::drop_database(db.options.get_url())
3527                        .await
3528                        .log_err();
3529                })
3530            }
3531        }
3532    }
3533}