db.rs

   1use crate::{Error, Result};
   2use anyhow::anyhow;
   3use axum::http::StatusCode;
   4use collections::{BTreeMap, HashMap, HashSet};
   5use futures::{future::BoxFuture, FutureExt, StreamExt};
   6use rpc::{proto, ConnectionId};
   7use serde::{Deserialize, Serialize};
   8use sqlx::{
   9    migrate::{Migrate as _, Migration, MigrationSource},
  10    types::Uuid,
  11    FromRow,
  12};
  13use std::{future::Future, path::Path, time::Duration};
  14use time::{OffsetDateTime, PrimitiveDateTime};
  15
/// Default database handle when compiled for tests: backed by SQLite.
#[cfg(test)]
pub type DefaultDb = Db<sqlx::Sqlite>;

/// Default database handle in non-test builds: backed by Postgres.
#[cfg(not(test))]
pub type DefaultDb = Db<sqlx::Postgres>;
  21
/// Handle to the application database, generic over the sqlx backend `D`.
pub struct Db<D: sqlx::Database> {
    /// Connection pool that the queries in this file run against.
    pool: sqlx::Pool<D>,
    /// Test-only background executor; the constructors in this file leave it as `None`.
    #[cfg(test)]
    background: Option<std::sync::Arc<gpui::executor::Background>>,
    /// Test-only Tokio runtime; `teardown` blocks on it and panics if it is `None`.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
  29
/// Abstraction over how each database backend opens a transaction.
pub trait BeginTransaction: Send + Sync {
    /// The sqlx backend that the produced transactions belong to.
    type Database: sqlx::Database;

    /// Starts a new transaction and returns it through a boxed future.
    fn begin_transaction(&self) -> BoxFuture<Result<sqlx::Transaction<'static, Self::Database>>>;
}
  35
  36// In Postgres, serializable transactions are opt-in
  37impl BeginTransaction for Db<sqlx::Postgres> {
  38    type Database = sqlx::Postgres;
  39
  40    fn begin_transaction(&self) -> BoxFuture<Result<sqlx::Transaction<'static, sqlx::Postgres>>> {
  41        async move {
  42            let mut tx = self.pool.begin().await?;
  43            sqlx::Executor::execute(&mut tx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;")
  44                .await?;
  45            Ok(tx)
  46        }
  47        .boxed()
  48    }
  49}
  50
  51// In Sqlite, transactions are inherently serializable.
  52#[cfg(test)]
  53impl BeginTransaction for Db<sqlx::Sqlite> {
  54    type Database = sqlx::Sqlite;
  55
  56    fn begin_transaction(&self) -> BoxFuture<Result<sqlx::Transaction<'static, sqlx::Sqlite>>> {
  57        async move { Ok(self.pool.begin().await?) }.boxed()
  58    }
  59}
  60
/// Backend-independent access to the number of rows a statement changed.
pub trait RowsAffected {
    /// Returns the number of rows affected by the executed statement.
    fn rows_affected(&self) -> u64;
}
  64
  65#[cfg(test)]
  66impl RowsAffected for sqlx::sqlite::SqliteQueryResult {
  67    fn rows_affected(&self) -> u64 {
  68        self.rows_affected()
  69    }
  70}
  71
  72impl RowsAffected for sqlx::postgres::PgQueryResult {
  73    fn rows_affected(&self) -> u64 {
  74        self.rows_affected()
  75    }
  76}
  77
  78#[cfg(test)]
  79impl Db<sqlx::Sqlite> {
  80    pub async fn new(url: &str, max_connections: u32) -> Result<Self> {
  81        use std::str::FromStr as _;
  82        let options = sqlx::sqlite::SqliteConnectOptions::from_str(url)
  83            .unwrap()
  84            .create_if_missing(true)
  85            .shared_cache(true);
  86        let pool = sqlx::sqlite::SqlitePoolOptions::new()
  87            .min_connections(2)
  88            .max_connections(max_connections)
  89            .connect_with(options)
  90            .await?;
  91        Ok(Self {
  92            pool,
  93            background: None,
  94            runtime: None,
  95        })
  96    }
  97
  98    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<User>> {
  99        self.transact(|tx| async {
 100            let mut tx = tx;
 101            let query = "
 102                SELECT users.*
 103                FROM users
 104                WHERE users.id IN (SELECT value from json_each($1))
 105            ";
 106            Ok(sqlx::query_as(query)
 107                .bind(&serde_json::json!(ids))
 108                .fetch_all(&mut tx)
 109                .await?)
 110        })
 111        .await
 112    }
 113
 114    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 115        self.transact(|mut tx| async move {
 116            let query = "
 117                SELECT metrics_id
 118                FROM users
 119                WHERE id = $1
 120            ";
 121            Ok(sqlx::query_scalar(query)
 122                .bind(id)
 123                .fetch_one(&mut tx)
 124                .await?)
 125        })
 126        .await
 127    }
 128
 129    pub async fn create_user(
 130        &self,
 131        email_address: &str,
 132        admin: bool,
 133        params: NewUserParams,
 134    ) -> Result<NewUserResult> {
 135        self.transact(|mut tx| async {
 136            let query = "
 137                INSERT INTO users (email_address, github_login, github_user_id, admin, metrics_id)
 138                VALUES ($1, $2, $3, $4, $5)
 139                ON CONFLICT (github_login) DO UPDATE SET github_login = excluded.github_login
 140                RETURNING id, metrics_id
 141            ";
 142
 143            let (user_id, metrics_id): (UserId, String) = sqlx::query_as(query)
 144                .bind(email_address)
 145                .bind(&params.github_login)
 146                .bind(&params.github_user_id)
 147                .bind(admin)
 148                .bind(Uuid::new_v4().to_string())
 149                .fetch_one(&mut tx)
 150                .await?;
 151            tx.commit().await?;
 152            Ok(NewUserResult {
 153                user_id,
 154                metrics_id,
 155                signup_device_id: None,
 156                inviting_user_id: None,
 157            })
 158        })
 159        .await
 160    }
 161
    /// Not implemented for the SQLite backend; calling it panics via `unimplemented!()`.
    pub async fn fuzzy_search_users(&self, _name_query: &str, _limit: u32) -> Result<Vec<User>> {
        unimplemented!()
    }
 165
    /// Not implemented for the SQLite backend; calling it panics via `unimplemented!()`.
    pub async fn create_user_from_invite(
        &self,
        _invite: &Invite,
        _user: NewUserParams,
    ) -> Result<Option<NewUserResult>> {
        unimplemented!()
    }
 173
    /// Not implemented for the SQLite backend; calling it panics via `unimplemented!()`.
    pub async fn create_signup(&self, _signup: Signup) -> Result<()> {
        unimplemented!()
    }
 177
    /// Not implemented for the SQLite backend; calling it panics via `unimplemented!()`.
    pub async fn create_invite_from_code(
        &self,
        _code: &str,
        _email_address: &str,
        _device_id: Option<&str>,
    ) -> Result<Invite> {
        unimplemented!()
    }
 186
    /// Not implemented for the SQLite backend; calling it panics via `unimplemented!()`.
    pub async fn record_sent_invites(&self, _invites: &[Invite]) -> Result<()> {
        unimplemented!()
    }
 190}
 191
 192impl Db<sqlx::Postgres> {
 193    pub async fn new(url: &str, max_connections: u32) -> Result<Self> {
 194        let pool = sqlx::postgres::PgPoolOptions::new()
 195            .max_connections(max_connections)
 196            .connect(url)
 197            .await?;
 198        Ok(Self {
 199            pool,
 200            #[cfg(test)]
 201            background: None,
 202            #[cfg(test)]
 203            runtime: None,
 204        })
 205    }
 206
 207    #[cfg(test)]
 208    pub fn teardown(&self, url: &str) {
 209        self.runtime.as_ref().unwrap().block_on(async {
 210            use util::ResultExt;
 211            let query = "
 212                SELECT pg_terminate_backend(pg_stat_activity.pid)
 213                FROM pg_stat_activity
 214                WHERE pg_stat_activity.datname = current_database() AND pid <> pg_backend_pid();
 215            ";
 216            sqlx::query(query).execute(&self.pool).await.log_err();
 217            self.pool.close().await;
 218            <sqlx::Sqlite as sqlx::migrate::MigrateDatabase>::drop_database(url)
 219                .await
 220                .log_err();
 221        })
 222    }
 223
 224    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 225        self.transact(|tx| async {
 226            let mut tx = tx;
 227            let like_string = Self::fuzzy_like_string(name_query);
 228            let query = "
 229                SELECT users.*
 230                FROM users
 231                WHERE github_login ILIKE $1
 232                ORDER BY github_login <-> $2
 233                LIMIT $3
 234            ";
 235            Ok(sqlx::query_as(query)
 236                .bind(like_string)
 237                .bind(name_query)
 238                .bind(limit as i32)
 239                .fetch_all(&mut tx)
 240                .await?)
 241        })
 242        .await
 243    }
 244
 245    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<User>> {
 246        let ids = ids.iter().map(|id| id.0).collect::<Vec<_>>();
 247        self.transact(|tx| async {
 248            let mut tx = tx;
 249            let query = "
 250                SELECT users.*
 251                FROM users
 252                WHERE users.id = ANY ($1)
 253            ";
 254            Ok(sqlx::query_as(query).bind(&ids).fetch_all(&mut tx).await?)
 255        })
 256        .await
 257    }
 258
 259    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 260        self.transact(|mut tx| async move {
 261            let query = "
 262                SELECT metrics_id::text
 263                FROM users
 264                WHERE id = $1
 265            ";
 266            Ok(sqlx::query_scalar(query)
 267                .bind(id)
 268                .fetch_one(&mut tx)
 269                .await?)
 270        })
 271        .await
 272    }
 273
 274    pub async fn create_user(
 275        &self,
 276        email_address: &str,
 277        admin: bool,
 278        params: NewUserParams,
 279    ) -> Result<NewUserResult> {
 280        self.transact(|mut tx| async {
 281            let query = "
 282                INSERT INTO users (email_address, github_login, github_user_id, admin)
 283                VALUES ($1, $2, $3, $4)
 284                ON CONFLICT (github_login) DO UPDATE SET github_login = excluded.github_login
 285                RETURNING id, metrics_id::text
 286            ";
 287
 288            let (user_id, metrics_id): (UserId, String) = sqlx::query_as(query)
 289                .bind(email_address)
 290                .bind(&params.github_login)
 291                .bind(params.github_user_id)
 292                .bind(admin)
 293                .fetch_one(&mut tx)
 294                .await?;
 295            tx.commit().await?;
 296
 297            Ok(NewUserResult {
 298                user_id,
 299                metrics_id,
 300                signup_device_id: None,
 301                inviting_user_id: None,
 302            })
 303        })
 304        .await
 305    }
 306
 307    pub async fn create_user_from_invite(
 308        &self,
 309        invite: &Invite,
 310        user: NewUserParams,
 311    ) -> Result<Option<NewUserResult>> {
 312        self.transact(|mut tx| async {
 313            let (signup_id, existing_user_id, inviting_user_id, signup_device_id): (
 314                i32,
 315                Option<UserId>,
 316                Option<UserId>,
 317                Option<String>,
 318            ) = sqlx::query_as(
 319                "
 320                SELECT id, user_id, inviting_user_id, device_id
 321                FROM signups
 322                WHERE
 323                    email_address = $1 AND
 324                    email_confirmation_code = $2
 325                ",
 326            )
 327            .bind(&invite.email_address)
 328            .bind(&invite.email_confirmation_code)
 329            .fetch_optional(&mut tx)
 330            .await?
 331            .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 332
 333            if existing_user_id.is_some() {
 334                return Ok(None);
 335            }
 336
 337            let (user_id, metrics_id): (UserId, String) = sqlx::query_as(
 338                "
 339                INSERT INTO users
 340                (email_address, github_login, github_user_id, admin, invite_count, invite_code)
 341                VALUES
 342                ($1, $2, $3, FALSE, $4, $5)
 343                ON CONFLICT (github_login) DO UPDATE SET
 344                    email_address = excluded.email_address,
 345                    github_user_id = excluded.github_user_id,
 346                    admin = excluded.admin
 347                RETURNING id, metrics_id::text
 348                ",
 349            )
 350            .bind(&invite.email_address)
 351            .bind(&user.github_login)
 352            .bind(&user.github_user_id)
 353            .bind(&user.invite_count)
 354            .bind(random_invite_code())
 355            .fetch_one(&mut tx)
 356            .await?;
 357
 358            sqlx::query(
 359                "
 360                UPDATE signups
 361                SET user_id = $1
 362                WHERE id = $2
 363                ",
 364            )
 365            .bind(&user_id)
 366            .bind(&signup_id)
 367            .execute(&mut tx)
 368            .await?;
 369
 370            if let Some(inviting_user_id) = inviting_user_id {
 371                let id: Option<UserId> = sqlx::query_scalar(
 372                    "
 373                    UPDATE users
 374                    SET invite_count = invite_count - 1
 375                    WHERE id = $1 AND invite_count > 0
 376                    RETURNING id
 377                    ",
 378                )
 379                .bind(&inviting_user_id)
 380                .fetch_optional(&mut tx)
 381                .await?;
 382
 383                if id.is_none() {
 384                    Err(Error::Http(
 385                        StatusCode::UNAUTHORIZED,
 386                        "no invites remaining".to_string(),
 387                    ))?;
 388                }
 389
 390                sqlx::query(
 391                    "
 392                    INSERT INTO contacts
 393                        (user_id_a, user_id_b, a_to_b, should_notify, accepted)
 394                    VALUES
 395                        ($1, $2, TRUE, TRUE, TRUE)
 396                    ON CONFLICT DO NOTHING
 397                    ",
 398                )
 399                .bind(inviting_user_id)
 400                .bind(user_id)
 401                .execute(&mut tx)
 402                .await?;
 403            }
 404
 405            tx.commit().await?;
 406            Ok(Some(NewUserResult {
 407                user_id,
 408                metrics_id,
 409                inviting_user_id,
 410                signup_device_id,
 411            }))
 412        })
 413        .await
 414    }
 415
 416    pub async fn create_signup(&self, signup: Signup) -> Result<()> {
 417        self.transact(|mut tx| async {
 418            sqlx::query(
 419                "
 420                INSERT INTO signups
 421                (
 422                    email_address,
 423                    email_confirmation_code,
 424                    email_confirmation_sent,
 425                    platform_linux,
 426                    platform_mac,
 427                    platform_windows,
 428                    platform_unknown,
 429                    editor_features,
 430                    programming_languages,
 431                    device_id
 432                )
 433                VALUES
 434                    ($1, $2, FALSE, $3, $4, $5, FALSE, $6, $7, $8)
 435                RETURNING id
 436                ",
 437            )
 438            .bind(&signup.email_address)
 439            .bind(&random_email_confirmation_code())
 440            .bind(&signup.platform_linux)
 441            .bind(&signup.platform_mac)
 442            .bind(&signup.platform_windows)
 443            .bind(&signup.editor_features)
 444            .bind(&signup.programming_languages)
 445            .bind(&signup.device_id)
 446            .execute(&mut tx)
 447            .await?;
 448            tx.commit().await?;
 449            Ok(())
 450        })
 451        .await
 452    }
 453
 454    pub async fn create_invite_from_code(
 455        &self,
 456        code: &str,
 457        email_address: &str,
 458        device_id: Option<&str>,
 459    ) -> Result<Invite> {
 460        self.transact(|mut tx| async {
 461            let existing_user: Option<UserId> = sqlx::query_scalar(
 462                "
 463                SELECT id
 464                FROM users
 465                WHERE email_address = $1
 466                ",
 467            )
 468            .bind(email_address)
 469            .fetch_optional(&mut tx)
 470            .await?;
 471            if existing_user.is_some() {
 472                Err(anyhow!("email address is already in use"))?;
 473            }
 474
 475            let row: Option<(UserId, i32)> = sqlx::query_as(
 476                "
 477                SELECT id, invite_count
 478                FROM users
 479                WHERE invite_code = $1
 480                ",
 481            )
 482            .bind(code)
 483            .fetch_optional(&mut tx)
 484            .await?;
 485
 486            let (inviter_id, invite_count) = match row {
 487                Some(row) => row,
 488                None => Err(Error::Http(
 489                    StatusCode::NOT_FOUND,
 490                    "invite code not found".to_string(),
 491                ))?,
 492            };
 493
 494            if invite_count == 0 {
 495                Err(Error::Http(
 496                    StatusCode::UNAUTHORIZED,
 497                    "no invites remaining".to_string(),
 498                ))?;
 499            }
 500
 501            let email_confirmation_code: String = sqlx::query_scalar(
 502                "
 503                INSERT INTO signups
 504                (
 505                    email_address,
 506                    email_confirmation_code,
 507                    email_confirmation_sent,
 508                    inviting_user_id,
 509                    platform_linux,
 510                    platform_mac,
 511                    platform_windows,
 512                    platform_unknown,
 513                    device_id
 514                )
 515                VALUES
 516                    ($1, $2, FALSE, $3, FALSE, FALSE, FALSE, TRUE, $4)
 517                ON CONFLICT (email_address)
 518                DO UPDATE SET
 519                    inviting_user_id = excluded.inviting_user_id
 520                RETURNING email_confirmation_code
 521                ",
 522            )
 523            .bind(&email_address)
 524            .bind(&random_email_confirmation_code())
 525            .bind(&inviter_id)
 526            .bind(&device_id)
 527            .fetch_one(&mut tx)
 528            .await?;
 529
 530            tx.commit().await?;
 531
 532            Ok(Invite {
 533                email_address: email_address.into(),
 534                email_confirmation_code,
 535            })
 536        })
 537        .await
 538    }
 539
 540    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 541        self.transact(|mut tx| async {
 542            let emails = invites
 543                .iter()
 544                .map(|s| s.email_address.as_str())
 545                .collect::<Vec<_>>();
 546            sqlx::query(
 547                "
 548                UPDATE signups
 549                SET email_confirmation_sent = TRUE
 550                WHERE email_address = ANY ($1)
 551                ",
 552            )
 553            .bind(&emails)
 554            .execute(&mut tx)
 555            .await?;
 556            tx.commit().await?;
 557            Ok(())
 558        })
 559        .await
 560    }
 561}
 562
 563impl<D> Db<D>
 564where
 565    Self: BeginTransaction<Database = D>,
 566    D: sqlx::Database + sqlx::migrate::MigrateDatabase,
 567    D::Connection: sqlx::migrate::Migrate,
 568    for<'a> <D as sqlx::database::HasArguments<'a>>::Arguments: sqlx::IntoArguments<'a, D>,
 569    for<'a> &'a mut D::Connection: sqlx::Executor<'a, Database = D>,
 570    for<'a, 'b> &'b mut sqlx::Transaction<'a, D>: sqlx::Executor<'b, Database = D>,
 571    D::QueryResult: RowsAffected,
 572    String: sqlx::Type<D>,
 573    i32: sqlx::Type<D>,
 574    i64: sqlx::Type<D>,
 575    bool: sqlx::Type<D>,
 576    str: sqlx::Type<D>,
 577    Uuid: sqlx::Type<D>,
 578    sqlx::types::Json<serde_json::Value>: sqlx::Type<D>,
 579    OffsetDateTime: sqlx::Type<D>,
 580    PrimitiveDateTime: sqlx::Type<D>,
 581    usize: sqlx::ColumnIndex<D::Row>,
 582    for<'a> &'a str: sqlx::ColumnIndex<D::Row>,
 583    for<'a> &'a str: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
 584    for<'a> String: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
 585    for<'a> Option<String>: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
 586    for<'a> Option<&'a str>: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
 587    for<'a> i32: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
 588    for<'a> i64: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
 589    for<'a> bool: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
 590    for<'a> Uuid: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
 591    for<'a> Option<ProjectId>: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
 592    for<'a> sqlx::types::JsonValue: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
 593    for<'a> OffsetDateTime: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
 594    for<'a> PrimitiveDateTime: sqlx::Decode<'a, D> + sqlx::Decode<'a, D>,
 595{
 596    pub async fn migrate(
 597        &self,
 598        migrations_path: &Path,
 599        ignore_checksum_mismatch: bool,
 600    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
 601        let migrations = MigrationSource::resolve(migrations_path)
 602            .await
 603            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
 604
 605        let mut conn = self.pool.acquire().await?;
 606
 607        conn.ensure_migrations_table().await?;
 608        let applied_migrations: HashMap<_, _> = conn
 609            .list_applied_migrations()
 610            .await?
 611            .into_iter()
 612            .map(|m| (m.version, m))
 613            .collect();
 614
 615        let mut new_migrations = Vec::new();
 616        for migration in migrations {
 617            match applied_migrations.get(&migration.version) {
 618                Some(applied_migration) => {
 619                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 620                    {
 621                        Err(anyhow!(
 622                            "checksum mismatch for applied migration {}",
 623                            migration.description
 624                        ))?;
 625                    }
 626                }
 627                None => {
 628                    let elapsed = conn.apply(&migration).await?;
 629                    new_migrations.push((migration, elapsed));
 630                }
 631            }
 632        }
 633
 634        Ok(new_migrations)
 635    }
 636
 637    pub fn fuzzy_like_string(string: &str) -> String {
 638        let mut result = String::with_capacity(string.len() * 2 + 1);
 639        for c in string.chars() {
 640            if c.is_alphanumeric() {
 641                result.push('%');
 642                result.push(c);
 643            }
 644        }
 645        result.push('%');
 646        result
 647    }
 648
 649    // users
 650
 651    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 652        self.transact(|tx| async {
 653            let mut tx = tx;
 654            let query = "SELECT * FROM users ORDER BY github_login ASC LIMIT $1 OFFSET $2";
 655            Ok(sqlx::query_as(query)
 656                .bind(limit as i32)
 657                .bind((page * limit) as i32)
 658                .fetch_all(&mut tx)
 659                .await?)
 660        })
 661        .await
 662    }
 663
 664    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<User>> {
 665        self.transact(|tx| async {
 666            let mut tx = tx;
 667            let query = "
 668                SELECT users.*
 669                FROM users
 670                WHERE id = $1
 671                LIMIT 1
 672            ";
 673            Ok(sqlx::query_as(query)
 674                .bind(&id)
 675                .fetch_optional(&mut tx)
 676                .await?)
 677        })
 678        .await
 679    }
 680
 681    pub async fn get_users_with_no_invites(
 682        &self,
 683        invited_by_another_user: bool,
 684    ) -> Result<Vec<User>> {
 685        self.transact(|tx| async {
 686            let mut tx = tx;
 687            let query = format!(
 688                "
 689                SELECT users.*
 690                FROM users
 691                WHERE invite_count = 0
 692                AND inviter_id IS{} NULL
 693                ",
 694                if invited_by_another_user { " NOT" } else { "" }
 695            );
 696
 697            Ok(sqlx::query_as(&query).fetch_all(&mut tx).await?)
 698        })
 699        .await
 700    }
 701
 702    pub async fn get_user_by_github_account(
 703        &self,
 704        github_login: &str,
 705        github_user_id: Option<i32>,
 706    ) -> Result<Option<User>> {
 707        self.transact(|tx| async {
 708            let mut tx = tx;
 709            if let Some(github_user_id) = github_user_id {
 710                let mut user = sqlx::query_as::<_, User>(
 711                    "
 712                    UPDATE users
 713                    SET github_login = $1
 714                    WHERE github_user_id = $2
 715                    RETURNING *
 716                    ",
 717                )
 718                .bind(github_login)
 719                .bind(github_user_id)
 720                .fetch_optional(&mut tx)
 721                .await?;
 722
 723                if user.is_none() {
 724                    user = sqlx::query_as::<_, User>(
 725                        "
 726                        UPDATE users
 727                        SET github_user_id = $1
 728                        WHERE github_login = $2
 729                        RETURNING *
 730                        ",
 731                    )
 732                    .bind(github_user_id)
 733                    .bind(github_login)
 734                    .fetch_optional(&mut tx)
 735                    .await?;
 736                }
 737
 738                Ok(user)
 739            } else {
 740                let user = sqlx::query_as(
 741                    "
 742                    SELECT * FROM users
 743                    WHERE github_login = $1
 744                    LIMIT 1
 745                    ",
 746                )
 747                .bind(github_login)
 748                .fetch_optional(&mut tx)
 749                .await?;
 750                Ok(user)
 751            }
 752        })
 753        .await
 754    }
 755
 756    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 757        self.transact(|mut tx| async {
 758            let query = "UPDATE users SET admin = $1 WHERE id = $2";
 759            sqlx::query(query)
 760                .bind(is_admin)
 761                .bind(id.0)
 762                .execute(&mut tx)
 763                .await?;
 764            tx.commit().await?;
 765            Ok(())
 766        })
 767        .await
 768    }
 769
 770    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 771        self.transact(|mut tx| async move {
 772            let query = "UPDATE users SET connected_once = $1 WHERE id = $2";
 773            sqlx::query(query)
 774                .bind(connected_once)
 775                .bind(id.0)
 776                .execute(&mut tx)
 777                .await?;
 778            tx.commit().await?;
 779            Ok(())
 780        })
 781        .await
 782    }
 783
 784    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 785        self.transact(|mut tx| async move {
 786            let query = "DELETE FROM access_tokens WHERE user_id = $1;";
 787            sqlx::query(query)
 788                .bind(id.0)
 789                .execute(&mut tx)
 790                .await
 791                .map(drop)?;
 792            let query = "DELETE FROM users WHERE id = $1;";
 793            sqlx::query(query).bind(id.0).execute(&mut tx).await?;
 794            tx.commit().await?;
 795            Ok(())
 796        })
 797        .await
 798    }
 799
 800    // signups
 801
 802    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 803        self.transact(|mut tx| async move {
 804            Ok(sqlx::query_as(
 805                "
 806                SELECT
 807                    COUNT(*) as count,
 808                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 809                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 810                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 811                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 812                FROM (
 813                    SELECT *
 814                    FROM signups
 815                    WHERE
 816                        NOT email_confirmation_sent
 817                ) AS unsent
 818                ",
 819            )
 820            .fetch_one(&mut tx)
 821            .await?)
 822        })
 823        .await
 824    }
 825
 826    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 827        self.transact(|mut tx| async move {
 828            Ok(sqlx::query_as(
 829                "
 830                SELECT
 831                    email_address, email_confirmation_code
 832                FROM signups
 833                WHERE
 834                    NOT email_confirmation_sent AND
 835                    (platform_mac OR platform_unknown)
 836                LIMIT $1
 837                ",
 838            )
 839            .bind(count as i32)
 840            .fetch_all(&mut tx)
 841            .await?)
 842        })
 843        .await
 844    }
 845
 846    // invite codes
 847
 848    pub async fn set_invite_count_for_user(&self, id: UserId, count: u32) -> Result<()> {
 849        self.transact(|mut tx| async move {
 850            if count > 0 {
 851                sqlx::query(
 852                    "
 853                    UPDATE users
 854                    SET invite_code = $1
 855                    WHERE id = $2 AND invite_code IS NULL
 856                ",
 857                )
 858                .bind(random_invite_code())
 859                .bind(id)
 860                .execute(&mut tx)
 861                .await?;
 862            }
 863
 864            sqlx::query(
 865                "
 866                UPDATE users
 867                SET invite_count = $1
 868                WHERE id = $2
 869                ",
 870            )
 871            .bind(count as i32)
 872            .bind(id)
 873            .execute(&mut tx)
 874            .await?;
 875            tx.commit().await?;
 876            Ok(())
 877        })
 878        .await
 879    }
 880
 881    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, u32)>> {
 882        self.transact(|mut tx| async move {
 883            let result: Option<(String, i32)> = sqlx::query_as(
 884                "
 885                    SELECT invite_code, invite_count
 886                    FROM users
 887                    WHERE id = $1 AND invite_code IS NOT NULL 
 888                ",
 889            )
 890            .bind(id)
 891            .fetch_optional(&mut tx)
 892            .await?;
 893            if let Some((code, count)) = result {
 894                Ok(Some((code, count.try_into().map_err(anyhow::Error::new)?)))
 895            } else {
 896                Ok(None)
 897            }
 898        })
 899        .await
 900    }
 901
 902    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
 903        self.transact(|tx| async {
 904            let mut tx = tx;
 905            sqlx::query_as(
 906                "
 907                    SELECT *
 908                    FROM users
 909                    WHERE invite_code = $1
 910                ",
 911            )
 912            .bind(code)
 913            .fetch_optional(&mut tx)
 914            .await?
 915            .ok_or_else(|| {
 916                Error::Http(
 917                    StatusCode::NOT_FOUND,
 918                    "that invite code does not exist".to_string(),
 919                )
 920            })
 921        })
 922        .await
 923    }
 924
 925    pub async fn create_room(
 926        &self,
 927        user_id: UserId,
 928        connection_id: ConnectionId,
 929    ) -> Result<proto::Room> {
 930        self.transact(|mut tx| async move {
 931            let live_kit_room = nanoid::nanoid!(30);
 932            let room_id = sqlx::query_scalar(
 933                "
 934                INSERT INTO rooms (live_kit_room, version)
 935                VALUES ($1, $2)
 936                RETURNING id
 937                ",
 938            )
 939            .bind(&live_kit_room)
 940            .bind(0)
 941            .fetch_one(&mut tx)
 942            .await
 943            .map(RoomId)?;
 944
 945            sqlx::query(
 946                "
 947                INSERT INTO room_participants (room_id, user_id, answering_connection_id, calling_user_id, calling_connection_id)
 948                VALUES ($1, $2, $3, $4, $5)
 949                ",
 950            )
 951            .bind(room_id)
 952            .bind(user_id)
 953            .bind(connection_id.0 as i32)
 954            .bind(user_id)
 955            .bind(connection_id.0 as i32)
 956            .execute(&mut tx)
 957            .await?;
 958
 959            self.commit_room_transaction(room_id, tx).await
 960        }).await
 961    }
 962
 963    pub async fn call(
 964        &self,
 965        room_id: RoomId,
 966        calling_user_id: UserId,
 967        calling_connection_id: ConnectionId,
 968        called_user_id: UserId,
 969        initial_project_id: Option<ProjectId>,
 970    ) -> Result<(proto::Room, proto::IncomingCall)> {
 971        self.transact(|mut tx| async move {
 972            sqlx::query(
 973                "
 974                INSERT INTO room_participants (room_id, user_id, calling_user_id, calling_connection_id, initial_project_id)
 975                VALUES ($1, $2, $3, $4, $5)
 976                ",
 977            )
 978            .bind(room_id)
 979            .bind(called_user_id)
 980            .bind(calling_user_id)
 981            .bind(calling_connection_id.0 as i32)
 982            .bind(initial_project_id)
 983            .execute(&mut tx)
 984            .await?;
 985
 986            let room = self.commit_room_transaction(room_id, tx).await?;
 987            let incoming_call = Self::build_incoming_call(&room, called_user_id)
 988                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
 989            Ok((room, incoming_call))
 990        }).await
 991    }
 992
 993    pub async fn incoming_call_for_user(
 994        &self,
 995        user_id: UserId,
 996    ) -> Result<Option<proto::IncomingCall>> {
 997        self.transact(|mut tx| async move {
 998            let room_id = sqlx::query_scalar::<_, RoomId>(
 999                "
1000                SELECT room_id
1001                FROM room_participants
1002                WHERE user_id = $1 AND answering_connection_id IS NULL
1003                ",
1004            )
1005            .bind(user_id)
1006            .fetch_optional(&mut tx)
1007            .await?;
1008
1009            if let Some(room_id) = room_id {
1010                let room = self.get_room(room_id, &mut tx).await?;
1011                Ok(Self::build_incoming_call(&room, user_id))
1012            } else {
1013                Ok(None)
1014            }
1015        })
1016        .await
1017    }
1018
1019    fn build_incoming_call(
1020        room: &proto::Room,
1021        called_user_id: UserId,
1022    ) -> Option<proto::IncomingCall> {
1023        let pending_participant = room
1024            .pending_participants
1025            .iter()
1026            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1027
1028        Some(proto::IncomingCall {
1029            room_id: room.id,
1030            calling_user_id: pending_participant.calling_user_id,
1031            participant_user_ids: room
1032                .participants
1033                .iter()
1034                .map(|participant| participant.user_id)
1035                .collect(),
1036            initial_project: room.participants.iter().find_map(|participant| {
1037                let initial_project_id = pending_participant.initial_project_id?;
1038                participant
1039                    .projects
1040                    .iter()
1041                    .find(|project| project.id == initial_project_id)
1042                    .cloned()
1043            }),
1044        })
1045    }
1046
1047    pub async fn call_failed(
1048        &self,
1049        room_id: RoomId,
1050        called_user_id: UserId,
1051    ) -> Result<proto::Room> {
1052        self.transact(|mut tx| async move {
1053            sqlx::query(
1054                "
1055                DELETE FROM room_participants
1056                WHERE room_id = $1 AND user_id = $2
1057                ",
1058            )
1059            .bind(room_id)
1060            .bind(called_user_id)
1061            .execute(&mut tx)
1062            .await?;
1063
1064            self.commit_room_transaction(room_id, tx).await
1065        })
1066        .await
1067    }
1068
1069    pub async fn decline_call(
1070        &self,
1071        expected_room_id: Option<RoomId>,
1072        user_id: UserId,
1073    ) -> Result<proto::Room> {
1074        self.transact(|mut tx| async move {
1075            let room_id = sqlx::query_scalar(
1076                "
1077                DELETE FROM room_participants
1078                WHERE user_id = $1 AND answering_connection_id IS NULL
1079                RETURNING room_id
1080                ",
1081            )
1082            .bind(user_id)
1083            .fetch_one(&mut tx)
1084            .await?;
1085            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1086                return Err(anyhow!("declining call on unexpected room"))?;
1087            }
1088
1089            self.commit_room_transaction(room_id, tx).await
1090        })
1091        .await
1092    }
1093
1094    pub async fn cancel_call(
1095        &self,
1096        expected_room_id: Option<RoomId>,
1097        calling_connection_id: ConnectionId,
1098        called_user_id: UserId,
1099    ) -> Result<proto::Room> {
1100        self.transact(|mut tx| async move {
1101            let room_id = sqlx::query_scalar(
1102                "
1103                DELETE FROM room_participants
1104                WHERE user_id = $1 AND calling_connection_id = $2 AND answering_connection_id IS NULL
1105                RETURNING room_id
1106                ",
1107            )
1108            .bind(called_user_id)
1109            .bind(calling_connection_id.0 as i32)
1110            .fetch_one(&mut tx)
1111            .await?;
1112            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1113                return Err(anyhow!("canceling call on unexpected room"))?;
1114            }
1115
1116            self.commit_room_transaction(room_id, tx).await
1117        }).await
1118    }
1119
1120    pub async fn join_room(
1121        &self,
1122        room_id: RoomId,
1123        user_id: UserId,
1124        connection_id: ConnectionId,
1125    ) -> Result<proto::Room> {
1126        self.transact(|mut tx| async move {
1127            sqlx::query(
1128                "
1129                UPDATE room_participants 
1130                SET answering_connection_id = $1
1131                WHERE room_id = $2 AND user_id = $3
1132                RETURNING 1
1133                ",
1134            )
1135            .bind(connection_id.0 as i32)
1136            .bind(room_id)
1137            .bind(user_id)
1138            .fetch_one(&mut tx)
1139            .await?;
1140            self.commit_room_transaction(room_id, tx).await
1141        })
1142        .await
1143    }
1144
1145    pub async fn leave_room(&self, connection_id: ConnectionId) -> Result<Option<LeftRoom>> {
1146        self.transact(|mut tx| async move {
1147            // Leave room.
1148            let room_id = sqlx::query_scalar::<_, RoomId>(
1149                "
1150                DELETE FROM room_participants
1151                WHERE answering_connection_id = $1
1152                RETURNING room_id
1153                ",
1154            )
1155            .bind(connection_id.0 as i32)
1156            .fetch_optional(&mut tx)
1157            .await?;
1158
1159            if let Some(room_id) = room_id {
1160                // Cancel pending calls initiated by the leaving user.
1161                let canceled_calls_to_user_ids: Vec<UserId> = sqlx::query_scalar(
1162                    "
1163                    DELETE FROM room_participants
1164                    WHERE calling_connection_id = $1 AND answering_connection_id IS NULL
1165                    RETURNING user_id
1166                    ",
1167                )
1168                .bind(connection_id.0 as i32)
1169                .fetch_all(&mut tx)
1170                .await?;
1171
1172                let project_ids = sqlx::query_scalar::<_, ProjectId>(
1173                    "
1174                    SELECT project_id
1175                    FROM project_collaborators
1176                    WHERE connection_id = $1
1177                    ",
1178                )
1179                .bind(connection_id.0 as i32)
1180                .fetch_all(&mut tx)
1181                .await?;
1182
1183                // Leave projects.
1184                let mut left_projects = HashMap::default();
1185                if !project_ids.is_empty() {
1186                    let mut params = "?,".repeat(project_ids.len());
1187                    params.pop();
1188                    let query = format!(
1189                        "
1190                        SELECT *
1191                        FROM project_collaborators
1192                        WHERE project_id IN ({params})
1193                    "
1194                    );
1195                    let mut query = sqlx::query_as::<_, ProjectCollaborator>(&query);
1196                    for project_id in project_ids {
1197                        query = query.bind(project_id);
1198                    }
1199
1200                    let mut project_collaborators = query.fetch(&mut tx);
1201                    while let Some(collaborator) = project_collaborators.next().await {
1202                        let collaborator = collaborator?;
1203                        let left_project =
1204                            left_projects
1205                                .entry(collaborator.project_id)
1206                                .or_insert(LeftProject {
1207                                    id: collaborator.project_id,
1208                                    host_user_id: Default::default(),
1209                                    connection_ids: Default::default(),
1210                                    host_connection_id: Default::default(),
1211                                });
1212
1213                        let collaborator_connection_id =
1214                            ConnectionId(collaborator.connection_id as u32);
1215                        if collaborator_connection_id != connection_id {
1216                            left_project.connection_ids.push(collaborator_connection_id);
1217                        }
1218
1219                        if collaborator.is_host {
1220                            left_project.host_user_id = collaborator.user_id;
1221                            left_project.host_connection_id =
1222                                ConnectionId(collaborator.connection_id as u32);
1223                        }
1224                    }
1225                }
1226                sqlx::query(
1227                    "
1228                    DELETE FROM project_collaborators
1229                    WHERE connection_id = $1
1230                    ",
1231                )
1232                .bind(connection_id.0 as i32)
1233                .execute(&mut tx)
1234                .await?;
1235
1236                // Unshare projects.
1237                sqlx::query(
1238                    "
1239                    DELETE FROM projects
1240                    WHERE room_id = $1 AND host_connection_id = $2
1241                    ",
1242                )
1243                .bind(room_id)
1244                .bind(connection_id.0 as i32)
1245                .execute(&mut tx)
1246                .await?;
1247
1248                let room = self.commit_room_transaction(room_id, tx).await?;
1249                Ok(Some(LeftRoom {
1250                    room,
1251                    left_projects,
1252                    canceled_calls_to_user_ids,
1253                }))
1254            } else {
1255                Ok(None)
1256            }
1257        })
1258        .await
1259    }
1260
1261    pub async fn update_room_participant_location(
1262        &self,
1263        room_id: RoomId,
1264        connection_id: ConnectionId,
1265        location: proto::ParticipantLocation,
1266    ) -> Result<proto::Room> {
1267        self.transact(|tx| async {
1268            let mut tx = tx;
1269            let location_kind;
1270            let location_project_id;
1271            match location
1272                .variant
1273                .as_ref()
1274                .ok_or_else(|| anyhow!("invalid location"))?
1275            {
1276                proto::participant_location::Variant::SharedProject(project) => {
1277                    location_kind = 0;
1278                    location_project_id = Some(ProjectId::from_proto(project.id));
1279                }
1280                proto::participant_location::Variant::UnsharedProject(_) => {
1281                    location_kind = 1;
1282                    location_project_id = None;
1283                }
1284                proto::participant_location::Variant::External(_) => {
1285                    location_kind = 2;
1286                    location_project_id = None;
1287                }
1288            }
1289
1290            sqlx::query(
1291                "
1292                UPDATE room_participants
1293                SET location_kind = $1, location_project_id = $2
1294                WHERE room_id = $3 AND answering_connection_id = $4
1295                RETURNING 1
1296                ",
1297            )
1298            .bind(location_kind)
1299            .bind(location_project_id)
1300            .bind(room_id)
1301            .bind(connection_id.0 as i32)
1302            .fetch_one(&mut tx)
1303            .await?;
1304
1305            self.commit_room_transaction(room_id, tx).await
1306        })
1307        .await
1308    }
1309
1310    async fn commit_room_transaction(
1311        &self,
1312        room_id: RoomId,
1313        mut tx: sqlx::Transaction<'_, D>,
1314    ) -> Result<proto::Room> {
1315        sqlx::query(
1316            "
1317            UPDATE rooms
1318            SET version = version + 1
1319            WHERE id = $1
1320            ",
1321        )
1322        .bind(room_id)
1323        .execute(&mut tx)
1324        .await?;
1325        let room = self.get_room(room_id, &mut tx).await?;
1326        tx.commit().await?;
1327
1328        Ok(room)
1329    }
1330
1331    async fn get_guest_connection_ids(
1332        &self,
1333        project_id: ProjectId,
1334        tx: &mut sqlx::Transaction<'_, D>,
1335    ) -> Result<Vec<ConnectionId>> {
1336        let mut guest_connection_ids = Vec::new();
1337        let mut db_guest_connection_ids = sqlx::query_scalar::<_, i32>(
1338            "
1339            SELECT connection_id
1340            FROM project_collaborators
1341            WHERE project_id = $1 AND is_host = FALSE
1342            ",
1343        )
1344        .bind(project_id)
1345        .fetch(tx);
1346        while let Some(connection_id) = db_guest_connection_ids.next().await {
1347            guest_connection_ids.push(ConnectionId(connection_id? as u32));
1348        }
1349        Ok(guest_connection_ids)
1350    }
1351
1352    async fn get_room(
1353        &self,
1354        room_id: RoomId,
1355        tx: &mut sqlx::Transaction<'_, D>,
1356    ) -> Result<proto::Room> {
1357        let room: Room = sqlx::query_as(
1358            "
1359            SELECT *
1360            FROM rooms
1361            WHERE id = $1
1362            ",
1363        )
1364        .bind(room_id)
1365        .fetch_one(&mut *tx)
1366        .await?;
1367
1368        let mut db_participants =
1369            sqlx::query_as::<_, (UserId, Option<i32>, Option<i32>, Option<ProjectId>, UserId, Option<ProjectId>)>(
1370                "
1371                SELECT user_id, answering_connection_id, location_kind, location_project_id, calling_user_id, initial_project_id
1372                FROM room_participants
1373                WHERE room_id = $1
1374                ",
1375            )
1376            .bind(room_id)
1377            .fetch(&mut *tx);
1378
1379        let mut participants = HashMap::default();
1380        let mut pending_participants = Vec::new();
1381        while let Some(participant) = db_participants.next().await {
1382            let (
1383                user_id,
1384                answering_connection_id,
1385                location_kind,
1386                location_project_id,
1387                calling_user_id,
1388                initial_project_id,
1389            ) = participant?;
1390            if let Some(answering_connection_id) = answering_connection_id {
1391                let location = match (location_kind, location_project_id) {
1392                    (Some(0), Some(project_id)) => {
1393                        Some(proto::participant_location::Variant::SharedProject(
1394                            proto::participant_location::SharedProject {
1395                                id: project_id.to_proto(),
1396                            },
1397                        ))
1398                    }
1399                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1400                        Default::default(),
1401                    )),
1402                    _ => Some(proto::participant_location::Variant::External(
1403                        Default::default(),
1404                    )),
1405                };
1406                participants.insert(
1407                    answering_connection_id,
1408                    proto::Participant {
1409                        user_id: user_id.to_proto(),
1410                        peer_id: answering_connection_id as u32,
1411                        projects: Default::default(),
1412                        location: Some(proto::ParticipantLocation { variant: location }),
1413                    },
1414                );
1415            } else {
1416                pending_participants.push(proto::PendingParticipant {
1417                    user_id: user_id.to_proto(),
1418                    calling_user_id: calling_user_id.to_proto(),
1419                    initial_project_id: initial_project_id.map(|id| id.to_proto()),
1420                });
1421            }
1422        }
1423        drop(db_participants);
1424
1425        let mut rows = sqlx::query_as::<_, (i32, ProjectId, Option<String>)>(
1426            "
1427            SELECT host_connection_id, projects.id, worktrees.root_name
1428            FROM projects
1429            LEFT JOIN worktrees ON projects.id = worktrees.project_id
1430            WHERE room_id = $1
1431            ",
1432        )
1433        .bind(room_id)
1434        .fetch(&mut *tx);
1435
1436        while let Some(row) = rows.next().await {
1437            let (connection_id, project_id, worktree_root_name) = row?;
1438            if let Some(participant) = participants.get_mut(&connection_id) {
1439                let project = if let Some(project) = participant
1440                    .projects
1441                    .iter_mut()
1442                    .find(|project| project.id == project_id.to_proto())
1443                {
1444                    project
1445                } else {
1446                    participant.projects.push(proto::ParticipantProject {
1447                        id: project_id.to_proto(),
1448                        worktree_root_names: Default::default(),
1449                    });
1450                    participant.projects.last_mut().unwrap()
1451                };
1452                project.worktree_root_names.extend(worktree_root_name);
1453            }
1454        }
1455
1456        Ok(proto::Room {
1457            id: room.id.to_proto(),
1458            version: room.version as u64,
1459            live_kit_room: room.live_kit_room,
1460            participants: participants.into_values().collect(),
1461            pending_participants,
1462        })
1463    }
1464
1465    // projects
1466
1467    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1468        self.transact(|mut tx| async move {
1469            Ok(sqlx::query_scalar::<_, i32>(
1470                "
1471                SELECT COUNT(*)
1472                FROM projects, users
1473                WHERE projects.host_user_id = users.id AND users.admin IS FALSE
1474                ",
1475            )
1476            .fetch_one(&mut tx)
1477            .await? as usize)
1478        })
1479        .await
1480    }
1481
1482    pub async fn share_project(
1483        &self,
1484        expected_room_id: RoomId,
1485        connection_id: ConnectionId,
1486        worktrees: &[proto::WorktreeMetadata],
1487    ) -> Result<(ProjectId, proto::Room)> {
1488        self.transact(|mut tx| async move {
1489            let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>(
1490                "
1491                SELECT room_id, user_id
1492                FROM room_participants
1493                WHERE answering_connection_id = $1
1494                ",
1495            )
1496            .bind(connection_id.0 as i32)
1497            .fetch_one(&mut tx)
1498            .await?;
1499            if room_id != expected_room_id {
1500                return Err(anyhow!("shared project on unexpected room"))?;
1501            }
1502
1503            let project_id: ProjectId = sqlx::query_scalar(
1504                "
1505                INSERT INTO projects (room_id, host_user_id, host_connection_id)
1506                VALUES ($1, $2, $3)
1507                RETURNING id
1508                ",
1509            )
1510            .bind(room_id)
1511            .bind(user_id)
1512            .bind(connection_id.0 as i32)
1513            .fetch_one(&mut tx)
1514            .await?;
1515
1516            if !worktrees.is_empty() {
1517                let mut params = "(?, ?, ?, ?, ?, ?, ?),".repeat(worktrees.len());
1518                params.pop();
1519                let query = format!(
1520                    "
1521                    INSERT INTO worktrees (
1522                        project_id,
1523                        id,
1524                        root_name,
1525                        abs_path,
1526                        visible,
1527                        scan_id,
1528                        is_complete
1529                    )
1530                    VALUES {params}
1531                    "
1532                );
1533
1534                let mut query = sqlx::query(&query);
1535                for worktree in worktrees {
1536                    query = query
1537                        .bind(project_id)
1538                        .bind(worktree.id as i32)
1539                        .bind(&worktree.root_name)
1540                        .bind(&worktree.abs_path)
1541                        .bind(worktree.visible)
1542                        .bind(0)
1543                        .bind(false);
1544                }
1545                query.execute(&mut tx).await?;
1546            }
1547
1548            sqlx::query(
1549                "
1550                INSERT INTO project_collaborators (
1551                    project_id,
1552                    connection_id,
1553                    user_id,
1554                    replica_id,
1555                    is_host
1556                )
1557                VALUES ($1, $2, $3, $4, $5)
1558                ",
1559            )
1560            .bind(project_id)
1561            .bind(connection_id.0 as i32)
1562            .bind(user_id)
1563            .bind(0)
1564            .bind(true)
1565            .execute(&mut tx)
1566            .await?;
1567
1568            let room = self.commit_room_transaction(room_id, tx).await?;
1569            Ok((project_id, room))
1570        })
1571        .await
1572    }
1573
1574    pub async fn unshare_project(
1575        &self,
1576        project_id: ProjectId,
1577        connection_id: ConnectionId,
1578    ) -> Result<(proto::Room, Vec<ConnectionId>)> {
1579        self.transact(|mut tx| async move {
1580            let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1581            let room_id: RoomId = sqlx::query_scalar(
1582                "
1583                DELETE FROM projects
1584                WHERE id = $1 AND host_connection_id = $2
1585                RETURNING room_id
1586                ",
1587            )
1588            .bind(project_id)
1589            .bind(connection_id.0 as i32)
1590            .fetch_one(&mut tx)
1591            .await?;
1592            let room = self.commit_room_transaction(room_id, tx).await?;
1593
1594            Ok((room, guest_connection_ids))
1595        })
1596        .await
1597    }
1598
1599    pub async fn update_project(
1600        &self,
1601        project_id: ProjectId,
1602        connection_id: ConnectionId,
1603        worktrees: &[proto::WorktreeMetadata],
1604    ) -> Result<(proto::Room, Vec<ConnectionId>)> {
1605        self.transact(|mut tx| async move {
1606            let room_id: RoomId = sqlx::query_scalar(
1607                "
1608                SELECT room_id
1609                FROM projects
1610                WHERE id = $1 AND host_connection_id = $2
1611                ",
1612            )
1613            .bind(project_id)
1614            .bind(connection_id.0 as i32)
1615            .fetch_one(&mut tx)
1616            .await?;
1617
1618            if !worktrees.is_empty() {
1619                let mut params = "(?, ?, ?, ?, ?, ?, ?),".repeat(worktrees.len());
1620                params.pop();
1621                let query = format!(
1622                    "
1623                    INSERT INTO worktrees (
1624                        project_id,
1625                        id,
1626                        root_name,
1627                        abs_path,
1628                        visible,
1629                        scan_id,
1630                        is_complete
1631                    )
1632                    VALUES {params}
1633                    ON CONFLICT (project_id, id) DO UPDATE SET root_name = excluded.root_name
1634                    "
1635                );
1636
1637                let mut query = sqlx::query(&query);
1638                for worktree in worktrees {
1639                    query = query
1640                        .bind(project_id)
1641                        .bind(worktree.id as i32)
1642                        .bind(&worktree.root_name)
1643                        .bind(&worktree.abs_path)
1644                        .bind(worktree.visible)
1645                        .bind(0)
1646                        .bind(false)
1647                }
1648                query.execute(&mut tx).await?;
1649            }
1650
1651            let mut params = "?,".repeat(worktrees.len());
1652            if !worktrees.is_empty() {
1653                params.pop();
1654            }
1655            let query = format!(
1656                "
1657                DELETE FROM worktrees
1658                WHERE project_id = ? AND id NOT IN ({params})
1659                ",
1660            );
1661
1662            let mut query = sqlx::query(&query).bind(project_id);
1663            for worktree in worktrees {
1664                query = query.bind(WorktreeId(worktree.id as i32));
1665            }
1666            query.execute(&mut tx).await?;
1667
1668            let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1669            let room = self.commit_room_transaction(room_id, tx).await?;
1670
1671            Ok((room, guest_connection_ids))
1672        })
1673        .await
1674    }
1675
1676    pub async fn update_worktree(
1677        &self,
1678        update: &proto::UpdateWorktree,
1679        connection_id: ConnectionId,
1680    ) -> Result<Vec<ConnectionId>> {
1681        self.transact(|mut tx| async move {
1682            let project_id = ProjectId::from_proto(update.project_id);
1683            let worktree_id = WorktreeId::from_proto(update.worktree_id);
1684
1685            // Ensure the update comes from the host.
1686            sqlx::query(
1687                "
1688                SELECT 1
1689                FROM projects
1690                WHERE id = $1 AND host_connection_id = $2
1691                ",
1692            )
1693            .bind(project_id)
1694            .bind(connection_id.0 as i32)
1695            .fetch_one(&mut tx)
1696            .await?;
1697
1698            // Update metadata.
1699            sqlx::query(
1700                "
1701                UPDATE worktrees
1702                SET
1703                    root_name = $1,
1704                    scan_id = $2,
1705                    is_complete = $3,
1706                    abs_path = $4
1707                WHERE project_id = $5 AND id = $6
1708                RETURNING 1
1709                ",
1710            )
1711            .bind(&update.root_name)
1712            .bind(update.scan_id as i64)
1713            .bind(update.is_last_update)
1714            .bind(&update.abs_path)
1715            .bind(project_id)
1716            .bind(worktree_id)
1717            .fetch_one(&mut tx)
1718            .await?;
1719
1720            if !update.updated_entries.is_empty() {
1721                let mut params =
1722                    "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?),".repeat(update.updated_entries.len());
1723                params.pop();
1724
1725                let query = format!(
1726                    "
1727                    INSERT INTO worktree_entries (
1728                        project_id, 
1729                        worktree_id, 
1730                        id, 
1731                        is_dir, 
1732                        path, 
1733                        inode,
1734                        mtime_seconds, 
1735                        mtime_nanos, 
1736                        is_symlink, 
1737                        is_ignored
1738                    )
1739                    VALUES {params}
1740                    ON CONFLICT (project_id, worktree_id, id) DO UPDATE SET
1741                        is_dir = excluded.is_dir,
1742                        path = excluded.path,
1743                        inode = excluded.inode,
1744                        mtime_seconds = excluded.mtime_seconds,
1745                        mtime_nanos = excluded.mtime_nanos,
1746                        is_symlink = excluded.is_symlink,
1747                        is_ignored = excluded.is_ignored
1748                    "
1749                );
1750                let mut query = sqlx::query(&query);
1751                for entry in &update.updated_entries {
1752                    let mtime = entry.mtime.clone().unwrap_or_default();
1753                    query = query
1754                        .bind(project_id)
1755                        .bind(worktree_id)
1756                        .bind(entry.id as i64)
1757                        .bind(entry.is_dir)
1758                        .bind(&entry.path)
1759                        .bind(entry.inode as i64)
1760                        .bind(mtime.seconds as i64)
1761                        .bind(mtime.nanos as i32)
1762                        .bind(entry.is_symlink)
1763                        .bind(entry.is_ignored);
1764                }
1765                query.execute(&mut tx).await?;
1766            }
1767
1768            if !update.removed_entries.is_empty() {
1769                let mut params = "?,".repeat(update.removed_entries.len());
1770                params.pop();
1771                let query = format!(
1772                    "
1773                    DELETE FROM worktree_entries
1774                    WHERE project_id = ? AND worktree_id = ? AND id IN ({params})
1775                    "
1776                );
1777
1778                let mut query = sqlx::query(&query).bind(project_id).bind(worktree_id);
1779                for entry_id in &update.removed_entries {
1780                    query = query.bind(*entry_id as i64);
1781                }
1782                query.execute(&mut tx).await?;
1783            }
1784
1785            let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1786            tx.commit().await?;
1787            Ok(connection_ids)
1788        })
1789        .await
1790    }
1791
1792    pub async fn update_diagnostic_summary(
1793        &self,
1794        update: &proto::UpdateDiagnosticSummary,
1795        connection_id: ConnectionId,
1796    ) -> Result<Vec<ConnectionId>> {
1797        self.transact(|mut tx| async {
1798            let project_id = ProjectId::from_proto(update.project_id);
1799            let worktree_id = WorktreeId::from_proto(update.worktree_id);
1800            let summary = update
1801                .summary
1802                .as_ref()
1803                .ok_or_else(|| anyhow!("invalid summary"))?;
1804
1805            // Ensure the update comes from the host.
1806            sqlx::query(
1807                "
1808                SELECT 1
1809                FROM projects
1810                WHERE id = $1 AND host_connection_id = $2
1811                ",
1812            )
1813            .bind(project_id)
1814            .bind(connection_id.0 as i32)
1815            .fetch_one(&mut tx)
1816            .await?;
1817
1818            // Update summary.
1819            sqlx::query(
1820                "
1821                INSERT INTO worktree_diagnostic_summaries (
1822                    project_id,
1823                    worktree_id,
1824                    path,
1825                    language_server_id,
1826                    error_count,
1827                    warning_count
1828                )
1829                VALUES ($1, $2, $3, $4, $5, $6)
1830                ON CONFLICT (project_id, worktree_id, path) DO UPDATE SET
1831                    language_server_id = excluded.language_server_id,
1832                    error_count = excluded.error_count, 
1833                    warning_count = excluded.warning_count
1834                ",
1835            )
1836            .bind(project_id)
1837            .bind(worktree_id)
1838            .bind(&summary.path)
1839            .bind(summary.language_server_id as i64)
1840            .bind(summary.error_count as i32)
1841            .bind(summary.warning_count as i32)
1842            .execute(&mut tx)
1843            .await?;
1844
1845            let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1846            tx.commit().await?;
1847            Ok(connection_ids)
1848        })
1849        .await
1850    }
1851
1852    pub async fn start_language_server(
1853        &self,
1854        update: &proto::StartLanguageServer,
1855        connection_id: ConnectionId,
1856    ) -> Result<Vec<ConnectionId>> {
1857        self.transact(|mut tx| async {
1858            let project_id = ProjectId::from_proto(update.project_id);
1859            let server = update
1860                .server
1861                .as_ref()
1862                .ok_or_else(|| anyhow!("invalid language server"))?;
1863
1864            // Ensure the update comes from the host.
1865            sqlx::query(
1866                "
1867                SELECT 1
1868                FROM projects
1869                WHERE id = $1 AND host_connection_id = $2
1870                ",
1871            )
1872            .bind(project_id)
1873            .bind(connection_id.0 as i32)
1874            .fetch_one(&mut tx)
1875            .await?;
1876
1877            // Add the newly-started language server.
1878            sqlx::query(
1879                "
1880                INSERT INTO language_servers (project_id, id, name)
1881                VALUES ($1, $2, $3)
1882                ON CONFLICT (project_id, id) DO UPDATE SET
1883                    name = excluded.name
1884                ",
1885            )
1886            .bind(project_id)
1887            .bind(server.id as i64)
1888            .bind(&server.name)
1889            .execute(&mut tx)
1890            .await?;
1891
1892            let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1893            tx.commit().await?;
1894            Ok(connection_ids)
1895        })
1896        .await
1897    }
1898
1899    pub async fn join_project(
1900        &self,
1901        project_id: ProjectId,
1902        connection_id: ConnectionId,
1903    ) -> Result<(Project, ReplicaId)> {
1904        self.transact(|mut tx| async move {
1905            let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>(
1906                "
1907                SELECT room_id, user_id
1908                FROM room_participants
1909                WHERE answering_connection_id = $1
1910                ",
1911            )
1912            .bind(connection_id.0 as i32)
1913            .fetch_one(&mut tx)
1914            .await?;
1915
1916            // Ensure project id was shared on this room.
1917            sqlx::query(
1918                "
1919                SELECT 1
1920                FROM projects
1921                WHERE id = $1 AND room_id = $2
1922                ",
1923            )
1924            .bind(project_id)
1925            .bind(room_id)
1926            .fetch_one(&mut tx)
1927            .await?;
1928
1929            let mut collaborators = sqlx::query_as::<_, ProjectCollaborator>(
1930                "
1931                SELECT *
1932                FROM project_collaborators
1933                WHERE project_id = $1
1934                ",
1935            )
1936            .bind(project_id)
1937            .fetch_all(&mut tx)
1938            .await?;
1939            let replica_ids = collaborators
1940                .iter()
1941                .map(|c| c.replica_id)
1942                .collect::<HashSet<_>>();
1943            let mut replica_id = ReplicaId(1);
1944            while replica_ids.contains(&replica_id) {
1945                replica_id.0 += 1;
1946            }
1947            let new_collaborator = ProjectCollaborator {
1948                project_id,
1949                connection_id: connection_id.0 as i32,
1950                user_id,
1951                replica_id,
1952                is_host: false,
1953            };
1954
1955            sqlx::query(
1956                "
1957                INSERT INTO project_collaborators (
1958                    project_id,
1959                    connection_id,
1960                    user_id,
1961                    replica_id,
1962                    is_host
1963                )
1964                VALUES ($1, $2, $3, $4, $5)
1965                ",
1966            )
1967            .bind(new_collaborator.project_id)
1968            .bind(new_collaborator.connection_id)
1969            .bind(new_collaborator.user_id)
1970            .bind(new_collaborator.replica_id)
1971            .bind(new_collaborator.is_host)
1972            .execute(&mut tx)
1973            .await?;
1974            collaborators.push(new_collaborator);
1975
1976            let worktree_rows = sqlx::query_as::<_, WorktreeRow>(
1977                "
1978                SELECT *
1979                FROM worktrees
1980                WHERE project_id = $1
1981                ",
1982            )
1983            .bind(project_id)
1984            .fetch_all(&mut tx)
1985            .await?;
1986            let mut worktrees = worktree_rows
1987                .into_iter()
1988                .map(|worktree_row| {
1989                    (
1990                        worktree_row.id,
1991                        Worktree {
1992                            id: worktree_row.id,
1993                            abs_path: worktree_row.abs_path,
1994                            root_name: worktree_row.root_name,
1995                            visible: worktree_row.visible,
1996                            entries: Default::default(),
1997                            diagnostic_summaries: Default::default(),
1998                            scan_id: worktree_row.scan_id as u64,
1999                            is_complete: worktree_row.is_complete,
2000                        },
2001                    )
2002                })
2003                .collect::<BTreeMap<_, _>>();
2004
2005            // Populate worktree entries.
2006            {
2007                let mut entries = sqlx::query_as::<_, WorktreeEntry>(
2008                    "
2009                    SELECT *
2010                    FROM worktree_entries
2011                    WHERE project_id = $1
2012                    ",
2013                )
2014                .bind(project_id)
2015                .fetch(&mut tx);
2016                while let Some(entry) = entries.next().await {
2017                    let entry = entry?;
2018                    if let Some(worktree) = worktrees.get_mut(&entry.worktree_id) {
2019                        worktree.entries.push(proto::Entry {
2020                            id: entry.id as u64,
2021                            is_dir: entry.is_dir,
2022                            path: entry.path,
2023                            inode: entry.inode as u64,
2024                            mtime: Some(proto::Timestamp {
2025                                seconds: entry.mtime_seconds as u64,
2026                                nanos: entry.mtime_nanos as u32,
2027                            }),
2028                            is_symlink: entry.is_symlink,
2029                            is_ignored: entry.is_ignored,
2030                        });
2031                    }
2032                }
2033            }
2034
2035            // Populate worktree diagnostic summaries.
2036            {
2037                let mut summaries = sqlx::query_as::<_, WorktreeDiagnosticSummary>(
2038                    "
2039                    SELECT *
2040                    FROM worktree_diagnostic_summaries
2041                    WHERE project_id = $1
2042                    ",
2043                )
2044                .bind(project_id)
2045                .fetch(&mut tx);
2046                while let Some(summary) = summaries.next().await {
2047                    let summary = summary?;
2048                    if let Some(worktree) = worktrees.get_mut(&summary.worktree_id) {
2049                        worktree
2050                            .diagnostic_summaries
2051                            .push(proto::DiagnosticSummary {
2052                                path: summary.path,
2053                                language_server_id: summary.language_server_id as u64,
2054                                error_count: summary.error_count as u32,
2055                                warning_count: summary.warning_count as u32,
2056                            });
2057                    }
2058                }
2059            }
2060
2061            // Populate language servers.
2062            let language_servers = sqlx::query_as::<_, LanguageServer>(
2063                "
2064                SELECT *
2065                FROM language_servers
2066                WHERE project_id = $1
2067                ",
2068            )
2069            .bind(project_id)
2070            .fetch_all(&mut tx)
2071            .await?;
2072
2073            tx.commit().await?;
2074            Ok((
2075                Project {
2076                    collaborators,
2077                    worktrees,
2078                    language_servers: language_servers
2079                        .into_iter()
2080                        .map(|language_server| proto::LanguageServer {
2081                            id: language_server.id.to_proto(),
2082                            name: language_server.name,
2083                        })
2084                        .collect(),
2085                },
2086                replica_id as ReplicaId,
2087            ))
2088        })
2089        .await
2090    }
2091
2092    pub async fn leave_project(
2093        &self,
2094        project_id: ProjectId,
2095        connection_id: ConnectionId,
2096    ) -> Result<LeftProject> {
2097        self.transact(|mut tx| async move {
2098            let result = sqlx::query(
2099                "
2100                DELETE FROM project_collaborators
2101                WHERE project_id = $1 AND connection_id = $2
2102                ",
2103            )
2104            .bind(project_id)
2105            .bind(connection_id.0 as i32)
2106            .execute(&mut tx)
2107            .await?;
2108
2109            if result.rows_affected() == 0 {
2110                Err(anyhow!("not a collaborator on this project"))?;
2111            }
2112
2113            let connection_ids = sqlx::query_scalar::<_, i32>(
2114                "
2115                SELECT connection_id
2116                FROM project_collaborators
2117                WHERE project_id = $1
2118                ",
2119            )
2120            .bind(project_id)
2121            .fetch_all(&mut tx)
2122            .await?
2123            .into_iter()
2124            .map(|id| ConnectionId(id as u32))
2125            .collect();
2126
2127            let (host_user_id, host_connection_id) = sqlx::query_as::<_, (i32, i32)>(
2128                "
2129                SELECT host_user_id, host_connection_id
2130                FROM projects
2131                WHERE id = $1
2132                ",
2133            )
2134            .bind(project_id)
2135            .fetch_one(&mut tx)
2136            .await?;
2137
2138            tx.commit().await?;
2139
2140            Ok(LeftProject {
2141                id: project_id,
2142                host_user_id: UserId(host_user_id),
2143                host_connection_id: ConnectionId(host_connection_id as u32),
2144                connection_ids,
2145            })
2146        })
2147        .await
2148    }
2149
2150    pub async fn project_collaborators(
2151        &self,
2152        project_id: ProjectId,
2153        connection_id: ConnectionId,
2154    ) -> Result<Vec<ProjectCollaborator>> {
2155        self.transact(|mut tx| async move {
2156            let collaborators = sqlx::query_as::<_, ProjectCollaborator>(
2157                "
2158                SELECT *
2159                FROM project_collaborators
2160                WHERE project_id = $1
2161                ",
2162            )
2163            .bind(project_id)
2164            .fetch_all(&mut tx)
2165            .await?;
2166
2167            if collaborators
2168                .iter()
2169                .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
2170            {
2171                Ok(collaborators)
2172            } else {
2173                Err(anyhow!("no such project"))?
2174            }
2175        })
2176        .await
2177    }
2178
2179    pub async fn project_connection_ids(
2180        &self,
2181        project_id: ProjectId,
2182        connection_id: ConnectionId,
2183    ) -> Result<HashSet<ConnectionId>> {
2184        self.transact(|mut tx| async move {
2185            let connection_ids = sqlx::query_scalar::<_, i32>(
2186                "
2187                SELECT connection_id
2188                FROM project_collaborators
2189                WHERE project_id = $1
2190                ",
2191            )
2192            .bind(project_id)
2193            .fetch_all(&mut tx)
2194            .await?;
2195
2196            if connection_ids.contains(&(connection_id.0 as i32)) {
2197                Ok(connection_ids
2198                    .into_iter()
2199                    .map(|connection_id| ConnectionId(connection_id as u32))
2200                    .collect())
2201            } else {
2202                Err(anyhow!("no such project"))?
2203            }
2204        })
2205        .await
2206    }
2207
2208    // contacts
2209
2210    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
2211        self.transact(|mut tx| async move {
2212            let query = "
2213                SELECT user_id_a, user_id_b, a_to_b, accepted, should_notify, (room_participants.id IS NOT NULL) as busy
2214                FROM contacts
2215                LEFT JOIN room_participants ON room_participants.user_id = $1
2216                WHERE user_id_a = $1 OR user_id_b = $1;
2217            ";
2218
2219            let mut rows = sqlx::query_as::<_, (UserId, UserId, bool, bool, bool, bool)>(query)
2220                .bind(user_id)
2221                .fetch(&mut tx);
2222
2223            let mut contacts = Vec::new();
2224            while let Some(row) = rows.next().await {
2225                let (user_id_a, user_id_b, a_to_b, accepted, should_notify, busy) = row?;
2226                if user_id_a == user_id {
2227                    if accepted {
2228                        contacts.push(Contact::Accepted {
2229                            user_id: user_id_b,
2230                            should_notify: should_notify && a_to_b,
2231                            busy
2232                        });
2233                    } else if a_to_b {
2234                        contacts.push(Contact::Outgoing { user_id: user_id_b })
2235                    } else {
2236                        contacts.push(Contact::Incoming {
2237                            user_id: user_id_b,
2238                            should_notify,
2239                        });
2240                    }
2241                } else if accepted {
2242                    contacts.push(Contact::Accepted {
2243                        user_id: user_id_a,
2244                        should_notify: should_notify && !a_to_b,
2245                        busy
2246                    });
2247                } else if a_to_b {
2248                    contacts.push(Contact::Incoming {
2249                        user_id: user_id_a,
2250                        should_notify,
2251                    });
2252                } else {
2253                    contacts.push(Contact::Outgoing { user_id: user_id_a });
2254                }
2255            }
2256
2257            contacts.sort_unstable_by_key(|contact| contact.user_id());
2258
2259            Ok(contacts)
2260        })
2261        .await
2262    }
2263
2264    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
2265        self.transact(|mut tx| async move {
2266            Ok(sqlx::query_scalar::<_, i32>(
2267                "
2268                SELECT 1
2269                FROM room_participants
2270                WHERE room_participants.user_id = $1
2271                ",
2272            )
2273            .bind(user_id)
2274            .fetch_optional(&mut tx)
2275            .await?
2276            .is_some())
2277        })
2278        .await
2279    }
2280
2281    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
2282        self.transact(|mut tx| async move {
2283            let (id_a, id_b) = if user_id_1 < user_id_2 {
2284                (user_id_1, user_id_2)
2285            } else {
2286                (user_id_2, user_id_1)
2287            };
2288
2289            let query = "
2290                SELECT 1 FROM contacts
2291                WHERE user_id_a = $1 AND user_id_b = $2 AND accepted = TRUE
2292                LIMIT 1
2293            ";
2294            Ok(sqlx::query_scalar::<_, i32>(query)
2295                .bind(id_a.0)
2296                .bind(id_b.0)
2297                .fetch_optional(&mut tx)
2298                .await?
2299                .is_some())
2300        })
2301        .await
2302    }
2303
2304    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
2305        self.transact(|mut tx| async move {
2306            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
2307                (sender_id, receiver_id, true)
2308            } else {
2309                (receiver_id, sender_id, false)
2310            };
2311            let query = "
2312                INSERT into contacts (user_id_a, user_id_b, a_to_b, accepted, should_notify)
2313                VALUES ($1, $2, $3, FALSE, TRUE)
2314                ON CONFLICT (user_id_a, user_id_b) DO UPDATE
2315                SET
2316                    accepted = TRUE,
2317                    should_notify = FALSE
2318                WHERE
2319                    NOT contacts.accepted AND
2320                    ((contacts.a_to_b = excluded.a_to_b AND contacts.user_id_a = excluded.user_id_b) OR
2321                    (contacts.a_to_b != excluded.a_to_b AND contacts.user_id_a = excluded.user_id_a));
2322            ";
2323            let result = sqlx::query(query)
2324                .bind(id_a.0)
2325                .bind(id_b.0)
2326                .bind(a_to_b)
2327                .execute(&mut tx)
2328                .await?;
2329
2330            if result.rows_affected() == 1 {
2331                tx.commit().await?;
2332                Ok(())
2333            } else {
2334                Err(anyhow!("contact already requested"))?
2335            }
2336        }).await
2337    }
2338
2339    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
2340        self.transact(|mut tx| async move {
2341            let (id_a, id_b) = if responder_id < requester_id {
2342                (responder_id, requester_id)
2343            } else {
2344                (requester_id, responder_id)
2345            };
2346            let query = "
2347                DELETE FROM contacts
2348                WHERE user_id_a = $1 AND user_id_b = $2;
2349            ";
2350            let result = sqlx::query(query)
2351                .bind(id_a.0)
2352                .bind(id_b.0)
2353                .execute(&mut tx)
2354                .await?;
2355
2356            if result.rows_affected() == 1 {
2357                tx.commit().await?;
2358                Ok(())
2359            } else {
2360                Err(anyhow!("no such contact"))?
2361            }
2362        })
2363        .await
2364    }
2365
2366    pub async fn dismiss_contact_notification(
2367        &self,
2368        user_id: UserId,
2369        contact_user_id: UserId,
2370    ) -> Result<()> {
2371        self.transact(|mut tx| async move {
2372            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
2373                (user_id, contact_user_id, true)
2374            } else {
2375                (contact_user_id, user_id, false)
2376            };
2377
2378            let query = "
2379                UPDATE contacts
2380                SET should_notify = FALSE
2381                WHERE
2382                    user_id_a = $1 AND user_id_b = $2 AND
2383                    (
2384                        (a_to_b = $3 AND accepted) OR
2385                        (a_to_b != $3 AND NOT accepted)
2386                    );
2387            ";
2388
2389            let result = sqlx::query(query)
2390                .bind(id_a.0)
2391                .bind(id_b.0)
2392                .bind(a_to_b)
2393                .execute(&mut tx)
2394                .await?;
2395
2396            if result.rows_affected() == 0 {
2397                Err(anyhow!("no such contact request"))?
2398            } else {
2399                tx.commit().await?;
2400                Ok(())
2401            }
2402        })
2403        .await
2404    }
2405
2406    pub async fn respond_to_contact_request(
2407        &self,
2408        responder_id: UserId,
2409        requester_id: UserId,
2410        accept: bool,
2411    ) -> Result<()> {
2412        self.transact(|mut tx| async move {
2413            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
2414                (responder_id, requester_id, false)
2415            } else {
2416                (requester_id, responder_id, true)
2417            };
2418            let result = if accept {
2419                let query = "
2420                    UPDATE contacts
2421                    SET accepted = TRUE, should_notify = TRUE
2422                    WHERE user_id_a = $1 AND user_id_b = $2 AND a_to_b = $3;
2423                ";
2424                sqlx::query(query)
2425                    .bind(id_a.0)
2426                    .bind(id_b.0)
2427                    .bind(a_to_b)
2428                    .execute(&mut tx)
2429                    .await?
2430            } else {
2431                let query = "
2432                    DELETE FROM contacts
2433                    WHERE user_id_a = $1 AND user_id_b = $2 AND a_to_b = $3 AND NOT accepted;
2434                ";
2435                sqlx::query(query)
2436                    .bind(id_a.0)
2437                    .bind(id_b.0)
2438                    .bind(a_to_b)
2439                    .execute(&mut tx)
2440                    .await?
2441            };
2442            if result.rows_affected() == 1 {
2443                tx.commit().await?;
2444                Ok(())
2445            } else {
2446                Err(anyhow!("no such contact request"))?
2447            }
2448        })
2449        .await
2450    }
2451
2452    // access tokens
2453
2454    pub async fn create_access_token_hash(
2455        &self,
2456        user_id: UserId,
2457        access_token_hash: &str,
2458        max_access_token_count: usize,
2459    ) -> Result<()> {
2460        self.transact(|tx| async {
2461            let mut tx = tx;
2462            let insert_query = "
2463                INSERT INTO access_tokens (user_id, hash)
2464                VALUES ($1, $2);
2465            ";
2466            let cleanup_query = "
2467                DELETE FROM access_tokens
2468                WHERE id IN (
2469                    SELECT id from access_tokens
2470                    WHERE user_id = $1
2471                    ORDER BY id DESC
2472                    LIMIT 10000
2473                    OFFSET $3
2474                )
2475            ";
2476
2477            sqlx::query(insert_query)
2478                .bind(user_id.0)
2479                .bind(access_token_hash)
2480                .execute(&mut tx)
2481                .await?;
2482            sqlx::query(cleanup_query)
2483                .bind(user_id.0)
2484                .bind(access_token_hash)
2485                .bind(max_access_token_count as i32)
2486                .execute(&mut tx)
2487                .await?;
2488            Ok(tx.commit().await?)
2489        })
2490        .await
2491    }
2492
2493    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2494        self.transact(|mut tx| async move {
2495            let query = "
2496                SELECT hash
2497                FROM access_tokens
2498                WHERE user_id = $1
2499                ORDER BY id DESC
2500            ";
2501            Ok(sqlx::query_scalar(query)
2502                .bind(user_id.0)
2503                .fetch_all(&mut tx)
2504                .await?)
2505        })
2506        .await
2507    }
2508
2509    async fn transact<F, Fut, T>(&self, f: F) -> Result<T>
2510    where
2511        F: Send + Fn(sqlx::Transaction<'static, D>) -> Fut,
2512        Fut: Send + Future<Output = Result<T>>,
2513    {
2514        let body = async {
2515            loop {
2516                let tx = self.begin_transaction().await?;
2517                match f(tx).await {
2518                    Ok(result) => return Ok(result),
2519                    Err(error) => match error {
2520                        Error::Database(error)
2521                            if error
2522                                .as_database_error()
2523                                .and_then(|error| error.code())
2524                                .as_deref()
2525                                == Some("hey") =>
2526                        {
2527                            // Retry (don't break the loop)
2528                        }
2529                        error @ _ => return Err(error),
2530                    },
2531                }
2532            }
2533        };
2534
2535        #[cfg(test)]
2536        {
2537            if let Some(background) = self.background.as_ref() {
2538                background.simulate_random_delay().await;
2539            }
2540
2541            let result = self.runtime.as_ref().unwrap().block_on(body);
2542
2543            if let Some(background) = self.background.as_ref() {
2544                background.simulate_random_delay().await;
2545            }
2546
2547            result
2548        }
2549
2550        #[cfg(not(test))]
2551        {
2552            body.await
2553        }
2554    }
2555}
2556
/// Declares a public newtype `$name(pub i32)` for use as a typed database id.
///
/// The generated type is `Copy`, ordered and hashable, is transparent to both
/// sqlx and serde, and displays as its inner integer. It also gets a `MAX`
/// constant and `from_proto` / `to_proto` conversions to and from `u64`.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Debug,
            Default,
            Clone,
            Copy,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
            sqlx::Type,
        )]
        #[sqlx(transparent)]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds an id from a `u64`, using a plain `as i32` cast
            /// (values that do not fit are truncated, not rejected).
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                let raw = value as i32;
                Self(raw)
            }

            /// Converts the id to a `u64` with a plain `as u64` cast.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                let Self(raw) = self;
                raw as u64
            }
        }

        impl std::fmt::Display for $name {
            // Delegates to the inner integer so formatter flags keep working.
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }
    };
}
2599
2600id_type!(UserId);
2601#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
2602pub struct User {
2603    pub id: UserId,
2604    pub github_login: String,
2605    pub github_user_id: Option<i32>,
2606    pub email_address: Option<String>,
2607    pub admin: bool,
2608    pub invite_code: Option<String>,
2609    pub invite_count: i32,
2610    pub connected_once: bool,
2611}
2612
2613id_type!(RoomId);
2614#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
2615pub struct Room {
2616    pub id: RoomId,
2617    pub version: i32,
2618    pub live_kit_room: String,
2619}
2620
id_type!(ProjectId);
/// Snapshot of a project as assembled by `join_project`.
pub struct Project {
    /// Every collaborator row of the project, including the one just added
    /// for the joining connection.
    pub collaborators: Vec<ProjectCollaborator>,
    /// The project's worktrees keyed by worktree id.
    pub worktrees: BTreeMap<WorktreeId, Worktree>,
    /// Language servers recorded for the project, in wire format.
    pub language_servers: Vec<proto::LanguageServer>,
}
2627
// Generates the `ReplicaId` newtype through the `id_type!` macro.
id_type!(ReplicaId);
/// One collaborator on a project.
///
/// Derives `FromRow`, so it can be built from a query row.
#[derive(Clone, Debug, Default, FromRow, PartialEq)]
pub struct ProjectCollaborator {
    pub project_id: ProjectId,
    /// Stored as a raw `i32` here rather than as a `ConnectionId`.
    pub connection_id: i32,
    pub user_id: UserId,
    pub replica_id: ReplicaId,
    // NOTE(review): presumably true for the collaborator hosting the project — confirm.
    pub is_host: bool,
}
2637
// Generates the `WorktreeId` newtype through the `id_type!` macro.
id_type!(WorktreeId);
/// Row-shaped representation of a worktree (derives `FromRow`).
///
/// It has the scalar fields of `Worktree` but none of its entry or diagnostic
/// collections, and it keeps `scan_id` as an `i64` where `Worktree` uses `u64`.
#[derive(Clone, Debug, Default, FromRow, PartialEq)]
struct WorktreeRow {
    pub id: WorktreeId,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub scan_id: i64,
    pub is_complete: bool,
}
2648
/// A worktree together with its entries and diagnostic summaries.
///
/// Carries the same scalar fields as `WorktreeRow`, with `scan_id` exposed as a
/// `u64`, plus the protobuf entries and diagnostic summaries belonging to it.
pub struct Worktree {
    pub id: WorktreeId,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub scan_id: u64,
    pub is_complete: bool,
}
2659
/// Row-shaped representation of a single worktree entry (derives `FromRow`).
#[derive(Clone, Debug, Default, FromRow, PartialEq)]
struct WorktreeEntry {
    id: i64,
    /// The worktree this entry belongs to.
    worktree_id: WorktreeId,
    is_dir: bool,
    path: String,
    inode: i64,
    // NOTE(review): `mtime_seconds` / `mtime_nanos` presumably hold the modification time split
    // into whole seconds and a nanosecond remainder — confirm.
    mtime_seconds: i64,
    mtime_nanos: i32,
    is_symlink: bool,
    is_ignored: bool,
}
2672
/// Row-shaped diagnostic summary for one path of a worktree (derives `FromRow`).
#[derive(Clone, Debug, Default, FromRow, PartialEq)]
struct WorktreeDiagnosticSummary {
    worktree_id: WorktreeId,
    path: String,
    /// Stored as a raw `i64` here rather than as a `LanguageServerId`.
    language_server_id: i64,
    error_count: i32,
    warning_count: i32,
}
2681
// Generates the `LanguageServerId` newtype through the `id_type!` macro.
id_type!(LanguageServerId);
/// Row-shaped representation of a language server: its id and name (derives `FromRow`).
#[derive(Clone, Debug, Default, FromRow, PartialEq)]
struct LanguageServer {
    id: LanguageServerId,
    name: String,
}
2688
/// Identifies a project along with its host and a list of connections.
// NOTE(review): presumably describes a project that a connection has just left, with
// `connection_ids` being the connections to notify — confirm against the producer.
pub struct LeftProject {
    pub id: ProjectId,
    pub host_user_id: UserId,
    pub host_connection_id: ConnectionId,
    pub connection_ids: Vec<ConnectionId>,
}
2695
/// Bundles a protobuf room with per-project `LeftProject` data and a list of user ids.
// NOTE(review): presumably the result of a connection leaving a room — confirm against the
// producer.
pub struct LeftRoom {
    pub room: proto::Room,
    /// `LeftProject` values keyed by project id.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    // NOTE(review): presumably the users whose pending calls were canceled — confirm.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
2701
2702#[derive(Clone, Debug, PartialEq, Eq)]
2703pub enum Contact {
2704    Accepted {
2705        user_id: UserId,
2706        should_notify: bool,
2707        busy: bool,
2708    },
2709    Outgoing {
2710        user_id: UserId,
2711    },
2712    Incoming {
2713        user_id: UserId,
2714        should_notify: bool,
2715    },
2716}
2717
2718impl Contact {
2719    pub fn user_id(&self) -> UserId {
2720        match self {
2721            Contact::Accepted { user_id, .. } => *user_id,
2722            Contact::Outgoing { user_id } => *user_id,
2723            Contact::Incoming { user_id, .. } => *user_id,
2724        }
2725    }
2726}
2727
/// An incoming contact request: who sent it and whether it should trigger a notification.
///
/// Derives `Ord`, so values sort by `requester_id` first and `should_notify` second.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct IncomingContactRequest {
    pub requester_id: UserId,
    pub should_notify: bool,
}
2733
/// Deserializable signup data: an email address, platform flags and free-form preference lists.
// NOTE(review): presumably submitted by a prospective user joining the waitlist — confirm.
#[derive(Clone, Deserialize)]
pub struct Signup {
    pub email_address: String,
    pub platform_mac: bool,
    pub platform_windows: bool,
    pub platform_linux: bool,
    pub editor_features: Vec<String>,
    pub programming_languages: Vec<String>,
    /// `None` when no device id was supplied.
    pub device_id: Option<String>,
}
2744
/// Waitlist counts: a total plus per-platform and unknown-platform counts.
///
/// Every field is marked `#[sqlx(default)]`, so a column missing from the query
/// row falls back to the field type's default (`0`) instead of failing to decode.
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, FromRow)]
pub struct WaitlistSummary {
    #[sqlx(default)]
    pub count: i64,
    #[sqlx(default)]
    pub linux_count: i64,
    #[sqlx(default)]
    pub mac_count: i64,
    #[sqlx(default)]
    pub windows_count: i64,
    #[sqlx(default)]
    pub unknown_count: i64,
}
2758
/// An email address paired with its email confirmation code.
///
/// Derives `FromRow`, so it can be built from a query row, plus `Serialize`/`Deserialize`.
#[derive(FromRow, PartialEq, Debug, Serialize, Deserialize)]
pub struct Invite {
    pub email_address: String,
    pub email_confirmation_code: String,
}
2764
/// Parameters describing a user to be created.
///
/// Note that `github_user_id` is required here, whereas `User::github_user_id` is optional.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    pub github_login: String,
    pub github_user_id: i32,
    pub invite_count: i32,
}
2771
/// Outcome of creating a user.
#[derive(Debug)]
pub struct NewUserResult {
    pub user_id: UserId,
    pub metrics_id: String,
    /// `None` when no inviting user is associated with the new user.
    pub inviting_user_id: Option<UserId>,
    /// `None` when no signup device id is associated with the new user.
    pub signup_device_id: Option<String>,
}
2779
2780fn random_invite_code() -> String {
2781    nanoid::nanoid!(16)
2782}
2783
2784fn random_email_confirmation_code() -> String {
2785    nanoid::nanoid!(64)
2786}
2787
2788#[cfg(test)]
2789pub use test::*;
2790
2791#[cfg(test)]
2792mod test {
2793    use super::*;
2794    use gpui::executor::Background;
2795    use lazy_static::lazy_static;
2796    use parking_lot::Mutex;
2797    use rand::prelude::*;
2798    use sqlx::migrate::MigrateDatabase;
2799    use std::sync::Arc;
2800
    /// Test harness around a SQLite-backed `Db`.
    pub struct SqliteTestDb {
        /// The database handle; `SqliteTestDb::new` always stores `Some`.
        pub db: Option<Arc<Db<sqlx::Sqlite>>>,
        /// A connection that `SqliteTestDb::new` acquires from the pool and detaches from it.
        pub conn: sqlx::sqlite::SqliteConnection,
    }
2805
    /// Test harness around a Postgres-backed `Db`.
    pub struct PostgresTestDb {
        /// The database handle; `PostgresTestDb::new` always stores `Some`, and `Drop` takes it.
        pub db: Option<Arc<Db<sqlx::Postgres>>>,
        /// The connection URL of the test database; passed to `teardown` on drop.
        pub url: String,
    }
2810
2811    impl SqliteTestDb {
2812        pub fn new(background: Arc<Background>) -> Self {
2813            let mut rng = StdRng::from_entropy();
2814            let url = format!("file:zed-test-{}?mode=memory", rng.gen::<u128>());
2815            let runtime = tokio::runtime::Builder::new_current_thread()
2816                .enable_io()
2817                .enable_time()
2818                .build()
2819                .unwrap();
2820
2821            let (mut db, conn) = runtime.block_on(async {
2822                let db = Db::<sqlx::Sqlite>::new(&url, 5).await.unwrap();
2823                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations.sqlite");
2824                db.migrate(migrations_path.as_ref(), false).await.unwrap();
2825                let conn = db.pool.acquire().await.unwrap().detach();
2826                (db, conn)
2827            });
2828
2829            db.background = Some(background);
2830            db.runtime = Some(runtime);
2831
2832            Self {
2833                db: Some(Arc::new(db)),
2834                conn,
2835            }
2836        }
2837
2838        pub fn db(&self) -> &Arc<Db<sqlx::Sqlite>> {
2839            self.db.as_ref().unwrap()
2840        }
2841    }
2842
2843    impl PostgresTestDb {
2844        pub fn new(background: Arc<Background>) -> Self {
2845            lazy_static! {
2846                static ref LOCK: Mutex<()> = Mutex::new(());
2847            }
2848
2849            let _guard = LOCK.lock();
2850            let mut rng = StdRng::from_entropy();
2851            let url = format!(
2852                "postgres://postgres@localhost/zed-test-{}",
2853                rng.gen::<u128>()
2854            );
2855            let runtime = tokio::runtime::Builder::new_current_thread()
2856                .enable_io()
2857                .enable_time()
2858                .build()
2859                .unwrap();
2860
2861            let mut db = runtime.block_on(async {
2862                sqlx::Postgres::create_database(&url)
2863                    .await
2864                    .expect("failed to create test db");
2865                let db = Db::<sqlx::Postgres>::new(&url, 5).await.unwrap();
2866                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
2867                db.migrate(Path::new(migrations_path), false).await.unwrap();
2868                db
2869            });
2870
2871            db.background = Some(background);
2872            db.runtime = Some(runtime);
2873
2874            Self {
2875                db: Some(Arc::new(db)),
2876                url,
2877            }
2878        }
2879
2880        pub fn db(&self) -> &Arc<Db<sqlx::Postgres>> {
2881            self.db.as_ref().unwrap()
2882        }
2883    }
2884
2885    impl Drop for PostgresTestDb {
2886        fn drop(&mut self) {
2887            let db = self.db.take().unwrap();
2888            db.teardown(&self.url);
2889        }
2890    }
2891}