db.rs

   1mod access_token;
   2mod contact;
   3mod language_server;
   4mod project;
   5mod project_collaborator;
   6mod room;
   7mod room_participant;
   8mod signup;
   9#[cfg(test)]
  10mod tests;
  11mod user;
  12mod worktree;
  13mod worktree_diagnostic_summary;
  14mod worktree_entry;
  15
  16use crate::{Error, Result};
  17use anyhow::anyhow;
  18use collections::{BTreeMap, HashMap, HashSet};
  19pub use contact::Contact;
  20use dashmap::DashMap;
  21use futures::StreamExt;
  22use hyper::StatusCode;
  23use rpc::{proto, ConnectionId};
  24pub use sea_orm::ConnectOptions;
  25use sea_orm::{
  26    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  27    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  28    Statement, TransactionTrait,
  29};
  30use sea_query::{Alias, Expr, OnConflict, Query};
  31use serde::{Deserialize, Serialize};
  32pub use signup::{Invite, NewSignup, WaitlistSummary};
  33use sqlx::migrate::{Migrate, Migration, MigrationSource};
  34use sqlx::Connection;
  35use std::ops::{Deref, DerefMut};
  36use std::path::Path;
  37use std::time::Duration;
  38use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  39use tokio::sync::{Mutex, OwnedMutexGuard};
  40pub use user::Model as User;
  41
  42pub struct Database {
  43    options: ConnectOptions,
  44    pool: DatabaseConnection,
  45    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  46    #[cfg(test)]
  47    background: Option<std::sync::Arc<gpui::executor::Background>>,
  48    #[cfg(test)]
  49    runtime: Option<tokio::runtime::Runtime>,
  50    epoch: Uuid,
  51}
  52
  53impl Database {
  54    pub async fn new(options: ConnectOptions) -> Result<Self> {
  55        Ok(Self {
  56            options: options.clone(),
  57            pool: sea_orm::Database::connect(options).await?,
  58            rooms: DashMap::with_capacity(16384),
  59            #[cfg(test)]
  60            background: None,
  61            #[cfg(test)]
  62            runtime: None,
  63            epoch: Uuid::new_v4(),
  64        })
  65    }
  66
  67    pub async fn migrate(
  68        &self,
  69        migrations_path: &Path,
  70        ignore_checksum_mismatch: bool,
  71    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  72        let migrations = MigrationSource::resolve(migrations_path)
  73            .await
  74            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  75
  76        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  77
  78        connection.ensure_migrations_table().await?;
  79        let applied_migrations: HashMap<_, _> = connection
  80            .list_applied_migrations()
  81            .await?
  82            .into_iter()
  83            .map(|m| (m.version, m))
  84            .collect();
  85
  86        let mut new_migrations = Vec::new();
  87        for migration in migrations {
  88            match applied_migrations.get(&migration.version) {
  89                Some(applied_migration) => {
  90                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  91                    {
  92                        Err(anyhow!(
  93                            "checksum mismatch for applied migration {}",
  94                            migration.description
  95                        ))?;
  96                    }
  97                }
  98                None => {
  99                    let elapsed = connection.apply(&migration).await?;
 100                    new_migrations.push((migration, elapsed));
 101                }
 102            }
 103        }
 104
 105        Ok(new_migrations)
 106    }
 107
 108    pub async fn clear_stale_data(&self) -> Result<()> {
 109        self.transaction(|tx| async move {
 110            project_collaborator::Entity::delete_many()
 111                .filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch))
 112                .exec(&*tx)
 113                .await?;
 114            room_participant::Entity::delete_many()
 115                .filter(
 116                    room_participant::Column::AnsweringConnectionEpoch
 117                        .ne(self.epoch)
 118                        .or(room_participant::Column::CallingConnectionEpoch.ne(self.epoch)),
 119                )
 120                .exec(&*tx)
 121                .await?;
 122            project::Entity::delete_many()
 123                .filter(project::Column::HostConnectionEpoch.ne(self.epoch))
 124                .exec(&*tx)
 125                .await?;
 126            room::Entity::delete_many()
 127                .filter(
 128                    room::Column::Id.not_in_subquery(
 129                        Query::select()
 130                            .column(room_participant::Column::RoomId)
 131                            .from(room_participant::Entity)
 132                            .distinct()
 133                            .to_owned(),
 134                    ),
 135                )
 136                .exec(&*tx)
 137                .await?;
 138            Ok(())
 139        })
 140        .await
 141    }
 142
 143    // users
 144
 145    pub async fn create_user(
 146        &self,
 147        email_address: &str,
 148        admin: bool,
 149        params: NewUserParams,
 150    ) -> Result<NewUserResult> {
 151        self.transaction(|tx| async {
 152            let tx = tx;
 153            let user = user::Entity::insert(user::ActiveModel {
 154                email_address: ActiveValue::set(Some(email_address.into())),
 155                github_login: ActiveValue::set(params.github_login.clone()),
 156                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 157                admin: ActiveValue::set(admin),
 158                metrics_id: ActiveValue::set(Uuid::new_v4()),
 159                ..Default::default()
 160            })
 161            .on_conflict(
 162                OnConflict::column(user::Column::GithubLogin)
 163                    .update_column(user::Column::GithubLogin)
 164                    .to_owned(),
 165            )
 166            .exec_with_returning(&*tx)
 167            .await?;
 168
 169            Ok(NewUserResult {
 170                user_id: user.id,
 171                metrics_id: user.metrics_id.to_string(),
 172                signup_device_id: None,
 173                inviting_user_id: None,
 174            })
 175        })
 176        .await
 177    }
 178
 179    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 180        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 181            .await
 182    }
 183
 184    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 185        self.transaction(|tx| async {
 186            let tx = tx;
 187            Ok(user::Entity::find()
 188                .filter(user::Column::Id.is_in(ids.iter().copied()))
 189                .all(&*tx)
 190                .await?)
 191        })
 192        .await
 193    }
 194
 195    pub async fn get_user_by_github_account(
 196        &self,
 197        github_login: &str,
 198        github_user_id: Option<i32>,
 199    ) -> Result<Option<User>> {
 200        self.transaction(|tx| async move {
 201            let tx = &*tx;
 202            if let Some(github_user_id) = github_user_id {
 203                if let Some(user_by_github_user_id) = user::Entity::find()
 204                    .filter(user::Column::GithubUserId.eq(github_user_id))
 205                    .one(tx)
 206                    .await?
 207                {
 208                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 209                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 210                    Ok(Some(user_by_github_user_id.update(tx).await?))
 211                } else if let Some(user_by_github_login) = user::Entity::find()
 212                    .filter(user::Column::GithubLogin.eq(github_login))
 213                    .one(tx)
 214                    .await?
 215                {
 216                    let mut user_by_github_login = user_by_github_login.into_active_model();
 217                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 218                    Ok(Some(user_by_github_login.update(tx).await?))
 219                } else {
 220                    Ok(None)
 221                }
 222            } else {
 223                Ok(user::Entity::find()
 224                    .filter(user::Column::GithubLogin.eq(github_login))
 225                    .one(tx)
 226                    .await?)
 227            }
 228        })
 229        .await
 230    }
 231
 232    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 233        self.transaction(|tx| async move {
 234            Ok(user::Entity::find()
 235                .order_by_asc(user::Column::GithubLogin)
 236                .limit(limit as u64)
 237                .offset(page as u64 * limit as u64)
 238                .all(&*tx)
 239                .await?)
 240        })
 241        .await
 242    }
 243
 244    pub async fn get_users_with_no_invites(
 245        &self,
 246        invited_by_another_user: bool,
 247    ) -> Result<Vec<User>> {
 248        self.transaction(|tx| async move {
 249            Ok(user::Entity::find()
 250                .filter(
 251                    user::Column::InviteCount
 252                        .eq(0)
 253                        .and(if invited_by_another_user {
 254                            user::Column::InviterId.is_not_null()
 255                        } else {
 256                            user::Column::InviterId.is_null()
 257                        }),
 258                )
 259                .all(&*tx)
 260                .await?)
 261        })
 262        .await
 263    }
 264
 265    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 266        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 267        enum QueryAs {
 268            MetricsId,
 269        }
 270
 271        self.transaction(|tx| async move {
 272            let metrics_id: Uuid = user::Entity::find_by_id(id)
 273                .select_only()
 274                .column(user::Column::MetricsId)
 275                .into_values::<_, QueryAs>()
 276                .one(&*tx)
 277                .await?
 278                .ok_or_else(|| anyhow!("could not find user"))?;
 279            Ok(metrics_id.to_string())
 280        })
 281        .await
 282    }
 283
 284    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 285        self.transaction(|tx| async move {
 286            user::Entity::update_many()
 287                .filter(user::Column::Id.eq(id))
 288                .set(user::ActiveModel {
 289                    admin: ActiveValue::set(is_admin),
 290                    ..Default::default()
 291                })
 292                .exec(&*tx)
 293                .await?;
 294            Ok(())
 295        })
 296        .await
 297    }
 298
 299    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 300        self.transaction(|tx| async move {
 301            user::Entity::update_many()
 302                .filter(user::Column::Id.eq(id))
 303                .set(user::ActiveModel {
 304                    connected_once: ActiveValue::set(connected_once),
 305                    ..Default::default()
 306                })
 307                .exec(&*tx)
 308                .await?;
 309            Ok(())
 310        })
 311        .await
 312    }
 313
 314    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 315        self.transaction(|tx| async move {
 316            access_token::Entity::delete_many()
 317                .filter(access_token::Column::UserId.eq(id))
 318                .exec(&*tx)
 319                .await?;
 320            user::Entity::delete_by_id(id).exec(&*tx).await?;
 321            Ok(())
 322        })
 323        .await
 324    }
 325
 326    // contacts
 327
 328    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 329        #[derive(Debug, FromQueryResult)]
 330        struct ContactWithUserBusyStatuses {
 331            user_id_a: UserId,
 332            user_id_b: UserId,
 333            a_to_b: bool,
 334            accepted: bool,
 335            should_notify: bool,
 336            user_a_busy: bool,
 337            user_b_busy: bool,
 338        }
 339
 340        self.transaction(|tx| async move {
 341            let user_a_participant = Alias::new("user_a_participant");
 342            let user_b_participant = Alias::new("user_b_participant");
 343            let mut db_contacts = contact::Entity::find()
 344                .column_as(
 345                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 346                        .is_not_null(),
 347                    "user_a_busy",
 348                )
 349                .column_as(
 350                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 351                        .is_not_null(),
 352                    "user_b_busy",
 353                )
 354                .filter(
 355                    contact::Column::UserIdA
 356                        .eq(user_id)
 357                        .or(contact::Column::UserIdB.eq(user_id)),
 358                )
 359                .join_as(
 360                    JoinType::LeftJoin,
 361                    contact::Relation::UserARoomParticipant.def(),
 362                    user_a_participant,
 363                )
 364                .join_as(
 365                    JoinType::LeftJoin,
 366                    contact::Relation::UserBRoomParticipant.def(),
 367                    user_b_participant,
 368                )
 369                .into_model::<ContactWithUserBusyStatuses>()
 370                .stream(&*tx)
 371                .await?;
 372
 373            let mut contacts = Vec::new();
 374            while let Some(db_contact) = db_contacts.next().await {
 375                let db_contact = db_contact?;
 376                if db_contact.user_id_a == user_id {
 377                    if db_contact.accepted {
 378                        contacts.push(Contact::Accepted {
 379                            user_id: db_contact.user_id_b,
 380                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 381                            busy: db_contact.user_b_busy,
 382                        });
 383                    } else if db_contact.a_to_b {
 384                        contacts.push(Contact::Outgoing {
 385                            user_id: db_contact.user_id_b,
 386                        })
 387                    } else {
 388                        contacts.push(Contact::Incoming {
 389                            user_id: db_contact.user_id_b,
 390                            should_notify: db_contact.should_notify,
 391                        });
 392                    }
 393                } else if db_contact.accepted {
 394                    contacts.push(Contact::Accepted {
 395                        user_id: db_contact.user_id_a,
 396                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 397                        busy: db_contact.user_a_busy,
 398                    });
 399                } else if db_contact.a_to_b {
 400                    contacts.push(Contact::Incoming {
 401                        user_id: db_contact.user_id_a,
 402                        should_notify: db_contact.should_notify,
 403                    });
 404                } else {
 405                    contacts.push(Contact::Outgoing {
 406                        user_id: db_contact.user_id_a,
 407                    });
 408                }
 409            }
 410
 411            contacts.sort_unstable_by_key(|contact| contact.user_id());
 412
 413            Ok(contacts)
 414        })
 415        .await
 416    }
 417
 418    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 419        self.transaction(|tx| async move {
 420            let participant = room_participant::Entity::find()
 421                .filter(room_participant::Column::UserId.eq(user_id))
 422                .one(&*tx)
 423                .await?;
 424            Ok(participant.is_some())
 425        })
 426        .await
 427    }
 428
 429    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 430        self.transaction(|tx| async move {
 431            let (id_a, id_b) = if user_id_1 < user_id_2 {
 432                (user_id_1, user_id_2)
 433            } else {
 434                (user_id_2, user_id_1)
 435            };
 436
 437            Ok(contact::Entity::find()
 438                .filter(
 439                    contact::Column::UserIdA
 440                        .eq(id_a)
 441                        .and(contact::Column::UserIdB.eq(id_b))
 442                        .and(contact::Column::Accepted.eq(true)),
 443                )
 444                .one(&*tx)
 445                .await?
 446                .is_some())
 447        })
 448        .await
 449    }
 450
 451    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 452        self.transaction(|tx| async move {
 453            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 454                (sender_id, receiver_id, true)
 455            } else {
 456                (receiver_id, sender_id, false)
 457            };
 458
 459            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 460                user_id_a: ActiveValue::set(id_a),
 461                user_id_b: ActiveValue::set(id_b),
 462                a_to_b: ActiveValue::set(a_to_b),
 463                accepted: ActiveValue::set(false),
 464                should_notify: ActiveValue::set(true),
 465                ..Default::default()
 466            })
 467            .on_conflict(
 468                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 469                    .values([
 470                        (contact::Column::Accepted, true.into()),
 471                        (contact::Column::ShouldNotify, false.into()),
 472                    ])
 473                    .action_and_where(
 474                        contact::Column::Accepted.eq(false).and(
 475                            contact::Column::AToB
 476                                .eq(a_to_b)
 477                                .and(contact::Column::UserIdA.eq(id_b))
 478                                .or(contact::Column::AToB
 479                                    .ne(a_to_b)
 480                                    .and(contact::Column::UserIdA.eq(id_a))),
 481                        ),
 482                    )
 483                    .to_owned(),
 484            )
 485            .exec_without_returning(&*tx)
 486            .await?;
 487
 488            if rows_affected == 1 {
 489                Ok(())
 490            } else {
 491                Err(anyhow!("contact already requested"))?
 492            }
 493        })
 494        .await
 495    }
 496
 497    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 498        self.transaction(|tx| async move {
 499            let (id_a, id_b) = if responder_id < requester_id {
 500                (responder_id, requester_id)
 501            } else {
 502                (requester_id, responder_id)
 503            };
 504
 505            let result = contact::Entity::delete_many()
 506                .filter(
 507                    contact::Column::UserIdA
 508                        .eq(id_a)
 509                        .and(contact::Column::UserIdB.eq(id_b)),
 510                )
 511                .exec(&*tx)
 512                .await?;
 513
 514            if result.rows_affected == 1 {
 515                Ok(())
 516            } else {
 517                Err(anyhow!("no such contact"))?
 518            }
 519        })
 520        .await
 521    }
 522
 523    pub async fn dismiss_contact_notification(
 524        &self,
 525        user_id: UserId,
 526        contact_user_id: UserId,
 527    ) -> Result<()> {
 528        self.transaction(|tx| async move {
 529            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 530                (user_id, contact_user_id, true)
 531            } else {
 532                (contact_user_id, user_id, false)
 533            };
 534
 535            let result = contact::Entity::update_many()
 536                .set(contact::ActiveModel {
 537                    should_notify: ActiveValue::set(false),
 538                    ..Default::default()
 539                })
 540                .filter(
 541                    contact::Column::UserIdA
 542                        .eq(id_a)
 543                        .and(contact::Column::UserIdB.eq(id_b))
 544                        .and(
 545                            contact::Column::AToB
 546                                .eq(a_to_b)
 547                                .and(contact::Column::Accepted.eq(true))
 548                                .or(contact::Column::AToB
 549                                    .ne(a_to_b)
 550                                    .and(contact::Column::Accepted.eq(false))),
 551                        ),
 552                )
 553                .exec(&*tx)
 554                .await?;
 555            if result.rows_affected == 0 {
 556                Err(anyhow!("no such contact request"))?
 557            } else {
 558                Ok(())
 559            }
 560        })
 561        .await
 562    }
 563
 564    pub async fn respond_to_contact_request(
 565        &self,
 566        responder_id: UserId,
 567        requester_id: UserId,
 568        accept: bool,
 569    ) -> Result<()> {
 570        self.transaction(|tx| async move {
 571            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 572                (responder_id, requester_id, false)
 573            } else {
 574                (requester_id, responder_id, true)
 575            };
 576            let rows_affected = if accept {
 577                let result = contact::Entity::update_many()
 578                    .set(contact::ActiveModel {
 579                        accepted: ActiveValue::set(true),
 580                        should_notify: ActiveValue::set(true),
 581                        ..Default::default()
 582                    })
 583                    .filter(
 584                        contact::Column::UserIdA
 585                            .eq(id_a)
 586                            .and(contact::Column::UserIdB.eq(id_b))
 587                            .and(contact::Column::AToB.eq(a_to_b)),
 588                    )
 589                    .exec(&*tx)
 590                    .await?;
 591                result.rows_affected
 592            } else {
 593                let result = contact::Entity::delete_many()
 594                    .filter(
 595                        contact::Column::UserIdA
 596                            .eq(id_a)
 597                            .and(contact::Column::UserIdB.eq(id_b))
 598                            .and(contact::Column::AToB.eq(a_to_b))
 599                            .and(contact::Column::Accepted.eq(false)),
 600                    )
 601                    .exec(&*tx)
 602                    .await?;
 603
 604                result.rows_affected
 605            };
 606
 607            if rows_affected == 1 {
 608                Ok(())
 609            } else {
 610                Err(anyhow!("no such contact request"))?
 611            }
 612        })
 613        .await
 614    }
 615
 616    pub fn fuzzy_like_string(string: &str) -> String {
 617        let mut result = String::with_capacity(string.len() * 2 + 1);
 618        for c in string.chars() {
 619            if c.is_alphanumeric() {
 620                result.push('%');
 621                result.push(c);
 622            }
 623        }
 624        result.push('%');
 625        result
 626    }
 627
 628    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 629        self.transaction(|tx| async {
 630            let tx = tx;
 631            let like_string = Self::fuzzy_like_string(name_query);
 632            let query = "
 633                SELECT users.*
 634                FROM users
 635                WHERE github_login ILIKE $1
 636                ORDER BY github_login <-> $2
 637                LIMIT $3
 638            ";
 639
 640            Ok(user::Entity::find()
 641                .from_raw_sql(Statement::from_sql_and_values(
 642                    self.pool.get_database_backend(),
 643                    query.into(),
 644                    vec![like_string.into(), name_query.into(), limit.into()],
 645                ))
 646                .all(&*tx)
 647                .await?)
 648        })
 649        .await
 650    }
 651
 652    // signups
 653
 654    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 655        self.transaction(|tx| async move {
 656            signup::Entity::insert(signup::ActiveModel {
 657                email_address: ActiveValue::set(signup.email_address.clone()),
 658                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 659                email_confirmation_sent: ActiveValue::set(false),
 660                platform_mac: ActiveValue::set(signup.platform_mac),
 661                platform_windows: ActiveValue::set(signup.platform_windows),
 662                platform_linux: ActiveValue::set(signup.platform_linux),
 663                platform_unknown: ActiveValue::set(false),
 664                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 665                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 666                device_id: ActiveValue::set(signup.device_id.clone()),
 667                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 668                ..Default::default()
 669            })
 670            .on_conflict(
 671                OnConflict::column(signup::Column::EmailAddress)
 672                    .update_columns([
 673                        signup::Column::PlatformMac,
 674                        signup::Column::PlatformWindows,
 675                        signup::Column::PlatformLinux,
 676                        signup::Column::EditorFeatures,
 677                        signup::Column::ProgrammingLanguages,
 678                        signup::Column::DeviceId,
 679                        signup::Column::AddedToMailingList,
 680                    ])
 681                    .to_owned(),
 682            )
 683            .exec(&*tx)
 684            .await?;
 685            Ok(())
 686        })
 687        .await
 688    }
 689
 690    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 691        self.transaction(|tx| async move {
 692            let signup = signup::Entity::find()
 693                .filter(signup::Column::EmailAddress.eq(email_address))
 694                .one(&*tx)
 695                .await?
 696                .ok_or_else(|| {
 697                    anyhow!("signup with email address {} doesn't exist", email_address)
 698                })?;
 699
 700            Ok(signup)
 701        })
 702        .await
 703    }
 704
 705    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 706        self.transaction(|tx| async move {
 707            let query = "
 708                SELECT
 709                    COUNT(*) as count,
 710                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 711                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 712                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 713                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 714                FROM (
 715                    SELECT *
 716                    FROM signups
 717                    WHERE
 718                        NOT email_confirmation_sent
 719                ) AS unsent
 720            ";
 721            Ok(
 722                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 723                    self.pool.get_database_backend(),
 724                    query.into(),
 725                    vec![],
 726                ))
 727                .one(&*tx)
 728                .await?
 729                .ok_or_else(|| anyhow!("invalid result"))?,
 730            )
 731        })
 732        .await
 733    }
 734
 735    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 736        let emails = invites
 737            .iter()
 738            .map(|s| s.email_address.as_str())
 739            .collect::<Vec<_>>();
 740        self.transaction(|tx| async {
 741            let tx = tx;
 742            signup::Entity::update_many()
 743                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 744                .set(signup::ActiveModel {
 745                    email_confirmation_sent: ActiveValue::set(true),
 746                    ..Default::default()
 747                })
 748                .exec(&*tx)
 749                .await?;
 750            Ok(())
 751        })
 752        .await
 753    }
 754
 755    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 756        self.transaction(|tx| async move {
 757            Ok(signup::Entity::find()
 758                .select_only()
 759                .column(signup::Column::EmailAddress)
 760                .column(signup::Column::EmailConfirmationCode)
 761                .filter(
 762                    signup::Column::EmailConfirmationSent.eq(false).and(
 763                        signup::Column::PlatformMac
 764                            .eq(true)
 765                            .or(signup::Column::PlatformUnknown.eq(true)),
 766                    ),
 767                )
 768                .order_by_asc(signup::Column::CreatedAt)
 769                .limit(count as u64)
 770                .into_model()
 771                .all(&*tx)
 772                .await?)
 773        })
 774        .await
 775    }
 776
 777    // invite codes
 778
 779    pub async fn create_invite_from_code(
 780        &self,
 781        code: &str,
 782        email_address: &str,
 783        device_id: Option<&str>,
 784    ) -> Result<Invite> {
 785        self.transaction(|tx| async move {
 786            let existing_user = user::Entity::find()
 787                .filter(user::Column::EmailAddress.eq(email_address))
 788                .one(&*tx)
 789                .await?;
 790
 791            if existing_user.is_some() {
 792                Err(anyhow!("email address is already in use"))?;
 793            }
 794
 795            let inviting_user_with_invites = match user::Entity::find()
 796                .filter(
 797                    user::Column::InviteCode
 798                        .eq(code)
 799                        .and(user::Column::InviteCount.gt(0)),
 800                )
 801                .one(&*tx)
 802                .await?
 803            {
 804                Some(inviting_user) => inviting_user,
 805                None => {
 806                    return Err(Error::Http(
 807                        StatusCode::UNAUTHORIZED,
 808                        "unable to find an invite code with invites remaining".to_string(),
 809                    ))?
 810                }
 811            };
 812            user::Entity::update_many()
 813                .filter(
 814                    user::Column::Id
 815                        .eq(inviting_user_with_invites.id)
 816                        .and(user::Column::InviteCount.gt(0)),
 817                )
 818                .col_expr(
 819                    user::Column::InviteCount,
 820                    Expr::col(user::Column::InviteCount).sub(1),
 821                )
 822                .exec(&*tx)
 823                .await?;
 824
 825            let signup = signup::Entity::insert(signup::ActiveModel {
 826                email_address: ActiveValue::set(email_address.into()),
 827                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 828                email_confirmation_sent: ActiveValue::set(false),
 829                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 830                platform_linux: ActiveValue::set(false),
 831                platform_mac: ActiveValue::set(false),
 832                platform_windows: ActiveValue::set(false),
 833                platform_unknown: ActiveValue::set(true),
 834                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 835                ..Default::default()
 836            })
 837            .on_conflict(
 838                OnConflict::column(signup::Column::EmailAddress)
 839                    .update_column(signup::Column::InvitingUserId)
 840                    .to_owned(),
 841            )
 842            .exec_with_returning(&*tx)
 843            .await?;
 844
 845            Ok(Invite {
 846                email_address: signup.email_address,
 847                email_confirmation_code: signup.email_confirmation_code,
 848            })
 849        })
 850        .await
 851    }
 852
 853    pub async fn create_user_from_invite(
 854        &self,
 855        invite: &Invite,
 856        user: NewUserParams,
 857    ) -> Result<Option<NewUserResult>> {
 858        self.transaction(|tx| async {
 859            let tx = tx;
 860            let signup = signup::Entity::find()
 861                .filter(
 862                    signup::Column::EmailAddress
 863                        .eq(invite.email_address.as_str())
 864                        .and(
 865                            signup::Column::EmailConfirmationCode
 866                                .eq(invite.email_confirmation_code.as_str()),
 867                        ),
 868                )
 869                .one(&*tx)
 870                .await?
 871                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 872
 873            if signup.user_id.is_some() {
 874                return Ok(None);
 875            }
 876
 877            let user = user::Entity::insert(user::ActiveModel {
 878                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 879                github_login: ActiveValue::set(user.github_login.clone()),
 880                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 881                admin: ActiveValue::set(false),
 882                invite_count: ActiveValue::set(user.invite_count),
 883                invite_code: ActiveValue::set(Some(random_invite_code())),
 884                metrics_id: ActiveValue::set(Uuid::new_v4()),
 885                ..Default::default()
 886            })
 887            .on_conflict(
 888                OnConflict::column(user::Column::GithubLogin)
 889                    .update_columns([
 890                        user::Column::EmailAddress,
 891                        user::Column::GithubUserId,
 892                        user::Column::Admin,
 893                    ])
 894                    .to_owned(),
 895            )
 896            .exec_with_returning(&*tx)
 897            .await?;
 898
 899            let mut signup = signup.into_active_model();
 900            signup.user_id = ActiveValue::set(Some(user.id));
 901            let signup = signup.update(&*tx).await?;
 902
 903            if let Some(inviting_user_id) = signup.inviting_user_id {
 904                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
 905                    (inviting_user_id, user.id, true)
 906                } else {
 907                    (user.id, inviting_user_id, false)
 908                };
 909
 910                contact::Entity::insert(contact::ActiveModel {
 911                    user_id_a: ActiveValue::set(user_id_a),
 912                    user_id_b: ActiveValue::set(user_id_b),
 913                    a_to_b: ActiveValue::set(a_to_b),
 914                    should_notify: ActiveValue::set(true),
 915                    accepted: ActiveValue::set(true),
 916                    ..Default::default()
 917                })
 918                .on_conflict(OnConflict::new().do_nothing().to_owned())
 919                .exec_without_returning(&*tx)
 920                .await?;
 921            }
 922
 923            Ok(Some(NewUserResult {
 924                user_id: user.id,
 925                metrics_id: user.metrics_id.to_string(),
 926                inviting_user_id: signup.inviting_user_id,
 927                signup_device_id: signup.device_id,
 928            }))
 929        })
 930        .await
 931    }
 932
 933    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
 934        self.transaction(|tx| async move {
 935            if count > 0 {
 936                user::Entity::update_many()
 937                    .filter(
 938                        user::Column::Id
 939                            .eq(id)
 940                            .and(user::Column::InviteCode.is_null()),
 941                    )
 942                    .set(user::ActiveModel {
 943                        invite_code: ActiveValue::set(Some(random_invite_code())),
 944                        ..Default::default()
 945                    })
 946                    .exec(&*tx)
 947                    .await?;
 948            }
 949
 950            user::Entity::update_many()
 951                .filter(user::Column::Id.eq(id))
 952                .set(user::ActiveModel {
 953                    invite_count: ActiveValue::set(count),
 954                    ..Default::default()
 955                })
 956                .exec(&*tx)
 957                .await?;
 958            Ok(())
 959        })
 960        .await
 961    }
 962
 963    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
 964        self.transaction(|tx| async move {
 965            match user::Entity::find_by_id(id).one(&*tx).await? {
 966                Some(user) if user.invite_code.is_some() => {
 967                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
 968                }
 969                _ => Ok(None),
 970            }
 971        })
 972        .await
 973    }
 974
 975    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
 976        self.transaction(|tx| async move {
 977            user::Entity::find()
 978                .filter(user::Column::InviteCode.eq(code))
 979                .one(&*tx)
 980                .await?
 981                .ok_or_else(|| {
 982                    Error::Http(
 983                        StatusCode::NOT_FOUND,
 984                        "that invite code does not exist".to_string(),
 985                    )
 986                })
 987        })
 988        .await
 989    }
 990
 991    // rooms
 992
 993    pub async fn incoming_call_for_user(
 994        &self,
 995        user_id: UserId,
 996    ) -> Result<Option<proto::IncomingCall>> {
 997        self.transaction(|tx| async move {
 998            let pending_participant = room_participant::Entity::find()
 999                .filter(
1000                    room_participant::Column::UserId
1001                        .eq(user_id)
1002                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1003                )
1004                .one(&*tx)
1005                .await?;
1006
1007            if let Some(pending_participant) = pending_participant {
1008                let room = self.get_room(pending_participant.room_id, &tx).await?;
1009                Ok(Self::build_incoming_call(&room, user_id))
1010            } else {
1011                Ok(None)
1012            }
1013        })
1014        .await
1015    }
1016
1017    pub async fn create_room(
1018        &self,
1019        user_id: UserId,
1020        connection_id: ConnectionId,
1021        live_kit_room: &str,
1022    ) -> Result<RoomGuard<proto::Room>> {
1023        self.room_transaction(|tx| async move {
1024            let room = room::ActiveModel {
1025                live_kit_room: ActiveValue::set(live_kit_room.into()),
1026                ..Default::default()
1027            }
1028            .insert(&*tx)
1029            .await?;
1030            let room_id = room.id;
1031
1032            room_participant::ActiveModel {
1033                room_id: ActiveValue::set(room_id),
1034                user_id: ActiveValue::set(user_id),
1035                answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
1036                answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
1037                answering_connection_lost: ActiveValue::set(false),
1038                calling_user_id: ActiveValue::set(user_id),
1039                calling_connection_id: ActiveValue::set(connection_id.0 as i32),
1040                calling_connection_epoch: ActiveValue::set(self.epoch),
1041                ..Default::default()
1042            }
1043            .insert(&*tx)
1044            .await?;
1045
1046            let room = self.get_room(room_id, &tx).await?;
1047            Ok((room_id, room))
1048        })
1049        .await
1050    }
1051
1052    pub async fn call(
1053        &self,
1054        room_id: RoomId,
1055        calling_user_id: UserId,
1056        calling_connection_id: ConnectionId,
1057        called_user_id: UserId,
1058        initial_project_id: Option<ProjectId>,
1059    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1060        self.room_transaction(|tx| async move {
1061            room_participant::ActiveModel {
1062                room_id: ActiveValue::set(room_id),
1063                user_id: ActiveValue::set(called_user_id),
1064                answering_connection_lost: ActiveValue::set(false),
1065                calling_user_id: ActiveValue::set(calling_user_id),
1066                calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
1067                calling_connection_epoch: ActiveValue::set(self.epoch),
1068                initial_project_id: ActiveValue::set(initial_project_id),
1069                ..Default::default()
1070            }
1071            .insert(&*tx)
1072            .await?;
1073
1074            let room = self.get_room(room_id, &tx).await?;
1075            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1076                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1077            Ok((room_id, (room, incoming_call)))
1078        })
1079        .await
1080    }
1081
1082    pub async fn call_failed(
1083        &self,
1084        room_id: RoomId,
1085        called_user_id: UserId,
1086    ) -> Result<RoomGuard<proto::Room>> {
1087        self.room_transaction(|tx| async move {
1088            room_participant::Entity::delete_many()
1089                .filter(
1090                    room_participant::Column::RoomId
1091                        .eq(room_id)
1092                        .and(room_participant::Column::UserId.eq(called_user_id)),
1093                )
1094                .exec(&*tx)
1095                .await?;
1096            let room = self.get_room(room_id, &tx).await?;
1097            Ok((room_id, room))
1098        })
1099        .await
1100    }
1101
1102    pub async fn decline_call(
1103        &self,
1104        expected_room_id: Option<RoomId>,
1105        user_id: UserId,
1106    ) -> Result<RoomGuard<proto::Room>> {
1107        self.room_transaction(|tx| async move {
1108            let participant = room_participant::Entity::find()
1109                .filter(
1110                    room_participant::Column::UserId
1111                        .eq(user_id)
1112                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1113                )
1114                .one(&*tx)
1115                .await?
1116                .ok_or_else(|| anyhow!("could not decline call"))?;
1117            let room_id = participant.room_id;
1118
1119            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1120                return Err(anyhow!("declining call on unexpected room"))?;
1121            }
1122
1123            room_participant::Entity::delete(participant.into_active_model())
1124                .exec(&*tx)
1125                .await?;
1126
1127            let room = self.get_room(room_id, &tx).await?;
1128            Ok((room_id, room))
1129        })
1130        .await
1131    }
1132
1133    pub async fn cancel_call(
1134        &self,
1135        expected_room_id: Option<RoomId>,
1136        calling_connection_id: ConnectionId,
1137        called_user_id: UserId,
1138    ) -> Result<RoomGuard<proto::Room>> {
1139        self.room_transaction(|tx| async move {
1140            let participant = room_participant::Entity::find()
1141                .filter(
1142                    room_participant::Column::UserId
1143                        .eq(called_user_id)
1144                        .and(
1145                            room_participant::Column::CallingConnectionId
1146                                .eq(calling_connection_id.0 as i32),
1147                        )
1148                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1149                )
1150                .one(&*tx)
1151                .await?
1152                .ok_or_else(|| anyhow!("could not cancel call"))?;
1153            let room_id = participant.room_id;
1154            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1155                return Err(anyhow!("canceling call on unexpected room"))?;
1156            }
1157
1158            room_participant::Entity::delete(participant.into_active_model())
1159                .exec(&*tx)
1160                .await?;
1161
1162            let room = self.get_room(room_id, &tx).await?;
1163            Ok((room_id, room))
1164        })
1165        .await
1166    }
1167
1168    pub async fn join_room(
1169        &self,
1170        room_id: RoomId,
1171        user_id: UserId,
1172        connection_id: ConnectionId,
1173    ) -> Result<RoomGuard<proto::Room>> {
1174        self.room_transaction(|tx| async move {
1175            let result = room_participant::Entity::update_many()
1176                .filter(
1177                    room_participant::Column::RoomId
1178                        .eq(room_id)
1179                        .and(room_participant::Column::UserId.eq(user_id))
1180                        .and(
1181                            room_participant::Column::AnsweringConnectionId
1182                                .is_null()
1183                                .or(room_participant::Column::AnsweringConnectionLost.eq(true)),
1184                        ),
1185                )
1186                .set(room_participant::ActiveModel {
1187                    answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
1188                    answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
1189                    answering_connection_lost: ActiveValue::set(false),
1190                    ..Default::default()
1191                })
1192                .exec(&*tx)
1193                .await?;
1194            if result.rows_affected == 0 {
1195                Err(anyhow!("room does not exist or was already joined"))?
1196            } else {
1197                let room = self.get_room(room_id, &tx).await?;
1198                Ok((room_id, room))
1199            }
1200        })
1201        .await
1202    }
1203
1204    pub async fn leave_room(&self, connection_id: ConnectionId) -> Result<RoomGuard<LeftRoom>> {
1205        self.room_transaction(|tx| async move {
1206            let leaving_participant = room_participant::Entity::find()
1207                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1208                .one(&*tx)
1209                .await?;
1210
1211            if let Some(leaving_participant) = leaving_participant {
1212                // Leave room.
1213                let room_id = leaving_participant.room_id;
1214                room_participant::Entity::delete_by_id(leaving_participant.id)
1215                    .exec(&*tx)
1216                    .await?;
1217
1218                // Cancel pending calls initiated by the leaving user.
1219                let called_participants = room_participant::Entity::find()
1220                    .filter(
1221                        room_participant::Column::CallingConnectionId
1222                            .eq(connection_id.0)
1223                            .and(room_participant::Column::AnsweringConnectionId.is_null()),
1224                    )
1225                    .all(&*tx)
1226                    .await?;
1227                room_participant::Entity::delete_many()
1228                    .filter(
1229                        room_participant::Column::Id
1230                            .is_in(called_participants.iter().map(|participant| participant.id)),
1231                    )
1232                    .exec(&*tx)
1233                    .await?;
1234                let canceled_calls_to_user_ids = called_participants
1235                    .into_iter()
1236                    .map(|participant| participant.user_id)
1237                    .collect();
1238
1239                // Detect left projects.
1240                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1241                enum QueryProjectIds {
1242                    ProjectId,
1243                }
1244                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1245                    .select_only()
1246                    .column_as(
1247                        project_collaborator::Column::ProjectId,
1248                        QueryProjectIds::ProjectId,
1249                    )
1250                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1251                    .into_values::<_, QueryProjectIds>()
1252                    .all(&*tx)
1253                    .await?;
1254                let mut left_projects = HashMap::default();
1255                let mut collaborators = project_collaborator::Entity::find()
1256                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1257                    .stream(&*tx)
1258                    .await?;
1259                while let Some(collaborator) = collaborators.next().await {
1260                    let collaborator = collaborator?;
1261                    let left_project =
1262                        left_projects
1263                            .entry(collaborator.project_id)
1264                            .or_insert(LeftProject {
1265                                id: collaborator.project_id,
1266                                host_user_id: Default::default(),
1267                                connection_ids: Default::default(),
1268                                host_connection_id: Default::default(),
1269                            });
1270
1271                    let collaborator_connection_id =
1272                        ConnectionId(collaborator.connection_id as u32);
1273                    if collaborator_connection_id != connection_id {
1274                        left_project.connection_ids.push(collaborator_connection_id);
1275                    }
1276
1277                    if collaborator.is_host {
1278                        left_project.host_user_id = collaborator.user_id;
1279                        left_project.host_connection_id =
1280                            ConnectionId(collaborator.connection_id as u32);
1281                    }
1282                }
1283                drop(collaborators);
1284
1285                // Leave projects.
1286                project_collaborator::Entity::delete_many()
1287                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1288                    .exec(&*tx)
1289                    .await?;
1290
1291                // Unshare projects.
1292                project::Entity::delete_many()
1293                    .filter(
1294                        project::Column::RoomId
1295                            .eq(room_id)
1296                            .and(project::Column::HostConnectionId.eq(connection_id.0 as i32)),
1297                    )
1298                    .exec(&*tx)
1299                    .await?;
1300
1301                let room = self.get_room(room_id, &tx).await?;
1302                if room.participants.is_empty() {
1303                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1304                }
1305
1306                let left_room = LeftRoom {
1307                    room,
1308                    left_projects,
1309                    canceled_calls_to_user_ids,
1310                };
1311
1312                if left_room.room.participants.is_empty() {
1313                    self.rooms.remove(&room_id);
1314                }
1315
1316                Ok((room_id, left_room))
1317            } else {
1318                Err(anyhow!("could not leave room"))?
1319            }
1320        })
1321        .await
1322    }
1323
1324    pub async fn update_room_participant_location(
1325        &self,
1326        room_id: RoomId,
1327        connection_id: ConnectionId,
1328        location: proto::ParticipantLocation,
1329    ) -> Result<RoomGuard<proto::Room>> {
1330        self.room_transaction(|tx| async {
1331            let tx = tx;
1332            let location_kind;
1333            let location_project_id;
1334            match location
1335                .variant
1336                .as_ref()
1337                .ok_or_else(|| anyhow!("invalid location"))?
1338            {
1339                proto::participant_location::Variant::SharedProject(project) => {
1340                    location_kind = 0;
1341                    location_project_id = Some(ProjectId::from_proto(project.id));
1342                }
1343                proto::participant_location::Variant::UnsharedProject(_) => {
1344                    location_kind = 1;
1345                    location_project_id = None;
1346                }
1347                proto::participant_location::Variant::External(_) => {
1348                    location_kind = 2;
1349                    location_project_id = None;
1350                }
1351            }
1352
1353            let result = room_participant::Entity::update_many()
1354                .filter(room_participant::Column::RoomId.eq(room_id).and(
1355                    room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32),
1356                ))
1357                .set(room_participant::ActiveModel {
1358                    location_kind: ActiveValue::set(Some(location_kind)),
1359                    location_project_id: ActiveValue::set(location_project_id),
1360                    ..Default::default()
1361                })
1362                .exec(&*tx)
1363                .await?;
1364
1365            if result.rows_affected == 1 {
1366                let room = self.get_room(room_id, &tx).await?;
1367                Ok((room_id, room))
1368            } else {
1369                Err(anyhow!("could not update room participant location"))?
1370            }
1371        })
1372        .await
1373    }
1374
1375    pub async fn connection_lost(
1376        &self,
1377        connection_id: ConnectionId,
1378    ) -> Result<RoomGuard<Vec<LeftProject>>> {
1379        self.room_transaction(|tx| async move {
1380            let participant = room_participant::Entity::find()
1381                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1382                .one(&*tx)
1383                .await?
1384                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1385            let room_id = participant.room_id;
1386
1387            room_participant::Entity::update(room_participant::ActiveModel {
1388                answering_connection_lost: ActiveValue::set(true),
1389                ..participant.into_active_model()
1390            })
1391            .exec(&*tx)
1392            .await?;
1393
1394            let collaborator_on_projects = project_collaborator::Entity::find()
1395                .find_also_related(project::Entity)
1396                .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1397                .all(&*tx)
1398                .await?;
1399            project_collaborator::Entity::delete_many()
1400                .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1401                .exec(&*tx)
1402                .await?;
1403
1404            let mut left_projects = Vec::new();
1405            for (_, project) in collaborator_on_projects {
1406                if let Some(project) = project {
1407                    let collaborators = project
1408                        .find_related(project_collaborator::Entity)
1409                        .all(&*tx)
1410                        .await?;
1411                    let connection_ids = collaborators
1412                        .into_iter()
1413                        .map(|collaborator| ConnectionId(collaborator.connection_id as u32))
1414                        .collect();
1415
1416                    left_projects.push(LeftProject {
1417                        id: project.id,
1418                        host_user_id: project.host_user_id,
1419                        host_connection_id: ConnectionId(project.host_connection_id as u32),
1420                        connection_ids,
1421                    });
1422                }
1423            }
1424
1425            project::Entity::delete_many()
1426                .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
1427                .exec(&*tx)
1428                .await?;
1429
1430            Ok((room_id, left_projects))
1431        })
1432        .await
1433    }
1434
1435    fn build_incoming_call(
1436        room: &proto::Room,
1437        called_user_id: UserId,
1438    ) -> Option<proto::IncomingCall> {
1439        let pending_participant = room
1440            .pending_participants
1441            .iter()
1442            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1443
1444        Some(proto::IncomingCall {
1445            room_id: room.id,
1446            calling_user_id: pending_participant.calling_user_id,
1447            participant_user_ids: room
1448                .participants
1449                .iter()
1450                .map(|participant| participant.user_id)
1451                .collect(),
1452            initial_project: room.participants.iter().find_map(|participant| {
1453                let initial_project_id = pending_participant.initial_project_id?;
1454                participant
1455                    .projects
1456                    .iter()
1457                    .find(|project| project.id == initial_project_id)
1458                    .cloned()
1459            }),
1460        })
1461    }
1462
1463    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1464        let db_room = room::Entity::find_by_id(room_id)
1465            .one(tx)
1466            .await?
1467            .ok_or_else(|| anyhow!("could not find room"))?;
1468
1469        let mut db_participants = db_room
1470            .find_related(room_participant::Entity)
1471            .stream(tx)
1472            .await?;
1473        let mut participants = HashMap::default();
1474        let mut pending_participants = Vec::new();
1475        while let Some(db_participant) = db_participants.next().await {
1476            let db_participant = db_participant?;
1477            if let Some(answering_connection_id) = db_participant.answering_connection_id {
1478                let location = match (
1479                    db_participant.location_kind,
1480                    db_participant.location_project_id,
1481                ) {
1482                    (Some(0), Some(project_id)) => {
1483                        Some(proto::participant_location::Variant::SharedProject(
1484                            proto::participant_location::SharedProject {
1485                                id: project_id.to_proto(),
1486                            },
1487                        ))
1488                    }
1489                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1490                        Default::default(),
1491                    )),
1492                    _ => Some(proto::participant_location::Variant::External(
1493                        Default::default(),
1494                    )),
1495                };
1496                participants.insert(
1497                    answering_connection_id,
1498                    proto::Participant {
1499                        user_id: db_participant.user_id.to_proto(),
1500                        peer_id: answering_connection_id as u32,
1501                        projects: Default::default(),
1502                        location: Some(proto::ParticipantLocation { variant: location }),
1503                    },
1504                );
1505            } else {
1506                pending_participants.push(proto::PendingParticipant {
1507                    user_id: db_participant.user_id.to_proto(),
1508                    calling_user_id: db_participant.calling_user_id.to_proto(),
1509                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1510                });
1511            }
1512        }
1513        drop(db_participants);
1514
1515        let mut db_projects = db_room
1516            .find_related(project::Entity)
1517            .find_with_related(worktree::Entity)
1518            .stream(tx)
1519            .await?;
1520
1521        while let Some(row) = db_projects.next().await {
1522            let (db_project, db_worktree) = row?;
1523            if let Some(participant) = participants.get_mut(&db_project.host_connection_id) {
1524                let project = if let Some(project) = participant
1525                    .projects
1526                    .iter_mut()
1527                    .find(|project| project.id == db_project.id.to_proto())
1528                {
1529                    project
1530                } else {
1531                    participant.projects.push(proto::ParticipantProject {
1532                        id: db_project.id.to_proto(),
1533                        worktree_root_names: Default::default(),
1534                    });
1535                    participant.projects.last_mut().unwrap()
1536                };
1537
1538                if let Some(db_worktree) = db_worktree {
1539                    project.worktree_root_names.push(db_worktree.root_name);
1540                }
1541            }
1542        }
1543
1544        Ok(proto::Room {
1545            id: db_room.id.to_proto(),
1546            live_kit_room: db_room.live_kit_room,
1547            participants: participants.into_values().collect(),
1548            pending_participants,
1549        })
1550    }
1551
1552    // projects
1553
1554    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1555        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1556        enum QueryAs {
1557            Count,
1558        }
1559
1560        self.transaction(|tx| async move {
1561            Ok(project::Entity::find()
1562                .select_only()
1563                .column_as(project::Column::Id.count(), QueryAs::Count)
1564                .inner_join(user::Entity)
1565                .filter(user::Column::Admin.eq(false))
1566                .into_values::<_, QueryAs>()
1567                .one(&*tx)
1568                .await?
1569                .unwrap_or(0i64) as usize)
1570        })
1571        .await
1572    }
1573
1574    pub async fn share_project(
1575        &self,
1576        room_id: RoomId,
1577        connection_id: ConnectionId,
1578        worktrees: &[proto::WorktreeMetadata],
1579    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1580        self.room_transaction(|tx| async move {
1581            let participant = room_participant::Entity::find()
1582                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1583                .one(&*tx)
1584                .await?
1585                .ok_or_else(|| anyhow!("could not find participant"))?;
1586            if participant.room_id != room_id {
1587                return Err(anyhow!("shared project on unexpected room"))?;
1588            }
1589
1590            let project = project::ActiveModel {
1591                room_id: ActiveValue::set(participant.room_id),
1592                host_user_id: ActiveValue::set(participant.user_id),
1593                host_connection_id: ActiveValue::set(connection_id.0 as i32),
1594                host_connection_epoch: ActiveValue::set(self.epoch),
1595                ..Default::default()
1596            }
1597            .insert(&*tx)
1598            .await?;
1599
1600            if !worktrees.is_empty() {
1601                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1602                    worktree::ActiveModel {
1603                        id: ActiveValue::set(worktree.id as i64),
1604                        project_id: ActiveValue::set(project.id),
1605                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1606                        root_name: ActiveValue::set(worktree.root_name.clone()),
1607                        visible: ActiveValue::set(worktree.visible),
1608                        scan_id: ActiveValue::set(0),
1609                        is_complete: ActiveValue::set(false),
1610                    }
1611                }))
1612                .exec(&*tx)
1613                .await?;
1614            }
1615
1616            project_collaborator::ActiveModel {
1617                project_id: ActiveValue::set(project.id),
1618                connection_id: ActiveValue::set(connection_id.0 as i32),
1619                connection_epoch: ActiveValue::set(self.epoch),
1620                user_id: ActiveValue::set(participant.user_id),
1621                replica_id: ActiveValue::set(ReplicaId(0)),
1622                is_host: ActiveValue::set(true),
1623                ..Default::default()
1624            }
1625            .insert(&*tx)
1626            .await?;
1627
1628            let room = self.get_room(room_id, &tx).await?;
1629            Ok((room_id, (project.id, room)))
1630        })
1631        .await
1632    }
1633
1634    pub async fn unshare_project(
1635        &self,
1636        project_id: ProjectId,
1637        connection_id: ConnectionId,
1638    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1639        self.room_transaction(|tx| async move {
1640            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1641
1642            let project = project::Entity::find_by_id(project_id)
1643                .one(&*tx)
1644                .await?
1645                .ok_or_else(|| anyhow!("project not found"))?;
1646            if project.host_connection_id == connection_id.0 as i32 {
1647                let room_id = project.room_id;
1648                project::Entity::delete(project.into_active_model())
1649                    .exec(&*tx)
1650                    .await?;
1651                let room = self.get_room(room_id, &tx).await?;
1652                Ok((room_id, (room, guest_connection_ids)))
1653            } else {
1654                Err(anyhow!("cannot unshare a project hosted by another user"))?
1655            }
1656        })
1657        .await
1658    }
1659
1660    pub async fn update_project(
1661        &self,
1662        project_id: ProjectId,
1663        connection_id: ConnectionId,
1664        worktrees: &[proto::WorktreeMetadata],
1665    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1666        self.room_transaction(|tx| async move {
1667            let project = project::Entity::find_by_id(project_id)
1668                .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
1669                .one(&*tx)
1670                .await?
1671                .ok_or_else(|| anyhow!("no such project"))?;
1672
1673            if !worktrees.is_empty() {
1674                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1675                    worktree::ActiveModel {
1676                        id: ActiveValue::set(worktree.id as i64),
1677                        project_id: ActiveValue::set(project.id),
1678                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1679                        root_name: ActiveValue::set(worktree.root_name.clone()),
1680                        visible: ActiveValue::set(worktree.visible),
1681                        scan_id: ActiveValue::set(0),
1682                        is_complete: ActiveValue::set(false),
1683                    }
1684                }))
1685                .on_conflict(
1686                    OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
1687                        .update_column(worktree::Column::RootName)
1688                        .to_owned(),
1689                )
1690                .exec(&*tx)
1691                .await?;
1692            }
1693
1694            worktree::Entity::delete_many()
1695                .filter(
1696                    worktree::Column::ProjectId.eq(project.id).and(
1697                        worktree::Column::Id
1698                            .is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
1699                    ),
1700                )
1701                .exec(&*tx)
1702                .await?;
1703
1704            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
1705            let room = self.get_room(project.room_id, &tx).await?;
1706            Ok((project.room_id, (room, guest_connection_ids)))
1707        })
1708        .await
1709    }
1710
1711    pub async fn update_worktree(
1712        &self,
1713        update: &proto::UpdateWorktree,
1714        connection_id: ConnectionId,
1715    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1716        self.room_transaction(|tx| async move {
1717            let project_id = ProjectId::from_proto(update.project_id);
1718            let worktree_id = update.worktree_id as i64;
1719
1720            // Ensure the update comes from the host.
1721            let project = project::Entity::find_by_id(project_id)
1722                .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
1723                .one(&*tx)
1724                .await?
1725                .ok_or_else(|| anyhow!("no such project"))?;
1726            let room_id = project.room_id;
1727
1728            // Update metadata.
1729            worktree::Entity::update(worktree::ActiveModel {
1730                id: ActiveValue::set(worktree_id),
1731                project_id: ActiveValue::set(project_id),
1732                root_name: ActiveValue::set(update.root_name.clone()),
1733                scan_id: ActiveValue::set(update.scan_id as i64),
1734                is_complete: ActiveValue::set(update.is_last_update),
1735                abs_path: ActiveValue::set(update.abs_path.clone()),
1736                ..Default::default()
1737            })
1738            .exec(&*tx)
1739            .await?;
1740
1741            if !update.updated_entries.is_empty() {
1742                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
1743                    let mtime = entry.mtime.clone().unwrap_or_default();
1744                    worktree_entry::ActiveModel {
1745                        project_id: ActiveValue::set(project_id),
1746                        worktree_id: ActiveValue::set(worktree_id),
1747                        id: ActiveValue::set(entry.id as i64),
1748                        is_dir: ActiveValue::set(entry.is_dir),
1749                        path: ActiveValue::set(entry.path.clone()),
1750                        inode: ActiveValue::set(entry.inode as i64),
1751                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
1752                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
1753                        is_symlink: ActiveValue::set(entry.is_symlink),
1754                        is_ignored: ActiveValue::set(entry.is_ignored),
1755                    }
1756                }))
1757                .on_conflict(
1758                    OnConflict::columns([
1759                        worktree_entry::Column::ProjectId,
1760                        worktree_entry::Column::WorktreeId,
1761                        worktree_entry::Column::Id,
1762                    ])
1763                    .update_columns([
1764                        worktree_entry::Column::IsDir,
1765                        worktree_entry::Column::Path,
1766                        worktree_entry::Column::Inode,
1767                        worktree_entry::Column::MtimeSeconds,
1768                        worktree_entry::Column::MtimeNanos,
1769                        worktree_entry::Column::IsSymlink,
1770                        worktree_entry::Column::IsIgnored,
1771                    ])
1772                    .to_owned(),
1773                )
1774                .exec(&*tx)
1775                .await?;
1776            }
1777
1778            if !update.removed_entries.is_empty() {
1779                worktree_entry::Entity::delete_many()
1780                    .filter(
1781                        worktree_entry::Column::ProjectId
1782                            .eq(project_id)
1783                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
1784                            .and(
1785                                worktree_entry::Column::Id
1786                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
1787                            ),
1788                    )
1789                    .exec(&*tx)
1790                    .await?;
1791            }
1792
1793            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1794            Ok((room_id, connection_ids))
1795        })
1796        .await
1797    }
1798
1799    pub async fn update_diagnostic_summary(
1800        &self,
1801        update: &proto::UpdateDiagnosticSummary,
1802        connection_id: ConnectionId,
1803    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1804        self.room_transaction(|tx| async move {
1805            let project_id = ProjectId::from_proto(update.project_id);
1806            let worktree_id = update.worktree_id as i64;
1807            let summary = update
1808                .summary
1809                .as_ref()
1810                .ok_or_else(|| anyhow!("invalid summary"))?;
1811
1812            // Ensure the update comes from the host.
1813            let project = project::Entity::find_by_id(project_id)
1814                .one(&*tx)
1815                .await?
1816                .ok_or_else(|| anyhow!("no such project"))?;
1817            if project.host_connection_id != connection_id.0 as i32 {
1818                return Err(anyhow!("can't update a project hosted by someone else"))?;
1819            }
1820
1821            // Update summary.
1822            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
1823                project_id: ActiveValue::set(project_id),
1824                worktree_id: ActiveValue::set(worktree_id),
1825                path: ActiveValue::set(summary.path.clone()),
1826                language_server_id: ActiveValue::set(summary.language_server_id as i64),
1827                error_count: ActiveValue::set(summary.error_count as i32),
1828                warning_count: ActiveValue::set(summary.warning_count as i32),
1829                ..Default::default()
1830            })
1831            .on_conflict(
1832                OnConflict::columns([
1833                    worktree_diagnostic_summary::Column::ProjectId,
1834                    worktree_diagnostic_summary::Column::WorktreeId,
1835                    worktree_diagnostic_summary::Column::Path,
1836                ])
1837                .update_columns([
1838                    worktree_diagnostic_summary::Column::LanguageServerId,
1839                    worktree_diagnostic_summary::Column::ErrorCount,
1840                    worktree_diagnostic_summary::Column::WarningCount,
1841                ])
1842                .to_owned(),
1843            )
1844            .exec(&*tx)
1845            .await?;
1846
1847            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1848            Ok((project.room_id, connection_ids))
1849        })
1850        .await
1851    }
1852
1853    pub async fn start_language_server(
1854        &self,
1855        update: &proto::StartLanguageServer,
1856        connection_id: ConnectionId,
1857    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1858        self.room_transaction(|tx| async move {
1859            let project_id = ProjectId::from_proto(update.project_id);
1860            let server = update
1861                .server
1862                .as_ref()
1863                .ok_or_else(|| anyhow!("invalid language server"))?;
1864
1865            // Ensure the update comes from the host.
1866            let project = project::Entity::find_by_id(project_id)
1867                .one(&*tx)
1868                .await?
1869                .ok_or_else(|| anyhow!("no such project"))?;
1870            if project.host_connection_id != connection_id.0 as i32 {
1871                return Err(anyhow!("can't update a project hosted by someone else"))?;
1872            }
1873
1874            // Add the newly-started language server.
1875            language_server::Entity::insert(language_server::ActiveModel {
1876                project_id: ActiveValue::set(project_id),
1877                id: ActiveValue::set(server.id as i64),
1878                name: ActiveValue::set(server.name.clone()),
1879                ..Default::default()
1880            })
1881            .on_conflict(
1882                OnConflict::columns([
1883                    language_server::Column::ProjectId,
1884                    language_server::Column::Id,
1885                ])
1886                .update_column(language_server::Column::Name)
1887                .to_owned(),
1888            )
1889            .exec(&*tx)
1890            .await?;
1891
1892            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1893            Ok((project.room_id, connection_ids))
1894        })
1895        .await
1896    }
1897
1898    pub async fn join_project(
1899        &self,
1900        project_id: ProjectId,
1901        connection_id: ConnectionId,
1902    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
1903        self.room_transaction(|tx| async move {
1904            let participant = room_participant::Entity::find()
1905                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1906                .one(&*tx)
1907                .await?
1908                .ok_or_else(|| anyhow!("must join a room first"))?;
1909
1910            let project = project::Entity::find_by_id(project_id)
1911                .one(&*tx)
1912                .await?
1913                .ok_or_else(|| anyhow!("no such project"))?;
1914            if project.room_id != participant.room_id {
1915                return Err(anyhow!("no such project"))?;
1916            }
1917
1918            let mut collaborators = project
1919                .find_related(project_collaborator::Entity)
1920                .all(&*tx)
1921                .await?;
1922            let replica_ids = collaborators
1923                .iter()
1924                .map(|c| c.replica_id)
1925                .collect::<HashSet<_>>();
1926            let mut replica_id = ReplicaId(1);
1927            while replica_ids.contains(&replica_id) {
1928                replica_id.0 += 1;
1929            }
1930            let new_collaborator = project_collaborator::ActiveModel {
1931                project_id: ActiveValue::set(project_id),
1932                connection_id: ActiveValue::set(connection_id.0 as i32),
1933                connection_epoch: ActiveValue::set(self.epoch),
1934                user_id: ActiveValue::set(participant.user_id),
1935                replica_id: ActiveValue::set(replica_id),
1936                is_host: ActiveValue::set(false),
1937                ..Default::default()
1938            }
1939            .insert(&*tx)
1940            .await?;
1941            collaborators.push(new_collaborator);
1942
1943            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1944            let mut worktrees = db_worktrees
1945                .into_iter()
1946                .map(|db_worktree| {
1947                    (
1948                        db_worktree.id as u64,
1949                        Worktree {
1950                            id: db_worktree.id as u64,
1951                            abs_path: db_worktree.abs_path,
1952                            root_name: db_worktree.root_name,
1953                            visible: db_worktree.visible,
1954                            entries: Default::default(),
1955                            diagnostic_summaries: Default::default(),
1956                            scan_id: db_worktree.scan_id as u64,
1957                            is_complete: db_worktree.is_complete,
1958                        },
1959                    )
1960                })
1961                .collect::<BTreeMap<_, _>>();
1962
1963            // Populate worktree entries.
1964            {
1965                let mut db_entries = worktree_entry::Entity::find()
1966                    .filter(worktree_entry::Column::ProjectId.eq(project_id))
1967                    .stream(&*tx)
1968                    .await?;
1969                while let Some(db_entry) = db_entries.next().await {
1970                    let db_entry = db_entry?;
1971                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
1972                        worktree.entries.push(proto::Entry {
1973                            id: db_entry.id as u64,
1974                            is_dir: db_entry.is_dir,
1975                            path: db_entry.path,
1976                            inode: db_entry.inode as u64,
1977                            mtime: Some(proto::Timestamp {
1978                                seconds: db_entry.mtime_seconds as u64,
1979                                nanos: db_entry.mtime_nanos as u32,
1980                            }),
1981                            is_symlink: db_entry.is_symlink,
1982                            is_ignored: db_entry.is_ignored,
1983                        });
1984                    }
1985                }
1986            }
1987
1988            // Populate worktree diagnostic summaries.
1989            {
1990                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1991                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
1992                    .stream(&*tx)
1993                    .await?;
1994                while let Some(db_summary) = db_summaries.next().await {
1995                    let db_summary = db_summary?;
1996                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1997                        worktree
1998                            .diagnostic_summaries
1999                            .push(proto::DiagnosticSummary {
2000                                path: db_summary.path,
2001                                language_server_id: db_summary.language_server_id as u64,
2002                                error_count: db_summary.error_count as u32,
2003                                warning_count: db_summary.warning_count as u32,
2004                            });
2005                    }
2006                }
2007            }
2008
2009            // Populate language servers.
2010            let language_servers = project
2011                .find_related(language_server::Entity)
2012                .all(&*tx)
2013                .await?;
2014
2015            let room_id = project.room_id;
2016            let project = Project {
2017                collaborators,
2018                worktrees,
2019                language_servers: language_servers
2020                    .into_iter()
2021                    .map(|language_server| proto::LanguageServer {
2022                        id: language_server.id as u64,
2023                        name: language_server.name,
2024                    })
2025                    .collect(),
2026            };
2027            Ok((room_id, (project, replica_id as ReplicaId)))
2028        })
2029        .await
2030    }
2031
2032    pub async fn leave_project(
2033        &self,
2034        project_id: ProjectId,
2035        connection_id: ConnectionId,
2036    ) -> Result<RoomGuard<LeftProject>> {
2037        self.room_transaction(|tx| async move {
2038            let result = project_collaborator::Entity::delete_many()
2039                .filter(
2040                    project_collaborator::Column::ProjectId
2041                        .eq(project_id)
2042                        .and(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)),
2043                )
2044                .exec(&*tx)
2045                .await?;
2046            if result.rows_affected == 0 {
2047                Err(anyhow!("not a collaborator on this project"))?;
2048            }
2049
2050            let project = project::Entity::find_by_id(project_id)
2051                .one(&*tx)
2052                .await?
2053                .ok_or_else(|| anyhow!("no such project"))?;
2054            let collaborators = project
2055                .find_related(project_collaborator::Entity)
2056                .all(&*tx)
2057                .await?;
2058            let connection_ids = collaborators
2059                .into_iter()
2060                .map(|collaborator| ConnectionId(collaborator.connection_id as u32))
2061                .collect();
2062
2063            let left_project = LeftProject {
2064                id: project_id,
2065                host_user_id: project.host_user_id,
2066                host_connection_id: ConnectionId(project.host_connection_id as u32),
2067                connection_ids,
2068            };
2069            Ok((project.room_id, left_project))
2070        })
2071        .await
2072    }
2073
2074    pub async fn project_collaborators(
2075        &self,
2076        project_id: ProjectId,
2077        connection_id: ConnectionId,
2078    ) -> Result<RoomGuard<Vec<project_collaborator::Model>>> {
2079        self.room_transaction(|tx| async move {
2080            let project = project::Entity::find_by_id(project_id)
2081                .one(&*tx)
2082                .await?
2083                .ok_or_else(|| anyhow!("no such project"))?;
2084            let collaborators = project_collaborator::Entity::find()
2085                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2086                .all(&*tx)
2087                .await?;
2088
2089            if collaborators
2090                .iter()
2091                .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
2092            {
2093                Ok((project.room_id, collaborators))
2094            } else {
2095                Err(anyhow!("no such project"))?
2096            }
2097        })
2098        .await
2099    }
2100
2101    pub async fn project_connection_ids(
2102        &self,
2103        project_id: ProjectId,
2104        connection_id: ConnectionId,
2105    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2106        self.room_transaction(|tx| async move {
2107            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2108            enum QueryAs {
2109                ConnectionId,
2110            }
2111
2112            let project = project::Entity::find_by_id(project_id)
2113                .one(&*tx)
2114                .await?
2115                .ok_or_else(|| anyhow!("no such project"))?;
2116            let mut db_connection_ids = project_collaborator::Entity::find()
2117                .select_only()
2118                .column_as(
2119                    project_collaborator::Column::ConnectionId,
2120                    QueryAs::ConnectionId,
2121                )
2122                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2123                .into_values::<i32, QueryAs>()
2124                .stream(&*tx)
2125                .await?;
2126
2127            let mut connection_ids = HashSet::default();
2128            while let Some(connection_id) = db_connection_ids.next().await {
2129                connection_ids.insert(ConnectionId(connection_id? as u32));
2130            }
2131
2132            if connection_ids.contains(&connection_id) {
2133                Ok((project.room_id, connection_ids))
2134            } else {
2135                Err(anyhow!("no such project"))?
2136            }
2137        })
2138        .await
2139    }
2140
2141    async fn project_guest_connection_ids(
2142        &self,
2143        project_id: ProjectId,
2144        tx: &DatabaseTransaction,
2145    ) -> Result<Vec<ConnectionId>> {
2146        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2147        enum QueryAs {
2148            ConnectionId,
2149        }
2150
2151        let mut db_guest_connection_ids = project_collaborator::Entity::find()
2152            .select_only()
2153            .column_as(
2154                project_collaborator::Column::ConnectionId,
2155                QueryAs::ConnectionId,
2156            )
2157            .filter(
2158                project_collaborator::Column::ProjectId
2159                    .eq(project_id)
2160                    .and(project_collaborator::Column::IsHost.eq(false)),
2161            )
2162            .into_values::<i32, QueryAs>()
2163            .stream(tx)
2164            .await?;
2165
2166        let mut guest_connection_ids = Vec::new();
2167        while let Some(connection_id) = db_guest_connection_ids.next().await {
2168            guest_connection_ids.push(ConnectionId(connection_id? as u32));
2169        }
2170        Ok(guest_connection_ids)
2171    }
2172
2173    // access tokens
2174
2175    pub async fn create_access_token_hash(
2176        &self,
2177        user_id: UserId,
2178        access_token_hash: &str,
2179        max_access_token_count: usize,
2180    ) -> Result<()> {
2181        self.transaction(|tx| async {
2182            let tx = tx;
2183
2184            access_token::ActiveModel {
2185                user_id: ActiveValue::set(user_id),
2186                hash: ActiveValue::set(access_token_hash.into()),
2187                ..Default::default()
2188            }
2189            .insert(&*tx)
2190            .await?;
2191
2192            access_token::Entity::delete_many()
2193                .filter(
2194                    access_token::Column::Id.in_subquery(
2195                        Query::select()
2196                            .column(access_token::Column::Id)
2197                            .from(access_token::Entity)
2198                            .and_where(access_token::Column::UserId.eq(user_id))
2199                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2200                            .limit(10000)
2201                            .offset(max_access_token_count as u64)
2202                            .to_owned(),
2203                    ),
2204                )
2205                .exec(&*tx)
2206                .await?;
2207            Ok(())
2208        })
2209        .await
2210    }
2211
2212    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2213        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2214        enum QueryAs {
2215            Hash,
2216        }
2217
2218        self.transaction(|tx| async move {
2219            Ok(access_token::Entity::find()
2220                .select_only()
2221                .column(access_token::Column::Hash)
2222                .filter(access_token::Column::UserId.eq(user_id))
2223                .order_by_desc(access_token::Column::Id)
2224                .into_values::<_, QueryAs>()
2225                .all(&*tx)
2226                .await?)
2227        })
2228        .await
2229    }
2230
2231    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2232    where
2233        F: Send + Fn(TransactionHandle) -> Fut,
2234        Fut: Send + Future<Output = Result<T>>,
2235    {
2236        let body = async {
2237            loop {
2238                let (tx, result) = self.with_transaction(&f).await?;
2239                match result {
2240                    Ok(result) => {
2241                        match tx.commit().await.map_err(Into::into) {
2242                            Ok(()) => return Ok(result),
2243                            Err(error) => {
2244                                if is_serialization_error(&error) {
2245                                    // Retry (don't break the loop)
2246                                } else {
2247                                    return Err(error);
2248                                }
2249                            }
2250                        }
2251                    }
2252                    Err(error) => {
2253                        tx.rollback().await?;
2254                        if is_serialization_error(&error) {
2255                            // Retry (don't break the loop)
2256                        } else {
2257                            return Err(error);
2258                        }
2259                    }
2260                }
2261            }
2262        };
2263
2264        self.run(body).await
2265    }
2266
2267    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2268    where
2269        F: Send + Fn(TransactionHandle) -> Fut,
2270        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2271    {
2272        let body = async {
2273            loop {
2274                let (tx, result) = self.with_transaction(&f).await?;
2275                match result {
2276                    Ok((room_id, data)) => {
2277                        let lock = self.rooms.entry(room_id).or_default().clone();
2278                        let _guard = lock.lock_owned().await;
2279                        match tx.commit().await.map_err(Into::into) {
2280                            Ok(()) => {
2281                                return Ok(RoomGuard {
2282                                    data,
2283                                    _guard,
2284                                    _not_send: PhantomData,
2285                                });
2286                            }
2287                            Err(error) => {
2288                                if is_serialization_error(&error) {
2289                                    // Retry (don't break the loop)
2290                                } else {
2291                                    return Err(error);
2292                                }
2293                            }
2294                        }
2295                    }
2296                    Err(error) => {
2297                        tx.rollback().await?;
2298                        if is_serialization_error(&error) {
2299                            // Retry (don't break the loop)
2300                        } else {
2301                            return Err(error);
2302                        }
2303                    }
2304                }
2305            }
2306        };
2307
2308        self.run(body).await
2309    }
2310
2311    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2312    where
2313        F: Send + Fn(TransactionHandle) -> Fut,
2314        Fut: Send + Future<Output = Result<T>>,
2315    {
2316        let tx = self
2317            .pool
2318            .begin_with_config(Some(IsolationLevel::Serializable), None)
2319            .await?;
2320
2321        let mut tx = Arc::new(Some(tx));
2322        let result = f(TransactionHandle(tx.clone())).await;
2323        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2324            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2325        };
2326
2327        Ok((tx, result))
2328    }
2329
2330    async fn run<F, T>(&self, future: F) -> T
2331    where
2332        F: Future<Output = T>,
2333    {
2334        #[cfg(test)]
2335        {
2336            if let Some(background) = self.background.as_ref() {
2337                background.simulate_random_delay().await;
2338            }
2339
2340            self.runtime.as_ref().unwrap().block_on(future)
2341        }
2342
2343        #[cfg(not(test))]
2344        {
2345            future.await
2346        }
2347    }
2348}
2349
2350fn is_serialization_error(error: &Error) -> bool {
2351    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2352    match error {
2353        Error::Database(
2354            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2355            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2356        ) if error
2357            .as_database_error()
2358            .and_then(|error| error.code())
2359            .as_deref()
2360            == Some(SERIALIZATION_FAILURE_CODE) =>
2361        {
2362            true
2363        }
2364        _ => false,
2365    }
2366}
2367
2368struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2369
2370impl Deref for TransactionHandle {
2371    type Target = DatabaseTransaction;
2372
2373    fn deref(&self) -> &Self::Target {
2374        self.0.as_ref().as_ref().unwrap()
2375    }
2376}
2377
2378pub struct RoomGuard<T> {
2379    data: T,
2380    _guard: OwnedMutexGuard<()>,
2381    _not_send: PhantomData<Rc<()>>,
2382}
2383
2384impl<T> Deref for RoomGuard<T> {
2385    type Target = T;
2386
2387    fn deref(&self) -> &T {
2388        &self.data
2389    }
2390}
2391
2392impl<T> DerefMut for RoomGuard<T> {
2393    fn deref_mut(&mut self) -> &mut T {
2394        &mut self.data
2395    }
2396}
2397
2398#[derive(Debug, Serialize, Deserialize)]
2399pub struct NewUserParams {
2400    pub github_login: String,
2401    pub github_user_id: i32,
2402    pub invite_count: i32,
2403}
2404
2405#[derive(Debug)]
2406pub struct NewUserResult {
2407    pub user_id: UserId,
2408    pub metrics_id: String,
2409    pub inviting_user_id: Option<UserId>,
2410    pub signup_device_id: Option<String>,
2411}
2412
2413fn random_invite_code() -> String {
2414    nanoid::nanoid!(16)
2415}
2416
2417fn random_email_confirmation_code() -> String {
2418    nanoid::nanoid!(64)
2419}
2420
2421macro_rules! id_type {
2422    ($name:ident) => {
2423        #[derive(
2424            Clone,
2425            Copy,
2426            Debug,
2427            Default,
2428            PartialEq,
2429            Eq,
2430            PartialOrd,
2431            Ord,
2432            Hash,
2433            Serialize,
2434            Deserialize,
2435        )]
2436        #[serde(transparent)]
2437        pub struct $name(pub i32);
2438
2439        impl $name {
2440            #[allow(unused)]
2441            pub const MAX: Self = Self(i32::MAX);
2442
2443            #[allow(unused)]
2444            pub fn from_proto(value: u64) -> Self {
2445                Self(value as i32)
2446            }
2447
2448            #[allow(unused)]
2449            pub fn to_proto(self) -> u64 {
2450                self.0 as u64
2451            }
2452        }
2453
2454        impl std::fmt::Display for $name {
2455            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2456                self.0.fmt(f)
2457            }
2458        }
2459
2460        impl From<$name> for sea_query::Value {
2461            fn from(value: $name) -> Self {
2462                sea_query::Value::Int(Some(value.0))
2463            }
2464        }
2465
2466        impl sea_orm::TryGetable for $name {
2467            fn try_get(
2468                res: &sea_orm::QueryResult,
2469                pre: &str,
2470                col: &str,
2471            ) -> Result<Self, sea_orm::TryGetError> {
2472                Ok(Self(i32::try_get(res, pre, col)?))
2473            }
2474        }
2475
2476        impl sea_query::ValueType for $name {
2477            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
2478                match v {
2479                    Value::TinyInt(Some(int)) => {
2480                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2481                    }
2482                    Value::SmallInt(Some(int)) => {
2483                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2484                    }
2485                    Value::Int(Some(int)) => {
2486                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2487                    }
2488                    Value::BigInt(Some(int)) => {
2489                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2490                    }
2491                    Value::TinyUnsigned(Some(int)) => {
2492                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2493                    }
2494                    Value::SmallUnsigned(Some(int)) => {
2495                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2496                    }
2497                    Value::Unsigned(Some(int)) => {
2498                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2499                    }
2500                    Value::BigUnsigned(Some(int)) => {
2501                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2502                    }
2503                    _ => Err(sea_query::ValueTypeErr),
2504                }
2505            }
2506
2507            fn type_name() -> String {
2508                stringify!($name).into()
2509            }
2510
2511            fn array_type() -> sea_query::ArrayType {
2512                sea_query::ArrayType::Int
2513            }
2514
2515            fn column_type() -> sea_query::ColumnType {
2516                sea_query::ColumnType::Integer(None)
2517            }
2518        }
2519
2520        impl sea_orm::TryFromU64 for $name {
2521            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
2522                Ok(Self(n.try_into().map_err(|_| {
2523                    DbErr::ConvertFromU64(concat!(
2524                        "error converting ",
2525                        stringify!($name),
2526                        " to u64"
2527                    ))
2528                })?))
2529            }
2530        }
2531
2532        impl sea_query::Nullable for $name {
2533            fn null() -> Value {
2534                Value::Int(None)
2535            }
2536        }
2537    };
2538}
2539
2540id_type!(AccessTokenId);
2541id_type!(ContactId);
2542id_type!(RoomId);
2543id_type!(RoomParticipantId);
2544id_type!(ProjectId);
2545id_type!(ProjectCollaboratorId);
2546id_type!(ReplicaId);
2547id_type!(SignupId);
2548id_type!(UserId);
2549
2550pub struct LeftRoom {
2551    pub room: proto::Room,
2552    pub left_projects: HashMap<ProjectId, LeftProject>,
2553    pub canceled_calls_to_user_ids: Vec<UserId>,
2554}
2555
2556pub struct Project {
2557    pub collaborators: Vec<project_collaborator::Model>,
2558    pub worktrees: BTreeMap<u64, Worktree>,
2559    pub language_servers: Vec<proto::LanguageServer>,
2560}
2561
2562pub struct LeftProject {
2563    pub id: ProjectId,
2564    pub host_user_id: UserId,
2565    pub host_connection_id: ConnectionId,
2566    pub connection_ids: Vec<ConnectionId>,
2567}
2568
2569pub struct Worktree {
2570    pub id: u64,
2571    pub abs_path: String,
2572    pub root_name: String,
2573    pub visible: bool,
2574    pub entries: Vec<proto::Entry>,
2575    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
2576    pub scan_id: u64,
2577    pub is_complete: bool,
2578}
2579
2580#[cfg(test)]
2581pub use test::*;
2582
2583#[cfg(test)]
2584mod test {
2585    use super::*;
2586    use gpui::executor::Background;
2587    use lazy_static::lazy_static;
2588    use parking_lot::Mutex;
2589    use rand::prelude::*;
2590    use sea_orm::ConnectionTrait;
2591    use sqlx::migrate::MigrateDatabase;
2592    use std::sync::Arc;
2593
2594    pub struct TestDb {
2595        pub db: Option<Arc<Database>>,
2596        pub connection: Option<sqlx::AnyConnection>,
2597    }
2598
2599    impl TestDb {
2600        pub fn sqlite(background: Arc<Background>) -> Self {
2601            let url = format!("sqlite::memory:");
2602            let runtime = tokio::runtime::Builder::new_current_thread()
2603                .enable_io()
2604                .enable_time()
2605                .build()
2606                .unwrap();
2607
2608            let mut db = runtime.block_on(async {
2609                let mut options = ConnectOptions::new(url);
2610                options.max_connections(5);
2611                let db = Database::new(options).await.unwrap();
2612                let sql = include_str!(concat!(
2613                    env!("CARGO_MANIFEST_DIR"),
2614                    "/migrations.sqlite/20221109000000_test_schema.sql"
2615                ));
2616                db.pool
2617                    .execute(sea_orm::Statement::from_string(
2618                        db.pool.get_database_backend(),
2619                        sql.into(),
2620                    ))
2621                    .await
2622                    .unwrap();
2623                db
2624            });
2625
2626            db.background = Some(background);
2627            db.runtime = Some(runtime);
2628
2629            Self {
2630                db: Some(Arc::new(db)),
2631                connection: None,
2632            }
2633        }
2634
2635        pub fn postgres(background: Arc<Background>) -> Self {
2636            lazy_static! {
2637                static ref LOCK: Mutex<()> = Mutex::new(());
2638            }
2639
2640            let _guard = LOCK.lock();
2641            let mut rng = StdRng::from_entropy();
2642            let url = format!(
2643                "postgres://postgres@localhost/zed-test-{}",
2644                rng.gen::<u128>()
2645            );
2646            let runtime = tokio::runtime::Builder::new_current_thread()
2647                .enable_io()
2648                .enable_time()
2649                .build()
2650                .unwrap();
2651
2652            let mut db = runtime.block_on(async {
2653                sqlx::Postgres::create_database(&url)
2654                    .await
2655                    .expect("failed to create test db");
2656                let mut options = ConnectOptions::new(url);
2657                options
2658                    .max_connections(5)
2659                    .idle_timeout(Duration::from_secs(0));
2660                let db = Database::new(options).await.unwrap();
2661                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
2662                db.migrate(Path::new(migrations_path), false).await.unwrap();
2663                db
2664            });
2665
2666            db.background = Some(background);
2667            db.runtime = Some(runtime);
2668
2669            Self {
2670                db: Some(Arc::new(db)),
2671                connection: None,
2672            }
2673        }
2674
2675        pub fn db(&self) -> &Arc<Database> {
2676            self.db.as_ref().unwrap()
2677        }
2678    }
2679
2680    impl Drop for TestDb {
2681        fn drop(&mut self) {
2682            let db = self.db.take().unwrap();
2683            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
2684                db.runtime.as_ref().unwrap().block_on(async {
2685                    use util::ResultExt;
2686                    let query = "
2687                        SELECT pg_terminate_backend(pg_stat_activity.pid)
2688                        FROM pg_stat_activity
2689                        WHERE
2690                            pg_stat_activity.datname = current_database() AND
2691                            pid <> pg_backend_pid();
2692                    ";
2693                    db.pool
2694                        .execute(sea_orm::Statement::from_string(
2695                            db.pool.get_database_backend(),
2696                            query.into(),
2697                        ))
2698                        .await
2699                        .log_err();
2700                    sqlx::Postgres::drop_database(db.options.get_url())
2701                        .await
2702                        .log_err();
2703                })
2704            }
2705        }
2706    }
2707}