db.rs

   1mod access_token;
   2mod contact;
   3mod project;
   4mod project_collaborator;
   5mod room;
   6mod room_participant;
   7mod signup;
   8#[cfg(test)]
   9mod tests;
  10mod user;
  11mod worktree;
  12
  13use crate::{Error, Result};
  14use anyhow::anyhow;
  15use collections::{BTreeMap, HashMap, HashSet};
  16pub use contact::Contact;
  17use dashmap::DashMap;
  18use futures::StreamExt;
  19use hyper::StatusCode;
  20use rpc::{proto, ConnectionId};
  21pub use sea_orm::ConnectOptions;
  22use sea_orm::{
  23    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseBackend, DatabaseConnection,
  24    DatabaseTransaction, DbErr, FromQueryResult, IntoActiveModel, JoinType, QueryOrder,
  25    QuerySelect, Statement, TransactionTrait,
  26};
  27use sea_query::{Alias, Expr, OnConflict, Query};
  28use serde::{Deserialize, Serialize};
  29pub use signup::{Invite, NewSignup, WaitlistSummary};
  30use sqlx::migrate::{Migrate, Migration, MigrationSource};
  31use sqlx::Connection;
  32use std::ops::{Deref, DerefMut};
  33use std::path::Path;
  34use std::time::Duration;
  35use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  36use tokio::sync::{Mutex, OwnedMutexGuard};
  37pub use user::Model as User;
  38
  39pub struct Database {
  40    options: ConnectOptions,
  41    pool: DatabaseConnection,
  42    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  43    #[cfg(test)]
  44    background: Option<std::sync::Arc<gpui::executor::Background>>,
  45    #[cfg(test)]
  46    runtime: Option<tokio::runtime::Runtime>,
  47}
  48
  49impl Database {
  50    pub async fn new(options: ConnectOptions) -> Result<Self> {
  51        Ok(Self {
  52            options: options.clone(),
  53            pool: sea_orm::Database::connect(options).await?,
  54            rooms: DashMap::with_capacity(16384),
  55            #[cfg(test)]
  56            background: None,
  57            #[cfg(test)]
  58            runtime: None,
  59        })
  60    }
  61
  62    pub async fn migrate(
  63        &self,
  64        migrations_path: &Path,
  65        ignore_checksum_mismatch: bool,
  66    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  67        let migrations = MigrationSource::resolve(migrations_path)
  68            .await
  69            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  70
  71        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  72
  73        connection.ensure_migrations_table().await?;
  74        let applied_migrations: HashMap<_, _> = connection
  75            .list_applied_migrations()
  76            .await?
  77            .into_iter()
  78            .map(|m| (m.version, m))
  79            .collect();
  80
  81        let mut new_migrations = Vec::new();
  82        for migration in migrations {
  83            match applied_migrations.get(&migration.version) {
  84                Some(applied_migration) => {
  85                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  86                    {
  87                        Err(anyhow!(
  88                            "checksum mismatch for applied migration {}",
  89                            migration.description
  90                        ))?;
  91                    }
  92                }
  93                None => {
  94                    let elapsed = connection.apply(&migration).await?;
  95                    new_migrations.push((migration, elapsed));
  96                }
  97            }
  98        }
  99
 100        Ok(new_migrations)
 101    }
 102
 103    // users
 104
 105    pub async fn create_user(
 106        &self,
 107        email_address: &str,
 108        admin: bool,
 109        params: NewUserParams,
 110    ) -> Result<NewUserResult> {
 111        self.transact(|tx| async {
 112            let user = user::Entity::insert(user::ActiveModel {
 113                email_address: ActiveValue::set(Some(email_address.into())),
 114                github_login: ActiveValue::set(params.github_login.clone()),
 115                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 116                admin: ActiveValue::set(admin),
 117                metrics_id: ActiveValue::set(Uuid::new_v4()),
 118                ..Default::default()
 119            })
 120            .on_conflict(
 121                OnConflict::column(user::Column::GithubLogin)
 122                    .update_column(user::Column::GithubLogin)
 123                    .to_owned(),
 124            )
 125            .exec_with_returning(&tx)
 126            .await?;
 127
 128            tx.commit().await?;
 129
 130            Ok(NewUserResult {
 131                user_id: user.id,
 132                metrics_id: user.metrics_id.to_string(),
 133                signup_device_id: None,
 134                inviting_user_id: None,
 135            })
 136        })
 137        .await
 138    }
 139
 140    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 141        self.transact(|tx| async move { Ok(user::Entity::find_by_id(id).one(&tx).await?) })
 142            .await
 143    }
 144
 145    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 146        self.transact(|tx| async {
 147            let tx = tx;
 148            Ok(user::Entity::find()
 149                .filter(user::Column::Id.is_in(ids.iter().copied()))
 150                .all(&tx)
 151                .await?)
 152        })
 153        .await
 154    }
 155
 156    pub async fn get_user_by_github_account(
 157        &self,
 158        github_login: &str,
 159        github_user_id: Option<i32>,
 160    ) -> Result<Option<User>> {
 161        self.transact(|tx| async {
 162            let tx = tx;
 163            if let Some(github_user_id) = github_user_id {
 164                if let Some(user_by_github_user_id) = user::Entity::find()
 165                    .filter(user::Column::GithubUserId.eq(github_user_id))
 166                    .one(&tx)
 167                    .await?
 168                {
 169                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 170                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 171                    Ok(Some(user_by_github_user_id.update(&tx).await?))
 172                } else if let Some(user_by_github_login) = user::Entity::find()
 173                    .filter(user::Column::GithubLogin.eq(github_login))
 174                    .one(&tx)
 175                    .await?
 176                {
 177                    let mut user_by_github_login = user_by_github_login.into_active_model();
 178                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 179                    Ok(Some(user_by_github_login.update(&tx).await?))
 180                } else {
 181                    Ok(None)
 182                }
 183            } else {
 184                Ok(user::Entity::find()
 185                    .filter(user::Column::GithubLogin.eq(github_login))
 186                    .one(&tx)
 187                    .await?)
 188            }
 189        })
 190        .await
 191    }
 192
 193    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 194        self.transact(|tx| async move {
 195            Ok(user::Entity::find()
 196                .order_by_asc(user::Column::GithubLogin)
 197                .limit(limit as u64)
 198                .offset(page as u64 * limit as u64)
 199                .all(&tx)
 200                .await?)
 201        })
 202        .await
 203    }
 204
 205    pub async fn get_users_with_no_invites(
 206        &self,
 207        invited_by_another_user: bool,
 208    ) -> Result<Vec<User>> {
 209        self.transact(|tx| async move {
 210            Ok(user::Entity::find()
 211                .filter(
 212                    user::Column::InviteCount
 213                        .eq(0)
 214                        .and(if invited_by_another_user {
 215                            user::Column::InviterId.is_not_null()
 216                        } else {
 217                            user::Column::InviterId.is_null()
 218                        }),
 219                )
 220                .all(&tx)
 221                .await?)
 222        })
 223        .await
 224    }
 225
 226    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 227        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 228        enum QueryAs {
 229            MetricsId,
 230        }
 231
 232        self.transact(|tx| async move {
 233            let metrics_id: Uuid = user::Entity::find_by_id(id)
 234                .select_only()
 235                .column(user::Column::MetricsId)
 236                .into_values::<_, QueryAs>()
 237                .one(&tx)
 238                .await?
 239                .ok_or_else(|| anyhow!("could not find user"))?;
 240            Ok(metrics_id.to_string())
 241        })
 242        .await
 243    }
 244
 245    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 246        self.transact(|tx| async move {
 247            user::Entity::update_many()
 248                .filter(user::Column::Id.eq(id))
 249                .col_expr(user::Column::Admin, is_admin.into())
 250                .exec(&tx)
 251                .await?;
 252            tx.commit().await?;
 253            Ok(())
 254        })
 255        .await
 256    }
 257
 258    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 259        self.transact(|tx| async move {
 260            user::Entity::update_many()
 261                .filter(user::Column::Id.eq(id))
 262                .col_expr(user::Column::ConnectedOnce, connected_once.into())
 263                .exec(&tx)
 264                .await?;
 265            tx.commit().await?;
 266            Ok(())
 267        })
 268        .await
 269    }
 270
 271    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 272        self.transact(|tx| async move {
 273            access_token::Entity::delete_many()
 274                .filter(access_token::Column::UserId.eq(id))
 275                .exec(&tx)
 276                .await?;
 277            user::Entity::delete_by_id(id).exec(&tx).await?;
 278            tx.commit().await?;
 279            Ok(())
 280        })
 281        .await
 282    }
 283
 284    // contacts
 285
 286    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 287        #[derive(Debug, FromQueryResult)]
 288        struct ContactWithUserBusyStatuses {
 289            user_id_a: UserId,
 290            user_id_b: UserId,
 291            a_to_b: bool,
 292            accepted: bool,
 293            should_notify: bool,
 294            user_a_busy: bool,
 295            user_b_busy: bool,
 296        }
 297
 298        self.transact(|tx| async move {
 299            let user_a_participant = Alias::new("user_a_participant");
 300            let user_b_participant = Alias::new("user_b_participant");
 301            let mut db_contacts = contact::Entity::find()
 302                .column_as(
 303                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 304                        .is_not_null(),
 305                    "user_a_busy",
 306                )
 307                .column_as(
 308                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 309                        .is_not_null(),
 310                    "user_b_busy",
 311                )
 312                .filter(
 313                    contact::Column::UserIdA
 314                        .eq(user_id)
 315                        .or(contact::Column::UserIdB.eq(user_id)),
 316                )
 317                .join_as(
 318                    JoinType::LeftJoin,
 319                    contact::Relation::UserARoomParticipant.def(),
 320                    user_a_participant,
 321                )
 322                .join_as(
 323                    JoinType::LeftJoin,
 324                    contact::Relation::UserBRoomParticipant.def(),
 325                    user_b_participant,
 326                )
 327                .into_model::<ContactWithUserBusyStatuses>()
 328                .stream(&tx)
 329                .await?;
 330
 331            let mut contacts = Vec::new();
 332            while let Some(db_contact) = db_contacts.next().await {
 333                let db_contact = db_contact?;
 334                if db_contact.user_id_a == user_id {
 335                    if db_contact.accepted {
 336                        contacts.push(Contact::Accepted {
 337                            user_id: db_contact.user_id_b,
 338                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 339                            busy: db_contact.user_b_busy,
 340                        });
 341                    } else if db_contact.a_to_b {
 342                        contacts.push(Contact::Outgoing {
 343                            user_id: db_contact.user_id_b,
 344                        })
 345                    } else {
 346                        contacts.push(Contact::Incoming {
 347                            user_id: db_contact.user_id_b,
 348                            should_notify: db_contact.should_notify,
 349                        });
 350                    }
 351                } else if db_contact.accepted {
 352                    contacts.push(Contact::Accepted {
 353                        user_id: db_contact.user_id_a,
 354                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 355                        busy: db_contact.user_a_busy,
 356                    });
 357                } else if db_contact.a_to_b {
 358                    contacts.push(Contact::Incoming {
 359                        user_id: db_contact.user_id_a,
 360                        should_notify: db_contact.should_notify,
 361                    });
 362                } else {
 363                    contacts.push(Contact::Outgoing {
 364                        user_id: db_contact.user_id_a,
 365                    });
 366                }
 367            }
 368
 369            contacts.sort_unstable_by_key(|contact| contact.user_id());
 370
 371            Ok(contacts)
 372        })
 373        .await
 374    }
 375
 376    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 377        self.transact(|tx| async move {
 378            let participant = room_participant::Entity::find()
 379                .filter(room_participant::Column::UserId.eq(user_id))
 380                .one(&tx)
 381                .await?;
 382            Ok(participant.is_some())
 383        })
 384        .await
 385    }
 386
 387    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 388        self.transact(|tx| async move {
 389            let (id_a, id_b) = if user_id_1 < user_id_2 {
 390                (user_id_1, user_id_2)
 391            } else {
 392                (user_id_2, user_id_1)
 393            };
 394
 395            Ok(contact::Entity::find()
 396                .filter(
 397                    contact::Column::UserIdA
 398                        .eq(id_a)
 399                        .and(contact::Column::UserIdB.eq(id_b))
 400                        .and(contact::Column::Accepted.eq(true)),
 401                )
 402                .one(&tx)
 403                .await?
 404                .is_some())
 405        })
 406        .await
 407    }
 408
 409    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 410        self.transact(|tx| async move {
 411            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 412                (sender_id, receiver_id, true)
 413            } else {
 414                (receiver_id, sender_id, false)
 415            };
 416
 417            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 418                user_id_a: ActiveValue::set(id_a),
 419                user_id_b: ActiveValue::set(id_b),
 420                a_to_b: ActiveValue::set(a_to_b),
 421                accepted: ActiveValue::set(false),
 422                should_notify: ActiveValue::set(true),
 423                ..Default::default()
 424            })
 425            .on_conflict(
 426                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 427                    .values([
 428                        (contact::Column::Accepted, true.into()),
 429                        (contact::Column::ShouldNotify, false.into()),
 430                    ])
 431                    .action_and_where(
 432                        contact::Column::Accepted.eq(false).and(
 433                            contact::Column::AToB
 434                                .eq(a_to_b)
 435                                .and(contact::Column::UserIdA.eq(id_b))
 436                                .or(contact::Column::AToB
 437                                    .ne(a_to_b)
 438                                    .and(contact::Column::UserIdA.eq(id_a))),
 439                        ),
 440                    )
 441                    .to_owned(),
 442            )
 443            .exec_without_returning(&tx)
 444            .await?;
 445
 446            if rows_affected == 1 {
 447                tx.commit().await?;
 448                Ok(())
 449            } else {
 450                Err(anyhow!("contact already requested"))?
 451            }
 452        })
 453        .await
 454    }
 455
 456    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 457        self.transact(|tx| async move {
 458            let (id_a, id_b) = if responder_id < requester_id {
 459                (responder_id, requester_id)
 460            } else {
 461                (requester_id, responder_id)
 462            };
 463
 464            let result = contact::Entity::delete_many()
 465                .filter(
 466                    contact::Column::UserIdA
 467                        .eq(id_a)
 468                        .and(contact::Column::UserIdB.eq(id_b)),
 469                )
 470                .exec(&tx)
 471                .await?;
 472
 473            if result.rows_affected == 1 {
 474                tx.commit().await?;
 475                Ok(())
 476            } else {
 477                Err(anyhow!("no such contact"))?
 478            }
 479        })
 480        .await
 481    }
 482
 483    pub async fn dismiss_contact_notification(
 484        &self,
 485        user_id: UserId,
 486        contact_user_id: UserId,
 487    ) -> Result<()> {
 488        self.transact(|tx| async move {
 489            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 490                (user_id, contact_user_id, true)
 491            } else {
 492                (contact_user_id, user_id, false)
 493            };
 494
 495            let result = contact::Entity::update_many()
 496                .set(contact::ActiveModel {
 497                    should_notify: ActiveValue::set(false),
 498                    ..Default::default()
 499                })
 500                .filter(
 501                    contact::Column::UserIdA
 502                        .eq(id_a)
 503                        .and(contact::Column::UserIdB.eq(id_b))
 504                        .and(
 505                            contact::Column::AToB
 506                                .eq(a_to_b)
 507                                .and(contact::Column::Accepted.eq(true))
 508                                .or(contact::Column::AToB
 509                                    .ne(a_to_b)
 510                                    .and(contact::Column::Accepted.eq(false))),
 511                        ),
 512                )
 513                .exec(&tx)
 514                .await?;
 515            if result.rows_affected == 0 {
 516                Err(anyhow!("no such contact request"))?
 517            } else {
 518                tx.commit().await?;
 519                Ok(())
 520            }
 521        })
 522        .await
 523    }
 524
 525    pub async fn respond_to_contact_request(
 526        &self,
 527        responder_id: UserId,
 528        requester_id: UserId,
 529        accept: bool,
 530    ) -> Result<()> {
 531        self.transact(|tx| async move {
 532            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 533                (responder_id, requester_id, false)
 534            } else {
 535                (requester_id, responder_id, true)
 536            };
 537            let rows_affected = if accept {
 538                let result = contact::Entity::update_many()
 539                    .set(contact::ActiveModel {
 540                        accepted: ActiveValue::set(true),
 541                        should_notify: ActiveValue::set(true),
 542                        ..Default::default()
 543                    })
 544                    .filter(
 545                        contact::Column::UserIdA
 546                            .eq(id_a)
 547                            .and(contact::Column::UserIdB.eq(id_b))
 548                            .and(contact::Column::AToB.eq(a_to_b)),
 549                    )
 550                    .exec(&tx)
 551                    .await?;
 552                result.rows_affected
 553            } else {
 554                let result = contact::Entity::delete_many()
 555                    .filter(
 556                        contact::Column::UserIdA
 557                            .eq(id_a)
 558                            .and(contact::Column::UserIdB.eq(id_b))
 559                            .and(contact::Column::AToB.eq(a_to_b))
 560                            .and(contact::Column::Accepted.eq(false)),
 561                    )
 562                    .exec(&tx)
 563                    .await?;
 564
 565                result.rows_affected
 566            };
 567
 568            if rows_affected == 1 {
 569                tx.commit().await?;
 570                Ok(())
 571            } else {
 572                Err(anyhow!("no such contact request"))?
 573            }
 574        })
 575        .await
 576    }
 577
 578    pub fn fuzzy_like_string(string: &str) -> String {
 579        let mut result = String::with_capacity(string.len() * 2 + 1);
 580        for c in string.chars() {
 581            if c.is_alphanumeric() {
 582                result.push('%');
 583                result.push(c);
 584            }
 585        }
 586        result.push('%');
 587        result
 588    }
 589
 590    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 591        self.transact(|tx| async {
 592            let tx = tx;
 593            let like_string = Self::fuzzy_like_string(name_query);
 594            let query = "
 595                SELECT users.*
 596                FROM users
 597                WHERE github_login ILIKE $1
 598                ORDER BY github_login <-> $2
 599                LIMIT $3
 600            ";
 601
 602            Ok(user::Entity::find()
 603                .from_raw_sql(Statement::from_sql_and_values(
 604                    self.pool.get_database_backend(),
 605                    query.into(),
 606                    vec![like_string.into(), name_query.into(), limit.into()],
 607                ))
 608                .all(&tx)
 609                .await?)
 610        })
 611        .await
 612    }
 613
 614    // signups
 615
 616    pub async fn create_signup(&self, signup: NewSignup) -> Result<()> {
 617        self.transact(|tx| async {
 618            signup::ActiveModel {
 619                email_address: ActiveValue::set(signup.email_address.clone()),
 620                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 621                email_confirmation_sent: ActiveValue::set(false),
 622                platform_mac: ActiveValue::set(signup.platform_mac),
 623                platform_windows: ActiveValue::set(signup.platform_windows),
 624                platform_linux: ActiveValue::set(signup.platform_linux),
 625                platform_unknown: ActiveValue::set(false),
 626                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 627                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 628                device_id: ActiveValue::set(signup.device_id.clone()),
 629                ..Default::default()
 630            }
 631            .insert(&tx)
 632            .await?;
 633            tx.commit().await?;
 634            Ok(())
 635        })
 636        .await
 637    }
 638
 639    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 640        self.transact(|tx| async move {
 641            let query = "
 642                SELECT
 643                    COUNT(*) as count,
 644                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 645                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 646                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 647                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 648                FROM (
 649                    SELECT *
 650                    FROM signups
 651                    WHERE
 652                        NOT email_confirmation_sent
 653                ) AS unsent
 654            ";
 655            Ok(
 656                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 657                    self.pool.get_database_backend(),
 658                    query.into(),
 659                    vec![],
 660                ))
 661                .one(&tx)
 662                .await?
 663                .ok_or_else(|| anyhow!("invalid result"))?,
 664            )
 665        })
 666        .await
 667    }
 668
 669    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 670        let emails = invites
 671            .iter()
 672            .map(|s| s.email_address.as_str())
 673            .collect::<Vec<_>>();
 674        self.transact(|tx| async {
 675            signup::Entity::update_many()
 676                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 677                .col_expr(signup::Column::EmailConfirmationSent, true.into())
 678                .exec(&tx)
 679                .await?;
 680            tx.commit().await?;
 681            Ok(())
 682        })
 683        .await
 684    }
 685
 686    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 687        self.transact(|tx| async move {
 688            Ok(signup::Entity::find()
 689                .select_only()
 690                .column(signup::Column::EmailAddress)
 691                .column(signup::Column::EmailConfirmationCode)
 692                .filter(
 693                    signup::Column::EmailConfirmationSent.eq(false).and(
 694                        signup::Column::PlatformMac
 695                            .eq(true)
 696                            .or(signup::Column::PlatformUnknown.eq(true)),
 697                    ),
 698                )
 699                .limit(count as u64)
 700                .into_model()
 701                .all(&tx)
 702                .await?)
 703        })
 704        .await
 705    }
 706
 707    // invite codes
 708
 709    pub async fn create_invite_from_code(
 710        &self,
 711        code: &str,
 712        email_address: &str,
 713        device_id: Option<&str>,
 714    ) -> Result<Invite> {
 715        self.transact(|tx| async move {
 716            let existing_user = user::Entity::find()
 717                .filter(user::Column::EmailAddress.eq(email_address))
 718                .one(&tx)
 719                .await?;
 720
 721            if existing_user.is_some() {
 722                Err(anyhow!("email address is already in use"))?;
 723            }
 724
 725            let inviter = match user::Entity::find()
 726                .filter(user::Column::InviteCode.eq(code))
 727                .one(&tx)
 728                .await?
 729            {
 730                Some(inviter) => inviter,
 731                None => {
 732                    return Err(Error::Http(
 733                        StatusCode::NOT_FOUND,
 734                        "invite code not found".to_string(),
 735                    ))?
 736                }
 737            };
 738
 739            if inviter.invite_count == 0 {
 740                Err(Error::Http(
 741                    StatusCode::UNAUTHORIZED,
 742                    "no invites remaining".to_string(),
 743                ))?;
 744            }
 745
 746            let signup = signup::Entity::insert(signup::ActiveModel {
 747                email_address: ActiveValue::set(email_address.into()),
 748                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 749                email_confirmation_sent: ActiveValue::set(false),
 750                inviting_user_id: ActiveValue::set(Some(inviter.id)),
 751                platform_linux: ActiveValue::set(false),
 752                platform_mac: ActiveValue::set(false),
 753                platform_windows: ActiveValue::set(false),
 754                platform_unknown: ActiveValue::set(true),
 755                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 756                ..Default::default()
 757            })
 758            .on_conflict(
 759                OnConflict::column(signup::Column::EmailAddress)
 760                    .update_column(signup::Column::InvitingUserId)
 761                    .to_owned(),
 762            )
 763            .exec_with_returning(&tx)
 764            .await?;
 765            tx.commit().await?;
 766
 767            Ok(Invite {
 768                email_address: signup.email_address,
 769                email_confirmation_code: signup.email_confirmation_code,
 770            })
 771        })
 772        .await
 773    }
 774
 775    pub async fn create_user_from_invite(
 776        &self,
 777        invite: &Invite,
 778        user: NewUserParams,
 779    ) -> Result<Option<NewUserResult>> {
 780        self.transact(|tx| async {
 781            let tx = tx;
 782            let signup = signup::Entity::find()
 783                .filter(
 784                    signup::Column::EmailAddress
 785                        .eq(invite.email_address.as_str())
 786                        .and(
 787                            signup::Column::EmailConfirmationCode
 788                                .eq(invite.email_confirmation_code.as_str()),
 789                        ),
 790                )
 791                .one(&tx)
 792                .await?
 793                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 794
 795            if signup.user_id.is_some() {
 796                return Ok(None);
 797            }
 798
 799            let user = user::Entity::insert(user::ActiveModel {
 800                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 801                github_login: ActiveValue::set(user.github_login.clone()),
 802                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 803                admin: ActiveValue::set(false),
 804                invite_count: ActiveValue::set(user.invite_count),
 805                invite_code: ActiveValue::set(Some(random_invite_code())),
 806                metrics_id: ActiveValue::set(Uuid::new_v4()),
 807                ..Default::default()
 808            })
 809            .on_conflict(
 810                OnConflict::column(user::Column::GithubLogin)
 811                    .update_columns([
 812                        user::Column::EmailAddress,
 813                        user::Column::GithubUserId,
 814                        user::Column::Admin,
 815                    ])
 816                    .to_owned(),
 817            )
 818            .exec_with_returning(&tx)
 819            .await?;
 820
 821            let mut signup = signup.into_active_model();
 822            signup.user_id = ActiveValue::set(Some(user.id));
 823            let signup = signup.update(&tx).await?;
 824
 825            if let Some(inviting_user_id) = signup.inviting_user_id {
 826                let result = user::Entity::update_many()
 827                    .filter(
 828                        user::Column::Id
 829                            .eq(inviting_user_id)
 830                            .and(user::Column::InviteCount.gt(0)),
 831                    )
 832                    .col_expr(
 833                        user::Column::InviteCount,
 834                        Expr::col(user::Column::InviteCount).sub(1),
 835                    )
 836                    .exec(&tx)
 837                    .await?;
 838
 839                if result.rows_affected == 0 {
 840                    Err(Error::Http(
 841                        StatusCode::UNAUTHORIZED,
 842                        "no invites remaining".to_string(),
 843                    ))?;
 844                }
 845
 846                contact::Entity::insert(contact::ActiveModel {
 847                    user_id_a: ActiveValue::set(inviting_user_id),
 848                    user_id_b: ActiveValue::set(user.id),
 849                    a_to_b: ActiveValue::set(true),
 850                    should_notify: ActiveValue::set(true),
 851                    accepted: ActiveValue::set(true),
 852                    ..Default::default()
 853                })
 854                .on_conflict(OnConflict::new().do_nothing().to_owned())
 855                .exec_without_returning(&tx)
 856                .await?;
 857            }
 858
 859            tx.commit().await?;
 860            Ok(Some(NewUserResult {
 861                user_id: user.id,
 862                metrics_id: user.metrics_id.to_string(),
 863                inviting_user_id: signup.inviting_user_id,
 864                signup_device_id: signup.device_id,
 865            }))
 866        })
 867        .await
 868    }
 869
 870    pub async fn set_invite_count_for_user(&self, id: UserId, count: u32) -> Result<()> {
 871        self.transact(|tx| async move {
 872            if count > 0 {
 873                user::Entity::update_many()
 874                    .filter(
 875                        user::Column::Id
 876                            .eq(id)
 877                            .and(user::Column::InviteCode.is_null()),
 878                    )
 879                    .col_expr(user::Column::InviteCode, random_invite_code().into())
 880                    .exec(&tx)
 881                    .await?;
 882            }
 883
 884            user::Entity::update_many()
 885                .filter(user::Column::Id.eq(id))
 886                .col_expr(user::Column::InviteCount, count.into())
 887                .exec(&tx)
 888                .await?;
 889            tx.commit().await?;
 890            Ok(())
 891        })
 892        .await
 893    }
 894
 895    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, u32)>> {
 896        self.transact(|tx| async move {
 897            match user::Entity::find_by_id(id).one(&tx).await? {
 898                Some(user) if user.invite_code.is_some() => {
 899                    Ok(Some((user.invite_code.unwrap(), user.invite_count as u32)))
 900                }
 901                _ => Ok(None),
 902            }
 903        })
 904        .await
 905    }
 906
 907    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
 908        self.transact(|tx| async move {
 909            user::Entity::find()
 910                .filter(user::Column::InviteCode.eq(code))
 911                .one(&tx)
 912                .await?
 913                .ok_or_else(|| {
 914                    Error::Http(
 915                        StatusCode::NOT_FOUND,
 916                        "that invite code does not exist".to_string(),
 917                    )
 918                })
 919        })
 920        .await
 921    }
 922
 923    // rooms
 924
 925    pub async fn incoming_call_for_user(
 926        &self,
 927        user_id: UserId,
 928    ) -> Result<Option<proto::IncomingCall>> {
 929        self.transact(|tx| async move {
 930            let pending_participant = room_participant::Entity::find()
 931                .filter(
 932                    room_participant::Column::UserId
 933                        .eq(user_id)
 934                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
 935                )
 936                .one(&tx)
 937                .await?;
 938
 939            if let Some(pending_participant) = pending_participant {
 940                let room = self.get_room(pending_participant.room_id, &tx).await?;
 941                Ok(Self::build_incoming_call(&room, user_id))
 942            } else {
 943                Ok(None)
 944            }
 945        })
 946        .await
 947    }
 948
 949    pub async fn create_room(
 950        &self,
 951        user_id: UserId,
 952        connection_id: ConnectionId,
 953        live_kit_room: &str,
 954    ) -> Result<RoomGuard<proto::Room>> {
 955        self.transact(|tx| async move {
 956            let room = room::ActiveModel {
 957                live_kit_room: ActiveValue::set(live_kit_room.into()),
 958                ..Default::default()
 959            }
 960            .insert(&tx)
 961            .await?;
 962            let room_id = room.id;
 963
 964            room_participant::ActiveModel {
 965                room_id: ActiveValue::set(room_id),
 966                user_id: ActiveValue::set(user_id),
 967                answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
 968                calling_user_id: ActiveValue::set(user_id),
 969                calling_connection_id: ActiveValue::set(connection_id.0 as i32),
 970                ..Default::default()
 971            }
 972            .insert(&tx)
 973            .await?;
 974
 975            let room = self.get_room(room_id, &tx).await?;
 976            self.commit_room_transaction(room_id, tx, room).await
 977        })
 978        .await
 979    }
 980
 981    pub async fn call(
 982        &self,
 983        room_id: RoomId,
 984        calling_user_id: UserId,
 985        calling_connection_id: ConnectionId,
 986        called_user_id: UserId,
 987        initial_project_id: Option<ProjectId>,
 988    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
 989        self.transact(|tx| async move {
 990            room_participant::ActiveModel {
 991                room_id: ActiveValue::set(room_id),
 992                user_id: ActiveValue::set(called_user_id),
 993                calling_user_id: ActiveValue::set(calling_user_id),
 994                calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
 995                initial_project_id: ActiveValue::set(initial_project_id),
 996                ..Default::default()
 997            }
 998            .insert(&tx)
 999            .await?;
1000
1001            let room = self.get_room(room_id, &tx).await?;
1002            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1003                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1004            self.commit_room_transaction(room_id, tx, (room, incoming_call))
1005                .await
1006        })
1007        .await
1008    }
1009
1010    pub async fn call_failed(
1011        &self,
1012        room_id: RoomId,
1013        called_user_id: UserId,
1014    ) -> Result<RoomGuard<proto::Room>> {
1015        self.transact(|tx| async move {
1016            room_participant::Entity::delete_many()
1017                .filter(
1018                    room_participant::Column::RoomId
1019                        .eq(room_id)
1020                        .and(room_participant::Column::UserId.eq(called_user_id)),
1021                )
1022                .exec(&tx)
1023                .await?;
1024            let room = self.get_room(room_id, &tx).await?;
1025            self.commit_room_transaction(room_id, tx, room).await
1026        })
1027        .await
1028    }
1029
1030    pub async fn decline_call(
1031        &self,
1032        expected_room_id: Option<RoomId>,
1033        user_id: UserId,
1034    ) -> Result<RoomGuard<proto::Room>> {
1035        self.transact(|tx| async move {
1036            let participant = room_participant::Entity::find()
1037                .filter(
1038                    room_participant::Column::UserId
1039                        .eq(user_id)
1040                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1041                )
1042                .one(&tx)
1043                .await?
1044                .ok_or_else(|| anyhow!("could not decline call"))?;
1045            let room_id = participant.room_id;
1046
1047            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1048                return Err(anyhow!("declining call on unexpected room"))?;
1049            }
1050
1051            room_participant::Entity::delete(participant.into_active_model())
1052                .exec(&tx)
1053                .await?;
1054
1055            let room = self.get_room(room_id, &tx).await?;
1056            self.commit_room_transaction(room_id, tx, room).await
1057        })
1058        .await
1059    }
1060
1061    pub async fn cancel_call(
1062        &self,
1063        expected_room_id: Option<RoomId>,
1064        calling_connection_id: ConnectionId,
1065        called_user_id: UserId,
1066    ) -> Result<RoomGuard<proto::Room>> {
1067        self.transact(|tx| async move {
1068            let participant = room_participant::Entity::find()
1069                .filter(
1070                    room_participant::Column::UserId
1071                        .eq(called_user_id)
1072                        .and(
1073                            room_participant::Column::CallingConnectionId
1074                                .eq(calling_connection_id.0 as i32),
1075                        )
1076                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1077                )
1078                .one(&tx)
1079                .await?
1080                .ok_or_else(|| anyhow!("could not cancel call"))?;
1081            let room_id = participant.room_id;
1082            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1083                return Err(anyhow!("canceling call on unexpected room"))?;
1084            }
1085
1086            room_participant::Entity::delete(participant.into_active_model())
1087                .exec(&tx)
1088                .await?;
1089
1090            let room = self.get_room(room_id, &tx).await?;
1091            self.commit_room_transaction(room_id, tx, room).await
1092        })
1093        .await
1094    }
1095
1096    pub async fn join_room(
1097        &self,
1098        room_id: RoomId,
1099        user_id: UserId,
1100        connection_id: ConnectionId,
1101    ) -> Result<RoomGuard<proto::Room>> {
1102        self.transact(|tx| async move {
1103            let result = room_participant::Entity::update_many()
1104                .filter(
1105                    room_participant::Column::RoomId
1106                        .eq(room_id)
1107                        .and(room_participant::Column::UserId.eq(user_id))
1108                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1109                )
1110                .col_expr(
1111                    room_participant::Column::AnsweringConnectionId,
1112                    connection_id.0.into(),
1113                )
1114                .exec(&tx)
1115                .await?;
1116            if result.rows_affected == 0 {
1117                Err(anyhow!("room does not exist or was already joined"))?
1118            } else {
1119                let room = self.get_room(room_id, &tx).await?;
1120                self.commit_room_transaction(room_id, tx, room).await
1121            }
1122        })
1123        .await
1124    }
1125
1126    pub async fn leave_room(
1127        &self,
1128        connection_id: ConnectionId,
1129    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1130        self.transact(|tx| async move {
1131            let leaving_participant = room_participant::Entity::find()
1132                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
1133                .one(&tx)
1134                .await?;
1135
1136            if let Some(leaving_participant) = leaving_participant {
1137                // Leave room.
1138                let room_id = leaving_participant.room_id;
1139                room_participant::Entity::delete_by_id(leaving_participant.id)
1140                    .exec(&tx)
1141                    .await?;
1142
1143                // Cancel pending calls initiated by the leaving user.
1144                let called_participants = room_participant::Entity::find()
1145                    .filter(
1146                        room_participant::Column::CallingConnectionId
1147                            .eq(connection_id.0)
1148                            .and(room_participant::Column::AnsweringConnectionId.is_null()),
1149                    )
1150                    .all(&tx)
1151                    .await?;
1152                room_participant::Entity::delete_many()
1153                    .filter(
1154                        room_participant::Column::Id
1155                            .is_in(called_participants.iter().map(|participant| participant.id)),
1156                    )
1157                    .exec(&tx)
1158                    .await?;
1159                let canceled_calls_to_user_ids = called_participants
1160                    .into_iter()
1161                    .map(|participant| participant.user_id)
1162                    .collect();
1163
1164                // Detect left projects.
1165                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1166                enum QueryProjectIds {
1167                    ProjectId,
1168                }
1169                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1170                    .select_only()
1171                    .column_as(
1172                        project_collaborator::Column::ProjectId,
1173                        QueryProjectIds::ProjectId,
1174                    )
1175                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
1176                    .into_values::<_, QueryProjectIds>()
1177                    .all(&tx)
1178                    .await?;
1179                let mut left_projects = HashMap::default();
1180                let mut collaborators = project_collaborator::Entity::find()
1181                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1182                    .stream(&tx)
1183                    .await?;
1184                while let Some(collaborator) = collaborators.next().await {
1185                    let collaborator = collaborator?;
1186                    let left_project =
1187                        left_projects
1188                            .entry(collaborator.project_id)
1189                            .or_insert(LeftProject {
1190                                id: collaborator.project_id,
1191                                host_user_id: Default::default(),
1192                                connection_ids: Default::default(),
1193                                host_connection_id: Default::default(),
1194                            });
1195
1196                    let collaborator_connection_id =
1197                        ConnectionId(collaborator.connection_id as u32);
1198                    if collaborator_connection_id != connection_id {
1199                        left_project.connection_ids.push(collaborator_connection_id);
1200                    }
1201
1202                    if collaborator.is_host {
1203                        left_project.host_user_id = collaborator.user_id;
1204                        left_project.host_connection_id =
1205                            ConnectionId(collaborator.connection_id as u32);
1206                    }
1207                }
1208                drop(collaborators);
1209
1210                // Leave projects.
1211                project_collaborator::Entity::delete_many()
1212                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
1213                    .exec(&tx)
1214                    .await?;
1215
1216                // Unshare projects.
1217                project::Entity::delete_many()
1218                    .filter(
1219                        project::Column::RoomId
1220                            .eq(room_id)
1221                            .and(project::Column::HostConnectionId.eq(connection_id.0)),
1222                    )
1223                    .exec(&tx)
1224                    .await?;
1225
1226                let room = self.get_room(room_id, &tx).await?;
1227                Ok(Some(
1228                    self.commit_room_transaction(
1229                        room_id,
1230                        tx,
1231                        LeftRoom {
1232                            room,
1233                            left_projects,
1234                            canceled_calls_to_user_ids,
1235                        },
1236                    )
1237                    .await?,
1238                ))
1239            } else {
1240                Ok(None)
1241            }
1242        })
1243        .await
1244    }
1245
1246    pub async fn update_room_participant_location(
1247        &self,
1248        room_id: RoomId,
1249        connection_id: ConnectionId,
1250        location: proto::ParticipantLocation,
1251    ) -> Result<RoomGuard<proto::Room>> {
1252        self.transact(|tx| async {
1253            let mut tx = tx;
1254            let location_kind;
1255            let location_project_id;
1256            match location
1257                .variant
1258                .as_ref()
1259                .ok_or_else(|| anyhow!("invalid location"))?
1260            {
1261                proto::participant_location::Variant::SharedProject(project) => {
1262                    location_kind = 0;
1263                    location_project_id = Some(ProjectId::from_proto(project.id));
1264                }
1265                proto::participant_location::Variant::UnsharedProject(_) => {
1266                    location_kind = 1;
1267                    location_project_id = None;
1268                }
1269                proto::participant_location::Variant::External(_) => {
1270                    location_kind = 2;
1271                    location_project_id = None;
1272                }
1273            }
1274
1275            let result = room_participant::Entity::update_many()
1276                .filter(
1277                    room_participant::Column::RoomId
1278                        .eq(room_id)
1279                        .and(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)),
1280                )
1281                .set(room_participant::ActiveModel {
1282                    location_kind: ActiveValue::set(Some(location_kind)),
1283                    location_project_id: ActiveValue::set(location_project_id),
1284                    ..Default::default()
1285                })
1286                .exec(&tx)
1287                .await?;
1288
1289            if result.rows_affected == 1 {
1290                let room = self.get_room(room_id, &mut tx).await?;
1291                self.commit_room_transaction(room_id, tx, room).await
1292            } else {
1293                Err(anyhow!("could not update room participant location"))?
1294            }
1295        })
1296        .await
1297    }
1298
1299    async fn get_guest_connection_ids(
1300        &self,
1301        project_id: ProjectId,
1302        tx: &DatabaseTransaction,
1303    ) -> Result<Vec<ConnectionId>> {
1304        todo!()
1305        // let mut guest_connection_ids = Vec::new();
1306        // let mut db_guest_connection_ids = sqlx::query_scalar::<_, i32>(
1307        //     "
1308        //     SELECT connection_id
1309        //     FROM project_collaborators
1310        //     WHERE project_id = $1 AND is_host = FALSE
1311        //     ",
1312        // )
1313        // .bind(project_id)
1314        // .fetch(tx);
1315        // while let Some(connection_id) = db_guest_connection_ids.next().await {
1316        //     guest_connection_ids.push(ConnectionId(connection_id? as u32));
1317        // }
1318        // Ok(guest_connection_ids)
1319    }
1320
1321    fn build_incoming_call(
1322        room: &proto::Room,
1323        called_user_id: UserId,
1324    ) -> Option<proto::IncomingCall> {
1325        let pending_participant = room
1326            .pending_participants
1327            .iter()
1328            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1329
1330        Some(proto::IncomingCall {
1331            room_id: room.id,
1332            calling_user_id: pending_participant.calling_user_id,
1333            participant_user_ids: room
1334                .participants
1335                .iter()
1336                .map(|participant| participant.user_id)
1337                .collect(),
1338            initial_project: room.participants.iter().find_map(|participant| {
1339                let initial_project_id = pending_participant.initial_project_id?;
1340                participant
1341                    .projects
1342                    .iter()
1343                    .find(|project| project.id == initial_project_id)
1344                    .cloned()
1345            }),
1346        })
1347    }
1348
1349    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1350        let db_room = room::Entity::find_by_id(room_id)
1351            .one(tx)
1352            .await?
1353            .ok_or_else(|| anyhow!("could not find room"))?;
1354
1355        let mut db_participants = db_room
1356            .find_related(room_participant::Entity)
1357            .stream(tx)
1358            .await?;
1359        let mut participants = HashMap::default();
1360        let mut pending_participants = Vec::new();
1361        while let Some(db_participant) = db_participants.next().await {
1362            let db_participant = db_participant?;
1363            if let Some(answering_connection_id) = db_participant.answering_connection_id {
1364                let location = match (
1365                    db_participant.location_kind,
1366                    db_participant.location_project_id,
1367                ) {
1368                    (Some(0), Some(project_id)) => {
1369                        Some(proto::participant_location::Variant::SharedProject(
1370                            proto::participant_location::SharedProject {
1371                                id: project_id.to_proto(),
1372                            },
1373                        ))
1374                    }
1375                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1376                        Default::default(),
1377                    )),
1378                    _ => Some(proto::participant_location::Variant::External(
1379                        Default::default(),
1380                    )),
1381                };
1382                participants.insert(
1383                    answering_connection_id,
1384                    proto::Participant {
1385                        user_id: db_participant.user_id.to_proto(),
1386                        peer_id: answering_connection_id as u32,
1387                        projects: Default::default(),
1388                        location: Some(proto::ParticipantLocation { variant: location }),
1389                    },
1390                );
1391            } else {
1392                pending_participants.push(proto::PendingParticipant {
1393                    user_id: db_participant.user_id.to_proto(),
1394                    calling_user_id: db_participant.calling_user_id.to_proto(),
1395                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1396                });
1397            }
1398        }
1399        drop(db_participants);
1400
1401        let mut db_projects = db_room
1402            .find_related(project::Entity)
1403            .find_with_related(worktree::Entity)
1404            .stream(tx)
1405            .await?;
1406
1407        while let Some(row) = db_projects.next().await {
1408            let (db_project, db_worktree) = row?;
1409            if let Some(participant) = participants.get_mut(&db_project.host_connection_id) {
1410                let project = if let Some(project) = participant
1411                    .projects
1412                    .iter_mut()
1413                    .find(|project| project.id == db_project.id.to_proto())
1414                {
1415                    project
1416                } else {
1417                    participant.projects.push(proto::ParticipantProject {
1418                        id: db_project.id.to_proto(),
1419                        worktree_root_names: Default::default(),
1420                    });
1421                    participant.projects.last_mut().unwrap()
1422                };
1423
1424                if let Some(db_worktree) = db_worktree {
1425                    project.worktree_root_names.push(db_worktree.root_name);
1426                }
1427            }
1428        }
1429
1430        Ok(proto::Room {
1431            id: db_room.id.to_proto(),
1432            live_kit_room: db_room.live_kit_room,
1433            participants: participants.into_values().collect(),
1434            pending_participants,
1435        })
1436    }
1437
1438    async fn commit_room_transaction<T>(
1439        &self,
1440        room_id: RoomId,
1441        tx: DatabaseTransaction,
1442        data: T,
1443    ) -> Result<RoomGuard<T>> {
1444        let lock = self.rooms.entry(room_id).or_default().clone();
1445        let _guard = lock.lock_owned().await;
1446        tx.commit().await?;
1447        Ok(RoomGuard {
1448            data,
1449            _guard,
1450            _not_send: PhantomData,
1451        })
1452    }
1453
1454    // projects
1455
1456    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1457        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1458        enum QueryAs {
1459            Count,
1460        }
1461
1462        self.transact(|tx| async move {
1463            Ok(project::Entity::find()
1464                .select_only()
1465                .column_as(project::Column::Id.count(), QueryAs::Count)
1466                .inner_join(user::Entity)
1467                .filter(user::Column::Admin.eq(false))
1468                .into_values::<_, QueryAs>()
1469                .one(&tx)
1470                .await?
1471                .unwrap_or(0) as usize)
1472        })
1473        .await
1474    }
1475
1476    pub async fn share_project(
1477        &self,
1478        room_id: RoomId,
1479        connection_id: ConnectionId,
1480        worktrees: &[proto::WorktreeMetadata],
1481    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1482        self.transact(|tx| async move {
1483            let participant = room_participant::Entity::find()
1484                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
1485                .one(&tx)
1486                .await?
1487                .ok_or_else(|| anyhow!("could not find participant"))?;
1488            if participant.room_id != room_id {
1489                return Err(anyhow!("shared project on unexpected room"))?;
1490            }
1491
1492            let project = project::ActiveModel {
1493                room_id: ActiveValue::set(participant.room_id),
1494                host_user_id: ActiveValue::set(participant.user_id),
1495                host_connection_id: ActiveValue::set(connection_id.0 as i32),
1496                ..Default::default()
1497            }
1498            .insert(&tx)
1499            .await?;
1500
1501            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
1502                id: ActiveValue::set(worktree.id as i32),
1503                project_id: ActiveValue::set(project.id),
1504                abs_path: ActiveValue::set(worktree.abs_path.clone()),
1505                root_name: ActiveValue::set(worktree.root_name.clone()),
1506                visible: ActiveValue::set(worktree.visible),
1507                scan_id: ActiveValue::set(0),
1508                is_complete: ActiveValue::set(false),
1509            }))
1510            .exec(&tx)
1511            .await?;
1512
1513            project_collaborator::ActiveModel {
1514                project_id: ActiveValue::set(project.id),
1515                connection_id: ActiveValue::set(connection_id.0 as i32),
1516                user_id: ActiveValue::set(participant.user_id),
1517                replica_id: ActiveValue::set(ReplicaId(0)),
1518                is_host: ActiveValue::set(true),
1519                ..Default::default()
1520            }
1521            .insert(&tx)
1522            .await?;
1523
1524            let room = self.get_room(room_id, &tx).await?;
1525            self.commit_room_transaction(room_id, tx, (project.id, room))
1526                .await
1527        })
1528        .await
1529    }
1530
1531    pub async fn unshare_project(
1532        &self,
1533        project_id: ProjectId,
1534        connection_id: ConnectionId,
1535    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1536        self.transact(|tx| async move {
1537            todo!()
1538            // let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1539            // let room_id: RoomId = sqlx::query_scalar(
1540            //     "
1541            //     DELETE FROM projects
1542            //     WHERE id = $1 AND host_connection_id = $2
1543            //     RETURNING room_id
1544            //     ",
1545            // )
1546            // .bind(project_id)
1547            // .bind(connection_id.0 as i32)
1548            // .fetch_one(&mut tx)
1549            // .await?;
1550            // let room = self.get_room(room_id, &mut tx).await?;
1551            // self.commit_room_transaction(room_id, tx, (room, guest_connection_ids))
1552            //     .await
1553        })
1554        .await
1555    }
1556
1557    pub async fn update_project(
1558        &self,
1559        project_id: ProjectId,
1560        connection_id: ConnectionId,
1561        worktrees: &[proto::WorktreeMetadata],
1562    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1563        self.transact(|tx| async move {
                // Unimplemented stub: the only live statement is `todo!()`, so this method
                // panics once `transact` runs the closure; all parameters are unused for now.
                //
                // The commented-out code below appears to be the earlier sqlx-based
                // implementation, kept as a reference (NOTE(review): presumably pending a
                // port to sea-orm; confirm). It:
                //   1. reads `room_id` from the `projects` row matching `project_id` and
                //      `host_connection_id = connection_id`,
                //   2. if `worktrees` is non-empty, upserts one `worktrees` row per entry
                //      (binding `scan_id` as 0 and `is_complete` as false; on conflict only
                //      `root_name` is updated),
                //   3. deletes the project's worktrees whose ids are not in `worktrees`,
                //   4. passes `(room, guest_connection_ids)` to `commit_room_transaction`.
                // NOTE(review): with an empty `worktrees` slice step 3 renders `NOT IN ()`;
                // check that this is valid on every supported backend when porting.
1564            todo!()
1565            // let room_id: RoomId = sqlx::query_scalar(
1566            //     "
1567            //     SELECT room_id
1568            //     FROM projects
1569            //     WHERE id = $1 AND host_connection_id = $2
1570            //     ",
1571            // )
1572            // .bind(project_id)
1573            // .bind(connection_id.0 as i32)
1574            // .fetch_one(&mut tx)
1575            // .await?;
1576
1577            // if !worktrees.is_empty() {
1578            //     let mut params = "(?, ?, ?, ?, ?, ?, ?),".repeat(worktrees.len());
1579            //     params.pop();
1580            //     let query = format!(
1581            //         "
1582            //         INSERT INTO worktrees (
1583            //         project_id,
1584            //         id,
1585            //         root_name,
1586            //         abs_path,
1587            //         visible,
1588            //         scan_id,
1589            //         is_complete
1590            //         )
1591            //         VALUES {params}
1592            //         ON CONFLICT (project_id, id) DO UPDATE SET root_name = excluded.root_name
1593            //         "
1594            //     );
1595
1596            //     let mut query = sqlx::query(&query);
1597            //     for worktree in worktrees {
1598            //         query = query
1599            //             .bind(project_id)
1600            //             .bind(worktree.id as i32)
1601            //             .bind(&worktree.root_name)
1602            //             .bind(&worktree.abs_path)
1603            //             .bind(worktree.visible)
1604            //             .bind(0)
1605            //             .bind(false)
1606            //     }
1607            //     query.execute(&mut tx).await?;
1608            // }
1609
1610            // let mut params = "?,".repeat(worktrees.len());
1611            // if !worktrees.is_empty() {
1612            //     params.pop();
1613            // }
1614            // let query = format!(
1615            //     "
1616            //     DELETE FROM worktrees
1617            //     WHERE project_id = ? AND id NOT IN ({params})
1618            //     ",
1619            // );
1620
1621            // let mut query = sqlx::query(&query).bind(project_id);
1622            // for worktree in worktrees {
1623            //     query = query.bind(WorktreeId(worktree.id as i32));
1624            // }
1625            // query.execute(&mut tx).await?;
1626
1627            // let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1628            // let room = self.get_room(room_id, &mut tx).await?;
1629            // self.commit_room_transaction(room_id, tx, (room, guest_connection_ids))
1630            //     .await
1631        })
1632        .await
1633    }
1634
1635    pub async fn update_worktree(
1636        &self,
1637        update: &proto::UpdateWorktree,
1638        connection_id: ConnectionId,
1639    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1640        self.transact(|tx| async move {
                // Unimplemented stub: the only live statement is `todo!()`, so this method
                // panics once `transact` runs the closure; all parameters are unused for now.
                //
                // The commented-out code below appears to be the earlier sqlx-based
                // implementation, kept as a reference (NOTE(review): presumably pending a
                // port to sea-orm; confirm). It:
                //   1. checks that `connection_id` hosts the project by selecting `room_id`
                //      from `projects`,
                //   2. updates the worktree's `root_name`, `scan_id`, `is_complete` (taken
                //      from `update.is_last_update`) and `abs_path`,
                //   3. upserts `update.updated_entries` into `worktree_entries`,
                //   4. deletes the entries listed in `update.removed_entries`,
                //   5. passes the guest connection ids to `commit_room_transaction`.
1641            todo!()
1642            // let project_id = ProjectId::from_proto(update.project_id);
1643            // let worktree_id = WorktreeId::from_proto(update.worktree_id);
1644
1645            // // Ensure the update comes from the host.
1646            // let room_id: RoomId = sqlx::query_scalar(
1647            //     "
1648            //     SELECT room_id
1649            //     FROM projects
1650            //     WHERE id = $1 AND host_connection_id = $2
1651            //     ",
1652            // )
1653            // .bind(project_id)
1654            // .bind(connection_id.0 as i32)
1655            // .fetch_one(&mut tx)
1656            // .await?;
1657
1658            // // Update metadata.
1659            // sqlx::query(
1660            //     "
1661            //     UPDATE worktrees
1662            //     SET
1663            //     root_name = $1,
1664            //     scan_id = $2,
1665            //     is_complete = $3,
1666            //     abs_path = $4
1667            //     WHERE project_id = $5 AND id = $6
1668            //     RETURNING 1
1669            //     ",
1670            // )
1671            // .bind(&update.root_name)
1672            // .bind(update.scan_id as i64)
1673            // .bind(update.is_last_update)
1674            // .bind(&update.abs_path)
1675            // .bind(project_id)
1676            // .bind(worktree_id)
1677            // .fetch_one(&mut tx)
1678            // .await?;
1679
1680            // if !update.updated_entries.is_empty() {
1681            //     let mut params =
1682            //         "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?),".repeat(update.updated_entries.len());
1683            //     params.pop();
1684
1685            //     let query = format!(
1686            //         "
1687            //         INSERT INTO worktree_entries (
1688            //         project_id,
1689            //         worktree_id,
1690            //         id,
1691            //         is_dir,
1692            //         path,
1693            //         inode,
1694            //         mtime_seconds,
1695            //         mtime_nanos,
1696            //         is_symlink,
1697            //         is_ignored
1698            //         )
1699            //         VALUES {params}
1700            //         ON CONFLICT (project_id, worktree_id, id) DO UPDATE SET
1701            //         is_dir = excluded.is_dir,
1702            //         path = excluded.path,
1703            //         inode = excluded.inode,
1704            //         mtime_seconds = excluded.mtime_seconds,
1705            //         mtime_nanos = excluded.mtime_nanos,
1706            //         is_symlink = excluded.is_symlink,
1707            //         is_ignored = excluded.is_ignored
1708            //         "
1709            //     );
1710            //     let mut query = sqlx::query(&query);
1711            //     for entry in &update.updated_entries {
1712            //         let mtime = entry.mtime.clone().unwrap_or_default();
1713            //         query = query
1714            //             .bind(project_id)
1715            //             .bind(worktree_id)
1716            //             .bind(entry.id as i64)
1717            //             .bind(entry.is_dir)
1718            //             .bind(&entry.path)
1719            //             .bind(entry.inode as i64)
1720            //             .bind(mtime.seconds as i64)
1721            //             .bind(mtime.nanos as i32)
1722            //             .bind(entry.is_symlink)
1723            //             .bind(entry.is_ignored);
1724            //     }
1725            //     query.execute(&mut tx).await?;
1726            // }
1727
1728            // if !update.removed_entries.is_empty() {
1729            //     let mut params = "?,".repeat(update.removed_entries.len());
1730            //     params.pop();
1731            //     let query = format!(
1732            //         "
1733            //         DELETE FROM worktree_entries
1734            //         WHERE project_id = ? AND worktree_id = ? AND id IN ({params})
1735            //         "
1736            //     );
1737
1738            //     let mut query = sqlx::query(&query).bind(project_id).bind(worktree_id);
1739            //     for entry_id in &update.removed_entries {
1740            //         query = query.bind(*entry_id as i64);
1741            //     }
1742            //     query.execute(&mut tx).await?;
1743            // }
1744
1745            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1746            // self.commit_room_transaction(room_id, tx, connection_ids)
1747            //     .await
1748        })
1749        .await
1750    }
1751
1752    pub async fn update_diagnostic_summary(
1753        &self,
1754        update: &proto::UpdateDiagnosticSummary,
1755        connection_id: ConnectionId,
1756    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1757        self.transact(|tx| async {
                // Unimplemented stub: the only live statement is `todo!()`, so this method
                // panics once `transact` runs the closure; all parameters are unused for now.
                //
                // The commented-out code below appears to be the earlier sqlx-based
                // implementation, kept as a reference (NOTE(review): presumably pending a
                // port to sea-orm; confirm). It:
                //   1. fails with "invalid summary" when `update.summary` is absent,
                //   2. checks that `connection_id` hosts the project by selecting `room_id`
                //      from `projects`,
                //   3. upserts the summary into `worktree_diagnostic_summaries`, keyed by
                //      `(project_id, worktree_id, path)`,
                //   4. passes the guest connection ids to `commit_room_transaction`.
1758            todo!()
1759            // let project_id = ProjectId::from_proto(update.project_id);
1760            // let worktree_id = WorktreeId::from_proto(update.worktree_id);
1761            // let summary = update
1762            //     .summary
1763            //     .as_ref()
1764            //     .ok_or_else(|| anyhow!("invalid summary"))?;
1765
1766            // // Ensure the update comes from the host.
1767            // let room_id: RoomId = sqlx::query_scalar(
1768            //     "
1769            //     SELECT room_id
1770            //     FROM projects
1771            //     WHERE id = $1 AND host_connection_id = $2
1772            //     ",
1773            // )
1774            // .bind(project_id)
1775            // .bind(connection_id.0 as i32)
1776            // .fetch_one(&mut tx)
1777            // .await?;
1778
1779            // // Update summary.
1780            // sqlx::query(
1781            //     "
1782            //     INSERT INTO worktree_diagnostic_summaries (
1783            //     project_id,
1784            //     worktree_id,
1785            //     path,
1786            //     language_server_id,
1787            //     error_count,
1788            //     warning_count
1789            //     )
1790            //     VALUES ($1, $2, $3, $4, $5, $6)
1791            //     ON CONFLICT (project_id, worktree_id, path) DO UPDATE SET
1792            //     language_server_id = excluded.language_server_id,
1793            //     error_count = excluded.error_count,
1794            //     warning_count = excluded.warning_count
1795            //     ",
1796            // )
1797            // .bind(project_id)
1798            // .bind(worktree_id)
1799            // .bind(&summary.path)
1800            // .bind(summary.language_server_id as i64)
1801            // .bind(summary.error_count as i32)
1802            // .bind(summary.warning_count as i32)
1803            // .execute(&mut tx)
1804            // .await?;
1805
1806            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1807            // self.commit_room_transaction(room_id, tx, connection_ids)
1808            //     .await
1809        })
1810        .await
1811    }
1812
1813    pub async fn start_language_server(
1814        &self,
1815        update: &proto::StartLanguageServer,
1816        connection_id: ConnectionId,
1817    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1818        self.transact(|tx| async {
                // Unimplemented stub: the only live statement is `todo!()`, so this method
                // panics once `transact` runs the closure; all parameters are unused for now.
                //
                // The commented-out code below appears to be the earlier sqlx-based
                // implementation, kept as a reference (NOTE(review): presumably pending a
                // port to sea-orm; confirm). It:
                //   1. fails with "invalid language server" when `update.server` is absent,
                //   2. checks that `connection_id` hosts the project by selecting `room_id`
                //      from `projects`,
                //   3. upserts the server into `language_servers`, keyed by
                //      `(project_id, id)`, refreshing `name` on conflict,
                //   4. passes the guest connection ids to `commit_room_transaction`.
1819            todo!()
1820            // let project_id = ProjectId::from_proto(update.project_id);
1821            // let server = update
1822            //     .server
1823            //     .as_ref()
1824            //     .ok_or_else(|| anyhow!("invalid language server"))?;
1825
1826            // // Ensure the update comes from the host.
1827            // let room_id: RoomId = sqlx::query_scalar(
1828            //     "
1829            //     SELECT room_id
1830            //     FROM projects
1831            //     WHERE id = $1 AND host_connection_id = $2
1832            //     ",
1833            // )
1834            // .bind(project_id)
1835            // .bind(connection_id.0 as i32)
1836            // .fetch_one(&mut tx)
1837            // .await?;
1838
1839            // // Add the newly-started language server.
1840            // sqlx::query(
1841            //     "
1842            //     INSERT INTO language_servers (project_id, id, name)
1843            //     VALUES ($1, $2, $3)
1844            //     ON CONFLICT (project_id, id) DO UPDATE SET
1845            //     name = excluded.name
1846            //     ",
1847            // )
1848            // .bind(project_id)
1849            // .bind(server.id as i64)
1850            // .bind(&server.name)
1851            // .execute(&mut tx)
1852            // .await?;
1853
1854            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1855            // self.commit_room_transaction(room_id, tx, connection_ids)
1856            //     .await
1857        })
1858        .await
1859    }
1860
1861    pub async fn join_project(
1862        &self,
1863        project_id: ProjectId,
1864        connection_id: ConnectionId,
1865    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
1866        self.transact(|tx| async move {
                // Unimplemented stub: the only live statement is `todo!()`, so this method
                // panics once `transact` runs the closure; all parameters are unused for now.
                //
                // The commented-out code below appears to be the earlier sqlx-based
                // implementation, kept as a reference (NOTE(review): presumably pending a
                // port to sea-orm; confirm). It:
                //   1. looks up `(room_id, user_id)` in `room_participants` by
                //      `answering_connection_id = connection_id`,
                //   2. verifies the project belongs to that room,
                //   3. loads the existing collaborators and picks the lowest `ReplicaId`,
                //      starting at 1, that none of them uses,
                //   4. inserts the caller as a non-host collaborator,
                //   5. loads the project's worktrees, then fills in their entries and
                //      diagnostic summaries, and loads its language servers,
                //   6. passes `(Project { .. }, replica_id)` to `commit_room_transaction`.
1867            todo!()
1868            // let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>(
1869            //     "
1870            //     SELECT room_id, user_id
1871            //     FROM room_participants
1872            //     WHERE answering_connection_id = $1
1873            //     ",
1874            // )
1875            // .bind(connection_id.0 as i32)
1876            // .fetch_one(&mut tx)
1877            // .await?;
1878
1879            // // Ensure project id was shared on this room.
1880            // sqlx::query(
1881            //     "
1882            //     SELECT 1
1883            //     FROM projects
1884            //     WHERE id = $1 AND room_id = $2
1885            //     ",
1886            // )
1887            // .bind(project_id)
1888            // .bind(room_id)
1889            // .fetch_one(&mut tx)
1890            // .await?;
1891
1892            // let mut collaborators = sqlx::query_as::<_, ProjectCollaborator>(
1893            //     "
1894            //     SELECT *
1895            //     FROM project_collaborators
1896            //     WHERE project_id = $1
1897            //     ",
1898            // )
1899            // .bind(project_id)
1900            // .fetch_all(&mut tx)
1901            // .await?;
1902            // let replica_ids = collaborators
1903            //     .iter()
1904            //     .map(|c| c.replica_id)
1905            //     .collect::<HashSet<_>>();
1906            // let mut replica_id = ReplicaId(1);
1907            // while replica_ids.contains(&replica_id) {
1908            //     replica_id.0 += 1;
1909            // }
1910            // let new_collaborator = ProjectCollaborator {
1911            //     project_id,
1912            //     connection_id: connection_id.0 as i32,
1913            //     user_id,
1914            //     replica_id,
1915            //     is_host: false,
1916            // };
1917
1918            // sqlx::query(
1919            //     "
1920            //     INSERT INTO project_collaborators (
1921            //     project_id,
1922            //     connection_id,
1923            //     user_id,
1924            //     replica_id,
1925            //     is_host
1926            //     )
1927            //     VALUES ($1, $2, $3, $4, $5)
1928            //     ",
1929            // )
1930            // .bind(new_collaborator.project_id)
1931            // .bind(new_collaborator.connection_id)
1932            // .bind(new_collaborator.user_id)
1933            // .bind(new_collaborator.replica_id)
1934            // .bind(new_collaborator.is_host)
1935            // .execute(&mut tx)
1936            // .await?;
1937            // collaborators.push(new_collaborator);
1938
1939            // let worktree_rows = sqlx::query_as::<_, WorktreeRow>(
1940            //     "
1941            //     SELECT *
1942            //     FROM worktrees
1943            //     WHERE project_id = $1
1944            //     ",
1945            // )
1946            // .bind(project_id)
1947            // .fetch_all(&mut tx)
1948            // .await?;
1949            // let mut worktrees = worktree_rows
1950            //     .into_iter()
1951            //     .map(|worktree_row| {
1952            //         (
1953            //             worktree_row.id,
1954            //             Worktree {
1955            //                 id: worktree_row.id,
1956            //                 abs_path: worktree_row.abs_path,
1957            //                 root_name: worktree_row.root_name,
1958            //                 visible: worktree_row.visible,
1959            //                 entries: Default::default(),
1960            //                 diagnostic_summaries: Default::default(),
1961            //                 scan_id: worktree_row.scan_id as u64,
1962            //                 is_complete: worktree_row.is_complete,
1963            //             },
1964            //         )
1965            //     })
1966            //     .collect::<BTreeMap<_, _>>();
1967
1968            // // Populate worktree entries.
1969            // {
1970            //     let mut entries = sqlx::query_as::<_, WorktreeEntry>(
1971            //         "
1972            //         SELECT *
1973            //         FROM worktree_entries
1974            //         WHERE project_id = $1
1975            //         ",
1976            //     )
1977            //     .bind(project_id)
1978            //     .fetch(&mut tx);
1979            //     while let Some(entry) = entries.next().await {
1980            //         let entry = entry?;
1981            //         if let Some(worktree) = worktrees.get_mut(&entry.worktree_id) {
1982            //             worktree.entries.push(proto::Entry {
1983            //                 id: entry.id as u64,
1984            //                 is_dir: entry.is_dir,
1985            //                 path: entry.path,
1986            //                 inode: entry.inode as u64,
1987            //                 mtime: Some(proto::Timestamp {
1988            //                     seconds: entry.mtime_seconds as u64,
1989            //                     nanos: entry.mtime_nanos as u32,
1990            //                 }),
1991            //                 is_symlink: entry.is_symlink,
1992            //                 is_ignored: entry.is_ignored,
1993            //             });
1994            //         }
1995            //     }
1996            // }
1997
1998            // // Populate worktree diagnostic summaries.
1999            // {
2000            //     let mut summaries = sqlx::query_as::<_, WorktreeDiagnosticSummary>(
2001            //         "
2002            //         SELECT *
2003            //         FROM worktree_diagnostic_summaries
2004            //         WHERE project_id = $1
2005            //         ",
2006            //     )
2007            //     .bind(project_id)
2008            //     .fetch(&mut tx);
2009            //     while let Some(summary) = summaries.next().await {
2010            //         let summary = summary?;
2011            //         if let Some(worktree) = worktrees.get_mut(&summary.worktree_id) {
2012            //             worktree
2013            //                 .diagnostic_summaries
2014            //                 .push(proto::DiagnosticSummary {
2015            //                     path: summary.path,
2016            //                     language_server_id: summary.language_server_id as u64,
2017            //                     error_count: summary.error_count as u32,
2018            //                     warning_count: summary.warning_count as u32,
2019            //                 });
2020            //         }
2021            //     }
2022            // }
2023
2024            // // Populate language servers.
2025            // let language_servers = sqlx::query_as::<_, LanguageServer>(
2026            //     "
2027            //     SELECT *
2028            //     FROM language_servers
2029            //     WHERE project_id = $1
2030            //     ",
2031            // )
2032            // .bind(project_id)
2033            // .fetch_all(&mut tx)
2034            // .await?;
2035
2036            // self.commit_room_transaction(
2037            //     room_id,
2038            //     tx,
2039            //     (
2040            //         Project {
2041            //             collaborators,
2042            //             worktrees,
2043            //             language_servers: language_servers
2044            //                 .into_iter()
2045            //                 .map(|language_server| proto::LanguageServer {
2046            //                     id: language_server.id.to_proto(),
2047            //                     name: language_server.name,
2048            //                 })
2049            //                 .collect(),
2050            //         },
2051            //         replica_id as ReplicaId,
2052            //     ),
2053            // )
2054            // .await
2055        })
2056        .await
2057    }
2058
2059    pub async fn leave_project(
2060        &self,
2061        project_id: ProjectId,
2062        connection_id: ConnectionId,
2063    ) -> Result<RoomGuard<LeftProject>> {
2064        self.transact(|tx| async move {
                // Unimplemented stub: the only live statement is `todo!()`, so this method
                // panics once `transact` runs the closure; all parameters are unused for now.
                //
                // The commented-out code below appears to be the earlier sqlx-based
                // implementation, kept as a reference (NOTE(review): presumably pending a
                // port to sea-orm; confirm). It:
                //   1. deletes the caller's `project_collaborators` row and fails with
                //      "not a collaborator on this project" when no row was affected,
                //   2. collects the connection ids of the remaining collaborators,
                //   3. reads `room_id`, `host_user_id` and `host_connection_id` from
                //      `projects`,
                //   4. passes a `LeftProject` built from those values to
                //      `commit_room_transaction`.
2065            todo!()
2066            // let result = sqlx::query(
2067            //     "
2068            //     DELETE FROM project_collaborators
2069            //     WHERE project_id = $1 AND connection_id = $2
2070            //     ",
2071            // )
2072            // .bind(project_id)
2073            // .bind(connection_id.0 as i32)
2074            // .execute(&mut tx)
2075            // .await?;
2076
2077            // if result.rows_affected() == 0 {
2078            //     Err(anyhow!("not a collaborator on this project"))?;
2079            // }
2080
2081            // let connection_ids = sqlx::query_scalar::<_, i32>(
2082            //     "
2083            //     SELECT connection_id
2084            //     FROM project_collaborators
2085            //     WHERE project_id = $1
2086            //     ",
2087            // )
2088            // .bind(project_id)
2089            // .fetch_all(&mut tx)
2090            // .await?
2091            // .into_iter()
2092            // .map(|id| ConnectionId(id as u32))
2093            // .collect();
2094
2095            // let (room_id, host_user_id, host_connection_id) =
2096            //     sqlx::query_as::<_, (RoomId, i32, i32)>(
2097            //         "
2098            //         SELECT room_id, host_user_id, host_connection_id
2099            //         FROM projects
2100            //         WHERE id = $1
2101            //         ",
2102            //     )
2103            //     .bind(project_id)
2104            //     .fetch_one(&mut tx)
2105            //     .await?;
2106
2107            // self.commit_room_transaction(
2108            //     room_id,
2109            //     tx,
2110            //     LeftProject {
2111            //         id: project_id,
2112            //         host_user_id: UserId(host_user_id),
2113            //         host_connection_id: ConnectionId(host_connection_id as u32),
2114            //         connection_ids,
2115            //     },
2116            // )
2117            // .await
2118        })
2119        .await
2120    }
2121
2122    pub async fn project_collaborators(
2123        &self,
2124        project_id: ProjectId,
2125        connection_id: ConnectionId,
2126    ) -> Result<Vec<project_collaborator::Model>> {
2127        self.transact(|tx| async move {
                // Unimplemented stub: the only live statement is `todo!()`, so this method
                // panics once `transact` runs the closure; all parameters are unused for now.
                //
                // The commented-out code below appears to be the earlier sqlx-based
                // implementation, kept as a reference (NOTE(review): presumably pending a
                // port to sea-orm; confirm). It loads every `project_collaborators` row of
                // the project and returns them only when one of them belongs to
                // `connection_id`; otherwise it fails with "no such project".
2128            todo!()
2129            // let collaborators = sqlx::query_as::<_, ProjectCollaborator>(
2130            //     "
2131            //     SELECT *
2132            //     FROM project_collaborators
2133            //     WHERE project_id = $1
2134            //     ",
2135            // )
2136            // .bind(project_id)
2137            // .fetch_all(&mut tx)
2138            // .await?;
2139
2140            // if collaborators
2141            //     .iter()
2142            //     .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
2143            // {
2144            //     Ok(collaborators)
2145            // } else {
2146            //     Err(anyhow!("no such project"))?
2147            // }
2148        })
2149        .await
2150    }
2151
2152    pub async fn project_connection_ids(
2153        &self,
2154        project_id: ProjectId,
2155        connection_id: ConnectionId,
2156    ) -> Result<HashSet<ConnectionId>> {
2157        self.transact(|tx| async move {
                // Unimplemented stub: the only live statement is `todo!()`, so this method
                // panics once `transact` runs the closure; all parameters are unused for now.
                //
                // The commented-out code below appears to be the earlier sqlx-based
                // implementation, kept as a reference (NOTE(review): presumably pending a
                // port to sea-orm; confirm). It loads the `connection_id` of every
                // collaborator on the project and returns them as a set only when the
                // caller's own id is among them; otherwise it fails with "no such project".
2158            todo!()
2159            // let connection_ids = sqlx::query_scalar::<_, i32>(
2160            //     "
2161            //     SELECT connection_id
2162            //     FROM project_collaborators
2163            //     WHERE project_id = $1
2164            //     ",
2165            // )
2166            // .bind(project_id)
2167            // .fetch_all(&mut tx)
2168            // .await?;
2169
2170            // if connection_ids.contains(&(connection_id.0 as i32)) {
2171            //     Ok(connection_ids
2172            //         .into_iter()
2173            //         .map(|connection_id| ConnectionId(connection_id as u32))
2174            //         .collect())
2175            // } else {
2176            //     Err(anyhow!("no such project"))?
2177            // }
2178        })
2179        .await
2180    }
2181
2182    // access tokens
2183
2184    pub async fn create_access_token_hash(
2185        &self,
2186        user_id: UserId,
2187        access_token_hash: &str,
2188        max_access_token_count: usize,
2189    ) -> Result<()> {
2190        self.transact(|tx| async {
2191            let tx = tx;
2192
2193            access_token::ActiveModel {
2194                user_id: ActiveValue::set(user_id),
2195                hash: ActiveValue::set(access_token_hash.into()),
2196                ..Default::default()
2197            }
2198            .insert(&tx)
2199            .await?;
2200
2201            access_token::Entity::delete_many()
2202                .filter(
2203                    access_token::Column::Id.in_subquery(
2204                        Query::select()
2205                            .column(access_token::Column::Id)
2206                            .from(access_token::Entity)
2207                            .and_where(access_token::Column::UserId.eq(user_id))
2208                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2209                            .limit(10000)
2210                            .offset(max_access_token_count as u64)
2211                            .to_owned(),
2212                    ),
2213                )
2214                .exec(&tx)
2215                .await?;
2216            tx.commit().await?;
2217            Ok(())
2218        })
2219        .await
2220    }
2221
2222    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2223        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2224        enum QueryAs {
2225            Hash,
2226        }
2227
2228        self.transact(|tx| async move {
2229            Ok(access_token::Entity::find()
2230                .select_only()
2231                .column(access_token::Column::Hash)
2232                .filter(access_token::Column::UserId.eq(user_id))
2233                .order_by_desc(access_token::Column::Id)
2234                .into_values::<_, QueryAs>()
2235                .all(&tx)
2236                .await?)
2237        })
2238        .await
2239    }
2240
2241    async fn transact<F, Fut, T>(&self, f: F) -> Result<T>
2242    where
2243        F: Send + Fn(DatabaseTransaction) -> Fut,
2244        Fut: Send + Future<Output = Result<T>>,
2245    {
2246        let body = async {
2247            loop {
2248                let tx = self.pool.begin().await?;
2249
2250                // In Postgres, serializable transactions are opt-in
2251                if let DatabaseBackend::Postgres = self.pool.get_database_backend() {
2252                    tx.execute(Statement::from_string(
2253                        DatabaseBackend::Postgres,
2254                        "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".into(),
2255                    ))
2256                    .await?;
2257                }
2258
2259                match f(tx).await {
2260                    Ok(result) => return Ok(result),
2261                    Err(error) => match error {
2262                        Error::Database2(
2263                            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2264                            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2265                        ) if error
2266                            .as_database_error()
2267                            .and_then(|error| error.code())
2268                            .as_deref()
2269                            == Some("40001") =>
2270                        {
2271                            // Retry (don't break the loop)
2272                        }
2273                        error @ _ => return Err(error),
2274                    },
2275                }
2276            }
2277        };
2278
2279        #[cfg(test)]
2280        {
2281            if let Some(background) = self.background.as_ref() {
2282                background.simulate_random_delay().await;
2283            }
2284
2285            self.runtime.as_ref().unwrap().block_on(body)
2286        }
2287
2288        #[cfg(not(test))]
2289        {
2290            body.await
2291        }
2292    }
2293}
2294
2295pub struct RoomGuard<T> {
        // A value of type `T` bundled with an owned mutex guard.
        // NOTE(review): presumably the guard belongs to the per-room lock kept in
        // `Database::rooms`; confirm where `RoomGuard` is constructed.

        // The wrapped value; reachable through the `Deref` / `DerefMut` impls.
2296    data: T,
        // Owned `tokio::sync` mutex guard; never read, only held alongside `data`.
2297    _guard: OwnedMutexGuard<()>,
        // `Rc<()>` is not `Send`, so this marker keeps `RoomGuard` from being `Send`.
2298    _not_send: PhantomData<Rc<()>>,
2299}
2300
2301impl<T> Deref for RoomGuard<T> {
2302    type Target = T;
2303
2304    fn deref(&self) -> &T {
2305        &self.data
2306    }
2307}
2308
2309impl<T> DerefMut for RoomGuard<T> {
2310    fn deref_mut(&mut self) -> &mut T {
2311        &mut self.data
2312    }
2313}
2314
2315#[derive(Debug, Serialize, Deserialize)]
    /// Serde-serializable set of values describing a new user.
    ///
    /// NOTE(review): the field descriptions below are inferred from the field names;
    /// confirm against the code that consumes this struct.
2316pub struct NewUserParams {
        /// Presumably the user's GitHub login name.
2317    pub github_login: String,
        /// Presumably the user's numeric GitHub id.
2318    pub github_user_id: i32,
        /// Presumably the number of invites granted to the user.
2319    pub invite_count: i32,
2320}
2321
2322#[derive(Debug)]
    /// Values reported back about a newly created user.
    ///
    /// NOTE(review): the field descriptions below are inferred from the field names;
    /// confirm against the code that builds this struct.
2323pub struct NewUserResult {
        /// Id of the user record.
2324    pub user_id: UserId,
        /// Presumably the identifier under which the user's metrics are recorded.
2325    pub metrics_id: String,
        /// Presumably the user whose invite was used, if any.
2326    pub inviting_user_id: Option<UserId>,
        /// Presumably the device id recorded at signup, if any.
2327    pub signup_device_id: Option<String>,
2328}
2329
2330fn random_invite_code() -> String {
2331    nanoid::nanoid!(16)
2332}
2333
2334fn random_email_confirmation_code() -> String {
2335    nanoid::nanoid!(64)
2336}
2337
2338macro_rules! id_type {
2339    ($name:ident) => {
2340        #[derive(
2341            Clone,
2342            Copy,
2343            Debug,
2344            Default,
2345            PartialEq,
2346            Eq,
2347            PartialOrd,
2348            Ord,
2349            Hash,
2350            sqlx::Type,
2351            Serialize,
2352            Deserialize,
2353        )]
2354        #[sqlx(transparent)]
2355        #[serde(transparent)]
2356        pub struct $name(pub i32);
2357
2358        impl $name {
2359            #[allow(unused)]
2360            pub const MAX: Self = Self(i32::MAX);
2361
2362            #[allow(unused)]
2363            pub fn from_proto(value: u64) -> Self {
2364                Self(value as i32)
2365            }
2366
2367            #[allow(unused)]
2368            pub fn to_proto(self) -> u64 {
2369                self.0 as u64
2370            }
2371        }
2372
2373        impl std::fmt::Display for $name {
2374            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2375                self.0.fmt(f)
2376            }
2377        }
2378
2379        impl From<$name> for sea_query::Value {
2380            fn from(value: $name) -> Self {
2381                sea_query::Value::Int(Some(value.0))
2382            }
2383        }
2384
2385        impl sea_orm::TryGetable for $name {
2386            fn try_get(
2387                res: &sea_orm::QueryResult,
2388                pre: &str,
2389                col: &str,
2390            ) -> Result<Self, sea_orm::TryGetError> {
2391                Ok(Self(i32::try_get(res, pre, col)?))
2392            }
2393        }
2394
2395        impl sea_query::ValueType for $name {
2396            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
2397                match v {
2398                    Value::TinyInt(Some(int)) => {
2399                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2400                    }
2401                    Value::SmallInt(Some(int)) => {
2402                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2403                    }
2404                    Value::Int(Some(int)) => {
2405                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2406                    }
2407                    Value::BigInt(Some(int)) => {
2408                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2409                    }
2410                    Value::TinyUnsigned(Some(int)) => {
2411                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2412                    }
2413                    Value::SmallUnsigned(Some(int)) => {
2414                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2415                    }
2416                    Value::Unsigned(Some(int)) => {
2417                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2418                    }
2419                    Value::BigUnsigned(Some(int)) => {
2420                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2421                    }
2422                    _ => Err(sea_query::ValueTypeErr),
2423                }
2424            }
2425
2426            fn type_name() -> String {
2427                stringify!($name).into()
2428            }
2429
2430            fn array_type() -> sea_query::ArrayType {
2431                sea_query::ArrayType::Int
2432            }
2433
2434            fn column_type() -> sea_query::ColumnType {
2435                sea_query::ColumnType::Integer(None)
2436            }
2437        }
2438
2439        impl sea_orm::TryFromU64 for $name {
2440            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
2441                Ok(Self(n.try_into().map_err(|_| {
2442                    DbErr::ConvertFromU64(concat!(
2443                        "error converting ",
2444                        stringify!($name),
2445                        " to u64"
2446                    ))
2447                })?))
2448            }
2449        }
2450
2451        impl sea_query::Nullable for $name {
2452            fn null() -> Value {
2453                Value::Int(None)
2454            }
2455        }
2456    };
2457}
2458
// Declare the identifier newtypes used by this module. Each `id_type!`
// invocation expands to a `pub struct Name(pub i32)` together with the
// conversion and trait impls generated by the macro.
id_type!(AccessTokenId);
id_type!(ContactId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(SignupId);
id_type!(UserId);
id_type!(WorktreeId);
2469
/// Data bundle describing a departure from a room.
///
/// NOTE(review): field semantics below are inferred from names and types only;
/// confirm against the code that constructs this value.
pub struct LeftRoom {
    /// Protobuf representation of the room (presumably its state after the
    /// departure — confirm).
    pub room: proto::Room,
    /// Projects that were left, keyed by their `ProjectId`.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    /// Ids of users whose calls were canceled (presumably as a consequence of
    /// the departure — confirm).
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
2475
/// Aggregated view of a project: its collaborators, worktrees and language
/// servers.
pub struct Project {
    /// Collaborator records (`project_collaborator::Model`) belonging to the project.
    pub collaborators: Vec<project_collaborator::Model>,
    /// Worktrees of the project, keyed by `WorktreeId`.
    pub worktrees: BTreeMap<WorktreeId, Worktree>,
    /// Language servers of the project, in protobuf form.
    pub language_servers: Vec<proto::LanguageServer>,
}
2481
/// Data bundle describing a project that was left.
///
/// NOTE(review): field semantics are inferred from names and types only;
/// confirm against the code that constructs this value.
pub struct LeftProject {
    /// Identifier of the project.
    pub id: ProjectId,
    /// User id of the project's host.
    pub host_user_id: UserId,
    /// Connection id of the project's host.
    pub host_connection_id: ConnectionId,
    /// Connection ids associated with the project (presumably the peers that
    /// should be notified — confirm).
    pub connection_ids: Vec<ConnectionId>,
}
2488
/// In-memory description of a single worktree of a project.
///
/// NOTE(review): field semantics are inferred from names and types only;
/// confirm against the code that constructs this value.
pub struct Worktree {
    /// Identifier of the worktree.
    pub id: WorktreeId,
    /// Absolute path of the worktree, stored as a string.
    pub abs_path: String,
    /// Name of the worktree's root.
    pub root_name: String,
    /// Visibility flag (exact meaning is defined by the callers — confirm).
    pub visible: bool,
    /// Entries of the worktree, in protobuf form.
    pub entries: Vec<proto::Entry>,
    /// Diagnostic summaries for the worktree, in protobuf form.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Scan identifier (presumably that of the most recent scan — confirm).
    pub scan_id: u64,
    /// Completion flag (presumably whether the scan identified by `scan_id`
    /// has finished — confirm).
    pub is_complete: bool,
}
2499
2500#[cfg(test)]
2501pub use test::*;
2502
2503#[cfg(test)]
2504mod test {
2505    use super::*;
2506    use gpui::executor::Background;
2507    use lazy_static::lazy_static;
2508    use parking_lot::Mutex;
2509    use rand::prelude::*;
2510    use sea_orm::ConnectionTrait;
2511    use sqlx::migrate::MigrateDatabase;
2512    use std::sync::Arc;
2513
    /// Test fixture wrapping a `Database` created by `TestDb::sqlite` or
    /// `TestDb::postgres`.
    ///
    /// The `Drop` impl in this module takes the database out of `db` and, for
    /// the Postgres backend, terminates other connections and drops the
    /// database.
    pub struct TestDb {
        /// The wrapped database. Both constructors in this module set it to
        /// `Some`; `Drop` takes it.
        pub db: Option<Arc<Database>>,
        /// Optional raw sqlx connection. Both constructors in this module set
        /// it to `None`.
        pub connection: Option<sqlx::AnyConnection>,
    }
2518
2519    impl TestDb {
2520        pub fn sqlite(background: Arc<Background>) -> Self {
2521            let url = format!("sqlite::memory:");
2522            let runtime = tokio::runtime::Builder::new_current_thread()
2523                .enable_io()
2524                .enable_time()
2525                .build()
2526                .unwrap();
2527
2528            let mut db = runtime.block_on(async {
2529                let mut options = ConnectOptions::new(url);
2530                options.max_connections(5);
2531                let db = Database::new(options).await.unwrap();
2532                let sql = include_str!(concat!(
2533                    env!("CARGO_MANIFEST_DIR"),
2534                    "/migrations.sqlite/20221109000000_test_schema.sql"
2535                ));
2536                db.pool
2537                    .execute(sea_orm::Statement::from_string(
2538                        db.pool.get_database_backend(),
2539                        sql.into(),
2540                    ))
2541                    .await
2542                    .unwrap();
2543                db
2544            });
2545
2546            db.background = Some(background);
2547            db.runtime = Some(runtime);
2548
2549            Self {
2550                db: Some(Arc::new(db)),
2551                connection: None,
2552            }
2553        }
2554
2555        pub fn postgres(background: Arc<Background>) -> Self {
2556            lazy_static! {
2557                static ref LOCK: Mutex<()> = Mutex::new(());
2558            }
2559
2560            let _guard = LOCK.lock();
2561            let mut rng = StdRng::from_entropy();
2562            let url = format!(
2563                "postgres://postgres@localhost/zed-test-{}",
2564                rng.gen::<u128>()
2565            );
2566            let runtime = tokio::runtime::Builder::new_current_thread()
2567                .enable_io()
2568                .enable_time()
2569                .build()
2570                .unwrap();
2571
2572            let mut db = runtime.block_on(async {
2573                sqlx::Postgres::create_database(&url)
2574                    .await
2575                    .expect("failed to create test db");
2576                let mut options = ConnectOptions::new(url);
2577                options
2578                    .max_connections(5)
2579                    .idle_timeout(Duration::from_secs(0));
2580                let db = Database::new(options).await.unwrap();
2581                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
2582                db.migrate(Path::new(migrations_path), false).await.unwrap();
2583                db
2584            });
2585
2586            db.background = Some(background);
2587            db.runtime = Some(runtime);
2588
2589            Self {
2590                db: Some(Arc::new(db)),
2591                connection: None,
2592            }
2593        }
2594
2595        pub fn db(&self) -> &Arc<Database> {
2596            self.db.as_ref().unwrap()
2597        }
2598    }
2599
2600    impl Drop for TestDb {
2601        fn drop(&mut self) {
2602            let db = self.db.take().unwrap();
2603            if let DatabaseBackend::Postgres = db.pool.get_database_backend() {
2604                db.runtime.as_ref().unwrap().block_on(async {
2605                    use util::ResultExt;
2606                    let query = "
2607                        SELECT pg_terminate_backend(pg_stat_activity.pid)
2608                        FROM pg_stat_activity
2609                        WHERE
2610                            pg_stat_activity.datname = current_database() AND
2611                            pid <> pg_backend_pid();
2612                    ";
2613                    db.pool
2614                        .execute(sea_orm::Statement::from_string(
2615                            db.pool.get_database_backend(),
2616                            query.into(),
2617                        ))
2618                        .await
2619                        .log_err();
2620                    sqlx::Postgres::drop_database(db.options.get_url())
2621                        .await
2622                        .log_err();
2623                })
2624            }
2625        }
2626    }
2627}