db.rs

   1mod access_token;
   2mod contact;
   3mod language_server;
   4mod project;
   5mod project_collaborator;
   6mod room;
   7mod room_participant;
   8mod signup;
   9#[cfg(test)]
  10mod tests;
  11mod user;
  12mod worktree;
  13mod worktree_diagnostic_summary;
  14mod worktree_entry;
  15
  16use crate::{Error, Result};
  17use anyhow::anyhow;
  18use collections::{BTreeMap, HashMap, HashSet};
  19pub use contact::Contact;
  20use dashmap::DashMap;
  21use futures::StreamExt;
  22use hyper::StatusCode;
  23use rpc::{proto, ConnectionId};
  24use sea_orm::Condition;
  25pub use sea_orm::ConnectOptions;
  26use sea_orm::{
  27    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  28    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  29    Statement, TransactionTrait,
  30};
  31use sea_query::{Alias, Expr, OnConflict, Query};
  32use serde::{Deserialize, Serialize};
  33pub use signup::{Invite, NewSignup, WaitlistSummary};
  34use sqlx::migrate::{Migrate, Migration, MigrationSource};
  35use sqlx::Connection;
  36use std::ops::{Deref, DerefMut};
  37use std::path::Path;
  38use std::time::Duration;
  39use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  40use tokio::sync::{Mutex, OwnedMutexGuard};
  41pub use user::Model as User;
  42
  43pub struct Database {
  44    options: ConnectOptions,
  45    pool: DatabaseConnection,
  46    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  47    #[cfg(test)]
  48    background: Option<std::sync::Arc<gpui::executor::Background>>,
  49    #[cfg(test)]
  50    runtime: Option<tokio::runtime::Runtime>,
  51    epoch: parking_lot::RwLock<Uuid>,
  52}
  53
  54impl Database {
  55    pub async fn new(options: ConnectOptions) -> Result<Self> {
  56        Ok(Self {
  57            options: options.clone(),
  58            pool: sea_orm::Database::connect(options).await?,
  59            rooms: DashMap::with_capacity(16384),
  60            #[cfg(test)]
  61            background: None,
  62            #[cfg(test)]
  63            runtime: None,
  64            epoch: parking_lot::RwLock::new(Uuid::new_v4()),
  65        })
  66    }
  67
  68    #[cfg(test)]
  69    pub fn reset(&self) {
  70        self.rooms.clear();
  71        *self.epoch.write() = Uuid::new_v4();
  72    }
  73
  74    fn epoch(&self) -> Uuid {
  75        *self.epoch.read()
  76    }
  77
  78    pub async fn migrate(
  79        &self,
  80        migrations_path: &Path,
  81        ignore_checksum_mismatch: bool,
  82    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  83        let migrations = MigrationSource::resolve(migrations_path)
  84            .await
  85            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  86
  87        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  88
  89        connection.ensure_migrations_table().await?;
  90        let applied_migrations: HashMap<_, _> = connection
  91            .list_applied_migrations()
  92            .await?
  93            .into_iter()
  94            .map(|m| (m.version, m))
  95            .collect();
  96
  97        let mut new_migrations = Vec::new();
  98        for migration in migrations {
  99            match applied_migrations.get(&migration.version) {
 100                Some(applied_migration) => {
 101                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 102                    {
 103                        Err(anyhow!(
 104                            "checksum mismatch for applied migration {}",
 105                            migration.description
 106                        ))?;
 107                    }
 108                }
 109                None => {
 110                    let elapsed = connection.apply(&migration).await?;
 111                    new_migrations.push((migration, elapsed));
 112                }
 113            }
 114        }
 115
 116        Ok(new_migrations)
 117    }
 118
 119    pub async fn delete_stale_projects(&self) -> Result<()> {
 120        self.transaction(|tx| async move {
 121            project_collaborator::Entity::delete_many()
 122                .filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch()))
 123                .exec(&*tx)
 124                .await?;
 125            project::Entity::delete_many()
 126                .filter(project::Column::HostConnectionEpoch.ne(self.epoch()))
 127                .exec(&*tx)
 128                .await?;
 129            Ok(())
 130        })
 131        .await
 132    }
 133
 134    pub async fn stale_room_ids(&self) -> Result<Vec<RoomId>> {
 135        self.transaction(|tx| async move {
 136            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 137            enum QueryAs {
 138                RoomId,
 139            }
 140
 141            Ok(room_participant::Entity::find()
 142                .select_only()
 143                .column(room_participant::Column::RoomId)
 144                .distinct()
 145                .filter(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch()))
 146                .into_values::<_, QueryAs>()
 147                .all(&*tx)
 148                .await?)
 149        })
 150        .await
 151    }
 152
 153    pub async fn refresh_room(&self, room_id: RoomId) -> Result<RoomGuard<RefreshedRoom>> {
 154        self.room_transaction(|tx| async move {
 155            let stale_participant_filter = Condition::all()
 156                .add(room_participant::Column::RoomId.eq(room_id))
 157                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 158                .add(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch()));
 159
 160            let stale_participant_user_ids = room_participant::Entity::find()
 161                .filter(stale_participant_filter.clone())
 162                .all(&*tx)
 163                .await?
 164                .into_iter()
 165                .map(|participant| participant.user_id)
 166                .collect::<Vec<_>>();
 167
 168            // Delete participants who failed to reconnect.
 169            room_participant::Entity::delete_many()
 170                .filter(stale_participant_filter)
 171                .exec(&*tx)
 172                .await?;
 173
 174            let room = self.get_room(room_id, &tx).await?;
 175            let mut canceled_calls_to_user_ids = Vec::new();
 176            // Delete the room if it becomes empty and cancel pending calls.
 177            if room.participants.is_empty() {
 178                canceled_calls_to_user_ids.extend(
 179                    room.pending_participants
 180                        .iter()
 181                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 182                );
 183                room_participant::Entity::delete_many()
 184                    .filter(room_participant::Column::RoomId.eq(room_id))
 185                    .exec(&*tx)
 186                    .await?;
 187                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 188            }
 189
 190            Ok((
 191                room_id,
 192                RefreshedRoom {
 193                    room,
 194                    stale_participant_user_ids,
 195                    canceled_calls_to_user_ids,
 196                },
 197            ))
 198        })
 199        .await
 200    }
 201
 202    // users
 203
 204    pub async fn create_user(
 205        &self,
 206        email_address: &str,
 207        admin: bool,
 208        params: NewUserParams,
 209    ) -> Result<NewUserResult> {
 210        self.transaction(|tx| async {
 211            let tx = tx;
 212            let user = user::Entity::insert(user::ActiveModel {
 213                email_address: ActiveValue::set(Some(email_address.into())),
 214                github_login: ActiveValue::set(params.github_login.clone()),
 215                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 216                admin: ActiveValue::set(admin),
 217                metrics_id: ActiveValue::set(Uuid::new_v4()),
 218                ..Default::default()
 219            })
 220            .on_conflict(
 221                OnConflict::column(user::Column::GithubLogin)
 222                    .update_column(user::Column::GithubLogin)
 223                    .to_owned(),
 224            )
 225            .exec_with_returning(&*tx)
 226            .await?;
 227
 228            Ok(NewUserResult {
 229                user_id: user.id,
 230                metrics_id: user.metrics_id.to_string(),
 231                signup_device_id: None,
 232                inviting_user_id: None,
 233            })
 234        })
 235        .await
 236    }
 237
 238    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 239        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 240            .await
 241    }
 242
 243    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 244        self.transaction(|tx| async {
 245            let tx = tx;
 246            Ok(user::Entity::find()
 247                .filter(user::Column::Id.is_in(ids.iter().copied()))
 248                .all(&*tx)
 249                .await?)
 250        })
 251        .await
 252    }
 253
 254    pub async fn get_user_by_github_account(
 255        &self,
 256        github_login: &str,
 257        github_user_id: Option<i32>,
 258    ) -> Result<Option<User>> {
 259        self.transaction(|tx| async move {
 260            let tx = &*tx;
 261            if let Some(github_user_id) = github_user_id {
 262                if let Some(user_by_github_user_id) = user::Entity::find()
 263                    .filter(user::Column::GithubUserId.eq(github_user_id))
 264                    .one(tx)
 265                    .await?
 266                {
 267                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 268                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 269                    Ok(Some(user_by_github_user_id.update(tx).await?))
 270                } else if let Some(user_by_github_login) = user::Entity::find()
 271                    .filter(user::Column::GithubLogin.eq(github_login))
 272                    .one(tx)
 273                    .await?
 274                {
 275                    let mut user_by_github_login = user_by_github_login.into_active_model();
 276                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 277                    Ok(Some(user_by_github_login.update(tx).await?))
 278                } else {
 279                    Ok(None)
 280                }
 281            } else {
 282                Ok(user::Entity::find()
 283                    .filter(user::Column::GithubLogin.eq(github_login))
 284                    .one(tx)
 285                    .await?)
 286            }
 287        })
 288        .await
 289    }
 290
 291    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 292        self.transaction(|tx| async move {
 293            Ok(user::Entity::find()
 294                .order_by_asc(user::Column::GithubLogin)
 295                .limit(limit as u64)
 296                .offset(page as u64 * limit as u64)
 297                .all(&*tx)
 298                .await?)
 299        })
 300        .await
 301    }
 302
 303    pub async fn get_users_with_no_invites(
 304        &self,
 305        invited_by_another_user: bool,
 306    ) -> Result<Vec<User>> {
 307        self.transaction(|tx| async move {
 308            Ok(user::Entity::find()
 309                .filter(
 310                    user::Column::InviteCount
 311                        .eq(0)
 312                        .and(if invited_by_another_user {
 313                            user::Column::InviterId.is_not_null()
 314                        } else {
 315                            user::Column::InviterId.is_null()
 316                        }),
 317                )
 318                .all(&*tx)
 319                .await?)
 320        })
 321        .await
 322    }
 323
 324    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 325        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 326        enum QueryAs {
 327            MetricsId,
 328        }
 329
 330        self.transaction(|tx| async move {
 331            let metrics_id: Uuid = user::Entity::find_by_id(id)
 332                .select_only()
 333                .column(user::Column::MetricsId)
 334                .into_values::<_, QueryAs>()
 335                .one(&*tx)
 336                .await?
 337                .ok_or_else(|| anyhow!("could not find user"))?;
 338            Ok(metrics_id.to_string())
 339        })
 340        .await
 341    }
 342
 343    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 344        self.transaction(|tx| async move {
 345            user::Entity::update_many()
 346                .filter(user::Column::Id.eq(id))
 347                .set(user::ActiveModel {
 348                    admin: ActiveValue::set(is_admin),
 349                    ..Default::default()
 350                })
 351                .exec(&*tx)
 352                .await?;
 353            Ok(())
 354        })
 355        .await
 356    }
 357
 358    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 359        self.transaction(|tx| async move {
 360            user::Entity::update_many()
 361                .filter(user::Column::Id.eq(id))
 362                .set(user::ActiveModel {
 363                    connected_once: ActiveValue::set(connected_once),
 364                    ..Default::default()
 365                })
 366                .exec(&*tx)
 367                .await?;
 368            Ok(())
 369        })
 370        .await
 371    }
 372
 373    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 374        self.transaction(|tx| async move {
 375            access_token::Entity::delete_many()
 376                .filter(access_token::Column::UserId.eq(id))
 377                .exec(&*tx)
 378                .await?;
 379            user::Entity::delete_by_id(id).exec(&*tx).await?;
 380            Ok(())
 381        })
 382        .await
 383    }
 384
 385    // contacts
 386
 387    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 388        #[derive(Debug, FromQueryResult)]
 389        struct ContactWithUserBusyStatuses {
 390            user_id_a: UserId,
 391            user_id_b: UserId,
 392            a_to_b: bool,
 393            accepted: bool,
 394            should_notify: bool,
 395            user_a_busy: bool,
 396            user_b_busy: bool,
 397        }
 398
 399        self.transaction(|tx| async move {
 400            let user_a_participant = Alias::new("user_a_participant");
 401            let user_b_participant = Alias::new("user_b_participant");
 402            let mut db_contacts = contact::Entity::find()
 403                .column_as(
 404                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 405                        .is_not_null(),
 406                    "user_a_busy",
 407                )
 408                .column_as(
 409                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 410                        .is_not_null(),
 411                    "user_b_busy",
 412                )
 413                .filter(
 414                    contact::Column::UserIdA
 415                        .eq(user_id)
 416                        .or(contact::Column::UserIdB.eq(user_id)),
 417                )
 418                .join_as(
 419                    JoinType::LeftJoin,
 420                    contact::Relation::UserARoomParticipant.def(),
 421                    user_a_participant,
 422                )
 423                .join_as(
 424                    JoinType::LeftJoin,
 425                    contact::Relation::UserBRoomParticipant.def(),
 426                    user_b_participant,
 427                )
 428                .into_model::<ContactWithUserBusyStatuses>()
 429                .stream(&*tx)
 430                .await?;
 431
 432            let mut contacts = Vec::new();
 433            while let Some(db_contact) = db_contacts.next().await {
 434                let db_contact = db_contact?;
 435                if db_contact.user_id_a == user_id {
 436                    if db_contact.accepted {
 437                        contacts.push(Contact::Accepted {
 438                            user_id: db_contact.user_id_b,
 439                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 440                            busy: db_contact.user_b_busy,
 441                        });
 442                    } else if db_contact.a_to_b {
 443                        contacts.push(Contact::Outgoing {
 444                            user_id: db_contact.user_id_b,
 445                        })
 446                    } else {
 447                        contacts.push(Contact::Incoming {
 448                            user_id: db_contact.user_id_b,
 449                            should_notify: db_contact.should_notify,
 450                        });
 451                    }
 452                } else if db_contact.accepted {
 453                    contacts.push(Contact::Accepted {
 454                        user_id: db_contact.user_id_a,
 455                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 456                        busy: db_contact.user_a_busy,
 457                    });
 458                } else if db_contact.a_to_b {
 459                    contacts.push(Contact::Incoming {
 460                        user_id: db_contact.user_id_a,
 461                        should_notify: db_contact.should_notify,
 462                    });
 463                } else {
 464                    contacts.push(Contact::Outgoing {
 465                        user_id: db_contact.user_id_a,
 466                    });
 467                }
 468            }
 469
 470            contacts.sort_unstable_by_key(|contact| contact.user_id());
 471
 472            Ok(contacts)
 473        })
 474        .await
 475    }
 476
 477    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 478        self.transaction(|tx| async move {
 479            let participant = room_participant::Entity::find()
 480                .filter(room_participant::Column::UserId.eq(user_id))
 481                .one(&*tx)
 482                .await?;
 483            Ok(participant.is_some())
 484        })
 485        .await
 486    }
 487
 488    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 489        self.transaction(|tx| async move {
 490            let (id_a, id_b) = if user_id_1 < user_id_2 {
 491                (user_id_1, user_id_2)
 492            } else {
 493                (user_id_2, user_id_1)
 494            };
 495
 496            Ok(contact::Entity::find()
 497                .filter(
 498                    contact::Column::UserIdA
 499                        .eq(id_a)
 500                        .and(contact::Column::UserIdB.eq(id_b))
 501                        .and(contact::Column::Accepted.eq(true)),
 502                )
 503                .one(&*tx)
 504                .await?
 505                .is_some())
 506        })
 507        .await
 508    }
 509
 510    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 511        self.transaction(|tx| async move {
 512            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 513                (sender_id, receiver_id, true)
 514            } else {
 515                (receiver_id, sender_id, false)
 516            };
 517
 518            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 519                user_id_a: ActiveValue::set(id_a),
 520                user_id_b: ActiveValue::set(id_b),
 521                a_to_b: ActiveValue::set(a_to_b),
 522                accepted: ActiveValue::set(false),
 523                should_notify: ActiveValue::set(true),
 524                ..Default::default()
 525            })
 526            .on_conflict(
 527                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 528                    .values([
 529                        (contact::Column::Accepted, true.into()),
 530                        (contact::Column::ShouldNotify, false.into()),
 531                    ])
 532                    .action_and_where(
 533                        contact::Column::Accepted.eq(false).and(
 534                            contact::Column::AToB
 535                                .eq(a_to_b)
 536                                .and(contact::Column::UserIdA.eq(id_b))
 537                                .or(contact::Column::AToB
 538                                    .ne(a_to_b)
 539                                    .and(contact::Column::UserIdA.eq(id_a))),
 540                        ),
 541                    )
 542                    .to_owned(),
 543            )
 544            .exec_without_returning(&*tx)
 545            .await?;
 546
 547            if rows_affected == 1 {
 548                Ok(())
 549            } else {
 550                Err(anyhow!("contact already requested"))?
 551            }
 552        })
 553        .await
 554    }
 555
 556    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 557        self.transaction(|tx| async move {
 558            let (id_a, id_b) = if responder_id < requester_id {
 559                (responder_id, requester_id)
 560            } else {
 561                (requester_id, responder_id)
 562            };
 563
 564            let result = contact::Entity::delete_many()
 565                .filter(
 566                    contact::Column::UserIdA
 567                        .eq(id_a)
 568                        .and(contact::Column::UserIdB.eq(id_b)),
 569                )
 570                .exec(&*tx)
 571                .await?;
 572
 573            if result.rows_affected == 1 {
 574                Ok(())
 575            } else {
 576                Err(anyhow!("no such contact"))?
 577            }
 578        })
 579        .await
 580    }
 581
 582    pub async fn dismiss_contact_notification(
 583        &self,
 584        user_id: UserId,
 585        contact_user_id: UserId,
 586    ) -> Result<()> {
 587        self.transaction(|tx| async move {
 588            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 589                (user_id, contact_user_id, true)
 590            } else {
 591                (contact_user_id, user_id, false)
 592            };
 593
 594            let result = contact::Entity::update_many()
 595                .set(contact::ActiveModel {
 596                    should_notify: ActiveValue::set(false),
 597                    ..Default::default()
 598                })
 599                .filter(
 600                    contact::Column::UserIdA
 601                        .eq(id_a)
 602                        .and(contact::Column::UserIdB.eq(id_b))
 603                        .and(
 604                            contact::Column::AToB
 605                                .eq(a_to_b)
 606                                .and(contact::Column::Accepted.eq(true))
 607                                .or(contact::Column::AToB
 608                                    .ne(a_to_b)
 609                                    .and(contact::Column::Accepted.eq(false))),
 610                        ),
 611                )
 612                .exec(&*tx)
 613                .await?;
 614            if result.rows_affected == 0 {
 615                Err(anyhow!("no such contact request"))?
 616            } else {
 617                Ok(())
 618            }
 619        })
 620        .await
 621    }
 622
 623    pub async fn respond_to_contact_request(
 624        &self,
 625        responder_id: UserId,
 626        requester_id: UserId,
 627        accept: bool,
 628    ) -> Result<()> {
 629        self.transaction(|tx| async move {
 630            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 631                (responder_id, requester_id, false)
 632            } else {
 633                (requester_id, responder_id, true)
 634            };
 635            let rows_affected = if accept {
 636                let result = contact::Entity::update_many()
 637                    .set(contact::ActiveModel {
 638                        accepted: ActiveValue::set(true),
 639                        should_notify: ActiveValue::set(true),
 640                        ..Default::default()
 641                    })
 642                    .filter(
 643                        contact::Column::UserIdA
 644                            .eq(id_a)
 645                            .and(contact::Column::UserIdB.eq(id_b))
 646                            .and(contact::Column::AToB.eq(a_to_b)),
 647                    )
 648                    .exec(&*tx)
 649                    .await?;
 650                result.rows_affected
 651            } else {
 652                let result = contact::Entity::delete_many()
 653                    .filter(
 654                        contact::Column::UserIdA
 655                            .eq(id_a)
 656                            .and(contact::Column::UserIdB.eq(id_b))
 657                            .and(contact::Column::AToB.eq(a_to_b))
 658                            .and(contact::Column::Accepted.eq(false)),
 659                    )
 660                    .exec(&*tx)
 661                    .await?;
 662
 663                result.rows_affected
 664            };
 665
 666            if rows_affected == 1 {
 667                Ok(())
 668            } else {
 669                Err(anyhow!("no such contact request"))?
 670            }
 671        })
 672        .await
 673    }
 674
 675    pub fn fuzzy_like_string(string: &str) -> String {
 676        let mut result = String::with_capacity(string.len() * 2 + 1);
 677        for c in string.chars() {
 678            if c.is_alphanumeric() {
 679                result.push('%');
 680                result.push(c);
 681            }
 682        }
 683        result.push('%');
 684        result
 685    }
 686
 687    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 688        self.transaction(|tx| async {
 689            let tx = tx;
 690            let like_string = Self::fuzzy_like_string(name_query);
 691            let query = "
 692                SELECT users.*
 693                FROM users
 694                WHERE github_login ILIKE $1
 695                ORDER BY github_login <-> $2
 696                LIMIT $3
 697            ";
 698
 699            Ok(user::Entity::find()
 700                .from_raw_sql(Statement::from_sql_and_values(
 701                    self.pool.get_database_backend(),
 702                    query.into(),
 703                    vec![like_string.into(), name_query.into(), limit.into()],
 704                ))
 705                .all(&*tx)
 706                .await?)
 707        })
 708        .await
 709    }
 710
 711    // signups
 712
 713    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 714        self.transaction(|tx| async move {
 715            signup::Entity::insert(signup::ActiveModel {
 716                email_address: ActiveValue::set(signup.email_address.clone()),
 717                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 718                email_confirmation_sent: ActiveValue::set(false),
 719                platform_mac: ActiveValue::set(signup.platform_mac),
 720                platform_windows: ActiveValue::set(signup.platform_windows),
 721                platform_linux: ActiveValue::set(signup.platform_linux),
 722                platform_unknown: ActiveValue::set(false),
 723                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 724                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 725                device_id: ActiveValue::set(signup.device_id.clone()),
 726                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 727                ..Default::default()
 728            })
 729            .on_conflict(
 730                OnConflict::column(signup::Column::EmailAddress)
 731                    .update_columns([
 732                        signup::Column::PlatformMac,
 733                        signup::Column::PlatformWindows,
 734                        signup::Column::PlatformLinux,
 735                        signup::Column::EditorFeatures,
 736                        signup::Column::ProgrammingLanguages,
 737                        signup::Column::DeviceId,
 738                        signup::Column::AddedToMailingList,
 739                    ])
 740                    .to_owned(),
 741            )
 742            .exec(&*tx)
 743            .await?;
 744            Ok(())
 745        })
 746        .await
 747    }
 748
 749    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 750        self.transaction(|tx| async move {
 751            let signup = signup::Entity::find()
 752                .filter(signup::Column::EmailAddress.eq(email_address))
 753                .one(&*tx)
 754                .await?
 755                .ok_or_else(|| {
 756                    anyhow!("signup with email address {} doesn't exist", email_address)
 757                })?;
 758
 759            Ok(signup)
 760        })
 761        .await
 762    }
 763
 764    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 765        self.transaction(|tx| async move {
 766            let query = "
 767                SELECT
 768                    COUNT(*) as count,
 769                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 770                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 771                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 772                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 773                FROM (
 774                    SELECT *
 775                    FROM signups
 776                    WHERE
 777                        NOT email_confirmation_sent
 778                ) AS unsent
 779            ";
 780            Ok(
 781                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 782                    self.pool.get_database_backend(),
 783                    query.into(),
 784                    vec![],
 785                ))
 786                .one(&*tx)
 787                .await?
 788                .ok_or_else(|| anyhow!("invalid result"))?,
 789            )
 790        })
 791        .await
 792    }
 793
 794    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 795        let emails = invites
 796            .iter()
 797            .map(|s| s.email_address.as_str())
 798            .collect::<Vec<_>>();
 799        self.transaction(|tx| async {
 800            let tx = tx;
 801            signup::Entity::update_many()
 802                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 803                .set(signup::ActiveModel {
 804                    email_confirmation_sent: ActiveValue::set(true),
 805                    ..Default::default()
 806                })
 807                .exec(&*tx)
 808                .await?;
 809            Ok(())
 810        })
 811        .await
 812    }
 813
 814    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 815        self.transaction(|tx| async move {
 816            Ok(signup::Entity::find()
 817                .select_only()
 818                .column(signup::Column::EmailAddress)
 819                .column(signup::Column::EmailConfirmationCode)
 820                .filter(
 821                    signup::Column::EmailConfirmationSent.eq(false).and(
 822                        signup::Column::PlatformMac
 823                            .eq(true)
 824                            .or(signup::Column::PlatformUnknown.eq(true)),
 825                    ),
 826                )
 827                .order_by_asc(signup::Column::CreatedAt)
 828                .limit(count as u64)
 829                .into_model()
 830                .all(&*tx)
 831                .await?)
 832        })
 833        .await
 834    }
 835
 836    // invite codes
 837
 838    pub async fn create_invite_from_code(
 839        &self,
 840        code: &str,
 841        email_address: &str,
 842        device_id: Option<&str>,
 843    ) -> Result<Invite> {
 844        self.transaction(|tx| async move {
 845            let existing_user = user::Entity::find()
 846                .filter(user::Column::EmailAddress.eq(email_address))
 847                .one(&*tx)
 848                .await?;
 849
 850            if existing_user.is_some() {
 851                Err(anyhow!("email address is already in use"))?;
 852            }
 853
 854            let inviting_user_with_invites = match user::Entity::find()
 855                .filter(
 856                    user::Column::InviteCode
 857                        .eq(code)
 858                        .and(user::Column::InviteCount.gt(0)),
 859                )
 860                .one(&*tx)
 861                .await?
 862            {
 863                Some(inviting_user) => inviting_user,
 864                None => {
 865                    return Err(Error::Http(
 866                        StatusCode::UNAUTHORIZED,
 867                        "unable to find an invite code with invites remaining".to_string(),
 868                    ))?
 869                }
 870            };
 871            user::Entity::update_many()
 872                .filter(
 873                    user::Column::Id
 874                        .eq(inviting_user_with_invites.id)
 875                        .and(user::Column::InviteCount.gt(0)),
 876                )
 877                .col_expr(
 878                    user::Column::InviteCount,
 879                    Expr::col(user::Column::InviteCount).sub(1),
 880                )
 881                .exec(&*tx)
 882                .await?;
 883
 884            let signup = signup::Entity::insert(signup::ActiveModel {
 885                email_address: ActiveValue::set(email_address.into()),
 886                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 887                email_confirmation_sent: ActiveValue::set(false),
 888                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 889                platform_linux: ActiveValue::set(false),
 890                platform_mac: ActiveValue::set(false),
 891                platform_windows: ActiveValue::set(false),
 892                platform_unknown: ActiveValue::set(true),
 893                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 894                ..Default::default()
 895            })
 896            .on_conflict(
 897                OnConflict::column(signup::Column::EmailAddress)
 898                    .update_column(signup::Column::InvitingUserId)
 899                    .to_owned(),
 900            )
 901            .exec_with_returning(&*tx)
 902            .await?;
 903
 904            Ok(Invite {
 905                email_address: signup.email_address,
 906                email_confirmation_code: signup.email_confirmation_code,
 907            })
 908        })
 909        .await
 910    }
 911
 912    pub async fn create_user_from_invite(
 913        &self,
 914        invite: &Invite,
 915        user: NewUserParams,
 916    ) -> Result<Option<NewUserResult>> {
 917        self.transaction(|tx| async {
 918            let tx = tx;
 919            let signup = signup::Entity::find()
 920                .filter(
 921                    signup::Column::EmailAddress
 922                        .eq(invite.email_address.as_str())
 923                        .and(
 924                            signup::Column::EmailConfirmationCode
 925                                .eq(invite.email_confirmation_code.as_str()),
 926                        ),
 927                )
 928                .one(&*tx)
 929                .await?
 930                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 931
 932            if signup.user_id.is_some() {
 933                return Ok(None);
 934            }
 935
 936            let user = user::Entity::insert(user::ActiveModel {
 937                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 938                github_login: ActiveValue::set(user.github_login.clone()),
 939                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 940                admin: ActiveValue::set(false),
 941                invite_count: ActiveValue::set(user.invite_count),
 942                invite_code: ActiveValue::set(Some(random_invite_code())),
 943                metrics_id: ActiveValue::set(Uuid::new_v4()),
 944                ..Default::default()
 945            })
 946            .on_conflict(
 947                OnConflict::column(user::Column::GithubLogin)
 948                    .update_columns([
 949                        user::Column::EmailAddress,
 950                        user::Column::GithubUserId,
 951                        user::Column::Admin,
 952                    ])
 953                    .to_owned(),
 954            )
 955            .exec_with_returning(&*tx)
 956            .await?;
 957
 958            let mut signup = signup.into_active_model();
 959            signup.user_id = ActiveValue::set(Some(user.id));
 960            let signup = signup.update(&*tx).await?;
 961
 962            if let Some(inviting_user_id) = signup.inviting_user_id {
 963                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
 964                    (inviting_user_id, user.id, true)
 965                } else {
 966                    (user.id, inviting_user_id, false)
 967                };
 968
 969                contact::Entity::insert(contact::ActiveModel {
 970                    user_id_a: ActiveValue::set(user_id_a),
 971                    user_id_b: ActiveValue::set(user_id_b),
 972                    a_to_b: ActiveValue::set(a_to_b),
 973                    should_notify: ActiveValue::set(true),
 974                    accepted: ActiveValue::set(true),
 975                    ..Default::default()
 976                })
 977                .on_conflict(OnConflict::new().do_nothing().to_owned())
 978                .exec_without_returning(&*tx)
 979                .await?;
 980            }
 981
 982            Ok(Some(NewUserResult {
 983                user_id: user.id,
 984                metrics_id: user.metrics_id.to_string(),
 985                inviting_user_id: signup.inviting_user_id,
 986                signup_device_id: signup.device_id,
 987            }))
 988        })
 989        .await
 990    }
 991
 992    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
 993        self.transaction(|tx| async move {
 994            if count > 0 {
 995                user::Entity::update_many()
 996                    .filter(
 997                        user::Column::Id
 998                            .eq(id)
 999                            .and(user::Column::InviteCode.is_null()),
1000                    )
1001                    .set(user::ActiveModel {
1002                        invite_code: ActiveValue::set(Some(random_invite_code())),
1003                        ..Default::default()
1004                    })
1005                    .exec(&*tx)
1006                    .await?;
1007            }
1008
1009            user::Entity::update_many()
1010                .filter(user::Column::Id.eq(id))
1011                .set(user::ActiveModel {
1012                    invite_count: ActiveValue::set(count),
1013                    ..Default::default()
1014                })
1015                .exec(&*tx)
1016                .await?;
1017            Ok(())
1018        })
1019        .await
1020    }
1021
1022    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1023        self.transaction(|tx| async move {
1024            match user::Entity::find_by_id(id).one(&*tx).await? {
1025                Some(user) if user.invite_code.is_some() => {
1026                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1027                }
1028                _ => Ok(None),
1029            }
1030        })
1031        .await
1032    }
1033
1034    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1035        self.transaction(|tx| async move {
1036            user::Entity::find()
1037                .filter(user::Column::InviteCode.eq(code))
1038                .one(&*tx)
1039                .await?
1040                .ok_or_else(|| {
1041                    Error::Http(
1042                        StatusCode::NOT_FOUND,
1043                        "that invite code does not exist".to_string(),
1044                    )
1045                })
1046        })
1047        .await
1048    }
1049
1050    // rooms
1051
1052    pub async fn incoming_call_for_user(
1053        &self,
1054        user_id: UserId,
1055    ) -> Result<Option<proto::IncomingCall>> {
1056        self.transaction(|tx| async move {
1057            let pending_participant = room_participant::Entity::find()
1058                .filter(
1059                    room_participant::Column::UserId
1060                        .eq(user_id)
1061                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1062                )
1063                .one(&*tx)
1064                .await?;
1065
1066            if let Some(pending_participant) = pending_participant {
1067                let room = self.get_room(pending_participant.room_id, &tx).await?;
1068                Ok(Self::build_incoming_call(&room, user_id))
1069            } else {
1070                Ok(None)
1071            }
1072        })
1073        .await
1074    }
1075
1076    pub async fn create_room(
1077        &self,
1078        user_id: UserId,
1079        connection_id: ConnectionId,
1080        live_kit_room: &str,
1081    ) -> Result<RoomGuard<proto::Room>> {
1082        self.room_transaction(|tx| async move {
1083            let room = room::ActiveModel {
1084                live_kit_room: ActiveValue::set(live_kit_room.into()),
1085                ..Default::default()
1086            }
1087            .insert(&*tx)
1088            .await?;
1089            let room_id = room.id;
1090
1091            room_participant::ActiveModel {
1092                room_id: ActiveValue::set(room_id),
1093                user_id: ActiveValue::set(user_id),
1094                answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
1095                answering_connection_epoch: ActiveValue::set(Some(self.epoch())),
1096                answering_connection_lost: ActiveValue::set(false),
1097                calling_user_id: ActiveValue::set(user_id),
1098                calling_connection_id: ActiveValue::set(connection_id.0 as i32),
1099                calling_connection_epoch: ActiveValue::set(self.epoch()),
1100                ..Default::default()
1101            }
1102            .insert(&*tx)
1103            .await?;
1104
1105            let room = self.get_room(room_id, &tx).await?;
1106            Ok((room_id, room))
1107        })
1108        .await
1109    }
1110
1111    pub async fn call(
1112        &self,
1113        room_id: RoomId,
1114        calling_user_id: UserId,
1115        calling_connection_id: ConnectionId,
1116        called_user_id: UserId,
1117        initial_project_id: Option<ProjectId>,
1118    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1119        self.room_transaction(|tx| async move {
1120            room_participant::ActiveModel {
1121                room_id: ActiveValue::set(room_id),
1122                user_id: ActiveValue::set(called_user_id),
1123                answering_connection_lost: ActiveValue::set(false),
1124                calling_user_id: ActiveValue::set(calling_user_id),
1125                calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
1126                calling_connection_epoch: ActiveValue::set(self.epoch()),
1127                initial_project_id: ActiveValue::set(initial_project_id),
1128                ..Default::default()
1129            }
1130            .insert(&*tx)
1131            .await?;
1132
1133            let room = self.get_room(room_id, &tx).await?;
1134            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1135                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1136            Ok((room_id, (room, incoming_call)))
1137        })
1138        .await
1139    }
1140
1141    pub async fn call_failed(
1142        &self,
1143        room_id: RoomId,
1144        called_user_id: UserId,
1145    ) -> Result<RoomGuard<proto::Room>> {
1146        self.room_transaction(|tx| async move {
1147            room_participant::Entity::delete_many()
1148                .filter(
1149                    room_participant::Column::RoomId
1150                        .eq(room_id)
1151                        .and(room_participant::Column::UserId.eq(called_user_id)),
1152                )
1153                .exec(&*tx)
1154                .await?;
1155            let room = self.get_room(room_id, &tx).await?;
1156            Ok((room_id, room))
1157        })
1158        .await
1159    }
1160
1161    pub async fn decline_call(
1162        &self,
1163        expected_room_id: Option<RoomId>,
1164        user_id: UserId,
1165    ) -> Result<RoomGuard<proto::Room>> {
1166        self.room_transaction(|tx| async move {
1167            let participant = room_participant::Entity::find()
1168                .filter(
1169                    room_participant::Column::UserId
1170                        .eq(user_id)
1171                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1172                )
1173                .one(&*tx)
1174                .await?
1175                .ok_or_else(|| anyhow!("could not decline call"))?;
1176            let room_id = participant.room_id;
1177
1178            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1179                return Err(anyhow!("declining call on unexpected room"))?;
1180            }
1181
1182            room_participant::Entity::delete(participant.into_active_model())
1183                .exec(&*tx)
1184                .await?;
1185
1186            let room = self.get_room(room_id, &tx).await?;
1187            Ok((room_id, room))
1188        })
1189        .await
1190    }
1191
1192    pub async fn cancel_call(
1193        &self,
1194        expected_room_id: Option<RoomId>,
1195        calling_connection_id: ConnectionId,
1196        called_user_id: UserId,
1197    ) -> Result<RoomGuard<proto::Room>> {
1198        self.room_transaction(|tx| async move {
1199            let participant = room_participant::Entity::find()
1200                .filter(
1201                    room_participant::Column::UserId
1202                        .eq(called_user_id)
1203                        .and(
1204                            room_participant::Column::CallingConnectionId
1205                                .eq(calling_connection_id.0 as i32),
1206                        )
1207                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1208                )
1209                .one(&*tx)
1210                .await?
1211                .ok_or_else(|| anyhow!("could not cancel call"))?;
1212            let room_id = participant.room_id;
1213            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1214                return Err(anyhow!("canceling call on unexpected room"))?;
1215            }
1216
1217            room_participant::Entity::delete(participant.into_active_model())
1218                .exec(&*tx)
1219                .await?;
1220
1221            let room = self.get_room(room_id, &tx).await?;
1222            Ok((room_id, room))
1223        })
1224        .await
1225    }
1226
1227    pub async fn join_room(
1228        &self,
1229        room_id: RoomId,
1230        user_id: UserId,
1231        connection_id: ConnectionId,
1232    ) -> Result<RoomGuard<proto::Room>> {
1233        self.room_transaction(|tx| async move {
1234            let result = room_participant::Entity::update_many()
1235                .filter(
1236                    Condition::all()
1237                        .add(room_participant::Column::RoomId.eq(room_id))
1238                        .add(room_participant::Column::UserId.eq(user_id))
1239                        .add(
1240                            Condition::any()
1241                                .add(room_participant::Column::AnsweringConnectionId.is_null())
1242                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1243                                .add(
1244                                    room_participant::Column::AnsweringConnectionEpoch
1245                                        .ne(self.epoch()),
1246                                ),
1247                        ),
1248                )
1249                .set(room_participant::ActiveModel {
1250                    answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
1251                    answering_connection_epoch: ActiveValue::set(Some(self.epoch())),
1252                    answering_connection_lost: ActiveValue::set(false),
1253                    ..Default::default()
1254                })
1255                .exec(&*tx)
1256                .await?;
1257            if result.rows_affected == 0 {
1258                Err(anyhow!("room does not exist or was already joined"))?
1259            } else {
1260                let room = self.get_room(room_id, &tx).await?;
1261                Ok((room_id, room))
1262            }
1263        })
1264        .await
1265    }
1266
1267    pub async fn leave_room(&self, connection_id: ConnectionId) -> Result<RoomGuard<LeftRoom>> {
1268        self.room_transaction(|tx| async move {
1269            let leaving_participant = room_participant::Entity::find()
1270                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1271                .one(&*tx)
1272                .await?;
1273
1274            if let Some(leaving_participant) = leaving_participant {
1275                // Leave room.
1276                let room_id = leaving_participant.room_id;
1277                room_participant::Entity::delete_by_id(leaving_participant.id)
1278                    .exec(&*tx)
1279                    .await?;
1280
1281                // Cancel pending calls initiated by the leaving user.
1282                let called_participants = room_participant::Entity::find()
1283                    .filter(
1284                        room_participant::Column::CallingConnectionId
1285                            .eq(connection_id.0)
1286                            .and(room_participant::Column::AnsweringConnectionId.is_null()),
1287                    )
1288                    .all(&*tx)
1289                    .await?;
1290                room_participant::Entity::delete_many()
1291                    .filter(
1292                        room_participant::Column::Id
1293                            .is_in(called_participants.iter().map(|participant| participant.id)),
1294                    )
1295                    .exec(&*tx)
1296                    .await?;
1297                let canceled_calls_to_user_ids = called_participants
1298                    .into_iter()
1299                    .map(|participant| participant.user_id)
1300                    .collect();
1301
1302                // Detect left projects.
1303                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1304                enum QueryProjectIds {
1305                    ProjectId,
1306                }
1307                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1308                    .select_only()
1309                    .column_as(
1310                        project_collaborator::Column::ProjectId,
1311                        QueryProjectIds::ProjectId,
1312                    )
1313                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1314                    .into_values::<_, QueryProjectIds>()
1315                    .all(&*tx)
1316                    .await?;
1317                let mut left_projects = HashMap::default();
1318                let mut collaborators = project_collaborator::Entity::find()
1319                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1320                    .stream(&*tx)
1321                    .await?;
1322                while let Some(collaborator) = collaborators.next().await {
1323                    let collaborator = collaborator?;
1324                    let left_project =
1325                        left_projects
1326                            .entry(collaborator.project_id)
1327                            .or_insert(LeftProject {
1328                                id: collaborator.project_id,
1329                                host_user_id: Default::default(),
1330                                connection_ids: Default::default(),
1331                                host_connection_id: Default::default(),
1332                            });
1333
1334                    let collaborator_connection_id =
1335                        ConnectionId(collaborator.connection_id as u32);
1336                    if collaborator_connection_id != connection_id {
1337                        left_project.connection_ids.push(collaborator_connection_id);
1338                    }
1339
1340                    if collaborator.is_host {
1341                        left_project.host_user_id = collaborator.user_id;
1342                        left_project.host_connection_id =
1343                            ConnectionId(collaborator.connection_id as u32);
1344                    }
1345                }
1346                drop(collaborators);
1347
1348                // Leave projects.
1349                project_collaborator::Entity::delete_many()
1350                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1351                    .exec(&*tx)
1352                    .await?;
1353
1354                // Unshare projects.
1355                project::Entity::delete_many()
1356                    .filter(
1357                        project::Column::RoomId
1358                            .eq(room_id)
1359                            .and(project::Column::HostConnectionId.eq(connection_id.0 as i32)),
1360                    )
1361                    .exec(&*tx)
1362                    .await?;
1363
1364                let room = self.get_room(room_id, &tx).await?;
1365                if room.participants.is_empty() {
1366                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1367                }
1368
1369                let left_room = LeftRoom {
1370                    room,
1371                    left_projects,
1372                    canceled_calls_to_user_ids,
1373                };
1374
1375                if left_room.room.participants.is_empty() {
1376                    self.rooms.remove(&room_id);
1377                }
1378
1379                Ok((room_id, left_room))
1380            } else {
1381                Err(anyhow!("could not leave room"))?
1382            }
1383        })
1384        .await
1385    }
1386
1387    pub async fn update_room_participant_location(
1388        &self,
1389        room_id: RoomId,
1390        connection_id: ConnectionId,
1391        location: proto::ParticipantLocation,
1392    ) -> Result<RoomGuard<proto::Room>> {
1393        self.room_transaction(|tx| async {
1394            let tx = tx;
1395            let location_kind;
1396            let location_project_id;
1397            match location
1398                .variant
1399                .as_ref()
1400                .ok_or_else(|| anyhow!("invalid location"))?
1401            {
1402                proto::participant_location::Variant::SharedProject(project) => {
1403                    location_kind = 0;
1404                    location_project_id = Some(ProjectId::from_proto(project.id));
1405                }
1406                proto::participant_location::Variant::UnsharedProject(_) => {
1407                    location_kind = 1;
1408                    location_project_id = None;
1409                }
1410                proto::participant_location::Variant::External(_) => {
1411                    location_kind = 2;
1412                    location_project_id = None;
1413                }
1414            }
1415
1416            let result = room_participant::Entity::update_many()
1417                .filter(room_participant::Column::RoomId.eq(room_id).and(
1418                    room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32),
1419                ))
1420                .set(room_participant::ActiveModel {
1421                    location_kind: ActiveValue::set(Some(location_kind)),
1422                    location_project_id: ActiveValue::set(location_project_id),
1423                    ..Default::default()
1424                })
1425                .exec(&*tx)
1426                .await?;
1427
1428            if result.rows_affected == 1 {
1429                let room = self.get_room(room_id, &tx).await?;
1430                Ok((room_id, room))
1431            } else {
1432                Err(anyhow!("could not update room participant location"))?
1433            }
1434        })
1435        .await
1436    }
1437
1438    pub async fn connection_lost(
1439        &self,
1440        connection_id: ConnectionId,
1441    ) -> Result<RoomGuard<Vec<LeftProject>>> {
1442        self.room_transaction(|tx| async move {
1443            let participant = room_participant::Entity::find()
1444                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1445                .one(&*tx)
1446                .await?
1447                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1448            let room_id = participant.room_id;
1449
1450            room_participant::Entity::update(room_participant::ActiveModel {
1451                answering_connection_lost: ActiveValue::set(true),
1452                ..participant.into_active_model()
1453            })
1454            .exec(&*tx)
1455            .await?;
1456
1457            let collaborator_on_projects = project_collaborator::Entity::find()
1458                .find_also_related(project::Entity)
1459                .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1460                .all(&*tx)
1461                .await?;
1462            project_collaborator::Entity::delete_many()
1463                .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
1464                .exec(&*tx)
1465                .await?;
1466
1467            let mut left_projects = Vec::new();
1468            for (_, project) in collaborator_on_projects {
1469                if let Some(project) = project {
1470                    let collaborators = project
1471                        .find_related(project_collaborator::Entity)
1472                        .all(&*tx)
1473                        .await?;
1474                    let connection_ids = collaborators
1475                        .into_iter()
1476                        .map(|collaborator| ConnectionId(collaborator.connection_id as u32))
1477                        .collect();
1478
1479                    left_projects.push(LeftProject {
1480                        id: project.id,
1481                        host_user_id: project.host_user_id,
1482                        host_connection_id: ConnectionId(project.host_connection_id as u32),
1483                        connection_ids,
1484                    });
1485                }
1486            }
1487
1488            project::Entity::delete_many()
1489                .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
1490                .exec(&*tx)
1491                .await?;
1492
1493            Ok((room_id, left_projects))
1494        })
1495        .await
1496    }
1497
1498    fn build_incoming_call(
1499        room: &proto::Room,
1500        called_user_id: UserId,
1501    ) -> Option<proto::IncomingCall> {
1502        let pending_participant = room
1503            .pending_participants
1504            .iter()
1505            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1506
1507        Some(proto::IncomingCall {
1508            room_id: room.id,
1509            calling_user_id: pending_participant.calling_user_id,
1510            participant_user_ids: room
1511                .participants
1512                .iter()
1513                .map(|participant| participant.user_id)
1514                .collect(),
1515            initial_project: room.participants.iter().find_map(|participant| {
1516                let initial_project_id = pending_participant.initial_project_id?;
1517                participant
1518                    .projects
1519                    .iter()
1520                    .find(|project| project.id == initial_project_id)
1521                    .cloned()
1522            }),
1523        })
1524    }
1525
1526    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1527        let db_room = room::Entity::find_by_id(room_id)
1528            .one(tx)
1529            .await?
1530            .ok_or_else(|| anyhow!("could not find room"))?;
1531
1532        let mut db_participants = db_room
1533            .find_related(room_participant::Entity)
1534            .stream(tx)
1535            .await?;
1536        let mut participants = HashMap::default();
1537        let mut pending_participants = Vec::new();
1538        while let Some(db_participant) = db_participants.next().await {
1539            let db_participant = db_participant?;
1540            if let Some(answering_connection_id) = db_participant.answering_connection_id {
1541                let location = match (
1542                    db_participant.location_kind,
1543                    db_participant.location_project_id,
1544                ) {
1545                    (Some(0), Some(project_id)) => {
1546                        Some(proto::participant_location::Variant::SharedProject(
1547                            proto::participant_location::SharedProject {
1548                                id: project_id.to_proto(),
1549                            },
1550                        ))
1551                    }
1552                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1553                        Default::default(),
1554                    )),
1555                    _ => Some(proto::participant_location::Variant::External(
1556                        Default::default(),
1557                    )),
1558                };
1559                participants.insert(
1560                    answering_connection_id,
1561                    proto::Participant {
1562                        user_id: db_participant.user_id.to_proto(),
1563                        peer_id: answering_connection_id as u32,
1564                        projects: Default::default(),
1565                        location: Some(proto::ParticipantLocation { variant: location }),
1566                    },
1567                );
1568            } else {
1569                pending_participants.push(proto::PendingParticipant {
1570                    user_id: db_participant.user_id.to_proto(),
1571                    calling_user_id: db_participant.calling_user_id.to_proto(),
1572                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1573                });
1574            }
1575        }
1576        drop(db_participants);
1577
1578        let mut db_projects = db_room
1579            .find_related(project::Entity)
1580            .find_with_related(worktree::Entity)
1581            .stream(tx)
1582            .await?;
1583
1584        while let Some(row) = db_projects.next().await {
1585            let (db_project, db_worktree) = row?;
1586            if let Some(participant) = participants.get_mut(&db_project.host_connection_id) {
1587                let project = if let Some(project) = participant
1588                    .projects
1589                    .iter_mut()
1590                    .find(|project| project.id == db_project.id.to_proto())
1591                {
1592                    project
1593                } else {
1594                    participant.projects.push(proto::ParticipantProject {
1595                        id: db_project.id.to_proto(),
1596                        worktree_root_names: Default::default(),
1597                    });
1598                    participant.projects.last_mut().unwrap()
1599                };
1600
1601                if let Some(db_worktree) = db_worktree {
1602                    project.worktree_root_names.push(db_worktree.root_name);
1603                }
1604            }
1605        }
1606
1607        Ok(proto::Room {
1608            id: db_room.id.to_proto(),
1609            live_kit_room: db_room.live_kit_room,
1610            participants: participants.into_values().collect(),
1611            pending_participants,
1612        })
1613    }
1614
1615    // projects
1616
1617    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1618        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1619        enum QueryAs {
1620            Count,
1621        }
1622
1623        self.transaction(|tx| async move {
1624            Ok(project::Entity::find()
1625                .select_only()
1626                .column_as(project::Column::Id.count(), QueryAs::Count)
1627                .inner_join(user::Entity)
1628                .filter(user::Column::Admin.eq(false))
1629                .into_values::<_, QueryAs>()
1630                .one(&*tx)
1631                .await?
1632                .unwrap_or(0i64) as usize)
1633        })
1634        .await
1635    }
1636
1637    pub async fn share_project(
1638        &self,
1639        room_id: RoomId,
1640        connection_id: ConnectionId,
1641        worktrees: &[proto::WorktreeMetadata],
1642    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1643        self.room_transaction(|tx| async move {
1644            let participant = room_participant::Entity::find()
1645                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1646                .one(&*tx)
1647                .await?
1648                .ok_or_else(|| anyhow!("could not find participant"))?;
1649            if participant.room_id != room_id {
1650                return Err(anyhow!("shared project on unexpected room"))?;
1651            }
1652
1653            let project = project::ActiveModel {
1654                room_id: ActiveValue::set(participant.room_id),
1655                host_user_id: ActiveValue::set(participant.user_id),
1656                host_connection_id: ActiveValue::set(connection_id.0 as i32),
1657                host_connection_epoch: ActiveValue::set(self.epoch()),
1658                ..Default::default()
1659            }
1660            .insert(&*tx)
1661            .await?;
1662
1663            if !worktrees.is_empty() {
1664                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1665                    worktree::ActiveModel {
1666                        id: ActiveValue::set(worktree.id as i64),
1667                        project_id: ActiveValue::set(project.id),
1668                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1669                        root_name: ActiveValue::set(worktree.root_name.clone()),
1670                        visible: ActiveValue::set(worktree.visible),
1671                        scan_id: ActiveValue::set(0),
1672                        is_complete: ActiveValue::set(false),
1673                    }
1674                }))
1675                .exec(&*tx)
1676                .await?;
1677            }
1678
1679            project_collaborator::ActiveModel {
1680                project_id: ActiveValue::set(project.id),
1681                connection_id: ActiveValue::set(connection_id.0 as i32),
1682                connection_epoch: ActiveValue::set(self.epoch()),
1683                user_id: ActiveValue::set(participant.user_id),
1684                replica_id: ActiveValue::set(ReplicaId(0)),
1685                is_host: ActiveValue::set(true),
1686                ..Default::default()
1687            }
1688            .insert(&*tx)
1689            .await?;
1690
1691            let room = self.get_room(room_id, &tx).await?;
1692            Ok((room_id, (project.id, room)))
1693        })
1694        .await
1695    }
1696
1697    pub async fn unshare_project(
1698        &self,
1699        project_id: ProjectId,
1700        connection_id: ConnectionId,
1701    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1702        self.room_transaction(|tx| async move {
1703            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1704
1705            let project = project::Entity::find_by_id(project_id)
1706                .one(&*tx)
1707                .await?
1708                .ok_or_else(|| anyhow!("project not found"))?;
1709            if project.host_connection_id == connection_id.0 as i32 {
1710                let room_id = project.room_id;
1711                project::Entity::delete(project.into_active_model())
1712                    .exec(&*tx)
1713                    .await?;
1714                let room = self.get_room(room_id, &tx).await?;
1715                Ok((room_id, (room, guest_connection_ids)))
1716            } else {
1717                Err(anyhow!("cannot unshare a project hosted by another user"))?
1718            }
1719        })
1720        .await
1721    }
1722
1723    pub async fn update_project(
1724        &self,
1725        project_id: ProjectId,
1726        connection_id: ConnectionId,
1727        worktrees: &[proto::WorktreeMetadata],
1728    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1729        self.room_transaction(|tx| async move {
1730            let project = project::Entity::find_by_id(project_id)
1731                .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
1732                .one(&*tx)
1733                .await?
1734                .ok_or_else(|| anyhow!("no such project"))?;
1735
1736            if !worktrees.is_empty() {
1737                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1738                    worktree::ActiveModel {
1739                        id: ActiveValue::set(worktree.id as i64),
1740                        project_id: ActiveValue::set(project.id),
1741                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1742                        root_name: ActiveValue::set(worktree.root_name.clone()),
1743                        visible: ActiveValue::set(worktree.visible),
1744                        scan_id: ActiveValue::set(0),
1745                        is_complete: ActiveValue::set(false),
1746                    }
1747                }))
1748                .on_conflict(
1749                    OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
1750                        .update_column(worktree::Column::RootName)
1751                        .to_owned(),
1752                )
1753                .exec(&*tx)
1754                .await?;
1755            }
1756
1757            worktree::Entity::delete_many()
1758                .filter(
1759                    worktree::Column::ProjectId.eq(project.id).and(
1760                        worktree::Column::Id
1761                            .is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
1762                    ),
1763                )
1764                .exec(&*tx)
1765                .await?;
1766
1767            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
1768            let room = self.get_room(project.room_id, &tx).await?;
1769            Ok((project.room_id, (room, guest_connection_ids)))
1770        })
1771        .await
1772    }
1773
1774    pub async fn update_worktree(
1775        &self,
1776        update: &proto::UpdateWorktree,
1777        connection_id: ConnectionId,
1778    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1779        self.room_transaction(|tx| async move {
1780            let project_id = ProjectId::from_proto(update.project_id);
1781            let worktree_id = update.worktree_id as i64;
1782
1783            // Ensure the update comes from the host.
1784            let project = project::Entity::find_by_id(project_id)
1785                .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
1786                .one(&*tx)
1787                .await?
1788                .ok_or_else(|| anyhow!("no such project"))?;
1789            let room_id = project.room_id;
1790
1791            // Update metadata.
1792            worktree::Entity::update(worktree::ActiveModel {
1793                id: ActiveValue::set(worktree_id),
1794                project_id: ActiveValue::set(project_id),
1795                root_name: ActiveValue::set(update.root_name.clone()),
1796                scan_id: ActiveValue::set(update.scan_id as i64),
1797                is_complete: ActiveValue::set(update.is_last_update),
1798                abs_path: ActiveValue::set(update.abs_path.clone()),
1799                ..Default::default()
1800            })
1801            .exec(&*tx)
1802            .await?;
1803
1804            if !update.updated_entries.is_empty() {
1805                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
1806                    let mtime = entry.mtime.clone().unwrap_or_default();
1807                    worktree_entry::ActiveModel {
1808                        project_id: ActiveValue::set(project_id),
1809                        worktree_id: ActiveValue::set(worktree_id),
1810                        id: ActiveValue::set(entry.id as i64),
1811                        is_dir: ActiveValue::set(entry.is_dir),
1812                        path: ActiveValue::set(entry.path.clone()),
1813                        inode: ActiveValue::set(entry.inode as i64),
1814                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
1815                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
1816                        is_symlink: ActiveValue::set(entry.is_symlink),
1817                        is_ignored: ActiveValue::set(entry.is_ignored),
1818                    }
1819                }))
1820                .on_conflict(
1821                    OnConflict::columns([
1822                        worktree_entry::Column::ProjectId,
1823                        worktree_entry::Column::WorktreeId,
1824                        worktree_entry::Column::Id,
1825                    ])
1826                    .update_columns([
1827                        worktree_entry::Column::IsDir,
1828                        worktree_entry::Column::Path,
1829                        worktree_entry::Column::Inode,
1830                        worktree_entry::Column::MtimeSeconds,
1831                        worktree_entry::Column::MtimeNanos,
1832                        worktree_entry::Column::IsSymlink,
1833                        worktree_entry::Column::IsIgnored,
1834                    ])
1835                    .to_owned(),
1836                )
1837                .exec(&*tx)
1838                .await?;
1839            }
1840
1841            if !update.removed_entries.is_empty() {
1842                worktree_entry::Entity::delete_many()
1843                    .filter(
1844                        worktree_entry::Column::ProjectId
1845                            .eq(project_id)
1846                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
1847                            .and(
1848                                worktree_entry::Column::Id
1849                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
1850                            ),
1851                    )
1852                    .exec(&*tx)
1853                    .await?;
1854            }
1855
1856            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1857            Ok((room_id, connection_ids))
1858        })
1859        .await
1860    }
1861
1862    pub async fn update_diagnostic_summary(
1863        &self,
1864        update: &proto::UpdateDiagnosticSummary,
1865        connection_id: ConnectionId,
1866    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1867        self.room_transaction(|tx| async move {
1868            let project_id = ProjectId::from_proto(update.project_id);
1869            let worktree_id = update.worktree_id as i64;
1870            let summary = update
1871                .summary
1872                .as_ref()
1873                .ok_or_else(|| anyhow!("invalid summary"))?;
1874
1875            // Ensure the update comes from the host.
1876            let project = project::Entity::find_by_id(project_id)
1877                .one(&*tx)
1878                .await?
1879                .ok_or_else(|| anyhow!("no such project"))?;
1880            if project.host_connection_id != connection_id.0 as i32 {
1881                return Err(anyhow!("can't update a project hosted by someone else"))?;
1882            }
1883
1884            // Update summary.
1885            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
1886                project_id: ActiveValue::set(project_id),
1887                worktree_id: ActiveValue::set(worktree_id),
1888                path: ActiveValue::set(summary.path.clone()),
1889                language_server_id: ActiveValue::set(summary.language_server_id as i64),
1890                error_count: ActiveValue::set(summary.error_count as i32),
1891                warning_count: ActiveValue::set(summary.warning_count as i32),
1892                ..Default::default()
1893            })
1894            .on_conflict(
1895                OnConflict::columns([
1896                    worktree_diagnostic_summary::Column::ProjectId,
1897                    worktree_diagnostic_summary::Column::WorktreeId,
1898                    worktree_diagnostic_summary::Column::Path,
1899                ])
1900                .update_columns([
1901                    worktree_diagnostic_summary::Column::LanguageServerId,
1902                    worktree_diagnostic_summary::Column::ErrorCount,
1903                    worktree_diagnostic_summary::Column::WarningCount,
1904                ])
1905                .to_owned(),
1906            )
1907            .exec(&*tx)
1908            .await?;
1909
1910            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1911            Ok((project.room_id, connection_ids))
1912        })
1913        .await
1914    }
1915
1916    pub async fn start_language_server(
1917        &self,
1918        update: &proto::StartLanguageServer,
1919        connection_id: ConnectionId,
1920    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1921        self.room_transaction(|tx| async move {
1922            let project_id = ProjectId::from_proto(update.project_id);
1923            let server = update
1924                .server
1925                .as_ref()
1926                .ok_or_else(|| anyhow!("invalid language server"))?;
1927
1928            // Ensure the update comes from the host.
1929            let project = project::Entity::find_by_id(project_id)
1930                .one(&*tx)
1931                .await?
1932                .ok_or_else(|| anyhow!("no such project"))?;
1933            if project.host_connection_id != connection_id.0 as i32 {
1934                return Err(anyhow!("can't update a project hosted by someone else"))?;
1935            }
1936
1937            // Add the newly-started language server.
1938            language_server::Entity::insert(language_server::ActiveModel {
1939                project_id: ActiveValue::set(project_id),
1940                id: ActiveValue::set(server.id as i64),
1941                name: ActiveValue::set(server.name.clone()),
1942                ..Default::default()
1943            })
1944            .on_conflict(
1945                OnConflict::columns([
1946                    language_server::Column::ProjectId,
1947                    language_server::Column::Id,
1948                ])
1949                .update_column(language_server::Column::Name)
1950                .to_owned(),
1951            )
1952            .exec(&*tx)
1953            .await?;
1954
1955            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1956            Ok((project.room_id, connection_ids))
1957        })
1958        .await
1959    }
1960
1961    pub async fn join_project(
1962        &self,
1963        project_id: ProjectId,
1964        connection_id: ConnectionId,
1965    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
1966        self.room_transaction(|tx| async move {
1967            let participant = room_participant::Entity::find()
1968                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
1969                .one(&*tx)
1970                .await?
1971                .ok_or_else(|| anyhow!("must join a room first"))?;
1972
1973            let project = project::Entity::find_by_id(project_id)
1974                .one(&*tx)
1975                .await?
1976                .ok_or_else(|| anyhow!("no such project"))?;
1977            if project.room_id != participant.room_id {
1978                return Err(anyhow!("no such project"))?;
1979            }
1980
1981            let mut collaborators = project
1982                .find_related(project_collaborator::Entity)
1983                .all(&*tx)
1984                .await?;
1985            let replica_ids = collaborators
1986                .iter()
1987                .map(|c| c.replica_id)
1988                .collect::<HashSet<_>>();
1989            let mut replica_id = ReplicaId(1);
1990            while replica_ids.contains(&replica_id) {
1991                replica_id.0 += 1;
1992            }
1993            let new_collaborator = project_collaborator::ActiveModel {
1994                project_id: ActiveValue::set(project_id),
1995                connection_id: ActiveValue::set(connection_id.0 as i32),
1996                connection_epoch: ActiveValue::set(self.epoch()),
1997                user_id: ActiveValue::set(participant.user_id),
1998                replica_id: ActiveValue::set(replica_id),
1999                is_host: ActiveValue::set(false),
2000                ..Default::default()
2001            }
2002            .insert(&*tx)
2003            .await?;
2004            collaborators.push(new_collaborator);
2005
2006            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2007            let mut worktrees = db_worktrees
2008                .into_iter()
2009                .map(|db_worktree| {
2010                    (
2011                        db_worktree.id as u64,
2012                        Worktree {
2013                            id: db_worktree.id as u64,
2014                            abs_path: db_worktree.abs_path,
2015                            root_name: db_worktree.root_name,
2016                            visible: db_worktree.visible,
2017                            entries: Default::default(),
2018                            diagnostic_summaries: Default::default(),
2019                            scan_id: db_worktree.scan_id as u64,
2020                            is_complete: db_worktree.is_complete,
2021                        },
2022                    )
2023                })
2024                .collect::<BTreeMap<_, _>>();
2025
2026            // Populate worktree entries.
2027            {
2028                let mut db_entries = worktree_entry::Entity::find()
2029                    .filter(worktree_entry::Column::ProjectId.eq(project_id))
2030                    .stream(&*tx)
2031                    .await?;
2032                while let Some(db_entry) = db_entries.next().await {
2033                    let db_entry = db_entry?;
2034                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2035                        worktree.entries.push(proto::Entry {
2036                            id: db_entry.id as u64,
2037                            is_dir: db_entry.is_dir,
2038                            path: db_entry.path,
2039                            inode: db_entry.inode as u64,
2040                            mtime: Some(proto::Timestamp {
2041                                seconds: db_entry.mtime_seconds as u64,
2042                                nanos: db_entry.mtime_nanos as u32,
2043                            }),
2044                            is_symlink: db_entry.is_symlink,
2045                            is_ignored: db_entry.is_ignored,
2046                        });
2047                    }
2048                }
2049            }
2050
2051            // Populate worktree diagnostic summaries.
2052            {
2053                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2054                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2055                    .stream(&*tx)
2056                    .await?;
2057                while let Some(db_summary) = db_summaries.next().await {
2058                    let db_summary = db_summary?;
2059                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2060                        worktree
2061                            .diagnostic_summaries
2062                            .push(proto::DiagnosticSummary {
2063                                path: db_summary.path,
2064                                language_server_id: db_summary.language_server_id as u64,
2065                                error_count: db_summary.error_count as u32,
2066                                warning_count: db_summary.warning_count as u32,
2067                            });
2068                    }
2069                }
2070            }
2071
2072            // Populate language servers.
2073            let language_servers = project
2074                .find_related(language_server::Entity)
2075                .all(&*tx)
2076                .await?;
2077
2078            let room_id = project.room_id;
2079            let project = Project {
2080                collaborators,
2081                worktrees,
2082                language_servers: language_servers
2083                    .into_iter()
2084                    .map(|language_server| proto::LanguageServer {
2085                        id: language_server.id as u64,
2086                        name: language_server.name,
2087                    })
2088                    .collect(),
2089            };
2090            Ok((room_id, (project, replica_id as ReplicaId)))
2091        })
2092        .await
2093    }
2094
2095    pub async fn leave_project(
2096        &self,
2097        project_id: ProjectId,
2098        connection_id: ConnectionId,
2099    ) -> Result<RoomGuard<LeftProject>> {
2100        self.room_transaction(|tx| async move {
2101            let result = project_collaborator::Entity::delete_many()
2102                .filter(
2103                    project_collaborator::Column::ProjectId
2104                        .eq(project_id)
2105                        .and(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)),
2106                )
2107                .exec(&*tx)
2108                .await?;
2109            if result.rows_affected == 0 {
2110                Err(anyhow!("not a collaborator on this project"))?;
2111            }
2112
2113            let project = project::Entity::find_by_id(project_id)
2114                .one(&*tx)
2115                .await?
2116                .ok_or_else(|| anyhow!("no such project"))?;
2117            let collaborators = project
2118                .find_related(project_collaborator::Entity)
2119                .all(&*tx)
2120                .await?;
2121            let connection_ids = collaborators
2122                .into_iter()
2123                .map(|collaborator| ConnectionId(collaborator.connection_id as u32))
2124                .collect();
2125
2126            let left_project = LeftProject {
2127                id: project_id,
2128                host_user_id: project.host_user_id,
2129                host_connection_id: ConnectionId(project.host_connection_id as u32),
2130                connection_ids,
2131            };
2132            Ok((project.room_id, left_project))
2133        })
2134        .await
2135    }
2136
2137    pub async fn project_collaborators(
2138        &self,
2139        project_id: ProjectId,
2140        connection_id: ConnectionId,
2141    ) -> Result<RoomGuard<Vec<project_collaborator::Model>>> {
2142        self.room_transaction(|tx| async move {
2143            let project = project::Entity::find_by_id(project_id)
2144                .one(&*tx)
2145                .await?
2146                .ok_or_else(|| anyhow!("no such project"))?;
2147            let collaborators = project_collaborator::Entity::find()
2148                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2149                .all(&*tx)
2150                .await?;
2151
2152            if collaborators
2153                .iter()
2154                .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
2155            {
2156                Ok((project.room_id, collaborators))
2157            } else {
2158                Err(anyhow!("no such project"))?
2159            }
2160        })
2161        .await
2162    }
2163
2164    pub async fn project_connection_ids(
2165        &self,
2166        project_id: ProjectId,
2167        connection_id: ConnectionId,
2168    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2169        self.room_transaction(|tx| async move {
2170            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2171            enum QueryAs {
2172                ConnectionId,
2173            }
2174
2175            let project = project::Entity::find_by_id(project_id)
2176                .one(&*tx)
2177                .await?
2178                .ok_or_else(|| anyhow!("no such project"))?;
2179            let mut db_connection_ids = project_collaborator::Entity::find()
2180                .select_only()
2181                .column_as(
2182                    project_collaborator::Column::ConnectionId,
2183                    QueryAs::ConnectionId,
2184                )
2185                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2186                .into_values::<i32, QueryAs>()
2187                .stream(&*tx)
2188                .await?;
2189
2190            let mut connection_ids = HashSet::default();
2191            while let Some(connection_id) = db_connection_ids.next().await {
2192                connection_ids.insert(ConnectionId(connection_id? as u32));
2193            }
2194
2195            if connection_ids.contains(&connection_id) {
2196                Ok((project.room_id, connection_ids))
2197            } else {
2198                Err(anyhow!("no such project"))?
2199            }
2200        })
2201        .await
2202    }
2203
2204    async fn project_guest_connection_ids(
2205        &self,
2206        project_id: ProjectId,
2207        tx: &DatabaseTransaction,
2208    ) -> Result<Vec<ConnectionId>> {
2209        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2210        enum QueryAs {
2211            ConnectionId,
2212        }
2213
2214        let mut db_guest_connection_ids = project_collaborator::Entity::find()
2215            .select_only()
2216            .column_as(
2217                project_collaborator::Column::ConnectionId,
2218                QueryAs::ConnectionId,
2219            )
2220            .filter(
2221                project_collaborator::Column::ProjectId
2222                    .eq(project_id)
2223                    .and(project_collaborator::Column::IsHost.eq(false)),
2224            )
2225            .into_values::<i32, QueryAs>()
2226            .stream(tx)
2227            .await?;
2228
2229        let mut guest_connection_ids = Vec::new();
2230        while let Some(connection_id) = db_guest_connection_ids.next().await {
2231            guest_connection_ids.push(ConnectionId(connection_id? as u32));
2232        }
2233        Ok(guest_connection_ids)
2234    }
2235
2236    // access tokens
2237
2238    pub async fn create_access_token_hash(
2239        &self,
2240        user_id: UserId,
2241        access_token_hash: &str,
2242        max_access_token_count: usize,
2243    ) -> Result<()> {
2244        self.transaction(|tx| async {
2245            let tx = tx;
2246
2247            access_token::ActiveModel {
2248                user_id: ActiveValue::set(user_id),
2249                hash: ActiveValue::set(access_token_hash.into()),
2250                ..Default::default()
2251            }
2252            .insert(&*tx)
2253            .await?;
2254
2255            access_token::Entity::delete_many()
2256                .filter(
2257                    access_token::Column::Id.in_subquery(
2258                        Query::select()
2259                            .column(access_token::Column::Id)
2260                            .from(access_token::Entity)
2261                            .and_where(access_token::Column::UserId.eq(user_id))
2262                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2263                            .limit(10000)
2264                            .offset(max_access_token_count as u64)
2265                            .to_owned(),
2266                    ),
2267                )
2268                .exec(&*tx)
2269                .await?;
2270            Ok(())
2271        })
2272        .await
2273    }
2274
2275    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2276        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2277        enum QueryAs {
2278            Hash,
2279        }
2280
2281        self.transaction(|tx| async move {
2282            Ok(access_token::Entity::find()
2283                .select_only()
2284                .column(access_token::Column::Hash)
2285                .filter(access_token::Column::UserId.eq(user_id))
2286                .order_by_desc(access_token::Column::Id)
2287                .into_values::<_, QueryAs>()
2288                .all(&*tx)
2289                .await?)
2290        })
2291        .await
2292    }
2293
2294    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2295    where
2296        F: Send + Fn(TransactionHandle) -> Fut,
2297        Fut: Send + Future<Output = Result<T>>,
2298    {
2299        let body = async {
2300            loop {
2301                let (tx, result) = self.with_transaction(&f).await?;
2302                match result {
2303                    Ok(result) => {
2304                        match tx.commit().await.map_err(Into::into) {
2305                            Ok(()) => return Ok(result),
2306                            Err(error) => {
2307                                if is_serialization_error(&error) {
2308                                    // Retry (don't break the loop)
2309                                } else {
2310                                    return Err(error);
2311                                }
2312                            }
2313                        }
2314                    }
2315                    Err(error) => {
2316                        tx.rollback().await?;
2317                        if is_serialization_error(&error) {
2318                            // Retry (don't break the loop)
2319                        } else {
2320                            return Err(error);
2321                        }
2322                    }
2323                }
2324            }
2325        };
2326
2327        self.run(body).await
2328    }
2329
2330    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2331    where
2332        F: Send + Fn(TransactionHandle) -> Fut,
2333        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2334    {
2335        let body = async {
2336            loop {
2337                let (tx, result) = self.with_transaction(&f).await?;
2338                match result {
2339                    Ok((room_id, data)) => {
2340                        let lock = self.rooms.entry(room_id).or_default().clone();
2341                        let _guard = lock.lock_owned().await;
2342                        match tx.commit().await.map_err(Into::into) {
2343                            Ok(()) => {
2344                                return Ok(RoomGuard {
2345                                    data,
2346                                    _guard,
2347                                    _not_send: PhantomData,
2348                                });
2349                            }
2350                            Err(error) => {
2351                                if is_serialization_error(&error) {
2352                                    // Retry (don't break the loop)
2353                                } else {
2354                                    return Err(error);
2355                                }
2356                            }
2357                        }
2358                    }
2359                    Err(error) => {
2360                        tx.rollback().await?;
2361                        if is_serialization_error(&error) {
2362                            // Retry (don't break the loop)
2363                        } else {
2364                            return Err(error);
2365                        }
2366                    }
2367                }
2368            }
2369        };
2370
2371        self.run(body).await
2372    }
2373
2374    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2375    where
2376        F: Send + Fn(TransactionHandle) -> Fut,
2377        Fut: Send + Future<Output = Result<T>>,
2378    {
2379        let tx = self
2380            .pool
2381            .begin_with_config(Some(IsolationLevel::Serializable), None)
2382            .await?;
2383
2384        let mut tx = Arc::new(Some(tx));
2385        let result = f(TransactionHandle(tx.clone())).await;
2386        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2387            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2388        };
2389
2390        Ok((tx, result))
2391    }
2392
2393    async fn run<F, T>(&self, future: F) -> T
2394    where
2395        F: Future<Output = T>,
2396    {
2397        #[cfg(test)]
2398        {
2399            if let Some(background) = self.background.as_ref() {
2400                background.simulate_random_delay().await;
2401            }
2402
2403            self.runtime.as_ref().unwrap().block_on(future)
2404        }
2405
2406        #[cfg(not(test))]
2407        {
2408            future.await
2409        }
2410    }
2411}
2412
2413fn is_serialization_error(error: &Error) -> bool {
2414    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2415    match error {
2416        Error::Database(
2417            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2418            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2419        ) if error
2420            .as_database_error()
2421            .and_then(|error| error.code())
2422            .as_deref()
2423            == Some(SERIALIZATION_FAILURE_CODE) =>
2424        {
2425            true
2426        }
2427        _ => false,
2428    }
2429}
2430
2431struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2432
2433impl Deref for TransactionHandle {
2434    type Target = DatabaseTransaction;
2435
2436    fn deref(&self) -> &Self::Target {
2437        self.0.as_ref().as_ref().unwrap()
2438    }
2439}
2440
2441pub struct RoomGuard<T> {
2442    data: T,
2443    _guard: OwnedMutexGuard<()>,
2444    _not_send: PhantomData<Rc<()>>,
2445}
2446
2447impl<T> Deref for RoomGuard<T> {
2448    type Target = T;
2449
2450    fn deref(&self) -> &T {
2451        &self.data
2452    }
2453}
2454
2455impl<T> DerefMut for RoomGuard<T> {
2456    fn deref_mut(&mut self) -> &mut T {
2457        &mut self.data
2458    }
2459}
2460
2461#[derive(Debug, Serialize, Deserialize)]
2462pub struct NewUserParams {
2463    pub github_login: String,
2464    pub github_user_id: i32,
2465    pub invite_count: i32,
2466}
2467
2468#[derive(Debug)]
2469pub struct NewUserResult {
2470    pub user_id: UserId,
2471    pub metrics_id: String,
2472    pub inviting_user_id: Option<UserId>,
2473    pub signup_device_id: Option<String>,
2474}
2475
2476fn random_invite_code() -> String {
2477    nanoid::nanoid!(16)
2478}
2479
2480fn random_email_confirmation_code() -> String {
2481    nanoid::nanoid!(64)
2482}
2483
2484macro_rules! id_type {
2485    ($name:ident) => {
2486        #[derive(
2487            Clone,
2488            Copy,
2489            Debug,
2490            Default,
2491            PartialEq,
2492            Eq,
2493            PartialOrd,
2494            Ord,
2495            Hash,
2496            Serialize,
2497            Deserialize,
2498        )]
2499        #[serde(transparent)]
2500        pub struct $name(pub i32);
2501
2502        impl $name {
2503            #[allow(unused)]
2504            pub const MAX: Self = Self(i32::MAX);
2505
2506            #[allow(unused)]
2507            pub fn from_proto(value: u64) -> Self {
2508                Self(value as i32)
2509            }
2510
2511            #[allow(unused)]
2512            pub fn to_proto(self) -> u64 {
2513                self.0 as u64
2514            }
2515        }
2516
2517        impl std::fmt::Display for $name {
2518            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2519                self.0.fmt(f)
2520            }
2521        }
2522
2523        impl From<$name> for sea_query::Value {
2524            fn from(value: $name) -> Self {
2525                sea_query::Value::Int(Some(value.0))
2526            }
2527        }
2528
2529        impl sea_orm::TryGetable for $name {
2530            fn try_get(
2531                res: &sea_orm::QueryResult,
2532                pre: &str,
2533                col: &str,
2534            ) -> Result<Self, sea_orm::TryGetError> {
2535                Ok(Self(i32::try_get(res, pre, col)?))
2536            }
2537        }
2538
2539        impl sea_query::ValueType for $name {
2540            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
2541                match v {
2542                    Value::TinyInt(Some(int)) => {
2543                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2544                    }
2545                    Value::SmallInt(Some(int)) => {
2546                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2547                    }
2548                    Value::Int(Some(int)) => {
2549                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2550                    }
2551                    Value::BigInt(Some(int)) => {
2552                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2553                    }
2554                    Value::TinyUnsigned(Some(int)) => {
2555                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2556                    }
2557                    Value::SmallUnsigned(Some(int)) => {
2558                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2559                    }
2560                    Value::Unsigned(Some(int)) => {
2561                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2562                    }
2563                    Value::BigUnsigned(Some(int)) => {
2564                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2565                    }
2566                    _ => Err(sea_query::ValueTypeErr),
2567                }
2568            }
2569
2570            fn type_name() -> String {
2571                stringify!($name).into()
2572            }
2573
2574            fn array_type() -> sea_query::ArrayType {
2575                sea_query::ArrayType::Int
2576            }
2577
2578            fn column_type() -> sea_query::ColumnType {
2579                sea_query::ColumnType::Integer(None)
2580            }
2581        }
2582
2583        impl sea_orm::TryFromU64 for $name {
2584            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
2585                Ok(Self(n.try_into().map_err(|_| {
2586                    DbErr::ConvertFromU64(concat!(
2587                        "error converting ",
2588                        stringify!($name),
2589                        " to u64"
2590                    ))
2591                })?))
2592            }
2593        }
2594
2595        impl sea_query::Nullable for $name {
2596            fn null() -> Value {
2597                Value::Int(None)
2598            }
2599        }
2600    };
2601}
2602
2603id_type!(AccessTokenId);
2604id_type!(ContactId);
2605id_type!(RoomId);
2606id_type!(RoomParticipantId);
2607id_type!(ProjectId);
2608id_type!(ProjectCollaboratorId);
2609id_type!(ReplicaId);
2610id_type!(SignupId);
2611id_type!(UserId);
2612
2613pub struct LeftRoom {
2614    pub room: proto::Room,
2615    pub left_projects: HashMap<ProjectId, LeftProject>,
2616    pub canceled_calls_to_user_ids: Vec<UserId>,
2617}
2618
2619pub struct RefreshedRoom {
2620    pub room: proto::Room,
2621    pub stale_participant_user_ids: Vec<UserId>,
2622    pub canceled_calls_to_user_ids: Vec<UserId>,
2623}
2624
2625pub struct Project {
2626    pub collaborators: Vec<project_collaborator::Model>,
2627    pub worktrees: BTreeMap<u64, Worktree>,
2628    pub language_servers: Vec<proto::LanguageServer>,
2629}
2630
2631pub struct LeftProject {
2632    pub id: ProjectId,
2633    pub host_user_id: UserId,
2634    pub host_connection_id: ConnectionId,
2635    pub connection_ids: Vec<ConnectionId>,
2636}
2637
2638pub struct Worktree {
2639    pub id: u64,
2640    pub abs_path: String,
2641    pub root_name: String,
2642    pub visible: bool,
2643    pub entries: Vec<proto::Entry>,
2644    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
2645    pub scan_id: u64,
2646    pub is_complete: bool,
2647}
2648
2649#[cfg(test)]
2650pub use test::*;
2651
2652#[cfg(test)]
2653mod test {
2654    use super::*;
2655    use gpui::executor::Background;
2656    use lazy_static::lazy_static;
2657    use parking_lot::Mutex;
2658    use rand::prelude::*;
2659    use sea_orm::ConnectionTrait;
2660    use sqlx::migrate::MigrateDatabase;
2661    use std::sync::Arc;
2662
2663    pub struct TestDb {
2664        pub db: Option<Arc<Database>>,
2665        pub connection: Option<sqlx::AnyConnection>,
2666    }
2667
2668    impl TestDb {
2669        pub fn sqlite(background: Arc<Background>) -> Self {
2670            let url = format!("sqlite::memory:");
2671            let runtime = tokio::runtime::Builder::new_current_thread()
2672                .enable_io()
2673                .enable_time()
2674                .build()
2675                .unwrap();
2676
2677            let mut db = runtime.block_on(async {
2678                let mut options = ConnectOptions::new(url);
2679                options.max_connections(5);
2680                let db = Database::new(options).await.unwrap();
2681                let sql = include_str!(concat!(
2682                    env!("CARGO_MANIFEST_DIR"),
2683                    "/migrations.sqlite/20221109000000_test_schema.sql"
2684                ));
2685                db.pool
2686                    .execute(sea_orm::Statement::from_string(
2687                        db.pool.get_database_backend(),
2688                        sql.into(),
2689                    ))
2690                    .await
2691                    .unwrap();
2692                db
2693            });
2694
2695            db.background = Some(background);
2696            db.runtime = Some(runtime);
2697
2698            Self {
2699                db: Some(Arc::new(db)),
2700                connection: None,
2701            }
2702        }
2703
2704        pub fn postgres(background: Arc<Background>) -> Self {
2705            lazy_static! {
2706                static ref LOCK: Mutex<()> = Mutex::new(());
2707            }
2708
2709            let _guard = LOCK.lock();
2710            let mut rng = StdRng::from_entropy();
2711            let url = format!(
2712                "postgres://postgres@localhost/zed-test-{}",
2713                rng.gen::<u128>()
2714            );
2715            let runtime = tokio::runtime::Builder::new_current_thread()
2716                .enable_io()
2717                .enable_time()
2718                .build()
2719                .unwrap();
2720
2721            let mut db = runtime.block_on(async {
2722                sqlx::Postgres::create_database(&url)
2723                    .await
2724                    .expect("failed to create test db");
2725                let mut options = ConnectOptions::new(url);
2726                options
2727                    .max_connections(5)
2728                    .idle_timeout(Duration::from_secs(0));
2729                let db = Database::new(options).await.unwrap();
2730                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
2731                db.migrate(Path::new(migrations_path), false).await.unwrap();
2732                db
2733            });
2734
2735            db.background = Some(background);
2736            db.runtime = Some(runtime);
2737
2738            Self {
2739                db: Some(Arc::new(db)),
2740                connection: None,
2741            }
2742        }
2743
2744        pub fn db(&self) -> &Arc<Database> {
2745            self.db.as_ref().unwrap()
2746        }
2747    }
2748
2749    impl Drop for TestDb {
2750        fn drop(&mut self) {
2751            let db = self.db.take().unwrap();
2752            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
2753                db.runtime.as_ref().unwrap().block_on(async {
2754                    use util::ResultExt;
2755                    let query = "
2756                        SELECT pg_terminate_backend(pg_stat_activity.pid)
2757                        FROM pg_stat_activity
2758                        WHERE
2759                            pg_stat_activity.datname = current_database() AND
2760                            pid <> pg_backend_pid();
2761                    ";
2762                    db.pool
2763                        .execute(sea_orm::Statement::from_string(
2764                            db.pool.get_database_backend(),
2765                            query.into(),
2766                        ))
2767                        .await
2768                        .log_err();
2769                    sqlx::Postgres::drop_database(db.options.get_url())
2770                        .await
2771                        .log_err();
2772                })
2773            }
2774        }
2775    }
2776}