db.rs

   1mod access_token;
   2mod contact;
   3mod language_server;
   4mod project;
   5mod project_collaborator;
   6mod room;
   7mod room_participant;
   8mod signup;
   9#[cfg(test)]
  10mod tests;
  11mod user;
  12mod worktree;
  13mod worktree_diagnostic_summary;
  14mod worktree_entry;
  15
  16use crate::{Error, Result};
  17use anyhow::anyhow;
  18use collections::{BTreeMap, HashMap, HashSet};
  19pub use contact::Contact;
  20use dashmap::DashMap;
  21use futures::StreamExt;
  22use hyper::StatusCode;
  23use rpc::{proto, ConnectionId};
  24use sea_orm::Condition;
  25pub use sea_orm::ConnectOptions;
  26use sea_orm::{
  27    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  28    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  29    Statement, TransactionTrait,
  30};
  31use sea_query::{Alias, Expr, OnConflict, Query};
  32use serde::{Deserialize, Serialize};
  33pub use signup::{Invite, NewSignup, WaitlistSummary};
  34use sqlx::migrate::{Migrate, Migration, MigrationSource};
  35use sqlx::Connection;
  36use std::ops::{Deref, DerefMut};
  37use std::path::Path;
  38use std::time::Duration;
  39use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  40use tokio::sync::{Mutex, OwnedMutexGuard};
  41pub use user::Model as User;
  42
/// Handle to the application's database.
///
/// Bundles the SeaORM connection with the options it was opened with, plus
/// some in-process state: a map of per-room mutexes and the current epoch.
pub struct Database {
    /// Options the connection was opened with; `migrate` reads the URL from here.
    options: ConnectOptions,
    /// Connection returned by `sea_orm::Database::connect` in `new`.
    pool: DatabaseConnection,
    /// One async mutex per room id.
    /// NOTE(review): presumably serializes room-scoped transactions — the code
    /// that takes these locks is outside this excerpt; confirm.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Test-only background executor handle; `new` initializes it to `None`.
    #[cfg(test)]
    background: Option<std::sync::Arc<gpui::executor::Background>>,
    /// Test-only Tokio runtime; `new` initializes it to `None`.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
    /// Random UUID generated in `new` (and regenerated by the test-only `reset`).
    /// The `delete_stale_*` methods delete rows whose stored connection epoch
    /// differs from this value.
    epoch: parking_lot::RwLock<Uuid>,
}
  53
  54impl Database {
  55    pub async fn new(options: ConnectOptions) -> Result<Self> {
  56        Ok(Self {
  57            options: options.clone(),
  58            pool: sea_orm::Database::connect(options).await?,
  59            rooms: DashMap::with_capacity(16384),
  60            #[cfg(test)]
  61            background: None,
  62            #[cfg(test)]
  63            runtime: None,
  64            epoch: parking_lot::RwLock::new(Uuid::new_v4()),
  65        })
  66    }
  67
  68    #[cfg(test)]
  69    pub fn reset(&self) {
  70        self.rooms.clear();
  71        *self.epoch.write() = Uuid::new_v4();
  72    }
  73
  74    fn epoch(&self) -> Uuid {
  75        *self.epoch.read()
  76    }
  77
  78    pub async fn migrate(
  79        &self,
  80        migrations_path: &Path,
  81        ignore_checksum_mismatch: bool,
  82    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  83        let migrations = MigrationSource::resolve(migrations_path)
  84            .await
  85            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  86
  87        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  88
  89        connection.ensure_migrations_table().await?;
  90        let applied_migrations: HashMap<_, _> = connection
  91            .list_applied_migrations()
  92            .await?
  93            .into_iter()
  94            .map(|m| (m.version, m))
  95            .collect();
  96
  97        let mut new_migrations = Vec::new();
  98        for migration in migrations {
  99            match applied_migrations.get(&migration.version) {
 100                Some(applied_migration) => {
 101                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 102                    {
 103                        Err(anyhow!(
 104                            "checksum mismatch for applied migration {}",
 105                            migration.description
 106                        ))?;
 107                    }
 108                }
 109                None => {
 110                    let elapsed = connection.apply(&migration).await?;
 111                    new_migrations.push((migration, elapsed));
 112                }
 113            }
 114        }
 115
 116        Ok(new_migrations)
 117    }
 118
 119    pub async fn delete_stale_projects(&self) -> Result<()> {
 120        self.transaction(|tx| async move {
 121            project_collaborator::Entity::delete_many()
 122                .filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch()))
 123                .exec(&*tx)
 124                .await?;
 125            project::Entity::delete_many()
 126                .filter(project::Column::HostConnectionEpoch.ne(self.epoch()))
 127                .exec(&*tx)
 128                .await?;
 129            Ok(())
 130        })
 131        .await
 132    }
 133
 134    pub async fn delete_stale_rooms(&self) -> Result<()> {
 135        self.transaction(|tx| async move {
 136            room_participant::Entity::delete_many()
 137                .filter(
 138                    room_participant::Column::AnsweringConnectionEpoch
 139                        .ne(self.epoch())
 140                        .or(room_participant::Column::CallingConnectionEpoch.ne(self.epoch())),
 141                )
 142                .exec(&*tx)
 143                .await?;
 144            room::Entity::delete_many()
 145                .filter(
 146                    room::Column::Id.not_in_subquery(
 147                        Query::select()
 148                            .column(room_participant::Column::RoomId)
 149                            .from(room_participant::Entity)
 150                            .distinct()
 151                            .to_owned(),
 152                    ),
 153                )
 154                .exec(&*tx)
 155                .await?;
 156            Ok(())
 157        })
 158        .await
 159    }
 160
 161    // users
 162
 163    pub async fn create_user(
 164        &self,
 165        email_address: &str,
 166        admin: bool,
 167        params: NewUserParams,
 168    ) -> Result<NewUserResult> {
 169        self.transaction(|tx| async {
 170            let tx = tx;
 171            let user = user::Entity::insert(user::ActiveModel {
 172                email_address: ActiveValue::set(Some(email_address.into())),
 173                github_login: ActiveValue::set(params.github_login.clone()),
 174                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 175                admin: ActiveValue::set(admin),
 176                metrics_id: ActiveValue::set(Uuid::new_v4()),
 177                ..Default::default()
 178            })
 179            .on_conflict(
 180                OnConflict::column(user::Column::GithubLogin)
 181                    .update_column(user::Column::GithubLogin)
 182                    .to_owned(),
 183            )
 184            .exec_with_returning(&*tx)
 185            .await?;
 186
 187            Ok(NewUserResult {
 188                user_id: user.id,
 189                metrics_id: user.metrics_id.to_string(),
 190                signup_device_id: None,
 191                inviting_user_id: None,
 192            })
 193        })
 194        .await
 195    }
 196
 197    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 198        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 199            .await
 200    }
 201
 202    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 203        self.transaction(|tx| async {
 204            let tx = tx;
 205            Ok(user::Entity::find()
 206                .filter(user::Column::Id.is_in(ids.iter().copied()))
 207                .all(&*tx)
 208                .await?)
 209        })
 210        .await
 211    }
 212
 213    pub async fn get_user_by_github_account(
 214        &self,
 215        github_login: &str,
 216        github_user_id: Option<i32>,
 217    ) -> Result<Option<User>> {
 218        self.transaction(|tx| async move {
 219            let tx = &*tx;
 220            if let Some(github_user_id) = github_user_id {
 221                if let Some(user_by_github_user_id) = user::Entity::find()
 222                    .filter(user::Column::GithubUserId.eq(github_user_id))
 223                    .one(tx)
 224                    .await?
 225                {
 226                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 227                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 228                    Ok(Some(user_by_github_user_id.update(tx).await?))
 229                } else if let Some(user_by_github_login) = user::Entity::find()
 230                    .filter(user::Column::GithubLogin.eq(github_login))
 231                    .one(tx)
 232                    .await?
 233                {
 234                    let mut user_by_github_login = user_by_github_login.into_active_model();
 235                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 236                    Ok(Some(user_by_github_login.update(tx).await?))
 237                } else {
 238                    Ok(None)
 239                }
 240            } else {
 241                Ok(user::Entity::find()
 242                    .filter(user::Column::GithubLogin.eq(github_login))
 243                    .one(tx)
 244                    .await?)
 245            }
 246        })
 247        .await
 248    }
 249
 250    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 251        self.transaction(|tx| async move {
 252            Ok(user::Entity::find()
 253                .order_by_asc(user::Column::GithubLogin)
 254                .limit(limit as u64)
 255                .offset(page as u64 * limit as u64)
 256                .all(&*tx)
 257                .await?)
 258        })
 259        .await
 260    }
 261
 262    pub async fn get_users_with_no_invites(
 263        &self,
 264        invited_by_another_user: bool,
 265    ) -> Result<Vec<User>> {
 266        self.transaction(|tx| async move {
 267            Ok(user::Entity::find()
 268                .filter(
 269                    user::Column::InviteCount
 270                        .eq(0)
 271                        .and(if invited_by_another_user {
 272                            user::Column::InviterId.is_not_null()
 273                        } else {
 274                            user::Column::InviterId.is_null()
 275                        }),
 276                )
 277                .all(&*tx)
 278                .await?)
 279        })
 280        .await
 281    }
 282
 283    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 284        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 285        enum QueryAs {
 286            MetricsId,
 287        }
 288
 289        self.transaction(|tx| async move {
 290            let metrics_id: Uuid = user::Entity::find_by_id(id)
 291                .select_only()
 292                .column(user::Column::MetricsId)
 293                .into_values::<_, QueryAs>()
 294                .one(&*tx)
 295                .await?
 296                .ok_or_else(|| anyhow!("could not find user"))?;
 297            Ok(metrics_id.to_string())
 298        })
 299        .await
 300    }
 301
 302    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 303        self.transaction(|tx| async move {
 304            user::Entity::update_many()
 305                .filter(user::Column::Id.eq(id))
 306                .set(user::ActiveModel {
 307                    admin: ActiveValue::set(is_admin),
 308                    ..Default::default()
 309                })
 310                .exec(&*tx)
 311                .await?;
 312            Ok(())
 313        })
 314        .await
 315    }
 316
 317    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 318        self.transaction(|tx| async move {
 319            user::Entity::update_many()
 320                .filter(user::Column::Id.eq(id))
 321                .set(user::ActiveModel {
 322                    connected_once: ActiveValue::set(connected_once),
 323                    ..Default::default()
 324                })
 325                .exec(&*tx)
 326                .await?;
 327            Ok(())
 328        })
 329        .await
 330    }
 331
 332    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 333        self.transaction(|tx| async move {
 334            access_token::Entity::delete_many()
 335                .filter(access_token::Column::UserId.eq(id))
 336                .exec(&*tx)
 337                .await?;
 338            user::Entity::delete_by_id(id).exec(&*tx).await?;
 339            Ok(())
 340        })
 341        .await
 342    }
 343
 344    // contacts
 345
 346    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 347        #[derive(Debug, FromQueryResult)]
 348        struct ContactWithUserBusyStatuses {
 349            user_id_a: UserId,
 350            user_id_b: UserId,
 351            a_to_b: bool,
 352            accepted: bool,
 353            should_notify: bool,
 354            user_a_busy: bool,
 355            user_b_busy: bool,
 356        }
 357
 358        self.transaction(|tx| async move {
 359            let user_a_participant = Alias::new("user_a_participant");
 360            let user_b_participant = Alias::new("user_b_participant");
 361            let mut db_contacts = contact::Entity::find()
 362                .column_as(
 363                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 364                        .is_not_null(),
 365                    "user_a_busy",
 366                )
 367                .column_as(
 368                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 369                        .is_not_null(),
 370                    "user_b_busy",
 371                )
 372                .filter(
 373                    contact::Column::UserIdA
 374                        .eq(user_id)
 375                        .or(contact::Column::UserIdB.eq(user_id)),
 376                )
 377                .join_as(
 378                    JoinType::LeftJoin,
 379                    contact::Relation::UserARoomParticipant.def(),
 380                    user_a_participant,
 381                )
 382                .join_as(
 383                    JoinType::LeftJoin,
 384                    contact::Relation::UserBRoomParticipant.def(),
 385                    user_b_participant,
 386                )
 387                .into_model::<ContactWithUserBusyStatuses>()
 388                .stream(&*tx)
 389                .await?;
 390
 391            let mut contacts = Vec::new();
 392            while let Some(db_contact) = db_contacts.next().await {
 393                let db_contact = db_contact?;
 394                if db_contact.user_id_a == user_id {
 395                    if db_contact.accepted {
 396                        contacts.push(Contact::Accepted {
 397                            user_id: db_contact.user_id_b,
 398                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 399                            busy: db_contact.user_b_busy,
 400                        });
 401                    } else if db_contact.a_to_b {
 402                        contacts.push(Contact::Outgoing {
 403                            user_id: db_contact.user_id_b,
 404                        })
 405                    } else {
 406                        contacts.push(Contact::Incoming {
 407                            user_id: db_contact.user_id_b,
 408                            should_notify: db_contact.should_notify,
 409                        });
 410                    }
 411                } else if db_contact.accepted {
 412                    contacts.push(Contact::Accepted {
 413                        user_id: db_contact.user_id_a,
 414                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 415                        busy: db_contact.user_a_busy,
 416                    });
 417                } else if db_contact.a_to_b {
 418                    contacts.push(Contact::Incoming {
 419                        user_id: db_contact.user_id_a,
 420                        should_notify: db_contact.should_notify,
 421                    });
 422                } else {
 423                    contacts.push(Contact::Outgoing {
 424                        user_id: db_contact.user_id_a,
 425                    });
 426                }
 427            }
 428
 429            contacts.sort_unstable_by_key(|contact| contact.user_id());
 430
 431            Ok(contacts)
 432        })
 433        .await
 434    }
 435
 436    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 437        self.transaction(|tx| async move {
 438            let participant = room_participant::Entity::find()
 439                .filter(room_participant::Column::UserId.eq(user_id))
 440                .one(&*tx)
 441                .await?;
 442            Ok(participant.is_some())
 443        })
 444        .await
 445    }
 446
 447    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 448        self.transaction(|tx| async move {
 449            let (id_a, id_b) = if user_id_1 < user_id_2 {
 450                (user_id_1, user_id_2)
 451            } else {
 452                (user_id_2, user_id_1)
 453            };
 454
 455            Ok(contact::Entity::find()
 456                .filter(
 457                    contact::Column::UserIdA
 458                        .eq(id_a)
 459                        .and(contact::Column::UserIdB.eq(id_b))
 460                        .and(contact::Column::Accepted.eq(true)),
 461                )
 462                .one(&*tx)
 463                .await?
 464                .is_some())
 465        })
 466        .await
 467    }
 468
 469    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 470        self.transaction(|tx| async move {
 471            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 472                (sender_id, receiver_id, true)
 473            } else {
 474                (receiver_id, sender_id, false)
 475            };
 476
 477            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 478                user_id_a: ActiveValue::set(id_a),
 479                user_id_b: ActiveValue::set(id_b),
 480                a_to_b: ActiveValue::set(a_to_b),
 481                accepted: ActiveValue::set(false),
 482                should_notify: ActiveValue::set(true),
 483                ..Default::default()
 484            })
 485            .on_conflict(
 486                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 487                    .values([
 488                        (contact::Column::Accepted, true.into()),
 489                        (contact::Column::ShouldNotify, false.into()),
 490                    ])
 491                    .action_and_where(
 492                        contact::Column::Accepted.eq(false).and(
 493                            contact::Column::AToB
 494                                .eq(a_to_b)
 495                                .and(contact::Column::UserIdA.eq(id_b))
 496                                .or(contact::Column::AToB
 497                                    .ne(a_to_b)
 498                                    .and(contact::Column::UserIdA.eq(id_a))),
 499                        ),
 500                    )
 501                    .to_owned(),
 502            )
 503            .exec_without_returning(&*tx)
 504            .await?;
 505
 506            if rows_affected == 1 {
 507                Ok(())
 508            } else {
 509                Err(anyhow!("contact already requested"))?
 510            }
 511        })
 512        .await
 513    }
 514
 515    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 516        self.transaction(|tx| async move {
 517            let (id_a, id_b) = if responder_id < requester_id {
 518                (responder_id, requester_id)
 519            } else {
 520                (requester_id, responder_id)
 521            };
 522
 523            let result = contact::Entity::delete_many()
 524                .filter(
 525                    contact::Column::UserIdA
 526                        .eq(id_a)
 527                        .and(contact::Column::UserIdB.eq(id_b)),
 528                )
 529                .exec(&*tx)
 530                .await?;
 531
 532            if result.rows_affected == 1 {
 533                Ok(())
 534            } else {
 535                Err(anyhow!("no such contact"))?
 536            }
 537        })
 538        .await
 539    }
 540
 541    pub async fn dismiss_contact_notification(
 542        &self,
 543        user_id: UserId,
 544        contact_user_id: UserId,
 545    ) -> Result<()> {
 546        self.transaction(|tx| async move {
 547            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 548                (user_id, contact_user_id, true)
 549            } else {
 550                (contact_user_id, user_id, false)
 551            };
 552
 553            let result = contact::Entity::update_many()
 554                .set(contact::ActiveModel {
 555                    should_notify: ActiveValue::set(false),
 556                    ..Default::default()
 557                })
 558                .filter(
 559                    contact::Column::UserIdA
 560                        .eq(id_a)
 561                        .and(contact::Column::UserIdB.eq(id_b))
 562                        .and(
 563                            contact::Column::AToB
 564                                .eq(a_to_b)
 565                                .and(contact::Column::Accepted.eq(true))
 566                                .or(contact::Column::AToB
 567                                    .ne(a_to_b)
 568                                    .and(contact::Column::Accepted.eq(false))),
 569                        ),
 570                )
 571                .exec(&*tx)
 572                .await?;
 573            if result.rows_affected == 0 {
 574                Err(anyhow!("no such contact request"))?
 575            } else {
 576                Ok(())
 577            }
 578        })
 579        .await
 580    }
 581
 582    pub async fn respond_to_contact_request(
 583        &self,
 584        responder_id: UserId,
 585        requester_id: UserId,
 586        accept: bool,
 587    ) -> Result<()> {
 588        self.transaction(|tx| async move {
 589            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 590                (responder_id, requester_id, false)
 591            } else {
 592                (requester_id, responder_id, true)
 593            };
 594            let rows_affected = if accept {
 595                let result = contact::Entity::update_many()
 596                    .set(contact::ActiveModel {
 597                        accepted: ActiveValue::set(true),
 598                        should_notify: ActiveValue::set(true),
 599                        ..Default::default()
 600                    })
 601                    .filter(
 602                        contact::Column::UserIdA
 603                            .eq(id_a)
 604                            .and(contact::Column::UserIdB.eq(id_b))
 605                            .and(contact::Column::AToB.eq(a_to_b)),
 606                    )
 607                    .exec(&*tx)
 608                    .await?;
 609                result.rows_affected
 610            } else {
 611                let result = contact::Entity::delete_many()
 612                    .filter(
 613                        contact::Column::UserIdA
 614                            .eq(id_a)
 615                            .and(contact::Column::UserIdB.eq(id_b))
 616                            .and(contact::Column::AToB.eq(a_to_b))
 617                            .and(contact::Column::Accepted.eq(false)),
 618                    )
 619                    .exec(&*tx)
 620                    .await?;
 621
 622                result.rows_affected
 623            };
 624
 625            if rows_affected == 1 {
 626                Ok(())
 627            } else {
 628                Err(anyhow!("no such contact request"))?
 629            }
 630        })
 631        .await
 632    }
 633
 634    pub fn fuzzy_like_string(string: &str) -> String {
 635        let mut result = String::with_capacity(string.len() * 2 + 1);
 636        for c in string.chars() {
 637            if c.is_alphanumeric() {
 638                result.push('%');
 639                result.push(c);
 640            }
 641        }
 642        result.push('%');
 643        result
 644    }
 645
 646    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 647        self.transaction(|tx| async {
 648            let tx = tx;
 649            let like_string = Self::fuzzy_like_string(name_query);
 650            let query = "
 651                SELECT users.*
 652                FROM users
 653                WHERE github_login ILIKE $1
 654                ORDER BY github_login <-> $2
 655                LIMIT $3
 656            ";
 657
 658            Ok(user::Entity::find()
 659                .from_raw_sql(Statement::from_sql_and_values(
 660                    self.pool.get_database_backend(),
 661                    query.into(),
 662                    vec![like_string.into(), name_query.into(), limit.into()],
 663                ))
 664                .all(&*tx)
 665                .await?)
 666        })
 667        .await
 668    }
 669
 670    // signups
 671
 672    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 673        self.transaction(|tx| async move {
 674            signup::Entity::insert(signup::ActiveModel {
 675                email_address: ActiveValue::set(signup.email_address.clone()),
 676                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 677                email_confirmation_sent: ActiveValue::set(false),
 678                platform_mac: ActiveValue::set(signup.platform_mac),
 679                platform_windows: ActiveValue::set(signup.platform_windows),
 680                platform_linux: ActiveValue::set(signup.platform_linux),
 681                platform_unknown: ActiveValue::set(false),
 682                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 683                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 684                device_id: ActiveValue::set(signup.device_id.clone()),
 685                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 686                ..Default::default()
 687            })
 688            .on_conflict(
 689                OnConflict::column(signup::Column::EmailAddress)
 690                    .update_columns([
 691                        signup::Column::PlatformMac,
 692                        signup::Column::PlatformWindows,
 693                        signup::Column::PlatformLinux,
 694                        signup::Column::EditorFeatures,
 695                        signup::Column::ProgrammingLanguages,
 696                        signup::Column::DeviceId,
 697                        signup::Column::AddedToMailingList,
 698                    ])
 699                    .to_owned(),
 700            )
 701            .exec(&*tx)
 702            .await?;
 703            Ok(())
 704        })
 705        .await
 706    }
 707
 708    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 709        self.transaction(|tx| async move {
 710            let signup = signup::Entity::find()
 711                .filter(signup::Column::EmailAddress.eq(email_address))
 712                .one(&*tx)
 713                .await?
 714                .ok_or_else(|| {
 715                    anyhow!("signup with email address {} doesn't exist", email_address)
 716                })?;
 717
 718            Ok(signup)
 719        })
 720        .await
 721    }
 722
 723    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 724        self.transaction(|tx| async move {
 725            let query = "
 726                SELECT
 727                    COUNT(*) as count,
 728                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 729                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 730                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 731                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 732                FROM (
 733                    SELECT *
 734                    FROM signups
 735                    WHERE
 736                        NOT email_confirmation_sent
 737                ) AS unsent
 738            ";
 739            Ok(
 740                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 741                    self.pool.get_database_backend(),
 742                    query.into(),
 743                    vec![],
 744                ))
 745                .one(&*tx)
 746                .await?
 747                .ok_or_else(|| anyhow!("invalid result"))?,
 748            )
 749        })
 750        .await
 751    }
 752
 753    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 754        let emails = invites
 755            .iter()
 756            .map(|s| s.email_address.as_str())
 757            .collect::<Vec<_>>();
 758        self.transaction(|tx| async {
 759            let tx = tx;
 760            signup::Entity::update_many()
 761                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 762                .set(signup::ActiveModel {
 763                    email_confirmation_sent: ActiveValue::set(true),
 764                    ..Default::default()
 765                })
 766                .exec(&*tx)
 767                .await?;
 768            Ok(())
 769        })
 770        .await
 771    }
 772
 773    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 774        self.transaction(|tx| async move {
 775            Ok(signup::Entity::find()
 776                .select_only()
 777                .column(signup::Column::EmailAddress)
 778                .column(signup::Column::EmailConfirmationCode)
 779                .filter(
 780                    signup::Column::EmailConfirmationSent.eq(false).and(
 781                        signup::Column::PlatformMac
 782                            .eq(true)
 783                            .or(signup::Column::PlatformUnknown.eq(true)),
 784                    ),
 785                )
 786                .order_by_asc(signup::Column::CreatedAt)
 787                .limit(count as u64)
 788                .into_model()
 789                .all(&*tx)
 790                .await?)
 791        })
 792        .await
 793    }
 794
 795    // invite codes
 796
 797    pub async fn create_invite_from_code(
 798        &self,
 799        code: &str,
 800        email_address: &str,
 801        device_id: Option<&str>,
 802    ) -> Result<Invite> {
 803        self.transaction(|tx| async move {
 804            let existing_user = user::Entity::find()
 805                .filter(user::Column::EmailAddress.eq(email_address))
 806                .one(&*tx)
 807                .await?;
 808
 809            if existing_user.is_some() {
 810                Err(anyhow!("email address is already in use"))?;
 811            }
 812
 813            let inviting_user_with_invites = match user::Entity::find()
 814                .filter(
 815                    user::Column::InviteCode
 816                        .eq(code)
 817                        .and(user::Column::InviteCount.gt(0)),
 818                )
 819                .one(&*tx)
 820                .await?
 821            {
 822                Some(inviting_user) => inviting_user,
 823                None => {
 824                    return Err(Error::Http(
 825                        StatusCode::UNAUTHORIZED,
 826                        "unable to find an invite code with invites remaining".to_string(),
 827                    ))?
 828                }
 829            };
 830            user::Entity::update_many()
 831                .filter(
 832                    user::Column::Id
 833                        .eq(inviting_user_with_invites.id)
 834                        .and(user::Column::InviteCount.gt(0)),
 835                )
 836                .col_expr(
 837                    user::Column::InviteCount,
 838                    Expr::col(user::Column::InviteCount).sub(1),
 839                )
 840                .exec(&*tx)
 841                .await?;
 842
 843            let signup = signup::Entity::insert(signup::ActiveModel {
 844                email_address: ActiveValue::set(email_address.into()),
 845                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 846                email_confirmation_sent: ActiveValue::set(false),
 847                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 848                platform_linux: ActiveValue::set(false),
 849                platform_mac: ActiveValue::set(false),
 850                platform_windows: ActiveValue::set(false),
 851                platform_unknown: ActiveValue::set(true),
 852                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 853                ..Default::default()
 854            })
 855            .on_conflict(
 856                OnConflict::column(signup::Column::EmailAddress)
 857                    .update_column(signup::Column::InvitingUserId)
 858                    .to_owned(),
 859            )
 860            .exec_with_returning(&*tx)
 861            .await?;
 862
 863            Ok(Invite {
 864                email_address: signup.email_address,
 865                email_confirmation_code: signup.email_confirmation_code,
 866            })
 867        })
 868        .await
 869    }
 870
 871    pub async fn create_user_from_invite(
 872        &self,
 873        invite: &Invite,
 874        user: NewUserParams,
 875    ) -> Result<Option<NewUserResult>> {
 876        self.transaction(|tx| async {
 877            let tx = tx;
 878            let signup = signup::Entity::find()
 879                .filter(
 880                    signup::Column::EmailAddress
 881                        .eq(invite.email_address.as_str())
 882                        .and(
 883                            signup::Column::EmailConfirmationCode
 884                                .eq(invite.email_confirmation_code.as_str()),
 885                        ),
 886                )
 887                .one(&*tx)
 888                .await?
 889                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 890
 891            if signup.user_id.is_some() {
 892                return Ok(None);
 893            }
 894
 895            let user = user::Entity::insert(user::ActiveModel {
 896                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 897                github_login: ActiveValue::set(user.github_login.clone()),
 898                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 899                admin: ActiveValue::set(false),
 900                invite_count: ActiveValue::set(user.invite_count),
 901                invite_code: ActiveValue::set(Some(random_invite_code())),
 902                metrics_id: ActiveValue::set(Uuid::new_v4()),
 903                ..Default::default()
 904            })
 905            .on_conflict(
 906                OnConflict::column(user::Column::GithubLogin)
 907                    .update_columns([
 908                        user::Column::EmailAddress,
 909                        user::Column::GithubUserId,
 910                        user::Column::Admin,
 911                    ])
 912                    .to_owned(),
 913            )
 914            .exec_with_returning(&*tx)
 915            .await?;
 916
 917            let mut signup = signup.into_active_model();
 918            signup.user_id = ActiveValue::set(Some(user.id));
 919            let signup = signup.update(&*tx).await?;
 920
 921            if let Some(inviting_user_id) = signup.inviting_user_id {
 922                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
 923                    (inviting_user_id, user.id, true)
 924                } else {
 925                    (user.id, inviting_user_id, false)
 926                };
 927
 928                contact::Entity::insert(contact::ActiveModel {
 929                    user_id_a: ActiveValue::set(user_id_a),
 930                    user_id_b: ActiveValue::set(user_id_b),
 931                    a_to_b: ActiveValue::set(a_to_b),
 932                    should_notify: ActiveValue::set(true),
 933                    accepted: ActiveValue::set(true),
 934                    ..Default::default()
 935                })
 936                .on_conflict(OnConflict::new().do_nothing().to_owned())
 937                .exec_without_returning(&*tx)
 938                .await?;
 939            }
 940
 941            Ok(Some(NewUserResult {
 942                user_id: user.id,
 943                metrics_id: user.metrics_id.to_string(),
 944                inviting_user_id: signup.inviting_user_id,
 945                signup_device_id: signup.device_id,
 946            }))
 947        })
 948        .await
 949    }
 950
 951    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
 952        self.transaction(|tx| async move {
 953            if count > 0 {
 954                user::Entity::update_many()
 955                    .filter(
 956                        user::Column::Id
 957                            .eq(id)
 958                            .and(user::Column::InviteCode.is_null()),
 959                    )
 960                    .set(user::ActiveModel {
 961                        invite_code: ActiveValue::set(Some(random_invite_code())),
 962                        ..Default::default()
 963                    })
 964                    .exec(&*tx)
 965                    .await?;
 966            }
 967
 968            user::Entity::update_many()
 969                .filter(user::Column::Id.eq(id))
 970                .set(user::ActiveModel {
 971                    invite_count: ActiveValue::set(count),
 972                    ..Default::default()
 973                })
 974                .exec(&*tx)
 975                .await?;
 976            Ok(())
 977        })
 978        .await
 979    }
 980
 981    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
 982        self.transaction(|tx| async move {
 983            match user::Entity::find_by_id(id).one(&*tx).await? {
 984                Some(user) if user.invite_code.is_some() => {
 985                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
 986                }
 987                _ => Ok(None),
 988            }
 989        })
 990        .await
 991    }
 992
 993    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
 994        self.transaction(|tx| async move {
 995            user::Entity::find()
 996                .filter(user::Column::InviteCode.eq(code))
 997                .one(&*tx)
 998                .await?
 999                .ok_or_else(|| {
1000                    Error::Http(
1001                        StatusCode::NOT_FOUND,
1002                        "that invite code does not exist".to_string(),
1003                    )
1004                })
1005        })
1006        .await
1007    }
1008
1009    // rooms
1010
1011    pub async fn incoming_call_for_user(
1012        &self,
1013        user_id: UserId,
1014    ) -> Result<Option<proto::IncomingCall>> {
1015        self.transaction(|tx| async move {
1016            let pending_participant = room_participant::Entity::find()
1017                .filter(
1018                    room_participant::Column::UserId
1019                        .eq(user_id)
1020                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1021                )
1022                .one(&*tx)
1023                .await?;
1024
1025            if let Some(pending_participant) = pending_participant {
1026                let room = self.get_room(pending_participant.room_id, &tx).await?;
1027                Ok(Self::build_incoming_call(&room, user_id))
1028            } else {
1029                Ok(None)
1030            }
1031        })
1032        .await
1033    }
1034
1035    pub async fn create_room(
1036        &self,
1037        user_id: UserId,
1038        connection_id: ConnectionId,
1039        live_kit_room: &str,
1040    ) -> Result<RoomGuard<proto::Room>> {
1041        self.room_transaction(|tx| async move {
1042            let room = room::ActiveModel {
1043                live_kit_room: ActiveValue::set(live_kit_room.into()),
1044                ..Default::default()
1045            }
1046            .insert(&*tx)
1047            .await?;
1048            let room_id = room.id;
1049
1050            room_participant::ActiveModel {
1051                room_id: ActiveValue::set(room_id),
1052                user_id: ActiveValue::set(user_id),
1053                answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
1054                answering_connection_epoch: ActiveValue::set(Some(self.epoch())),
1055                answering_connection_lost: ActiveValue::set(false),
1056                calling_user_id: ActiveValue::set(user_id),
1057                calling_connection_id: ActiveValue::set(connection_id.0 as i32),
1058                calling_connection_epoch: ActiveValue::set(self.epoch()),
1059                ..Default::default()
1060            }
1061            .insert(&*tx)
1062            .await?;
1063
1064            let room = self.get_room(room_id, &tx).await?;
1065            Ok((room_id, room))
1066        })
1067        .await
1068    }
1069
1070    pub async fn call(
1071        &self,
1072        room_id: RoomId,
1073        calling_user_id: UserId,
1074        calling_connection_id: ConnectionId,
1075        called_user_id: UserId,
1076        initial_project_id: Option<ProjectId>,
1077    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1078        self.room_transaction(|tx| async move {
1079            room_participant::ActiveModel {
1080                room_id: ActiveValue::set(room_id),
1081                user_id: ActiveValue::set(called_user_id),
1082                answering_connection_lost: ActiveValue::set(false),
1083                calling_user_id: ActiveValue::set(calling_user_id),
1084                calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
1085                calling_connection_epoch: ActiveValue::set(self.epoch()),
1086                initial_project_id: ActiveValue::set(initial_project_id),
1087                ..Default::default()
1088            }
1089            .insert(&*tx)
1090            .await?;
1091
1092            let room = self.get_room(room_id, &tx).await?;
1093            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1094                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1095            Ok((room_id, (room, incoming_call)))
1096        })
1097        .await
1098    }
1099
1100    pub async fn call_failed(
1101        &self,
1102        room_id: RoomId,
1103        called_user_id: UserId,
1104    ) -> Result<RoomGuard<proto::Room>> {
1105        self.room_transaction(|tx| async move {
1106            room_participant::Entity::delete_many()
1107                .filter(
1108                    room_participant::Column::RoomId
1109                        .eq(room_id)
1110                        .and(room_participant::Column::UserId.eq(called_user_id)),
1111                )
1112                .exec(&*tx)
1113                .await?;
1114            let room = self.get_room(room_id, &tx).await?;
1115            Ok((room_id, room))
1116        })
1117        .await
1118    }
1119
1120    pub async fn decline_call(
1121        &self,
1122        expected_room_id: Option<RoomId>,
1123        user_id: UserId,
1124    ) -> Result<RoomGuard<proto::Room>> {
1125        self.room_transaction(|tx| async move {
1126            let participant = room_participant::Entity::find()
1127                .filter(
1128                    room_participant::Column::UserId
1129                        .eq(user_id)
1130                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1131                )
1132                .one(&*tx)
1133                .await?
1134                .ok_or_else(|| anyhow!("could not decline call"))?;
1135            let room_id = participant.room_id;
1136
1137            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1138                return Err(anyhow!("declining call on unexpected room"))?;
1139            }
1140
1141            room_participant::Entity::delete(participant.into_active_model())
1142                .exec(&*tx)
1143                .await?;
1144
1145            let room = self.get_room(room_id, &tx).await?;
1146            Ok((room_id, room))
1147        })
1148        .await
1149    }
1150
1151    pub async fn cancel_call(
1152        &self,
1153        expected_room_id: Option<RoomId>,
1154        calling_connection_id: ConnectionId,
1155        called_user_id: UserId,
1156    ) -> Result<RoomGuard<proto::Room>> {
1157        self.room_transaction(|tx| async move {
1158            let participant = room_participant::Entity::find()
1159                .filter(
1160                    room_participant::Column::UserId
1161                        .eq(called_user_id)
1162                        .and(
1163                            room_participant::Column::CallingConnectionId
1164                                .eq(calling_connection_id.0 as i32),
1165                        )
1166                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1167                )
1168                .one(&*tx)
1169                .await?
1170                .ok_or_else(|| anyhow!("could not cancel call"))?;
1171            let room_id = participant.room_id;
1172            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1173                return Err(anyhow!("canceling call on unexpected room"))?;
1174            }
1175
1176            room_participant::Entity::delete(participant.into_active_model())
1177                .exec(&*tx)
1178                .await?;
1179
1180            let room = self.get_room(room_id, &tx).await?;
1181            Ok((room_id, room))
1182        })
1183        .await
1184    }
1185
1186    pub async fn join_room(
1187        &self,
1188        room_id: RoomId,
1189        user_id: UserId,
1190        connection_id: ConnectionId,
1191    ) -> Result<RoomGuard<proto::Room>> {
1192        self.room_transaction(|tx| async move {
1193            let result = room_participant::Entity::update_many()
1194                .filter(
1195                    Condition::all()
1196                        .add(room_participant::Column::RoomId.eq(room_id))
1197                        .add(room_participant::Column::UserId.eq(user_id))
1198                        .add(
1199                            Condition::any()
1200                                .add(room_participant::Column::AnsweringConnectionId.is_null())
1201                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1202                                .add(
1203                                    room_participant::Column::AnsweringConnectionEpoch
1204                                        .ne(self.epoch()),
1205                                ),
1206                        ),
1207                )
1208                .set(room_participant::ActiveModel {
1209                    answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
1210                    answering_connection_epoch: ActiveValue::set(Some(self.epoch())),
1211                    answering_connection_lost: ActiveValue::set(false),
1212                    ..Default::default()
1213                })
1214                .exec(&*tx)
1215                .await?;
1216            if result.rows_affected == 0 {
1217                Err(anyhow!("room does not exist or was already joined"))?
1218            } else {
1219                let room = self.get_room(room_id, &tx).await?;
1220                Ok((room_id, room))
1221            }
1222        })
1223        .await
1224    }
1225
1226    pub async fn leave_room(&self, connection_id: ConnectionId) -> Result<RoomGuard<LeftRoom>> {
1227        self.room_transaction(|tx| async move {
1228            let leaving_participant = room_participant::Entity::find()
1229                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1230                .one(&*tx)
1231                .await?;
1232
1233            if let Some(leaving_participant) = leaving_participant {
1234                // Leave room.
1235                let room_id = leaving_participant.room_id;
1236                room_participant::Entity::delete_by_id(leaving_participant.id)
1237                    .exec(&*tx)
1238                    .await?;
1239
1240                // Cancel pending calls initiated by the leaving user.
1241                let called_participants = room_participant::Entity::find()
1242                    .filter(
1243                        room_participant::Column::CallingConnectionId
1244                            .eq(connection_id.0)
1245                            .and(room_participant::Column::AnsweringConnectionId.is_null()),
1246                    )
1247                    .all(&*tx)
1248                    .await?;
1249                room_participant::Entity::delete_many()
1250                    .filter(
1251                        room_participant::Column::Id
1252                            .is_in(called_participants.iter().map(|participant| participant.id)),
1253                    )
1254                    .exec(&*tx)
1255                    .await?;
1256                let canceled_calls_to_user_ids = called_participants
1257                    .into_iter()
1258                    .map(|participant| participant.user_id)
1259                    .collect();
1260
1261                // Detect left projects.
1262                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1263                enum QueryProjectIds {
1264                    ProjectId,
1265                }
1266                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1267                    .select_only()
1268                    .column_as(
1269                        project_collaborator::Column::ProjectId,
1270                        QueryProjectIds::ProjectId,
1271                    )
1272                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1273                    .into_values::<_, QueryProjectIds>()
1274                    .all(&*tx)
1275                    .await?;
1276                let mut left_projects = HashMap::default();
1277                let mut collaborators = project_collaborator::Entity::find()
1278                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1279                    .stream(&*tx)
1280                    .await?;
1281                while let Some(collaborator) = collaborators.next().await {
1282                    let collaborator = collaborator?;
1283                    let left_project =
1284                        left_projects
1285                            .entry(collaborator.project_id)
1286                            .or_insert(LeftProject {
1287                                id: collaborator.project_id,
1288                                host_user_id: Default::default(),
1289                                connection_ids: Default::default(),
1290                                host_connection_id: Default::default(),
1291                            });
1292
1293                    let collaborator_connection_id =
1294                        ConnectionId(collaborator.connection_id as u32);
1295                    if collaborator_connection_id != connection_id {
1296                        left_project.connection_ids.push(collaborator_connection_id);
1297                    }
1298
1299                    if collaborator.is_host {
1300                        left_project.host_user_id = collaborator.user_id;
1301                        left_project.host_connection_id =
1302                            ConnectionId(collaborator.connection_id as u32);
1303                    }
1304                }
1305                drop(collaborators);
1306
1307                // Leave projects.
1308                project_collaborator::Entity::delete_many()
1309                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1310                    .exec(&*tx)
1311                    .await?;
1312
1313                // Unshare projects.
1314                project::Entity::delete_many()
1315                    .filter(
1316                        project::Column::RoomId
1317                            .eq(room_id)
1318                            .and(project::Column::HostConnectionId.eq(connection_id.0 as i32)),
1319                    )
1320                    .exec(&*tx)
1321                    .await?;
1322
1323                let room = self.get_room(room_id, &tx).await?;
1324                if room.participants.is_empty() {
1325                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1326                }
1327
1328                let left_room = LeftRoom {
1329                    room,
1330                    left_projects,
1331                    canceled_calls_to_user_ids,
1332                };
1333
1334                if left_room.room.participants.is_empty() {
1335                    self.rooms.remove(&room_id);
1336                }
1337
1338                Ok((room_id, left_room))
1339            } else {
1340                Err(anyhow!("could not leave room"))?
1341            }
1342        })
1343        .await
1344    }
1345
1346    pub async fn update_room_participant_location(
1347        &self,
1348        room_id: RoomId,
1349        connection_id: ConnectionId,
1350        location: proto::ParticipantLocation,
1351    ) -> Result<RoomGuard<proto::Room>> {
1352        self.room_transaction(|tx| async {
1353            let tx = tx;
1354            let location_kind;
1355            let location_project_id;
1356            match location
1357                .variant
1358                .as_ref()
1359                .ok_or_else(|| anyhow!("invalid location"))?
1360            {
1361                proto::participant_location::Variant::SharedProject(project) => {
1362                    location_kind = 0;
1363                    location_project_id = Some(ProjectId::from_proto(project.id));
1364                }
1365                proto::participant_location::Variant::UnsharedProject(_) => {
1366                    location_kind = 1;
1367                    location_project_id = None;
1368                }
1369                proto::participant_location::Variant::External(_) => {
1370                    location_kind = 2;
1371                    location_project_id = None;
1372                }
1373            }
1374
1375            let result = room_participant::Entity::update_many()
1376                .filter(room_participant::Column::RoomId.eq(room_id).and(
1377                    room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32),
1378                ))
1379                .set(room_participant::ActiveModel {
1380                    location_kind: ActiveValue::set(Some(location_kind)),
1381                    location_project_id: ActiveValue::set(location_project_id),
1382                    ..Default::default()
1383                })
1384                .exec(&*tx)
1385                .await?;
1386
1387            if result.rows_affected == 1 {
1388                let room = self.get_room(room_id, &tx).await?;
1389                Ok((room_id, room))
1390            } else {
1391                Err(anyhow!("could not update room participant location"))?
1392            }
1393        })
1394        .await
1395    }
1396
1397    pub async fn connection_lost(
1398        &self,
1399        connection_id: ConnectionId,
1400    ) -> Result<RoomGuard<Vec<LeftProject>>> {
1401        self.room_transaction(|tx| async move {
1402            let participant = room_participant::Entity::find()
1403                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1404                .one(&*tx)
1405                .await?
1406                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1407            let room_id = participant.room_id;
1408
1409            room_participant::Entity::update(room_participant::ActiveModel {
1410                answering_connection_lost: ActiveValue::set(true),
1411                ..participant.into_active_model()
1412            })
1413            .exec(&*tx)
1414            .await?;
1415
1416            let collaborator_on_projects = project_collaborator::Entity::find()
1417                .find_also_related(project::Entity)
1418                .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1419                .all(&*tx)
1420                .await?;
1421            project_collaborator::Entity::delete_many()
1422                .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1423                .exec(&*tx)
1424                .await?;
1425
1426            let mut left_projects = Vec::new();
1427            for (_, project) in collaborator_on_projects {
1428                if let Some(project) = project {
1429                    let collaborators = project
1430                        .find_related(project_collaborator::Entity)
1431                        .all(&*tx)
1432                        .await?;
1433                    let connection_ids = collaborators
1434                        .into_iter()
1435                        .map(|collaborator| ConnectionId(collaborator.connection_id as u32))
1436                        .collect();
1437
1438                    left_projects.push(LeftProject {
1439                        id: project.id,
1440                        host_user_id: project.host_user_id,
1441                        host_connection_id: ConnectionId(project.host_connection_id as u32),
1442                        connection_ids,
1443                    });
1444                }
1445            }
1446
1447            project::Entity::delete_many()
1448                .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
1449                .exec(&*tx)
1450                .await?;
1451
1452            Ok((room_id, left_projects))
1453        })
1454        .await
1455    }
1456
1457    fn build_incoming_call(
1458        room: &proto::Room,
1459        called_user_id: UserId,
1460    ) -> Option<proto::IncomingCall> {
1461        let pending_participant = room
1462            .pending_participants
1463            .iter()
1464            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1465
1466        Some(proto::IncomingCall {
1467            room_id: room.id,
1468            calling_user_id: pending_participant.calling_user_id,
1469            participant_user_ids: room
1470                .participants
1471                .iter()
1472                .map(|participant| participant.user_id)
1473                .collect(),
1474            initial_project: room.participants.iter().find_map(|participant| {
1475                let initial_project_id = pending_participant.initial_project_id?;
1476                participant
1477                    .projects
1478                    .iter()
1479                    .find(|project| project.id == initial_project_id)
1480                    .cloned()
1481            }),
1482        })
1483    }
1484
1485    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1486        let db_room = room::Entity::find_by_id(room_id)
1487            .one(tx)
1488            .await?
1489            .ok_or_else(|| anyhow!("could not find room"))?;
1490
1491        let mut db_participants = db_room
1492            .find_related(room_participant::Entity)
1493            .stream(tx)
1494            .await?;
1495        let mut participants = HashMap::default();
1496        let mut pending_participants = Vec::new();
1497        while let Some(db_participant) = db_participants.next().await {
1498            let db_participant = db_participant?;
1499            if let Some(answering_connection_id) = db_participant.answering_connection_id {
1500                let location = match (
1501                    db_participant.location_kind,
1502                    db_participant.location_project_id,
1503                ) {
1504                    (Some(0), Some(project_id)) => {
1505                        Some(proto::participant_location::Variant::SharedProject(
1506                            proto::participant_location::SharedProject {
1507                                id: project_id.to_proto(),
1508                            },
1509                        ))
1510                    }
1511                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1512                        Default::default(),
1513                    )),
1514                    _ => Some(proto::participant_location::Variant::External(
1515                        Default::default(),
1516                    )),
1517                };
1518                participants.insert(
1519                    answering_connection_id,
1520                    proto::Participant {
1521                        user_id: db_participant.user_id.to_proto(),
1522                        peer_id: answering_connection_id as u32,
1523                        projects: Default::default(),
1524                        location: Some(proto::ParticipantLocation { variant: location }),
1525                    },
1526                );
1527            } else {
1528                pending_participants.push(proto::PendingParticipant {
1529                    user_id: db_participant.user_id.to_proto(),
1530                    calling_user_id: db_participant.calling_user_id.to_proto(),
1531                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1532                });
1533            }
1534        }
1535        drop(db_participants);
1536
1537        let mut db_projects = db_room
1538            .find_related(project::Entity)
1539            .find_with_related(worktree::Entity)
1540            .stream(tx)
1541            .await?;
1542
1543        while let Some(row) = db_projects.next().await {
1544            let (db_project, db_worktree) = row?;
1545            if let Some(participant) = participants.get_mut(&db_project.host_connection_id) {
1546                let project = if let Some(project) = participant
1547                    .projects
1548                    .iter_mut()
1549                    .find(|project| project.id == db_project.id.to_proto())
1550                {
1551                    project
1552                } else {
1553                    participant.projects.push(proto::ParticipantProject {
1554                        id: db_project.id.to_proto(),
1555                        worktree_root_names: Default::default(),
1556                    });
1557                    participant.projects.last_mut().unwrap()
1558                };
1559
1560                if let Some(db_worktree) = db_worktree {
1561                    project.worktree_root_names.push(db_worktree.root_name);
1562                }
1563            }
1564        }
1565
1566        Ok(proto::Room {
1567            id: db_room.id.to_proto(),
1568            live_kit_room: db_room.live_kit_room,
1569            participants: participants.into_values().collect(),
1570            pending_participants,
1571        })
1572    }
1573
1574    // projects
1575
1576    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1577        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1578        enum QueryAs {
1579            Count,
1580        }
1581
1582        self.transaction(|tx| async move {
1583            Ok(project::Entity::find()
1584                .select_only()
1585                .column_as(project::Column::Id.count(), QueryAs::Count)
1586                .inner_join(user::Entity)
1587                .filter(user::Column::Admin.eq(false))
1588                .into_values::<_, QueryAs>()
1589                .one(&*tx)
1590                .await?
1591                .unwrap_or(0i64) as usize)
1592        })
1593        .await
1594    }
1595
1596    pub async fn share_project(
1597        &self,
1598        room_id: RoomId,
1599        connection_id: ConnectionId,
1600        worktrees: &[proto::WorktreeMetadata],
1601    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1602        self.room_transaction(|tx| async move {
1603            let participant = room_participant::Entity::find()
1604                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1605                .one(&*tx)
1606                .await?
1607                .ok_or_else(|| anyhow!("could not find participant"))?;
1608            if participant.room_id != room_id {
1609                return Err(anyhow!("shared project on unexpected room"))?;
1610            }
1611
1612            let project = project::ActiveModel {
1613                room_id: ActiveValue::set(participant.room_id),
1614                host_user_id: ActiveValue::set(participant.user_id),
1615                host_connection_id: ActiveValue::set(connection_id.0 as i32),
1616                host_connection_epoch: ActiveValue::set(self.epoch()),
1617                ..Default::default()
1618            }
1619            .insert(&*tx)
1620            .await?;
1621
1622            if !worktrees.is_empty() {
1623                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1624                    worktree::ActiveModel {
1625                        id: ActiveValue::set(worktree.id as i64),
1626                        project_id: ActiveValue::set(project.id),
1627                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1628                        root_name: ActiveValue::set(worktree.root_name.clone()),
1629                        visible: ActiveValue::set(worktree.visible),
1630                        scan_id: ActiveValue::set(0),
1631                        is_complete: ActiveValue::set(false),
1632                    }
1633                }))
1634                .exec(&*tx)
1635                .await?;
1636            }
1637
1638            project_collaborator::ActiveModel {
1639                project_id: ActiveValue::set(project.id),
1640                connection_id: ActiveValue::set(connection_id.0 as i32),
1641                connection_epoch: ActiveValue::set(self.epoch()),
1642                user_id: ActiveValue::set(participant.user_id),
1643                replica_id: ActiveValue::set(ReplicaId(0)),
1644                is_host: ActiveValue::set(true),
1645                ..Default::default()
1646            }
1647            .insert(&*tx)
1648            .await?;
1649
1650            let room = self.get_room(room_id, &tx).await?;
1651            Ok((room_id, (project.id, room)))
1652        })
1653        .await
1654    }
1655
1656    pub async fn unshare_project(
1657        &self,
1658        project_id: ProjectId,
1659        connection_id: ConnectionId,
1660    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1661        self.room_transaction(|tx| async move {
1662            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1663
1664            let project = project::Entity::find_by_id(project_id)
1665                .one(&*tx)
1666                .await?
1667                .ok_or_else(|| anyhow!("project not found"))?;
1668            if project.host_connection_id == connection_id.0 as i32 {
1669                let room_id = project.room_id;
1670                project::Entity::delete(project.into_active_model())
1671                    .exec(&*tx)
1672                    .await?;
1673                let room = self.get_room(room_id, &tx).await?;
1674                Ok((room_id, (room, guest_connection_ids)))
1675            } else {
1676                Err(anyhow!("cannot unshare a project hosted by another user"))?
1677            }
1678        })
1679        .await
1680    }
1681
1682    pub async fn update_project(
1683        &self,
1684        project_id: ProjectId,
1685        connection_id: ConnectionId,
1686        worktrees: &[proto::WorktreeMetadata],
1687    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1688        self.room_transaction(|tx| async move {
1689            let project = project::Entity::find_by_id(project_id)
1690                .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
1691                .one(&*tx)
1692                .await?
1693                .ok_or_else(|| anyhow!("no such project"))?;
1694
1695            if !worktrees.is_empty() {
1696                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1697                    worktree::ActiveModel {
1698                        id: ActiveValue::set(worktree.id as i64),
1699                        project_id: ActiveValue::set(project.id),
1700                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1701                        root_name: ActiveValue::set(worktree.root_name.clone()),
1702                        visible: ActiveValue::set(worktree.visible),
1703                        scan_id: ActiveValue::set(0),
1704                        is_complete: ActiveValue::set(false),
1705                    }
1706                }))
1707                .on_conflict(
1708                    OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
1709                        .update_column(worktree::Column::RootName)
1710                        .to_owned(),
1711                )
1712                .exec(&*tx)
1713                .await?;
1714            }
1715
1716            worktree::Entity::delete_many()
1717                .filter(
1718                    worktree::Column::ProjectId.eq(project.id).and(
1719                        worktree::Column::Id
1720                            .is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
1721                    ),
1722                )
1723                .exec(&*tx)
1724                .await?;
1725
1726            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
1727            let room = self.get_room(project.room_id, &tx).await?;
1728            Ok((project.room_id, (room, guest_connection_ids)))
1729        })
1730        .await
1731    }
1732
1733    pub async fn update_worktree(
1734        &self,
1735        update: &proto::UpdateWorktree,
1736        connection_id: ConnectionId,
1737    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1738        self.room_transaction(|tx| async move {
1739            let project_id = ProjectId::from_proto(update.project_id);
1740            let worktree_id = update.worktree_id as i64;
1741
1742            // Ensure the update comes from the host.
1743            let project = project::Entity::find_by_id(project_id)
1744                .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
1745                .one(&*tx)
1746                .await?
1747                .ok_or_else(|| anyhow!("no such project"))?;
1748            let room_id = project.room_id;
1749
1750            // Update metadata.
1751            worktree::Entity::update(worktree::ActiveModel {
1752                id: ActiveValue::set(worktree_id),
1753                project_id: ActiveValue::set(project_id),
1754                root_name: ActiveValue::set(update.root_name.clone()),
1755                scan_id: ActiveValue::set(update.scan_id as i64),
1756                is_complete: ActiveValue::set(update.is_last_update),
1757                abs_path: ActiveValue::set(update.abs_path.clone()),
1758                ..Default::default()
1759            })
1760            .exec(&*tx)
1761            .await?;
1762
1763            if !update.updated_entries.is_empty() {
1764                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
1765                    let mtime = entry.mtime.clone().unwrap_or_default();
1766                    worktree_entry::ActiveModel {
1767                        project_id: ActiveValue::set(project_id),
1768                        worktree_id: ActiveValue::set(worktree_id),
1769                        id: ActiveValue::set(entry.id as i64),
1770                        is_dir: ActiveValue::set(entry.is_dir),
1771                        path: ActiveValue::set(entry.path.clone()),
1772                        inode: ActiveValue::set(entry.inode as i64),
1773                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
1774                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
1775                        is_symlink: ActiveValue::set(entry.is_symlink),
1776                        is_ignored: ActiveValue::set(entry.is_ignored),
1777                    }
1778                }))
1779                .on_conflict(
1780                    OnConflict::columns([
1781                        worktree_entry::Column::ProjectId,
1782                        worktree_entry::Column::WorktreeId,
1783                        worktree_entry::Column::Id,
1784                    ])
1785                    .update_columns([
1786                        worktree_entry::Column::IsDir,
1787                        worktree_entry::Column::Path,
1788                        worktree_entry::Column::Inode,
1789                        worktree_entry::Column::MtimeSeconds,
1790                        worktree_entry::Column::MtimeNanos,
1791                        worktree_entry::Column::IsSymlink,
1792                        worktree_entry::Column::IsIgnored,
1793                    ])
1794                    .to_owned(),
1795                )
1796                .exec(&*tx)
1797                .await?;
1798            }
1799
1800            if !update.removed_entries.is_empty() {
1801                worktree_entry::Entity::delete_many()
1802                    .filter(
1803                        worktree_entry::Column::ProjectId
1804                            .eq(project_id)
1805                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
1806                            .and(
1807                                worktree_entry::Column::Id
1808                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
1809                            ),
1810                    )
1811                    .exec(&*tx)
1812                    .await?;
1813            }
1814
1815            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1816            Ok((room_id, connection_ids))
1817        })
1818        .await
1819    }
1820
1821    pub async fn update_diagnostic_summary(
1822        &self,
1823        update: &proto::UpdateDiagnosticSummary,
1824        connection_id: ConnectionId,
1825    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1826        self.room_transaction(|tx| async move {
1827            let project_id = ProjectId::from_proto(update.project_id);
1828            let worktree_id = update.worktree_id as i64;
1829            let summary = update
1830                .summary
1831                .as_ref()
1832                .ok_or_else(|| anyhow!("invalid summary"))?;
1833
1834            // Ensure the update comes from the host.
1835            let project = project::Entity::find_by_id(project_id)
1836                .one(&*tx)
1837                .await?
1838                .ok_or_else(|| anyhow!("no such project"))?;
1839            if project.host_connection_id != connection_id.0 as i32 {
1840                return Err(anyhow!("can't update a project hosted by someone else"))?;
1841            }
1842
1843            // Update summary.
1844            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
1845                project_id: ActiveValue::set(project_id),
1846                worktree_id: ActiveValue::set(worktree_id),
1847                path: ActiveValue::set(summary.path.clone()),
1848                language_server_id: ActiveValue::set(summary.language_server_id as i64),
1849                error_count: ActiveValue::set(summary.error_count as i32),
1850                warning_count: ActiveValue::set(summary.warning_count as i32),
1851                ..Default::default()
1852            })
1853            .on_conflict(
1854                OnConflict::columns([
1855                    worktree_diagnostic_summary::Column::ProjectId,
1856                    worktree_diagnostic_summary::Column::WorktreeId,
1857                    worktree_diagnostic_summary::Column::Path,
1858                ])
1859                .update_columns([
1860                    worktree_diagnostic_summary::Column::LanguageServerId,
1861                    worktree_diagnostic_summary::Column::ErrorCount,
1862                    worktree_diagnostic_summary::Column::WarningCount,
1863                ])
1864                .to_owned(),
1865            )
1866            .exec(&*tx)
1867            .await?;
1868
1869            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1870            Ok((project.room_id, connection_ids))
1871        })
1872        .await
1873    }
1874
1875    pub async fn start_language_server(
1876        &self,
1877        update: &proto::StartLanguageServer,
1878        connection_id: ConnectionId,
1879    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1880        self.room_transaction(|tx| async move {
1881            let project_id = ProjectId::from_proto(update.project_id);
1882            let server = update
1883                .server
1884                .as_ref()
1885                .ok_or_else(|| anyhow!("invalid language server"))?;
1886
1887            // Ensure the update comes from the host.
1888            let project = project::Entity::find_by_id(project_id)
1889                .one(&*tx)
1890                .await?
1891                .ok_or_else(|| anyhow!("no such project"))?;
1892            if project.host_connection_id != connection_id.0 as i32 {
1893                return Err(anyhow!("can't update a project hosted by someone else"))?;
1894            }
1895
1896            // Add the newly-started language server.
1897            language_server::Entity::insert(language_server::ActiveModel {
1898                project_id: ActiveValue::set(project_id),
1899                id: ActiveValue::set(server.id as i64),
1900                name: ActiveValue::set(server.name.clone()),
1901                ..Default::default()
1902            })
1903            .on_conflict(
1904                OnConflict::columns([
1905                    language_server::Column::ProjectId,
1906                    language_server::Column::Id,
1907                ])
1908                .update_column(language_server::Column::Name)
1909                .to_owned(),
1910            )
1911            .exec(&*tx)
1912            .await?;
1913
1914            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1915            Ok((project.room_id, connection_ids))
1916        })
1917        .await
1918    }
1919
1920    pub async fn join_project(
1921        &self,
1922        project_id: ProjectId,
1923        connection_id: ConnectionId,
1924    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
1925        self.room_transaction(|tx| async move {
1926            let participant = room_participant::Entity::find()
1927                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1928                .one(&*tx)
1929                .await?
1930                .ok_or_else(|| anyhow!("must join a room first"))?;
1931
1932            let project = project::Entity::find_by_id(project_id)
1933                .one(&*tx)
1934                .await?
1935                .ok_or_else(|| anyhow!("no such project"))?;
1936            if project.room_id != participant.room_id {
1937                return Err(anyhow!("no such project"))?;
1938            }
1939
1940            let mut collaborators = project
1941                .find_related(project_collaborator::Entity)
1942                .all(&*tx)
1943                .await?;
1944            let replica_ids = collaborators
1945                .iter()
1946                .map(|c| c.replica_id)
1947                .collect::<HashSet<_>>();
1948            let mut replica_id = ReplicaId(1);
1949            while replica_ids.contains(&replica_id) {
1950                replica_id.0 += 1;
1951            }
1952            let new_collaborator = project_collaborator::ActiveModel {
1953                project_id: ActiveValue::set(project_id),
1954                connection_id: ActiveValue::set(connection_id.0 as i32),
1955                connection_epoch: ActiveValue::set(self.epoch()),
1956                user_id: ActiveValue::set(participant.user_id),
1957                replica_id: ActiveValue::set(replica_id),
1958                is_host: ActiveValue::set(false),
1959                ..Default::default()
1960            }
1961            .insert(&*tx)
1962            .await?;
1963            collaborators.push(new_collaborator);
1964
1965            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1966            let mut worktrees = db_worktrees
1967                .into_iter()
1968                .map(|db_worktree| {
1969                    (
1970                        db_worktree.id as u64,
1971                        Worktree {
1972                            id: db_worktree.id as u64,
1973                            abs_path: db_worktree.abs_path,
1974                            root_name: db_worktree.root_name,
1975                            visible: db_worktree.visible,
1976                            entries: Default::default(),
1977                            diagnostic_summaries: Default::default(),
1978                            scan_id: db_worktree.scan_id as u64,
1979                            is_complete: db_worktree.is_complete,
1980                        },
1981                    )
1982                })
1983                .collect::<BTreeMap<_, _>>();
1984
1985            // Populate worktree entries.
1986            {
1987                let mut db_entries = worktree_entry::Entity::find()
1988                    .filter(worktree_entry::Column::ProjectId.eq(project_id))
1989                    .stream(&*tx)
1990                    .await?;
1991                while let Some(db_entry) = db_entries.next().await {
1992                    let db_entry = db_entry?;
1993                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
1994                        worktree.entries.push(proto::Entry {
1995                            id: db_entry.id as u64,
1996                            is_dir: db_entry.is_dir,
1997                            path: db_entry.path,
1998                            inode: db_entry.inode as u64,
1999                            mtime: Some(proto::Timestamp {
2000                                seconds: db_entry.mtime_seconds as u64,
2001                                nanos: db_entry.mtime_nanos as u32,
2002                            }),
2003                            is_symlink: db_entry.is_symlink,
2004                            is_ignored: db_entry.is_ignored,
2005                        });
2006                    }
2007                }
2008            }
2009
2010            // Populate worktree diagnostic summaries.
2011            {
2012                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2013                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2014                    .stream(&*tx)
2015                    .await?;
2016                while let Some(db_summary) = db_summaries.next().await {
2017                    let db_summary = db_summary?;
2018                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2019                        worktree
2020                            .diagnostic_summaries
2021                            .push(proto::DiagnosticSummary {
2022                                path: db_summary.path,
2023                                language_server_id: db_summary.language_server_id as u64,
2024                                error_count: db_summary.error_count as u32,
2025                                warning_count: db_summary.warning_count as u32,
2026                            });
2027                    }
2028                }
2029            }
2030
2031            // Populate language servers.
2032            let language_servers = project
2033                .find_related(language_server::Entity)
2034                .all(&*tx)
2035                .await?;
2036
2037            let room_id = project.room_id;
2038            let project = Project {
2039                collaborators,
2040                worktrees,
2041                language_servers: language_servers
2042                    .into_iter()
2043                    .map(|language_server| proto::LanguageServer {
2044                        id: language_server.id as u64,
2045                        name: language_server.name,
2046                    })
2047                    .collect(),
2048            };
2049            Ok((room_id, (project, replica_id as ReplicaId)))
2050        })
2051        .await
2052    }
2053
2054    pub async fn leave_project(
2055        &self,
2056        project_id: ProjectId,
2057        connection_id: ConnectionId,
2058    ) -> Result<RoomGuard<LeftProject>> {
2059        self.room_transaction(|tx| async move {
2060            let result = project_collaborator::Entity::delete_many()
2061                .filter(
2062                    project_collaborator::Column::ProjectId
2063                        .eq(project_id)
2064                        .and(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)),
2065                )
2066                .exec(&*tx)
2067                .await?;
2068            if result.rows_affected == 0 {
2069                Err(anyhow!("not a collaborator on this project"))?;
2070            }
2071
2072            let project = project::Entity::find_by_id(project_id)
2073                .one(&*tx)
2074                .await?
2075                .ok_or_else(|| anyhow!("no such project"))?;
2076            let collaborators = project
2077                .find_related(project_collaborator::Entity)
2078                .all(&*tx)
2079                .await?;
2080            let connection_ids = collaborators
2081                .into_iter()
2082                .map(|collaborator| ConnectionId(collaborator.connection_id as u32))
2083                .collect();
2084
2085            let left_project = LeftProject {
2086                id: project_id,
2087                host_user_id: project.host_user_id,
2088                host_connection_id: ConnectionId(project.host_connection_id as u32),
2089                connection_ids,
2090            };
2091            Ok((project.room_id, left_project))
2092        })
2093        .await
2094    }
2095
2096    pub async fn project_collaborators(
2097        &self,
2098        project_id: ProjectId,
2099        connection_id: ConnectionId,
2100    ) -> Result<RoomGuard<Vec<project_collaborator::Model>>> {
2101        self.room_transaction(|tx| async move {
2102            let project = project::Entity::find_by_id(project_id)
2103                .one(&*tx)
2104                .await?
2105                .ok_or_else(|| anyhow!("no such project"))?;
2106            let collaborators = project_collaborator::Entity::find()
2107                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2108                .all(&*tx)
2109                .await?;
2110
2111            if collaborators
2112                .iter()
2113                .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
2114            {
2115                Ok((project.room_id, collaborators))
2116            } else {
2117                Err(anyhow!("no such project"))?
2118            }
2119        })
2120        .await
2121    }
2122
2123    pub async fn project_connection_ids(
2124        &self,
2125        project_id: ProjectId,
2126        connection_id: ConnectionId,
2127    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2128        self.room_transaction(|tx| async move {
2129            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2130            enum QueryAs {
2131                ConnectionId,
2132            }
2133
2134            let project = project::Entity::find_by_id(project_id)
2135                .one(&*tx)
2136                .await?
2137                .ok_or_else(|| anyhow!("no such project"))?;
2138            let mut db_connection_ids = project_collaborator::Entity::find()
2139                .select_only()
2140                .column_as(
2141                    project_collaborator::Column::ConnectionId,
2142                    QueryAs::ConnectionId,
2143                )
2144                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2145                .into_values::<i32, QueryAs>()
2146                .stream(&*tx)
2147                .await?;
2148
2149            let mut connection_ids = HashSet::default();
2150            while let Some(connection_id) = db_connection_ids.next().await {
2151                connection_ids.insert(ConnectionId(connection_id? as u32));
2152            }
2153
2154            if connection_ids.contains(&connection_id) {
2155                Ok((project.room_id, connection_ids))
2156            } else {
2157                Err(anyhow!("no such project"))?
2158            }
2159        })
2160        .await
2161    }
2162
2163    async fn project_guest_connection_ids(
2164        &self,
2165        project_id: ProjectId,
2166        tx: &DatabaseTransaction,
2167    ) -> Result<Vec<ConnectionId>> {
2168        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2169        enum QueryAs {
2170            ConnectionId,
2171        }
2172
2173        let mut db_guest_connection_ids = project_collaborator::Entity::find()
2174            .select_only()
2175            .column_as(
2176                project_collaborator::Column::ConnectionId,
2177                QueryAs::ConnectionId,
2178            )
2179            .filter(
2180                project_collaborator::Column::ProjectId
2181                    .eq(project_id)
2182                    .and(project_collaborator::Column::IsHost.eq(false)),
2183            )
2184            .into_values::<i32, QueryAs>()
2185            .stream(tx)
2186            .await?;
2187
2188        let mut guest_connection_ids = Vec::new();
2189        while let Some(connection_id) = db_guest_connection_ids.next().await {
2190            guest_connection_ids.push(ConnectionId(connection_id? as u32));
2191        }
2192        Ok(guest_connection_ids)
2193    }
2194
2195    // access tokens
2196
2197    pub async fn create_access_token_hash(
2198        &self,
2199        user_id: UserId,
2200        access_token_hash: &str,
2201        max_access_token_count: usize,
2202    ) -> Result<()> {
2203        self.transaction(|tx| async {
2204            let tx = tx;
2205
2206            access_token::ActiveModel {
2207                user_id: ActiveValue::set(user_id),
2208                hash: ActiveValue::set(access_token_hash.into()),
2209                ..Default::default()
2210            }
2211            .insert(&*tx)
2212            .await?;
2213
2214            access_token::Entity::delete_many()
2215                .filter(
2216                    access_token::Column::Id.in_subquery(
2217                        Query::select()
2218                            .column(access_token::Column::Id)
2219                            .from(access_token::Entity)
2220                            .and_where(access_token::Column::UserId.eq(user_id))
2221                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2222                            .limit(10000)
2223                            .offset(max_access_token_count as u64)
2224                            .to_owned(),
2225                    ),
2226                )
2227                .exec(&*tx)
2228                .await?;
2229            Ok(())
2230        })
2231        .await
2232    }
2233
2234    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2235        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2236        enum QueryAs {
2237            Hash,
2238        }
2239
2240        self.transaction(|tx| async move {
2241            Ok(access_token::Entity::find()
2242                .select_only()
2243                .column(access_token::Column::Hash)
2244                .filter(access_token::Column::UserId.eq(user_id))
2245                .order_by_desc(access_token::Column::Id)
2246                .into_values::<_, QueryAs>()
2247                .all(&*tx)
2248                .await?)
2249        })
2250        .await
2251    }
2252
2253    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2254    where
2255        F: Send + Fn(TransactionHandle) -> Fut,
2256        Fut: Send + Future<Output = Result<T>>,
2257    {
2258        let body = async {
2259            loop {
2260                let (tx, result) = self.with_transaction(&f).await?;
2261                match result {
2262                    Ok(result) => {
2263                        match tx.commit().await.map_err(Into::into) {
2264                            Ok(()) => return Ok(result),
2265                            Err(error) => {
2266                                if is_serialization_error(&error) {
2267                                    // Retry (don't break the loop)
2268                                } else {
2269                                    return Err(error);
2270                                }
2271                            }
2272                        }
2273                    }
2274                    Err(error) => {
2275                        tx.rollback().await?;
2276                        if is_serialization_error(&error) {
2277                            // Retry (don't break the loop)
2278                        } else {
2279                            return Err(error);
2280                        }
2281                    }
2282                }
2283            }
2284        };
2285
2286        self.run(body).await
2287    }
2288
2289    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2290    where
2291        F: Send + Fn(TransactionHandle) -> Fut,
2292        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2293    {
2294        let body = async {
2295            loop {
2296                let (tx, result) = self.with_transaction(&f).await?;
2297                match result {
2298                    Ok((room_id, data)) => {
2299                        let lock = self.rooms.entry(room_id).or_default().clone();
2300                        let _guard = lock.lock_owned().await;
2301                        match tx.commit().await.map_err(Into::into) {
2302                            Ok(()) => {
2303                                return Ok(RoomGuard {
2304                                    data,
2305                                    _guard,
2306                                    _not_send: PhantomData,
2307                                });
2308                            }
2309                            Err(error) => {
2310                                if is_serialization_error(&error) {
2311                                    // Retry (don't break the loop)
2312                                } else {
2313                                    return Err(error);
2314                                }
2315                            }
2316                        }
2317                    }
2318                    Err(error) => {
2319                        tx.rollback().await?;
2320                        if is_serialization_error(&error) {
2321                            // Retry (don't break the loop)
2322                        } else {
2323                            return Err(error);
2324                        }
2325                    }
2326                }
2327            }
2328        };
2329
2330        self.run(body).await
2331    }
2332
2333    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2334    where
2335        F: Send + Fn(TransactionHandle) -> Fut,
2336        Fut: Send + Future<Output = Result<T>>,
2337    {
2338        let tx = self
2339            .pool
2340            .begin_with_config(Some(IsolationLevel::Serializable), None)
2341            .await?;
2342
2343        let mut tx = Arc::new(Some(tx));
2344        let result = f(TransactionHandle(tx.clone())).await;
2345        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2346            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2347        };
2348
2349        Ok((tx, result))
2350    }
2351
2352    async fn run<F, T>(&self, future: F) -> T
2353    where
2354        F: Future<Output = T>,
2355    {
2356        #[cfg(test)]
2357        {
2358            if let Some(background) = self.background.as_ref() {
2359                background.simulate_random_delay().await;
2360            }
2361
2362            self.runtime.as_ref().unwrap().block_on(future)
2363        }
2364
2365        #[cfg(not(test))]
2366        {
2367            future.await
2368        }
2369    }
2370}
2371
2372fn is_serialization_error(error: &Error) -> bool {
2373    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2374    match error {
2375        Error::Database(
2376            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2377            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2378        ) if error
2379            .as_database_error()
2380            .and_then(|error| error.code())
2381            .as_deref()
2382            == Some(SERIALIZATION_FAILURE_CODE) =>
2383        {
2384            true
2385        }
2386        _ => false,
2387    }
2388}
2389
2390struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2391
2392impl Deref for TransactionHandle {
2393    type Target = DatabaseTransaction;
2394
2395    fn deref(&self) -> &Self::Target {
2396        self.0.as_ref().as_ref().unwrap()
2397    }
2398}
2399
2400pub struct RoomGuard<T> {
2401    data: T,
2402    _guard: OwnedMutexGuard<()>,
2403    _not_send: PhantomData<Rc<()>>,
2404}
2405
2406impl<T> Deref for RoomGuard<T> {
2407    type Target = T;
2408
2409    fn deref(&self) -> &T {
2410        &self.data
2411    }
2412}
2413
2414impl<T> DerefMut for RoomGuard<T> {
2415    fn deref_mut(&mut self) -> &mut T {
2416        &mut self.data
2417    }
2418}
2419
2420#[derive(Debug, Serialize, Deserialize)]
2421pub struct NewUserParams {
2422    pub github_login: String,
2423    pub github_user_id: i32,
2424    pub invite_count: i32,
2425}
2426
2427#[derive(Debug)]
2428pub struct NewUserResult {
2429    pub user_id: UserId,
2430    pub metrics_id: String,
2431    pub inviting_user_id: Option<UserId>,
2432    pub signup_device_id: Option<String>,
2433}
2434
2435fn random_invite_code() -> String {
2436    nanoid::nanoid!(16)
2437}
2438
2439fn random_email_confirmation_code() -> String {
2440    nanoid::nanoid!(64)
2441}
2442
/// Defines a public identifier newtype named `$name` that wraps an `i32`.
///
/// The generated type derives the common value traits, (de)serializes
/// transparently as its inner integer, implements `Display`, and implements
/// the `sea_query` / `sea_orm` traits needed to use it as a column value:
/// `From<$name> for sea_query::Value`, `TryGetable`, `ValueType`,
/// `TryFromU64` and `Nullable`.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds an id from the `u64` used in protobuf messages.
            ///
            /// This is a plain `as` cast: values that do not fit in an `i32`
            /// are truncated rather than rejected.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts the id to the `u64` used in protobuf messages.
            ///
            /// This is a plain `as` cast: a negative id is sign-extended into
            /// a large `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer variant whose value fits in an
            // `i32`; null values and non-integer variants are rejected.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `DbErr::ConvertFromU64` carries the name of the type that
                // could not be built from the `u64`; the error formats the
                // surrounding message itself, so only the type name is passed.
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(stringify!($name))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
2561
2562id_type!(AccessTokenId);
2563id_type!(ContactId);
2564id_type!(RoomId);
2565id_type!(RoomParticipantId);
2566id_type!(ProjectId);
2567id_type!(ProjectCollaboratorId);
2568id_type!(ReplicaId);
2569id_type!(SignupId);
2570id_type!(UserId);
2571
2572pub struct LeftRoom {
2573    pub room: proto::Room,
2574    pub left_projects: HashMap<ProjectId, LeftProject>,
2575    pub canceled_calls_to_user_ids: Vec<UserId>,
2576}
2577
2578pub struct Project {
2579    pub collaborators: Vec<project_collaborator::Model>,
2580    pub worktrees: BTreeMap<u64, Worktree>,
2581    pub language_servers: Vec<proto::LanguageServer>,
2582}
2583
2584pub struct LeftProject {
2585    pub id: ProjectId,
2586    pub host_user_id: UserId,
2587    pub host_connection_id: ConnectionId,
2588    pub connection_ids: Vec<ConnectionId>,
2589}
2590
2591pub struct Worktree {
2592    pub id: u64,
2593    pub abs_path: String,
2594    pub root_name: String,
2595    pub visible: bool,
2596    pub entries: Vec<proto::Entry>,
2597    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
2598    pub scan_id: u64,
2599    pub is_complete: bool,
2600}
2601
2602#[cfg(test)]
2603pub use test::*;
2604
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use lazy_static::lazy_static;
    use parking_lot::Mutex;
    use rand::prelude::*;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// Test fixture that owns a [`Database`] backed by either an in-memory
    /// SQLite database or a freshly created Postgres database.
    ///
    /// Dropping a Postgres-backed fixture drops the database it created.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Creates a fixture backed by an in-memory SQLite database that is
        /// initialized from the bundled test schema.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let runtime = Self::new_runtime();

            let database = runtime.block_on(async {
                let mut options = ConnectOptions::new(String::from("sqlite::memory:"));
                options.max_connections(5);
                let database = Database::new(options).await.unwrap();

                let schema = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                let backend = database.pool.get_database_backend();
                database
                    .pool
                    .execute(sea_orm::Statement::from_string(backend, schema.into()))
                    .await
                    .unwrap();
                database
            });

            Self::from_parts(database, background, runtime)
        }

        /// Creates a fixture backed by a new, randomly named Postgres database
        /// on `localhost` and runs the project's migrations against it.
        pub fn postgres(background: Arc<Background>) -> Self {
            lazy_static! {
                static ref LOCK: Mutex<()> = Mutex::new(());
            }

            // Held until this function returns, so only one fixture is being
            // set up at a time within the process.
            let _guard = LOCK.lock();
            let suffix = StdRng::from_entropy().gen::<u128>();
            let url = format!("postgres://postgres@localhost/zed-test-{}", suffix);
            let runtime = Self::new_runtime();

            let database = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");

                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let database = Database::new(options).await.unwrap();

                let migrations_dir = Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/migrations"));
                database.migrate(migrations_dir, false).await.unwrap();
                database
            });

            Self::from_parts(database, background, runtime)
        }

        /// Returns the wrapped database. Panics if it has already been taken.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }

        /// Builds the current-thread tokio runtime (I/O and time enabled) that
        /// the fixture's database uses to run its futures.
        fn new_runtime() -> tokio::runtime::Runtime {
            let mut builder = tokio::runtime::Builder::new_current_thread();
            builder.enable_io().enable_time();
            builder.build().unwrap()
        }

        /// Attaches the background executor and runtime to `database` and
        /// wraps it in a fixture.
        fn from_parts(
            mut database: Database,
            background: Arc<Background>,
            runtime: tokio::runtime::Runtime,
        ) -> Self {
            database.background = Some(background);
            database.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(database)),
                connection: None,
            }
        }
    }

    impl Drop for TestDb {
        /// For Postgres-backed fixtures: terminates the other backends
        /// connected to the test database, then drops the database. Failures
        /// of either step are passed to `log_err` instead of being propagated.
        fn drop(&mut self) {
            let database = self.db.take().unwrap();
            let backend = database.pool.get_database_backend();
            if matches!(backend, sea_orm::DatabaseBackend::Postgres) {
                database.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    let terminate_backends = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    database
                        .pool
                        .execute(sea_orm::Statement::from_string(
                            backend,
                            terminate_backends.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(database.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}