db.rs

   1mod access_token;
   2mod contact;
   3mod language_server;
   4mod project;
   5mod project_collaborator;
   6mod room;
   7mod room_participant;
   8mod signup;
   9#[cfg(test)]
  10mod tests;
  11mod user;
  12mod worktree;
  13mod worktree_diagnostic_summary;
  14mod worktree_entry;
  15
  16use crate::{Error, Result};
  17use anyhow::anyhow;
  18use collections::{BTreeMap, HashMap, HashSet};
  19pub use contact::Contact;
  20use dashmap::DashMap;
  21use futures::StreamExt;
  22use hyper::StatusCode;
  23use rpc::{proto, ConnectionId};
  24pub use sea_orm::ConnectOptions;
  25use sea_orm::{
  26    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseBackend, DatabaseConnection,
  27    DatabaseTransaction, DbErr, FromQueryResult, IntoActiveModel, JoinType, QueryOrder,
  28    QuerySelect, Statement, TransactionTrait,
  29};
  30use sea_query::{Alias, Expr, OnConflict, Query};
  31use serde::{Deserialize, Serialize};
  32pub use signup::{Invite, NewSignup, WaitlistSummary};
  33use sqlx::migrate::{Migrate, Migration, MigrationSource};
  34use sqlx::Connection;
  35use std::ops::{Deref, DerefMut};
  36use std::path::Path;
  37use std::time::Duration;
  38use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  39use tokio::sync::{Mutex, OwnedMutexGuard};
  40pub use user::Model as User;
  41
  42pub struct Database {
  43    options: ConnectOptions,
  44    pool: DatabaseConnection,
  45    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  46    #[cfg(test)]
  47    background: Option<std::sync::Arc<gpui::executor::Background>>,
  48    #[cfg(test)]
  49    runtime: Option<tokio::runtime::Runtime>,
  50    epoch: Uuid,
  51}
  52
  53impl Database {
  54    pub async fn new(options: ConnectOptions) -> Result<Self> {
  55        Ok(Self {
  56            options: options.clone(),
  57            pool: sea_orm::Database::connect(options).await?,
  58            rooms: DashMap::with_capacity(16384),
  59            #[cfg(test)]
  60            background: None,
  61            #[cfg(test)]
  62            runtime: None,
  63            epoch: Uuid::new_v4(),
  64        })
  65    }
  66
  67    pub async fn migrate(
  68        &self,
  69        migrations_path: &Path,
  70        ignore_checksum_mismatch: bool,
  71    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  72        let migrations = MigrationSource::resolve(migrations_path)
  73            .await
  74            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  75
  76        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  77
  78        connection.ensure_migrations_table().await?;
  79        let applied_migrations: HashMap<_, _> = connection
  80            .list_applied_migrations()
  81            .await?
  82            .into_iter()
  83            .map(|m| (m.version, m))
  84            .collect();
  85
  86        let mut new_migrations = Vec::new();
  87        for migration in migrations {
  88            match applied_migrations.get(&migration.version) {
  89                Some(applied_migration) => {
  90                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  91                    {
  92                        Err(anyhow!(
  93                            "checksum mismatch for applied migration {}",
  94                            migration.description
  95                        ))?;
  96                    }
  97                }
  98                None => {
  99                    let elapsed = connection.apply(&migration).await?;
 100                    new_migrations.push((migration, elapsed));
 101                }
 102            }
 103        }
 104
 105        Ok(new_migrations)
 106    }
 107
 108    pub async fn clear_stale_data(&self) -> Result<()> {
 109        self.transact(|tx| async {
 110            project_collaborator::Entity::delete_many()
 111                .filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch))
 112                .exec(&tx)
 113                .await?;
 114            room_participant::Entity::delete_many()
 115                .filter(
 116                    room_participant::Column::AnsweringConnectionEpoch
 117                        .ne(self.epoch)
 118                        .or(room_participant::Column::CallingConnectionEpoch.ne(self.epoch)),
 119                )
 120                .exec(&tx)
 121                .await?;
 122            project::Entity::delete_many()
 123                .filter(project::Column::HostConnectionEpoch.ne(self.epoch))
 124                .exec(&tx)
 125                .await?;
 126            room::Entity::delete_many()
 127                .filter(
 128                    room::Column::Id.not_in_subquery(
 129                        Query::select()
 130                            .column(room_participant::Column::RoomId)
 131                            .from(room_participant::Entity)
 132                            .distinct()
 133                            .to_owned(),
 134                    ),
 135                )
 136                .exec(&tx)
 137                .await?;
 138            tx.commit().await?;
 139            Ok(())
 140        })
 141        .await
 142    }
 143
 144    // users
 145
 146    pub async fn create_user(
 147        &self,
 148        email_address: &str,
 149        admin: bool,
 150        params: NewUserParams,
 151    ) -> Result<NewUserResult> {
 152        self.transact(|tx| async {
 153            let user = user::Entity::insert(user::ActiveModel {
 154                email_address: ActiveValue::set(Some(email_address.into())),
 155                github_login: ActiveValue::set(params.github_login.clone()),
 156                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 157                admin: ActiveValue::set(admin),
 158                metrics_id: ActiveValue::set(Uuid::new_v4()),
 159                ..Default::default()
 160            })
 161            .on_conflict(
 162                OnConflict::column(user::Column::GithubLogin)
 163                    .update_column(user::Column::GithubLogin)
 164                    .to_owned(),
 165            )
 166            .exec_with_returning(&tx)
 167            .await?;
 168
 169            tx.commit().await?;
 170
 171            Ok(NewUserResult {
 172                user_id: user.id,
 173                metrics_id: user.metrics_id.to_string(),
 174                signup_device_id: None,
 175                inviting_user_id: None,
 176            })
 177        })
 178        .await
 179    }
 180
 181    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 182        self.transact(|tx| async move { Ok(user::Entity::find_by_id(id).one(&tx).await?) })
 183            .await
 184    }
 185
 186    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 187        self.transact(|tx| async {
 188            let tx = tx;
 189            Ok(user::Entity::find()
 190                .filter(user::Column::Id.is_in(ids.iter().copied()))
 191                .all(&tx)
 192                .await?)
 193        })
 194        .await
 195    }
 196
 197    pub async fn get_user_by_github_account(
 198        &self,
 199        github_login: &str,
 200        github_user_id: Option<i32>,
 201    ) -> Result<Option<User>> {
 202        self.transact(|tx| async {
 203            let tx = tx;
 204            if let Some(github_user_id) = github_user_id {
 205                if let Some(user_by_github_user_id) = user::Entity::find()
 206                    .filter(user::Column::GithubUserId.eq(github_user_id))
 207                    .one(&tx)
 208                    .await?
 209                {
 210                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 211                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 212                    Ok(Some(user_by_github_user_id.update(&tx).await?))
 213                } else if let Some(user_by_github_login) = user::Entity::find()
 214                    .filter(user::Column::GithubLogin.eq(github_login))
 215                    .one(&tx)
 216                    .await?
 217                {
 218                    let mut user_by_github_login = user_by_github_login.into_active_model();
 219                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 220                    Ok(Some(user_by_github_login.update(&tx).await?))
 221                } else {
 222                    Ok(None)
 223                }
 224            } else {
 225                Ok(user::Entity::find()
 226                    .filter(user::Column::GithubLogin.eq(github_login))
 227                    .one(&tx)
 228                    .await?)
 229            }
 230        })
 231        .await
 232    }
 233
 234    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 235        self.transact(|tx| async move {
 236            Ok(user::Entity::find()
 237                .order_by_asc(user::Column::GithubLogin)
 238                .limit(limit as u64)
 239                .offset(page as u64 * limit as u64)
 240                .all(&tx)
 241                .await?)
 242        })
 243        .await
 244    }
 245
 246    pub async fn get_users_with_no_invites(
 247        &self,
 248        invited_by_another_user: bool,
 249    ) -> Result<Vec<User>> {
 250        self.transact(|tx| async move {
 251            Ok(user::Entity::find()
 252                .filter(
 253                    user::Column::InviteCount
 254                        .eq(0)
 255                        .and(if invited_by_another_user {
 256                            user::Column::InviterId.is_not_null()
 257                        } else {
 258                            user::Column::InviterId.is_null()
 259                        }),
 260                )
 261                .all(&tx)
 262                .await?)
 263        })
 264        .await
 265    }
 266
 267    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 268        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 269        enum QueryAs {
 270            MetricsId,
 271        }
 272
 273        self.transact(|tx| async move {
 274            let metrics_id: Uuid = user::Entity::find_by_id(id)
 275                .select_only()
 276                .column(user::Column::MetricsId)
 277                .into_values::<_, QueryAs>()
 278                .one(&tx)
 279                .await?
 280                .ok_or_else(|| anyhow!("could not find user"))?;
 281            Ok(metrics_id.to_string())
 282        })
 283        .await
 284    }
 285
 286    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 287        self.transact(|tx| async move {
 288            user::Entity::update_many()
 289                .filter(user::Column::Id.eq(id))
 290                .set(user::ActiveModel {
 291                    admin: ActiveValue::set(is_admin),
 292                    ..Default::default()
 293                })
 294                .exec(&tx)
 295                .await?;
 296            tx.commit().await?;
 297            Ok(())
 298        })
 299        .await
 300    }
 301
 302    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 303        self.transact(|tx| async move {
 304            user::Entity::update_many()
 305                .filter(user::Column::Id.eq(id))
 306                .set(user::ActiveModel {
 307                    connected_once: ActiveValue::set(connected_once),
 308                    ..Default::default()
 309                })
 310                .exec(&tx)
 311                .await?;
 312            tx.commit().await?;
 313            Ok(())
 314        })
 315        .await
 316    }
 317
 318    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 319        self.transact(|tx| async move {
 320            access_token::Entity::delete_many()
 321                .filter(access_token::Column::UserId.eq(id))
 322                .exec(&tx)
 323                .await?;
 324            user::Entity::delete_by_id(id).exec(&tx).await?;
 325            tx.commit().await?;
 326            Ok(())
 327        })
 328        .await
 329    }
 330
 331    // contacts
 332
 333    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 334        #[derive(Debug, FromQueryResult)]
 335        struct ContactWithUserBusyStatuses {
 336            user_id_a: UserId,
 337            user_id_b: UserId,
 338            a_to_b: bool,
 339            accepted: bool,
 340            should_notify: bool,
 341            user_a_busy: bool,
 342            user_b_busy: bool,
 343        }
 344
 345        self.transact(|tx| async move {
 346            let user_a_participant = Alias::new("user_a_participant");
 347            let user_b_participant = Alias::new("user_b_participant");
 348            let mut db_contacts = contact::Entity::find()
 349                .column_as(
 350                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 351                        .is_not_null(),
 352                    "user_a_busy",
 353                )
 354                .column_as(
 355                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 356                        .is_not_null(),
 357                    "user_b_busy",
 358                )
 359                .filter(
 360                    contact::Column::UserIdA
 361                        .eq(user_id)
 362                        .or(contact::Column::UserIdB.eq(user_id)),
 363                )
 364                .join_as(
 365                    JoinType::LeftJoin,
 366                    contact::Relation::UserARoomParticipant.def(),
 367                    user_a_participant,
 368                )
 369                .join_as(
 370                    JoinType::LeftJoin,
 371                    contact::Relation::UserBRoomParticipant.def(),
 372                    user_b_participant,
 373                )
 374                .into_model::<ContactWithUserBusyStatuses>()
 375                .stream(&tx)
 376                .await?;
 377
 378            let mut contacts = Vec::new();
 379            while let Some(db_contact) = db_contacts.next().await {
 380                let db_contact = db_contact?;
 381                if db_contact.user_id_a == user_id {
 382                    if db_contact.accepted {
 383                        contacts.push(Contact::Accepted {
 384                            user_id: db_contact.user_id_b,
 385                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 386                            busy: db_contact.user_b_busy,
 387                        });
 388                    } else if db_contact.a_to_b {
 389                        contacts.push(Contact::Outgoing {
 390                            user_id: db_contact.user_id_b,
 391                        })
 392                    } else {
 393                        contacts.push(Contact::Incoming {
 394                            user_id: db_contact.user_id_b,
 395                            should_notify: db_contact.should_notify,
 396                        });
 397                    }
 398                } else if db_contact.accepted {
 399                    contacts.push(Contact::Accepted {
 400                        user_id: db_contact.user_id_a,
 401                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 402                        busy: db_contact.user_a_busy,
 403                    });
 404                } else if db_contact.a_to_b {
 405                    contacts.push(Contact::Incoming {
 406                        user_id: db_contact.user_id_a,
 407                        should_notify: db_contact.should_notify,
 408                    });
 409                } else {
 410                    contacts.push(Contact::Outgoing {
 411                        user_id: db_contact.user_id_a,
 412                    });
 413                }
 414            }
 415
 416            contacts.sort_unstable_by_key(|contact| contact.user_id());
 417
 418            Ok(contacts)
 419        })
 420        .await
 421    }
 422
 423    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 424        self.transact(|tx| async move {
 425            let participant = room_participant::Entity::find()
 426                .filter(room_participant::Column::UserId.eq(user_id))
 427                .one(&tx)
 428                .await?;
 429            Ok(participant.is_some())
 430        })
 431        .await
 432    }
 433
 434    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 435        self.transact(|tx| async move {
 436            let (id_a, id_b) = if user_id_1 < user_id_2 {
 437                (user_id_1, user_id_2)
 438            } else {
 439                (user_id_2, user_id_1)
 440            };
 441
 442            Ok(contact::Entity::find()
 443                .filter(
 444                    contact::Column::UserIdA
 445                        .eq(id_a)
 446                        .and(contact::Column::UserIdB.eq(id_b))
 447                        .and(contact::Column::Accepted.eq(true)),
 448                )
 449                .one(&tx)
 450                .await?
 451                .is_some())
 452        })
 453        .await
 454    }
 455
 456    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 457        self.transact(|tx| async move {
 458            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 459                (sender_id, receiver_id, true)
 460            } else {
 461                (receiver_id, sender_id, false)
 462            };
 463
 464            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 465                user_id_a: ActiveValue::set(id_a),
 466                user_id_b: ActiveValue::set(id_b),
 467                a_to_b: ActiveValue::set(a_to_b),
 468                accepted: ActiveValue::set(false),
 469                should_notify: ActiveValue::set(true),
 470                ..Default::default()
 471            })
 472            .on_conflict(
 473                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 474                    .values([
 475                        (contact::Column::Accepted, true.into()),
 476                        (contact::Column::ShouldNotify, false.into()),
 477                    ])
 478                    .action_and_where(
 479                        contact::Column::Accepted.eq(false).and(
 480                            contact::Column::AToB
 481                                .eq(a_to_b)
 482                                .and(contact::Column::UserIdA.eq(id_b))
 483                                .or(contact::Column::AToB
 484                                    .ne(a_to_b)
 485                                    .and(contact::Column::UserIdA.eq(id_a))),
 486                        ),
 487                    )
 488                    .to_owned(),
 489            )
 490            .exec_without_returning(&tx)
 491            .await?;
 492
 493            if rows_affected == 1 {
 494                tx.commit().await?;
 495                Ok(())
 496            } else {
 497                Err(anyhow!("contact already requested"))?
 498            }
 499        })
 500        .await
 501    }
 502
 503    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 504        self.transact(|tx| async move {
 505            let (id_a, id_b) = if responder_id < requester_id {
 506                (responder_id, requester_id)
 507            } else {
 508                (requester_id, responder_id)
 509            };
 510
 511            let result = contact::Entity::delete_many()
 512                .filter(
 513                    contact::Column::UserIdA
 514                        .eq(id_a)
 515                        .and(contact::Column::UserIdB.eq(id_b)),
 516                )
 517                .exec(&tx)
 518                .await?;
 519
 520            if result.rows_affected == 1 {
 521                tx.commit().await?;
 522                Ok(())
 523            } else {
 524                Err(anyhow!("no such contact"))?
 525            }
 526        })
 527        .await
 528    }
 529
 530    pub async fn dismiss_contact_notification(
 531        &self,
 532        user_id: UserId,
 533        contact_user_id: UserId,
 534    ) -> Result<()> {
 535        self.transact(|tx| async move {
 536            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 537                (user_id, contact_user_id, true)
 538            } else {
 539                (contact_user_id, user_id, false)
 540            };
 541
 542            let result = contact::Entity::update_many()
 543                .set(contact::ActiveModel {
 544                    should_notify: ActiveValue::set(false),
 545                    ..Default::default()
 546                })
 547                .filter(
 548                    contact::Column::UserIdA
 549                        .eq(id_a)
 550                        .and(contact::Column::UserIdB.eq(id_b))
 551                        .and(
 552                            contact::Column::AToB
 553                                .eq(a_to_b)
 554                                .and(contact::Column::Accepted.eq(true))
 555                                .or(contact::Column::AToB
 556                                    .ne(a_to_b)
 557                                    .and(contact::Column::Accepted.eq(false))),
 558                        ),
 559                )
 560                .exec(&tx)
 561                .await?;
 562            if result.rows_affected == 0 {
 563                Err(anyhow!("no such contact request"))?
 564            } else {
 565                tx.commit().await?;
 566                Ok(())
 567            }
 568        })
 569        .await
 570    }
 571
 572    pub async fn respond_to_contact_request(
 573        &self,
 574        responder_id: UserId,
 575        requester_id: UserId,
 576        accept: bool,
 577    ) -> Result<()> {
 578        self.transact(|tx| async move {
 579            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 580                (responder_id, requester_id, false)
 581            } else {
 582                (requester_id, responder_id, true)
 583            };
 584            let rows_affected = if accept {
 585                let result = contact::Entity::update_many()
 586                    .set(contact::ActiveModel {
 587                        accepted: ActiveValue::set(true),
 588                        should_notify: ActiveValue::set(true),
 589                        ..Default::default()
 590                    })
 591                    .filter(
 592                        contact::Column::UserIdA
 593                            .eq(id_a)
 594                            .and(contact::Column::UserIdB.eq(id_b))
 595                            .and(contact::Column::AToB.eq(a_to_b)),
 596                    )
 597                    .exec(&tx)
 598                    .await?;
 599                result.rows_affected
 600            } else {
 601                let result = contact::Entity::delete_many()
 602                    .filter(
 603                        contact::Column::UserIdA
 604                            .eq(id_a)
 605                            .and(contact::Column::UserIdB.eq(id_b))
 606                            .and(contact::Column::AToB.eq(a_to_b))
 607                            .and(contact::Column::Accepted.eq(false)),
 608                    )
 609                    .exec(&tx)
 610                    .await?;
 611
 612                result.rows_affected
 613            };
 614
 615            if rows_affected == 1 {
 616                tx.commit().await?;
 617                Ok(())
 618            } else {
 619                Err(anyhow!("no such contact request"))?
 620            }
 621        })
 622        .await
 623    }
 624
 625    pub fn fuzzy_like_string(string: &str) -> String {
 626        let mut result = String::with_capacity(string.len() * 2 + 1);
 627        for c in string.chars() {
 628            if c.is_alphanumeric() {
 629                result.push('%');
 630                result.push(c);
 631            }
 632        }
 633        result.push('%');
 634        result
 635    }
 636
 637    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 638        self.transact(|tx| async {
 639            let tx = tx;
 640            let like_string = Self::fuzzy_like_string(name_query);
 641            let query = "
 642                SELECT users.*
 643                FROM users
 644                WHERE github_login ILIKE $1
 645                ORDER BY github_login <-> $2
 646                LIMIT $3
 647            ";
 648
 649            Ok(user::Entity::find()
 650                .from_raw_sql(Statement::from_sql_and_values(
 651                    self.pool.get_database_backend(),
 652                    query.into(),
 653                    vec![like_string.into(), name_query.into(), limit.into()],
 654                ))
 655                .all(&tx)
 656                .await?)
 657        })
 658        .await
 659    }
 660
 661    // signups
 662
 663    pub async fn create_signup(&self, signup: NewSignup) -> Result<()> {
 664        self.transact(|tx| async {
 665            signup::ActiveModel {
 666                email_address: ActiveValue::set(signup.email_address.clone()),
 667                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 668                email_confirmation_sent: ActiveValue::set(false),
 669                platform_mac: ActiveValue::set(signup.platform_mac),
 670                platform_windows: ActiveValue::set(signup.platform_windows),
 671                platform_linux: ActiveValue::set(signup.platform_linux),
 672                platform_unknown: ActiveValue::set(false),
 673                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 674                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 675                device_id: ActiveValue::set(signup.device_id.clone()),
 676                ..Default::default()
 677            }
 678            .insert(&tx)
 679            .await?;
 680            tx.commit().await?;
 681            Ok(())
 682        })
 683        .await
 684    }
 685
 686    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 687        self.transact(|tx| async move {
 688            let query = "
 689                SELECT
 690                    COUNT(*) as count,
 691                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 692                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 693                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 694                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 695                FROM (
 696                    SELECT *
 697                    FROM signups
 698                    WHERE
 699                        NOT email_confirmation_sent
 700                ) AS unsent
 701            ";
 702            Ok(
 703                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 704                    self.pool.get_database_backend(),
 705                    query.into(),
 706                    vec![],
 707                ))
 708                .one(&tx)
 709                .await?
 710                .ok_or_else(|| anyhow!("invalid result"))?,
 711            )
 712        })
 713        .await
 714    }
 715
 716    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 717        let emails = invites
 718            .iter()
 719            .map(|s| s.email_address.as_str())
 720            .collect::<Vec<_>>();
 721        self.transact(|tx| async {
 722            signup::Entity::update_many()
 723                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 724                .set(signup::ActiveModel {
 725                    email_confirmation_sent: ActiveValue::set(true),
 726                    ..Default::default()
 727                })
 728                .exec(&tx)
 729                .await?;
 730            tx.commit().await?;
 731            Ok(())
 732        })
 733        .await
 734    }
 735
 736    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 737        self.transact(|tx| async move {
 738            Ok(signup::Entity::find()
 739                .select_only()
 740                .column(signup::Column::EmailAddress)
 741                .column(signup::Column::EmailConfirmationCode)
 742                .filter(
 743                    signup::Column::EmailConfirmationSent.eq(false).and(
 744                        signup::Column::PlatformMac
 745                            .eq(true)
 746                            .or(signup::Column::PlatformUnknown.eq(true)),
 747                    ),
 748                )
 749                .limit(count as u64)
 750                .into_model()
 751                .all(&tx)
 752                .await?)
 753        })
 754        .await
 755    }
 756
 757    // invite codes
 758
 759    pub async fn create_invite_from_code(
 760        &self,
 761        code: &str,
 762        email_address: &str,
 763        device_id: Option<&str>,
 764    ) -> Result<Invite> {
 765        self.transact(|tx| async move {
 766            let existing_user = user::Entity::find()
 767                .filter(user::Column::EmailAddress.eq(email_address))
 768                .one(&tx)
 769                .await?;
 770
 771            if existing_user.is_some() {
 772                Err(anyhow!("email address is already in use"))?;
 773            }
 774
 775            let inviter = match user::Entity::find()
 776                .filter(user::Column::InviteCode.eq(code))
 777                .one(&tx)
 778                .await?
 779            {
 780                Some(inviter) => inviter,
 781                None => {
 782                    return Err(Error::Http(
 783                        StatusCode::NOT_FOUND,
 784                        "invite code not found".to_string(),
 785                    ))?
 786                }
 787            };
 788
 789            if inviter.invite_count == 0 {
 790                Err(Error::Http(
 791                    StatusCode::UNAUTHORIZED,
 792                    "no invites remaining".to_string(),
 793                ))?;
 794            }
 795
 796            let signup = signup::Entity::insert(signup::ActiveModel {
 797                email_address: ActiveValue::set(email_address.into()),
 798                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 799                email_confirmation_sent: ActiveValue::set(false),
 800                inviting_user_id: ActiveValue::set(Some(inviter.id)),
 801                platform_linux: ActiveValue::set(false),
 802                platform_mac: ActiveValue::set(false),
 803                platform_windows: ActiveValue::set(false),
 804                platform_unknown: ActiveValue::set(true),
 805                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 806                ..Default::default()
 807            })
 808            .on_conflict(
 809                OnConflict::column(signup::Column::EmailAddress)
 810                    .update_column(signup::Column::InvitingUserId)
 811                    .to_owned(),
 812            )
 813            .exec_with_returning(&tx)
 814            .await?;
 815            tx.commit().await?;
 816
 817            Ok(Invite {
 818                email_address: signup.email_address,
 819                email_confirmation_code: signup.email_confirmation_code,
 820            })
 821        })
 822        .await
 823    }
 824
 825    pub async fn create_user_from_invite(
 826        &self,
 827        invite: &Invite,
 828        user: NewUserParams,
 829    ) -> Result<Option<NewUserResult>> {
 830        self.transact(|tx| async {
 831            let tx = tx;
 832            let signup = signup::Entity::find()
 833                .filter(
 834                    signup::Column::EmailAddress
 835                        .eq(invite.email_address.as_str())
 836                        .and(
 837                            signup::Column::EmailConfirmationCode
 838                                .eq(invite.email_confirmation_code.as_str()),
 839                        ),
 840                )
 841                .one(&tx)
 842                .await?
 843                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 844
 845            if signup.user_id.is_some() {
 846                return Ok(None);
 847            }
 848
 849            let user = user::Entity::insert(user::ActiveModel {
 850                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 851                github_login: ActiveValue::set(user.github_login.clone()),
 852                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 853                admin: ActiveValue::set(false),
 854                invite_count: ActiveValue::set(user.invite_count),
 855                invite_code: ActiveValue::set(Some(random_invite_code())),
 856                metrics_id: ActiveValue::set(Uuid::new_v4()),
 857                ..Default::default()
 858            })
 859            .on_conflict(
 860                OnConflict::column(user::Column::GithubLogin)
 861                    .update_columns([
 862                        user::Column::EmailAddress,
 863                        user::Column::GithubUserId,
 864                        user::Column::Admin,
 865                    ])
 866                    .to_owned(),
 867            )
 868            .exec_with_returning(&tx)
 869            .await?;
 870
 871            let mut signup = signup.into_active_model();
 872            signup.user_id = ActiveValue::set(Some(user.id));
 873            let signup = signup.update(&tx).await?;
 874
 875            if let Some(inviting_user_id) = signup.inviting_user_id {
 876                let result = user::Entity::update_many()
 877                    .filter(
 878                        user::Column::Id
 879                            .eq(inviting_user_id)
 880                            .and(user::Column::InviteCount.gt(0)),
 881                    )
 882                    .col_expr(
 883                        user::Column::InviteCount,
 884                        Expr::col(user::Column::InviteCount).sub(1),
 885                    )
 886                    .exec(&tx)
 887                    .await?;
 888
 889                if result.rows_affected == 0 {
 890                    Err(Error::Http(
 891                        StatusCode::UNAUTHORIZED,
 892                        "no invites remaining".to_string(),
 893                    ))?;
 894                }
 895
 896                contact::Entity::insert(contact::ActiveModel {
 897                    user_id_a: ActiveValue::set(inviting_user_id),
 898                    user_id_b: ActiveValue::set(user.id),
 899                    a_to_b: ActiveValue::set(true),
 900                    should_notify: ActiveValue::set(true),
 901                    accepted: ActiveValue::set(true),
 902                    ..Default::default()
 903                })
 904                .on_conflict(OnConflict::new().do_nothing().to_owned())
 905                .exec_without_returning(&tx)
 906                .await?;
 907            }
 908
 909            tx.commit().await?;
 910            Ok(Some(NewUserResult {
 911                user_id: user.id,
 912                metrics_id: user.metrics_id.to_string(),
 913                inviting_user_id: signup.inviting_user_id,
 914                signup_device_id: signup.device_id,
 915            }))
 916        })
 917        .await
 918    }
 919
 920    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
 921        self.transact(|tx| async move {
 922            if count > 0 {
 923                user::Entity::update_many()
 924                    .filter(
 925                        user::Column::Id
 926                            .eq(id)
 927                            .and(user::Column::InviteCode.is_null()),
 928                    )
 929                    .set(user::ActiveModel {
 930                        invite_code: ActiveValue::set(Some(random_invite_code())),
 931                        ..Default::default()
 932                    })
 933                    .exec(&tx)
 934                    .await?;
 935            }
 936
 937            user::Entity::update_many()
 938                .filter(user::Column::Id.eq(id))
 939                .set(user::ActiveModel {
 940                    invite_count: ActiveValue::set(count),
 941                    ..Default::default()
 942                })
 943                .exec(&tx)
 944                .await?;
 945            tx.commit().await?;
 946            Ok(())
 947        })
 948        .await
 949    }
 950
 951    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
 952        self.transact(|tx| async move {
 953            match user::Entity::find_by_id(id).one(&tx).await? {
 954                Some(user) if user.invite_code.is_some() => {
 955                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
 956                }
 957                _ => Ok(None),
 958            }
 959        })
 960        .await
 961    }
 962
 963    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
 964        self.transact(|tx| async move {
 965            user::Entity::find()
 966                .filter(user::Column::InviteCode.eq(code))
 967                .one(&tx)
 968                .await?
 969                .ok_or_else(|| {
 970                    Error::Http(
 971                        StatusCode::NOT_FOUND,
 972                        "that invite code does not exist".to_string(),
 973                    )
 974                })
 975        })
 976        .await
 977    }
 978
 979    // rooms
 980
 981    pub async fn incoming_call_for_user(
 982        &self,
 983        user_id: UserId,
 984    ) -> Result<Option<proto::IncomingCall>> {
 985        self.transact(|tx| async move {
 986            let pending_participant = room_participant::Entity::find()
 987                .filter(
 988                    room_participant::Column::UserId
 989                        .eq(user_id)
 990                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
 991                )
 992                .one(&tx)
 993                .await?;
 994
 995            if let Some(pending_participant) = pending_participant {
 996                let room = self.get_room(pending_participant.room_id, &tx).await?;
 997                Ok(Self::build_incoming_call(&room, user_id))
 998            } else {
 999                Ok(None)
1000            }
1001        })
1002        .await
1003    }
1004
1005    pub async fn create_room(
1006        &self,
1007        user_id: UserId,
1008        connection_id: ConnectionId,
1009        live_kit_room: &str,
1010    ) -> Result<RoomGuard<proto::Room>> {
1011        self.transact(|tx| async move {
1012            let room = room::ActiveModel {
1013                live_kit_room: ActiveValue::set(live_kit_room.into()),
1014                ..Default::default()
1015            }
1016            .insert(&tx)
1017            .await?;
1018            let room_id = room.id;
1019
1020            room_participant::ActiveModel {
1021                room_id: ActiveValue::set(room_id),
1022                user_id: ActiveValue::set(user_id),
1023                answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
1024                answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
1025                calling_user_id: ActiveValue::set(user_id),
1026                calling_connection_id: ActiveValue::set(connection_id.0 as i32),
1027                calling_connection_epoch: ActiveValue::set(self.epoch),
1028                ..Default::default()
1029            }
1030            .insert(&tx)
1031            .await?;
1032
1033            let room = self.get_room(room_id, &tx).await?;
1034            self.commit_room_transaction(room_id, tx, room).await
1035        })
1036        .await
1037    }
1038
1039    pub async fn call(
1040        &self,
1041        room_id: RoomId,
1042        calling_user_id: UserId,
1043        calling_connection_id: ConnectionId,
1044        called_user_id: UserId,
1045        initial_project_id: Option<ProjectId>,
1046    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1047        self.transact(|tx| async move {
1048            room_participant::ActiveModel {
1049                room_id: ActiveValue::set(room_id),
1050                user_id: ActiveValue::set(called_user_id),
1051                calling_user_id: ActiveValue::set(calling_user_id),
1052                calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
1053                calling_connection_epoch: ActiveValue::set(self.epoch),
1054                initial_project_id: ActiveValue::set(initial_project_id),
1055                ..Default::default()
1056            }
1057            .insert(&tx)
1058            .await?;
1059
1060            let room = self.get_room(room_id, &tx).await?;
1061            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1062                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1063            self.commit_room_transaction(room_id, tx, (room, incoming_call))
1064                .await
1065        })
1066        .await
1067    }
1068
1069    pub async fn call_failed(
1070        &self,
1071        room_id: RoomId,
1072        called_user_id: UserId,
1073    ) -> Result<RoomGuard<proto::Room>> {
1074        self.transact(|tx| async move {
1075            room_participant::Entity::delete_many()
1076                .filter(
1077                    room_participant::Column::RoomId
1078                        .eq(room_id)
1079                        .and(room_participant::Column::UserId.eq(called_user_id)),
1080                )
1081                .exec(&tx)
1082                .await?;
1083            let room = self.get_room(room_id, &tx).await?;
1084            self.commit_room_transaction(room_id, tx, room).await
1085        })
1086        .await
1087    }
1088
1089    pub async fn decline_call(
1090        &self,
1091        expected_room_id: Option<RoomId>,
1092        user_id: UserId,
1093    ) -> Result<RoomGuard<proto::Room>> {
1094        self.transact(|tx| async move {
1095            let participant = room_participant::Entity::find()
1096                .filter(
1097                    room_participant::Column::UserId
1098                        .eq(user_id)
1099                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1100                )
1101                .one(&tx)
1102                .await?
1103                .ok_or_else(|| anyhow!("could not decline call"))?;
1104            let room_id = participant.room_id;
1105
1106            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1107                return Err(anyhow!("declining call on unexpected room"))?;
1108            }
1109
1110            room_participant::Entity::delete(participant.into_active_model())
1111                .exec(&tx)
1112                .await?;
1113
1114            let room = self.get_room(room_id, &tx).await?;
1115            self.commit_room_transaction(room_id, tx, room).await
1116        })
1117        .await
1118    }
1119
1120    pub async fn cancel_call(
1121        &self,
1122        expected_room_id: Option<RoomId>,
1123        calling_connection_id: ConnectionId,
1124        called_user_id: UserId,
1125    ) -> Result<RoomGuard<proto::Room>> {
1126        self.transact(|tx| async move {
1127            let participant = room_participant::Entity::find()
1128                .filter(
1129                    room_participant::Column::UserId
1130                        .eq(called_user_id)
1131                        .and(
1132                            room_participant::Column::CallingConnectionId
1133                                .eq(calling_connection_id.0 as i32),
1134                        )
1135                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1136                )
1137                .one(&tx)
1138                .await?
1139                .ok_or_else(|| anyhow!("could not cancel call"))?;
1140            let room_id = participant.room_id;
1141            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1142                return Err(anyhow!("canceling call on unexpected room"))?;
1143            }
1144
1145            room_participant::Entity::delete(participant.into_active_model())
1146                .exec(&tx)
1147                .await?;
1148
1149            let room = self.get_room(room_id, &tx).await?;
1150            self.commit_room_transaction(room_id, tx, room).await
1151        })
1152        .await
1153    }
1154
1155    pub async fn join_room(
1156        &self,
1157        room_id: RoomId,
1158        user_id: UserId,
1159        connection_id: ConnectionId,
1160    ) -> Result<RoomGuard<proto::Room>> {
1161        self.transact(|tx| async move {
1162            let result = room_participant::Entity::update_many()
1163                .filter(
1164                    room_participant::Column::RoomId
1165                        .eq(room_id)
1166                        .and(room_participant::Column::UserId.eq(user_id))
1167                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1168                )
1169                .set(room_participant::ActiveModel {
1170                    answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
1171                    answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
1172                    ..Default::default()
1173                })
1174                .exec(&tx)
1175                .await?;
1176            if result.rows_affected == 0 {
1177                Err(anyhow!("room does not exist or was already joined"))?
1178            } else {
1179                let room = self.get_room(room_id, &tx).await?;
1180                self.commit_room_transaction(room_id, tx, room).await
1181            }
1182        })
1183        .await
1184    }
1185
1186    pub async fn leave_room(
1187        &self,
1188        connection_id: ConnectionId,
1189    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1190        self.transact(|tx| async move {
1191            let leaving_participant = room_participant::Entity::find()
1192                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
1193                .one(&tx)
1194                .await?;
1195
1196            if let Some(leaving_participant) = leaving_participant {
1197                // Leave room.
1198                let room_id = leaving_participant.room_id;
1199                room_participant::Entity::delete_by_id(leaving_participant.id)
1200                    .exec(&tx)
1201                    .await?;
1202
1203                // Cancel pending calls initiated by the leaving user.
1204                let called_participants = room_participant::Entity::find()
1205                    .filter(
1206                        room_participant::Column::CallingConnectionId
1207                            .eq(connection_id.0)
1208                            .and(room_participant::Column::AnsweringConnectionId.is_null()),
1209                    )
1210                    .all(&tx)
1211                    .await?;
1212                room_participant::Entity::delete_many()
1213                    .filter(
1214                        room_participant::Column::Id
1215                            .is_in(called_participants.iter().map(|participant| participant.id)),
1216                    )
1217                    .exec(&tx)
1218                    .await?;
1219                let canceled_calls_to_user_ids = called_participants
1220                    .into_iter()
1221                    .map(|participant| participant.user_id)
1222                    .collect();
1223
1224                // Detect left projects.
1225                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1226                enum QueryProjectIds {
1227                    ProjectId,
1228                }
1229                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1230                    .select_only()
1231                    .column_as(
1232                        project_collaborator::Column::ProjectId,
1233                        QueryProjectIds::ProjectId,
1234                    )
1235                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
1236                    .into_values::<_, QueryProjectIds>()
1237                    .all(&tx)
1238                    .await?;
1239                let mut left_projects = HashMap::default();
1240                let mut collaborators = project_collaborator::Entity::find()
1241                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1242                    .stream(&tx)
1243                    .await?;
1244                while let Some(collaborator) = collaborators.next().await {
1245                    let collaborator = collaborator?;
1246                    let left_project =
1247                        left_projects
1248                            .entry(collaborator.project_id)
1249                            .or_insert(LeftProject {
1250                                id: collaborator.project_id,
1251                                host_user_id: Default::default(),
1252                                connection_ids: Default::default(),
1253                                host_connection_id: Default::default(),
1254                            });
1255
1256                    let collaborator_connection_id =
1257                        ConnectionId(collaborator.connection_id as u32);
1258                    if collaborator_connection_id != connection_id {
1259                        left_project.connection_ids.push(collaborator_connection_id);
1260                    }
1261
1262                    if collaborator.is_host {
1263                        left_project.host_user_id = collaborator.user_id;
1264                        left_project.host_connection_id =
1265                            ConnectionId(collaborator.connection_id as u32);
1266                    }
1267                }
1268                drop(collaborators);
1269
1270                // Leave projects.
1271                project_collaborator::Entity::delete_many()
1272                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
1273                    .exec(&tx)
1274                    .await?;
1275
1276                // Unshare projects.
1277                project::Entity::delete_many()
1278                    .filter(
1279                        project::Column::RoomId
1280                            .eq(room_id)
1281                            .and(project::Column::HostConnectionId.eq(connection_id.0)),
1282                    )
1283                    .exec(&tx)
1284                    .await?;
1285
1286                let room = self.get_room(room_id, &tx).await?;
1287                if room.participants.is_empty() {
1288                    room::Entity::delete_by_id(room_id).exec(&tx).await?;
1289                }
1290
1291                let left_room = self
1292                    .commit_room_transaction(
1293                        room_id,
1294                        tx,
1295                        LeftRoom {
1296                            room,
1297                            left_projects,
1298                            canceled_calls_to_user_ids,
1299                        },
1300                    )
1301                    .await?;
1302
1303                if left_room.room.participants.is_empty() {
1304                    self.rooms.remove(&room_id);
1305                }
1306
1307                Ok(Some(left_room))
1308            } else {
1309                Ok(None)
1310            }
1311        })
1312        .await
1313    }
1314
1315    pub async fn update_room_participant_location(
1316        &self,
1317        room_id: RoomId,
1318        connection_id: ConnectionId,
1319        location: proto::ParticipantLocation,
1320    ) -> Result<RoomGuard<proto::Room>> {
1321        self.transact(|tx| async {
1322            let mut tx = tx;
1323            let location_kind;
1324            let location_project_id;
1325            match location
1326                .variant
1327                .as_ref()
1328                .ok_or_else(|| anyhow!("invalid location"))?
1329            {
1330                proto::participant_location::Variant::SharedProject(project) => {
1331                    location_kind = 0;
1332                    location_project_id = Some(ProjectId::from_proto(project.id));
1333                }
1334                proto::participant_location::Variant::UnsharedProject(_) => {
1335                    location_kind = 1;
1336                    location_project_id = None;
1337                }
1338                proto::participant_location::Variant::External(_) => {
1339                    location_kind = 2;
1340                    location_project_id = None;
1341                }
1342            }
1343
1344            let result = room_participant::Entity::update_many()
1345                .filter(
1346                    room_participant::Column::RoomId
1347                        .eq(room_id)
1348                        .and(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)),
1349                )
1350                .set(room_participant::ActiveModel {
1351                    location_kind: ActiveValue::set(Some(location_kind)),
1352                    location_project_id: ActiveValue::set(location_project_id),
1353                    ..Default::default()
1354                })
1355                .exec(&tx)
1356                .await?;
1357
1358            if result.rows_affected == 1 {
1359                let room = self.get_room(room_id, &mut tx).await?;
1360                self.commit_room_transaction(room_id, tx, room).await
1361            } else {
1362                Err(anyhow!("could not update room participant location"))?
1363            }
1364        })
1365        .await
1366    }
1367
1368    fn build_incoming_call(
1369        room: &proto::Room,
1370        called_user_id: UserId,
1371    ) -> Option<proto::IncomingCall> {
1372        let pending_participant = room
1373            .pending_participants
1374            .iter()
1375            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1376
1377        Some(proto::IncomingCall {
1378            room_id: room.id,
1379            calling_user_id: pending_participant.calling_user_id,
1380            participant_user_ids: room
1381                .participants
1382                .iter()
1383                .map(|participant| participant.user_id)
1384                .collect(),
1385            initial_project: room.participants.iter().find_map(|participant| {
1386                let initial_project_id = pending_participant.initial_project_id?;
1387                participant
1388                    .projects
1389                    .iter()
1390                    .find(|project| project.id == initial_project_id)
1391                    .cloned()
1392            }),
1393        })
1394    }
1395
1396    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1397        let db_room = room::Entity::find_by_id(room_id)
1398            .one(tx)
1399            .await?
1400            .ok_or_else(|| anyhow!("could not find room"))?;
1401
1402        let mut db_participants = db_room
1403            .find_related(room_participant::Entity)
1404            .stream(tx)
1405            .await?;
1406        let mut participants = HashMap::default();
1407        let mut pending_participants = Vec::new();
1408        while let Some(db_participant) = db_participants.next().await {
1409            let db_participant = db_participant?;
1410            if let Some(answering_connection_id) = db_participant.answering_connection_id {
1411                let location = match (
1412                    db_participant.location_kind,
1413                    db_participant.location_project_id,
1414                ) {
1415                    (Some(0), Some(project_id)) => {
1416                        Some(proto::participant_location::Variant::SharedProject(
1417                            proto::participant_location::SharedProject {
1418                                id: project_id.to_proto(),
1419                            },
1420                        ))
1421                    }
1422                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1423                        Default::default(),
1424                    )),
1425                    _ => Some(proto::participant_location::Variant::External(
1426                        Default::default(),
1427                    )),
1428                };
1429                participants.insert(
1430                    answering_connection_id,
1431                    proto::Participant {
1432                        user_id: db_participant.user_id.to_proto(),
1433                        peer_id: answering_connection_id as u32,
1434                        projects: Default::default(),
1435                        location: Some(proto::ParticipantLocation { variant: location }),
1436                    },
1437                );
1438            } else {
1439                pending_participants.push(proto::PendingParticipant {
1440                    user_id: db_participant.user_id.to_proto(),
1441                    calling_user_id: db_participant.calling_user_id.to_proto(),
1442                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1443                });
1444            }
1445        }
1446        drop(db_participants);
1447
1448        let mut db_projects = db_room
1449            .find_related(project::Entity)
1450            .find_with_related(worktree::Entity)
1451            .stream(tx)
1452            .await?;
1453
1454        while let Some(row) = db_projects.next().await {
1455            let (db_project, db_worktree) = row?;
1456            if let Some(participant) = participants.get_mut(&db_project.host_connection_id) {
1457                let project = if let Some(project) = participant
1458                    .projects
1459                    .iter_mut()
1460                    .find(|project| project.id == db_project.id.to_proto())
1461                {
1462                    project
1463                } else {
1464                    participant.projects.push(proto::ParticipantProject {
1465                        id: db_project.id.to_proto(),
1466                        worktree_root_names: Default::default(),
1467                    });
1468                    participant.projects.last_mut().unwrap()
1469                };
1470
1471                if let Some(db_worktree) = db_worktree {
1472                    project.worktree_root_names.push(db_worktree.root_name);
1473                }
1474            }
1475        }
1476
1477        Ok(proto::Room {
1478            id: db_room.id.to_proto(),
1479            live_kit_room: db_room.live_kit_room,
1480            participants: participants.into_values().collect(),
1481            pending_participants,
1482        })
1483    }
1484
1485    async fn commit_room_transaction<T>(
1486        &self,
1487        room_id: RoomId,
1488        tx: DatabaseTransaction,
1489        data: T,
1490    ) -> Result<RoomGuard<T>> {
1491        let lock = self.rooms.entry(room_id).or_default().clone();
1492        let _guard = lock.lock_owned().await;
1493        tx.commit().await?;
1494        Ok(RoomGuard {
1495            data,
1496            _guard,
1497            _not_send: PhantomData,
1498        })
1499    }
1500
1501    // projects
1502
1503    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1504        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1505        enum QueryAs {
1506            Count,
1507        }
1508
1509        self.transact(|tx| async move {
1510            Ok(project::Entity::find()
1511                .select_only()
1512                .column_as(project::Column::Id.count(), QueryAs::Count)
1513                .inner_join(user::Entity)
1514                .filter(user::Column::Admin.eq(false))
1515                .into_values::<_, QueryAs>()
1516                .one(&tx)
1517                .await?
1518                .unwrap_or(0) as usize)
1519        })
1520        .await
1521    }
1522
1523    pub async fn share_project(
1524        &self,
1525        room_id: RoomId,
1526        connection_id: ConnectionId,
1527        worktrees: &[proto::WorktreeMetadata],
1528    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1529        self.transact(|tx| async move {
1530            let participant = room_participant::Entity::find()
1531                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
1532                .one(&tx)
1533                .await?
1534                .ok_or_else(|| anyhow!("could not find participant"))?;
1535            if participant.room_id != room_id {
1536                return Err(anyhow!("shared project on unexpected room"))?;
1537            }
1538
1539            let project = project::ActiveModel {
1540                room_id: ActiveValue::set(participant.room_id),
1541                host_user_id: ActiveValue::set(participant.user_id),
1542                host_connection_id: ActiveValue::set(connection_id.0 as i32),
1543                host_connection_epoch: ActiveValue::set(self.epoch),
1544                ..Default::default()
1545            }
1546            .insert(&tx)
1547            .await?;
1548
1549            if !worktrees.is_empty() {
1550                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1551                    worktree::ActiveModel {
1552                        id: ActiveValue::set(worktree.id as i64),
1553                        project_id: ActiveValue::set(project.id),
1554                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1555                        root_name: ActiveValue::set(worktree.root_name.clone()),
1556                        visible: ActiveValue::set(worktree.visible),
1557                        scan_id: ActiveValue::set(0),
1558                        is_complete: ActiveValue::set(false),
1559                    }
1560                }))
1561                .exec(&tx)
1562                .await?;
1563            }
1564
1565            project_collaborator::ActiveModel {
1566                project_id: ActiveValue::set(project.id),
1567                connection_id: ActiveValue::set(connection_id.0 as i32),
1568                connection_epoch: ActiveValue::set(self.epoch),
1569                user_id: ActiveValue::set(participant.user_id),
1570                replica_id: ActiveValue::set(ReplicaId(0)),
1571                is_host: ActiveValue::set(true),
1572                ..Default::default()
1573            }
1574            .insert(&tx)
1575            .await?;
1576
1577            let room = self.get_room(room_id, &tx).await?;
1578            self.commit_room_transaction(room_id, tx, (project.id, room))
1579                .await
1580        })
1581        .await
1582    }
1583
1584    pub async fn unshare_project(
1585        &self,
1586        project_id: ProjectId,
1587        connection_id: ConnectionId,
1588    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1589        self.transact(|tx| async move {
1590            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1591
1592            let project = project::Entity::find_by_id(project_id)
1593                .one(&tx)
1594                .await?
1595                .ok_or_else(|| anyhow!("project not found"))?;
1596            if project.host_connection_id == connection_id.0 as i32 {
1597                let room_id = project.room_id;
1598                project::Entity::delete(project.into_active_model())
1599                    .exec(&tx)
1600                    .await?;
1601                let room = self.get_room(room_id, &tx).await?;
1602                self.commit_room_transaction(room_id, tx, (room, guest_connection_ids))
1603                    .await
1604            } else {
1605                Err(anyhow!("cannot unshare a project hosted by another user"))?
1606            }
1607        })
1608        .await
1609    }
1610
1611    pub async fn update_project(
1612        &self,
1613        project_id: ProjectId,
1614        connection_id: ConnectionId,
1615        worktrees: &[proto::WorktreeMetadata],
1616    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1617        self.transact(|tx| async move {
1618            let project = project::Entity::find_by_id(project_id)
1619                .filter(project::Column::HostConnectionId.eq(connection_id.0))
1620                .one(&tx)
1621                .await?
1622                .ok_or_else(|| anyhow!("no such project"))?;
1623
1624            if !worktrees.is_empty() {
1625                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1626                    worktree::ActiveModel {
1627                        id: ActiveValue::set(worktree.id as i64),
1628                        project_id: ActiveValue::set(project.id),
1629                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1630                        root_name: ActiveValue::set(worktree.root_name.clone()),
1631                        visible: ActiveValue::set(worktree.visible),
1632                        scan_id: ActiveValue::set(0),
1633                        is_complete: ActiveValue::set(false),
1634                    }
1635                }))
1636                .on_conflict(
1637                    OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
1638                        .update_column(worktree::Column::RootName)
1639                        .to_owned(),
1640                )
1641                .exec(&tx)
1642                .await?;
1643            }
1644
1645            worktree::Entity::delete_many()
1646                .filter(
1647                    worktree::Column::ProjectId.eq(project.id).and(
1648                        worktree::Column::Id
1649                            .is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
1650                    ),
1651                )
1652                .exec(&tx)
1653                .await?;
1654
1655            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
1656            let room = self.get_room(project.room_id, &tx).await?;
1657            self.commit_room_transaction(project.room_id, tx, (room, guest_connection_ids))
1658                .await
1659        })
1660        .await
1661    }
1662
1663    pub async fn update_worktree(
1664        &self,
1665        update: &proto::UpdateWorktree,
1666        connection_id: ConnectionId,
1667    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1668        self.transact(|tx| async move {
1669            let project_id = ProjectId::from_proto(update.project_id);
1670            let worktree_id = update.worktree_id as i64;
1671
1672            // Ensure the update comes from the host.
1673            let project = project::Entity::find_by_id(project_id)
1674                .filter(project::Column::HostConnectionId.eq(connection_id.0))
1675                .one(&tx)
1676                .await?
1677                .ok_or_else(|| anyhow!("no such project"))?;
1678            let room_id = project.room_id;
1679
1680            // Update metadata.
1681            worktree::Entity::update(worktree::ActiveModel {
1682                id: ActiveValue::set(worktree_id),
1683                project_id: ActiveValue::set(project_id),
1684                root_name: ActiveValue::set(update.root_name.clone()),
1685                scan_id: ActiveValue::set(update.scan_id as i64),
1686                is_complete: ActiveValue::set(update.is_last_update),
1687                abs_path: ActiveValue::set(update.abs_path.clone()),
1688                ..Default::default()
1689            })
1690            .exec(&tx)
1691            .await?;
1692
1693            if !update.updated_entries.is_empty() {
1694                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
1695                    let mtime = entry.mtime.clone().unwrap_or_default();
1696                    worktree_entry::ActiveModel {
1697                        project_id: ActiveValue::set(project_id),
1698                        worktree_id: ActiveValue::set(worktree_id),
1699                        id: ActiveValue::set(entry.id as i64),
1700                        is_dir: ActiveValue::set(entry.is_dir),
1701                        path: ActiveValue::set(entry.path.clone()),
1702                        inode: ActiveValue::set(entry.inode as i64),
1703                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
1704                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
1705                        is_symlink: ActiveValue::set(entry.is_symlink),
1706                        is_ignored: ActiveValue::set(entry.is_ignored),
1707                    }
1708                }))
1709                .on_conflict(
1710                    OnConflict::columns([
1711                        worktree_entry::Column::ProjectId,
1712                        worktree_entry::Column::WorktreeId,
1713                        worktree_entry::Column::Id,
1714                    ])
1715                    .update_columns([
1716                        worktree_entry::Column::IsDir,
1717                        worktree_entry::Column::Path,
1718                        worktree_entry::Column::Inode,
1719                        worktree_entry::Column::MtimeSeconds,
1720                        worktree_entry::Column::MtimeNanos,
1721                        worktree_entry::Column::IsSymlink,
1722                        worktree_entry::Column::IsIgnored,
1723                    ])
1724                    .to_owned(),
1725                )
1726                .exec(&tx)
1727                .await?;
1728            }
1729
1730            if !update.removed_entries.is_empty() {
1731                worktree_entry::Entity::delete_many()
1732                    .filter(
1733                        worktree_entry::Column::ProjectId
1734                            .eq(project_id)
1735                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
1736                            .and(
1737                                worktree_entry::Column::Id
1738                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
1739                            ),
1740                    )
1741                    .exec(&tx)
1742                    .await?;
1743            }
1744
1745            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1746            self.commit_room_transaction(room_id, tx, connection_ids)
1747                .await
1748        })
1749        .await
1750    }
1751
1752    pub async fn update_diagnostic_summary(
1753        &self,
1754        update: &proto::UpdateDiagnosticSummary,
1755        connection_id: ConnectionId,
1756    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1757        self.transact(|tx| async {
1758            let project_id = ProjectId::from_proto(update.project_id);
1759            let worktree_id = update.worktree_id as i64;
1760            let summary = update
1761                .summary
1762                .as_ref()
1763                .ok_or_else(|| anyhow!("invalid summary"))?;
1764
1765            // Ensure the update comes from the host.
1766            let project = project::Entity::find_by_id(project_id)
1767                .one(&tx)
1768                .await?
1769                .ok_or_else(|| anyhow!("no such project"))?;
1770            if project.host_connection_id != connection_id.0 as i32 {
1771                return Err(anyhow!("can't update a project hosted by someone else"))?;
1772            }
1773
1774            // Update summary.
1775            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
1776                project_id: ActiveValue::set(project_id),
1777                worktree_id: ActiveValue::set(worktree_id),
1778                path: ActiveValue::set(summary.path.clone()),
1779                language_server_id: ActiveValue::set(summary.language_server_id as i64),
1780                error_count: ActiveValue::set(summary.error_count as i32),
1781                warning_count: ActiveValue::set(summary.warning_count as i32),
1782                ..Default::default()
1783            })
1784            .on_conflict(
1785                OnConflict::columns([
1786                    worktree_diagnostic_summary::Column::ProjectId,
1787                    worktree_diagnostic_summary::Column::WorktreeId,
1788                    worktree_diagnostic_summary::Column::Path,
1789                ])
1790                .update_columns([
1791                    worktree_diagnostic_summary::Column::LanguageServerId,
1792                    worktree_diagnostic_summary::Column::ErrorCount,
1793                    worktree_diagnostic_summary::Column::WarningCount,
1794                ])
1795                .to_owned(),
1796            )
1797            .exec(&tx)
1798            .await?;
1799
1800            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1801            self.commit_room_transaction(project.room_id, tx, connection_ids)
1802                .await
1803        })
1804        .await
1805    }
1806
1807    pub async fn start_language_server(
1808        &self,
1809        update: &proto::StartLanguageServer,
1810        connection_id: ConnectionId,
1811    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1812        self.transact(|tx| async {
1813            let project_id = ProjectId::from_proto(update.project_id);
1814            let server = update
1815                .server
1816                .as_ref()
1817                .ok_or_else(|| anyhow!("invalid language server"))?;
1818
1819            // Ensure the update comes from the host.
1820            let project = project::Entity::find_by_id(project_id)
1821                .one(&tx)
1822                .await?
1823                .ok_or_else(|| anyhow!("no such project"))?;
1824            if project.host_connection_id != connection_id.0 as i32 {
1825                return Err(anyhow!("can't update a project hosted by someone else"))?;
1826            }
1827
1828            // Add the newly-started language server.
1829            language_server::Entity::insert(language_server::ActiveModel {
1830                project_id: ActiveValue::set(project_id),
1831                id: ActiveValue::set(server.id as i64),
1832                name: ActiveValue::set(server.name.clone()),
1833                ..Default::default()
1834            })
1835            .on_conflict(
1836                OnConflict::columns([
1837                    language_server::Column::ProjectId,
1838                    language_server::Column::Id,
1839                ])
1840                .update_column(language_server::Column::Name)
1841                .to_owned(),
1842            )
1843            .exec(&tx)
1844            .await?;
1845
1846            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1847            self.commit_room_transaction(project.room_id, tx, connection_ids)
1848                .await
1849        })
1850        .await
1851    }
1852
1853    pub async fn join_project(
1854        &self,
1855        project_id: ProjectId,
1856        connection_id: ConnectionId,
1857    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
1858        self.transact(|tx| async move {
1859            let participant = room_participant::Entity::find()
1860                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
1861                .one(&tx)
1862                .await?
1863                .ok_or_else(|| anyhow!("must join a room first"))?;
1864
1865            let project = project::Entity::find_by_id(project_id)
1866                .one(&tx)
1867                .await?
1868                .ok_or_else(|| anyhow!("no such project"))?;
1869            if project.room_id != participant.room_id {
1870                return Err(anyhow!("no such project"))?;
1871            }
1872
1873            let mut collaborators = project
1874                .find_related(project_collaborator::Entity)
1875                .all(&tx)
1876                .await?;
1877            let replica_ids = collaborators
1878                .iter()
1879                .map(|c| c.replica_id)
1880                .collect::<HashSet<_>>();
1881            let mut replica_id = ReplicaId(1);
1882            while replica_ids.contains(&replica_id) {
1883                replica_id.0 += 1;
1884            }
1885            let new_collaborator = project_collaborator::ActiveModel {
1886                project_id: ActiveValue::set(project_id),
1887                connection_id: ActiveValue::set(connection_id.0 as i32),
1888                connection_epoch: ActiveValue::set(self.epoch),
1889                user_id: ActiveValue::set(participant.user_id),
1890                replica_id: ActiveValue::set(replica_id),
1891                is_host: ActiveValue::set(false),
1892                ..Default::default()
1893            }
1894            .insert(&tx)
1895            .await?;
1896            collaborators.push(new_collaborator);
1897
1898            let db_worktrees = project.find_related(worktree::Entity).all(&tx).await?;
1899            let mut worktrees = db_worktrees
1900                .into_iter()
1901                .map(|db_worktree| {
1902                    (
1903                        db_worktree.id as u64,
1904                        Worktree {
1905                            id: db_worktree.id as u64,
1906                            abs_path: db_worktree.abs_path,
1907                            root_name: db_worktree.root_name,
1908                            visible: db_worktree.visible,
1909                            entries: Default::default(),
1910                            diagnostic_summaries: Default::default(),
1911                            scan_id: db_worktree.scan_id as u64,
1912                            is_complete: db_worktree.is_complete,
1913                        },
1914                    )
1915                })
1916                .collect::<BTreeMap<_, _>>();
1917
1918            // Populate worktree entries.
1919            {
1920                let mut db_entries = worktree_entry::Entity::find()
1921                    .filter(worktree_entry::Column::ProjectId.eq(project_id))
1922                    .stream(&tx)
1923                    .await?;
1924                while let Some(db_entry) = db_entries.next().await {
1925                    let db_entry = db_entry?;
1926                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
1927                        worktree.entries.push(proto::Entry {
1928                            id: db_entry.id as u64,
1929                            is_dir: db_entry.is_dir,
1930                            path: db_entry.path,
1931                            inode: db_entry.inode as u64,
1932                            mtime: Some(proto::Timestamp {
1933                                seconds: db_entry.mtime_seconds as u64,
1934                                nanos: db_entry.mtime_nanos as u32,
1935                            }),
1936                            is_symlink: db_entry.is_symlink,
1937                            is_ignored: db_entry.is_ignored,
1938                        });
1939                    }
1940                }
1941            }
1942
1943            // Populate worktree diagnostic summaries.
1944            {
1945                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1946                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
1947                    .stream(&tx)
1948                    .await?;
1949                while let Some(db_summary) = db_summaries.next().await {
1950                    let db_summary = db_summary?;
1951                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1952                        worktree
1953                            .diagnostic_summaries
1954                            .push(proto::DiagnosticSummary {
1955                                path: db_summary.path,
1956                                language_server_id: db_summary.language_server_id as u64,
1957                                error_count: db_summary.error_count as u32,
1958                                warning_count: db_summary.warning_count as u32,
1959                            });
1960                    }
1961                }
1962            }
1963
1964            // Populate language servers.
1965            let language_servers = project
1966                .find_related(language_server::Entity)
1967                .all(&tx)
1968                .await?;
1969
1970            self.commit_room_transaction(
1971                project.room_id,
1972                tx,
1973                (
1974                    Project {
1975                        collaborators,
1976                        worktrees,
1977                        language_servers: language_servers
1978                            .into_iter()
1979                            .map(|language_server| proto::LanguageServer {
1980                                id: language_server.id as u64,
1981                                name: language_server.name,
1982                            })
1983                            .collect(),
1984                    },
1985                    replica_id as ReplicaId,
1986                ),
1987            )
1988            .await
1989        })
1990        .await
1991    }
1992
1993    pub async fn leave_project(
1994        &self,
1995        project_id: ProjectId,
1996        connection_id: ConnectionId,
1997    ) -> Result<RoomGuard<LeftProject>> {
1998        self.transact(|tx| async move {
1999            let result = project_collaborator::Entity::delete_many()
2000                .filter(
2001                    project_collaborator::Column::ProjectId
2002                        .eq(project_id)
2003                        .and(project_collaborator::Column::ConnectionId.eq(connection_id.0)),
2004                )
2005                .exec(&tx)
2006                .await?;
2007            if result.rows_affected == 0 {
2008                Err(anyhow!("not a collaborator on this project"))?;
2009            }
2010
2011            let project = project::Entity::find_by_id(project_id)
2012                .one(&tx)
2013                .await?
2014                .ok_or_else(|| anyhow!("no such project"))?;
2015            let collaborators = project
2016                .find_related(project_collaborator::Entity)
2017                .all(&tx)
2018                .await?;
2019            let connection_ids = collaborators
2020                .into_iter()
2021                .map(|collaborator| ConnectionId(collaborator.connection_id as u32))
2022                .collect();
2023
2024            self.commit_room_transaction(
2025                project.room_id,
2026                tx,
2027                LeftProject {
2028                    id: project_id,
2029                    host_user_id: project.host_user_id,
2030                    host_connection_id: ConnectionId(project.host_connection_id as u32),
2031                    connection_ids,
2032                },
2033            )
2034            .await
2035        })
2036        .await
2037    }
2038
2039    pub async fn project_collaborators(
2040        &self,
2041        project_id: ProjectId,
2042        connection_id: ConnectionId,
2043    ) -> Result<Vec<project_collaborator::Model>> {
2044        self.transact(|tx| async move {
2045            let collaborators = project_collaborator::Entity::find()
2046                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2047                .all(&tx)
2048                .await?;
2049
2050            if collaborators
2051                .iter()
2052                .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
2053            {
2054                Ok(collaborators)
2055            } else {
2056                Err(anyhow!("no such project"))?
2057            }
2058        })
2059        .await
2060    }
2061
2062    pub async fn project_connection_ids(
2063        &self,
2064        project_id: ProjectId,
2065        connection_id: ConnectionId,
2066    ) -> Result<HashSet<ConnectionId>> {
2067        self.transact(|tx| async move {
2068            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2069            enum QueryAs {
2070                ConnectionId,
2071            }
2072
2073            let mut db_connection_ids = project_collaborator::Entity::find()
2074                .select_only()
2075                .column_as(
2076                    project_collaborator::Column::ConnectionId,
2077                    QueryAs::ConnectionId,
2078                )
2079                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2080                .into_values::<i32, QueryAs>()
2081                .stream(&tx)
2082                .await?;
2083
2084            let mut connection_ids = HashSet::default();
2085            while let Some(connection_id) = db_connection_ids.next().await {
2086                connection_ids.insert(ConnectionId(connection_id? as u32));
2087            }
2088
2089            if connection_ids.contains(&connection_id) {
2090                Ok(connection_ids)
2091            } else {
2092                Err(anyhow!("no such project"))?
2093            }
2094        })
2095        .await
2096    }
2097
2098    async fn project_guest_connection_ids(
2099        &self,
2100        project_id: ProjectId,
2101        tx: &DatabaseTransaction,
2102    ) -> Result<Vec<ConnectionId>> {
2103        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2104        enum QueryAs {
2105            ConnectionId,
2106        }
2107
2108        let mut db_guest_connection_ids = project_collaborator::Entity::find()
2109            .select_only()
2110            .column_as(
2111                project_collaborator::Column::ConnectionId,
2112                QueryAs::ConnectionId,
2113            )
2114            .filter(
2115                project_collaborator::Column::ProjectId
2116                    .eq(project_id)
2117                    .and(project_collaborator::Column::IsHost.eq(false)),
2118            )
2119            .into_values::<i32, QueryAs>()
2120            .stream(tx)
2121            .await?;
2122
2123        let mut guest_connection_ids = Vec::new();
2124        while let Some(connection_id) = db_guest_connection_ids.next().await {
2125            guest_connection_ids.push(ConnectionId(connection_id? as u32));
2126        }
2127        Ok(guest_connection_ids)
2128    }
2129
2130    // access tokens
2131
2132    pub async fn create_access_token_hash(
2133        &self,
2134        user_id: UserId,
2135        access_token_hash: &str,
2136        max_access_token_count: usize,
2137    ) -> Result<()> {
2138        self.transact(|tx| async {
2139            let tx = tx;
2140
2141            access_token::ActiveModel {
2142                user_id: ActiveValue::set(user_id),
2143                hash: ActiveValue::set(access_token_hash.into()),
2144                ..Default::default()
2145            }
2146            .insert(&tx)
2147            .await?;
2148
2149            access_token::Entity::delete_many()
2150                .filter(
2151                    access_token::Column::Id.in_subquery(
2152                        Query::select()
2153                            .column(access_token::Column::Id)
2154                            .from(access_token::Entity)
2155                            .and_where(access_token::Column::UserId.eq(user_id))
2156                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2157                            .limit(10000)
2158                            .offset(max_access_token_count as u64)
2159                            .to_owned(),
2160                    ),
2161                )
2162                .exec(&tx)
2163                .await?;
2164            tx.commit().await?;
2165            Ok(())
2166        })
2167        .await
2168    }
2169
2170    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2171        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2172        enum QueryAs {
2173            Hash,
2174        }
2175
2176        self.transact(|tx| async move {
2177            Ok(access_token::Entity::find()
2178                .select_only()
2179                .column(access_token::Column::Hash)
2180                .filter(access_token::Column::UserId.eq(user_id))
2181                .order_by_desc(access_token::Column::Id)
2182                .into_values::<_, QueryAs>()
2183                .all(&tx)
2184                .await?)
2185        })
2186        .await
2187    }
2188
2189    async fn transact<F, Fut, T>(&self, f: F) -> Result<T>
2190    where
2191        F: Send + Fn(DatabaseTransaction) -> Fut,
2192        Fut: Send + Future<Output = Result<T>>,
2193    {
2194        let body = async {
2195            loop {
2196                let tx = self.pool.begin().await?;
2197
2198                // In Postgres, serializable transactions are opt-in
2199                if let DatabaseBackend::Postgres = self.pool.get_database_backend() {
2200                    tx.execute(Statement::from_string(
2201                        DatabaseBackend::Postgres,
2202                        "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".into(),
2203                    ))
2204                    .await?;
2205                }
2206
2207                match f(tx).await {
2208                    Ok(result) => return Ok(result),
2209                    Err(error) => match error {
2210                        Error::Database2(
2211                            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2212                            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2213                        ) if error
2214                            .as_database_error()
2215                            .and_then(|error| error.code())
2216                            .as_deref()
2217                            == Some("40001") =>
2218                        {
2219                            // Retry (don't break the loop)
2220                        }
2221                        error @ _ => return Err(error),
2222                    },
2223                }
2224            }
2225        };
2226
2227        #[cfg(test)]
2228        {
2229            if let Some(background) = self.background.as_ref() {
2230                background.simulate_random_delay().await;
2231            }
2232
2233            self.runtime.as_ref().unwrap().block_on(body)
2234        }
2235
2236        #[cfg(not(test))]
2237        {
2238            body.await
2239        }
2240    }
2241}
2242
/// A value of type `T` bundled with an owned tokio mutex guard.
///
/// The guard is kept for as long as the `RoomGuard` is alive; the wrapped
/// value is reachable through the `Deref`/`DerefMut` impls.
// NOTE(review): presumably `_guard` locks one of the per-room mutexes stored in
// `Database::rooms`; confirm against `commit_room_transaction`.
pub struct RoomGuard<T> {
    /// The payload handed out when the guard is dereferenced.
    data: T,
    /// Never read; held only so the mutex stays locked until this struct is dropped.
    _guard: OwnedMutexGuard<()>,
    /// `Rc` is not `Send`, so this marker makes `RoomGuard` not `Send` either.
    _not_send: PhantomData<Rc<()>>,
}
2248
2249impl<T> Deref for RoomGuard<T> {
2250    type Target = T;
2251
2252    fn deref(&self) -> &T {
2253        &self.data
2254    }
2255}
2256
2257impl<T> DerefMut for RoomGuard<T> {
2258    fn deref_mut(&mut self) -> &mut T {
2259        &mut self.data
2260    }
2261}
2262
/// Serializable set of values describing a new user.
// NOTE(review): the field descriptions below are inferred from the field names
// alone; confirm against the code that consumes this struct.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    /// Presumably the user's GitHub login name.
    pub github_login: String,
    /// Presumably the user's numeric GitHub id.
    pub github_user_id: i32,
    /// Presumably the number of invites the user starts with.
    pub invite_count: i32,
}
2269
/// Values reported back about a newly created user.
// NOTE(review): the field descriptions below are inferred from the field names
// alone; confirm against the code that builds this struct.
#[derive(Debug)]
pub struct NewUserResult {
    /// Id of the user.
    pub user_id: UserId,
    /// Presumably the identifier under which the user's metrics are recorded.
    pub metrics_id: String,
    /// Presumably the user whose invite was used, if there was one.
    pub inviting_user_id: Option<UserId>,
    /// Presumably the device id recorded at signup, if there was one.
    pub signup_device_id: Option<String>,
}
2277
2278fn random_invite_code() -> String {
2279    nanoid::nanoid!(16)
2280}
2281
2282fn random_email_confirmation_code() -> String {
2283    nanoid::nanoid!(64)
2284}
2285
/// Defines a strongly-typed database identifier named `$name`.
///
/// The generated type is a `Copy` newtype around `i32` that:
/// - serializes transparently as its inner integer (`#[serde(transparent)]`),
/// - converts to and from the `u64` ids used in protobuf messages,
/// - implements the `sea_query` / `sea_orm` traits needed to use it as an
///   integer column value (`From<$name> for Value`, `TryGetable`, `ValueType`,
///   `TryFromU64`, `Nullable`).
macro_rules! id_type {
    ($name:ident) => {
        /// Newtype around an `i32` database id.
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest id representable by this type (`i32::MAX`).
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds an id from its protobuf (`u64`) representation.
            ///
            /// NOTE: this is a plain `as` cast, so values that do not fit in
            /// an `i32` are silently truncated rather than rejected.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts this id to its protobuf (`u64`) representation.
            ///
            /// NOTE: this is a plain `as` cast, so a negative id sign-extends
            /// into a very large `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        // Displays exactly like the wrapped integer.
        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        // Lets the id be used directly as a query parameter (as an `Int`).
        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        // Reads the id out of a query result by delegating to `i32`.
        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer variant whose value fits in an
            // `i32`; null values, out-of-range integers and every other
            // variant yield `ValueTypeErr`.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            // Fails when `n` does not fit in an `i32`. The message names the
            // actual direction of the failed conversion (u64 -> id).
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(concat!(
                        "error converting u64 to ",
                        stringify!($name)
                    ))
                })?))
            }
        }

        // A missing id is represented as a null `Int`.
        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
2404
// Each invocation below defines a distinct `i32`-backed id newtype, so ids of
// different kinds cannot be mixed up without an explicit conversion.
id_type!(AccessTokenId);
id_type!(ContactId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(SignupId);
id_type!(UserId);
2414
/// Bundle of a room, a set of left projects and a list of user ids.
///
/// NOTE(review): presumably produced when a participant leaves a room; the
/// field descriptions below are inferred from names — confirm against the
/// code that constructs this value.
pub struct LeftRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Left projects, keyed by project id.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    /// Ids of users whose calls were canceled (per the field name).
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
2420
/// Aggregated view of a project: its collaborators, worktrees and language servers.
pub struct Project {
    /// Collaborator rows (`project_collaborator::Model`) for the project.
    pub collaborators: Vec<project_collaborator::Model>,
    /// Worktrees in key order; the `u64` key is presumably the worktree id — TODO confirm.
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Language servers, in protobuf form.
    pub language_servers: Vec<proto::LanguageServer>,
}
2426
/// Identifies a project together with its host and a list of connections.
///
/// NOTE(review): presumably describes a project a user has just left; field
/// meanings are inferred from names — confirm against the producer.
pub struct LeftProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Id of the user hosting the project.
    pub host_user_id: UserId,
    /// Connection id of the host.
    pub host_connection_id: ConnectionId,
    /// Connection ids associated with the project (exact membership not visible here).
    pub connection_ids: Vec<ConnectionId>,
}
2433
/// Plain-data description of a worktree.
///
/// NOTE(review): field meanings below are inferred from names and types —
/// confirm against the code that fills this struct in.
pub struct Worktree {
    /// Identifier of the worktree.
    pub id: u64,
    /// Absolute path of the worktree, as a string.
    pub abs_path: String,
    /// Name of the worktree root.
    pub root_name: String,
    /// Visibility flag for the worktree.
    pub visible: bool,
    /// Entries of the worktree, in protobuf form.
    pub entries: Vec<proto::Entry>,
    /// Diagnostic summaries, in protobuf form.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Scan identifier; presumably that of the latest scan — TODO confirm.
    pub scan_id: u64,
    /// Completion flag; presumably whether the scan has finished — TODO confirm.
    pub is_complete: bool,
}
2444
2445#[cfg(test)]
2446pub use test::*;
2447
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use lazy_static::lazy_static;
    use parking_lot::Mutex;
    use rand::prelude::*;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// Test fixture wrapping a [`Database`] that is backed either by an
    /// in-memory SQLite instance or by a throwaway Postgres database.
    ///
    /// For the Postgres backend, dropping the fixture drops the database.
    pub struct TestDb {
        /// The wrapped database. Set to `Some` by the constructors and taken
        /// out again in `Drop`.
        pub db: Option<Arc<Database>>,
        /// Always initialized to `None` by the constructors in this module.
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Creates a fixture backed by an in-memory SQLite database.
        ///
        /// The schema is loaded from
        /// `migrations.sqlite/20221109000000_test_schema.sql` in the crate's
        /// manifest directory.
        ///
        /// # Panics
        /// Panics if the runtime cannot be built, the connection fails, or the
        /// schema cannot be applied.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = String::from("sqlite::memory:");
            let runtime = Self::build_runtime();

            let db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options).await.unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            Self::from_parts(db, background, runtime)
        }

        /// Creates a fixture backed by a freshly created Postgres database.
        ///
        /// The database is created on `localhost` as user `postgres`, named
        /// `zed-test-<random u128>`, and migrated using the `migrations`
        /// directory in the crate's manifest directory.
        ///
        /// # Panics
        /// Panics if the runtime cannot be built, or if creating, connecting
        /// to, or migrating the database fails.
        pub fn postgres(background: Arc<Background>) -> Self {
            lazy_static! {
                static ref LOCK: Mutex<()> = Mutex::new(());
            }

            // Held until this function returns, so concurrent callers create
            // and migrate their databases one at a time.
            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = Self::build_runtime();

            let db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options).await.unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            Self::from_parts(db, background, runtime)
        }

        /// Returns the wrapped database.
        ///
        /// # Panics
        /// Panics if the database has already been taken out of the fixture.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }

        /// Builds the single-threaded Tokio runtime (I/O and time enabled)
        /// that drives the test database.
        fn build_runtime() -> tokio::runtime::Runtime {
            tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap()
        }

        /// Attaches the executor and runtime to `db` and wraps it in a fixture.
        fn from_parts(
            mut db: Database,
            background: Arc<Background>,
            runtime: tokio::runtime::Runtime,
        ) -> Self {
            db.background = Some(background);
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }
    }

    impl Drop for TestDb {
        /// Drops the backing Postgres database, if there is one.
        ///
        /// Other backends need no cleanup. Failures are logged, not raised.
        fn drop(&mut self) {
            // `db` is a public `Option`; if it is already empty there is
            // nothing to clean up, and panicking inside `drop` could abort
            // the process while unwinding.
            let db = match self.db.take() {
                Some(db) => db,
                None => return,
            };

            if let (DatabaseBackend::Postgres, Some(runtime)) =
                (db.pool.get_database_backend(), db.runtime.as_ref())
            {
                runtime.block_on(async {
                    use util::ResultExt;
                    // Terminate every other backend connected to this
                    // database before attempting to drop it.
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                });
            }
        }
    }
}