db.rs

   1mod access_token;
   2mod contact;
   3mod language_server;
   4mod project;
   5mod project_collaborator;
   6mod room;
   7mod room_participant;
   8mod signup;
   9#[cfg(test)]
  10mod tests;
  11mod user;
  12mod worktree;
  13mod worktree_diagnostic_summary;
  14mod worktree_entry;
  15
  16use crate::{Error, Result};
  17use anyhow::anyhow;
  18use collections::{BTreeMap, HashMap, HashSet};
  19pub use contact::Contact;
  20use dashmap::DashMap;
  21use futures::StreamExt;
  22use hyper::StatusCode;
  23use rpc::{proto, ConnectionId};
  24pub use sea_orm::ConnectOptions;
  25use sea_orm::{
  26    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  27    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  28    Statement, TransactionTrait,
  29};
  30use sea_query::{Alias, Expr, OnConflict, Query};
  31use serde::{Deserialize, Serialize};
  32pub use signup::{Invite, NewSignup, WaitlistSummary};
  33use sqlx::migrate::{Migrate, Migration, MigrationSource};
  34use sqlx::Connection;
  35use std::ops::{Deref, DerefMut};
  36use std::path::Path;
  37use std::time::Duration;
  38use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  39use tokio::sync::{Mutex, OwnedMutexGuard};
  40pub use user::Model as User;
  41
  42pub struct Database {
  43    options: ConnectOptions,
  44    pool: DatabaseConnection,
  45    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  46    #[cfg(test)]
  47    background: Option<std::sync::Arc<gpui::executor::Background>>,
  48    #[cfg(test)]
  49    runtime: Option<tokio::runtime::Runtime>,
  50    epoch: Uuid,
  51}
  52
  53impl Database {
  54    pub async fn new(options: ConnectOptions) -> Result<Self> {
  55        Ok(Self {
  56            options: options.clone(),
  57            pool: sea_orm::Database::connect(options).await?,
  58            rooms: DashMap::with_capacity(16384),
  59            #[cfg(test)]
  60            background: None,
  61            #[cfg(test)]
  62            runtime: None,
  63            epoch: Uuid::new_v4(),
  64        })
  65    }
  66
  67    pub async fn migrate(
  68        &self,
  69        migrations_path: &Path,
  70        ignore_checksum_mismatch: bool,
  71    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  72        let migrations = MigrationSource::resolve(migrations_path)
  73            .await
  74            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  75
  76        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  77
  78        connection.ensure_migrations_table().await?;
  79        let applied_migrations: HashMap<_, _> = connection
  80            .list_applied_migrations()
  81            .await?
  82            .into_iter()
  83            .map(|m| (m.version, m))
  84            .collect();
  85
  86        let mut new_migrations = Vec::new();
  87        for migration in migrations {
  88            match applied_migrations.get(&migration.version) {
  89                Some(applied_migration) => {
  90                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  91                    {
  92                        Err(anyhow!(
  93                            "checksum mismatch for applied migration {}",
  94                            migration.description
  95                        ))?;
  96                    }
  97                }
  98                None => {
  99                    let elapsed = connection.apply(&migration).await?;
 100                    new_migrations.push((migration, elapsed));
 101                }
 102            }
 103        }
 104
 105        Ok(new_migrations)
 106    }
 107
 108    pub async fn clear_stale_data(&self) -> Result<()> {
 109        self.transaction(|tx| async move {
 110            project_collaborator::Entity::delete_many()
 111                .filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch))
 112                .exec(&*tx)
 113                .await?;
 114            room_participant::Entity::delete_many()
 115                .filter(
 116                    room_participant::Column::AnsweringConnectionEpoch
 117                        .ne(self.epoch)
 118                        .or(room_participant::Column::CallingConnectionEpoch.ne(self.epoch)),
 119                )
 120                .exec(&*tx)
 121                .await?;
 122            project::Entity::delete_many()
 123                .filter(project::Column::HostConnectionEpoch.ne(self.epoch))
 124                .exec(&*tx)
 125                .await?;
 126            room::Entity::delete_many()
 127                .filter(
 128                    room::Column::Id.not_in_subquery(
 129                        Query::select()
 130                            .column(room_participant::Column::RoomId)
 131                            .from(room_participant::Entity)
 132                            .distinct()
 133                            .to_owned(),
 134                    ),
 135                )
 136                .exec(&*tx)
 137                .await?;
 138            Ok(())
 139        })
 140        .await
 141    }
 142
 143    // users
 144
 145    pub async fn create_user(
 146        &self,
 147        email_address: &str,
 148        admin: bool,
 149        params: NewUserParams,
 150    ) -> Result<NewUserResult> {
 151        self.transaction(|tx| async {
 152            let tx = tx;
 153            let user = user::Entity::insert(user::ActiveModel {
 154                email_address: ActiveValue::set(Some(email_address.into())),
 155                github_login: ActiveValue::set(params.github_login.clone()),
 156                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 157                admin: ActiveValue::set(admin),
 158                metrics_id: ActiveValue::set(Uuid::new_v4()),
 159                ..Default::default()
 160            })
 161            .on_conflict(
 162                OnConflict::column(user::Column::GithubLogin)
 163                    .update_column(user::Column::GithubLogin)
 164                    .to_owned(),
 165            )
 166            .exec_with_returning(&*tx)
 167            .await?;
 168
 169            Ok(NewUserResult {
 170                user_id: user.id,
 171                metrics_id: user.metrics_id.to_string(),
 172                signup_device_id: None,
 173                inviting_user_id: None,
 174            })
 175        })
 176        .await
 177    }
 178
 179    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 180        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 181            .await
 182    }
 183
 184    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 185        self.transaction(|tx| async {
 186            let tx = tx;
 187            Ok(user::Entity::find()
 188                .filter(user::Column::Id.is_in(ids.iter().copied()))
 189                .all(&*tx)
 190                .await?)
 191        })
 192        .await
 193    }
 194
 195    pub async fn get_user_by_github_account(
 196        &self,
 197        github_login: &str,
 198        github_user_id: Option<i32>,
 199    ) -> Result<Option<User>> {
 200        self.transaction(|tx| async move {
 201            let tx = &*tx;
 202            if let Some(github_user_id) = github_user_id {
 203                if let Some(user_by_github_user_id) = user::Entity::find()
 204                    .filter(user::Column::GithubUserId.eq(github_user_id))
 205                    .one(tx)
 206                    .await?
 207                {
 208                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 209                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 210                    Ok(Some(user_by_github_user_id.update(tx).await?))
 211                } else if let Some(user_by_github_login) = user::Entity::find()
 212                    .filter(user::Column::GithubLogin.eq(github_login))
 213                    .one(tx)
 214                    .await?
 215                {
 216                    let mut user_by_github_login = user_by_github_login.into_active_model();
 217                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 218                    Ok(Some(user_by_github_login.update(tx).await?))
 219                } else {
 220                    Ok(None)
 221                }
 222            } else {
 223                Ok(user::Entity::find()
 224                    .filter(user::Column::GithubLogin.eq(github_login))
 225                    .one(tx)
 226                    .await?)
 227            }
 228        })
 229        .await
 230    }
 231
 232    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 233        self.transaction(|tx| async move {
 234            Ok(user::Entity::find()
 235                .order_by_asc(user::Column::GithubLogin)
 236                .limit(limit as u64)
 237                .offset(page as u64 * limit as u64)
 238                .all(&*tx)
 239                .await?)
 240        })
 241        .await
 242    }
 243
 244    pub async fn get_users_with_no_invites(
 245        &self,
 246        invited_by_another_user: bool,
 247    ) -> Result<Vec<User>> {
 248        self.transaction(|tx| async move {
 249            Ok(user::Entity::find()
 250                .filter(
 251                    user::Column::InviteCount
 252                        .eq(0)
 253                        .and(if invited_by_another_user {
 254                            user::Column::InviterId.is_not_null()
 255                        } else {
 256                            user::Column::InviterId.is_null()
 257                        }),
 258                )
 259                .all(&*tx)
 260                .await?)
 261        })
 262        .await
 263    }
 264
 265    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 266        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 267        enum QueryAs {
 268            MetricsId,
 269        }
 270
 271        self.transaction(|tx| async move {
 272            let metrics_id: Uuid = user::Entity::find_by_id(id)
 273                .select_only()
 274                .column(user::Column::MetricsId)
 275                .into_values::<_, QueryAs>()
 276                .one(&*tx)
 277                .await?
 278                .ok_or_else(|| anyhow!("could not find user"))?;
 279            Ok(metrics_id.to_string())
 280        })
 281        .await
 282    }
 283
 284    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 285        self.transaction(|tx| async move {
 286            user::Entity::update_many()
 287                .filter(user::Column::Id.eq(id))
 288                .set(user::ActiveModel {
 289                    admin: ActiveValue::set(is_admin),
 290                    ..Default::default()
 291                })
 292                .exec(&*tx)
 293                .await?;
 294            Ok(())
 295        })
 296        .await
 297    }
 298
 299    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 300        self.transaction(|tx| async move {
 301            user::Entity::update_many()
 302                .filter(user::Column::Id.eq(id))
 303                .set(user::ActiveModel {
 304                    connected_once: ActiveValue::set(connected_once),
 305                    ..Default::default()
 306                })
 307                .exec(&*tx)
 308                .await?;
 309            Ok(())
 310        })
 311        .await
 312    }
 313
 314    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 315        self.transaction(|tx| async move {
 316            access_token::Entity::delete_many()
 317                .filter(access_token::Column::UserId.eq(id))
 318                .exec(&*tx)
 319                .await?;
 320            user::Entity::delete_by_id(id).exec(&*tx).await?;
 321            Ok(())
 322        })
 323        .await
 324    }
 325
 326    // contacts
 327
 328    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 329        #[derive(Debug, FromQueryResult)]
 330        struct ContactWithUserBusyStatuses {
 331            user_id_a: UserId,
 332            user_id_b: UserId,
 333            a_to_b: bool,
 334            accepted: bool,
 335            should_notify: bool,
 336            user_a_busy: bool,
 337            user_b_busy: bool,
 338        }
 339
 340        self.transaction(|tx| async move {
 341            let user_a_participant = Alias::new("user_a_participant");
 342            let user_b_participant = Alias::new("user_b_participant");
 343            let mut db_contacts = contact::Entity::find()
 344                .column_as(
 345                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 346                        .is_not_null(),
 347                    "user_a_busy",
 348                )
 349                .column_as(
 350                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 351                        .is_not_null(),
 352                    "user_b_busy",
 353                )
 354                .filter(
 355                    contact::Column::UserIdA
 356                        .eq(user_id)
 357                        .or(contact::Column::UserIdB.eq(user_id)),
 358                )
 359                .join_as(
 360                    JoinType::LeftJoin,
 361                    contact::Relation::UserARoomParticipant.def(),
 362                    user_a_participant,
 363                )
 364                .join_as(
 365                    JoinType::LeftJoin,
 366                    contact::Relation::UserBRoomParticipant.def(),
 367                    user_b_participant,
 368                )
 369                .into_model::<ContactWithUserBusyStatuses>()
 370                .stream(&*tx)
 371                .await?;
 372
 373            let mut contacts = Vec::new();
 374            while let Some(db_contact) = db_contacts.next().await {
 375                let db_contact = db_contact?;
 376                if db_contact.user_id_a == user_id {
 377                    if db_contact.accepted {
 378                        contacts.push(Contact::Accepted {
 379                            user_id: db_contact.user_id_b,
 380                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 381                            busy: db_contact.user_b_busy,
 382                        });
 383                    } else if db_contact.a_to_b {
 384                        contacts.push(Contact::Outgoing {
 385                            user_id: db_contact.user_id_b,
 386                        })
 387                    } else {
 388                        contacts.push(Contact::Incoming {
 389                            user_id: db_contact.user_id_b,
 390                            should_notify: db_contact.should_notify,
 391                        });
 392                    }
 393                } else if db_contact.accepted {
 394                    contacts.push(Contact::Accepted {
 395                        user_id: db_contact.user_id_a,
 396                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 397                        busy: db_contact.user_a_busy,
 398                    });
 399                } else if db_contact.a_to_b {
 400                    contacts.push(Contact::Incoming {
 401                        user_id: db_contact.user_id_a,
 402                        should_notify: db_contact.should_notify,
 403                    });
 404                } else {
 405                    contacts.push(Contact::Outgoing {
 406                        user_id: db_contact.user_id_a,
 407                    });
 408                }
 409            }
 410
 411            contacts.sort_unstable_by_key(|contact| contact.user_id());
 412
 413            Ok(contacts)
 414        })
 415        .await
 416    }
 417
 418    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 419        self.transaction(|tx| async move {
 420            let participant = room_participant::Entity::find()
 421                .filter(room_participant::Column::UserId.eq(user_id))
 422                .one(&*tx)
 423                .await?;
 424            Ok(participant.is_some())
 425        })
 426        .await
 427    }
 428
 429    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 430        self.transaction(|tx| async move {
 431            let (id_a, id_b) = if user_id_1 < user_id_2 {
 432                (user_id_1, user_id_2)
 433            } else {
 434                (user_id_2, user_id_1)
 435            };
 436
 437            Ok(contact::Entity::find()
 438                .filter(
 439                    contact::Column::UserIdA
 440                        .eq(id_a)
 441                        .and(contact::Column::UserIdB.eq(id_b))
 442                        .and(contact::Column::Accepted.eq(true)),
 443                )
 444                .one(&*tx)
 445                .await?
 446                .is_some())
 447        })
 448        .await
 449    }
 450
 451    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 452        self.transaction(|tx| async move {
 453            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 454                (sender_id, receiver_id, true)
 455            } else {
 456                (receiver_id, sender_id, false)
 457            };
 458
 459            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 460                user_id_a: ActiveValue::set(id_a),
 461                user_id_b: ActiveValue::set(id_b),
 462                a_to_b: ActiveValue::set(a_to_b),
 463                accepted: ActiveValue::set(false),
 464                should_notify: ActiveValue::set(true),
 465                ..Default::default()
 466            })
 467            .on_conflict(
 468                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 469                    .values([
 470                        (contact::Column::Accepted, true.into()),
 471                        (contact::Column::ShouldNotify, false.into()),
 472                    ])
 473                    .action_and_where(
 474                        contact::Column::Accepted.eq(false).and(
 475                            contact::Column::AToB
 476                                .eq(a_to_b)
 477                                .and(contact::Column::UserIdA.eq(id_b))
 478                                .or(contact::Column::AToB
 479                                    .ne(a_to_b)
 480                                    .and(contact::Column::UserIdA.eq(id_a))),
 481                        ),
 482                    )
 483                    .to_owned(),
 484            )
 485            .exec_without_returning(&*tx)
 486            .await?;
 487
 488            if rows_affected == 1 {
 489                Ok(())
 490            } else {
 491                Err(anyhow!("contact already requested"))?
 492            }
 493        })
 494        .await
 495    }
 496
 497    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 498        self.transaction(|tx| async move {
 499            let (id_a, id_b) = if responder_id < requester_id {
 500                (responder_id, requester_id)
 501            } else {
 502                (requester_id, responder_id)
 503            };
 504
 505            let result = contact::Entity::delete_many()
 506                .filter(
 507                    contact::Column::UserIdA
 508                        .eq(id_a)
 509                        .and(contact::Column::UserIdB.eq(id_b)),
 510                )
 511                .exec(&*tx)
 512                .await?;
 513
 514            if result.rows_affected == 1 {
 515                Ok(())
 516            } else {
 517                Err(anyhow!("no such contact"))?
 518            }
 519        })
 520        .await
 521    }
 522
 523    pub async fn dismiss_contact_notification(
 524        &self,
 525        user_id: UserId,
 526        contact_user_id: UserId,
 527    ) -> Result<()> {
 528        self.transaction(|tx| async move {
 529            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 530                (user_id, contact_user_id, true)
 531            } else {
 532                (contact_user_id, user_id, false)
 533            };
 534
 535            let result = contact::Entity::update_many()
 536                .set(contact::ActiveModel {
 537                    should_notify: ActiveValue::set(false),
 538                    ..Default::default()
 539                })
 540                .filter(
 541                    contact::Column::UserIdA
 542                        .eq(id_a)
 543                        .and(contact::Column::UserIdB.eq(id_b))
 544                        .and(
 545                            contact::Column::AToB
 546                                .eq(a_to_b)
 547                                .and(contact::Column::Accepted.eq(true))
 548                                .or(contact::Column::AToB
 549                                    .ne(a_to_b)
 550                                    .and(contact::Column::Accepted.eq(false))),
 551                        ),
 552                )
 553                .exec(&*tx)
 554                .await?;
 555            if result.rows_affected == 0 {
 556                Err(anyhow!("no such contact request"))?
 557            } else {
 558                Ok(())
 559            }
 560        })
 561        .await
 562    }
 563
 564    pub async fn respond_to_contact_request(
 565        &self,
 566        responder_id: UserId,
 567        requester_id: UserId,
 568        accept: bool,
 569    ) -> Result<()> {
 570        self.transaction(|tx| async move {
 571            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 572                (responder_id, requester_id, false)
 573            } else {
 574                (requester_id, responder_id, true)
 575            };
 576            let rows_affected = if accept {
 577                let result = contact::Entity::update_many()
 578                    .set(contact::ActiveModel {
 579                        accepted: ActiveValue::set(true),
 580                        should_notify: ActiveValue::set(true),
 581                        ..Default::default()
 582                    })
 583                    .filter(
 584                        contact::Column::UserIdA
 585                            .eq(id_a)
 586                            .and(contact::Column::UserIdB.eq(id_b))
 587                            .and(contact::Column::AToB.eq(a_to_b)),
 588                    )
 589                    .exec(&*tx)
 590                    .await?;
 591                result.rows_affected
 592            } else {
 593                let result = contact::Entity::delete_many()
 594                    .filter(
 595                        contact::Column::UserIdA
 596                            .eq(id_a)
 597                            .and(contact::Column::UserIdB.eq(id_b))
 598                            .and(contact::Column::AToB.eq(a_to_b))
 599                            .and(contact::Column::Accepted.eq(false)),
 600                    )
 601                    .exec(&*tx)
 602                    .await?;
 603
 604                result.rows_affected
 605            };
 606
 607            if rows_affected == 1 {
 608                Ok(())
 609            } else {
 610                Err(anyhow!("no such contact request"))?
 611            }
 612        })
 613        .await
 614    }
 615
 616    pub fn fuzzy_like_string(string: &str) -> String {
 617        let mut result = String::with_capacity(string.len() * 2 + 1);
 618        for c in string.chars() {
 619            if c.is_alphanumeric() {
 620                result.push('%');
 621                result.push(c);
 622            }
 623        }
 624        result.push('%');
 625        result
 626    }
 627
 628    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 629        self.transaction(|tx| async {
 630            let tx = tx;
 631            let like_string = Self::fuzzy_like_string(name_query);
 632            let query = "
 633                SELECT users.*
 634                FROM users
 635                WHERE github_login ILIKE $1
 636                ORDER BY github_login <-> $2
 637                LIMIT $3
 638            ";
 639
 640            Ok(user::Entity::find()
 641                .from_raw_sql(Statement::from_sql_and_values(
 642                    self.pool.get_database_backend(),
 643                    query.into(),
 644                    vec![like_string.into(), name_query.into(), limit.into()],
 645                ))
 646                .all(&*tx)
 647                .await?)
 648        })
 649        .await
 650    }
 651
 652    // signups
 653
 654    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 655        self.transaction(|tx| async move {
 656            signup::Entity::insert(signup::ActiveModel {
 657                email_address: ActiveValue::set(signup.email_address.clone()),
 658                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 659                email_confirmation_sent: ActiveValue::set(false),
 660                platform_mac: ActiveValue::set(signup.platform_mac),
 661                platform_windows: ActiveValue::set(signup.platform_windows),
 662                platform_linux: ActiveValue::set(signup.platform_linux),
 663                platform_unknown: ActiveValue::set(false),
 664                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 665                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 666                device_id: ActiveValue::set(signup.device_id.clone()),
 667                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 668                ..Default::default()
 669            })
 670            .on_conflict(
 671                OnConflict::column(signup::Column::EmailAddress)
 672                    .update_columns([
 673                        signup::Column::PlatformMac,
 674                        signup::Column::PlatformWindows,
 675                        signup::Column::PlatformLinux,
 676                        signup::Column::EditorFeatures,
 677                        signup::Column::ProgrammingLanguages,
 678                        signup::Column::DeviceId,
 679                        signup::Column::AddedToMailingList,
 680                    ])
 681                    .to_owned(),
 682            )
 683            .exec(&*tx)
 684            .await?;
 685            Ok(())
 686        })
 687        .await
 688    }
 689
 690    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 691        self.transaction(|tx| async move {
 692            let signup = signup::Entity::find()
 693                .filter(signup::Column::EmailAddress.eq(email_address))
 694                .one(&*tx)
 695                .await?
 696                .ok_or_else(|| {
 697                    anyhow!("signup with email address {} doesn't exist", email_address)
 698                })?;
 699
 700            Ok(signup)
 701        })
 702        .await
 703    }
 704
 705    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 706        self.transaction(|tx| async move {
 707            let query = "
 708                SELECT
 709                    COUNT(*) as count,
 710                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 711                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 712                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 713                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 714                FROM (
 715                    SELECT *
 716                    FROM signups
 717                    WHERE
 718                        NOT email_confirmation_sent
 719                ) AS unsent
 720            ";
 721            Ok(
 722                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 723                    self.pool.get_database_backend(),
 724                    query.into(),
 725                    vec![],
 726                ))
 727                .one(&*tx)
 728                .await?
 729                .ok_or_else(|| anyhow!("invalid result"))?,
 730            )
 731        })
 732        .await
 733    }
 734
 735    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 736        let emails = invites
 737            .iter()
 738            .map(|s| s.email_address.as_str())
 739            .collect::<Vec<_>>();
 740        self.transaction(|tx| async {
 741            let tx = tx;
 742            signup::Entity::update_many()
 743                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 744                .set(signup::ActiveModel {
 745                    email_confirmation_sent: ActiveValue::set(true),
 746                    ..Default::default()
 747                })
 748                .exec(&*tx)
 749                .await?;
 750            Ok(())
 751        })
 752        .await
 753    }
 754
 755    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 756        self.transaction(|tx| async move {
 757            Ok(signup::Entity::find()
 758                .select_only()
 759                .column(signup::Column::EmailAddress)
 760                .column(signup::Column::EmailConfirmationCode)
 761                .filter(
 762                    signup::Column::EmailConfirmationSent.eq(false).and(
 763                        signup::Column::PlatformMac
 764                            .eq(true)
 765                            .or(signup::Column::PlatformUnknown.eq(true)),
 766                    ),
 767                )
 768                .order_by_asc(signup::Column::CreatedAt)
 769                .limit(count as u64)
 770                .into_model()
 771                .all(&*tx)
 772                .await?)
 773        })
 774        .await
 775    }
 776
 777    // invite codes
 778
 779    pub async fn create_invite_from_code(
 780        &self,
 781        code: &str,
 782        email_address: &str,
 783        device_id: Option<&str>,
 784    ) -> Result<Invite> {
 785        self.transaction(|tx| async move {
 786            let existing_user = user::Entity::find()
 787                .filter(user::Column::EmailAddress.eq(email_address))
 788                .one(&*tx)
 789                .await?;
 790
 791            if existing_user.is_some() {
 792                Err(anyhow!("email address is already in use"))?;
 793            }
 794
 795            let inviting_user_with_invites = match user::Entity::find()
 796                .filter(
 797                    user::Column::InviteCode
 798                        .eq(code)
 799                        .and(user::Column::InviteCount.gt(0)),
 800                )
 801                .one(&*tx)
 802                .await?
 803            {
 804                Some(inviting_user) => inviting_user,
 805                None => {
 806                    return Err(Error::Http(
 807                        StatusCode::UNAUTHORIZED,
 808                        "unable to find an invite code with invites remaining".to_string(),
 809                    ))?
 810                }
 811            };
 812            user::Entity::update_many()
 813                .filter(
 814                    user::Column::Id
 815                        .eq(inviting_user_with_invites.id)
 816                        .and(user::Column::InviteCount.gt(0)),
 817                )
 818                .col_expr(
 819                    user::Column::InviteCount,
 820                    Expr::col(user::Column::InviteCount).sub(1),
 821                )
 822                .exec(&*tx)
 823                .await?;
 824
 825            let signup = signup::Entity::insert(signup::ActiveModel {
 826                email_address: ActiveValue::set(email_address.into()),
 827                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 828                email_confirmation_sent: ActiveValue::set(false),
 829                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 830                platform_linux: ActiveValue::set(false),
 831                platform_mac: ActiveValue::set(false),
 832                platform_windows: ActiveValue::set(false),
 833                platform_unknown: ActiveValue::set(true),
 834                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 835                ..Default::default()
 836            })
 837            .on_conflict(
 838                OnConflict::column(signup::Column::EmailAddress)
 839                    .update_column(signup::Column::InvitingUserId)
 840                    .to_owned(),
 841            )
 842            .exec_with_returning(&*tx)
 843            .await?;
 844
 845            Ok(Invite {
 846                email_address: signup.email_address,
 847                email_confirmation_code: signup.email_confirmation_code,
 848            })
 849        })
 850        .await
 851    }
 852
 853    pub async fn create_user_from_invite(
 854        &self,
 855        invite: &Invite,
 856        user: NewUserParams,
 857    ) -> Result<Option<NewUserResult>> {
 858        self.transaction(|tx| async {
 859            let tx = tx;
 860            let signup = signup::Entity::find()
 861                .filter(
 862                    signup::Column::EmailAddress
 863                        .eq(invite.email_address.as_str())
 864                        .and(
 865                            signup::Column::EmailConfirmationCode
 866                                .eq(invite.email_confirmation_code.as_str()),
 867                        ),
 868                )
 869                .one(&*tx)
 870                .await?
 871                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 872
 873            if signup.user_id.is_some() {
 874                return Ok(None);
 875            }
 876
 877            let user = user::Entity::insert(user::ActiveModel {
 878                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 879                github_login: ActiveValue::set(user.github_login.clone()),
 880                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 881                admin: ActiveValue::set(false),
 882                invite_count: ActiveValue::set(user.invite_count),
 883                invite_code: ActiveValue::set(Some(random_invite_code())),
 884                metrics_id: ActiveValue::set(Uuid::new_v4()),
 885                ..Default::default()
 886            })
 887            .on_conflict(
 888                OnConflict::column(user::Column::GithubLogin)
 889                    .update_columns([
 890                        user::Column::EmailAddress,
 891                        user::Column::GithubUserId,
 892                        user::Column::Admin,
 893                    ])
 894                    .to_owned(),
 895            )
 896            .exec_with_returning(&*tx)
 897            .await?;
 898
 899            let mut signup = signup.into_active_model();
 900            signup.user_id = ActiveValue::set(Some(user.id));
 901            let signup = signup.update(&*tx).await?;
 902
 903            if let Some(inviting_user_id) = signup.inviting_user_id {
 904                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
 905                    (inviting_user_id, user.id, true)
 906                } else {
 907                    (user.id, inviting_user_id, false)
 908                };
 909
 910                contact::Entity::insert(contact::ActiveModel {
 911                    user_id_a: ActiveValue::set(user_id_a),
 912                    user_id_b: ActiveValue::set(user_id_b),
 913                    a_to_b: ActiveValue::set(a_to_b),
 914                    should_notify: ActiveValue::set(true),
 915                    accepted: ActiveValue::set(true),
 916                    ..Default::default()
 917                })
 918                .on_conflict(OnConflict::new().do_nothing().to_owned())
 919                .exec_without_returning(&*tx)
 920                .await?;
 921            }
 922
 923            Ok(Some(NewUserResult {
 924                user_id: user.id,
 925                metrics_id: user.metrics_id.to_string(),
 926                inviting_user_id: signup.inviting_user_id,
 927                signup_device_id: signup.device_id,
 928            }))
 929        })
 930        .await
 931    }
 932
 933    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
 934        self.transaction(|tx| async move {
 935            if count > 0 {
 936                user::Entity::update_many()
 937                    .filter(
 938                        user::Column::Id
 939                            .eq(id)
 940                            .and(user::Column::InviteCode.is_null()),
 941                    )
 942                    .set(user::ActiveModel {
 943                        invite_code: ActiveValue::set(Some(random_invite_code())),
 944                        ..Default::default()
 945                    })
 946                    .exec(&*tx)
 947                    .await?;
 948            }
 949
 950            user::Entity::update_many()
 951                .filter(user::Column::Id.eq(id))
 952                .set(user::ActiveModel {
 953                    invite_count: ActiveValue::set(count),
 954                    ..Default::default()
 955                })
 956                .exec(&*tx)
 957                .await?;
 958            Ok(())
 959        })
 960        .await
 961    }
 962
 963    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
 964        self.transaction(|tx| async move {
 965            match user::Entity::find_by_id(id).one(&*tx).await? {
 966                Some(user) if user.invite_code.is_some() => {
 967                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
 968                }
 969                _ => Ok(None),
 970            }
 971        })
 972        .await
 973    }
 974
 975    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
 976        self.transaction(|tx| async move {
 977            user::Entity::find()
 978                .filter(user::Column::InviteCode.eq(code))
 979                .one(&*tx)
 980                .await?
 981                .ok_or_else(|| {
 982                    Error::Http(
 983                        StatusCode::NOT_FOUND,
 984                        "that invite code does not exist".to_string(),
 985                    )
 986                })
 987        })
 988        .await
 989    }
 990
 991    // rooms
 992
 993    pub async fn incoming_call_for_user(
 994        &self,
 995        user_id: UserId,
 996    ) -> Result<Option<proto::IncomingCall>> {
 997        self.transaction(|tx| async move {
 998            let pending_participant = room_participant::Entity::find()
 999                .filter(
1000                    room_participant::Column::UserId
1001                        .eq(user_id)
1002                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1003                )
1004                .one(&*tx)
1005                .await?;
1006
1007            if let Some(pending_participant) = pending_participant {
1008                let room = self.get_room(pending_participant.room_id, &tx).await?;
1009                Ok(Self::build_incoming_call(&room, user_id))
1010            } else {
1011                Ok(None)
1012            }
1013        })
1014        .await
1015    }
1016
1017    pub async fn create_room(
1018        &self,
1019        user_id: UserId,
1020        connection_id: ConnectionId,
1021        live_kit_room: &str,
1022    ) -> Result<RoomGuard<proto::Room>> {
1023        self.room_transaction(|tx| async move {
1024            let room = room::ActiveModel {
1025                live_kit_room: ActiveValue::set(live_kit_room.into()),
1026                ..Default::default()
1027            }
1028            .insert(&*tx)
1029            .await?;
1030            let room_id = room.id;
1031
1032            room_participant::ActiveModel {
1033                room_id: ActiveValue::set(room_id),
1034                user_id: ActiveValue::set(user_id),
1035                answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
1036                answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
1037                connection_lost: ActiveValue::set(false),
1038                calling_user_id: ActiveValue::set(user_id),
1039                calling_connection_id: ActiveValue::set(connection_id.0 as i32),
1040                calling_connection_epoch: ActiveValue::set(self.epoch),
1041                ..Default::default()
1042            }
1043            .insert(&*tx)
1044            .await?;
1045
1046            let room = self.get_room(room_id, &tx).await?;
1047            Ok((room_id, room))
1048        })
1049        .await
1050    }
1051
1052    pub async fn call(
1053        &self,
1054        room_id: RoomId,
1055        calling_user_id: UserId,
1056        calling_connection_id: ConnectionId,
1057        called_user_id: UserId,
1058        initial_project_id: Option<ProjectId>,
1059    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1060        self.room_transaction(|tx| async move {
1061            room_participant::ActiveModel {
1062                room_id: ActiveValue::set(room_id),
1063                user_id: ActiveValue::set(called_user_id),
1064                connection_lost: ActiveValue::set(false),
1065                calling_user_id: ActiveValue::set(calling_user_id),
1066                calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
1067                calling_connection_epoch: ActiveValue::set(self.epoch),
1068                initial_project_id: ActiveValue::set(initial_project_id),
1069                ..Default::default()
1070            }
1071            .insert(&*tx)
1072            .await?;
1073
1074            let room = self.get_room(room_id, &tx).await?;
1075            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1076                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1077            Ok((room_id, (room, incoming_call)))
1078        })
1079        .await
1080    }
1081
1082    pub async fn call_failed(
1083        &self,
1084        room_id: RoomId,
1085        called_user_id: UserId,
1086    ) -> Result<RoomGuard<proto::Room>> {
1087        self.room_transaction(|tx| async move {
1088            room_participant::Entity::delete_many()
1089                .filter(
1090                    room_participant::Column::RoomId
1091                        .eq(room_id)
1092                        .and(room_participant::Column::UserId.eq(called_user_id)),
1093                )
1094                .exec(&*tx)
1095                .await?;
1096            let room = self.get_room(room_id, &tx).await?;
1097            Ok((room_id, room))
1098        })
1099        .await
1100    }
1101
1102    pub async fn decline_call(
1103        &self,
1104        expected_room_id: Option<RoomId>,
1105        user_id: UserId,
1106    ) -> Result<RoomGuard<proto::Room>> {
1107        self.room_transaction(|tx| async move {
1108            let participant = room_participant::Entity::find()
1109                .filter(
1110                    room_participant::Column::UserId
1111                        .eq(user_id)
1112                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1113                )
1114                .one(&*tx)
1115                .await?
1116                .ok_or_else(|| anyhow!("could not decline call"))?;
1117            let room_id = participant.room_id;
1118
1119            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1120                return Err(anyhow!("declining call on unexpected room"))?;
1121            }
1122
1123            room_participant::Entity::delete(participant.into_active_model())
1124                .exec(&*tx)
1125                .await?;
1126
1127            let room = self.get_room(room_id, &tx).await?;
1128            Ok((room_id, room))
1129        })
1130        .await
1131    }
1132
1133    pub async fn cancel_call(
1134        &self,
1135        expected_room_id: Option<RoomId>,
1136        calling_connection_id: ConnectionId,
1137        called_user_id: UserId,
1138    ) -> Result<RoomGuard<proto::Room>> {
1139        self.room_transaction(|tx| async move {
1140            let participant = room_participant::Entity::find()
1141                .filter(
1142                    room_participant::Column::UserId
1143                        .eq(called_user_id)
1144                        .and(
1145                            room_participant::Column::CallingConnectionId
1146                                .eq(calling_connection_id.0 as i32),
1147                        )
1148                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1149                )
1150                .one(&*tx)
1151                .await?
1152                .ok_or_else(|| anyhow!("could not cancel call"))?;
1153            let room_id = participant.room_id;
1154            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1155                return Err(anyhow!("canceling call on unexpected room"))?;
1156            }
1157
1158            room_participant::Entity::delete(participant.into_active_model())
1159                .exec(&*tx)
1160                .await?;
1161
1162            let room = self.get_room(room_id, &tx).await?;
1163            Ok((room_id, room))
1164        })
1165        .await
1166    }
1167
1168    pub async fn join_room(
1169        &self,
1170        room_id: RoomId,
1171        user_id: UserId,
1172        connection_id: ConnectionId,
1173    ) -> Result<RoomGuard<proto::Room>> {
1174        self.room_transaction(|tx| async move {
1175            let result = room_participant::Entity::update_many()
1176                .filter(
1177                    room_participant::Column::RoomId
1178                        .eq(room_id)
1179                        .and(room_participant::Column::UserId.eq(user_id))
1180                        .and(
1181                            room_participant::Column::AnsweringConnectionId
1182                                .is_null()
1183                                .or(room_participant::Column::ConnectionLost.eq(true)),
1184                        ),
1185                )
1186                .set(room_participant::ActiveModel {
1187                    answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
1188                    answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
1189                    connection_lost: ActiveValue::set(false),
1190                    ..Default::default()
1191                })
1192                .exec(&*tx)
1193                .await?;
1194            if result.rows_affected == 0 {
1195                Err(anyhow!("room does not exist or was already joined"))?
1196            } else {
1197                let room = self.get_room(room_id, &tx).await?;
1198                Ok((room_id, room))
1199            }
1200        })
1201        .await
1202    }
1203
1204    pub async fn leave_room(&self, connection_id: ConnectionId) -> Result<RoomGuard<LeftRoom>> {
1205        self.room_transaction(|tx| async move {
1206            let leaving_participant = room_participant::Entity::find()
1207                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
1208                .one(&*tx)
1209                .await?;
1210
1211            if let Some(leaving_participant) = leaving_participant {
1212                // Leave room.
1213                let room_id = leaving_participant.room_id;
1214                room_participant::Entity::delete_by_id(leaving_participant.id)
1215                    .exec(&*tx)
1216                    .await?;
1217
1218                // Cancel pending calls initiated by the leaving user.
1219                let called_participants = room_participant::Entity::find()
1220                    .filter(
1221                        room_participant::Column::CallingConnectionId
1222                            .eq(connection_id.0)
1223                            .and(room_participant::Column::AnsweringConnectionId.is_null()),
1224                    )
1225                    .all(&*tx)
1226                    .await?;
1227                room_participant::Entity::delete_many()
1228                    .filter(
1229                        room_participant::Column::Id
1230                            .is_in(called_participants.iter().map(|participant| participant.id)),
1231                    )
1232                    .exec(&*tx)
1233                    .await?;
1234                let canceled_calls_to_user_ids = called_participants
1235                    .into_iter()
1236                    .map(|participant| participant.user_id)
1237                    .collect();
1238
1239                // Detect left projects.
1240                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1241                enum QueryProjectIds {
1242                    ProjectId,
1243                }
1244                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1245                    .select_only()
1246                    .column_as(
1247                        project_collaborator::Column::ProjectId,
1248                        QueryProjectIds::ProjectId,
1249                    )
1250                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
1251                    .into_values::<_, QueryProjectIds>()
1252                    .all(&*tx)
1253                    .await?;
1254                let mut left_projects = HashMap::default();
1255                let mut collaborators = project_collaborator::Entity::find()
1256                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1257                    .stream(&*tx)
1258                    .await?;
1259                while let Some(collaborator) = collaborators.next().await {
1260                    let collaborator = collaborator?;
1261                    let left_project =
1262                        left_projects
1263                            .entry(collaborator.project_id)
1264                            .or_insert(LeftProject {
1265                                id: collaborator.project_id,
1266                                host_user_id: Default::default(),
1267                                connection_ids: Default::default(),
1268                                host_connection_id: Default::default(),
1269                            });
1270
1271                    let collaborator_connection_id =
1272                        ConnectionId(collaborator.connection_id as u32);
1273                    if collaborator_connection_id != connection_id {
1274                        left_project.connection_ids.push(collaborator_connection_id);
1275                    }
1276
1277                    if collaborator.is_host {
1278                        left_project.host_user_id = collaborator.user_id;
1279                        left_project.host_connection_id =
1280                            ConnectionId(collaborator.connection_id as u32);
1281                    }
1282                }
1283                drop(collaborators);
1284
1285                // Leave projects.
1286                project_collaborator::Entity::delete_many()
1287                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
1288                    .exec(&*tx)
1289                    .await?;
1290
1291                // Unshare projects.
1292                project::Entity::delete_many()
1293                    .filter(
1294                        project::Column::RoomId
1295                            .eq(room_id)
1296                            .and(project::Column::HostConnectionId.eq(connection_id.0)),
1297                    )
1298                    .exec(&*tx)
1299                    .await?;
1300
1301                let room = self.get_room(room_id, &tx).await?;
1302                if room.participants.is_empty() {
1303                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1304                }
1305
1306                let left_room = LeftRoom {
1307                    room,
1308                    left_projects,
1309                    canceled_calls_to_user_ids,
1310                };
1311
1312                if left_room.room.participants.is_empty() {
1313                    self.rooms.remove(&room_id);
1314                }
1315
1316                Ok((room_id, left_room))
1317            } else {
1318                Err(anyhow!("could not leave room"))?
1319            }
1320        })
1321        .await
1322    }
1323
1324    pub async fn update_room_participant_location(
1325        &self,
1326        room_id: RoomId,
1327        connection_id: ConnectionId,
1328        location: proto::ParticipantLocation,
1329    ) -> Result<RoomGuard<proto::Room>> {
1330        self.room_transaction(|tx| async {
1331            let tx = tx;
1332            let location_kind;
1333            let location_project_id;
1334            match location
1335                .variant
1336                .as_ref()
1337                .ok_or_else(|| anyhow!("invalid location"))?
1338            {
1339                proto::participant_location::Variant::SharedProject(project) => {
1340                    location_kind = 0;
1341                    location_project_id = Some(ProjectId::from_proto(project.id));
1342                }
1343                proto::participant_location::Variant::UnsharedProject(_) => {
1344                    location_kind = 1;
1345                    location_project_id = None;
1346                }
1347                proto::participant_location::Variant::External(_) => {
1348                    location_kind = 2;
1349                    location_project_id = None;
1350                }
1351            }
1352
1353            let result = room_participant::Entity::update_many()
1354                .filter(
1355                    room_participant::Column::RoomId
1356                        .eq(room_id)
1357                        .and(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)),
1358                )
1359                .set(room_participant::ActiveModel {
1360                    location_kind: ActiveValue::set(Some(location_kind)),
1361                    location_project_id: ActiveValue::set(location_project_id),
1362                    ..Default::default()
1363                })
1364                .exec(&*tx)
1365                .await?;
1366
1367            if result.rows_affected == 1 {
1368                let room = self.get_room(room_id, &tx).await?;
1369                Ok((room_id, room))
1370            } else {
1371                Err(anyhow!("could not update room participant location"))?
1372            }
1373        })
1374        .await
1375    }
1376
1377    pub async fn connection_lost(
1378        &self,
1379        connection_id: ConnectionId,
1380    ) -> Result<RoomGuard<Vec<LeftProject>>> {
1381        self.room_transaction(|tx| async move {
1382            let participant = room_participant::Entity::find()
1383                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1384                .one(&*tx)
1385                .await?
1386                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1387            let room_id = participant.room_id;
1388
1389            room_participant::Entity::update(room_participant::ActiveModel {
1390                connection_lost: ActiveValue::set(true),
1391                ..participant.into_active_model()
1392            })
1393            .exec(&*tx)
1394            .await?;
1395
1396            let collaborator_on_projects = project_collaborator::Entity::find()
1397                .find_also_related(project::Entity)
1398                .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1399                .all(&*tx)
1400                .await?;
1401            project_collaborator::Entity::delete_many()
1402                .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
1403                .exec(&*tx)
1404                .await?;
1405
1406            let mut left_projects = Vec::new();
1407            for (_, project) in collaborator_on_projects {
1408                if let Some(project) = project {
1409                    let collaborators = project
1410                        .find_related(project_collaborator::Entity)
1411                        .all(&*tx)
1412                        .await?;
1413                    let connection_ids = collaborators
1414                        .into_iter()
1415                        .map(|collaborator| ConnectionId(collaborator.connection_id as u32))
1416                        .collect();
1417
1418                    left_projects.push(LeftProject {
1419                        id: project.id,
1420                        host_user_id: project.host_user_id,
1421                        host_connection_id: ConnectionId(project.host_connection_id as u32),
1422                        connection_ids,
1423                    });
1424                }
1425            }
1426
1427            Ok((room_id, left_projects))
1428        })
1429        .await
1430    }
1431
1432    fn build_incoming_call(
1433        room: &proto::Room,
1434        called_user_id: UserId,
1435    ) -> Option<proto::IncomingCall> {
1436        let pending_participant = room
1437            .pending_participants
1438            .iter()
1439            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1440
1441        Some(proto::IncomingCall {
1442            room_id: room.id,
1443            calling_user_id: pending_participant.calling_user_id,
1444            participant_user_ids: room
1445                .participants
1446                .iter()
1447                .map(|participant| participant.user_id)
1448                .collect(),
1449            initial_project: room.participants.iter().find_map(|participant| {
1450                let initial_project_id = pending_participant.initial_project_id?;
1451                participant
1452                    .projects
1453                    .iter()
1454                    .find(|project| project.id == initial_project_id)
1455                    .cloned()
1456            }),
1457        })
1458    }
1459
1460    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1461        let db_room = room::Entity::find_by_id(room_id)
1462            .one(tx)
1463            .await?
1464            .ok_or_else(|| anyhow!("could not find room"))?;
1465
1466        let mut db_participants = db_room
1467            .find_related(room_participant::Entity)
1468            .stream(tx)
1469            .await?;
1470        let mut participants = HashMap::default();
1471        let mut pending_participants = Vec::new();
1472        while let Some(db_participant) = db_participants.next().await {
1473            let db_participant = db_participant?;
1474            if let Some(answering_connection_id) = db_participant.answering_connection_id {
1475                let location = match (
1476                    db_participant.location_kind,
1477                    db_participant.location_project_id,
1478                ) {
1479                    (Some(0), Some(project_id)) => {
1480                        Some(proto::participant_location::Variant::SharedProject(
1481                            proto::participant_location::SharedProject {
1482                                id: project_id.to_proto(),
1483                            },
1484                        ))
1485                    }
1486                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1487                        Default::default(),
1488                    )),
1489                    _ => Some(proto::participant_location::Variant::External(
1490                        Default::default(),
1491                    )),
1492                };
1493                participants.insert(
1494                    answering_connection_id,
1495                    proto::Participant {
1496                        user_id: db_participant.user_id.to_proto(),
1497                        peer_id: answering_connection_id as u32,
1498                        projects: Default::default(),
1499                        location: Some(proto::ParticipantLocation { variant: location }),
1500                    },
1501                );
1502            } else {
1503                pending_participants.push(proto::PendingParticipant {
1504                    user_id: db_participant.user_id.to_proto(),
1505                    calling_user_id: db_participant.calling_user_id.to_proto(),
1506                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1507                });
1508            }
1509        }
1510        drop(db_participants);
1511
1512        let mut db_projects = db_room
1513            .find_related(project::Entity)
1514            .find_with_related(worktree::Entity)
1515            .stream(tx)
1516            .await?;
1517
1518        while let Some(row) = db_projects.next().await {
1519            let (db_project, db_worktree) = row?;
1520            if let Some(participant) = participants.get_mut(&db_project.host_connection_id) {
1521                let project = if let Some(project) = participant
1522                    .projects
1523                    .iter_mut()
1524                    .find(|project| project.id == db_project.id.to_proto())
1525                {
1526                    project
1527                } else {
1528                    participant.projects.push(proto::ParticipantProject {
1529                        id: db_project.id.to_proto(),
1530                        worktree_root_names: Default::default(),
1531                    });
1532                    participant.projects.last_mut().unwrap()
1533                };
1534
1535                if let Some(db_worktree) = db_worktree {
1536                    project.worktree_root_names.push(db_worktree.root_name);
1537                }
1538            }
1539        }
1540
1541        Ok(proto::Room {
1542            id: db_room.id.to_proto(),
1543            live_kit_room: db_room.live_kit_room,
1544            participants: participants.into_values().collect(),
1545            pending_participants,
1546        })
1547    }
1548
1549    // projects
1550
1551    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1552        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1553        enum QueryAs {
1554            Count,
1555        }
1556
1557        self.transaction(|tx| async move {
1558            Ok(project::Entity::find()
1559                .select_only()
1560                .column_as(project::Column::Id.count(), QueryAs::Count)
1561                .inner_join(user::Entity)
1562                .filter(user::Column::Admin.eq(false))
1563                .into_values::<_, QueryAs>()
1564                .one(&*tx)
1565                .await?
1566                .unwrap_or(0i64) as usize)
1567        })
1568        .await
1569    }
1570
1571    pub async fn share_project(
1572        &self,
1573        room_id: RoomId,
1574        connection_id: ConnectionId,
1575        worktrees: &[proto::WorktreeMetadata],
1576    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1577        self.room_transaction(|tx| async move {
1578            let participant = room_participant::Entity::find()
1579                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
1580                .one(&*tx)
1581                .await?
1582                .ok_or_else(|| anyhow!("could not find participant"))?;
1583            if participant.room_id != room_id {
1584                return Err(anyhow!("shared project on unexpected room"))?;
1585            }
1586
1587            let project = project::ActiveModel {
1588                room_id: ActiveValue::set(participant.room_id),
1589                host_user_id: ActiveValue::set(participant.user_id),
1590                host_connection_id: ActiveValue::set(connection_id.0 as i32),
1591                host_connection_epoch: ActiveValue::set(self.epoch),
1592                ..Default::default()
1593            }
1594            .insert(&*tx)
1595            .await?;
1596
1597            if !worktrees.is_empty() {
1598                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1599                    worktree::ActiveModel {
1600                        id: ActiveValue::set(worktree.id as i64),
1601                        project_id: ActiveValue::set(project.id),
1602                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1603                        root_name: ActiveValue::set(worktree.root_name.clone()),
1604                        visible: ActiveValue::set(worktree.visible),
1605                        scan_id: ActiveValue::set(0),
1606                        is_complete: ActiveValue::set(false),
1607                    }
1608                }))
1609                .exec(&*tx)
1610                .await?;
1611            }
1612
1613            project_collaborator::ActiveModel {
1614                project_id: ActiveValue::set(project.id),
1615                connection_id: ActiveValue::set(connection_id.0 as i32),
1616                connection_epoch: ActiveValue::set(self.epoch),
1617                user_id: ActiveValue::set(participant.user_id),
1618                replica_id: ActiveValue::set(ReplicaId(0)),
1619                is_host: ActiveValue::set(true),
1620                ..Default::default()
1621            }
1622            .insert(&*tx)
1623            .await?;
1624
1625            let room = self.get_room(room_id, &tx).await?;
1626            Ok((room_id, (project.id, room)))
1627        })
1628        .await
1629    }
1630
1631    pub async fn unshare_project(
1632        &self,
1633        project_id: ProjectId,
1634        connection_id: ConnectionId,
1635    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1636        self.room_transaction(|tx| async move {
1637            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1638
1639            let project = project::Entity::find_by_id(project_id)
1640                .one(&*tx)
1641                .await?
1642                .ok_or_else(|| anyhow!("project not found"))?;
1643            if project.host_connection_id == connection_id.0 as i32 {
1644                let room_id = project.room_id;
1645                project::Entity::delete(project.into_active_model())
1646                    .exec(&*tx)
1647                    .await?;
1648                let room = self.get_room(room_id, &tx).await?;
1649                Ok((room_id, (room, guest_connection_ids)))
1650            } else {
1651                Err(anyhow!("cannot unshare a project hosted by another user"))?
1652            }
1653        })
1654        .await
1655    }
1656
1657    pub async fn update_project(
1658        &self,
1659        project_id: ProjectId,
1660        connection_id: ConnectionId,
1661        worktrees: &[proto::WorktreeMetadata],
1662    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1663        self.room_transaction(|tx| async move {
1664            let project = project::Entity::find_by_id(project_id)
1665                .filter(project::Column::HostConnectionId.eq(connection_id.0))
1666                .one(&*tx)
1667                .await?
1668                .ok_or_else(|| anyhow!("no such project"))?;
1669
1670            if !worktrees.is_empty() {
1671                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1672                    worktree::ActiveModel {
1673                        id: ActiveValue::set(worktree.id as i64),
1674                        project_id: ActiveValue::set(project.id),
1675                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1676                        root_name: ActiveValue::set(worktree.root_name.clone()),
1677                        visible: ActiveValue::set(worktree.visible),
1678                        scan_id: ActiveValue::set(0),
1679                        is_complete: ActiveValue::set(false),
1680                    }
1681                }))
1682                .on_conflict(
1683                    OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
1684                        .update_column(worktree::Column::RootName)
1685                        .to_owned(),
1686                )
1687                .exec(&*tx)
1688                .await?;
1689            }
1690
1691            worktree::Entity::delete_many()
1692                .filter(
1693                    worktree::Column::ProjectId.eq(project.id).and(
1694                        worktree::Column::Id
1695                            .is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
1696                    ),
1697                )
1698                .exec(&*tx)
1699                .await?;
1700
1701            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
1702            let room = self.get_room(project.room_id, &tx).await?;
1703            Ok((project.room_id, (room, guest_connection_ids)))
1704        })
1705        .await
1706    }
1707
1708    pub async fn update_worktree(
1709        &self,
1710        update: &proto::UpdateWorktree,
1711        connection_id: ConnectionId,
1712    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1713        self.room_transaction(|tx| async move {
1714            let project_id = ProjectId::from_proto(update.project_id);
1715            let worktree_id = update.worktree_id as i64;
1716
1717            // Ensure the update comes from the host.
1718            let project = project::Entity::find_by_id(project_id)
1719                .filter(project::Column::HostConnectionId.eq(connection_id.0))
1720                .one(&*tx)
1721                .await?
1722                .ok_or_else(|| anyhow!("no such project"))?;
1723            let room_id = project.room_id;
1724
1725            // Update metadata.
1726            worktree::Entity::update(worktree::ActiveModel {
1727                id: ActiveValue::set(worktree_id),
1728                project_id: ActiveValue::set(project_id),
1729                root_name: ActiveValue::set(update.root_name.clone()),
1730                scan_id: ActiveValue::set(update.scan_id as i64),
1731                is_complete: ActiveValue::set(update.is_last_update),
1732                abs_path: ActiveValue::set(update.abs_path.clone()),
1733                ..Default::default()
1734            })
1735            .exec(&*tx)
1736            .await?;
1737
1738            if !update.updated_entries.is_empty() {
1739                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
1740                    let mtime = entry.mtime.clone().unwrap_or_default();
1741                    worktree_entry::ActiveModel {
1742                        project_id: ActiveValue::set(project_id),
1743                        worktree_id: ActiveValue::set(worktree_id),
1744                        id: ActiveValue::set(entry.id as i64),
1745                        is_dir: ActiveValue::set(entry.is_dir),
1746                        path: ActiveValue::set(entry.path.clone()),
1747                        inode: ActiveValue::set(entry.inode as i64),
1748                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
1749                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
1750                        is_symlink: ActiveValue::set(entry.is_symlink),
1751                        is_ignored: ActiveValue::set(entry.is_ignored),
1752                    }
1753                }))
1754                .on_conflict(
1755                    OnConflict::columns([
1756                        worktree_entry::Column::ProjectId,
1757                        worktree_entry::Column::WorktreeId,
1758                        worktree_entry::Column::Id,
1759                    ])
1760                    .update_columns([
1761                        worktree_entry::Column::IsDir,
1762                        worktree_entry::Column::Path,
1763                        worktree_entry::Column::Inode,
1764                        worktree_entry::Column::MtimeSeconds,
1765                        worktree_entry::Column::MtimeNanos,
1766                        worktree_entry::Column::IsSymlink,
1767                        worktree_entry::Column::IsIgnored,
1768                    ])
1769                    .to_owned(),
1770                )
1771                .exec(&*tx)
1772                .await?;
1773            }
1774
1775            if !update.removed_entries.is_empty() {
1776                worktree_entry::Entity::delete_many()
1777                    .filter(
1778                        worktree_entry::Column::ProjectId
1779                            .eq(project_id)
1780                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
1781                            .and(
1782                                worktree_entry::Column::Id
1783                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
1784                            ),
1785                    )
1786                    .exec(&*tx)
1787                    .await?;
1788            }
1789
1790            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1791            Ok((room_id, connection_ids))
1792        })
1793        .await
1794    }
1795
1796    pub async fn update_diagnostic_summary(
1797        &self,
1798        update: &proto::UpdateDiagnosticSummary,
1799        connection_id: ConnectionId,
1800    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1801        self.room_transaction(|tx| async move {
1802            let project_id = ProjectId::from_proto(update.project_id);
1803            let worktree_id = update.worktree_id as i64;
1804            let summary = update
1805                .summary
1806                .as_ref()
1807                .ok_or_else(|| anyhow!("invalid summary"))?;
1808
1809            // Ensure the update comes from the host.
1810            let project = project::Entity::find_by_id(project_id)
1811                .one(&*tx)
1812                .await?
1813                .ok_or_else(|| anyhow!("no such project"))?;
1814            if project.host_connection_id != connection_id.0 as i32 {
1815                return Err(anyhow!("can't update a project hosted by someone else"))?;
1816            }
1817
1818            // Update summary.
1819            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
1820                project_id: ActiveValue::set(project_id),
1821                worktree_id: ActiveValue::set(worktree_id),
1822                path: ActiveValue::set(summary.path.clone()),
1823                language_server_id: ActiveValue::set(summary.language_server_id as i64),
1824                error_count: ActiveValue::set(summary.error_count as i32),
1825                warning_count: ActiveValue::set(summary.warning_count as i32),
1826                ..Default::default()
1827            })
1828            .on_conflict(
1829                OnConflict::columns([
1830                    worktree_diagnostic_summary::Column::ProjectId,
1831                    worktree_diagnostic_summary::Column::WorktreeId,
1832                    worktree_diagnostic_summary::Column::Path,
1833                ])
1834                .update_columns([
1835                    worktree_diagnostic_summary::Column::LanguageServerId,
1836                    worktree_diagnostic_summary::Column::ErrorCount,
1837                    worktree_diagnostic_summary::Column::WarningCount,
1838                ])
1839                .to_owned(),
1840            )
1841            .exec(&*tx)
1842            .await?;
1843
1844            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1845            Ok((project.room_id, connection_ids))
1846        })
1847        .await
1848    }
1849
1850    pub async fn start_language_server(
1851        &self,
1852        update: &proto::StartLanguageServer,
1853        connection_id: ConnectionId,
1854    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1855        self.room_transaction(|tx| async move {
1856            let project_id = ProjectId::from_proto(update.project_id);
1857            let server = update
1858                .server
1859                .as_ref()
1860                .ok_or_else(|| anyhow!("invalid language server"))?;
1861
1862            // Ensure the update comes from the host.
1863            let project = project::Entity::find_by_id(project_id)
1864                .one(&*tx)
1865                .await?
1866                .ok_or_else(|| anyhow!("no such project"))?;
1867            if project.host_connection_id != connection_id.0 as i32 {
1868                return Err(anyhow!("can't update a project hosted by someone else"))?;
1869            }
1870
1871            // Add the newly-started language server.
1872            language_server::Entity::insert(language_server::ActiveModel {
1873                project_id: ActiveValue::set(project_id),
1874                id: ActiveValue::set(server.id as i64),
1875                name: ActiveValue::set(server.name.clone()),
1876                ..Default::default()
1877            })
1878            .on_conflict(
1879                OnConflict::columns([
1880                    language_server::Column::ProjectId,
1881                    language_server::Column::Id,
1882                ])
1883                .update_column(language_server::Column::Name)
1884                .to_owned(),
1885            )
1886            .exec(&*tx)
1887            .await?;
1888
1889            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1890            Ok((project.room_id, connection_ids))
1891        })
1892        .await
1893    }
1894
1895    pub async fn join_project(
1896        &self,
1897        project_id: ProjectId,
1898        connection_id: ConnectionId,
1899    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
1900        self.room_transaction(|tx| async move {
1901            let participant = room_participant::Entity::find()
1902                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
1903                .one(&*tx)
1904                .await?
1905                .ok_or_else(|| anyhow!("must join a room first"))?;
1906
1907            let project = project::Entity::find_by_id(project_id)
1908                .one(&*tx)
1909                .await?
1910                .ok_or_else(|| anyhow!("no such project"))?;
1911            if project.room_id != participant.room_id {
1912                return Err(anyhow!("no such project"))?;
1913            }
1914
1915            let mut collaborators = project
1916                .find_related(project_collaborator::Entity)
1917                .all(&*tx)
1918                .await?;
1919            let replica_ids = collaborators
1920                .iter()
1921                .map(|c| c.replica_id)
1922                .collect::<HashSet<_>>();
1923            let mut replica_id = ReplicaId(1);
1924            while replica_ids.contains(&replica_id) {
1925                replica_id.0 += 1;
1926            }
1927            let new_collaborator = project_collaborator::ActiveModel {
1928                project_id: ActiveValue::set(project_id),
1929                connection_id: ActiveValue::set(connection_id.0 as i32),
1930                connection_epoch: ActiveValue::set(self.epoch),
1931                user_id: ActiveValue::set(participant.user_id),
1932                replica_id: ActiveValue::set(replica_id),
1933                is_host: ActiveValue::set(false),
1934                ..Default::default()
1935            }
1936            .insert(&*tx)
1937            .await?;
1938            collaborators.push(new_collaborator);
1939
1940            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1941            let mut worktrees = db_worktrees
1942                .into_iter()
1943                .map(|db_worktree| {
1944                    (
1945                        db_worktree.id as u64,
1946                        Worktree {
1947                            id: db_worktree.id as u64,
1948                            abs_path: db_worktree.abs_path,
1949                            root_name: db_worktree.root_name,
1950                            visible: db_worktree.visible,
1951                            entries: Default::default(),
1952                            diagnostic_summaries: Default::default(),
1953                            scan_id: db_worktree.scan_id as u64,
1954                            is_complete: db_worktree.is_complete,
1955                        },
1956                    )
1957                })
1958                .collect::<BTreeMap<_, _>>();
1959
1960            // Populate worktree entries.
1961            {
1962                let mut db_entries = worktree_entry::Entity::find()
1963                    .filter(worktree_entry::Column::ProjectId.eq(project_id))
1964                    .stream(&*tx)
1965                    .await?;
1966                while let Some(db_entry) = db_entries.next().await {
1967                    let db_entry = db_entry?;
1968                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
1969                        worktree.entries.push(proto::Entry {
1970                            id: db_entry.id as u64,
1971                            is_dir: db_entry.is_dir,
1972                            path: db_entry.path,
1973                            inode: db_entry.inode as u64,
1974                            mtime: Some(proto::Timestamp {
1975                                seconds: db_entry.mtime_seconds as u64,
1976                                nanos: db_entry.mtime_nanos as u32,
1977                            }),
1978                            is_symlink: db_entry.is_symlink,
1979                            is_ignored: db_entry.is_ignored,
1980                        });
1981                    }
1982                }
1983            }
1984
1985            // Populate worktree diagnostic summaries.
1986            {
1987                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1988                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
1989                    .stream(&*tx)
1990                    .await?;
1991                while let Some(db_summary) = db_summaries.next().await {
1992                    let db_summary = db_summary?;
1993                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1994                        worktree
1995                            .diagnostic_summaries
1996                            .push(proto::DiagnosticSummary {
1997                                path: db_summary.path,
1998                                language_server_id: db_summary.language_server_id as u64,
1999                                error_count: db_summary.error_count as u32,
2000                                warning_count: db_summary.warning_count as u32,
2001                            });
2002                    }
2003                }
2004            }
2005
2006            // Populate language servers.
2007            let language_servers = project
2008                .find_related(language_server::Entity)
2009                .all(&*tx)
2010                .await?;
2011
2012            let room_id = project.room_id;
2013            let project = Project {
2014                collaborators,
2015                worktrees,
2016                language_servers: language_servers
2017                    .into_iter()
2018                    .map(|language_server| proto::LanguageServer {
2019                        id: language_server.id as u64,
2020                        name: language_server.name,
2021                    })
2022                    .collect(),
2023            };
2024            Ok((room_id, (project, replica_id as ReplicaId)))
2025        })
2026        .await
2027    }
2028
2029    pub async fn leave_project(
2030        &self,
2031        project_id: ProjectId,
2032        connection_id: ConnectionId,
2033    ) -> Result<RoomGuard<LeftProject>> {
2034        self.room_transaction(|tx| async move {
2035            let result = project_collaborator::Entity::delete_many()
2036                .filter(
2037                    project_collaborator::Column::ProjectId
2038                        .eq(project_id)
2039                        .and(project_collaborator::Column::ConnectionId.eq(connection_id.0)),
2040                )
2041                .exec(&*tx)
2042                .await?;
2043            if result.rows_affected == 0 {
2044                Err(anyhow!("not a collaborator on this project"))?;
2045            }
2046
2047            let project = project::Entity::find_by_id(project_id)
2048                .one(&*tx)
2049                .await?
2050                .ok_or_else(|| anyhow!("no such project"))?;
2051            let collaborators = project
2052                .find_related(project_collaborator::Entity)
2053                .all(&*tx)
2054                .await?;
2055            let connection_ids = collaborators
2056                .into_iter()
2057                .map(|collaborator| ConnectionId(collaborator.connection_id as u32))
2058                .collect();
2059
2060            let left_project = LeftProject {
2061                id: project_id,
2062                host_user_id: project.host_user_id,
2063                host_connection_id: ConnectionId(project.host_connection_id as u32),
2064                connection_ids,
2065            };
2066            Ok((project.room_id, left_project))
2067        })
2068        .await
2069    }
2070
2071    pub async fn project_collaborators(
2072        &self,
2073        project_id: ProjectId,
2074        connection_id: ConnectionId,
2075    ) -> Result<RoomGuard<Vec<project_collaborator::Model>>> {
2076        self.room_transaction(|tx| async move {
2077            let project = project::Entity::find_by_id(project_id)
2078                .one(&*tx)
2079                .await?
2080                .ok_or_else(|| anyhow!("no such project"))?;
2081            let collaborators = project_collaborator::Entity::find()
2082                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2083                .all(&*tx)
2084                .await?;
2085
2086            if collaborators
2087                .iter()
2088                .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
2089            {
2090                Ok((project.room_id, collaborators))
2091            } else {
2092                Err(anyhow!("no such project"))?
2093            }
2094        })
2095        .await
2096    }
2097
2098    pub async fn project_connection_ids(
2099        &self,
2100        project_id: ProjectId,
2101        connection_id: ConnectionId,
2102    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2103        self.room_transaction(|tx| async move {
2104            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2105            enum QueryAs {
2106                ConnectionId,
2107            }
2108
2109            let project = project::Entity::find_by_id(project_id)
2110                .one(&*tx)
2111                .await?
2112                .ok_or_else(|| anyhow!("no such project"))?;
2113            let mut db_connection_ids = project_collaborator::Entity::find()
2114                .select_only()
2115                .column_as(
2116                    project_collaborator::Column::ConnectionId,
2117                    QueryAs::ConnectionId,
2118                )
2119                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2120                .into_values::<i32, QueryAs>()
2121                .stream(&*tx)
2122                .await?;
2123
2124            let mut connection_ids = HashSet::default();
2125            while let Some(connection_id) = db_connection_ids.next().await {
2126                connection_ids.insert(ConnectionId(connection_id? as u32));
2127            }
2128
2129            if connection_ids.contains(&connection_id) {
2130                Ok((project.room_id, connection_ids))
2131            } else {
2132                Err(anyhow!("no such project"))?
2133            }
2134        })
2135        .await
2136    }
2137
2138    async fn project_guest_connection_ids(
2139        &self,
2140        project_id: ProjectId,
2141        tx: &DatabaseTransaction,
2142    ) -> Result<Vec<ConnectionId>> {
2143        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2144        enum QueryAs {
2145            ConnectionId,
2146        }
2147
2148        let mut db_guest_connection_ids = project_collaborator::Entity::find()
2149            .select_only()
2150            .column_as(
2151                project_collaborator::Column::ConnectionId,
2152                QueryAs::ConnectionId,
2153            )
2154            .filter(
2155                project_collaborator::Column::ProjectId
2156                    .eq(project_id)
2157                    .and(project_collaborator::Column::IsHost.eq(false)),
2158            )
2159            .into_values::<i32, QueryAs>()
2160            .stream(tx)
2161            .await?;
2162
2163        let mut guest_connection_ids = Vec::new();
2164        while let Some(connection_id) = db_guest_connection_ids.next().await {
2165            guest_connection_ids.push(ConnectionId(connection_id? as u32));
2166        }
2167        Ok(guest_connection_ids)
2168    }
2169
2170    // access tokens
2171
2172    pub async fn create_access_token_hash(
2173        &self,
2174        user_id: UserId,
2175        access_token_hash: &str,
2176        max_access_token_count: usize,
2177    ) -> Result<()> {
2178        self.transaction(|tx| async {
2179            let tx = tx;
2180
2181            access_token::ActiveModel {
2182                user_id: ActiveValue::set(user_id),
2183                hash: ActiveValue::set(access_token_hash.into()),
2184                ..Default::default()
2185            }
2186            .insert(&*tx)
2187            .await?;
2188
2189            access_token::Entity::delete_many()
2190                .filter(
2191                    access_token::Column::Id.in_subquery(
2192                        Query::select()
2193                            .column(access_token::Column::Id)
2194                            .from(access_token::Entity)
2195                            .and_where(access_token::Column::UserId.eq(user_id))
2196                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2197                            .limit(10000)
2198                            .offset(max_access_token_count as u64)
2199                            .to_owned(),
2200                    ),
2201                )
2202                .exec(&*tx)
2203                .await?;
2204            Ok(())
2205        })
2206        .await
2207    }
2208
2209    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2210        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2211        enum QueryAs {
2212            Hash,
2213        }
2214
2215        self.transaction(|tx| async move {
2216            Ok(access_token::Entity::find()
2217                .select_only()
2218                .column(access_token::Column::Hash)
2219                .filter(access_token::Column::UserId.eq(user_id))
2220                .order_by_desc(access_token::Column::Id)
2221                .into_values::<_, QueryAs>()
2222                .all(&*tx)
2223                .await?)
2224        })
2225        .await
2226    }
2227
2228    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2229    where
2230        F: Send + Fn(TransactionHandle) -> Fut,
2231        Fut: Send + Future<Output = Result<T>>,
2232    {
2233        let body = async {
2234            loop {
2235                let (tx, result) = self.with_transaction(&f).await?;
2236                match result {
2237                    Ok(result) => {
2238                        match tx.commit().await.map_err(Into::into) {
2239                            Ok(()) => return Ok(result),
2240                            Err(error) => {
2241                                if is_serialization_error(&error) {
2242                                    // Retry (don't break the loop)
2243                                } else {
2244                                    return Err(error);
2245                                }
2246                            }
2247                        }
2248                    }
2249                    Err(error) => {
2250                        tx.rollback().await?;
2251                        if is_serialization_error(&error) {
2252                            // Retry (don't break the loop)
2253                        } else {
2254                            return Err(error);
2255                        }
2256                    }
2257                }
2258            }
2259        };
2260
2261        self.run(body).await
2262    }
2263
2264    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2265    where
2266        F: Send + Fn(TransactionHandle) -> Fut,
2267        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2268    {
2269        let body = async {
2270            loop {
2271                let (tx, result) = self.with_transaction(&f).await?;
2272                match result {
2273                    Ok((room_id, data)) => {
2274                        let lock = self.rooms.entry(room_id).or_default().clone();
2275                        let _guard = lock.lock_owned().await;
2276                        match tx.commit().await.map_err(Into::into) {
2277                            Ok(()) => {
2278                                return Ok(RoomGuard {
2279                                    data,
2280                                    _guard,
2281                                    _not_send: PhantomData,
2282                                });
2283                            }
2284                            Err(error) => {
2285                                if is_serialization_error(&error) {
2286                                    // Retry (don't break the loop)
2287                                } else {
2288                                    return Err(error);
2289                                }
2290                            }
2291                        }
2292                    }
2293                    Err(error) => {
2294                        tx.rollback().await?;
2295                        if is_serialization_error(&error) {
2296                            // Retry (don't break the loop)
2297                        } else {
2298                            return Err(error);
2299                        }
2300                    }
2301                }
2302            }
2303        };
2304
2305        self.run(body).await
2306    }
2307
2308    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2309    where
2310        F: Send + Fn(TransactionHandle) -> Fut,
2311        Fut: Send + Future<Output = Result<T>>,
2312    {
2313        let tx = self
2314            .pool
2315            .begin_with_config(Some(IsolationLevel::Serializable), None)
2316            .await?;
2317
2318        let mut tx = Arc::new(Some(tx));
2319        let result = f(TransactionHandle(tx.clone())).await;
2320        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2321            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2322        };
2323
2324        Ok((tx, result))
2325    }
2326
2327    async fn run<F, T>(&self, future: F) -> T
2328    where
2329        F: Future<Output = T>,
2330    {
2331        #[cfg(test)]
2332        {
2333            if let Some(background) = self.background.as_ref() {
2334                background.simulate_random_delay().await;
2335            }
2336
2337            self.runtime.as_ref().unwrap().block_on(future)
2338        }
2339
2340        #[cfg(not(test))]
2341        {
2342            future.await
2343        }
2344    }
2345}
2346
2347fn is_serialization_error(error: &Error) -> bool {
2348    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2349    match error {
2350        Error::Database(
2351            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2352            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2353        ) if error
2354            .as_database_error()
2355            .and_then(|error| error.code())
2356            .as_deref()
2357            == Some(SERIALIZATION_FAILURE_CODE) =>
2358        {
2359            true
2360        }
2361        _ => false,
2362    }
2363}
2364
2365struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2366
2367impl Deref for TransactionHandle {
2368    type Target = DatabaseTransaction;
2369
2370    fn deref(&self) -> &Self::Target {
2371        self.0.as_ref().as_ref().unwrap()
2372    }
2373}
2374
2375pub struct RoomGuard<T> {
2376    data: T,
2377    _guard: OwnedMutexGuard<()>,
2378    _not_send: PhantomData<Rc<()>>,
2379}
2380
2381impl<T> Deref for RoomGuard<T> {
2382    type Target = T;
2383
2384    fn deref(&self) -> &T {
2385        &self.data
2386    }
2387}
2388
2389impl<T> DerefMut for RoomGuard<T> {
2390    fn deref_mut(&mut self) -> &mut T {
2391        &mut self.data
2392    }
2393}
2394
2395#[derive(Debug, Serialize, Deserialize)]
2396pub struct NewUserParams {
2397    pub github_login: String,
2398    pub github_user_id: i32,
2399    pub invite_count: i32,
2400}
2401
2402#[derive(Debug)]
2403pub struct NewUserResult {
2404    pub user_id: UserId,
2405    pub metrics_id: String,
2406    pub inviting_user_id: Option<UserId>,
2407    pub signup_device_id: Option<String>,
2408}
2409
2410fn random_invite_code() -> String {
2411    nanoid::nanoid!(16)
2412}
2413
2414fn random_email_confirmation_code() -> String {
2415    nanoid::nanoid!(64)
2416}
2417
2418macro_rules! id_type {
2419    ($name:ident) => {
2420        #[derive(
2421            Clone,
2422            Copy,
2423            Debug,
2424            Default,
2425            PartialEq,
2426            Eq,
2427            PartialOrd,
2428            Ord,
2429            Hash,
2430            Serialize,
2431            Deserialize,
2432        )]
2433        #[serde(transparent)]
2434        pub struct $name(pub i32);
2435
2436        impl $name {
2437            #[allow(unused)]
2438            pub const MAX: Self = Self(i32::MAX);
2439
2440            #[allow(unused)]
2441            pub fn from_proto(value: u64) -> Self {
2442                Self(value as i32)
2443            }
2444
2445            #[allow(unused)]
2446            pub fn to_proto(self) -> u64 {
2447                self.0 as u64
2448            }
2449        }
2450
2451        impl std::fmt::Display for $name {
2452            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2453                self.0.fmt(f)
2454            }
2455        }
2456
2457        impl From<$name> for sea_query::Value {
2458            fn from(value: $name) -> Self {
2459                sea_query::Value::Int(Some(value.0))
2460            }
2461        }
2462
2463        impl sea_orm::TryGetable for $name {
2464            fn try_get(
2465                res: &sea_orm::QueryResult,
2466                pre: &str,
2467                col: &str,
2468            ) -> Result<Self, sea_orm::TryGetError> {
2469                Ok(Self(i32::try_get(res, pre, col)?))
2470            }
2471        }
2472
2473        impl sea_query::ValueType for $name {
2474            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
2475                match v {
2476                    Value::TinyInt(Some(int)) => {
2477                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2478                    }
2479                    Value::SmallInt(Some(int)) => {
2480                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2481                    }
2482                    Value::Int(Some(int)) => {
2483                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2484                    }
2485                    Value::BigInt(Some(int)) => {
2486                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2487                    }
2488                    Value::TinyUnsigned(Some(int)) => {
2489                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2490                    }
2491                    Value::SmallUnsigned(Some(int)) => {
2492                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2493                    }
2494                    Value::Unsigned(Some(int)) => {
2495                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2496                    }
2497                    Value::BigUnsigned(Some(int)) => {
2498                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2499                    }
2500                    _ => Err(sea_query::ValueTypeErr),
2501                }
2502            }
2503
2504            fn type_name() -> String {
2505                stringify!($name).into()
2506            }
2507
2508            fn array_type() -> sea_query::ArrayType {
2509                sea_query::ArrayType::Int
2510            }
2511
2512            fn column_type() -> sea_query::ColumnType {
2513                sea_query::ColumnType::Integer(None)
2514            }
2515        }
2516
2517        impl sea_orm::TryFromU64 for $name {
2518            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
2519                Ok(Self(n.try_into().map_err(|_| {
2520                    DbErr::ConvertFromU64(concat!(
2521                        "error converting ",
2522                        stringify!($name),
2523                        " to u64"
2524                    ))
2525                })?))
2526            }
2527        }
2528
2529        impl sea_query::Nullable for $name {
2530            fn null() -> Value {
2531                Value::Int(None)
2532            }
2533        }
2534    };
2535}
2536
2537id_type!(AccessTokenId);
2538id_type!(ContactId);
2539id_type!(RoomId);
2540id_type!(RoomParticipantId);
2541id_type!(ProjectId);
2542id_type!(ProjectCollaboratorId);
2543id_type!(ReplicaId);
2544id_type!(SignupId);
2545id_type!(UserId);
2546
2547pub struct LeftRoom {
2548    pub room: proto::Room,
2549    pub left_projects: HashMap<ProjectId, LeftProject>,
2550    pub canceled_calls_to_user_ids: Vec<UserId>,
2551}
2552
2553pub struct Project {
2554    pub collaborators: Vec<project_collaborator::Model>,
2555    pub worktrees: BTreeMap<u64, Worktree>,
2556    pub language_servers: Vec<proto::LanguageServer>,
2557}
2558
2559pub struct LeftProject {
2560    pub id: ProjectId,
2561    pub host_user_id: UserId,
2562    pub host_connection_id: ConnectionId,
2563    pub connection_ids: Vec<ConnectionId>,
2564}
2565
2566pub struct Worktree {
2567    pub id: u64,
2568    pub abs_path: String,
2569    pub root_name: String,
2570    pub visible: bool,
2571    pub entries: Vec<proto::Entry>,
2572    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
2573    pub scan_id: u64,
2574    pub is_complete: bool,
2575}
2576
2577#[cfg(test)]
2578pub use test::*;
2579
2580#[cfg(test)]
2581mod test {
2582    use super::*;
2583    use gpui::executor::Background;
2584    use lazy_static::lazy_static;
2585    use parking_lot::Mutex;
2586    use rand::prelude::*;
2587    use sea_orm::ConnectionTrait;
2588    use sqlx::migrate::MigrateDatabase;
2589    use std::sync::Arc;
2590
2591    pub struct TestDb {
2592        pub db: Option<Arc<Database>>,
2593        pub connection: Option<sqlx::AnyConnection>,
2594    }
2595
2596    impl TestDb {
2597        pub fn sqlite(background: Arc<Background>) -> Self {
2598            let url = format!("sqlite::memory:");
2599            let runtime = tokio::runtime::Builder::new_current_thread()
2600                .enable_io()
2601                .enable_time()
2602                .build()
2603                .unwrap();
2604
2605            let mut db = runtime.block_on(async {
2606                let mut options = ConnectOptions::new(url);
2607                options.max_connections(5);
2608                let db = Database::new(options).await.unwrap();
2609                let sql = include_str!(concat!(
2610                    env!("CARGO_MANIFEST_DIR"),
2611                    "/migrations.sqlite/20221109000000_test_schema.sql"
2612                ));
2613                db.pool
2614                    .execute(sea_orm::Statement::from_string(
2615                        db.pool.get_database_backend(),
2616                        sql.into(),
2617                    ))
2618                    .await
2619                    .unwrap();
2620                db
2621            });
2622
2623            db.background = Some(background);
2624            db.runtime = Some(runtime);
2625
2626            Self {
2627                db: Some(Arc::new(db)),
2628                connection: None,
2629            }
2630        }
2631
2632        pub fn postgres(background: Arc<Background>) -> Self {
2633            lazy_static! {
2634                static ref LOCK: Mutex<()> = Mutex::new(());
2635            }
2636
2637            let _guard = LOCK.lock();
2638            let mut rng = StdRng::from_entropy();
2639            let url = format!(
2640                "postgres://postgres@localhost/zed-test-{}",
2641                rng.gen::<u128>()
2642            );
2643            let runtime = tokio::runtime::Builder::new_current_thread()
2644                .enable_io()
2645                .enable_time()
2646                .build()
2647                .unwrap();
2648
2649            let mut db = runtime.block_on(async {
2650                sqlx::Postgres::create_database(&url)
2651                    .await
2652                    .expect("failed to create test db");
2653                let mut options = ConnectOptions::new(url);
2654                options
2655                    .max_connections(5)
2656                    .idle_timeout(Duration::from_secs(0));
2657                let db = Database::new(options).await.unwrap();
2658                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
2659                db.migrate(Path::new(migrations_path), false).await.unwrap();
2660                db
2661            });
2662
2663            db.background = Some(background);
2664            db.runtime = Some(runtime);
2665
2666            Self {
2667                db: Some(Arc::new(db)),
2668                connection: None,
2669            }
2670        }
2671
2672        pub fn db(&self) -> &Arc<Database> {
2673            self.db.as_ref().unwrap()
2674        }
2675    }
2676
2677    impl Drop for TestDb {
2678        fn drop(&mut self) {
2679            let db = self.db.take().unwrap();
2680            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
2681                db.runtime.as_ref().unwrap().block_on(async {
2682                    use util::ResultExt;
2683                    let query = "
2684                        SELECT pg_terminate_backend(pg_stat_activity.pid)
2685                        FROM pg_stat_activity
2686                        WHERE
2687                            pg_stat_activity.datname = current_database() AND
2688                            pid <> pg_backend_pid();
2689                    ";
2690                    db.pool
2691                        .execute(sea_orm::Statement::from_string(
2692                            db.pool.get_database_backend(),
2693                            query.into(),
2694                        ))
2695                        .await
2696                        .log_err();
2697                    sqlx::Postgres::drop_database(db.options.get_url())
2698                        .await
2699                        .log_err();
2700                })
2701            }
2702        }
2703    }
2704}