db.rs

   1mod access_token;
   2mod contact;
   3mod follower;
   4mod language_server;
   5mod project;
   6mod project_collaborator;
   7mod room;
   8mod room_participant;
   9mod server;
  10mod signup;
  11#[cfg(test)]
  12mod tests;
  13mod user;
  14mod worktree;
  15mod worktree_diagnostic_summary;
  16mod worktree_entry;
  17
  18use crate::executor::Executor;
  19use crate::{Error, Result};
  20use anyhow::anyhow;
  21use collections::{BTreeMap, HashMap, HashSet};
  22pub use contact::Contact;
  23use dashmap::DashMap;
  24use futures::StreamExt;
  25use hyper::StatusCode;
  26use rand::prelude::StdRng;
  27use rand::{Rng, SeedableRng};
  28use rpc::{proto, ConnectionId};
  29use sea_orm::Condition;
  30pub use sea_orm::ConnectOptions;
  31use sea_orm::{
  32    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  33    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  34    Statement, TransactionTrait,
  35};
  36use sea_query::{Alias, Expr, OnConflict, Query};
  37use serde::{Deserialize, Serialize};
  38pub use signup::{Invite, NewSignup, WaitlistSummary};
  39use sqlx::migrate::{Migrate, Migration, MigrationSource};
  40use sqlx::Connection;
  41use std::ops::{Deref, DerefMut};
  42use std::path::Path;
  43use std::time::Duration;
  44use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  45use tokio::sync::{Mutex, OwnedMutexGuard};
  46pub use user::Model as User;
  47
  48pub struct Database {
  49    options: ConnectOptions,
  50    pool: DatabaseConnection,
  51    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  52    rng: Mutex<StdRng>,
  53    executor: Executor,
  54    #[cfg(test)]
  55    runtime: Option<tokio::runtime::Runtime>,
  56}
  57
  58impl Database {
  59    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  60        Ok(Self {
  61            options: options.clone(),
  62            pool: sea_orm::Database::connect(options).await?,
  63            rooms: DashMap::with_capacity(16384),
  64            rng: Mutex::new(StdRng::seed_from_u64(0)),
  65            executor,
  66            #[cfg(test)]
  67            runtime: None,
  68        })
  69    }
  70
  71    #[cfg(test)]
  72    pub fn reset(&self) {
  73        self.rooms.clear();
  74    }
  75
  76    pub async fn migrate(
  77        &self,
  78        migrations_path: &Path,
  79        ignore_checksum_mismatch: bool,
  80    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  81        let migrations = MigrationSource::resolve(migrations_path)
  82            .await
  83            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  84
  85        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  86
  87        connection.ensure_migrations_table().await?;
  88        let applied_migrations: HashMap<_, _> = connection
  89            .list_applied_migrations()
  90            .await?
  91            .into_iter()
  92            .map(|m| (m.version, m))
  93            .collect();
  94
  95        let mut new_migrations = Vec::new();
  96        for migration in migrations {
  97            match applied_migrations.get(&migration.version) {
  98                Some(applied_migration) => {
  99                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 100                    {
 101                        Err(anyhow!(
 102                            "checksum mismatch for applied migration {}",
 103                            migration.description
 104                        ))?;
 105                    }
 106                }
 107                None => {
 108                    let elapsed = connection.apply(&migration).await?;
 109                    new_migrations.push((migration, elapsed));
 110                }
 111            }
 112        }
 113
 114        Ok(new_migrations)
 115    }
 116
 117    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 118        self.transaction(|tx| async move {
 119            let server = server::ActiveModel {
 120                environment: ActiveValue::set(environment.into()),
 121                ..Default::default()
 122            }
 123            .insert(&*tx)
 124            .await?;
 125            Ok(server.id)
 126        })
 127        .await
 128    }
 129
 130    pub async fn stale_room_ids(
 131        &self,
 132        environment: &str,
 133        new_server_id: ServerId,
 134    ) -> Result<Vec<RoomId>> {
 135        self.transaction(|tx| async move {
 136            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 137            enum QueryAs {
 138                RoomId,
 139            }
 140
 141            let stale_server_epochs = self
 142                .stale_server_ids(environment, new_server_id, &tx)
 143                .await?;
 144            Ok(room_participant::Entity::find()
 145                .select_only()
 146                .column(room_participant::Column::RoomId)
 147                .distinct()
 148                .filter(
 149                    room_participant::Column::AnsweringConnectionServerId
 150                        .is_in(stale_server_epochs),
 151                )
 152                .into_values::<_, QueryAs>()
 153                .all(&*tx)
 154                .await?)
 155        })
 156        .await
 157    }
 158
 159    pub async fn refresh_room(
 160        &self,
 161        room_id: RoomId,
 162        new_server_id: ServerId,
 163    ) -> Result<RoomGuard<RefreshedRoom>> {
 164        self.room_transaction(room_id, |tx| async move {
 165            let stale_participant_filter = Condition::all()
 166                .add(room_participant::Column::RoomId.eq(room_id))
 167                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 168                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 169
 170            let stale_participant_user_ids = room_participant::Entity::find()
 171                .filter(stale_participant_filter.clone())
 172                .all(&*tx)
 173                .await?
 174                .into_iter()
 175                .map(|participant| participant.user_id)
 176                .collect::<Vec<_>>();
 177
 178            // Delete participants who failed to reconnect and cancel their calls.
 179            let mut canceled_calls_to_user_ids = Vec::new();
 180            room_participant::Entity::delete_many()
 181                .filter(stale_participant_filter)
 182                .exec(&*tx)
 183                .await?;
 184            let called_participants = room_participant::Entity::find()
 185                .filter(
 186                    Condition::all()
 187                        .add(
 188                            room_participant::Column::CallingUserId
 189                                .is_in(stale_participant_user_ids.iter().copied()),
 190                        )
 191                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 192                )
 193                .all(&*tx)
 194                .await?;
 195            room_participant::Entity::delete_many()
 196                .filter(
 197                    room_participant::Column::Id
 198                        .is_in(called_participants.iter().map(|participant| participant.id)),
 199                )
 200                .exec(&*tx)
 201                .await?;
 202            canceled_calls_to_user_ids.extend(
 203                called_participants
 204                    .into_iter()
 205                    .map(|participant| participant.user_id),
 206            );
 207
 208            let room = self.get_room(room_id, &tx).await?;
 209            // Delete the room if it becomes empty.
 210            if room.participants.is_empty() {
 211                project::Entity::delete_many()
 212                    .filter(project::Column::RoomId.eq(room_id))
 213                    .exec(&*tx)
 214                    .await?;
 215                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 216            }
 217
 218            Ok(RefreshedRoom {
 219                room,
 220                stale_participant_user_ids,
 221                canceled_calls_to_user_ids,
 222            })
 223        })
 224        .await
 225    }
 226
 227    pub async fn delete_stale_servers(
 228        &self,
 229        environment: &str,
 230        new_server_id: ServerId,
 231    ) -> Result<()> {
 232        self.transaction(|tx| async move {
 233            server::Entity::delete_many()
 234                .filter(
 235                    Condition::all()
 236                        .add(server::Column::Environment.eq(environment))
 237                        .add(server::Column::Id.ne(new_server_id)),
 238                )
 239                .exec(&*tx)
 240                .await?;
 241            Ok(())
 242        })
 243        .await
 244    }
 245
 246    async fn stale_server_ids(
 247        &self,
 248        environment: &str,
 249        new_server_id: ServerId,
 250        tx: &DatabaseTransaction,
 251    ) -> Result<Vec<ServerId>> {
 252        let stale_servers = server::Entity::find()
 253            .filter(
 254                Condition::all()
 255                    .add(server::Column::Environment.eq(environment))
 256                    .add(server::Column::Id.ne(new_server_id)),
 257            )
 258            .all(&*tx)
 259            .await?;
 260        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 261    }
 262
 263    // users
 264
 265    pub async fn create_user(
 266        &self,
 267        email_address: &str,
 268        admin: bool,
 269        params: NewUserParams,
 270    ) -> Result<NewUserResult> {
 271        self.transaction(|tx| async {
 272            let tx = tx;
 273            let user = user::Entity::insert(user::ActiveModel {
 274                email_address: ActiveValue::set(Some(email_address.into())),
 275                github_login: ActiveValue::set(params.github_login.clone()),
 276                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 277                admin: ActiveValue::set(admin),
 278                metrics_id: ActiveValue::set(Uuid::new_v4()),
 279                ..Default::default()
 280            })
 281            .on_conflict(
 282                OnConflict::column(user::Column::GithubLogin)
 283                    .update_column(user::Column::GithubLogin)
 284                    .to_owned(),
 285            )
 286            .exec_with_returning(&*tx)
 287            .await?;
 288
 289            Ok(NewUserResult {
 290                user_id: user.id,
 291                metrics_id: user.metrics_id.to_string(),
 292                signup_device_id: None,
 293                inviting_user_id: None,
 294            })
 295        })
 296        .await
 297    }
 298
 299    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 300        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 301            .await
 302    }
 303
 304    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 305        self.transaction(|tx| async {
 306            let tx = tx;
 307            Ok(user::Entity::find()
 308                .filter(user::Column::Id.is_in(ids.iter().copied()))
 309                .all(&*tx)
 310                .await?)
 311        })
 312        .await
 313    }
 314
 315    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 316        self.transaction(|tx| async move {
 317            Ok(user::Entity::find()
 318                .filter(user::Column::GithubLogin.eq(github_login))
 319                .one(&*tx)
 320                .await?)
 321        })
 322        .await
 323    }
 324
 325    pub async fn get_or_create_user_by_github_account(
 326        &self,
 327        github_login: &str,
 328        github_user_id: Option<i32>,
 329        github_email: Option<&str>,
 330    ) -> Result<Option<User>> {
 331        self.transaction(|tx| async move {
 332            let tx = &*tx;
 333            if let Some(github_user_id) = github_user_id {
 334                if let Some(user_by_github_user_id) = user::Entity::find()
 335                    .filter(user::Column::GithubUserId.eq(github_user_id))
 336                    .one(tx)
 337                    .await?
 338                {
 339                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 340                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 341                    Ok(Some(user_by_github_user_id.update(tx).await?))
 342                } else if let Some(user_by_github_login) = user::Entity::find()
 343                    .filter(user::Column::GithubLogin.eq(github_login))
 344                    .one(tx)
 345                    .await?
 346                {
 347                    let mut user_by_github_login = user_by_github_login.into_active_model();
 348                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 349                    Ok(Some(user_by_github_login.update(tx).await?))
 350                } else {
 351                    let user = user::Entity::insert(user::ActiveModel {
 352                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 353                        github_login: ActiveValue::set(github_login.into()),
 354                        github_user_id: ActiveValue::set(Some(github_user_id)),
 355                        admin: ActiveValue::set(false),
 356                        invite_count: ActiveValue::set(0),
 357                        invite_code: ActiveValue::set(None),
 358                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 359                        ..Default::default()
 360                    })
 361                    .exec_with_returning(&*tx)
 362                    .await?;
 363                    Ok(Some(user))
 364                }
 365            } else {
 366                Ok(user::Entity::find()
 367                    .filter(user::Column::GithubLogin.eq(github_login))
 368                    .one(tx)
 369                    .await?)
 370            }
 371        })
 372        .await
 373    }
 374
 375    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 376        self.transaction(|tx| async move {
 377            Ok(user::Entity::find()
 378                .order_by_asc(user::Column::GithubLogin)
 379                .limit(limit as u64)
 380                .offset(page as u64 * limit as u64)
 381                .all(&*tx)
 382                .await?)
 383        })
 384        .await
 385    }
 386
 387    pub async fn get_users_with_no_invites(
 388        &self,
 389        invited_by_another_user: bool,
 390    ) -> Result<Vec<User>> {
 391        self.transaction(|tx| async move {
 392            Ok(user::Entity::find()
 393                .filter(
 394                    user::Column::InviteCount
 395                        .eq(0)
 396                        .and(if invited_by_another_user {
 397                            user::Column::InviterId.is_not_null()
 398                        } else {
 399                            user::Column::InviterId.is_null()
 400                        }),
 401                )
 402                .all(&*tx)
 403                .await?)
 404        })
 405        .await
 406    }
 407
 408    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 409        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 410        enum QueryAs {
 411            MetricsId,
 412        }
 413
 414        self.transaction(|tx| async move {
 415            let metrics_id: Uuid = user::Entity::find_by_id(id)
 416                .select_only()
 417                .column(user::Column::MetricsId)
 418                .into_values::<_, QueryAs>()
 419                .one(&*tx)
 420                .await?
 421                .ok_or_else(|| anyhow!("could not find user"))?;
 422            Ok(metrics_id.to_string())
 423        })
 424        .await
 425    }
 426
 427    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 428        self.transaction(|tx| async move {
 429            user::Entity::update_many()
 430                .filter(user::Column::Id.eq(id))
 431                .set(user::ActiveModel {
 432                    admin: ActiveValue::set(is_admin),
 433                    ..Default::default()
 434                })
 435                .exec(&*tx)
 436                .await?;
 437            Ok(())
 438        })
 439        .await
 440    }
 441
 442    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 443        self.transaction(|tx| async move {
 444            user::Entity::update_many()
 445                .filter(user::Column::Id.eq(id))
 446                .set(user::ActiveModel {
 447                    connected_once: ActiveValue::set(connected_once),
 448                    ..Default::default()
 449                })
 450                .exec(&*tx)
 451                .await?;
 452            Ok(())
 453        })
 454        .await
 455    }
 456
 457    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 458        self.transaction(|tx| async move {
 459            access_token::Entity::delete_many()
 460                .filter(access_token::Column::UserId.eq(id))
 461                .exec(&*tx)
 462                .await?;
 463            user::Entity::delete_by_id(id).exec(&*tx).await?;
 464            Ok(())
 465        })
 466        .await
 467    }
 468
 469    // contacts
 470
 471    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 472        #[derive(Debug, FromQueryResult)]
 473        struct ContactWithUserBusyStatuses {
 474            user_id_a: UserId,
 475            user_id_b: UserId,
 476            a_to_b: bool,
 477            accepted: bool,
 478            should_notify: bool,
 479            user_a_busy: bool,
 480            user_b_busy: bool,
 481        }
 482
 483        self.transaction(|tx| async move {
 484            let user_a_participant = Alias::new("user_a_participant");
 485            let user_b_participant = Alias::new("user_b_participant");
 486            let mut db_contacts = contact::Entity::find()
 487                .column_as(
 488                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 489                        .is_not_null(),
 490                    "user_a_busy",
 491                )
 492                .column_as(
 493                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 494                        .is_not_null(),
 495                    "user_b_busy",
 496                )
 497                .filter(
 498                    contact::Column::UserIdA
 499                        .eq(user_id)
 500                        .or(contact::Column::UserIdB.eq(user_id)),
 501                )
 502                .join_as(
 503                    JoinType::LeftJoin,
 504                    contact::Relation::UserARoomParticipant.def(),
 505                    user_a_participant,
 506                )
 507                .join_as(
 508                    JoinType::LeftJoin,
 509                    contact::Relation::UserBRoomParticipant.def(),
 510                    user_b_participant,
 511                )
 512                .into_model::<ContactWithUserBusyStatuses>()
 513                .stream(&*tx)
 514                .await?;
 515
 516            let mut contacts = Vec::new();
 517            while let Some(db_contact) = db_contacts.next().await {
 518                let db_contact = db_contact?;
 519                if db_contact.user_id_a == user_id {
 520                    if db_contact.accepted {
 521                        contacts.push(Contact::Accepted {
 522                            user_id: db_contact.user_id_b,
 523                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 524                            busy: db_contact.user_b_busy,
 525                        });
 526                    } else if db_contact.a_to_b {
 527                        contacts.push(Contact::Outgoing {
 528                            user_id: db_contact.user_id_b,
 529                        })
 530                    } else {
 531                        contacts.push(Contact::Incoming {
 532                            user_id: db_contact.user_id_b,
 533                            should_notify: db_contact.should_notify,
 534                        });
 535                    }
 536                } else if db_contact.accepted {
 537                    contacts.push(Contact::Accepted {
 538                        user_id: db_contact.user_id_a,
 539                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 540                        busy: db_contact.user_a_busy,
 541                    });
 542                } else if db_contact.a_to_b {
 543                    contacts.push(Contact::Incoming {
 544                        user_id: db_contact.user_id_a,
 545                        should_notify: db_contact.should_notify,
 546                    });
 547                } else {
 548                    contacts.push(Contact::Outgoing {
 549                        user_id: db_contact.user_id_a,
 550                    });
 551                }
 552            }
 553
 554            contacts.sort_unstable_by_key(|contact| contact.user_id());
 555
 556            Ok(contacts)
 557        })
 558        .await
 559    }
 560
 561    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 562        self.transaction(|tx| async move {
 563            let participant = room_participant::Entity::find()
 564                .filter(room_participant::Column::UserId.eq(user_id))
 565                .one(&*tx)
 566                .await?;
 567            Ok(participant.is_some())
 568        })
 569        .await
 570    }
 571
 572    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 573        self.transaction(|tx| async move {
 574            let (id_a, id_b) = if user_id_1 < user_id_2 {
 575                (user_id_1, user_id_2)
 576            } else {
 577                (user_id_2, user_id_1)
 578            };
 579
 580            Ok(contact::Entity::find()
 581                .filter(
 582                    contact::Column::UserIdA
 583                        .eq(id_a)
 584                        .and(contact::Column::UserIdB.eq(id_b))
 585                        .and(contact::Column::Accepted.eq(true)),
 586                )
 587                .one(&*tx)
 588                .await?
 589                .is_some())
 590        })
 591        .await
 592    }
 593
 594    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 595        self.transaction(|tx| async move {
 596            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 597                (sender_id, receiver_id, true)
 598            } else {
 599                (receiver_id, sender_id, false)
 600            };
 601
 602            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 603                user_id_a: ActiveValue::set(id_a),
 604                user_id_b: ActiveValue::set(id_b),
 605                a_to_b: ActiveValue::set(a_to_b),
 606                accepted: ActiveValue::set(false),
 607                should_notify: ActiveValue::set(true),
 608                ..Default::default()
 609            })
 610            .on_conflict(
 611                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 612                    .values([
 613                        (contact::Column::Accepted, true.into()),
 614                        (contact::Column::ShouldNotify, false.into()),
 615                    ])
 616                    .action_and_where(
 617                        contact::Column::Accepted.eq(false).and(
 618                            contact::Column::AToB
 619                                .eq(a_to_b)
 620                                .and(contact::Column::UserIdA.eq(id_b))
 621                                .or(contact::Column::AToB
 622                                    .ne(a_to_b)
 623                                    .and(contact::Column::UserIdA.eq(id_a))),
 624                        ),
 625                    )
 626                    .to_owned(),
 627            )
 628            .exec_without_returning(&*tx)
 629            .await?;
 630
 631            if rows_affected == 1 {
 632                Ok(())
 633            } else {
 634                Err(anyhow!("contact already requested"))?
 635            }
 636        })
 637        .await
 638    }
 639
 640    /// Returns a bool indicating whether the removed contact had originally accepted or not
 641    ///
 642    /// Deletes the contact identified by the requester and responder ids, and then returns
 643    /// whether the deleted contact had originally accepted or was a pending contact request.
 644    ///
 645    /// # Arguments
 646    ///
 647    /// * `requester_id` - The user that initiates this request
 648    /// * `responder_id` - The user that will be removed
 649    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 650        self.transaction(|tx| async move {
 651            let (id_a, id_b) = if responder_id < requester_id {
 652                (responder_id, requester_id)
 653            } else {
 654                (requester_id, responder_id)
 655            };
 656
 657            let contact = contact::Entity::find()
 658                .filter(
 659                    contact::Column::UserIdA
 660                        .eq(id_a)
 661                        .and(contact::Column::UserIdB.eq(id_b)),
 662                )
 663                .one(&*tx)
 664                .await?
 665                .ok_or_else(|| anyhow!("no such contact"))?;
 666
 667            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 668            Ok(contact.accepted)
 669        })
 670        .await
 671    }
 672
 673    pub async fn dismiss_contact_notification(
 674        &self,
 675        user_id: UserId,
 676        contact_user_id: UserId,
 677    ) -> Result<()> {
 678        self.transaction(|tx| async move {
 679            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 680                (user_id, contact_user_id, true)
 681            } else {
 682                (contact_user_id, user_id, false)
 683            };
 684
 685            let result = contact::Entity::update_many()
 686                .set(contact::ActiveModel {
 687                    should_notify: ActiveValue::set(false),
 688                    ..Default::default()
 689                })
 690                .filter(
 691                    contact::Column::UserIdA
 692                        .eq(id_a)
 693                        .and(contact::Column::UserIdB.eq(id_b))
 694                        .and(
 695                            contact::Column::AToB
 696                                .eq(a_to_b)
 697                                .and(contact::Column::Accepted.eq(true))
 698                                .or(contact::Column::AToB
 699                                    .ne(a_to_b)
 700                                    .and(contact::Column::Accepted.eq(false))),
 701                        ),
 702                )
 703                .exec(&*tx)
 704                .await?;
 705            if result.rows_affected == 0 {
 706                Err(anyhow!("no such contact request"))?
 707            } else {
 708                Ok(())
 709            }
 710        })
 711        .await
 712    }
 713
 714    pub async fn respond_to_contact_request(
 715        &self,
 716        responder_id: UserId,
 717        requester_id: UserId,
 718        accept: bool,
 719    ) -> Result<()> {
 720        self.transaction(|tx| async move {
 721            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 722                (responder_id, requester_id, false)
 723            } else {
 724                (requester_id, responder_id, true)
 725            };
 726            let rows_affected = if accept {
 727                let result = contact::Entity::update_many()
 728                    .set(contact::ActiveModel {
 729                        accepted: ActiveValue::set(true),
 730                        should_notify: ActiveValue::set(true),
 731                        ..Default::default()
 732                    })
 733                    .filter(
 734                        contact::Column::UserIdA
 735                            .eq(id_a)
 736                            .and(contact::Column::UserIdB.eq(id_b))
 737                            .and(contact::Column::AToB.eq(a_to_b)),
 738                    )
 739                    .exec(&*tx)
 740                    .await?;
 741                result.rows_affected
 742            } else {
 743                let result = contact::Entity::delete_many()
 744                    .filter(
 745                        contact::Column::UserIdA
 746                            .eq(id_a)
 747                            .and(contact::Column::UserIdB.eq(id_b))
 748                            .and(contact::Column::AToB.eq(a_to_b))
 749                            .and(contact::Column::Accepted.eq(false)),
 750                    )
 751                    .exec(&*tx)
 752                    .await?;
 753
 754                result.rows_affected
 755            };
 756
 757            if rows_affected == 1 {
 758                Ok(())
 759            } else {
 760                Err(anyhow!("no such contact request"))?
 761            }
 762        })
 763        .await
 764    }
 765
 766    pub fn fuzzy_like_string(string: &str) -> String {
 767        let mut result = String::with_capacity(string.len() * 2 + 1);
 768        for c in string.chars() {
 769            if c.is_alphanumeric() {
 770                result.push('%');
 771                result.push(c);
 772            }
 773        }
 774        result.push('%');
 775        result
 776    }
 777
 778    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 779        self.transaction(|tx| async {
 780            let tx = tx;
 781            let like_string = Self::fuzzy_like_string(name_query);
 782            let query = "
 783                SELECT users.*
 784                FROM users
 785                WHERE github_login ILIKE $1
 786                ORDER BY github_login <-> $2
 787                LIMIT $3
 788            ";
 789
 790            Ok(user::Entity::find()
 791                .from_raw_sql(Statement::from_sql_and_values(
 792                    self.pool.get_database_backend(),
 793                    query.into(),
 794                    vec![like_string.into(), name_query.into(), limit.into()],
 795                ))
 796                .all(&*tx)
 797                .await?)
 798        })
 799        .await
 800    }
 801
 802    // signups
 803
 804    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 805        self.transaction(|tx| async move {
 806            signup::Entity::insert(signup::ActiveModel {
 807                email_address: ActiveValue::set(signup.email_address.clone()),
 808                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 809                email_confirmation_sent: ActiveValue::set(false),
 810                platform_mac: ActiveValue::set(signup.platform_mac),
 811                platform_windows: ActiveValue::set(signup.platform_windows),
 812                platform_linux: ActiveValue::set(signup.platform_linux),
 813                platform_unknown: ActiveValue::set(false),
 814                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 815                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 816                device_id: ActiveValue::set(signup.device_id.clone()),
 817                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 818                ..Default::default()
 819            })
 820            .on_conflict(
 821                OnConflict::column(signup::Column::EmailAddress)
 822                    .update_columns([
 823                        signup::Column::PlatformMac,
 824                        signup::Column::PlatformWindows,
 825                        signup::Column::PlatformLinux,
 826                        signup::Column::EditorFeatures,
 827                        signup::Column::ProgrammingLanguages,
 828                        signup::Column::DeviceId,
 829                        signup::Column::AddedToMailingList,
 830                    ])
 831                    .to_owned(),
 832            )
 833            .exec(&*tx)
 834            .await?;
 835            Ok(())
 836        })
 837        .await
 838    }
 839
 840    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 841        self.transaction(|tx| async move {
 842            let signup = signup::Entity::find()
 843                .filter(signup::Column::EmailAddress.eq(email_address))
 844                .one(&*tx)
 845                .await?
 846                .ok_or_else(|| {
 847                    anyhow!("signup with email address {} doesn't exist", email_address)
 848                })?;
 849
 850            Ok(signup)
 851        })
 852        .await
 853    }
 854
 855    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 856        self.transaction(|tx| async move {
 857            let query = "
 858                SELECT
 859                    COUNT(*) as count,
 860                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 861                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 862                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 863                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 864                FROM (
 865                    SELECT *
 866                    FROM signups
 867                    WHERE
 868                        NOT email_confirmation_sent
 869                ) AS unsent
 870            ";
 871            Ok(
 872                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 873                    self.pool.get_database_backend(),
 874                    query.into(),
 875                    vec![],
 876                ))
 877                .one(&*tx)
 878                .await?
 879                .ok_or_else(|| anyhow!("invalid result"))?,
 880            )
 881        })
 882        .await
 883    }
 884
 885    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 886        let emails = invites
 887            .iter()
 888            .map(|s| s.email_address.as_str())
 889            .collect::<Vec<_>>();
 890        self.transaction(|tx| async {
 891            let tx = tx;
 892            signup::Entity::update_many()
 893                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 894                .set(signup::ActiveModel {
 895                    email_confirmation_sent: ActiveValue::set(true),
 896                    ..Default::default()
 897                })
 898                .exec(&*tx)
 899                .await?;
 900            Ok(())
 901        })
 902        .await
 903    }
 904
 905    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 906        self.transaction(|tx| async move {
 907            Ok(signup::Entity::find()
 908                .select_only()
 909                .column(signup::Column::EmailAddress)
 910                .column(signup::Column::EmailConfirmationCode)
 911                .filter(
 912                    signup::Column::EmailConfirmationSent.eq(false).and(
 913                        signup::Column::PlatformMac
 914                            .eq(true)
 915                            .or(signup::Column::PlatformUnknown.eq(true)),
 916                    ),
 917                )
 918                .order_by_asc(signup::Column::CreatedAt)
 919                .limit(count as u64)
 920                .into_model()
 921                .all(&*tx)
 922                .await?)
 923        })
 924        .await
 925    }
 926
 927    // invite codes
 928
 929    pub async fn create_invite_from_code(
 930        &self,
 931        code: &str,
 932        email_address: &str,
 933        device_id: Option<&str>,
 934        added_to_mailing_list: bool,
 935    ) -> Result<Invite> {
 936        self.transaction(|tx| async move {
 937            let existing_user = user::Entity::find()
 938                .filter(user::Column::EmailAddress.eq(email_address))
 939                .one(&*tx)
 940                .await?;
 941
 942            if existing_user.is_some() {
 943                Err(anyhow!("email address is already in use"))?;
 944            }
 945
 946            let inviting_user_with_invites = match user::Entity::find()
 947                .filter(
 948                    user::Column::InviteCode
 949                        .eq(code)
 950                        .and(user::Column::InviteCount.gt(0)),
 951                )
 952                .one(&*tx)
 953                .await?
 954            {
 955                Some(inviting_user) => inviting_user,
 956                None => {
 957                    return Err(Error::Http(
 958                        StatusCode::UNAUTHORIZED,
 959                        "unable to find an invite code with invites remaining".to_string(),
 960                    ))?
 961                }
 962            };
 963            user::Entity::update_many()
 964                .filter(
 965                    user::Column::Id
 966                        .eq(inviting_user_with_invites.id)
 967                        .and(user::Column::InviteCount.gt(0)),
 968                )
 969                .col_expr(
 970                    user::Column::InviteCount,
 971                    Expr::col(user::Column::InviteCount).sub(1),
 972                )
 973                .exec(&*tx)
 974                .await?;
 975
 976            let signup = signup::Entity::insert(signup::ActiveModel {
 977                email_address: ActiveValue::set(email_address.into()),
 978                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 979                email_confirmation_sent: ActiveValue::set(false),
 980                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 981                platform_linux: ActiveValue::set(false),
 982                platform_mac: ActiveValue::set(false),
 983                platform_windows: ActiveValue::set(false),
 984                platform_unknown: ActiveValue::set(true),
 985                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 986                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 987                ..Default::default()
 988            })
 989            .on_conflict(
 990                OnConflict::column(signup::Column::EmailAddress)
 991                    .update_column(signup::Column::InvitingUserId)
 992                    .to_owned(),
 993            )
 994            .exec_with_returning(&*tx)
 995            .await?;
 996
 997            Ok(Invite {
 998                email_address: signup.email_address,
 999                email_confirmation_code: signup.email_confirmation_code,
1000            })
1001        })
1002        .await
1003    }
1004
1005    pub async fn create_user_from_invite(
1006        &self,
1007        invite: &Invite,
1008        user: NewUserParams,
1009    ) -> Result<Option<NewUserResult>> {
1010        self.transaction(|tx| async {
1011            let tx = tx;
1012            let signup = signup::Entity::find()
1013                .filter(
1014                    signup::Column::EmailAddress
1015                        .eq(invite.email_address.as_str())
1016                        .and(
1017                            signup::Column::EmailConfirmationCode
1018                                .eq(invite.email_confirmation_code.as_str()),
1019                        ),
1020                )
1021                .one(&*tx)
1022                .await?
1023                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1024
1025            if signup.user_id.is_some() {
1026                return Ok(None);
1027            }
1028
1029            let user = user::Entity::insert(user::ActiveModel {
1030                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1031                github_login: ActiveValue::set(user.github_login.clone()),
1032                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1033                admin: ActiveValue::set(false),
1034                invite_count: ActiveValue::set(user.invite_count),
1035                invite_code: ActiveValue::set(Some(random_invite_code())),
1036                metrics_id: ActiveValue::set(Uuid::new_v4()),
1037                ..Default::default()
1038            })
1039            .on_conflict(
1040                OnConflict::column(user::Column::GithubLogin)
1041                    .update_columns([
1042                        user::Column::EmailAddress,
1043                        user::Column::GithubUserId,
1044                        user::Column::Admin,
1045                    ])
1046                    .to_owned(),
1047            )
1048            .exec_with_returning(&*tx)
1049            .await?;
1050
1051            let mut signup = signup.into_active_model();
1052            signup.user_id = ActiveValue::set(Some(user.id));
1053            let signup = signup.update(&*tx).await?;
1054
1055            if let Some(inviting_user_id) = signup.inviting_user_id {
1056                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1057                    (inviting_user_id, user.id, true)
1058                } else {
1059                    (user.id, inviting_user_id, false)
1060                };
1061
1062                contact::Entity::insert(contact::ActiveModel {
1063                    user_id_a: ActiveValue::set(user_id_a),
1064                    user_id_b: ActiveValue::set(user_id_b),
1065                    a_to_b: ActiveValue::set(a_to_b),
1066                    should_notify: ActiveValue::set(true),
1067                    accepted: ActiveValue::set(true),
1068                    ..Default::default()
1069                })
1070                .on_conflict(OnConflict::new().do_nothing().to_owned())
1071                .exec_without_returning(&*tx)
1072                .await?;
1073            }
1074
1075            Ok(Some(NewUserResult {
1076                user_id: user.id,
1077                metrics_id: user.metrics_id.to_string(),
1078                inviting_user_id: signup.inviting_user_id,
1079                signup_device_id: signup.device_id,
1080            }))
1081        })
1082        .await
1083    }
1084
1085    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1086        self.transaction(|tx| async move {
1087            if count > 0 {
1088                user::Entity::update_many()
1089                    .filter(
1090                        user::Column::Id
1091                            .eq(id)
1092                            .and(user::Column::InviteCode.is_null()),
1093                    )
1094                    .set(user::ActiveModel {
1095                        invite_code: ActiveValue::set(Some(random_invite_code())),
1096                        ..Default::default()
1097                    })
1098                    .exec(&*tx)
1099                    .await?;
1100            }
1101
1102            user::Entity::update_many()
1103                .filter(user::Column::Id.eq(id))
1104                .set(user::ActiveModel {
1105                    invite_count: ActiveValue::set(count),
1106                    ..Default::default()
1107                })
1108                .exec(&*tx)
1109                .await?;
1110            Ok(())
1111        })
1112        .await
1113    }
1114
1115    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1116        self.transaction(|tx| async move {
1117            match user::Entity::find_by_id(id).one(&*tx).await? {
1118                Some(user) if user.invite_code.is_some() => {
1119                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1120                }
1121                _ => Ok(None),
1122            }
1123        })
1124        .await
1125    }
1126
1127    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1128        self.transaction(|tx| async move {
1129            user::Entity::find()
1130                .filter(user::Column::InviteCode.eq(code))
1131                .one(&*tx)
1132                .await?
1133                .ok_or_else(|| {
1134                    Error::Http(
1135                        StatusCode::NOT_FOUND,
1136                        "that invite code does not exist".to_string(),
1137                    )
1138                })
1139        })
1140        .await
1141    }
1142
1143    // rooms
1144
1145    pub async fn incoming_call_for_user(
1146        &self,
1147        user_id: UserId,
1148    ) -> Result<Option<proto::IncomingCall>> {
1149        self.transaction(|tx| async move {
1150            let pending_participant = room_participant::Entity::find()
1151                .filter(
1152                    room_participant::Column::UserId
1153                        .eq(user_id)
1154                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1155                )
1156                .one(&*tx)
1157                .await?;
1158
1159            if let Some(pending_participant) = pending_participant {
1160                let room = self.get_room(pending_participant.room_id, &tx).await?;
1161                Ok(Self::build_incoming_call(&room, user_id))
1162            } else {
1163                Ok(None)
1164            }
1165        })
1166        .await
1167    }
1168
1169    pub async fn create_room(
1170        &self,
1171        user_id: UserId,
1172        connection: ConnectionId,
1173        live_kit_room: &str,
1174    ) -> Result<proto::Room> {
1175        self.transaction(|tx| async move {
1176            let room = room::ActiveModel {
1177                live_kit_room: ActiveValue::set(live_kit_room.into()),
1178                ..Default::default()
1179            }
1180            .insert(&*tx)
1181            .await?;
1182            room_participant::ActiveModel {
1183                room_id: ActiveValue::set(room.id),
1184                user_id: ActiveValue::set(user_id),
1185                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1186                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1187                    connection.owner_id as i32,
1188                ))),
1189                answering_connection_lost: ActiveValue::set(false),
1190                calling_user_id: ActiveValue::set(user_id),
1191                calling_connection_id: ActiveValue::set(connection.id as i32),
1192                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1193                    connection.owner_id as i32,
1194                ))),
1195                ..Default::default()
1196            }
1197            .insert(&*tx)
1198            .await?;
1199
1200            let room = self.get_room(room.id, &tx).await?;
1201            Ok(room)
1202        })
1203        .await
1204    }
1205
1206    pub async fn call(
1207        &self,
1208        room_id: RoomId,
1209        calling_user_id: UserId,
1210        calling_connection: ConnectionId,
1211        called_user_id: UserId,
1212        initial_project_id: Option<ProjectId>,
1213    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1214        self.room_transaction(room_id, |tx| async move {
1215            room_participant::ActiveModel {
1216                room_id: ActiveValue::set(room_id),
1217                user_id: ActiveValue::set(called_user_id),
1218                answering_connection_lost: ActiveValue::set(false),
1219                calling_user_id: ActiveValue::set(calling_user_id),
1220                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1221                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1222                    calling_connection.owner_id as i32,
1223                ))),
1224                initial_project_id: ActiveValue::set(initial_project_id),
1225                ..Default::default()
1226            }
1227            .insert(&*tx)
1228            .await?;
1229
1230            let room = self.get_room(room_id, &tx).await?;
1231            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1232                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1233            Ok((room, incoming_call))
1234        })
1235        .await
1236    }
1237
1238    pub async fn call_failed(
1239        &self,
1240        room_id: RoomId,
1241        called_user_id: UserId,
1242    ) -> Result<RoomGuard<proto::Room>> {
1243        self.room_transaction(room_id, |tx| async move {
1244            room_participant::Entity::delete_many()
1245                .filter(
1246                    room_participant::Column::RoomId
1247                        .eq(room_id)
1248                        .and(room_participant::Column::UserId.eq(called_user_id)),
1249                )
1250                .exec(&*tx)
1251                .await?;
1252            let room = self.get_room(room_id, &tx).await?;
1253            Ok(room)
1254        })
1255        .await
1256    }
1257
1258    pub async fn decline_call(
1259        &self,
1260        expected_room_id: Option<RoomId>,
1261        user_id: UserId,
1262    ) -> Result<Option<RoomGuard<proto::Room>>> {
1263        self.optional_room_transaction(|tx| async move {
1264            let mut filter = Condition::all()
1265                .add(room_participant::Column::UserId.eq(user_id))
1266                .add(room_participant::Column::AnsweringConnectionId.is_null());
1267            if let Some(room_id) = expected_room_id {
1268                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1269            }
1270            let participant = room_participant::Entity::find()
1271                .filter(filter)
1272                .one(&*tx)
1273                .await?;
1274
1275            let participant = if let Some(participant) = participant {
1276                participant
1277            } else if expected_room_id.is_some() {
1278                return Err(anyhow!("could not find call to decline"))?;
1279            } else {
1280                return Ok(None);
1281            };
1282
1283            let room_id = participant.room_id;
1284            room_participant::Entity::delete(participant.into_active_model())
1285                .exec(&*tx)
1286                .await?;
1287
1288            let room = self.get_room(room_id, &tx).await?;
1289            Ok(Some((room_id, room)))
1290        })
1291        .await
1292    }
1293
1294    pub async fn cancel_call(
1295        &self,
1296        room_id: RoomId,
1297        calling_connection: ConnectionId,
1298        called_user_id: UserId,
1299    ) -> Result<RoomGuard<proto::Room>> {
1300        self.room_transaction(room_id, |tx| async move {
1301            let participant = room_participant::Entity::find()
1302                .filter(
1303                    Condition::all()
1304                        .add(room_participant::Column::UserId.eq(called_user_id))
1305                        .add(room_participant::Column::RoomId.eq(room_id))
1306                        .add(
1307                            room_participant::Column::CallingConnectionId
1308                                .eq(calling_connection.id as i32),
1309                        )
1310                        .add(
1311                            room_participant::Column::CallingConnectionServerId
1312                                .eq(calling_connection.owner_id as i32),
1313                        )
1314                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1315                )
1316                .one(&*tx)
1317                .await?
1318                .ok_or_else(|| anyhow!("no call to cancel"))?;
1319
1320            room_participant::Entity::delete(participant.into_active_model())
1321                .exec(&*tx)
1322                .await?;
1323
1324            let room = self.get_room(room_id, &tx).await?;
1325            Ok(room)
1326        })
1327        .await
1328    }
1329
1330    pub async fn join_room(
1331        &self,
1332        room_id: RoomId,
1333        user_id: UserId,
1334        connection: ConnectionId,
1335    ) -> Result<RoomGuard<proto::Room>> {
1336        self.room_transaction(room_id, |tx| async move {
1337            let result = room_participant::Entity::update_many()
1338                .filter(
1339                    Condition::all()
1340                        .add(room_participant::Column::RoomId.eq(room_id))
1341                        .add(room_participant::Column::UserId.eq(user_id))
1342                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1343                )
1344                .set(room_participant::ActiveModel {
1345                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1346                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1347                        connection.owner_id as i32,
1348                    ))),
1349                    answering_connection_lost: ActiveValue::set(false),
1350                    ..Default::default()
1351                })
1352                .exec(&*tx)
1353                .await?;
1354            if result.rows_affected == 0 {
1355                Err(anyhow!("room does not exist or was already joined"))?
1356            } else {
1357                let room = self.get_room(room_id, &tx).await?;
1358                Ok(room)
1359            }
1360        })
1361        .await
1362    }
1363
1364    pub async fn rejoin_room(
1365        &self,
1366        rejoin_room: proto::RejoinRoom,
1367        user_id: UserId,
1368        connection: ConnectionId,
1369    ) -> Result<RoomGuard<RejoinedRoom>> {
1370        let room_id = RoomId::from_proto(rejoin_room.id);
1371        self.room_transaction(room_id, |tx| async {
1372            let tx = tx;
1373            let participant_update = room_participant::Entity::update_many()
1374                .filter(
1375                    Condition::all()
1376                        .add(room_participant::Column::RoomId.eq(room_id))
1377                        .add(room_participant::Column::UserId.eq(user_id))
1378                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1379                        .add(
1380                            Condition::any()
1381                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1382                                .add(
1383                                    room_participant::Column::AnsweringConnectionServerId
1384                                        .ne(connection.owner_id as i32),
1385                                ),
1386                        ),
1387                )
1388                .set(room_participant::ActiveModel {
1389                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1390                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1391                        connection.owner_id as i32,
1392                    ))),
1393                    answering_connection_lost: ActiveValue::set(false),
1394                    ..Default::default()
1395                })
1396                .exec(&*tx)
1397                .await?;
1398            if participant_update.rows_affected == 0 {
1399                return Err(anyhow!("room does not exist or was already joined"))?;
1400            }
1401
1402            let mut reshared_projects = Vec::new();
1403            for reshared_project in &rejoin_room.reshared_projects {
1404                let project_id = ProjectId::from_proto(reshared_project.project_id);
1405                let project = project::Entity::find_by_id(project_id)
1406                    .one(&*tx)
1407                    .await?
1408                    .ok_or_else(|| anyhow!("project does not exist"))?;
1409                if project.host_user_id != user_id {
1410                    return Err(anyhow!("no such project"))?;
1411                }
1412
1413                let mut collaborators = project
1414                    .find_related(project_collaborator::Entity)
1415                    .all(&*tx)
1416                    .await?;
1417                let host_ix = collaborators
1418                    .iter()
1419                    .position(|collaborator| {
1420                        collaborator.user_id == user_id && collaborator.is_host
1421                    })
1422                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1423                let host = collaborators.swap_remove(host_ix);
1424                let old_connection_id = host.connection();
1425
1426                project::Entity::update(project::ActiveModel {
1427                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1428                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1429                        connection.owner_id as i32,
1430                    ))),
1431                    ..project.into_active_model()
1432                })
1433                .exec(&*tx)
1434                .await?;
1435                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1436                    connection_id: ActiveValue::set(connection.id as i32),
1437                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1438                    ..host.into_active_model()
1439                })
1440                .exec(&*tx)
1441                .await?;
1442
1443                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1444                    .await?;
1445
1446                reshared_projects.push(ResharedProject {
1447                    id: project_id,
1448                    old_connection_id,
1449                    collaborators: collaborators
1450                        .iter()
1451                        .map(|collaborator| ProjectCollaborator {
1452                            connection_id: collaborator.connection(),
1453                            user_id: collaborator.user_id,
1454                            replica_id: collaborator.replica_id,
1455                            is_host: collaborator.is_host,
1456                        })
1457                        .collect(),
1458                    worktrees: reshared_project.worktrees.clone(),
1459                });
1460            }
1461
1462            project::Entity::delete_many()
1463                .filter(
1464                    Condition::all()
1465                        .add(project::Column::RoomId.eq(room_id))
1466                        .add(project::Column::HostUserId.eq(user_id))
1467                        .add(
1468                            project::Column::Id
1469                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1470                        ),
1471                )
1472                .exec(&*tx)
1473                .await?;
1474
1475            let mut rejoined_projects = Vec::new();
1476            for rejoined_project in &rejoin_room.rejoined_projects {
1477                let project_id = ProjectId::from_proto(rejoined_project.id);
1478                let Some(project) = project::Entity::find_by_id(project_id)
1479                    .one(&*tx)
1480                    .await? else { continue };
1481
1482                let mut worktrees = Vec::new();
1483                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1484                for db_worktree in db_worktrees {
1485                    let mut worktree = RejoinedWorktree {
1486                        id: db_worktree.id as u64,
1487                        abs_path: db_worktree.abs_path,
1488                        root_name: db_worktree.root_name,
1489                        visible: db_worktree.visible,
1490                        updated_entries: Default::default(),
1491                        removed_entries: Default::default(),
1492                        diagnostic_summaries: Default::default(),
1493                        scan_id: db_worktree.scan_id as u64,
1494                        completed_scan_id: db_worktree.completed_scan_id as u64,
1495                    };
1496
1497                    let rejoined_worktree = rejoined_project
1498                        .worktrees
1499                        .iter()
1500                        .find(|worktree| worktree.id == db_worktree.id as u64);
1501                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1502                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1503                    } else {
1504                        worktree_entry::Column::IsDeleted.eq(false)
1505                    };
1506
1507                    let mut db_entries = worktree_entry::Entity::find()
1508                        .filter(
1509                            Condition::all()
1510                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1511                                .add(entry_filter),
1512                        )
1513                        .stream(&*tx)
1514                        .await?;
1515
1516                    while let Some(db_entry) = db_entries.next().await {
1517                        let db_entry = db_entry?;
1518                        if db_entry.is_deleted {
1519                            worktree.removed_entries.push(db_entry.id as u64);
1520                        } else {
1521                            worktree.updated_entries.push(proto::Entry {
1522                                id: db_entry.id as u64,
1523                                is_dir: db_entry.is_dir,
1524                                path: db_entry.path,
1525                                inode: db_entry.inode as u64,
1526                                mtime: Some(proto::Timestamp {
1527                                    seconds: db_entry.mtime_seconds as u64,
1528                                    nanos: db_entry.mtime_nanos as u32,
1529                                }),
1530                                is_symlink: db_entry.is_symlink,
1531                                is_ignored: db_entry.is_ignored,
1532                            });
1533                        }
1534                    }
1535
1536                    worktrees.push(worktree);
1537                }
1538
1539                let language_servers = project
1540                    .find_related(language_server::Entity)
1541                    .all(&*tx)
1542                    .await?
1543                    .into_iter()
1544                    .map(|language_server| proto::LanguageServer {
1545                        id: language_server.id as u64,
1546                        name: language_server.name,
1547                    })
1548                    .collect::<Vec<_>>();
1549
1550                let mut collaborators = project
1551                    .find_related(project_collaborator::Entity)
1552                    .all(&*tx)
1553                    .await?;
1554                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1555                    .iter()
1556                    .position(|collaborator| collaborator.user_id == user_id)
1557                {
1558                    collaborators.swap_remove(self_collaborator_ix)
1559                } else {
1560                    continue;
1561                };
1562                let old_connection_id = self_collaborator.connection();
1563                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1564                    connection_id: ActiveValue::set(connection.id as i32),
1565                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1566                    ..self_collaborator.into_active_model()
1567                })
1568                .exec(&*tx)
1569                .await?;
1570
1571                let collaborators = collaborators
1572                    .into_iter()
1573                    .map(|collaborator| ProjectCollaborator {
1574                        connection_id: collaborator.connection(),
1575                        user_id: collaborator.user_id,
1576                        replica_id: collaborator.replica_id,
1577                        is_host: collaborator.is_host,
1578                    })
1579                    .collect::<Vec<_>>();
1580
1581                rejoined_projects.push(RejoinedProject {
1582                    id: project_id,
1583                    old_connection_id,
1584                    collaborators,
1585                    worktrees,
1586                    language_servers,
1587                });
1588            }
1589
1590            let room = self.get_room(room_id, &tx).await?;
1591            Ok(RejoinedRoom {
1592                room,
1593                rejoined_projects,
1594                reshared_projects,
1595            })
1596        })
1597        .await
1598    }
1599
1600    pub async fn leave_room(
1601        &self,
1602        connection: ConnectionId,
1603    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1604        self.optional_room_transaction(|tx| async move {
1605            let leaving_participant = room_participant::Entity::find()
1606                .filter(
1607                    Condition::all()
1608                        .add(
1609                            room_participant::Column::AnsweringConnectionId
1610                                .eq(connection.id as i32),
1611                        )
1612                        .add(
1613                            room_participant::Column::AnsweringConnectionServerId
1614                                .eq(connection.owner_id as i32),
1615                        ),
1616                )
1617                .one(&*tx)
1618                .await?;
1619
1620            if let Some(leaving_participant) = leaving_participant {
1621                // Leave room.
1622                let room_id = leaving_participant.room_id;
1623                room_participant::Entity::delete_by_id(leaving_participant.id)
1624                    .exec(&*tx)
1625                    .await?;
1626
1627                // Cancel pending calls initiated by the leaving user.
1628                let called_participants = room_participant::Entity::find()
1629                    .filter(
1630                        Condition::all()
1631                            .add(
1632                                room_participant::Column::CallingUserId
1633                                    .eq(leaving_participant.user_id),
1634                            )
1635                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1636                    )
1637                    .all(&*tx)
1638                    .await?;
1639                room_participant::Entity::delete_many()
1640                    .filter(
1641                        room_participant::Column::Id
1642                            .is_in(called_participants.iter().map(|participant| participant.id)),
1643                    )
1644                    .exec(&*tx)
1645                    .await?;
1646                let canceled_calls_to_user_ids = called_participants
1647                    .into_iter()
1648                    .map(|participant| participant.user_id)
1649                    .collect();
1650
1651                // Detect left projects.
1652                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1653                enum QueryProjectIds {
1654                    ProjectId,
1655                }
1656                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1657                    .select_only()
1658                    .column_as(
1659                        project_collaborator::Column::ProjectId,
1660                        QueryProjectIds::ProjectId,
1661                    )
1662                    .filter(
1663                        Condition::all()
1664                            .add(
1665                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1666                            )
1667                            .add(
1668                                project_collaborator::Column::ConnectionServerId
1669                                    .eq(connection.owner_id as i32),
1670                            ),
1671                    )
1672                    .into_values::<_, QueryProjectIds>()
1673                    .all(&*tx)
1674                    .await?;
1675                let mut left_projects = HashMap::default();
1676                let mut collaborators = project_collaborator::Entity::find()
1677                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1678                    .stream(&*tx)
1679                    .await?;
1680                while let Some(collaborator) = collaborators.next().await {
1681                    let collaborator = collaborator?;
1682                    let left_project =
1683                        left_projects
1684                            .entry(collaborator.project_id)
1685                            .or_insert(LeftProject {
1686                                id: collaborator.project_id,
1687                                host_user_id: Default::default(),
1688                                connection_ids: Default::default(),
1689                                host_connection_id: Default::default(),
1690                            });
1691
1692                    let collaborator_connection_id = collaborator.connection();
1693                    if collaborator_connection_id != connection {
1694                        left_project.connection_ids.push(collaborator_connection_id);
1695                    }
1696
1697                    if collaborator.is_host {
1698                        left_project.host_user_id = collaborator.user_id;
1699                        left_project.host_connection_id = collaborator_connection_id;
1700                    }
1701                }
1702                drop(collaborators);
1703
1704                // Leave projects.
1705                project_collaborator::Entity::delete_many()
1706                    .filter(
1707                        Condition::all()
1708                            .add(
1709                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1710                            )
1711                            .add(
1712                                project_collaborator::Column::ConnectionServerId
1713                                    .eq(connection.owner_id as i32),
1714                            ),
1715                    )
1716                    .exec(&*tx)
1717                    .await?;
1718
1719                // Unshare projects.
1720                project::Entity::delete_many()
1721                    .filter(
1722                        Condition::all()
1723                            .add(project::Column::RoomId.eq(room_id))
1724                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1725                            .add(
1726                                project::Column::HostConnectionServerId
1727                                    .eq(connection.owner_id as i32),
1728                            ),
1729                    )
1730                    .exec(&*tx)
1731                    .await?;
1732
1733                let room = self.get_room(room_id, &tx).await?;
1734                if room.participants.is_empty() {
1735                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1736                }
1737
1738                let left_room = LeftRoom {
1739                    room,
1740                    left_projects,
1741                    canceled_calls_to_user_ids,
1742                };
1743
1744                if left_room.room.participants.is_empty() {
1745                    self.rooms.remove(&room_id);
1746                }
1747
1748                Ok(Some((room_id, left_room)))
1749            } else {
1750                Ok(None)
1751            }
1752        })
1753        .await
1754    }
1755
1756    pub async fn follow(
1757        &self,
1758        project_id: ProjectId,
1759        leader_connection: ConnectionId,
1760        follower_connection: ConnectionId,
1761    ) -> Result<RoomGuard<proto::Room>> {
1762        let room_id = self.room_id_for_project(project_id).await?;
1763        self.room_transaction(room_id, |tx| async move {
1764            follower::ActiveModel {
1765                room_id: ActiveValue::set(room_id),
1766                project_id: ActiveValue::set(project_id),
1767                leader_connection_server_id: ActiveValue::set(ServerId(
1768                    leader_connection.owner_id as i32,
1769                )),
1770                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1771                follower_connection_server_id: ActiveValue::set(ServerId(
1772                    follower_connection.owner_id as i32,
1773                )),
1774                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1775                ..Default::default()
1776            }
1777            .insert(&*tx)
1778            .await?;
1779
1780            let room = self.get_room(room_id, &*tx).await?;
1781            Ok(room)
1782        })
1783        .await
1784    }
1785
1786    pub async fn unfollow(
1787        &self,
1788        project_id: ProjectId,
1789        leader_connection: ConnectionId,
1790        follower_connection: ConnectionId,
1791    ) -> Result<RoomGuard<proto::Room>> {
1792        let room_id = self.room_id_for_project(project_id).await?;
1793        self.room_transaction(room_id, |tx| async move {
1794            follower::Entity::delete_many()
1795                .filter(
1796                    Condition::all()
1797                        .add(follower::Column::ProjectId.eq(project_id))
1798                        .add(
1799                            follower::Column::LeaderConnectionServerId
1800                                .eq(leader_connection.owner_id),
1801                        )
1802                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1803                        .add(
1804                            follower::Column::FollowerConnectionServerId
1805                                .eq(follower_connection.owner_id),
1806                        )
1807                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1808                )
1809                .exec(&*tx)
1810                .await?;
1811
1812            let room = self.get_room(room_id, &*tx).await?;
1813            Ok(room)
1814        })
1815        .await
1816    }
1817
1818    pub async fn update_room_participant_location(
1819        &self,
1820        room_id: RoomId,
1821        connection: ConnectionId,
1822        location: proto::ParticipantLocation,
1823    ) -> Result<RoomGuard<proto::Room>> {
1824        self.room_transaction(room_id, |tx| async {
1825            let tx = tx;
1826            let location_kind;
1827            let location_project_id;
1828            match location
1829                .variant
1830                .as_ref()
1831                .ok_or_else(|| anyhow!("invalid location"))?
1832            {
1833                proto::participant_location::Variant::SharedProject(project) => {
1834                    location_kind = 0;
1835                    location_project_id = Some(ProjectId::from_proto(project.id));
1836                }
1837                proto::participant_location::Variant::UnsharedProject(_) => {
1838                    location_kind = 1;
1839                    location_project_id = None;
1840                }
1841                proto::participant_location::Variant::External(_) => {
1842                    location_kind = 2;
1843                    location_project_id = None;
1844                }
1845            }
1846
1847            let result = room_participant::Entity::update_many()
1848                .filter(
1849                    Condition::all()
1850                        .add(room_participant::Column::RoomId.eq(room_id))
1851                        .add(
1852                            room_participant::Column::AnsweringConnectionId
1853                                .eq(connection.id as i32),
1854                        )
1855                        .add(
1856                            room_participant::Column::AnsweringConnectionServerId
1857                                .eq(connection.owner_id as i32),
1858                        ),
1859                )
1860                .set(room_participant::ActiveModel {
1861                    location_kind: ActiveValue::set(Some(location_kind)),
1862                    location_project_id: ActiveValue::set(location_project_id),
1863                    ..Default::default()
1864                })
1865                .exec(&*tx)
1866                .await?;
1867
1868            if result.rows_affected == 1 {
1869                let room = self.get_room(room_id, &tx).await?;
1870                Ok(room)
1871            } else {
1872                Err(anyhow!("could not update room participant location"))?
1873            }
1874        })
1875        .await
1876    }
1877
1878    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1879        self.transaction(|tx| async move {
1880            let participant = room_participant::Entity::find()
1881                .filter(
1882                    Condition::all()
1883                        .add(
1884                            room_participant::Column::AnsweringConnectionId
1885                                .eq(connection.id as i32),
1886                        )
1887                        .add(
1888                            room_participant::Column::AnsweringConnectionServerId
1889                                .eq(connection.owner_id as i32),
1890                        ),
1891                )
1892                .one(&*tx)
1893                .await?
1894                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1895
1896            room_participant::Entity::update(room_participant::ActiveModel {
1897                answering_connection_lost: ActiveValue::set(true),
1898                ..participant.into_active_model()
1899            })
1900            .exec(&*tx)
1901            .await?;
1902
1903            Ok(())
1904        })
1905        .await
1906    }
1907
1908    fn build_incoming_call(
1909        room: &proto::Room,
1910        called_user_id: UserId,
1911    ) -> Option<proto::IncomingCall> {
1912        let pending_participant = room
1913            .pending_participants
1914            .iter()
1915            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1916
1917        Some(proto::IncomingCall {
1918            room_id: room.id,
1919            calling_user_id: pending_participant.calling_user_id,
1920            participant_user_ids: room
1921                .participants
1922                .iter()
1923                .map(|participant| participant.user_id)
1924                .collect(),
1925            initial_project: room.participants.iter().find_map(|participant| {
1926                let initial_project_id = pending_participant.initial_project_id?;
1927                participant
1928                    .projects
1929                    .iter()
1930                    .find(|project| project.id == initial_project_id)
1931                    .cloned()
1932            }),
1933        })
1934    }
1935
1936    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1937        let db_room = room::Entity::find_by_id(room_id)
1938            .one(tx)
1939            .await?
1940            .ok_or_else(|| anyhow!("could not find room"))?;
1941
1942        let mut db_participants = db_room
1943            .find_related(room_participant::Entity)
1944            .stream(tx)
1945            .await?;
1946        let mut participants = HashMap::default();
1947        let mut pending_participants = Vec::new();
1948        while let Some(db_participant) = db_participants.next().await {
1949            let db_participant = db_participant?;
1950            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1951                .answering_connection_id
1952                .zip(db_participant.answering_connection_server_id)
1953            {
1954                let location = match (
1955                    db_participant.location_kind,
1956                    db_participant.location_project_id,
1957                ) {
1958                    (Some(0), Some(project_id)) => {
1959                        Some(proto::participant_location::Variant::SharedProject(
1960                            proto::participant_location::SharedProject {
1961                                id: project_id.to_proto(),
1962                            },
1963                        ))
1964                    }
1965                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1966                        Default::default(),
1967                    )),
1968                    _ => Some(proto::participant_location::Variant::External(
1969                        Default::default(),
1970                    )),
1971                };
1972
1973                let answering_connection = ConnectionId {
1974                    owner_id: answering_connection_server_id.0 as u32,
1975                    id: answering_connection_id as u32,
1976                };
1977                participants.insert(
1978                    answering_connection,
1979                    proto::Participant {
1980                        user_id: db_participant.user_id.to_proto(),
1981                        peer_id: Some(answering_connection.into()),
1982                        projects: Default::default(),
1983                        location: Some(proto::ParticipantLocation { variant: location }),
1984                    },
1985                );
1986            } else {
1987                pending_participants.push(proto::PendingParticipant {
1988                    user_id: db_participant.user_id.to_proto(),
1989                    calling_user_id: db_participant.calling_user_id.to_proto(),
1990                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1991                });
1992            }
1993        }
1994        drop(db_participants);
1995
1996        let mut db_projects = db_room
1997            .find_related(project::Entity)
1998            .find_with_related(worktree::Entity)
1999            .stream(tx)
2000            .await?;
2001
2002        while let Some(row) = db_projects.next().await {
2003            let (db_project, db_worktree) = row?;
2004            let host_connection = db_project.host_connection()?;
2005            if let Some(participant) = participants.get_mut(&host_connection) {
2006                let project = if let Some(project) = participant
2007                    .projects
2008                    .iter_mut()
2009                    .find(|project| project.id == db_project.id.to_proto())
2010                {
2011                    project
2012                } else {
2013                    participant.projects.push(proto::ParticipantProject {
2014                        id: db_project.id.to_proto(),
2015                        worktree_root_names: Default::default(),
2016                    });
2017                    participant.projects.last_mut().unwrap()
2018                };
2019
2020                if let Some(db_worktree) = db_worktree {
2021                    if db_worktree.visible {
2022                        project.worktree_root_names.push(db_worktree.root_name);
2023                    }
2024                }
2025            }
2026        }
2027        drop(db_projects);
2028
2029        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2030        let mut followers = Vec::new();
2031        while let Some(db_follower) = db_followers.next().await {
2032            let db_follower = db_follower?;
2033            followers.push(proto::Follower {
2034                leader_id: Some(db_follower.leader_connection().into()),
2035                follower_id: Some(db_follower.follower_connection().into()),
2036                project_id: db_follower.project_id.to_proto(),
2037            });
2038        }
2039
2040        Ok(proto::Room {
2041            id: db_room.id.to_proto(),
2042            live_kit_room: db_room.live_kit_room,
2043            participants: participants.into_values().collect(),
2044            pending_participants,
2045            followers,
2046        })
2047    }
2048
2049    // projects
2050
2051    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2052        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2053        enum QueryAs {
2054            Count,
2055        }
2056
2057        self.transaction(|tx| async move {
2058            Ok(project::Entity::find()
2059                .select_only()
2060                .column_as(project::Column::Id.count(), QueryAs::Count)
2061                .inner_join(user::Entity)
2062                .filter(user::Column::Admin.eq(false))
2063                .into_values::<_, QueryAs>()
2064                .one(&*tx)
2065                .await?
2066                .unwrap_or(0i64) as usize)
2067        })
2068        .await
2069    }
2070
2071    pub async fn share_project(
2072        &self,
2073        room_id: RoomId,
2074        connection: ConnectionId,
2075        worktrees: &[proto::WorktreeMetadata],
2076    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2077        self.room_transaction(room_id, |tx| async move {
2078            let participant = room_participant::Entity::find()
2079                .filter(
2080                    Condition::all()
2081                        .add(
2082                            room_participant::Column::AnsweringConnectionId
2083                                .eq(connection.id as i32),
2084                        )
2085                        .add(
2086                            room_participant::Column::AnsweringConnectionServerId
2087                                .eq(connection.owner_id as i32),
2088                        ),
2089                )
2090                .one(&*tx)
2091                .await?
2092                .ok_or_else(|| anyhow!("could not find participant"))?;
2093            if participant.room_id != room_id {
2094                return Err(anyhow!("shared project on unexpected room"))?;
2095            }
2096
2097            let project = project::ActiveModel {
2098                room_id: ActiveValue::set(participant.room_id),
2099                host_user_id: ActiveValue::set(participant.user_id),
2100                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2101                host_connection_server_id: ActiveValue::set(Some(ServerId(
2102                    connection.owner_id as i32,
2103                ))),
2104                ..Default::default()
2105            }
2106            .insert(&*tx)
2107            .await?;
2108
2109            if !worktrees.is_empty() {
2110                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2111                    worktree::ActiveModel {
2112                        id: ActiveValue::set(worktree.id as i64),
2113                        project_id: ActiveValue::set(project.id),
2114                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2115                        root_name: ActiveValue::set(worktree.root_name.clone()),
2116                        visible: ActiveValue::set(worktree.visible),
2117                        scan_id: ActiveValue::set(0),
2118                        completed_scan_id: ActiveValue::set(0),
2119                    }
2120                }))
2121                .exec(&*tx)
2122                .await?;
2123            }
2124
2125            project_collaborator::ActiveModel {
2126                project_id: ActiveValue::set(project.id),
2127                connection_id: ActiveValue::set(connection.id as i32),
2128                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2129                user_id: ActiveValue::set(participant.user_id),
2130                replica_id: ActiveValue::set(ReplicaId(0)),
2131                is_host: ActiveValue::set(true),
2132                ..Default::default()
2133            }
2134            .insert(&*tx)
2135            .await?;
2136
2137            let room = self.get_room(room_id, &tx).await?;
2138            Ok((project.id, room))
2139        })
2140        .await
2141    }
2142
2143    pub async fn unshare_project(
2144        &self,
2145        project_id: ProjectId,
2146        connection: ConnectionId,
2147    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2148        let room_id = self.room_id_for_project(project_id).await?;
2149        self.room_transaction(room_id, |tx| async move {
2150            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2151
2152            let project = project::Entity::find_by_id(project_id)
2153                .one(&*tx)
2154                .await?
2155                .ok_or_else(|| anyhow!("project not found"))?;
2156            if project.host_connection()? == connection {
2157                project::Entity::delete(project.into_active_model())
2158                    .exec(&*tx)
2159                    .await?;
2160                let room = self.get_room(room_id, &tx).await?;
2161                Ok((room, guest_connection_ids))
2162            } else {
2163                Err(anyhow!("cannot unshare a project hosted by another user"))?
2164            }
2165        })
2166        .await
2167    }
2168
2169    pub async fn update_project(
2170        &self,
2171        project_id: ProjectId,
2172        connection: ConnectionId,
2173        worktrees: &[proto::WorktreeMetadata],
2174    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2175        let room_id = self.room_id_for_project(project_id).await?;
2176        self.room_transaction(room_id, |tx| async move {
2177            let project = project::Entity::find_by_id(project_id)
2178                .filter(
2179                    Condition::all()
2180                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2181                        .add(
2182                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2183                        ),
2184                )
2185                .one(&*tx)
2186                .await?
2187                .ok_or_else(|| anyhow!("no such project"))?;
2188
2189            self.update_project_worktrees(project.id, worktrees, &tx)
2190                .await?;
2191
2192            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2193            let room = self.get_room(project.room_id, &tx).await?;
2194            Ok((room, guest_connection_ids))
2195        })
2196        .await
2197    }
2198
2199    async fn update_project_worktrees(
2200        &self,
2201        project_id: ProjectId,
2202        worktrees: &[proto::WorktreeMetadata],
2203        tx: &DatabaseTransaction,
2204    ) -> Result<()> {
2205        if !worktrees.is_empty() {
2206            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2207                id: ActiveValue::set(worktree.id as i64),
2208                project_id: ActiveValue::set(project_id),
2209                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2210                root_name: ActiveValue::set(worktree.root_name.clone()),
2211                visible: ActiveValue::set(worktree.visible),
2212                scan_id: ActiveValue::set(0),
2213                completed_scan_id: ActiveValue::set(0),
2214            }))
2215            .on_conflict(
2216                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2217                    .update_column(worktree::Column::RootName)
2218                    .to_owned(),
2219            )
2220            .exec(&*tx)
2221            .await?;
2222        }
2223
2224        worktree::Entity::delete_many()
2225            .filter(worktree::Column::ProjectId.eq(project_id).and(
2226                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2227            ))
2228            .exec(&*tx)
2229            .await?;
2230
2231        Ok(())
2232    }
2233
2234    pub async fn update_worktree(
2235        &self,
2236        update: &proto::UpdateWorktree,
2237        connection: ConnectionId,
2238    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2239        let project_id = ProjectId::from_proto(update.project_id);
2240        let worktree_id = update.worktree_id as i64;
2241        let room_id = self.room_id_for_project(project_id).await?;
2242        self.room_transaction(room_id, |tx| async move {
2243            // Ensure the update comes from the host.
2244            let _project = project::Entity::find_by_id(project_id)
2245                .filter(
2246                    Condition::all()
2247                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2248                        .add(
2249                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2250                        ),
2251                )
2252                .one(&*tx)
2253                .await?
2254                .ok_or_else(|| anyhow!("no such project"))?;
2255
2256            // Update metadata.
2257            worktree::Entity::update(worktree::ActiveModel {
2258                id: ActiveValue::set(worktree_id),
2259                project_id: ActiveValue::set(project_id),
2260                root_name: ActiveValue::set(update.root_name.clone()),
2261                scan_id: ActiveValue::set(update.scan_id as i64),
2262                completed_scan_id: if update.is_last_update {
2263                    ActiveValue::set(update.scan_id as i64)
2264                } else {
2265                    ActiveValue::default()
2266                },
2267                abs_path: ActiveValue::set(update.abs_path.clone()),
2268                ..Default::default()
2269            })
2270            .exec(&*tx)
2271            .await?;
2272
2273            if !update.updated_entries.is_empty() {
2274                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2275                    let mtime = entry.mtime.clone().unwrap_or_default();
2276                    worktree_entry::ActiveModel {
2277                        project_id: ActiveValue::set(project_id),
2278                        worktree_id: ActiveValue::set(worktree_id),
2279                        id: ActiveValue::set(entry.id as i64),
2280                        is_dir: ActiveValue::set(entry.is_dir),
2281                        path: ActiveValue::set(entry.path.clone()),
2282                        inode: ActiveValue::set(entry.inode as i64),
2283                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2284                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2285                        is_symlink: ActiveValue::set(entry.is_symlink),
2286                        is_ignored: ActiveValue::set(entry.is_ignored),
2287                        is_deleted: ActiveValue::set(false),
2288                        scan_id: ActiveValue::set(update.scan_id as i64),
2289                    }
2290                }))
2291                .on_conflict(
2292                    OnConflict::columns([
2293                        worktree_entry::Column::ProjectId,
2294                        worktree_entry::Column::WorktreeId,
2295                        worktree_entry::Column::Id,
2296                    ])
2297                    .update_columns([
2298                        worktree_entry::Column::IsDir,
2299                        worktree_entry::Column::Path,
2300                        worktree_entry::Column::Inode,
2301                        worktree_entry::Column::MtimeSeconds,
2302                        worktree_entry::Column::MtimeNanos,
2303                        worktree_entry::Column::IsSymlink,
2304                        worktree_entry::Column::IsIgnored,
2305                        worktree_entry::Column::ScanId,
2306                    ])
2307                    .to_owned(),
2308                )
2309                .exec(&*tx)
2310                .await?;
2311            }
2312
2313            if !update.removed_entries.is_empty() {
2314                worktree_entry::Entity::update_many()
2315                    .filter(
2316                        worktree_entry::Column::ProjectId
2317                            .eq(project_id)
2318                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2319                            .and(
2320                                worktree_entry::Column::Id
2321                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2322                            ),
2323                    )
2324                    .set(worktree_entry::ActiveModel {
2325                        is_deleted: ActiveValue::Set(true),
2326                        scan_id: ActiveValue::Set(update.scan_id as i64),
2327                        ..Default::default()
2328                    })
2329                    .exec(&*tx)
2330                    .await?;
2331            }
2332
2333            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2334            Ok(connection_ids)
2335        })
2336        .await
2337    }
2338
2339    pub async fn update_diagnostic_summary(
2340        &self,
2341        update: &proto::UpdateDiagnosticSummary,
2342        connection: ConnectionId,
2343    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2344        let project_id = ProjectId::from_proto(update.project_id);
2345        let worktree_id = update.worktree_id as i64;
2346        let room_id = self.room_id_for_project(project_id).await?;
2347        self.room_transaction(room_id, |tx| async move {
2348            let summary = update
2349                .summary
2350                .as_ref()
2351                .ok_or_else(|| anyhow!("invalid summary"))?;
2352
2353            // Ensure the update comes from the host.
2354            let project = project::Entity::find_by_id(project_id)
2355                .one(&*tx)
2356                .await?
2357                .ok_or_else(|| anyhow!("no such project"))?;
2358            if project.host_connection()? != connection {
2359                return Err(anyhow!("can't update a project hosted by someone else"))?;
2360            }
2361
2362            // Update summary.
2363            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2364                project_id: ActiveValue::set(project_id),
2365                worktree_id: ActiveValue::set(worktree_id),
2366                path: ActiveValue::set(summary.path.clone()),
2367                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2368                error_count: ActiveValue::set(summary.error_count as i32),
2369                warning_count: ActiveValue::set(summary.warning_count as i32),
2370                ..Default::default()
2371            })
2372            .on_conflict(
2373                OnConflict::columns([
2374                    worktree_diagnostic_summary::Column::ProjectId,
2375                    worktree_diagnostic_summary::Column::WorktreeId,
2376                    worktree_diagnostic_summary::Column::Path,
2377                ])
2378                .update_columns([
2379                    worktree_diagnostic_summary::Column::LanguageServerId,
2380                    worktree_diagnostic_summary::Column::ErrorCount,
2381                    worktree_diagnostic_summary::Column::WarningCount,
2382                ])
2383                .to_owned(),
2384            )
2385            .exec(&*tx)
2386            .await?;
2387
2388            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2389            Ok(connection_ids)
2390        })
2391        .await
2392    }
2393
2394    pub async fn start_language_server(
2395        &self,
2396        update: &proto::StartLanguageServer,
2397        connection: ConnectionId,
2398    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2399        let project_id = ProjectId::from_proto(update.project_id);
2400        let room_id = self.room_id_for_project(project_id).await?;
2401        self.room_transaction(room_id, |tx| async move {
2402            let server = update
2403                .server
2404                .as_ref()
2405                .ok_or_else(|| anyhow!("invalid language server"))?;
2406
2407            // Ensure the update comes from the host.
2408            let project = project::Entity::find_by_id(project_id)
2409                .one(&*tx)
2410                .await?
2411                .ok_or_else(|| anyhow!("no such project"))?;
2412            if project.host_connection()? != connection {
2413                return Err(anyhow!("can't update a project hosted by someone else"))?;
2414            }
2415
2416            // Add the newly-started language server.
2417            language_server::Entity::insert(language_server::ActiveModel {
2418                project_id: ActiveValue::set(project_id),
2419                id: ActiveValue::set(server.id as i64),
2420                name: ActiveValue::set(server.name.clone()),
2421                ..Default::default()
2422            })
2423            .on_conflict(
2424                OnConflict::columns([
2425                    language_server::Column::ProjectId,
2426                    language_server::Column::Id,
2427                ])
2428                .update_column(language_server::Column::Name)
2429                .to_owned(),
2430            )
2431            .exec(&*tx)
2432            .await?;
2433
2434            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2435            Ok(connection_ids)
2436        })
2437        .await
2438    }
2439
2440    pub async fn join_project(
2441        &self,
2442        project_id: ProjectId,
2443        connection: ConnectionId,
2444    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2445        let room_id = self.room_id_for_project(project_id).await?;
2446        self.room_transaction(room_id, |tx| async move {
2447            let participant = room_participant::Entity::find()
2448                .filter(
2449                    Condition::all()
2450                        .add(
2451                            room_participant::Column::AnsweringConnectionId
2452                                .eq(connection.id as i32),
2453                        )
2454                        .add(
2455                            room_participant::Column::AnsweringConnectionServerId
2456                                .eq(connection.owner_id as i32),
2457                        ),
2458                )
2459                .one(&*tx)
2460                .await?
2461                .ok_or_else(|| anyhow!("must join a room first"))?;
2462
2463            let project = project::Entity::find_by_id(project_id)
2464                .one(&*tx)
2465                .await?
2466                .ok_or_else(|| anyhow!("no such project"))?;
2467            if project.room_id != participant.room_id {
2468                return Err(anyhow!("no such project"))?;
2469            }
2470
2471            let mut collaborators = project
2472                .find_related(project_collaborator::Entity)
2473                .all(&*tx)
2474                .await?;
2475            let replica_ids = collaborators
2476                .iter()
2477                .map(|c| c.replica_id)
2478                .collect::<HashSet<_>>();
2479            let mut replica_id = ReplicaId(1);
2480            while replica_ids.contains(&replica_id) {
2481                replica_id.0 += 1;
2482            }
2483            let new_collaborator = project_collaborator::ActiveModel {
2484                project_id: ActiveValue::set(project_id),
2485                connection_id: ActiveValue::set(connection.id as i32),
2486                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2487                user_id: ActiveValue::set(participant.user_id),
2488                replica_id: ActiveValue::set(replica_id),
2489                is_host: ActiveValue::set(false),
2490                ..Default::default()
2491            }
2492            .insert(&*tx)
2493            .await?;
2494            collaborators.push(new_collaborator);
2495
2496            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2497            let mut worktrees = db_worktrees
2498                .into_iter()
2499                .map(|db_worktree| {
2500                    (
2501                        db_worktree.id as u64,
2502                        Worktree {
2503                            id: db_worktree.id as u64,
2504                            abs_path: db_worktree.abs_path,
2505                            root_name: db_worktree.root_name,
2506                            visible: db_worktree.visible,
2507                            entries: Default::default(),
2508                            diagnostic_summaries: Default::default(),
2509                            scan_id: db_worktree.scan_id as u64,
2510                            completed_scan_id: db_worktree.completed_scan_id as u64,
2511                        },
2512                    )
2513                })
2514                .collect::<BTreeMap<_, _>>();
2515
2516            // Populate worktree entries.
2517            {
2518                let mut db_entries = worktree_entry::Entity::find()
2519                    .filter(
2520                        Condition::all()
2521                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2522                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2523                    )
2524                    .stream(&*tx)
2525                    .await?;
2526                while let Some(db_entry) = db_entries.next().await {
2527                    let db_entry = db_entry?;
2528                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2529                        worktree.entries.push(proto::Entry {
2530                            id: db_entry.id as u64,
2531                            is_dir: db_entry.is_dir,
2532                            path: db_entry.path,
2533                            inode: db_entry.inode as u64,
2534                            mtime: Some(proto::Timestamp {
2535                                seconds: db_entry.mtime_seconds as u64,
2536                                nanos: db_entry.mtime_nanos as u32,
2537                            }),
2538                            is_symlink: db_entry.is_symlink,
2539                            is_ignored: db_entry.is_ignored,
2540                        });
2541                    }
2542                }
2543            }
2544
2545            // Populate worktree diagnostic summaries.
2546            {
2547                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2548                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2549                    .stream(&*tx)
2550                    .await?;
2551                while let Some(db_summary) = db_summaries.next().await {
2552                    let db_summary = db_summary?;
2553                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2554                        worktree
2555                            .diagnostic_summaries
2556                            .push(proto::DiagnosticSummary {
2557                                path: db_summary.path,
2558                                language_server_id: db_summary.language_server_id as u64,
2559                                error_count: db_summary.error_count as u32,
2560                                warning_count: db_summary.warning_count as u32,
2561                            });
2562                    }
2563                }
2564            }
2565
2566            // Populate language servers.
2567            let language_servers = project
2568                .find_related(language_server::Entity)
2569                .all(&*tx)
2570                .await?;
2571
2572            let project = Project {
2573                collaborators: collaborators
2574                    .into_iter()
2575                    .map(|collaborator| ProjectCollaborator {
2576                        connection_id: collaborator.connection(),
2577                        user_id: collaborator.user_id,
2578                        replica_id: collaborator.replica_id,
2579                        is_host: collaborator.is_host,
2580                    })
2581                    .collect(),
2582                worktrees,
2583                language_servers: language_servers
2584                    .into_iter()
2585                    .map(|language_server| proto::LanguageServer {
2586                        id: language_server.id as u64,
2587                        name: language_server.name,
2588                    })
2589                    .collect(),
2590            };
2591            Ok((project, replica_id as ReplicaId))
2592        })
2593        .await
2594    }
2595
2596    pub async fn leave_project(
2597        &self,
2598        project_id: ProjectId,
2599        connection: ConnectionId,
2600    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2601        let room_id = self.room_id_for_project(project_id).await?;
2602        self.room_transaction(room_id, |tx| async move {
2603            let result = project_collaborator::Entity::delete_many()
2604                .filter(
2605                    Condition::all()
2606                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2607                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2608                        .add(
2609                            project_collaborator::Column::ConnectionServerId
2610                                .eq(connection.owner_id as i32),
2611                        ),
2612                )
2613                .exec(&*tx)
2614                .await?;
2615            if result.rows_affected == 0 {
2616                Err(anyhow!("not a collaborator on this project"))?;
2617            }
2618
2619            let project = project::Entity::find_by_id(project_id)
2620                .one(&*tx)
2621                .await?
2622                .ok_or_else(|| anyhow!("no such project"))?;
2623            let collaborators = project
2624                .find_related(project_collaborator::Entity)
2625                .all(&*tx)
2626                .await?;
2627            let connection_ids = collaborators
2628                .into_iter()
2629                .map(|collaborator| collaborator.connection())
2630                .collect();
2631
2632            follower::Entity::delete_many()
2633                .filter(
2634                    Condition::any()
2635                        .add(
2636                            Condition::all()
2637                                .add(follower::Column::ProjectId.eq(project_id))
2638                                .add(
2639                                    follower::Column::LeaderConnectionServerId
2640                                        .eq(connection.owner_id),
2641                                )
2642                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2643                        )
2644                        .add(
2645                            Condition::all()
2646                                .add(follower::Column::ProjectId.eq(project_id))
2647                                .add(
2648                                    follower::Column::FollowerConnectionServerId
2649                                        .eq(connection.owner_id),
2650                                )
2651                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2652                        ),
2653                )
2654                .exec(&*tx)
2655                .await?;
2656
2657            let room = self.get_room(project.room_id, &tx).await?;
2658            let left_project = LeftProject {
2659                id: project_id,
2660                host_user_id: project.host_user_id,
2661                host_connection_id: project.host_connection()?,
2662                connection_ids,
2663            };
2664            Ok((room, left_project))
2665        })
2666        .await
2667    }
2668
2669    pub async fn project_collaborators(
2670        &self,
2671        project_id: ProjectId,
2672        connection_id: ConnectionId,
2673    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2674        let room_id = self.room_id_for_project(project_id).await?;
2675        self.room_transaction(room_id, |tx| async move {
2676            let collaborators = project_collaborator::Entity::find()
2677                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2678                .all(&*tx)
2679                .await?
2680                .into_iter()
2681                .map(|collaborator| ProjectCollaborator {
2682                    connection_id: collaborator.connection(),
2683                    user_id: collaborator.user_id,
2684                    replica_id: collaborator.replica_id,
2685                    is_host: collaborator.is_host,
2686                })
2687                .collect::<Vec<_>>();
2688
2689            if collaborators
2690                .iter()
2691                .any(|collaborator| collaborator.connection_id == connection_id)
2692            {
2693                Ok(collaborators)
2694            } else {
2695                Err(anyhow!("no such project"))?
2696            }
2697        })
2698        .await
2699    }
2700
2701    pub async fn project_connection_ids(
2702        &self,
2703        project_id: ProjectId,
2704        connection_id: ConnectionId,
2705    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2706        let room_id = self.room_id_for_project(project_id).await?;
2707        self.room_transaction(room_id, |tx| async move {
2708            let mut collaborators = project_collaborator::Entity::find()
2709                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2710                .stream(&*tx)
2711                .await?;
2712
2713            let mut connection_ids = HashSet::default();
2714            while let Some(collaborator) = collaborators.next().await {
2715                let collaborator = collaborator?;
2716                connection_ids.insert(collaborator.connection());
2717            }
2718
2719            if connection_ids.contains(&connection_id) {
2720                Ok(connection_ids)
2721            } else {
2722                Err(anyhow!("no such project"))?
2723            }
2724        })
2725        .await
2726    }
2727
2728    async fn project_guest_connection_ids(
2729        &self,
2730        project_id: ProjectId,
2731        tx: &DatabaseTransaction,
2732    ) -> Result<Vec<ConnectionId>> {
2733        let mut collaborators = project_collaborator::Entity::find()
2734            .filter(
2735                project_collaborator::Column::ProjectId
2736                    .eq(project_id)
2737                    .and(project_collaborator::Column::IsHost.eq(false)),
2738            )
2739            .stream(tx)
2740            .await?;
2741
2742        let mut guest_connection_ids = Vec::new();
2743        while let Some(collaborator) = collaborators.next().await {
2744            let collaborator = collaborator?;
2745            guest_connection_ids.push(collaborator.connection());
2746        }
2747        Ok(guest_connection_ids)
2748    }
2749
2750    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2751        self.transaction(|tx| async move {
2752            let project = project::Entity::find_by_id(project_id)
2753                .one(&*tx)
2754                .await?
2755                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2756            Ok(project.room_id)
2757        })
2758        .await
2759    }
2760
2761    // access tokens
2762
2763    pub async fn create_access_token(
2764        &self,
2765        user_id: UserId,
2766        access_token_hash: &str,
2767        max_access_token_count: usize,
2768    ) -> Result<AccessTokenId> {
2769        self.transaction(|tx| async {
2770            let tx = tx;
2771
2772            let token = access_token::ActiveModel {
2773                user_id: ActiveValue::set(user_id),
2774                hash: ActiveValue::set(access_token_hash.into()),
2775                ..Default::default()
2776            }
2777            .insert(&*tx)
2778            .await?;
2779
2780            access_token::Entity::delete_many()
2781                .filter(
2782                    access_token::Column::Id.in_subquery(
2783                        Query::select()
2784                            .column(access_token::Column::Id)
2785                            .from(access_token::Entity)
2786                            .and_where(access_token::Column::UserId.eq(user_id))
2787                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2788                            .limit(10000)
2789                            .offset(max_access_token_count as u64)
2790                            .to_owned(),
2791                    ),
2792                )
2793                .exec(&*tx)
2794                .await?;
2795            Ok(token.id)
2796        })
2797        .await
2798    }
2799
2800    pub async fn get_access_token(
2801        &self,
2802        access_token_id: AccessTokenId,
2803    ) -> Result<access_token::Model> {
2804        self.transaction(|tx| async move {
2805            Ok(access_token::Entity::find_by_id(access_token_id)
2806                .one(&*tx)
2807                .await?
2808                .ok_or_else(|| anyhow!("no such access token"))?)
2809        })
2810        .await
2811    }
2812
2813    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2814    where
2815        F: Send + Fn(TransactionHandle) -> Fut,
2816        Fut: Send + Future<Output = Result<T>>,
2817    {
2818        let body = async {
2819            let mut i = 0;
2820            loop {
2821                let (tx, result) = self.with_transaction(&f).await?;
2822                match result {
2823                    Ok(result) => match tx.commit().await.map_err(Into::into) {
2824                        Ok(()) => return Ok(result),
2825                        Err(error) => {
2826                            if !self.retry_on_serialization_error(&error, i).await {
2827                                return Err(error);
2828                            }
2829                        }
2830                    },
2831                    Err(error) => {
2832                        tx.rollback().await?;
2833                        if !self.retry_on_serialization_error(&error, i).await {
2834                            return Err(error);
2835                        }
2836                    }
2837                }
2838                i += 1;
2839            }
2840        };
2841
2842        self.run(body).await
2843    }
2844
2845    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2846    where
2847        F: Send + Fn(TransactionHandle) -> Fut,
2848        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2849    {
2850        let body = async {
2851            let mut i = 0;
2852            loop {
2853                let (tx, result) = self.with_transaction(&f).await?;
2854                match result {
2855                    Ok(Some((room_id, data))) => {
2856                        let lock = self.rooms.entry(room_id).or_default().clone();
2857                        let _guard = lock.lock_owned().await;
2858                        match tx.commit().await.map_err(Into::into) {
2859                            Ok(()) => {
2860                                return Ok(Some(RoomGuard {
2861                                    data,
2862                                    _guard,
2863                                    _not_send: PhantomData,
2864                                }));
2865                            }
2866                            Err(error) => {
2867                                if !self.retry_on_serialization_error(&error, i).await {
2868                                    return Err(error);
2869                                }
2870                            }
2871                        }
2872                    }
2873                    Ok(None) => match tx.commit().await.map_err(Into::into) {
2874                        Ok(()) => return Ok(None),
2875                        Err(error) => {
2876                            if !self.retry_on_serialization_error(&error, i).await {
2877                                return Err(error);
2878                            }
2879                        }
2880                    },
2881                    Err(error) => {
2882                        tx.rollback().await?;
2883                        if !self.retry_on_serialization_error(&error, i).await {
2884                            return Err(error);
2885                        }
2886                    }
2887                }
2888                i += 1;
2889            }
2890        };
2891
2892        self.run(body).await
2893    }
2894
2895    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
2896    where
2897        F: Send + Fn(TransactionHandle) -> Fut,
2898        Fut: Send + Future<Output = Result<T>>,
2899    {
2900        let body = async {
2901            let mut i = 0;
2902            loop {
2903                let lock = self.rooms.entry(room_id).or_default().clone();
2904                let _guard = lock.lock_owned().await;
2905                let (tx, result) = self.with_transaction(&f).await?;
2906                match result {
2907                    Ok(data) => match tx.commit().await.map_err(Into::into) {
2908                        Ok(()) => {
2909                            return Ok(RoomGuard {
2910                                data,
2911                                _guard,
2912                                _not_send: PhantomData,
2913                            });
2914                        }
2915                        Err(error) => {
2916                            if !self.retry_on_serialization_error(&error, i).await {
2917                                return Err(error);
2918                            }
2919                        }
2920                    },
2921                    Err(error) => {
2922                        tx.rollback().await?;
2923                        if !self.retry_on_serialization_error(&error, i).await {
2924                            return Err(error);
2925                        }
2926                    }
2927                }
2928                i += 1;
2929            }
2930        };
2931
2932        self.run(body).await
2933    }
2934
2935    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2936    where
2937        F: Send + Fn(TransactionHandle) -> Fut,
2938        Fut: Send + Future<Output = Result<T>>,
2939    {
2940        let tx = self
2941            .pool
2942            .begin_with_config(Some(IsolationLevel::Serializable), None)
2943            .await?;
2944
2945        let mut tx = Arc::new(Some(tx));
2946        let result = f(TransactionHandle(tx.clone())).await;
2947        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2948            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2949        };
2950
2951        Ok((tx, result))
2952    }
2953
2954    async fn run<F, T>(&self, future: F) -> Result<T>
2955    where
2956        F: Future<Output = Result<T>>,
2957    {
2958        #[cfg(test)]
2959        {
2960            if let Executor::Deterministic(executor) = &self.executor {
2961                executor.simulate_random_delay().await;
2962            }
2963
2964            self.runtime.as_ref().unwrap().block_on(future)
2965        }
2966
2967        #[cfg(not(test))]
2968        {
2969            future.await
2970        }
2971    }
2972
2973    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
2974        // If the error is due to a failure to serialize concurrent transactions, then retry
2975        // this transaction after a delay. With each subsequent retry, double the delay duration.
2976        // Also vary the delay randomly in order to ensure different database connections retry
2977        // at different times.
2978        if is_serialization_error(error) {
2979            let base_delay = 4_u64 << prev_attempt_count.min(16);
2980            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
2981            log::info!(
2982                "retrying transaction after serialization error. delay: {} ms.",
2983                randomized_delay
2984            );
2985            self.executor
2986                .sleep(Duration::from_millis(randomized_delay as u64))
2987                .await;
2988            true
2989        } else {
2990            false
2991        }
2992    }
2993}
2994
2995fn is_serialization_error(error: &Error) -> bool {
2996    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2997    match error {
2998        Error::Database(
2999            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3000            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3001        ) if error
3002            .as_database_error()
3003            .and_then(|error| error.code())
3004            .as_deref()
3005            == Some(SERIALIZATION_FAILURE_CODE) =>
3006        {
3007            true
3008        }
3009        _ => false,
3010    }
3011}
3012
3013struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3014
3015impl Deref for TransactionHandle {
3016    type Target = DatabaseTransaction;
3017
3018    fn deref(&self) -> &Self::Target {
3019        self.0.as_ref().as_ref().unwrap()
3020    }
3021}
3022
3023pub struct RoomGuard<T> {
3024    data: T,
3025    _guard: OwnedMutexGuard<()>,
3026    _not_send: PhantomData<Rc<()>>,
3027}
3028
3029impl<T> Deref for RoomGuard<T> {
3030    type Target = T;
3031
3032    fn deref(&self) -> &T {
3033        &self.data
3034    }
3035}
3036
3037impl<T> DerefMut for RoomGuard<T> {
3038    fn deref_mut(&mut self) -> &mut T {
3039        &mut self.data
3040    }
3041}
3042
3043#[derive(Debug, Serialize, Deserialize)]
3044pub struct NewUserParams {
3045    pub github_login: String,
3046    pub github_user_id: i32,
3047    pub invite_count: i32,
3048}
3049
3050#[derive(Debug)]
3051pub struct NewUserResult {
3052    pub user_id: UserId,
3053    pub metrics_id: String,
3054    pub inviting_user_id: Option<UserId>,
3055    pub signup_device_id: Option<String>,
3056}
3057
3058fn random_invite_code() -> String {
3059    nanoid::nanoid!(16)
3060}
3061
3062fn random_email_confirmation_code() -> String {
3063    nanoid::nanoid!(64)
3064}
3065
3066macro_rules! id_type {
3067    ($name:ident) => {
3068        #[derive(
3069            Clone,
3070            Copy,
3071            Debug,
3072            Default,
3073            PartialEq,
3074            Eq,
3075            PartialOrd,
3076            Ord,
3077            Hash,
3078            Serialize,
3079            Deserialize,
3080        )]
3081        #[serde(transparent)]
3082        pub struct $name(pub i32);
3083
3084        impl $name {
3085            #[allow(unused)]
3086            pub const MAX: Self = Self(i32::MAX);
3087
3088            #[allow(unused)]
3089            pub fn from_proto(value: u64) -> Self {
3090                Self(value as i32)
3091            }
3092
3093            #[allow(unused)]
3094            pub fn to_proto(self) -> u64 {
3095                self.0 as u64
3096            }
3097        }
3098
3099        impl std::fmt::Display for $name {
3100            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3101                self.0.fmt(f)
3102            }
3103        }
3104
3105        impl From<$name> for sea_query::Value {
3106            fn from(value: $name) -> Self {
3107                sea_query::Value::Int(Some(value.0))
3108            }
3109        }
3110
3111        impl sea_orm::TryGetable for $name {
3112            fn try_get(
3113                res: &sea_orm::QueryResult,
3114                pre: &str,
3115                col: &str,
3116            ) -> Result<Self, sea_orm::TryGetError> {
3117                Ok(Self(i32::try_get(res, pre, col)?))
3118            }
3119        }
3120
3121        impl sea_query::ValueType for $name {
3122            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3123                match v {
3124                    Value::TinyInt(Some(int)) => {
3125                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3126                    }
3127                    Value::SmallInt(Some(int)) => {
3128                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3129                    }
3130                    Value::Int(Some(int)) => {
3131                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3132                    }
3133                    Value::BigInt(Some(int)) => {
3134                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3135                    }
3136                    Value::TinyUnsigned(Some(int)) => {
3137                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3138                    }
3139                    Value::SmallUnsigned(Some(int)) => {
3140                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3141                    }
3142                    Value::Unsigned(Some(int)) => {
3143                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3144                    }
3145                    Value::BigUnsigned(Some(int)) => {
3146                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3147                    }
3148                    _ => Err(sea_query::ValueTypeErr),
3149                }
3150            }
3151
3152            fn type_name() -> String {
3153                stringify!($name).into()
3154            }
3155
3156            fn array_type() -> sea_query::ArrayType {
3157                sea_query::ArrayType::Int
3158            }
3159
3160            fn column_type() -> sea_query::ColumnType {
3161                sea_query::ColumnType::Integer(None)
3162            }
3163        }
3164
3165        impl sea_orm::TryFromU64 for $name {
3166            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3167                Ok(Self(n.try_into().map_err(|_| {
3168                    DbErr::ConvertFromU64(concat!(
3169                        "error converting ",
3170                        stringify!($name),
3171                        " to u64"
3172                    ))
3173                })?))
3174            }
3175        }
3176
3177        impl sea_query::Nullable for $name {
3178            fn null() -> Value {
3179                Value::Int(None)
3180            }
3181        }
3182    };
3183}
3184
3185id_type!(AccessTokenId);
3186id_type!(ContactId);
3187id_type!(FollowerId);
3188id_type!(RoomId);
3189id_type!(RoomParticipantId);
3190id_type!(ProjectId);
3191id_type!(ProjectCollaboratorId);
3192id_type!(ReplicaId);
3193id_type!(ServerId);
3194id_type!(SignupId);
3195id_type!(UserId);
3196
3197pub struct RejoinedRoom {
3198    pub room: proto::Room,
3199    pub rejoined_projects: Vec<RejoinedProject>,
3200    pub reshared_projects: Vec<ResharedProject>,
3201}
3202
3203pub struct ResharedProject {
3204    pub id: ProjectId,
3205    pub old_connection_id: ConnectionId,
3206    pub collaborators: Vec<ProjectCollaborator>,
3207    pub worktrees: Vec<proto::WorktreeMetadata>,
3208}
3209
3210pub struct RejoinedProject {
3211    pub id: ProjectId,
3212    pub old_connection_id: ConnectionId,
3213    pub collaborators: Vec<ProjectCollaborator>,
3214    pub worktrees: Vec<RejoinedWorktree>,
3215    pub language_servers: Vec<proto::LanguageServer>,
3216}
3217
3218#[derive(Debug)]
3219pub struct RejoinedWorktree {
3220    pub id: u64,
3221    pub abs_path: String,
3222    pub root_name: String,
3223    pub visible: bool,
3224    pub updated_entries: Vec<proto::Entry>,
3225    pub removed_entries: Vec<u64>,
3226    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3227    pub scan_id: u64,
3228    pub completed_scan_id: u64,
3229}
3230
3231pub struct LeftRoom {
3232    pub room: proto::Room,
3233    pub left_projects: HashMap<ProjectId, LeftProject>,
3234    pub canceled_calls_to_user_ids: Vec<UserId>,
3235}
3236
3237pub struct RefreshedRoom {
3238    pub room: proto::Room,
3239    pub stale_participant_user_ids: Vec<UserId>,
3240    pub canceled_calls_to_user_ids: Vec<UserId>,
3241}
3242
3243pub struct Project {
3244    pub collaborators: Vec<ProjectCollaborator>,
3245    pub worktrees: BTreeMap<u64, Worktree>,
3246    pub language_servers: Vec<proto::LanguageServer>,
3247}
3248
3249pub struct ProjectCollaborator {
3250    pub connection_id: ConnectionId,
3251    pub user_id: UserId,
3252    pub replica_id: ReplicaId,
3253    pub is_host: bool,
3254}
3255
3256impl ProjectCollaborator {
3257    pub fn to_proto(&self) -> proto::Collaborator {
3258        proto::Collaborator {
3259            peer_id: Some(self.connection_id.into()),
3260            replica_id: self.replica_id.0 as u32,
3261            user_id: self.user_id.to_proto(),
3262        }
3263    }
3264}
3265
3266#[derive(Debug)]
3267pub struct LeftProject {
3268    pub id: ProjectId,
3269    pub host_user_id: UserId,
3270    pub host_connection_id: ConnectionId,
3271    pub connection_ids: Vec<ConnectionId>,
3272}
3273
3274pub struct Worktree {
3275    pub id: u64,
3276    pub abs_path: String,
3277    pub root_name: String,
3278    pub visible: bool,
3279    pub entries: Vec<proto::Entry>,
3280    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3281    pub scan_id: u64,
3282    pub completed_scan_id: u64,
3283}
3284
3285#[cfg(test)]
3286pub use test::*;
3287
3288#[cfg(test)]
3289mod test {
3290    use super::*;
3291    use gpui::executor::Background;
3292    use lazy_static::lazy_static;
3293    use parking_lot::Mutex;
3294    use sea_orm::ConnectionTrait;
3295    use sqlx::migrate::MigrateDatabase;
3296    use std::sync::Arc;
3297
3298    pub struct TestDb {
3299        pub db: Option<Arc<Database>>,
3300        pub connection: Option<sqlx::AnyConnection>,
3301    }
3302
3303    impl TestDb {
3304        pub fn sqlite(background: Arc<Background>) -> Self {
3305            let url = format!("sqlite::memory:");
3306            let runtime = tokio::runtime::Builder::new_current_thread()
3307                .enable_io()
3308                .enable_time()
3309                .build()
3310                .unwrap();
3311
3312            let mut db = runtime.block_on(async {
3313                let mut options = ConnectOptions::new(url);
3314                options.max_connections(5);
3315                let db = Database::new(options, Executor::Deterministic(background))
3316                    .await
3317                    .unwrap();
3318                let sql = include_str!(concat!(
3319                    env!("CARGO_MANIFEST_DIR"),
3320                    "/migrations.sqlite/20221109000000_test_schema.sql"
3321                ));
3322                db.pool
3323                    .execute(sea_orm::Statement::from_string(
3324                        db.pool.get_database_backend(),
3325                        sql.into(),
3326                    ))
3327                    .await
3328                    .unwrap();
3329                db
3330            });
3331
3332            db.runtime = Some(runtime);
3333
3334            Self {
3335                db: Some(Arc::new(db)),
3336                connection: None,
3337            }
3338        }
3339
3340        pub fn postgres(background: Arc<Background>) -> Self {
3341            lazy_static! {
3342                static ref LOCK: Mutex<()> = Mutex::new(());
3343            }
3344
3345            let _guard = LOCK.lock();
3346            let mut rng = StdRng::from_entropy();
3347            let url = format!(
3348                "postgres://postgres@localhost/zed-test-{}",
3349                rng.gen::<u128>()
3350            );
3351            let runtime = tokio::runtime::Builder::new_current_thread()
3352                .enable_io()
3353                .enable_time()
3354                .build()
3355                .unwrap();
3356
3357            let mut db = runtime.block_on(async {
3358                sqlx::Postgres::create_database(&url)
3359                    .await
3360                    .expect("failed to create test db");
3361                let mut options = ConnectOptions::new(url);
3362                options
3363                    .max_connections(5)
3364                    .idle_timeout(Duration::from_secs(0));
3365                let db = Database::new(options, Executor::Deterministic(background))
3366                    .await
3367                    .unwrap();
3368                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3369                db.migrate(Path::new(migrations_path), false).await.unwrap();
3370                db
3371            });
3372
3373            db.runtime = Some(runtime);
3374
3375            Self {
3376                db: Some(Arc::new(db)),
3377                connection: None,
3378            }
3379        }
3380
3381        pub fn db(&self) -> &Arc<Database> {
3382            self.db.as_ref().unwrap()
3383        }
3384    }
3385
3386    impl Drop for TestDb {
3387        fn drop(&mut self) {
3388            let db = self.db.take().unwrap();
3389            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3390                db.runtime.as_ref().unwrap().block_on(async {
3391                    use util::ResultExt;
3392                    let query = "
3393                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3394                        FROM pg_stat_activity
3395                        WHERE
3396                            pg_stat_activity.datname = current_database() AND
3397                            pid <> pg_backend_pid();
3398                    ";
3399                    db.pool
3400                        .execute(sea_orm::Statement::from_string(
3401                            db.pool.get_database_backend(),
3402                            query.into(),
3403                        ))
3404                        .await
3405                        .log_err();
3406                    sqlx::Postgres::drop_database(db.options.get_url())
3407                        .await
3408                        .log_err();
3409                })
3410            }
3411        }
3412    }
3413}