db.rs

   1mod access_token;
   2mod contact;
   3mod project;
   4mod project_collaborator;
   5mod room;
   6mod room_participant;
   7mod signup;
   8#[cfg(test)]
   9mod tests;
  10mod user;
  11mod worktree;
  12
  13use crate::{Error, Result};
  14use anyhow::anyhow;
  15use collections::{BTreeMap, HashMap, HashSet};
  16pub use contact::Contact;
  17use dashmap::DashMap;
  18use futures::StreamExt;
  19use hyper::StatusCode;
  20use rpc::{proto, ConnectionId};
  21pub use sea_orm::ConnectOptions;
  22use sea_orm::{
  23    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseBackend, DatabaseConnection,
  24    DatabaseTransaction, DbErr, FromQueryResult, IntoActiveModel, JoinType, QueryOrder,
  25    QuerySelect, Statement, TransactionTrait,
  26};
  27use sea_query::{Alias, Expr, OnConflict, Query};
  28use serde::{Deserialize, Serialize};
  29pub use signup::{Invite, NewSignup, WaitlistSummary};
  30use sqlx::migrate::{Migrate, Migration, MigrationSource};
  31use sqlx::Connection;
  32use std::ops::{Deref, DerefMut};
  33use std::path::Path;
  34use std::time::Duration;
  35use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  36use tokio::sync::{Mutex, OwnedMutexGuard};
  37pub use user::Model as User;
  38
  39pub struct Database {
  40    options: ConnectOptions,
  41    pool: DatabaseConnection,
  42    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  43    #[cfg(test)]
  44    background: Option<std::sync::Arc<gpui::executor::Background>>,
  45    #[cfg(test)]
  46    runtime: Option<tokio::runtime::Runtime>,
  47}
  48
  49impl Database {
  50    pub async fn new(options: ConnectOptions) -> Result<Self> {
  51        Ok(Self {
  52            options: options.clone(),
  53            pool: sea_orm::Database::connect(options).await?,
  54            rooms: DashMap::with_capacity(16384),
  55            #[cfg(test)]
  56            background: None,
  57            #[cfg(test)]
  58            runtime: None,
  59        })
  60    }
  61
  62    pub async fn migrate(
  63        &self,
  64        migrations_path: &Path,
  65        ignore_checksum_mismatch: bool,
  66    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  67        let migrations = MigrationSource::resolve(migrations_path)
  68            .await
  69            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  70
  71        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  72
  73        connection.ensure_migrations_table().await?;
  74        let applied_migrations: HashMap<_, _> = connection
  75            .list_applied_migrations()
  76            .await?
  77            .into_iter()
  78            .map(|m| (m.version, m))
  79            .collect();
  80
  81        let mut new_migrations = Vec::new();
  82        for migration in migrations {
  83            match applied_migrations.get(&migration.version) {
  84                Some(applied_migration) => {
  85                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  86                    {
  87                        Err(anyhow!(
  88                            "checksum mismatch for applied migration {}",
  89                            migration.description
  90                        ))?;
  91                    }
  92                }
  93                None => {
  94                    let elapsed = connection.apply(&migration).await?;
  95                    new_migrations.push((migration, elapsed));
  96                }
  97            }
  98        }
  99
 100        Ok(new_migrations)
 101    }
 102
 103    // users
 104
 105    pub async fn create_user(
 106        &self,
 107        email_address: &str,
 108        admin: bool,
 109        params: NewUserParams,
 110    ) -> Result<NewUserResult> {
 111        self.transact(|tx| async {
 112            let user = user::Entity::insert(user::ActiveModel {
 113                email_address: ActiveValue::set(Some(email_address.into())),
 114                github_login: ActiveValue::set(params.github_login.clone()),
 115                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 116                admin: ActiveValue::set(admin),
 117                metrics_id: ActiveValue::set(Uuid::new_v4()),
 118                ..Default::default()
 119            })
 120            .on_conflict(
 121                OnConflict::column(user::Column::GithubLogin)
 122                    .update_column(user::Column::GithubLogin)
 123                    .to_owned(),
 124            )
 125            .exec_with_returning(&tx)
 126            .await?;
 127
 128            tx.commit().await?;
 129
 130            Ok(NewUserResult {
 131                user_id: user.id,
 132                metrics_id: user.metrics_id.to_string(),
 133                signup_device_id: None,
 134                inviting_user_id: None,
 135            })
 136        })
 137        .await
 138    }
 139
 140    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 141        self.transact(|tx| async move { Ok(user::Entity::find_by_id(id).one(&tx).await?) })
 142            .await
 143    }
 144
 145    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 146        self.transact(|tx| async {
 147            let tx = tx;
 148            Ok(user::Entity::find()
 149                .filter(user::Column::Id.is_in(ids.iter().copied()))
 150                .all(&tx)
 151                .await?)
 152        })
 153        .await
 154    }
 155
 156    pub async fn get_user_by_github_account(
 157        &self,
 158        github_login: &str,
 159        github_user_id: Option<i32>,
 160    ) -> Result<Option<User>> {
 161        self.transact(|tx| async {
 162            let tx = tx;
 163            if let Some(github_user_id) = github_user_id {
 164                if let Some(user_by_github_user_id) = user::Entity::find()
 165                    .filter(user::Column::GithubUserId.eq(github_user_id))
 166                    .one(&tx)
 167                    .await?
 168                {
 169                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 170                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 171                    Ok(Some(user_by_github_user_id.update(&tx).await?))
 172                } else if let Some(user_by_github_login) = user::Entity::find()
 173                    .filter(user::Column::GithubLogin.eq(github_login))
 174                    .one(&tx)
 175                    .await?
 176                {
 177                    let mut user_by_github_login = user_by_github_login.into_active_model();
 178                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 179                    Ok(Some(user_by_github_login.update(&tx).await?))
 180                } else {
 181                    Ok(None)
 182                }
 183            } else {
 184                Ok(user::Entity::find()
 185                    .filter(user::Column::GithubLogin.eq(github_login))
 186                    .one(&tx)
 187                    .await?)
 188            }
 189        })
 190        .await
 191    }
 192
 193    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 194        self.transact(|tx| async move {
 195            Ok(user::Entity::find()
 196                .order_by_asc(user::Column::GithubLogin)
 197                .limit(limit as u64)
 198                .offset(page as u64 * limit as u64)
 199                .all(&tx)
 200                .await?)
 201        })
 202        .await
 203    }
 204
 205    pub async fn get_users_with_no_invites(
 206        &self,
 207        invited_by_another_user: bool,
 208    ) -> Result<Vec<User>> {
 209        self.transact(|tx| async move {
 210            Ok(user::Entity::find()
 211                .filter(
 212                    user::Column::InviteCount
 213                        .eq(0)
 214                        .and(if invited_by_another_user {
 215                            user::Column::InviterId.is_not_null()
 216                        } else {
 217                            user::Column::InviterId.is_null()
 218                        }),
 219                )
 220                .all(&tx)
 221                .await?)
 222        })
 223        .await
 224    }
 225
 226    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 227        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 228        enum QueryAs {
 229            MetricsId,
 230        }
 231
 232        self.transact(|tx| async move {
 233            let metrics_id: Uuid = user::Entity::find_by_id(id)
 234                .select_only()
 235                .column(user::Column::MetricsId)
 236                .into_values::<_, QueryAs>()
 237                .one(&tx)
 238                .await?
 239                .ok_or_else(|| anyhow!("could not find user"))?;
 240            Ok(metrics_id.to_string())
 241        })
 242        .await
 243    }
 244
 245    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 246        self.transact(|tx| async move {
 247            user::Entity::update_many()
 248                .filter(user::Column::Id.eq(id))
 249                .col_expr(user::Column::Admin, is_admin.into())
 250                .exec(&tx)
 251                .await?;
 252            tx.commit().await?;
 253            Ok(())
 254        })
 255        .await
 256    }
 257
 258    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 259        self.transact(|tx| async move {
 260            user::Entity::update_many()
 261                .filter(user::Column::Id.eq(id))
 262                .col_expr(user::Column::ConnectedOnce, connected_once.into())
 263                .exec(&tx)
 264                .await?;
 265            tx.commit().await?;
 266            Ok(())
 267        })
 268        .await
 269    }
 270
 271    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 272        self.transact(|tx| async move {
 273            access_token::Entity::delete_many()
 274                .filter(access_token::Column::UserId.eq(id))
 275                .exec(&tx)
 276                .await?;
 277            user::Entity::delete_by_id(id).exec(&tx).await?;
 278            tx.commit().await?;
 279            Ok(())
 280        })
 281        .await
 282    }
 283
 284    // contacts
 285
 286    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 287        #[derive(Debug, FromQueryResult)]
 288        struct ContactWithUserBusyStatuses {
 289            user_id_a: UserId,
 290            user_id_b: UserId,
 291            a_to_b: bool,
 292            accepted: bool,
 293            should_notify: bool,
 294            user_a_busy: bool,
 295            user_b_busy: bool,
 296        }
 297
 298        self.transact(|tx| async move {
 299            let user_a_participant = Alias::new("user_a_participant");
 300            let user_b_participant = Alias::new("user_b_participant");
 301            let mut db_contacts = contact::Entity::find()
 302                .column_as(
 303                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 304                        .is_not_null(),
 305                    "user_a_busy",
 306                )
 307                .column_as(
 308                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 309                        .is_not_null(),
 310                    "user_b_busy",
 311                )
 312                .filter(
 313                    contact::Column::UserIdA
 314                        .eq(user_id)
 315                        .or(contact::Column::UserIdB.eq(user_id)),
 316                )
 317                .join_as(
 318                    JoinType::LeftJoin,
 319                    contact::Relation::UserARoomParticipant.def(),
 320                    user_a_participant,
 321                )
 322                .join_as(
 323                    JoinType::LeftJoin,
 324                    contact::Relation::UserBRoomParticipant.def(),
 325                    user_b_participant,
 326                )
 327                .into_model::<ContactWithUserBusyStatuses>()
 328                .stream(&tx)
 329                .await?;
 330
 331            let mut contacts = Vec::new();
 332            while let Some(db_contact) = db_contacts.next().await {
 333                let db_contact = db_contact?;
 334                if db_contact.user_id_a == user_id {
 335                    if db_contact.accepted {
 336                        contacts.push(Contact::Accepted {
 337                            user_id: db_contact.user_id_b,
 338                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 339                            busy: db_contact.user_b_busy,
 340                        });
 341                    } else if db_contact.a_to_b {
 342                        contacts.push(Contact::Outgoing {
 343                            user_id: db_contact.user_id_b,
 344                        })
 345                    } else {
 346                        contacts.push(Contact::Incoming {
 347                            user_id: db_contact.user_id_b,
 348                            should_notify: db_contact.should_notify,
 349                        });
 350                    }
 351                } else if db_contact.accepted {
 352                    contacts.push(Contact::Accepted {
 353                        user_id: db_contact.user_id_a,
 354                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 355                        busy: db_contact.user_a_busy,
 356                    });
 357                } else if db_contact.a_to_b {
 358                    contacts.push(Contact::Incoming {
 359                        user_id: db_contact.user_id_a,
 360                        should_notify: db_contact.should_notify,
 361                    });
 362                } else {
 363                    contacts.push(Contact::Outgoing {
 364                        user_id: db_contact.user_id_a,
 365                    });
 366                }
 367            }
 368
 369            contacts.sort_unstable_by_key(|contact| contact.user_id());
 370
 371            Ok(contacts)
 372        })
 373        .await
 374    }
 375
 376    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 377        self.transact(|tx| async move {
 378            let participant = room_participant::Entity::find()
 379                .filter(room_participant::Column::UserId.eq(user_id))
 380                .one(&tx)
 381                .await?;
 382            Ok(participant.is_some())
 383        })
 384        .await
 385    }
 386
 387    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 388        self.transact(|tx| async move {
 389            let (id_a, id_b) = if user_id_1 < user_id_2 {
 390                (user_id_1, user_id_2)
 391            } else {
 392                (user_id_2, user_id_1)
 393            };
 394
 395            Ok(contact::Entity::find()
 396                .filter(
 397                    contact::Column::UserIdA
 398                        .eq(id_a)
 399                        .and(contact::Column::UserIdB.eq(id_b))
 400                        .and(contact::Column::Accepted.eq(true)),
 401                )
 402                .one(&tx)
 403                .await?
 404                .is_some())
 405        })
 406        .await
 407    }
 408
 409    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 410        self.transact(|tx| async move {
 411            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 412                (sender_id, receiver_id, true)
 413            } else {
 414                (receiver_id, sender_id, false)
 415            };
 416
 417            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 418                user_id_a: ActiveValue::set(id_a),
 419                user_id_b: ActiveValue::set(id_b),
 420                a_to_b: ActiveValue::set(a_to_b),
 421                accepted: ActiveValue::set(false),
 422                should_notify: ActiveValue::set(true),
 423                ..Default::default()
 424            })
 425            .on_conflict(
 426                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 427                    .values([
 428                        (contact::Column::Accepted, true.into()),
 429                        (contact::Column::ShouldNotify, false.into()),
 430                    ])
 431                    .action_and_where(
 432                        contact::Column::Accepted.eq(false).and(
 433                            contact::Column::AToB
 434                                .eq(a_to_b)
 435                                .and(contact::Column::UserIdA.eq(id_b))
 436                                .or(contact::Column::AToB
 437                                    .ne(a_to_b)
 438                                    .and(contact::Column::UserIdA.eq(id_a))),
 439                        ),
 440                    )
 441                    .to_owned(),
 442            )
 443            .exec_without_returning(&tx)
 444            .await?;
 445
 446            if rows_affected == 1 {
 447                tx.commit().await?;
 448                Ok(())
 449            } else {
 450                Err(anyhow!("contact already requested"))?
 451            }
 452        })
 453        .await
 454    }
 455
 456    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 457        self.transact(|tx| async move {
 458            let (id_a, id_b) = if responder_id < requester_id {
 459                (responder_id, requester_id)
 460            } else {
 461                (requester_id, responder_id)
 462            };
 463
 464            let result = contact::Entity::delete_many()
 465                .filter(
 466                    contact::Column::UserIdA
 467                        .eq(id_a)
 468                        .and(contact::Column::UserIdB.eq(id_b)),
 469                )
 470                .exec(&tx)
 471                .await?;
 472
 473            if result.rows_affected == 1 {
 474                tx.commit().await?;
 475                Ok(())
 476            } else {
 477                Err(anyhow!("no such contact"))?
 478            }
 479        })
 480        .await
 481    }
 482
 483    pub async fn dismiss_contact_notification(
 484        &self,
 485        user_id: UserId,
 486        contact_user_id: UserId,
 487    ) -> Result<()> {
 488        self.transact(|tx| async move {
 489            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 490                (user_id, contact_user_id, true)
 491            } else {
 492                (contact_user_id, user_id, false)
 493            };
 494
 495            let result = contact::Entity::update_many()
 496                .set(contact::ActiveModel {
 497                    should_notify: ActiveValue::set(false),
 498                    ..Default::default()
 499                })
 500                .filter(
 501                    contact::Column::UserIdA
 502                        .eq(id_a)
 503                        .and(contact::Column::UserIdB.eq(id_b))
 504                        .and(
 505                            contact::Column::AToB
 506                                .eq(a_to_b)
 507                                .and(contact::Column::Accepted.eq(true))
 508                                .or(contact::Column::AToB
 509                                    .ne(a_to_b)
 510                                    .and(contact::Column::Accepted.eq(false))),
 511                        ),
 512                )
 513                .exec(&tx)
 514                .await?;
 515            if result.rows_affected == 0 {
 516                Err(anyhow!("no such contact request"))?
 517            } else {
 518                tx.commit().await?;
 519                Ok(())
 520            }
 521        })
 522        .await
 523    }
 524
 525    pub async fn respond_to_contact_request(
 526        &self,
 527        responder_id: UserId,
 528        requester_id: UserId,
 529        accept: bool,
 530    ) -> Result<()> {
 531        self.transact(|tx| async move {
 532            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 533                (responder_id, requester_id, false)
 534            } else {
 535                (requester_id, responder_id, true)
 536            };
 537            let rows_affected = if accept {
 538                let result = contact::Entity::update_many()
 539                    .set(contact::ActiveModel {
 540                        accepted: ActiveValue::set(true),
 541                        should_notify: ActiveValue::set(true),
 542                        ..Default::default()
 543                    })
 544                    .filter(
 545                        contact::Column::UserIdA
 546                            .eq(id_a)
 547                            .and(contact::Column::UserIdB.eq(id_b))
 548                            .and(contact::Column::AToB.eq(a_to_b)),
 549                    )
 550                    .exec(&tx)
 551                    .await?;
 552                result.rows_affected
 553            } else {
 554                let result = contact::Entity::delete_many()
 555                    .filter(
 556                        contact::Column::UserIdA
 557                            .eq(id_a)
 558                            .and(contact::Column::UserIdB.eq(id_b))
 559                            .and(contact::Column::AToB.eq(a_to_b))
 560                            .and(contact::Column::Accepted.eq(false)),
 561                    )
 562                    .exec(&tx)
 563                    .await?;
 564
 565                result.rows_affected
 566            };
 567
 568            if rows_affected == 1 {
 569                tx.commit().await?;
 570                Ok(())
 571            } else {
 572                Err(anyhow!("no such contact request"))?
 573            }
 574        })
 575        .await
 576    }
 577
 578    pub fn fuzzy_like_string(string: &str) -> String {
 579        let mut result = String::with_capacity(string.len() * 2 + 1);
 580        for c in string.chars() {
 581            if c.is_alphanumeric() {
 582                result.push('%');
 583                result.push(c);
 584            }
 585        }
 586        result.push('%');
 587        result
 588    }
 589
 590    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 591        self.transact(|tx| async {
 592            let tx = tx;
 593            let like_string = Self::fuzzy_like_string(name_query);
 594            let query = "
 595                SELECT users.*
 596                FROM users
 597                WHERE github_login ILIKE $1
 598                ORDER BY github_login <-> $2
 599                LIMIT $3
 600            ";
 601
 602            Ok(user::Entity::find()
 603                .from_raw_sql(Statement::from_sql_and_values(
 604                    self.pool.get_database_backend(),
 605                    query.into(),
 606                    vec![like_string.into(), name_query.into(), limit.into()],
 607                ))
 608                .all(&tx)
 609                .await?)
 610        })
 611        .await
 612    }
 613
 614    // signups
 615
 616    pub async fn create_signup(&self, signup: NewSignup) -> Result<()> {
 617        self.transact(|tx| async {
 618            signup::ActiveModel {
 619                email_address: ActiveValue::set(signup.email_address.clone()),
 620                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 621                email_confirmation_sent: ActiveValue::set(false),
 622                platform_mac: ActiveValue::set(signup.platform_mac),
 623                platform_windows: ActiveValue::set(signup.platform_windows),
 624                platform_linux: ActiveValue::set(signup.platform_linux),
 625                platform_unknown: ActiveValue::set(false),
 626                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 627                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 628                device_id: ActiveValue::set(signup.device_id.clone()),
 629                ..Default::default()
 630            }
 631            .insert(&tx)
 632            .await?;
 633            tx.commit().await?;
 634            Ok(())
 635        })
 636        .await
 637    }
 638
 639    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 640        self.transact(|tx| async move {
 641            let query = "
 642                SELECT
 643                    COUNT(*) as count,
 644                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 645                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 646                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 647                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 648                FROM (
 649                    SELECT *
 650                    FROM signups
 651                    WHERE
 652                        NOT email_confirmation_sent
 653                ) AS unsent
 654            ";
 655            Ok(
 656                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 657                    self.pool.get_database_backend(),
 658                    query.into(),
 659                    vec![],
 660                ))
 661                .one(&tx)
 662                .await?
 663                .ok_or_else(|| anyhow!("invalid result"))?,
 664            )
 665        })
 666        .await
 667    }
 668
 669    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 670        let emails = invites
 671            .iter()
 672            .map(|s| s.email_address.as_str())
 673            .collect::<Vec<_>>();
 674        self.transact(|tx| async {
 675            signup::Entity::update_many()
 676                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 677                .col_expr(signup::Column::EmailConfirmationSent, true.into())
 678                .exec(&tx)
 679                .await?;
 680            tx.commit().await?;
 681            Ok(())
 682        })
 683        .await
 684    }
 685
 686    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 687        self.transact(|tx| async move {
 688            Ok(signup::Entity::find()
 689                .select_only()
 690                .column(signup::Column::EmailAddress)
 691                .column(signup::Column::EmailConfirmationCode)
 692                .filter(
 693                    signup::Column::EmailConfirmationSent.eq(false).and(
 694                        signup::Column::PlatformMac
 695                            .eq(true)
 696                            .or(signup::Column::PlatformUnknown.eq(true)),
 697                    ),
 698                )
 699                .limit(count as u64)
 700                .into_model()
 701                .all(&tx)
 702                .await?)
 703        })
 704        .await
 705    }
 706
 707    // invite codes
 708
 709    pub async fn create_invite_from_code(
 710        &self,
 711        code: &str,
 712        email_address: &str,
 713        device_id: Option<&str>,
 714    ) -> Result<Invite> {
 715        self.transact(|tx| async move {
 716            let existing_user = user::Entity::find()
 717                .filter(user::Column::EmailAddress.eq(email_address))
 718                .one(&tx)
 719                .await?;
 720
 721            if existing_user.is_some() {
 722                Err(anyhow!("email address is already in use"))?;
 723            }
 724
 725            let inviter = match user::Entity::find()
 726                .filter(user::Column::InviteCode.eq(code))
 727                .one(&tx)
 728                .await?
 729            {
 730                Some(inviter) => inviter,
 731                None => {
 732                    return Err(Error::Http(
 733                        StatusCode::NOT_FOUND,
 734                        "invite code not found".to_string(),
 735                    ))?
 736                }
 737            };
 738
 739            if inviter.invite_count == 0 {
 740                Err(Error::Http(
 741                    StatusCode::UNAUTHORIZED,
 742                    "no invites remaining".to_string(),
 743                ))?;
 744            }
 745
 746            let signup = signup::Entity::insert(signup::ActiveModel {
 747                email_address: ActiveValue::set(email_address.into()),
 748                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 749                email_confirmation_sent: ActiveValue::set(false),
 750                inviting_user_id: ActiveValue::set(Some(inviter.id)),
 751                platform_linux: ActiveValue::set(false),
 752                platform_mac: ActiveValue::set(false),
 753                platform_windows: ActiveValue::set(false),
 754                platform_unknown: ActiveValue::set(true),
 755                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 756                ..Default::default()
 757            })
 758            .on_conflict(
 759                OnConflict::column(signup::Column::EmailAddress)
 760                    .update_column(signup::Column::InvitingUserId)
 761                    .to_owned(),
 762            )
 763            .exec_with_returning(&tx)
 764            .await?;
 765            tx.commit().await?;
 766
 767            Ok(Invite {
 768                email_address: signup.email_address,
 769                email_confirmation_code: signup.email_confirmation_code,
 770            })
 771        })
 772        .await
 773    }
 774
 775    pub async fn create_user_from_invite(
 776        &self,
 777        invite: &Invite,
 778        user: NewUserParams,
 779    ) -> Result<Option<NewUserResult>> {
 780        self.transact(|tx| async {
 781            let tx = tx;
 782            let signup = signup::Entity::find()
 783                .filter(
 784                    signup::Column::EmailAddress
 785                        .eq(invite.email_address.as_str())
 786                        .and(
 787                            signup::Column::EmailConfirmationCode
 788                                .eq(invite.email_confirmation_code.as_str()),
 789                        ),
 790                )
 791                .one(&tx)
 792                .await?
 793                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 794
 795            if signup.user_id.is_some() {
 796                return Ok(None);
 797            }
 798
 799            let user = user::Entity::insert(user::ActiveModel {
 800                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 801                github_login: ActiveValue::set(user.github_login.clone()),
 802                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 803                admin: ActiveValue::set(false),
 804                invite_count: ActiveValue::set(user.invite_count),
 805                invite_code: ActiveValue::set(Some(random_invite_code())),
 806                metrics_id: ActiveValue::set(Uuid::new_v4()),
 807                ..Default::default()
 808            })
 809            .on_conflict(
 810                OnConflict::column(user::Column::GithubLogin)
 811                    .update_columns([
 812                        user::Column::EmailAddress,
 813                        user::Column::GithubUserId,
 814                        user::Column::Admin,
 815                    ])
 816                    .to_owned(),
 817            )
 818            .exec_with_returning(&tx)
 819            .await?;
 820
 821            let mut signup = signup.into_active_model();
 822            signup.user_id = ActiveValue::set(Some(user.id));
 823            let signup = signup.update(&tx).await?;
 824
 825            if let Some(inviting_user_id) = signup.inviting_user_id {
 826                let result = user::Entity::update_many()
 827                    .filter(
 828                        user::Column::Id
 829                            .eq(inviting_user_id)
 830                            .and(user::Column::InviteCount.gt(0)),
 831                    )
 832                    .col_expr(
 833                        user::Column::InviteCount,
 834                        Expr::col(user::Column::InviteCount).sub(1),
 835                    )
 836                    .exec(&tx)
 837                    .await?;
 838
 839                if result.rows_affected == 0 {
 840                    Err(Error::Http(
 841                        StatusCode::UNAUTHORIZED,
 842                        "no invites remaining".to_string(),
 843                    ))?;
 844                }
 845
 846                contact::Entity::insert(contact::ActiveModel {
 847                    user_id_a: ActiveValue::set(inviting_user_id),
 848                    user_id_b: ActiveValue::set(user.id),
 849                    a_to_b: ActiveValue::set(true),
 850                    should_notify: ActiveValue::set(true),
 851                    accepted: ActiveValue::set(true),
 852                    ..Default::default()
 853                })
 854                .on_conflict(OnConflict::new().do_nothing().to_owned())
 855                .exec_without_returning(&tx)
 856                .await?;
 857            }
 858
 859            tx.commit().await?;
 860            Ok(Some(NewUserResult {
 861                user_id: user.id,
 862                metrics_id: user.metrics_id.to_string(),
 863                inviting_user_id: signup.inviting_user_id,
 864                signup_device_id: signup.device_id,
 865            }))
 866        })
 867        .await
 868    }
 869
 870    pub async fn set_invite_count_for_user(&self, id: UserId, count: u32) -> Result<()> {
 871        self.transact(|tx| async move {
 872            if count > 0 {
 873                user::Entity::update_many()
 874                    .filter(
 875                        user::Column::Id
 876                            .eq(id)
 877                            .and(user::Column::InviteCode.is_null()),
 878                    )
 879                    .col_expr(user::Column::InviteCode, random_invite_code().into())
 880                    .exec(&tx)
 881                    .await?;
 882            }
 883
 884            user::Entity::update_many()
 885                .filter(user::Column::Id.eq(id))
 886                .col_expr(user::Column::InviteCount, count.into())
 887                .exec(&tx)
 888                .await?;
 889            tx.commit().await?;
 890            Ok(())
 891        })
 892        .await
 893    }
 894
 895    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, u32)>> {
 896        self.transact(|tx| async move {
 897            match user::Entity::find_by_id(id).one(&tx).await? {
 898                Some(user) if user.invite_code.is_some() => {
 899                    Ok(Some((user.invite_code.unwrap(), user.invite_count as u32)))
 900                }
 901                _ => Ok(None),
 902            }
 903        })
 904        .await
 905    }
 906
 907    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
 908        self.transact(|tx| async move {
 909            user::Entity::find()
 910                .filter(user::Column::InviteCode.eq(code))
 911                .one(&tx)
 912                .await?
 913                .ok_or_else(|| {
 914                    Error::Http(
 915                        StatusCode::NOT_FOUND,
 916                        "that invite code does not exist".to_string(),
 917                    )
 918                })
 919        })
 920        .await
 921    }
 922
 923    // rooms
 924
 925    pub async fn incoming_call_for_user(
 926        &self,
 927        user_id: UserId,
 928    ) -> Result<Option<proto::IncomingCall>> {
 929        self.transact(|tx| async move {
 930            let pending_participant = room_participant::Entity::find()
 931                .filter(
 932                    room_participant::Column::UserId
 933                        .eq(user_id)
 934                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
 935                )
 936                .one(&tx)
 937                .await?;
 938
 939            if let Some(pending_participant) = pending_participant {
 940                let room = self.get_room(pending_participant.room_id, &tx).await?;
 941                Ok(Self::build_incoming_call(&room, user_id))
 942            } else {
 943                Ok(None)
 944            }
 945        })
 946        .await
 947    }
 948
 949    pub async fn create_room(
 950        &self,
 951        user_id: UserId,
 952        connection_id: ConnectionId,
 953        live_kit_room: &str,
 954    ) -> Result<RoomGuard<proto::Room>> {
 955        self.transact(|tx| async move {
 956            todo!()
 957            // let room_id = sqlx::query_scalar(
 958            //     "
 959            //     INSERT INTO rooms (live_kit_room)
 960            //     VALUES ($1)
 961            //     RETURNING id
 962            //     ",
 963            // )
 964            // .bind(&live_kit_room)
 965            // .fetch_one(&mut tx)
 966            // .await
 967            // .map(RoomId)?;
 968
 969            // sqlx::query(
 970            //     "
 971            //     INSERT INTO room_participants (room_id, user_id, answering_connection_id, calling_user_id, calling_connection_id)
 972            //     VALUES ($1, $2, $3, $4, $5)
 973            //     ",
 974            // )
 975            // .bind(room_id)
 976            // .bind(user_id)
 977            // .bind(connection_id.0 as i32)
 978            // .bind(user_id)
 979            // .bind(connection_id.0 as i32)
 980            // .execute(&mut tx)
 981            // .await?;
 982
 983            // let room = self.get_room(room_id, &mut tx).await?;
 984            // self.commit_room_transaction(room_id, tx, room).await
 985        })
 986        .await
 987    }
 988
 989    pub async fn call(
 990        &self,
 991        room_id: RoomId,
 992        calling_user_id: UserId,
 993        calling_connection_id: ConnectionId,
 994        called_user_id: UserId,
 995        initial_project_id: Option<ProjectId>,
 996    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 997        self.transact(|tx| async move {
 998            todo!()
 999            // sqlx::query(
1000            //     "
1001            //     INSERT INTO room_participants (
1002            //     room_id,
1003            //     user_id,
1004            //     calling_user_id,
1005            //     calling_connection_id,
1006            //     initial_project_id
1007            //     )
1008            //     VALUES ($1, $2, $3, $4, $5)
1009            //     ",
1010            // )
1011            // .bind(room_id)
1012            // .bind(called_user_id)
1013            // .bind(calling_user_id)
1014            // .bind(calling_connection_id.0 as i32)
1015            // .bind(initial_project_id)
1016            // .execute(&mut tx)
1017            // .await?;
1018
1019            // let room = self.get_room(room_id, &mut tx).await?;
1020            // let incoming_call = Self::build_incoming_call(&room, called_user_id)
1021            //     .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1022            // self.commit_room_transaction(room_id, tx, (room, incoming_call))
1023            // .await
1024        })
1025        .await
1026    }
1027
1028    pub async fn call_failed(
1029        &self,
1030        room_id: RoomId,
1031        called_user_id: UserId,
1032    ) -> Result<RoomGuard<proto::Room>> {
1033        self.transact(|tx| async move {
1034            todo!()
1035            // sqlx::query(
1036            //     "
1037            //     DELETE FROM room_participants
1038            //     WHERE room_id = $1 AND user_id = $2
1039            //     ",
1040            // )
1041            // .bind(room_id)
1042            // .bind(called_user_id)
1043            // .execute(&mut tx)
1044            // .await?;
1045
1046            // let room = self.get_room(room_id, &mut tx).await?;
1047            // self.commit_room_transaction(room_id, tx, room).await
1048        })
1049        .await
1050    }
1051
1052    pub async fn decline_call(
1053        &self,
1054        expected_room_id: Option<RoomId>,
1055        user_id: UserId,
1056    ) -> Result<RoomGuard<proto::Room>> {
1057        self.transact(|tx| async move {
1058            todo!()
1059            // let room_id = sqlx::query_scalar(
1060            //     "
1061            //     DELETE FROM room_participants
1062            //     WHERE user_id = $1 AND answering_connection_id IS NULL
1063            //     RETURNING room_id
1064            //     ",
1065            // )
1066            // .bind(user_id)
1067            // .fetch_one(&mut tx)
1068            // .await?;
1069            // if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1070            //     return Err(anyhow!("declining call on unexpected room"))?;
1071            // }
1072
1073            // let room = self.get_room(room_id, &mut tx).await?;
1074            // self.commit_room_transaction(room_id, tx, room).await
1075        })
1076        .await
1077    }
1078
1079    pub async fn cancel_call(
1080        &self,
1081        expected_room_id: Option<RoomId>,
1082        calling_connection_id: ConnectionId,
1083        called_user_id: UserId,
1084    ) -> Result<RoomGuard<proto::Room>> {
1085        self.transact(|tx| async move {
1086            todo!()
1087            // let room_id = sqlx::query_scalar(
1088            //     "
1089            //     DELETE FROM room_participants
1090            //     WHERE user_id = $1 AND calling_connection_id = $2 AND answering_connection_id IS NULL
1091            //     RETURNING room_id
1092            //     ",
1093            // )
1094            // .bind(called_user_id)
1095            // .bind(calling_connection_id.0 as i32)
1096            // .fetch_one(&mut tx)
1097            // .await?;
1098            // if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1099            //     return Err(anyhow!("canceling call on unexpected room"))?;
1100            // }
1101
1102            // let room = self.get_room(room_id, &mut tx).await?;
1103            // self.commit_room_transaction(room_id, tx, room).await
1104        })
1105        .await
1106    }
1107
1108    pub async fn join_room(
1109        &self,
1110        room_id: RoomId,
1111        user_id: UserId,
1112        connection_id: ConnectionId,
1113    ) -> Result<RoomGuard<proto::Room>> {
1114        self.transact(|tx| async move {
1115            todo!()
1116            // sqlx::query(
1117            //     "
1118            //     UPDATE room_participants
1119            //     SET answering_connection_id = $1
1120            //     WHERE room_id = $2 AND user_id = $3
1121            //     RETURNING 1
1122            //     ",
1123            // )
1124            // .bind(connection_id.0 as i32)
1125            // .bind(room_id)
1126            // .bind(user_id)
1127            // .fetch_one(&mut tx)
1128            // .await?;
1129
1130            // let room = self.get_room(room_id, &mut tx).await?;
1131            // self.commit_room_transaction(room_id, tx, room).await
1132        })
1133        .await
1134    }
1135
1136    pub async fn leave_room(
1137        &self,
1138        connection_id: ConnectionId,
1139    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1140        self.transact(|tx| async move {
1141            todo!()
1142            // // Leave room.
1143            // let room_id = sqlx::query_scalar::<_, RoomId>(
1144            //     "
1145            //     DELETE FROM room_participants
1146            //     WHERE answering_connection_id = $1
1147            //     RETURNING room_id
1148            //     ",
1149            // )
1150            // .bind(connection_id.0 as i32)
1151            // .fetch_optional(&mut tx)
1152            // .await?;
1153
1154            // if let Some(room_id) = room_id {
1155            //     // Cancel pending calls initiated by the leaving user.
1156            //     let canceled_calls_to_user_ids: Vec<UserId> = sqlx::query_scalar(
1157            //         "
1158            //         DELETE FROM room_participants
1159            //         WHERE calling_connection_id = $1 AND answering_connection_id IS NULL
1160            //         RETURNING user_id
1161            //         ",
1162            //     )
1163            //     .bind(connection_id.0 as i32)
1164            //     .fetch_all(&mut tx)
1165            //     .await?;
1166
1167            //     let project_ids = sqlx::query_scalar::<_, ProjectId>(
1168            //         "
1169            //         SELECT project_id
1170            //         FROM project_collaborators
1171            //         WHERE connection_id = $1
1172            //         ",
1173            //     )
1174            //     .bind(connection_id.0 as i32)
1175            //     .fetch_all(&mut tx)
1176            //     .await?;
1177
1178            //     // Leave projects.
1179            //     let mut left_projects = HashMap::default();
1180            //     if !project_ids.is_empty() {
1181            //         let mut params = "?,".repeat(project_ids.len());
1182            //         params.pop();
1183            //         let query = format!(
1184            //             "
1185            //             SELECT *
1186            //             FROM project_collaborators
1187            //             WHERE project_id IN ({params})
1188            //             "
1189            //         );
1190            //         let mut query = sqlx::query_as::<_, ProjectCollaborator>(&query);
1191            //         for project_id in project_ids {
1192            //             query = query.bind(project_id);
1193            //         }
1194
1195            //         let mut project_collaborators = query.fetch(&mut tx);
1196            //         while let Some(collaborator) = project_collaborators.next().await {
1197            //             let collaborator = collaborator?;
1198            //             let left_project =
1199            //                 left_projects
1200            //                     .entry(collaborator.project_id)
1201            //                     .or_insert(LeftProject {
1202            //                         id: collaborator.project_id,
1203            //                         host_user_id: Default::default(),
1204            //                         connection_ids: Default::default(),
1205            //                         host_connection_id: Default::default(),
1206            //                     });
1207
1208            //             let collaborator_connection_id =
1209            //                 ConnectionId(collaborator.connection_id as u32);
1210            //             if collaborator_connection_id != connection_id {
1211            //                 left_project.connection_ids.push(collaborator_connection_id);
1212            //             }
1213
1214            //             if collaborator.is_host {
1215            //                 left_project.host_user_id = collaborator.user_id;
1216            //                 left_project.host_connection_id =
1217            //                     ConnectionId(collaborator.connection_id as u32);
1218            //             }
1219            //         }
1220            //     }
1221            //     sqlx::query(
1222            //         "
1223            //         DELETE FROM project_collaborators
1224            //         WHERE connection_id = $1
1225            //         ",
1226            //     )
1227            //     .bind(connection_id.0 as i32)
1228            //     .execute(&mut tx)
1229            //     .await?;
1230
1231            //     // Unshare projects.
1232            //     sqlx::query(
1233            //         "
1234            //         DELETE FROM projects
1235            //         WHERE room_id = $1 AND host_connection_id = $2
1236            //         ",
1237            //     )
1238            //     .bind(room_id)
1239            //     .bind(connection_id.0 as i32)
1240            //     .execute(&mut tx)
1241            //     .await?;
1242
1243            //     let room = self.get_room(room_id, &mut tx).await?;
1244            //     Ok(Some(
1245            //         self.commit_room_transaction(
1246            //             room_id,
1247            //             tx,
1248            //             LeftRoom {
1249            //                 room,
1250            //                 left_projects,
1251            //                 canceled_calls_to_user_ids,
1252            //             },
1253            //         )
1254            //         .await?,
1255            //     ))
1256            // } else {
1257            //     Ok(None)
1258            // }
1259        })
1260        .await
1261    }
1262
1263    pub async fn update_room_participant_location(
1264        &self,
1265        room_id: RoomId,
1266        connection_id: ConnectionId,
1267        location: proto::ParticipantLocation,
1268    ) -> Result<RoomGuard<proto::Room>> {
1269        self.transact(|tx| async {
1270            todo!()
1271            // let mut tx = tx;
1272            // let location_kind;
1273            // let location_project_id;
1274            // match location
1275            //     .variant
1276            //     .as_ref()
1277            //     .ok_or_else(|| anyhow!("invalid location"))?
1278            // {
1279            //     proto::participant_location::Variant::SharedProject(project) => {
1280            //         location_kind = 0;
1281            //         location_project_id = Some(ProjectId::from_proto(project.id));
1282            //     }
1283            //     proto::participant_location::Variant::UnsharedProject(_) => {
1284            //         location_kind = 1;
1285            //         location_project_id = None;
1286            //     }
1287            //     proto::participant_location::Variant::External(_) => {
1288            //         location_kind = 2;
1289            //         location_project_id = None;
1290            //     }
1291            // }
1292
1293            // sqlx::query(
1294            //     "
1295            //     UPDATE room_participants
1296            //     SET location_kind = $1, location_project_id = $2
1297            //     WHERE room_id = $3 AND answering_connection_id = $4
1298            //     RETURNING 1
1299            //     ",
1300            // )
1301            // .bind(location_kind)
1302            // .bind(location_project_id)
1303            // .bind(room_id)
1304            // .bind(connection_id.0 as i32)
1305            // .fetch_one(&mut tx)
1306            // .await?;
1307
1308            // let room = self.get_room(room_id, &mut tx).await?;
1309            // self.commit_room_transaction(room_id, tx, room).await
1310        })
1311        .await
1312    }
1313
1314    async fn get_guest_connection_ids(
1315        &self,
1316        project_id: ProjectId,
1317        tx: &DatabaseTransaction,
1318    ) -> Result<Vec<ConnectionId>> {
1319        todo!()
1320        // let mut guest_connection_ids = Vec::new();
1321        // let mut db_guest_connection_ids = sqlx::query_scalar::<_, i32>(
1322        //     "
1323        //     SELECT connection_id
1324        //     FROM project_collaborators
1325        //     WHERE project_id = $1 AND is_host = FALSE
1326        //     ",
1327        // )
1328        // .bind(project_id)
1329        // .fetch(tx);
1330        // while let Some(connection_id) = db_guest_connection_ids.next().await {
1331        //     guest_connection_ids.push(ConnectionId(connection_id? as u32));
1332        // }
1333        // Ok(guest_connection_ids)
1334    }
1335
1336    fn build_incoming_call(
1337        room: &proto::Room,
1338        called_user_id: UserId,
1339    ) -> Option<proto::IncomingCall> {
1340        let pending_participant = room
1341            .pending_participants
1342            .iter()
1343            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1344
1345        Some(proto::IncomingCall {
1346            room_id: room.id,
1347            calling_user_id: pending_participant.calling_user_id,
1348            participant_user_ids: room
1349                .participants
1350                .iter()
1351                .map(|participant| participant.user_id)
1352                .collect(),
1353            initial_project: room.participants.iter().find_map(|participant| {
1354                let initial_project_id = pending_participant.initial_project_id?;
1355                participant
1356                    .projects
1357                    .iter()
1358                    .find(|project| project.id == initial_project_id)
1359                    .cloned()
1360            }),
1361        })
1362    }
1363
1364    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1365        let db_room = room::Entity::find_by_id(room_id)
1366            .one(tx)
1367            .await?
1368            .ok_or_else(|| anyhow!("could not find room"))?;
1369
1370        let mut db_participants = db_room
1371            .find_related(room_participant::Entity)
1372            .stream(tx)
1373            .await?;
1374        let mut participants = HashMap::default();
1375        let mut pending_participants = Vec::new();
1376        while let Some(db_participant) = db_participants.next().await {
1377            let db_participant = db_participant?;
1378            if let Some(answering_connection_id) = db_participant.answering_connection_id {
1379                let location = match (
1380                    db_participant.location_kind,
1381                    db_participant.location_project_id,
1382                ) {
1383                    (Some(0), Some(project_id)) => {
1384                        Some(proto::participant_location::Variant::SharedProject(
1385                            proto::participant_location::SharedProject {
1386                                id: project_id.to_proto(),
1387                            },
1388                        ))
1389                    }
1390                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1391                        Default::default(),
1392                    )),
1393                    _ => Some(proto::participant_location::Variant::External(
1394                        Default::default(),
1395                    )),
1396                };
1397                participants.insert(
1398                    answering_connection_id,
1399                    proto::Participant {
1400                        user_id: db_participant.user_id.to_proto(),
1401                        peer_id: answering_connection_id as u32,
1402                        projects: Default::default(),
1403                        location: Some(proto::ParticipantLocation { variant: location }),
1404                    },
1405                );
1406            } else {
1407                pending_participants.push(proto::PendingParticipant {
1408                    user_id: db_participant.user_id.to_proto(),
1409                    calling_user_id: db_participant.calling_user_id.to_proto(),
1410                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1411                });
1412            }
1413        }
1414
1415        let mut db_projects = db_room
1416            .find_related(project::Entity)
1417            .find_with_related(worktree::Entity)
1418            .stream(tx)
1419            .await?;
1420
1421        while let Some(row) = db_projects.next().await {
1422            let (db_project, db_worktree) = row?;
1423            if let Some(participant) = participants.get_mut(&db_project.host_connection_id) {
1424                let project = if let Some(project) = participant
1425                    .projects
1426                    .iter_mut()
1427                    .find(|project| project.id == db_project.id.to_proto())
1428                {
1429                    project
1430                } else {
1431                    participant.projects.push(proto::ParticipantProject {
1432                        id: db_project.id.to_proto(),
1433                        worktree_root_names: Default::default(),
1434                    });
1435                    participant.projects.last_mut().unwrap()
1436                };
1437
1438                if let Some(db_worktree) = db_worktree {
1439                    project.worktree_root_names.push(db_worktree.root_name);
1440                }
1441            }
1442        }
1443
1444        Ok(proto::Room {
1445            id: db_room.id.to_proto(),
1446            live_kit_room: db_room.live_kit_room,
1447            participants: participants.into_values().collect(),
1448            pending_participants,
1449        })
1450    }
1451
1452    async fn commit_room_transaction<T>(
1453        &self,
1454        room_id: RoomId,
1455        tx: DatabaseTransaction,
1456        data: T,
1457    ) -> Result<RoomGuard<T>> {
1458        let lock = self.rooms.entry(room_id).or_default().clone();
1459        let _guard = lock.lock_owned().await;
1460        tx.commit().await?;
1461        Ok(RoomGuard {
1462            data,
1463            _guard,
1464            _not_send: PhantomData,
1465        })
1466    }
1467
1468    // projects
1469
1470    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1471        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1472        enum QueryAs {
1473            Count,
1474        }
1475
1476        self.transact(|tx| async move {
1477            Ok(project::Entity::find()
1478                .select_only()
1479                .column_as(project::Column::Id.count(), QueryAs::Count)
1480                .inner_join(user::Entity)
1481                .filter(user::Column::Admin.eq(false))
1482                .into_values::<_, QueryAs>()
1483                .one(&tx)
1484                .await?
1485                .unwrap_or(0) as usize)
1486        })
1487        .await
1488    }
1489
1490    pub async fn share_project(
1491        &self,
1492        room_id: RoomId,
1493        connection_id: ConnectionId,
1494        worktrees: &[proto::WorktreeMetadata],
1495    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1496        self.transact(|tx| async move {
1497            let participant = room_participant::Entity::find()
1498                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
1499                .one(&tx)
1500                .await?
1501                .ok_or_else(|| anyhow!("could not find participant"))?;
1502            if participant.room_id != room_id {
1503                return Err(anyhow!("shared project on unexpected room"))?;
1504            }
1505
1506            let project = project::ActiveModel {
1507                room_id: ActiveValue::set(participant.room_id),
1508                host_user_id: ActiveValue::set(participant.user_id),
1509                host_connection_id: ActiveValue::set(connection_id.0 as i32),
1510                ..Default::default()
1511            }
1512            .insert(&tx)
1513            .await?;
1514
1515            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
1516                id: ActiveValue::set(worktree.id as i32),
1517                project_id: ActiveValue::set(project.id),
1518                abs_path: ActiveValue::set(worktree.abs_path.clone()),
1519                root_name: ActiveValue::set(worktree.root_name.clone()),
1520                visible: ActiveValue::set(worktree.visible),
1521                scan_id: ActiveValue::set(0),
1522                is_complete: ActiveValue::set(false),
1523            }))
1524            .exec(&tx)
1525            .await?;
1526
1527            project_collaborator::ActiveModel {
1528                project_id: ActiveValue::set(project.id),
1529                connection_id: ActiveValue::set(connection_id.0 as i32),
1530                user_id: ActiveValue::set(participant.user_id),
1531                replica_id: ActiveValue::set(ReplicaId(0)),
1532                is_host: ActiveValue::set(true),
1533                ..Default::default()
1534            }
1535            .insert(&tx)
1536            .await?;
1537
1538            let room = self.get_room(room_id, &tx).await?;
1539            self.commit_room_transaction(room_id, tx, (project.id, room))
1540                .await
1541        })
1542        .await
1543    }
1544
1545    pub async fn unshare_project(
1546        &self,
1547        project_id: ProjectId,
1548        connection_id: ConnectionId,
1549    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1550        self.transact(|tx| async move {
1551            todo!()
1552            // let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1553            // let room_id: RoomId = sqlx::query_scalar(
1554            //     "
1555            //     DELETE FROM projects
1556            //     WHERE id = $1 AND host_connection_id = $2
1557            //     RETURNING room_id
1558            //     ",
1559            // )
1560            // .bind(project_id)
1561            // .bind(connection_id.0 as i32)
1562            // .fetch_one(&mut tx)
1563            // .await?;
1564            // let room = self.get_room(room_id, &mut tx).await?;
1565            // self.commit_room_transaction(room_id, tx, (room, guest_connection_ids))
1566            //     .await
1567        })
1568        .await
1569    }
1570
    /// Replaces the set of worktrees recorded for a shared project.
    ///
    /// Not yet implemented: the closure handed to `transact` hits `todo!()`
    /// and panics when its future is polled.
    ///
    /// The commented-out body is a `sqlx`-based implementation kept for
    /// reference. It looks up the project's room (requiring `connection_id` to
    /// be the project's host connection), upserts one row per entry in
    /// `worktrees`, deletes worktree rows whose id is not in `worktrees`, and
    /// returns the room together with the guest connection ids.
    /// NOTE(review): presumably awaiting a port to sea-orm — confirm.
    pub async fn update_project(
        &self,
        project_id: ProjectId,
        connection_id: ConnectionId,
        worktrees: &[proto::WorktreeMetadata],
    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
        self.transact(|tx| async move {
            todo!()
            // Disabled sqlx reference implementation follows.
            // let room_id: RoomId = sqlx::query_scalar(
            //     "
            //     SELECT room_id
            //     FROM projects
            //     WHERE id = $1 AND host_connection_id = $2
            //     ",
            // )
            // .bind(project_id)
            // .bind(connection_id.0 as i32)
            // .fetch_one(&mut tx)
            // .await?;

            // if !worktrees.is_empty() {
            //     let mut params = "(?, ?, ?, ?, ?, ?, ?),".repeat(worktrees.len());
            //     params.pop();
            //     let query = format!(
            //         "
            //         INSERT INTO worktrees (
            //         project_id,
            //         id,
            //         root_name,
            //         abs_path,
            //         visible,
            //         scan_id,
            //         is_complete
            //         )
            //         VALUES {params}
            //         ON CONFLICT (project_id, id) DO UPDATE SET root_name = excluded.root_name
            //         "
            //     );

            //     let mut query = sqlx::query(&query);
            //     for worktree in worktrees {
            //         query = query
            //             .bind(project_id)
            //             .bind(worktree.id as i32)
            //             .bind(&worktree.root_name)
            //             .bind(&worktree.abs_path)
            //             .bind(worktree.visible)
            //             .bind(0)
            //             .bind(false)
            //     }
            //     query.execute(&mut tx).await?;
            // }

            // let mut params = "?,".repeat(worktrees.len());
            // if !worktrees.is_empty() {
            //     params.pop();
            // }
            // let query = format!(
            //     "
            //     DELETE FROM worktrees
            //     WHERE project_id = ? AND id NOT IN ({params})
            //     ",
            // );

            // let mut query = sqlx::query(&query).bind(project_id);
            // for worktree in worktrees {
            //     query = query.bind(WorktreeId(worktree.id as i32));
            // }
            // query.execute(&mut tx).await?;

            // let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
            // let room = self.get_room(room_id, &mut tx).await?;
            // self.commit_room_transaction(room_id, tx, (room, guest_connection_ids))
            //     .await
        })
        .await
    }
1648
    /// Applies a worktree update sent by a project's host.
    ///
    /// Not yet implemented: the closure handed to `transact` hits `todo!()`
    /// and panics when its future is polled.
    ///
    /// The commented-out body is a `sqlx`-based implementation kept for
    /// reference. It verifies that `connection_id` is the project's host
    /// connection, updates the worktree's metadata row, upserts
    /// `update.updated_entries`, deletes `update.removed_entries`, and returns
    /// the guest connection ids.
    /// NOTE(review): presumably awaiting a port to sea-orm — confirm.
    pub async fn update_worktree(
        &self,
        update: &proto::UpdateWorktree,
        connection_id: ConnectionId,
    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
        self.transact(|tx| async move {
            todo!()
            // Disabled sqlx reference implementation follows.
            // let project_id = ProjectId::from_proto(update.project_id);
            // let worktree_id = WorktreeId::from_proto(update.worktree_id);

            // // Ensure the update comes from the host.
            // let room_id: RoomId = sqlx::query_scalar(
            //     "
            //     SELECT room_id
            //     FROM projects
            //     WHERE id = $1 AND host_connection_id = $2
            //     ",
            // )
            // .bind(project_id)
            // .bind(connection_id.0 as i32)
            // .fetch_one(&mut tx)
            // .await?;

            // // Update metadata.
            // sqlx::query(
            //     "
            //     UPDATE worktrees
            //     SET
            //     root_name = $1,
            //     scan_id = $2,
            //     is_complete = $3,
            //     abs_path = $4
            //     WHERE project_id = $5 AND id = $6
            //     RETURNING 1
            //     ",
            // )
            // .bind(&update.root_name)
            // .bind(update.scan_id as i64)
            // .bind(update.is_last_update)
            // .bind(&update.abs_path)
            // .bind(project_id)
            // .bind(worktree_id)
            // .fetch_one(&mut tx)
            // .await?;

            // if !update.updated_entries.is_empty() {
            //     let mut params =
            //         "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?),".repeat(update.updated_entries.len());
            //     params.pop();

            //     let query = format!(
            //         "
            //         INSERT INTO worktree_entries (
            //         project_id,
            //         worktree_id,
            //         id,
            //         is_dir,
            //         path,
            //         inode,
            //         mtime_seconds,
            //         mtime_nanos,
            //         is_symlink,
            //         is_ignored
            //         )
            //         VALUES {params}
            //         ON CONFLICT (project_id, worktree_id, id) DO UPDATE SET
            //         is_dir = excluded.is_dir,
            //         path = excluded.path,
            //         inode = excluded.inode,
            //         mtime_seconds = excluded.mtime_seconds,
            //         mtime_nanos = excluded.mtime_nanos,
            //         is_symlink = excluded.is_symlink,
            //         is_ignored = excluded.is_ignored
            //         "
            //     );
            //     let mut query = sqlx::query(&query);
            //     for entry in &update.updated_entries {
            //         let mtime = entry.mtime.clone().unwrap_or_default();
            //         query = query
            //             .bind(project_id)
            //             .bind(worktree_id)
            //             .bind(entry.id as i64)
            //             .bind(entry.is_dir)
            //             .bind(&entry.path)
            //             .bind(entry.inode as i64)
            //             .bind(mtime.seconds as i64)
            //             .bind(mtime.nanos as i32)
            //             .bind(entry.is_symlink)
            //             .bind(entry.is_ignored);
            //     }
            //     query.execute(&mut tx).await?;
            // }

            // if !update.removed_entries.is_empty() {
            //     let mut params = "?,".repeat(update.removed_entries.len());
            //     params.pop();
            //     let query = format!(
            //         "
            //         DELETE FROM worktree_entries
            //         WHERE project_id = ? AND worktree_id = ? AND id IN ({params})
            //         "
            //     );

            //     let mut query = sqlx::query(&query).bind(project_id).bind(worktree_id);
            //     for entry_id in &update.removed_entries {
            //         query = query.bind(*entry_id as i64);
            //     }
            //     query.execute(&mut tx).await?;
            // }

            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
            // self.commit_room_transaction(room_id, tx, connection_ids)
            //     .await
        })
        .await
    }
1765
    /// Records a diagnostic summary sent by a project's host.
    ///
    /// Not yet implemented: the closure handed to `transact` hits `todo!()`
    /// and panics when its future is polled.
    ///
    /// The commented-out body is a `sqlx`-based implementation kept for
    /// reference. It fails with "invalid summary" when `update.summary` is
    /// absent, verifies that `connection_id` is the project's host connection,
    /// upserts the summary row keyed by `(project_id, worktree_id, path)`, and
    /// returns the guest connection ids.
    /// NOTE(review): presumably awaiting a port to sea-orm — confirm.
    pub async fn update_diagnostic_summary(
        &self,
        update: &proto::UpdateDiagnosticSummary,
        connection_id: ConnectionId,
    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
        self.transact(|tx| async {
            todo!()
            // Disabled sqlx reference implementation follows.
            // let project_id = ProjectId::from_proto(update.project_id);
            // let worktree_id = WorktreeId::from_proto(update.worktree_id);
            // let summary = update
            //     .summary
            //     .as_ref()
            //     .ok_or_else(|| anyhow!("invalid summary"))?;

            // // Ensure the update comes from the host.
            // let room_id: RoomId = sqlx::query_scalar(
            //     "
            //     SELECT room_id
            //     FROM projects
            //     WHERE id = $1 AND host_connection_id = $2
            //     ",
            // )
            // .bind(project_id)
            // .bind(connection_id.0 as i32)
            // .fetch_one(&mut tx)
            // .await?;

            // // Update summary.
            // sqlx::query(
            //     "
            //     INSERT INTO worktree_diagnostic_summaries (
            //     project_id,
            //     worktree_id,
            //     path,
            //     language_server_id,
            //     error_count,
            //     warning_count
            //     )
            //     VALUES ($1, $2, $3, $4, $5, $6)
            //     ON CONFLICT (project_id, worktree_id, path) DO UPDATE SET
            //     language_server_id = excluded.language_server_id,
            //     error_count = excluded.error_count,
            //     warning_count = excluded.warning_count
            //     ",
            // )
            // .bind(project_id)
            // .bind(worktree_id)
            // .bind(&summary.path)
            // .bind(summary.language_server_id as i64)
            // .bind(summary.error_count as i32)
            // .bind(summary.warning_count as i32)
            // .execute(&mut tx)
            // .await?;

            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
            // self.commit_room_transaction(room_id, tx, connection_ids)
            //     .await
        })
        .await
    }
1826
    /// Records a language server started by a project's host.
    ///
    /// Not yet implemented: the closure handed to `transact` hits `todo!()`
    /// and panics when its future is polled.
    ///
    /// The commented-out body is a `sqlx`-based implementation kept for
    /// reference. It fails with "invalid language server" when `update.server`
    /// is absent, verifies that `connection_id` is the project's host
    /// connection, upserts the language server row keyed by
    /// `(project_id, id)`, and returns the guest connection ids.
    /// NOTE(review): presumably awaiting a port to sea-orm — confirm.
    pub async fn start_language_server(
        &self,
        update: &proto::StartLanguageServer,
        connection_id: ConnectionId,
    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
        self.transact(|tx| async {
            todo!()
            // Disabled sqlx reference implementation follows.
            // let project_id = ProjectId::from_proto(update.project_id);
            // let server = update
            //     .server
            //     .as_ref()
            //     .ok_or_else(|| anyhow!("invalid language server"))?;

            // // Ensure the update comes from the host.
            // let room_id: RoomId = sqlx::query_scalar(
            //     "
            //     SELECT room_id
            //     FROM projects
            //     WHERE id = $1 AND host_connection_id = $2
            //     ",
            // )
            // .bind(project_id)
            // .bind(connection_id.0 as i32)
            // .fetch_one(&mut tx)
            // .await?;

            // // Add the newly-started language server.
            // sqlx::query(
            //     "
            //     INSERT INTO language_servers (project_id, id, name)
            //     VALUES ($1, $2, $3)
            //     ON CONFLICT (project_id, id) DO UPDATE SET
            //     name = excluded.name
            //     ",
            // )
            // .bind(project_id)
            // .bind(server.id as i64)
            // .bind(&server.name)
            // .execute(&mut tx)
            // .await?;

            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
            // self.commit_room_transaction(room_id, tx, connection_ids)
            //     .await
        })
        .await
    }
1874
    /// Adds the given connection to a project as a guest collaborator.
    ///
    /// Not yet implemented: the closure handed to `transact` hits `todo!()`
    /// and panics when its future is polled.
    ///
    /// The commented-out body is a `sqlx`-based implementation kept for
    /// reference. It:
    /// 1. finds the room and user whose `answering_connection_id` matches
    ///    `connection_id`;
    /// 2. checks that `project_id` belongs to that room;
    /// 3. picks the smallest replica id >= 1 not already used by a
    ///    collaborator and inserts a non-host collaborator row;
    /// 4. loads the project's worktrees, their entries and diagnostic
    ///    summaries, and its language servers;
    /// 5. returns the assembled `Project` with the new replica id.
    /// NOTE(review): presumably awaiting a port to sea-orm — confirm.
    pub async fn join_project(
        &self,
        project_id: ProjectId,
        connection_id: ConnectionId,
    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
        self.transact(|tx| async move {
            todo!()
            // Disabled sqlx reference implementation follows.
            // let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>(
            //     "
            //     SELECT room_id, user_id
            //     FROM room_participants
            //     WHERE answering_connection_id = $1
            //     ",
            // )
            // .bind(connection_id.0 as i32)
            // .fetch_one(&mut tx)
            // .await?;

            // // Ensure project id was shared on this room.
            // sqlx::query(
            //     "
            //     SELECT 1
            //     FROM projects
            //     WHERE id = $1 AND room_id = $2
            //     ",
            // )
            // .bind(project_id)
            // .bind(room_id)
            // .fetch_one(&mut tx)
            // .await?;

            // let mut collaborators = sqlx::query_as::<_, ProjectCollaborator>(
            //     "
            //     SELECT *
            //     FROM project_collaborators
            //     WHERE project_id = $1
            //     ",
            // )
            // .bind(project_id)
            // .fetch_all(&mut tx)
            // .await?;
            // let replica_ids = collaborators
            //     .iter()
            //     .map(|c| c.replica_id)
            //     .collect::<HashSet<_>>();
            // let mut replica_id = ReplicaId(1);
            // while replica_ids.contains(&replica_id) {
            //     replica_id.0 += 1;
            // }
            // let new_collaborator = ProjectCollaborator {
            //     project_id,
            //     connection_id: connection_id.0 as i32,
            //     user_id,
            //     replica_id,
            //     is_host: false,
            // };

            // sqlx::query(
            //     "
            //     INSERT INTO project_collaborators (
            //     project_id,
            //     connection_id,
            //     user_id,
            //     replica_id,
            //     is_host
            //     )
            //     VALUES ($1, $2, $3, $4, $5)
            //     ",
            // )
            // .bind(new_collaborator.project_id)
            // .bind(new_collaborator.connection_id)
            // .bind(new_collaborator.user_id)
            // .bind(new_collaborator.replica_id)
            // .bind(new_collaborator.is_host)
            // .execute(&mut tx)
            // .await?;
            // collaborators.push(new_collaborator);

            // let worktree_rows = sqlx::query_as::<_, WorktreeRow>(
            //     "
            //     SELECT *
            //     FROM worktrees
            //     WHERE project_id = $1
            //     ",
            // )
            // .bind(project_id)
            // .fetch_all(&mut tx)
            // .await?;
            // let mut worktrees = worktree_rows
            //     .into_iter()
            //     .map(|worktree_row| {
            //         (
            //             worktree_row.id,
            //             Worktree {
            //                 id: worktree_row.id,
            //                 abs_path: worktree_row.abs_path,
            //                 root_name: worktree_row.root_name,
            //                 visible: worktree_row.visible,
            //                 entries: Default::default(),
            //                 diagnostic_summaries: Default::default(),
            //                 scan_id: worktree_row.scan_id as u64,
            //                 is_complete: worktree_row.is_complete,
            //             },
            //         )
            //     })
            //     .collect::<BTreeMap<_, _>>();

            // // Populate worktree entries.
            // {
            //     let mut entries = sqlx::query_as::<_, WorktreeEntry>(
            //         "
            //         SELECT *
            //         FROM worktree_entries
            //         WHERE project_id = $1
            //         ",
            //     )
            //     .bind(project_id)
            //     .fetch(&mut tx);
            //     while let Some(entry) = entries.next().await {
            //         let entry = entry?;
            //         if let Some(worktree) = worktrees.get_mut(&entry.worktree_id) {
            //             worktree.entries.push(proto::Entry {
            //                 id: entry.id as u64,
            //                 is_dir: entry.is_dir,
            //                 path: entry.path,
            //                 inode: entry.inode as u64,
            //                 mtime: Some(proto::Timestamp {
            //                     seconds: entry.mtime_seconds as u64,
            //                     nanos: entry.mtime_nanos as u32,
            //                 }),
            //                 is_symlink: entry.is_symlink,
            //                 is_ignored: entry.is_ignored,
            //             });
            //         }
            //     }
            // }

            // // Populate worktree diagnostic summaries.
            // {
            //     let mut summaries = sqlx::query_as::<_, WorktreeDiagnosticSummary>(
            //         "
            //         SELECT *
            //         FROM worktree_diagnostic_summaries
            //         WHERE project_id = $1
            //         ",
            //     )
            //     .bind(project_id)
            //     .fetch(&mut tx);
            //     while let Some(summary) = summaries.next().await {
            //         let summary = summary?;
            //         if let Some(worktree) = worktrees.get_mut(&summary.worktree_id) {
            //             worktree
            //                 .diagnostic_summaries
            //                 .push(proto::DiagnosticSummary {
            //                     path: summary.path,
            //                     language_server_id: summary.language_server_id as u64,
            //                     error_count: summary.error_count as u32,
            //                     warning_count: summary.warning_count as u32,
            //                 });
            //         }
            //     }
            // }

            // // Populate language servers.
            // let language_servers = sqlx::query_as::<_, LanguageServer>(
            //     "
            //     SELECT *
            //     FROM language_servers
            //     WHERE project_id = $1
            //     ",
            // )
            // .bind(project_id)
            // .fetch_all(&mut tx)
            // .await?;

            // self.commit_room_transaction(
            //     room_id,
            //     tx,
            //     (
            //         Project {
            //             collaborators,
            //             worktrees,
            //             language_servers: language_servers
            //                 .into_iter()
            //                 .map(|language_server| proto::LanguageServer {
            //                     id: language_server.id.to_proto(),
            //                     name: language_server.name,
            //                 })
            //                 .collect(),
            //         },
            //         replica_id as ReplicaId,
            //     ),
            // )
            // .await
        })
        .await
    }
2072
    /// Removes the given connection from a project's collaborators.
    ///
    /// Not yet implemented: the closure handed to `transact` hits `todo!()`
    /// and panics when its future is polled.
    ///
    /// The commented-out body is a `sqlx`-based implementation kept for
    /// reference. It deletes the collaborator row for
    /// `(project_id, connection_id)`, fails with "not a collaborator on this
    /// project" when no row was deleted, collects the remaining collaborators'
    /// connection ids, looks up the project's room and host, and returns a
    /// `LeftProject`.
    /// NOTE(review): presumably awaiting a port to sea-orm — confirm.
    pub async fn leave_project(
        &self,
        project_id: ProjectId,
        connection_id: ConnectionId,
    ) -> Result<RoomGuard<LeftProject>> {
        self.transact(|tx| async move {
            todo!()
            // Disabled sqlx reference implementation follows.
            // let result = sqlx::query(
            //     "
            //     DELETE FROM project_collaborators
            //     WHERE project_id = $1 AND connection_id = $2
            //     ",
            // )
            // .bind(project_id)
            // .bind(connection_id.0 as i32)
            // .execute(&mut tx)
            // .await?;

            // if result.rows_affected() == 0 {
            //     Err(anyhow!("not a collaborator on this project"))?;
            // }

            // let connection_ids = sqlx::query_scalar::<_, i32>(
            //     "
            //     SELECT connection_id
            //     FROM project_collaborators
            //     WHERE project_id = $1
            //     ",
            // )
            // .bind(project_id)
            // .fetch_all(&mut tx)
            // .await?
            // .into_iter()
            // .map(|id| ConnectionId(id as u32))
            // .collect();

            // let (room_id, host_user_id, host_connection_id) =
            //     sqlx::query_as::<_, (RoomId, i32, i32)>(
            //         "
            //         SELECT room_id, host_user_id, host_connection_id
            //         FROM projects
            //         WHERE id = $1
            //         ",
            //     )
            //     .bind(project_id)
            //     .fetch_one(&mut tx)
            //     .await?;

            // self.commit_room_transaction(
            //     room_id,
            //     tx,
            //     LeftProject {
            //         id: project_id,
            //         host_user_id: UserId(host_user_id),
            //         host_connection_id: ConnectionId(host_connection_id as u32),
            //         connection_ids,
            //     },
            // )
            // .await
        })
        .await
    }
2135
    /// Lists a project's collaborators on behalf of one of them.
    ///
    /// Not yet implemented: the closure handed to `transact` hits `todo!()`
    /// and panics when its future is polled.
    ///
    /// The commented-out body is a `sqlx`-based implementation kept for
    /// reference. It loads every collaborator row of `project_id` and returns
    /// them only if `connection_id` is among them; otherwise it fails with
    /// "no such project".
    /// NOTE(review): presumably awaiting a port to sea-orm — confirm.
    pub async fn project_collaborators(
        &self,
        project_id: ProjectId,
        connection_id: ConnectionId,
    ) -> Result<Vec<project_collaborator::Model>> {
        self.transact(|tx| async move {
            todo!()
            // Disabled sqlx reference implementation follows.
            // let collaborators = sqlx::query_as::<_, ProjectCollaborator>(
            //     "
            //     SELECT *
            //     FROM project_collaborators
            //     WHERE project_id = $1
            //     ",
            // )
            // .bind(project_id)
            // .fetch_all(&mut tx)
            // .await?;

            // if collaborators
            //     .iter()
            //     .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
            // {
            //     Ok(collaborators)
            // } else {
            //     Err(anyhow!("no such project"))?
            // }
        })
        .await
    }
2165
    /// Lists the connection ids of a project's collaborators on behalf of one
    /// of them.
    ///
    /// Not yet implemented: the closure handed to `transact` hits `todo!()`
    /// and panics when its future is polled.
    ///
    /// The commented-out body is a `sqlx`-based implementation kept for
    /// reference. It loads every collaborator connection id of `project_id`
    /// and returns them only if `connection_id` is among them; otherwise it
    /// fails with "no such project".
    /// NOTE(review): presumably awaiting a port to sea-orm — confirm.
    pub async fn project_connection_ids(
        &self,
        project_id: ProjectId,
        connection_id: ConnectionId,
    ) -> Result<HashSet<ConnectionId>> {
        self.transact(|tx| async move {
            todo!()
            // Disabled sqlx reference implementation follows.
            // let connection_ids = sqlx::query_scalar::<_, i32>(
            //     "
            //     SELECT connection_id
            //     FROM project_collaborators
            //     WHERE project_id = $1
            //     ",
            // )
            // .bind(project_id)
            // .fetch_all(&mut tx)
            // .await?;

            // if connection_ids.contains(&(connection_id.0 as i32)) {
            //     Ok(connection_ids
            //         .into_iter()
            //         .map(|connection_id| ConnectionId(connection_id as u32))
            //         .collect())
            // } else {
            //     Err(anyhow!("no such project"))?
            // }
        })
        .await
    }
2195
2196    // access tokens
2197
2198    pub async fn create_access_token_hash(
2199        &self,
2200        user_id: UserId,
2201        access_token_hash: &str,
2202        max_access_token_count: usize,
2203    ) -> Result<()> {
2204        self.transact(|tx| async {
2205            let tx = tx;
2206
2207            access_token::ActiveModel {
2208                user_id: ActiveValue::set(user_id),
2209                hash: ActiveValue::set(access_token_hash.into()),
2210                ..Default::default()
2211            }
2212            .insert(&tx)
2213            .await?;
2214
2215            access_token::Entity::delete_many()
2216                .filter(
2217                    access_token::Column::Id.in_subquery(
2218                        Query::select()
2219                            .column(access_token::Column::Id)
2220                            .from(access_token::Entity)
2221                            .and_where(access_token::Column::UserId.eq(user_id))
2222                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2223                            .limit(10000)
2224                            .offset(max_access_token_count as u64)
2225                            .to_owned(),
2226                    ),
2227                )
2228                .exec(&tx)
2229                .await?;
2230            tx.commit().await?;
2231            Ok(())
2232        })
2233        .await
2234    }
2235
2236    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2237        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2238        enum QueryAs {
2239            Hash,
2240        }
2241
2242        self.transact(|tx| async move {
2243            Ok(access_token::Entity::find()
2244                .select_only()
2245                .column(access_token::Column::Hash)
2246                .filter(access_token::Column::UserId.eq(user_id))
2247                .order_by_desc(access_token::Column::Id)
2248                .into_values::<_, QueryAs>()
2249                .all(&tx)
2250                .await?)
2251        })
2252        .await
2253    }
2254
2255    async fn transact<F, Fut, T>(&self, f: F) -> Result<T>
2256    where
2257        F: Send + Fn(DatabaseTransaction) -> Fut,
2258        Fut: Send + Future<Output = Result<T>>,
2259    {
2260        let body = async {
2261            loop {
2262                let tx = self.pool.begin().await?;
2263
2264                // In Postgres, serializable transactions are opt-in
2265                if let DatabaseBackend::Postgres = self.pool.get_database_backend() {
2266                    tx.execute(Statement::from_string(
2267                        DatabaseBackend::Postgres,
2268                        "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".into(),
2269                    ))
2270                    .await?;
2271                }
2272
2273                match f(tx).await {
2274                    Ok(result) => return Ok(result),
2275                    Err(error) => match error {
2276                        Error::Database2(
2277                            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2278                            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2279                        ) if error
2280                            .as_database_error()
2281                            .and_then(|error| error.code())
2282                            .as_deref()
2283                            == Some("40001") =>
2284                        {
2285                            // Retry (don't break the loop)
2286                        }
2287                        error @ _ => return Err(error),
2288                    },
2289                }
2290            }
2291        };
2292
2293        #[cfg(test)]
2294        {
2295            if let Some(background) = self.background.as_ref() {
2296                background.simulate_random_delay().await;
2297            }
2298
2299            self.runtime.as_ref().unwrap().block_on(body)
2300        }
2301
2302        #[cfg(not(test))]
2303        {
2304            body.await
2305        }
2306    }
2307}
2308
/// A value of type `T` bundled with an owned mutex guard.
///
/// The guard is released when this wrapper is dropped.
/// NOTE(review): presumably the guard is the per-room lock from
/// `Database::rooms` — confirm against the code that constructs this type.
pub struct RoomGuard<T> {
    /// The wrapped value, exposed through `Deref`/`DerefMut`.
    data: T,
    /// Held only for its lifetime; never read.
    _guard: OwnedMutexGuard<()>,
    /// `Rc` is not `Send`, so this marker makes `RoomGuard` `!Send` as well.
    _not_send: PhantomData<Rc<()>>,
}
2314
2315impl<T> Deref for RoomGuard<T> {
2316    type Target = T;
2317
2318    fn deref(&self) -> &T {
2319        &self.data
2320    }
2321}
2322
2323impl<T> DerefMut for RoomGuard<T> {
2324    fn deref_mut(&mut self) -> &mut T {
2325        &mut self.data
2326    }
2327}
2328
/// Parameters describing a user to be created; serializable and
/// deserializable via serde.
///
/// NOTE(review): the per-field notes below are inferred from the field names —
/// confirm against the code that consumes this struct.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    /// Presumably the user's GitHub login name.
    pub github_login: String,
    /// Presumably the user's numeric GitHub id.
    pub github_user_id: i32,
    /// Presumably the number of invites the user starts with.
    pub invite_count: i32,
}
2335
/// Outcome of creating a user.
///
/// NOTE(review): the per-field notes below are inferred from the field names —
/// confirm against the code that produces this struct.
#[derive(Debug)]
pub struct NewUserResult {
    /// Id of the user.
    pub user_id: UserId,
    /// Presumably an identifier used for metrics reporting.
    pub metrics_id: String,
    /// Presumably the user whose invite was used, if any.
    pub inviting_user_id: Option<UserId>,
    /// Presumably the device id recorded at signup, if any.
    pub signup_device_id: Option<String>,
}
2343
2344fn random_invite_code() -> String {
2345    nanoid::nanoid!(16)
2346}
2347
2348fn random_email_confirmation_code() -> String {
2349    nanoid::nanoid!(64)
2350}
2351
/// Declares `$name` as a public newtype over `i32` for use as a database id.
///
/// Besides the derived traits (including transparent `sqlx::Type` and serde
/// support), the generated type gets:
/// - `MAX`, `from_proto` and `to_proto` helpers;
/// - `Display`, forwarding to the inner integer;
/// - the sea-query / sea-orm glue (`From<$name> for Value`, `TryGetable`,
///   `ValueType`, `TryFromU64`, `Nullable`) needed to use it in queries and
///   entity models.
macro_rules! id_type {
    ($name:ident) => {
        /// Database id newtype generated by `id_type!`.
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            sqlx::Type,
            Serialize,
            Deserialize,
        )]
        #[sqlx(transparent)]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable id (`i32::MAX`).
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds an id from its protobuf `u64` form.
            ///
            /// Uses a plain `as` cast, so values that do not fit in an `i32`
            /// are truncated rather than rejected.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts the id to its protobuf `u64` form with a plain `as`
            /// cast (a negative id would be sign-extended).
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            /// Formats the id exactly like its inner integer.
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            /// Wraps the id as a non-null `Value::Int`.
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            /// Reads the column as an `i32` and wraps it.
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            /// Accepts any non-null integer `Value` that fits in an `i32`;
            /// nulls, out-of-range integers and non-integer values are
            /// rejected with `ValueTypeErr`.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    // Already an `i32`: no conversion needed.
                    Value::Int(Some(int)) => Ok(Self(int)),
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            /// Converts a `u64` into the id, failing when it does not fit in
            /// an `i32`.
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                Ok(Self(n.try_into().map_err(|_| {
                    // The conversion goes from `u64` to the id type, and the
                    // message says so.
                    DbErr::ConvertFromU64(concat!(
                        "error converting u64 to ",
                        stringify!($name)
                    ))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            /// The null value for this id type is a null `Value::Int`.
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
2472
2473id_type!(AccessTokenId);
2474id_type!(ContactId);
2475id_type!(RoomId);
2476id_type!(RoomParticipantId);
2477id_type!(ProjectId);
2478id_type!(ProjectCollaboratorId);
2479id_type!(ReplicaId);
2480id_type!(SignupId);
2481id_type!(UserId);
2482id_type!(WorktreeId);
2483
2484pub struct LeftRoom {
2485    pub room: proto::Room,
2486    pub left_projects: HashMap<ProjectId, LeftProject>,
2487    pub canceled_calls_to_user_ids: Vec<UserId>,
2488}
2489
2490pub struct Project {
2491    pub collaborators: Vec<project_collaborator::Model>,
2492    pub worktrees: BTreeMap<WorktreeId, Worktree>,
2493    pub language_servers: Vec<proto::LanguageServer>,
2494}
2495
2496pub struct LeftProject {
2497    pub id: ProjectId,
2498    pub host_user_id: UserId,
2499    pub host_connection_id: ConnectionId,
2500    pub connection_ids: Vec<ConnectionId>,
2501}
2502
2503pub struct Worktree {
2504    pub id: WorktreeId,
2505    pub abs_path: String,
2506    pub root_name: String,
2507    pub visible: bool,
2508    pub entries: Vec<proto::Entry>,
2509    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
2510    pub scan_id: u64,
2511    pub is_complete: bool,
2512}
2513
2514#[cfg(test)]
2515pub use test::*;
2516
2517#[cfg(test)]
2518mod test {
2519    use super::*;
2520    use gpui::executor::Background;
2521    use lazy_static::lazy_static;
2522    use parking_lot::Mutex;
2523    use rand::prelude::*;
2524    use sea_orm::ConnectionTrait;
2525    use sqlx::migrate::MigrateDatabase;
2526    use std::sync::Arc;
2527
2528    pub struct TestDb {
2529        pub db: Option<Arc<Database>>,
2530        pub connection: Option<sqlx::AnyConnection>,
2531    }
2532
2533    impl TestDb {
2534        pub fn sqlite(background: Arc<Background>) -> Self {
2535            let url = format!("sqlite::memory:");
2536            let runtime = tokio::runtime::Builder::new_current_thread()
2537                .enable_io()
2538                .enable_time()
2539                .build()
2540                .unwrap();
2541
2542            let mut db = runtime.block_on(async {
2543                let mut options = ConnectOptions::new(url);
2544                options.max_connections(5);
2545                let db = Database::new(options).await.unwrap();
2546                let sql = include_str!(concat!(
2547                    env!("CARGO_MANIFEST_DIR"),
2548                    "/migrations.sqlite/20221109000000_test_schema.sql"
2549                ));
2550                db.pool
2551                    .execute(sea_orm::Statement::from_string(
2552                        db.pool.get_database_backend(),
2553                        sql.into(),
2554                    ))
2555                    .await
2556                    .unwrap();
2557                db
2558            });
2559
2560            db.background = Some(background);
2561            db.runtime = Some(runtime);
2562
2563            Self {
2564                db: Some(Arc::new(db)),
2565                connection: None,
2566            }
2567        }
2568
2569        pub fn postgres(background: Arc<Background>) -> Self {
2570            lazy_static! {
2571                static ref LOCK: Mutex<()> = Mutex::new(());
2572            }
2573
2574            let _guard = LOCK.lock();
2575            let mut rng = StdRng::from_entropy();
2576            let url = format!(
2577                "postgres://postgres@localhost/zed-test-{}",
2578                rng.gen::<u128>()
2579            );
2580            let runtime = tokio::runtime::Builder::new_current_thread()
2581                .enable_io()
2582                .enable_time()
2583                .build()
2584                .unwrap();
2585
2586            let mut db = runtime.block_on(async {
2587                sqlx::Postgres::create_database(&url)
2588                    .await
2589                    .expect("failed to create test db");
2590                let mut options = ConnectOptions::new(url);
2591                options
2592                    .max_connections(5)
2593                    .idle_timeout(Duration::from_secs(0));
2594                let db = Database::new(options).await.unwrap();
2595                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
2596                db.migrate(Path::new(migrations_path), false).await.unwrap();
2597                db
2598            });
2599
2600            db.background = Some(background);
2601            db.runtime = Some(runtime);
2602
2603            Self {
2604                db: Some(Arc::new(db)),
2605                connection: None,
2606            }
2607        }
2608
2609        pub fn db(&self) -> &Arc<Database> {
2610            self.db.as_ref().unwrap()
2611        }
2612    }
2613
2614    impl Drop for TestDb {
2615        fn drop(&mut self) {
2616            let db = self.db.take().unwrap();
2617            if let DatabaseBackend::Postgres = db.pool.get_database_backend() {
2618                db.runtime.as_ref().unwrap().block_on(async {
2619                    use util::ResultExt;
2620                    let query = "
2621                        SELECT pg_terminate_backend(pg_stat_activity.pid)
2622                        FROM pg_stat_activity
2623                        WHERE
2624                            pg_stat_activity.datname = current_database() AND
2625                            pid <> pg_backend_pid();
2626                    ";
2627                    db.pool
2628                        .execute(sea_orm::Statement::from_string(
2629                            db.pool.get_database_backend(),
2630                            query.into(),
2631                        ))
2632                        .await
2633                        .log_err();
2634                    sqlx::Postgres::drop_database(db.options.get_url())
2635                        .await
2636                        .log_err();
2637                })
2638            }
2639        }
2640    }
2641}