db.rs

   1mod access_token;
   2mod contact;
   3mod follower;
   4mod language_server;
   5mod project;
   6mod project_collaborator;
   7mod room;
   8mod room_participant;
   9mod server;
  10mod signup;
  11#[cfg(test)]
  12mod tests;
  13mod user;
  14mod worktree;
  15mod worktree_diagnostic_summary;
  16mod worktree_entry;
  17
  18use crate::executor::Executor;
  19use crate::{Error, Result};
  20use anyhow::anyhow;
  21use collections::{BTreeMap, HashMap, HashSet};
  22pub use contact::Contact;
  23use dashmap::DashMap;
  24use futures::StreamExt;
  25use hyper::StatusCode;
  26use rand::prelude::StdRng;
  27use rand::{Rng, SeedableRng};
  28use rpc::{proto, ConnectionId};
  29use sea_orm::Condition;
  30pub use sea_orm::ConnectOptions;
  31use sea_orm::{
  32    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  33    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  34    Statement, TransactionTrait,
  35};
  36use sea_query::{Alias, Expr, OnConflict, Query};
  37use serde::{Deserialize, Serialize};
  38pub use signup::{Invite, NewSignup, WaitlistSummary};
  39use sqlx::migrate::{Migrate, Migration, MigrationSource};
  40use sqlx::Connection;
  41use std::ops::{Deref, DerefMut};
  42use std::path::Path;
  43use std::time::Duration;
  44use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  45use tokio::sync::{Mutex, OwnedMutexGuard};
  46pub use user::Model as User;
  47
  48pub struct Database {
  49    options: ConnectOptions,
  50    pool: DatabaseConnection,
  51    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  52    rng: Mutex<StdRng>,
  53    executor: Executor,
  54    #[cfg(test)]
  55    runtime: Option<tokio::runtime::Runtime>,
  56}
  57
  58impl Database {
  59    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  60        Ok(Self {
  61            options: options.clone(),
  62            pool: sea_orm::Database::connect(options).await?,
  63            rooms: DashMap::with_capacity(16384),
  64            rng: Mutex::new(StdRng::seed_from_u64(0)),
  65            executor,
  66            #[cfg(test)]
  67            runtime: None,
  68        })
  69    }
  70
  71    #[cfg(test)]
  72    pub fn reset(&self) {
  73        self.rooms.clear();
  74    }
  75
  76    pub async fn migrate(
  77        &self,
  78        migrations_path: &Path,
  79        ignore_checksum_mismatch: bool,
  80    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  81        let migrations = MigrationSource::resolve(migrations_path)
  82            .await
  83            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  84
  85        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  86
  87        connection.ensure_migrations_table().await?;
  88        let applied_migrations: HashMap<_, _> = connection
  89            .list_applied_migrations()
  90            .await?
  91            .into_iter()
  92            .map(|m| (m.version, m))
  93            .collect();
  94
  95        let mut new_migrations = Vec::new();
  96        for migration in migrations {
  97            match applied_migrations.get(&migration.version) {
  98                Some(applied_migration) => {
  99                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 100                    {
 101                        Err(anyhow!(
 102                            "checksum mismatch for applied migration {}",
 103                            migration.description
 104                        ))?;
 105                    }
 106                }
 107                None => {
 108                    let elapsed = connection.apply(&migration).await?;
 109                    new_migrations.push((migration, elapsed));
 110                }
 111            }
 112        }
 113
 114        Ok(new_migrations)
 115    }
 116
 117    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 118        self.transaction(|tx| async move {
 119            let server = server::ActiveModel {
 120                environment: ActiveValue::set(environment.into()),
 121                ..Default::default()
 122            }
 123            .insert(&*tx)
 124            .await?;
 125            Ok(server.id)
 126        })
 127        .await
 128    }
 129
 130    pub async fn stale_room_ids(
 131        &self,
 132        environment: &str,
 133        new_server_id: ServerId,
 134    ) -> Result<Vec<RoomId>> {
 135        self.transaction(|tx| async move {
 136            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 137            enum QueryAs {
 138                RoomId,
 139            }
 140
 141            let stale_server_epochs = self
 142                .stale_server_ids(environment, new_server_id, &tx)
 143                .await?;
 144            Ok(room_participant::Entity::find()
 145                .select_only()
 146                .column(room_participant::Column::RoomId)
 147                .distinct()
 148                .filter(
 149                    room_participant::Column::AnsweringConnectionServerId
 150                        .is_in(stale_server_epochs),
 151                )
 152                .into_values::<_, QueryAs>()
 153                .all(&*tx)
 154                .await?)
 155        })
 156        .await
 157    }
 158
 159    pub async fn refresh_room(
 160        &self,
 161        room_id: RoomId,
 162        new_server_id: ServerId,
 163    ) -> Result<RoomGuard<RefreshedRoom>> {
 164        self.room_transaction(room_id, |tx| async move {
 165            let stale_participant_filter = Condition::all()
 166                .add(room_participant::Column::RoomId.eq(room_id))
 167                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 168                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 169
 170            let stale_participant_user_ids = room_participant::Entity::find()
 171                .filter(stale_participant_filter.clone())
 172                .all(&*tx)
 173                .await?
 174                .into_iter()
 175                .map(|participant| participant.user_id)
 176                .collect::<Vec<_>>();
 177
 178            // Delete participants who failed to reconnect.
 179            room_participant::Entity::delete_many()
 180                .filter(stale_participant_filter)
 181                .exec(&*tx)
 182                .await?;
 183
 184            let room = self.get_room(room_id, &tx).await?;
 185            let mut canceled_calls_to_user_ids = Vec::new();
 186            // Delete the room if it becomes empty and cancel pending calls.
 187            if room.participants.is_empty() {
 188                canceled_calls_to_user_ids.extend(
 189                    room.pending_participants
 190                        .iter()
 191                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 192                );
 193                room_participant::Entity::delete_many()
 194                    .filter(room_participant::Column::RoomId.eq(room_id))
 195                    .exec(&*tx)
 196                    .await?;
 197                project::Entity::delete_many()
 198                    .filter(project::Column::RoomId.eq(room_id))
 199                    .exec(&*tx)
 200                    .await?;
 201                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 202            }
 203
 204            Ok(RefreshedRoom {
 205                room,
 206                stale_participant_user_ids,
 207                canceled_calls_to_user_ids,
 208            })
 209        })
 210        .await
 211    }
 212
 213    pub async fn delete_stale_servers(
 214        &self,
 215        environment: &str,
 216        new_server_id: ServerId,
 217    ) -> Result<()> {
 218        self.transaction(|tx| async move {
 219            server::Entity::delete_many()
 220                .filter(
 221                    Condition::all()
 222                        .add(server::Column::Environment.eq(environment))
 223                        .add(server::Column::Id.ne(new_server_id)),
 224                )
 225                .exec(&*tx)
 226                .await?;
 227            Ok(())
 228        })
 229        .await
 230    }
 231
 232    async fn stale_server_ids(
 233        &self,
 234        environment: &str,
 235        new_server_id: ServerId,
 236        tx: &DatabaseTransaction,
 237    ) -> Result<Vec<ServerId>> {
 238        let stale_servers = server::Entity::find()
 239            .filter(
 240                Condition::all()
 241                    .add(server::Column::Environment.eq(environment))
 242                    .add(server::Column::Id.ne(new_server_id)),
 243            )
 244            .all(&*tx)
 245            .await?;
 246        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 247    }
 248
 249    // users
 250
 251    pub async fn create_user(
 252        &self,
 253        email_address: &str,
 254        admin: bool,
 255        params: NewUserParams,
 256    ) -> Result<NewUserResult> {
 257        self.transaction(|tx| async {
 258            let tx = tx;
 259            let user = user::Entity::insert(user::ActiveModel {
 260                email_address: ActiveValue::set(Some(email_address.into())),
 261                github_login: ActiveValue::set(params.github_login.clone()),
 262                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 263                admin: ActiveValue::set(admin),
 264                metrics_id: ActiveValue::set(Uuid::new_v4()),
 265                ..Default::default()
 266            })
 267            .on_conflict(
 268                OnConflict::column(user::Column::GithubLogin)
 269                    .update_column(user::Column::GithubLogin)
 270                    .to_owned(),
 271            )
 272            .exec_with_returning(&*tx)
 273            .await?;
 274
 275            Ok(NewUserResult {
 276                user_id: user.id,
 277                metrics_id: user.metrics_id.to_string(),
 278                signup_device_id: None,
 279                inviting_user_id: None,
 280            })
 281        })
 282        .await
 283    }
 284
 285    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 286        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 287            .await
 288    }
 289
 290    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 291        self.transaction(|tx| async {
 292            let tx = tx;
 293            Ok(user::Entity::find()
 294                .filter(user::Column::Id.is_in(ids.iter().copied()))
 295                .all(&*tx)
 296                .await?)
 297        })
 298        .await
 299    }
 300
 301    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 302        self.transaction(|tx| async move {
 303            Ok(user::Entity::find()
 304                .filter(user::Column::GithubLogin.eq(github_login))
 305                .one(&*tx)
 306                .await?)
 307        })
 308        .await
 309    }
 310
 311    pub async fn get_or_create_user_by_github_account(
 312        &self,
 313        github_login: &str,
 314        github_user_id: Option<i32>,
 315        github_email: Option<&str>,
 316    ) -> Result<Option<User>> {
 317        self.transaction(|tx| async move {
 318            let tx = &*tx;
 319            if let Some(github_user_id) = github_user_id {
 320                if let Some(user_by_github_user_id) = user::Entity::find()
 321                    .filter(user::Column::GithubUserId.eq(github_user_id))
 322                    .one(tx)
 323                    .await?
 324                {
 325                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 326                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 327                    Ok(Some(user_by_github_user_id.update(tx).await?))
 328                } else if let Some(user_by_github_login) = user::Entity::find()
 329                    .filter(user::Column::GithubLogin.eq(github_login))
 330                    .one(tx)
 331                    .await?
 332                {
 333                    let mut user_by_github_login = user_by_github_login.into_active_model();
 334                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 335                    Ok(Some(user_by_github_login.update(tx).await?))
 336                } else {
 337                    let user = user::Entity::insert(user::ActiveModel {
 338                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 339                        github_login: ActiveValue::set(github_login.into()),
 340                        github_user_id: ActiveValue::set(Some(github_user_id)),
 341                        admin: ActiveValue::set(false),
 342                        invite_count: ActiveValue::set(0),
 343                        invite_code: ActiveValue::set(None),
 344                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 345                        ..Default::default()
 346                    })
 347                    .exec_with_returning(&*tx)
 348                    .await?;
 349                    Ok(Some(user))
 350                }
 351            } else {
 352                Ok(user::Entity::find()
 353                    .filter(user::Column::GithubLogin.eq(github_login))
 354                    .one(tx)
 355                    .await?)
 356            }
 357        })
 358        .await
 359    }
 360
 361    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 362        self.transaction(|tx| async move {
 363            Ok(user::Entity::find()
 364                .order_by_asc(user::Column::GithubLogin)
 365                .limit(limit as u64)
 366                .offset(page as u64 * limit as u64)
 367                .all(&*tx)
 368                .await?)
 369        })
 370        .await
 371    }
 372
 373    pub async fn get_users_with_no_invites(
 374        &self,
 375        invited_by_another_user: bool,
 376    ) -> Result<Vec<User>> {
 377        self.transaction(|tx| async move {
 378            Ok(user::Entity::find()
 379                .filter(
 380                    user::Column::InviteCount
 381                        .eq(0)
 382                        .and(if invited_by_another_user {
 383                            user::Column::InviterId.is_not_null()
 384                        } else {
 385                            user::Column::InviterId.is_null()
 386                        }),
 387                )
 388                .all(&*tx)
 389                .await?)
 390        })
 391        .await
 392    }
 393
 394    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 395        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 396        enum QueryAs {
 397            MetricsId,
 398        }
 399
 400        self.transaction(|tx| async move {
 401            let metrics_id: Uuid = user::Entity::find_by_id(id)
 402                .select_only()
 403                .column(user::Column::MetricsId)
 404                .into_values::<_, QueryAs>()
 405                .one(&*tx)
 406                .await?
 407                .ok_or_else(|| anyhow!("could not find user"))?;
 408            Ok(metrics_id.to_string())
 409        })
 410        .await
 411    }
 412
 413    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 414        self.transaction(|tx| async move {
 415            user::Entity::update_many()
 416                .filter(user::Column::Id.eq(id))
 417                .set(user::ActiveModel {
 418                    admin: ActiveValue::set(is_admin),
 419                    ..Default::default()
 420                })
 421                .exec(&*tx)
 422                .await?;
 423            Ok(())
 424        })
 425        .await
 426    }
 427
 428    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 429        self.transaction(|tx| async move {
 430            user::Entity::update_many()
 431                .filter(user::Column::Id.eq(id))
 432                .set(user::ActiveModel {
 433                    connected_once: ActiveValue::set(connected_once),
 434                    ..Default::default()
 435                })
 436                .exec(&*tx)
 437                .await?;
 438            Ok(())
 439        })
 440        .await
 441    }
 442
 443    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 444        self.transaction(|tx| async move {
 445            access_token::Entity::delete_many()
 446                .filter(access_token::Column::UserId.eq(id))
 447                .exec(&*tx)
 448                .await?;
 449            user::Entity::delete_by_id(id).exec(&*tx).await?;
 450            Ok(())
 451        })
 452        .await
 453    }
 454
 455    // contacts
 456
 457    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 458        #[derive(Debug, FromQueryResult)]
 459        struct ContactWithUserBusyStatuses {
 460            user_id_a: UserId,
 461            user_id_b: UserId,
 462            a_to_b: bool,
 463            accepted: bool,
 464            should_notify: bool,
 465            user_a_busy: bool,
 466            user_b_busy: bool,
 467        }
 468
 469        self.transaction(|tx| async move {
 470            let user_a_participant = Alias::new("user_a_participant");
 471            let user_b_participant = Alias::new("user_b_participant");
 472            let mut db_contacts = contact::Entity::find()
 473                .column_as(
 474                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 475                        .is_not_null(),
 476                    "user_a_busy",
 477                )
 478                .column_as(
 479                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 480                        .is_not_null(),
 481                    "user_b_busy",
 482                )
 483                .filter(
 484                    contact::Column::UserIdA
 485                        .eq(user_id)
 486                        .or(contact::Column::UserIdB.eq(user_id)),
 487                )
 488                .join_as(
 489                    JoinType::LeftJoin,
 490                    contact::Relation::UserARoomParticipant.def(),
 491                    user_a_participant,
 492                )
 493                .join_as(
 494                    JoinType::LeftJoin,
 495                    contact::Relation::UserBRoomParticipant.def(),
 496                    user_b_participant,
 497                )
 498                .into_model::<ContactWithUserBusyStatuses>()
 499                .stream(&*tx)
 500                .await?;
 501
 502            let mut contacts = Vec::new();
 503            while let Some(db_contact) = db_contacts.next().await {
 504                let db_contact = db_contact?;
 505                if db_contact.user_id_a == user_id {
 506                    if db_contact.accepted {
 507                        contacts.push(Contact::Accepted {
 508                            user_id: db_contact.user_id_b,
 509                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 510                            busy: db_contact.user_b_busy,
 511                        });
 512                    } else if db_contact.a_to_b {
 513                        contacts.push(Contact::Outgoing {
 514                            user_id: db_contact.user_id_b,
 515                        })
 516                    } else {
 517                        contacts.push(Contact::Incoming {
 518                            user_id: db_contact.user_id_b,
 519                            should_notify: db_contact.should_notify,
 520                        });
 521                    }
 522                } else if db_contact.accepted {
 523                    contacts.push(Contact::Accepted {
 524                        user_id: db_contact.user_id_a,
 525                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 526                        busy: db_contact.user_a_busy,
 527                    });
 528                } else if db_contact.a_to_b {
 529                    contacts.push(Contact::Incoming {
 530                        user_id: db_contact.user_id_a,
 531                        should_notify: db_contact.should_notify,
 532                    });
 533                } else {
 534                    contacts.push(Contact::Outgoing {
 535                        user_id: db_contact.user_id_a,
 536                    });
 537                }
 538            }
 539
 540            contacts.sort_unstable_by_key(|contact| contact.user_id());
 541
 542            Ok(contacts)
 543        })
 544        .await
 545    }
 546
 547    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 548        self.transaction(|tx| async move {
 549            let participant = room_participant::Entity::find()
 550                .filter(room_participant::Column::UserId.eq(user_id))
 551                .one(&*tx)
 552                .await?;
 553            Ok(participant.is_some())
 554        })
 555        .await
 556    }
 557
 558    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 559        self.transaction(|tx| async move {
 560            let (id_a, id_b) = if user_id_1 < user_id_2 {
 561                (user_id_1, user_id_2)
 562            } else {
 563                (user_id_2, user_id_1)
 564            };
 565
 566            Ok(contact::Entity::find()
 567                .filter(
 568                    contact::Column::UserIdA
 569                        .eq(id_a)
 570                        .and(contact::Column::UserIdB.eq(id_b))
 571                        .and(contact::Column::Accepted.eq(true)),
 572                )
 573                .one(&*tx)
 574                .await?
 575                .is_some())
 576        })
 577        .await
 578    }
 579
 580    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 581        self.transaction(|tx| async move {
 582            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 583                (sender_id, receiver_id, true)
 584            } else {
 585                (receiver_id, sender_id, false)
 586            };
 587
 588            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 589                user_id_a: ActiveValue::set(id_a),
 590                user_id_b: ActiveValue::set(id_b),
 591                a_to_b: ActiveValue::set(a_to_b),
 592                accepted: ActiveValue::set(false),
 593                should_notify: ActiveValue::set(true),
 594                ..Default::default()
 595            })
 596            .on_conflict(
 597                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 598                    .values([
 599                        (contact::Column::Accepted, true.into()),
 600                        (contact::Column::ShouldNotify, false.into()),
 601                    ])
 602                    .action_and_where(
 603                        contact::Column::Accepted.eq(false).and(
 604                            contact::Column::AToB
 605                                .eq(a_to_b)
 606                                .and(contact::Column::UserIdA.eq(id_b))
 607                                .or(contact::Column::AToB
 608                                    .ne(a_to_b)
 609                                    .and(contact::Column::UserIdA.eq(id_a))),
 610                        ),
 611                    )
 612                    .to_owned(),
 613            )
 614            .exec_without_returning(&*tx)
 615            .await?;
 616
 617            if rows_affected == 1 {
 618                Ok(())
 619            } else {
 620                Err(anyhow!("contact already requested"))?
 621            }
 622        })
 623        .await
 624    }
 625
 626    /// Returns a bool indicating whether the removed contact had originally accepted or not
 627    ///
 628    /// Deletes the contact identified by the requester and responder ids, and then returns
 629    /// whether the deleted contact had originally accepted or was a pending contact request.
 630    ///
 631    /// # Arguments
 632    ///
 633    /// * `requester_id` - The user that initiates this request
 634    /// * `responder_id` - The user that will be removed
 635    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 636        self.transaction(|tx| async move {
 637            let (id_a, id_b) = if responder_id < requester_id {
 638                (responder_id, requester_id)
 639            } else {
 640                (requester_id, responder_id)
 641            };
 642
 643            let contact = contact::Entity::find()
 644                .filter(
 645                    contact::Column::UserIdA
 646                        .eq(id_a)
 647                        .and(contact::Column::UserIdB.eq(id_b)),
 648                )
 649                .one(&*tx)
 650                .await?
 651                .ok_or_else(|| anyhow!("no such contact"))?;
 652
 653            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 654            Ok(contact.accepted)
 655        })
 656        .await
 657    }
 658
 659    pub async fn dismiss_contact_notification(
 660        &self,
 661        user_id: UserId,
 662        contact_user_id: UserId,
 663    ) -> Result<()> {
 664        self.transaction(|tx| async move {
 665            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 666                (user_id, contact_user_id, true)
 667            } else {
 668                (contact_user_id, user_id, false)
 669            };
 670
 671            let result = contact::Entity::update_many()
 672                .set(contact::ActiveModel {
 673                    should_notify: ActiveValue::set(false),
 674                    ..Default::default()
 675                })
 676                .filter(
 677                    contact::Column::UserIdA
 678                        .eq(id_a)
 679                        .and(contact::Column::UserIdB.eq(id_b))
 680                        .and(
 681                            contact::Column::AToB
 682                                .eq(a_to_b)
 683                                .and(contact::Column::Accepted.eq(true))
 684                                .or(contact::Column::AToB
 685                                    .ne(a_to_b)
 686                                    .and(contact::Column::Accepted.eq(false))),
 687                        ),
 688                )
 689                .exec(&*tx)
 690                .await?;
 691            if result.rows_affected == 0 {
 692                Err(anyhow!("no such contact request"))?
 693            } else {
 694                Ok(())
 695            }
 696        })
 697        .await
 698    }
 699
 700    pub async fn respond_to_contact_request(
 701        &self,
 702        responder_id: UserId,
 703        requester_id: UserId,
 704        accept: bool,
 705    ) -> Result<()> {
 706        self.transaction(|tx| async move {
 707            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 708                (responder_id, requester_id, false)
 709            } else {
 710                (requester_id, responder_id, true)
 711            };
 712            let rows_affected = if accept {
 713                let result = contact::Entity::update_many()
 714                    .set(contact::ActiveModel {
 715                        accepted: ActiveValue::set(true),
 716                        should_notify: ActiveValue::set(true),
 717                        ..Default::default()
 718                    })
 719                    .filter(
 720                        contact::Column::UserIdA
 721                            .eq(id_a)
 722                            .and(contact::Column::UserIdB.eq(id_b))
 723                            .and(contact::Column::AToB.eq(a_to_b)),
 724                    )
 725                    .exec(&*tx)
 726                    .await?;
 727                result.rows_affected
 728            } else {
 729                let result = contact::Entity::delete_many()
 730                    .filter(
 731                        contact::Column::UserIdA
 732                            .eq(id_a)
 733                            .and(contact::Column::UserIdB.eq(id_b))
 734                            .and(contact::Column::AToB.eq(a_to_b))
 735                            .and(contact::Column::Accepted.eq(false)),
 736                    )
 737                    .exec(&*tx)
 738                    .await?;
 739
 740                result.rows_affected
 741            };
 742
 743            if rows_affected == 1 {
 744                Ok(())
 745            } else {
 746                Err(anyhow!("no such contact request"))?
 747            }
 748        })
 749        .await
 750    }
 751
 752    pub fn fuzzy_like_string(string: &str) -> String {
 753        let mut result = String::with_capacity(string.len() * 2 + 1);
 754        for c in string.chars() {
 755            if c.is_alphanumeric() {
 756                result.push('%');
 757                result.push(c);
 758            }
 759        }
 760        result.push('%');
 761        result
 762    }
 763
 764    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 765        self.transaction(|tx| async {
 766            let tx = tx;
 767            let like_string = Self::fuzzy_like_string(name_query);
 768            let query = "
 769                SELECT users.*
 770                FROM users
 771                WHERE github_login ILIKE $1
 772                ORDER BY github_login <-> $2
 773                LIMIT $3
 774            ";
 775
 776            Ok(user::Entity::find()
 777                .from_raw_sql(Statement::from_sql_and_values(
 778                    self.pool.get_database_backend(),
 779                    query.into(),
 780                    vec![like_string.into(), name_query.into(), limit.into()],
 781                ))
 782                .all(&*tx)
 783                .await?)
 784        })
 785        .await
 786    }
 787
 788    // signups
 789
 790    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 791        self.transaction(|tx| async move {
 792            signup::Entity::insert(signup::ActiveModel {
 793                email_address: ActiveValue::set(signup.email_address.clone()),
 794                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 795                email_confirmation_sent: ActiveValue::set(false),
 796                platform_mac: ActiveValue::set(signup.platform_mac),
 797                platform_windows: ActiveValue::set(signup.platform_windows),
 798                platform_linux: ActiveValue::set(signup.platform_linux),
 799                platform_unknown: ActiveValue::set(false),
 800                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 801                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 802                device_id: ActiveValue::set(signup.device_id.clone()),
 803                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 804                ..Default::default()
 805            })
 806            .on_conflict(
 807                OnConflict::column(signup::Column::EmailAddress)
 808                    .update_columns([
 809                        signup::Column::PlatformMac,
 810                        signup::Column::PlatformWindows,
 811                        signup::Column::PlatformLinux,
 812                        signup::Column::EditorFeatures,
 813                        signup::Column::ProgrammingLanguages,
 814                        signup::Column::DeviceId,
 815                        signup::Column::AddedToMailingList,
 816                    ])
 817                    .to_owned(),
 818            )
 819            .exec(&*tx)
 820            .await?;
 821            Ok(())
 822        })
 823        .await
 824    }
 825
 826    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 827        self.transaction(|tx| async move {
 828            let signup = signup::Entity::find()
 829                .filter(signup::Column::EmailAddress.eq(email_address))
 830                .one(&*tx)
 831                .await?
 832                .ok_or_else(|| {
 833                    anyhow!("signup with email address {} doesn't exist", email_address)
 834                })?;
 835
 836            Ok(signup)
 837        })
 838        .await
 839    }
 840
 841    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 842        self.transaction(|tx| async move {
 843            let query = "
 844                SELECT
 845                    COUNT(*) as count,
 846                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 847                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 848                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 849                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 850                FROM (
 851                    SELECT *
 852                    FROM signups
 853                    WHERE
 854                        NOT email_confirmation_sent
 855                ) AS unsent
 856            ";
 857            Ok(
 858                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 859                    self.pool.get_database_backend(),
 860                    query.into(),
 861                    vec![],
 862                ))
 863                .one(&*tx)
 864                .await?
 865                .ok_or_else(|| anyhow!("invalid result"))?,
 866            )
 867        })
 868        .await
 869    }
 870
 871    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 872        let emails = invites
 873            .iter()
 874            .map(|s| s.email_address.as_str())
 875            .collect::<Vec<_>>();
 876        self.transaction(|tx| async {
 877            let tx = tx;
 878            signup::Entity::update_many()
 879                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 880                .set(signup::ActiveModel {
 881                    email_confirmation_sent: ActiveValue::set(true),
 882                    ..Default::default()
 883                })
 884                .exec(&*tx)
 885                .await?;
 886            Ok(())
 887        })
 888        .await
 889    }
 890
 891    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 892        self.transaction(|tx| async move {
 893            Ok(signup::Entity::find()
 894                .select_only()
 895                .column(signup::Column::EmailAddress)
 896                .column(signup::Column::EmailConfirmationCode)
 897                .filter(
 898                    signup::Column::EmailConfirmationSent.eq(false).and(
 899                        signup::Column::PlatformMac
 900                            .eq(true)
 901                            .or(signup::Column::PlatformUnknown.eq(true)),
 902                    ),
 903                )
 904                .order_by_asc(signup::Column::CreatedAt)
 905                .limit(count as u64)
 906                .into_model()
 907                .all(&*tx)
 908                .await?)
 909        })
 910        .await
 911    }
 912
 913    // invite codes
 914
 915    pub async fn create_invite_from_code(
 916        &self,
 917        code: &str,
 918        email_address: &str,
 919        device_id: Option<&str>,
 920        added_to_mailing_list: bool,
 921    ) -> Result<Invite> {
 922        self.transaction(|tx| async move {
 923            let existing_user = user::Entity::find()
 924                .filter(user::Column::EmailAddress.eq(email_address))
 925                .one(&*tx)
 926                .await?;
 927
 928            if existing_user.is_some() {
 929                Err(anyhow!("email address is already in use"))?;
 930            }
 931
 932            let inviting_user_with_invites = match user::Entity::find()
 933                .filter(
 934                    user::Column::InviteCode
 935                        .eq(code)
 936                        .and(user::Column::InviteCount.gt(0)),
 937                )
 938                .one(&*tx)
 939                .await?
 940            {
 941                Some(inviting_user) => inviting_user,
 942                None => {
 943                    return Err(Error::Http(
 944                        StatusCode::UNAUTHORIZED,
 945                        "unable to find an invite code with invites remaining".to_string(),
 946                    ))?
 947                }
 948            };
 949            user::Entity::update_many()
 950                .filter(
 951                    user::Column::Id
 952                        .eq(inviting_user_with_invites.id)
 953                        .and(user::Column::InviteCount.gt(0)),
 954                )
 955                .col_expr(
 956                    user::Column::InviteCount,
 957                    Expr::col(user::Column::InviteCount).sub(1),
 958                )
 959                .exec(&*tx)
 960                .await?;
 961
 962            let signup = signup::Entity::insert(signup::ActiveModel {
 963                email_address: ActiveValue::set(email_address.into()),
 964                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 965                email_confirmation_sent: ActiveValue::set(false),
 966                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 967                platform_linux: ActiveValue::set(false),
 968                platform_mac: ActiveValue::set(false),
 969                platform_windows: ActiveValue::set(false),
 970                platform_unknown: ActiveValue::set(true),
 971                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 972                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 973                ..Default::default()
 974            })
 975            .on_conflict(
 976                OnConflict::column(signup::Column::EmailAddress)
 977                    .update_column(signup::Column::InvitingUserId)
 978                    .to_owned(),
 979            )
 980            .exec_with_returning(&*tx)
 981            .await?;
 982
 983            Ok(Invite {
 984                email_address: signup.email_address,
 985                email_confirmation_code: signup.email_confirmation_code,
 986            })
 987        })
 988        .await
 989    }
 990
 991    pub async fn create_user_from_invite(
 992        &self,
 993        invite: &Invite,
 994        user: NewUserParams,
 995    ) -> Result<Option<NewUserResult>> {
 996        self.transaction(|tx| async {
 997            let tx = tx;
 998            let signup = signup::Entity::find()
 999                .filter(
1000                    signup::Column::EmailAddress
1001                        .eq(invite.email_address.as_str())
1002                        .and(
1003                            signup::Column::EmailConfirmationCode
1004                                .eq(invite.email_confirmation_code.as_str()),
1005                        ),
1006                )
1007                .one(&*tx)
1008                .await?
1009                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1010
1011            if signup.user_id.is_some() {
1012                return Ok(None);
1013            }
1014
1015            let user = user::Entity::insert(user::ActiveModel {
1016                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1017                github_login: ActiveValue::set(user.github_login.clone()),
1018                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1019                admin: ActiveValue::set(false),
1020                invite_count: ActiveValue::set(user.invite_count),
1021                invite_code: ActiveValue::set(Some(random_invite_code())),
1022                metrics_id: ActiveValue::set(Uuid::new_v4()),
1023                ..Default::default()
1024            })
1025            .on_conflict(
1026                OnConflict::column(user::Column::GithubLogin)
1027                    .update_columns([
1028                        user::Column::EmailAddress,
1029                        user::Column::GithubUserId,
1030                        user::Column::Admin,
1031                    ])
1032                    .to_owned(),
1033            )
1034            .exec_with_returning(&*tx)
1035            .await?;
1036
1037            let mut signup = signup.into_active_model();
1038            signup.user_id = ActiveValue::set(Some(user.id));
1039            let signup = signup.update(&*tx).await?;
1040
1041            if let Some(inviting_user_id) = signup.inviting_user_id {
1042                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1043                    (inviting_user_id, user.id, true)
1044                } else {
1045                    (user.id, inviting_user_id, false)
1046                };
1047
1048                contact::Entity::insert(contact::ActiveModel {
1049                    user_id_a: ActiveValue::set(user_id_a),
1050                    user_id_b: ActiveValue::set(user_id_b),
1051                    a_to_b: ActiveValue::set(a_to_b),
1052                    should_notify: ActiveValue::set(true),
1053                    accepted: ActiveValue::set(true),
1054                    ..Default::default()
1055                })
1056                .on_conflict(OnConflict::new().do_nothing().to_owned())
1057                .exec_without_returning(&*tx)
1058                .await?;
1059            }
1060
1061            Ok(Some(NewUserResult {
1062                user_id: user.id,
1063                metrics_id: user.metrics_id.to_string(),
1064                inviting_user_id: signup.inviting_user_id,
1065                signup_device_id: signup.device_id,
1066            }))
1067        })
1068        .await
1069    }
1070
1071    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1072        self.transaction(|tx| async move {
1073            if count > 0 {
1074                user::Entity::update_many()
1075                    .filter(
1076                        user::Column::Id
1077                            .eq(id)
1078                            .and(user::Column::InviteCode.is_null()),
1079                    )
1080                    .set(user::ActiveModel {
1081                        invite_code: ActiveValue::set(Some(random_invite_code())),
1082                        ..Default::default()
1083                    })
1084                    .exec(&*tx)
1085                    .await?;
1086            }
1087
1088            user::Entity::update_many()
1089                .filter(user::Column::Id.eq(id))
1090                .set(user::ActiveModel {
1091                    invite_count: ActiveValue::set(count),
1092                    ..Default::default()
1093                })
1094                .exec(&*tx)
1095                .await?;
1096            Ok(())
1097        })
1098        .await
1099    }
1100
1101    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1102        self.transaction(|tx| async move {
1103            match user::Entity::find_by_id(id).one(&*tx).await? {
1104                Some(user) if user.invite_code.is_some() => {
1105                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1106                }
1107                _ => Ok(None),
1108            }
1109        })
1110        .await
1111    }
1112
1113    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1114        self.transaction(|tx| async move {
1115            user::Entity::find()
1116                .filter(user::Column::InviteCode.eq(code))
1117                .one(&*tx)
1118                .await?
1119                .ok_or_else(|| {
1120                    Error::Http(
1121                        StatusCode::NOT_FOUND,
1122                        "that invite code does not exist".to_string(),
1123                    )
1124                })
1125        })
1126        .await
1127    }
1128
1129    // rooms
1130
1131    pub async fn incoming_call_for_user(
1132        &self,
1133        user_id: UserId,
1134    ) -> Result<Option<proto::IncomingCall>> {
1135        self.transaction(|tx| async move {
1136            let pending_participant = room_participant::Entity::find()
1137                .filter(
1138                    room_participant::Column::UserId
1139                        .eq(user_id)
1140                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1141                )
1142                .one(&*tx)
1143                .await?;
1144
1145            if let Some(pending_participant) = pending_participant {
1146                let room = self.get_room(pending_participant.room_id, &tx).await?;
1147                Ok(Self::build_incoming_call(&room, user_id))
1148            } else {
1149                Ok(None)
1150            }
1151        })
1152        .await
1153    }
1154
1155    pub async fn create_room(
1156        &self,
1157        user_id: UserId,
1158        connection: ConnectionId,
1159        live_kit_room: &str,
1160    ) -> Result<proto::Room> {
1161        self.transaction(|tx| async move {
1162            let room = room::ActiveModel {
1163                live_kit_room: ActiveValue::set(live_kit_room.into()),
1164                ..Default::default()
1165            }
1166            .insert(&*tx)
1167            .await?;
1168            room_participant::ActiveModel {
1169                room_id: ActiveValue::set(room.id),
1170                user_id: ActiveValue::set(user_id),
1171                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1172                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1173                    connection.owner_id as i32,
1174                ))),
1175                answering_connection_lost: ActiveValue::set(false),
1176                calling_user_id: ActiveValue::set(user_id),
1177                calling_connection_id: ActiveValue::set(connection.id as i32),
1178                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1179                    connection.owner_id as i32,
1180                ))),
1181                ..Default::default()
1182            }
1183            .insert(&*tx)
1184            .await?;
1185
1186            let room = self.get_room(room.id, &tx).await?;
1187            Ok(room)
1188        })
1189        .await
1190    }
1191
1192    pub async fn call(
1193        &self,
1194        room_id: RoomId,
1195        calling_user_id: UserId,
1196        calling_connection: ConnectionId,
1197        called_user_id: UserId,
1198        initial_project_id: Option<ProjectId>,
1199    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1200        self.room_transaction(room_id, |tx| async move {
1201            room_participant::ActiveModel {
1202                room_id: ActiveValue::set(room_id),
1203                user_id: ActiveValue::set(called_user_id),
1204                answering_connection_lost: ActiveValue::set(false),
1205                calling_user_id: ActiveValue::set(calling_user_id),
1206                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1207                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1208                    calling_connection.owner_id as i32,
1209                ))),
1210                initial_project_id: ActiveValue::set(initial_project_id),
1211                ..Default::default()
1212            }
1213            .insert(&*tx)
1214            .await?;
1215
1216            let room = self.get_room(room_id, &tx).await?;
1217            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1218                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1219            Ok((room, incoming_call))
1220        })
1221        .await
1222    }
1223
1224    pub async fn call_failed(
1225        &self,
1226        room_id: RoomId,
1227        called_user_id: UserId,
1228    ) -> Result<RoomGuard<proto::Room>> {
1229        self.room_transaction(room_id, |tx| async move {
1230            room_participant::Entity::delete_many()
1231                .filter(
1232                    room_participant::Column::RoomId
1233                        .eq(room_id)
1234                        .and(room_participant::Column::UserId.eq(called_user_id)),
1235                )
1236                .exec(&*tx)
1237                .await?;
1238            let room = self.get_room(room_id, &tx).await?;
1239            Ok(room)
1240        })
1241        .await
1242    }
1243
1244    pub async fn decline_call(
1245        &self,
1246        expected_room_id: Option<RoomId>,
1247        user_id: UserId,
1248    ) -> Result<Option<RoomGuard<proto::Room>>> {
1249        self.optional_room_transaction(|tx| async move {
1250            let mut filter = Condition::all()
1251                .add(room_participant::Column::UserId.eq(user_id))
1252                .add(room_participant::Column::AnsweringConnectionId.is_null());
1253            if let Some(room_id) = expected_room_id {
1254                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1255            }
1256            let participant = room_participant::Entity::find()
1257                .filter(filter)
1258                .one(&*tx)
1259                .await?;
1260
1261            let participant = if let Some(participant) = participant {
1262                participant
1263            } else if expected_room_id.is_some() {
1264                return Err(anyhow!("could not find call to decline"))?;
1265            } else {
1266                return Ok(None);
1267            };
1268
1269            let room_id = participant.room_id;
1270            room_participant::Entity::delete(participant.into_active_model())
1271                .exec(&*tx)
1272                .await?;
1273
1274            let room = self.get_room(room_id, &tx).await?;
1275            Ok(Some((room_id, room)))
1276        })
1277        .await
1278    }
1279
1280    pub async fn cancel_call(
1281        &self,
1282        room_id: RoomId,
1283        calling_connection: ConnectionId,
1284        called_user_id: UserId,
1285    ) -> Result<RoomGuard<proto::Room>> {
1286        self.room_transaction(room_id, |tx| async move {
1287            let participant = room_participant::Entity::find()
1288                .filter(
1289                    Condition::all()
1290                        .add(room_participant::Column::UserId.eq(called_user_id))
1291                        .add(room_participant::Column::RoomId.eq(room_id))
1292                        .add(
1293                            room_participant::Column::CallingConnectionId
1294                                .eq(calling_connection.id as i32),
1295                        )
1296                        .add(
1297                            room_participant::Column::CallingConnectionServerId
1298                                .eq(calling_connection.owner_id as i32),
1299                        )
1300                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1301                )
1302                .one(&*tx)
1303                .await?
1304                .ok_or_else(|| anyhow!("no call to cancel"))?;
1305
1306            room_participant::Entity::delete(participant.into_active_model())
1307                .exec(&*tx)
1308                .await?;
1309
1310            let room = self.get_room(room_id, &tx).await?;
1311            Ok(room)
1312        })
1313        .await
1314    }
1315
1316    pub async fn join_room(
1317        &self,
1318        room_id: RoomId,
1319        user_id: UserId,
1320        connection: ConnectionId,
1321    ) -> Result<RoomGuard<proto::Room>> {
1322        self.room_transaction(room_id, |tx| async move {
1323            let result = room_participant::Entity::update_many()
1324                .filter(
1325                    Condition::all()
1326                        .add(room_participant::Column::RoomId.eq(room_id))
1327                        .add(room_participant::Column::UserId.eq(user_id))
1328                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1329                )
1330                .set(room_participant::ActiveModel {
1331                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1332                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1333                        connection.owner_id as i32,
1334                    ))),
1335                    answering_connection_lost: ActiveValue::set(false),
1336                    ..Default::default()
1337                })
1338                .exec(&*tx)
1339                .await?;
1340            if result.rows_affected == 0 {
1341                Err(anyhow!("room does not exist or was already joined"))?
1342            } else {
1343                let room = self.get_room(room_id, &tx).await?;
1344                Ok(room)
1345            }
1346        })
1347        .await
1348    }
1349
1350    pub async fn rejoin_room(
1351        &self,
1352        rejoin_room: proto::RejoinRoom,
1353        user_id: UserId,
1354        connection: ConnectionId,
1355    ) -> Result<RoomGuard<RejoinedRoom>> {
1356        let room_id = RoomId::from_proto(rejoin_room.id);
1357        self.room_transaction(room_id, |tx| async {
1358            let tx = tx;
1359            let participant_update = room_participant::Entity::update_many()
1360                .filter(
1361                    Condition::all()
1362                        .add(room_participant::Column::RoomId.eq(room_id))
1363                        .add(room_participant::Column::UserId.eq(user_id))
1364                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1365                        .add(
1366                            Condition::any()
1367                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1368                                .add(
1369                                    room_participant::Column::AnsweringConnectionServerId
1370                                        .ne(connection.owner_id as i32),
1371                                ),
1372                        ),
1373                )
1374                .set(room_participant::ActiveModel {
1375                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1376                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1377                        connection.owner_id as i32,
1378                    ))),
1379                    answering_connection_lost: ActiveValue::set(false),
1380                    ..Default::default()
1381                })
1382                .exec(&*tx)
1383                .await?;
1384            if participant_update.rows_affected == 0 {
1385                return Err(anyhow!("room does not exist or was already joined"))?;
1386            }
1387
1388            let mut reshared_projects = Vec::new();
1389            for reshared_project in &rejoin_room.reshared_projects {
1390                let project_id = ProjectId::from_proto(reshared_project.project_id);
1391                let project = project::Entity::find_by_id(project_id)
1392                    .one(&*tx)
1393                    .await?
1394                    .ok_or_else(|| anyhow!("project does not exist"))?;
1395                if project.host_user_id != user_id {
1396                    return Err(anyhow!("no such project"))?;
1397                }
1398
1399                let mut collaborators = project
1400                    .find_related(project_collaborator::Entity)
1401                    .all(&*tx)
1402                    .await?;
1403                let host_ix = collaborators
1404                    .iter()
1405                    .position(|collaborator| {
1406                        collaborator.user_id == user_id && collaborator.is_host
1407                    })
1408                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1409                let host = collaborators.swap_remove(host_ix);
1410                let old_connection_id = host.connection();
1411
1412                project::Entity::update(project::ActiveModel {
1413                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1414                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1415                        connection.owner_id as i32,
1416                    ))),
1417                    ..project.into_active_model()
1418                })
1419                .exec(&*tx)
1420                .await?;
1421                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1422                    connection_id: ActiveValue::set(connection.id as i32),
1423                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1424                    ..host.into_active_model()
1425                })
1426                .exec(&*tx)
1427                .await?;
1428
1429                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1430                    .await?;
1431
1432                reshared_projects.push(ResharedProject {
1433                    id: project_id,
1434                    old_connection_id,
1435                    collaborators: collaborators
1436                        .iter()
1437                        .map(|collaborator| ProjectCollaborator {
1438                            connection_id: collaborator.connection(),
1439                            user_id: collaborator.user_id,
1440                            replica_id: collaborator.replica_id,
1441                            is_host: collaborator.is_host,
1442                        })
1443                        .collect(),
1444                    worktrees: reshared_project.worktrees.clone(),
1445                });
1446            }
1447
1448            project::Entity::delete_many()
1449                .filter(
1450                    Condition::all()
1451                        .add(project::Column::RoomId.eq(room_id))
1452                        .add(project::Column::HostUserId.eq(user_id))
1453                        .add(
1454                            project::Column::Id
1455                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1456                        ),
1457                )
1458                .exec(&*tx)
1459                .await?;
1460
1461            let mut rejoined_projects = Vec::new();
1462            for rejoined_project in &rejoin_room.rejoined_projects {
1463                let project_id = ProjectId::from_proto(rejoined_project.id);
1464                let Some(project) = project::Entity::find_by_id(project_id)
1465                    .one(&*tx)
1466                    .await? else { continue };
1467
1468                let mut worktrees = Vec::new();
1469                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1470                for db_worktree in db_worktrees {
1471                    let mut worktree = RejoinedWorktree {
1472                        id: db_worktree.id as u64,
1473                        abs_path: db_worktree.abs_path,
1474                        root_name: db_worktree.root_name,
1475                        visible: db_worktree.visible,
1476                        updated_entries: Default::default(),
1477                        removed_entries: Default::default(),
1478                        diagnostic_summaries: Default::default(),
1479                        scan_id: db_worktree.scan_id as u64,
1480                        completed_scan_id: db_worktree.completed_scan_id as u64,
1481                    };
1482
1483                    let rejoined_worktree = rejoined_project
1484                        .worktrees
1485                        .iter()
1486                        .find(|worktree| worktree.id == db_worktree.id as u64);
1487                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1488                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1489                    } else {
1490                        worktree_entry::Column::IsDeleted.eq(false)
1491                    };
1492
1493                    let mut db_entries = worktree_entry::Entity::find()
1494                        .filter(
1495                            Condition::all()
1496                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1497                                .add(entry_filter),
1498                        )
1499                        .stream(&*tx)
1500                        .await?;
1501
1502                    while let Some(db_entry) = db_entries.next().await {
1503                        let db_entry = db_entry?;
1504                        if db_entry.is_deleted {
1505                            worktree.removed_entries.push(db_entry.id as u64);
1506                        } else {
1507                            worktree.updated_entries.push(proto::Entry {
1508                                id: db_entry.id as u64,
1509                                is_dir: db_entry.is_dir,
1510                                path: db_entry.path,
1511                                inode: db_entry.inode as u64,
1512                                mtime: Some(proto::Timestamp {
1513                                    seconds: db_entry.mtime_seconds as u64,
1514                                    nanos: db_entry.mtime_nanos as u32,
1515                                }),
1516                                is_symlink: db_entry.is_symlink,
1517                                is_ignored: db_entry.is_ignored,
1518                            });
1519                        }
1520                    }
1521
1522                    worktrees.push(worktree);
1523                }
1524
1525                let language_servers = project
1526                    .find_related(language_server::Entity)
1527                    .all(&*tx)
1528                    .await?
1529                    .into_iter()
1530                    .map(|language_server| proto::LanguageServer {
1531                        id: language_server.id as u64,
1532                        name: language_server.name,
1533                    })
1534                    .collect::<Vec<_>>();
1535
1536                let mut collaborators = project
1537                    .find_related(project_collaborator::Entity)
1538                    .all(&*tx)
1539                    .await?;
1540                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1541                    .iter()
1542                    .position(|collaborator| collaborator.user_id == user_id)
1543                {
1544                    collaborators.swap_remove(self_collaborator_ix)
1545                } else {
1546                    continue;
1547                };
1548                let old_connection_id = self_collaborator.connection();
1549                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1550                    connection_id: ActiveValue::set(connection.id as i32),
1551                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1552                    ..self_collaborator.into_active_model()
1553                })
1554                .exec(&*tx)
1555                .await?;
1556
1557                let collaborators = collaborators
1558                    .into_iter()
1559                    .map(|collaborator| ProjectCollaborator {
1560                        connection_id: collaborator.connection(),
1561                        user_id: collaborator.user_id,
1562                        replica_id: collaborator.replica_id,
1563                        is_host: collaborator.is_host,
1564                    })
1565                    .collect::<Vec<_>>();
1566
1567                rejoined_projects.push(RejoinedProject {
1568                    id: project_id,
1569                    old_connection_id,
1570                    collaborators,
1571                    worktrees,
1572                    language_servers,
1573                });
1574            }
1575
1576            let room = self.get_room(room_id, &tx).await?;
1577            Ok(RejoinedRoom {
1578                room,
1579                rejoined_projects,
1580                reshared_projects,
1581            })
1582        })
1583        .await
1584    }
1585
1586    pub async fn leave_room(
1587        &self,
1588        connection: ConnectionId,
1589    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1590        self.optional_room_transaction(|tx| async move {
1591            let leaving_participant = room_participant::Entity::find()
1592                .filter(
1593                    Condition::all()
1594                        .add(
1595                            room_participant::Column::AnsweringConnectionId
1596                                .eq(connection.id as i32),
1597                        )
1598                        .add(
1599                            room_participant::Column::AnsweringConnectionServerId
1600                                .eq(connection.owner_id as i32),
1601                        ),
1602                )
1603                .one(&*tx)
1604                .await?;
1605
1606            if let Some(leaving_participant) = leaving_participant {
1607                // Leave room.
1608                let room_id = leaving_participant.room_id;
1609                room_participant::Entity::delete_by_id(leaving_participant.id)
1610                    .exec(&*tx)
1611                    .await?;
1612
1613                // Cancel pending calls initiated by the leaving user.
1614                let called_participants = room_participant::Entity::find()
1615                    .filter(
1616                        Condition::all()
1617                            .add(
1618                                room_participant::Column::CallingUserId
1619                                    .eq(leaving_participant.user_id),
1620                            )
1621                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1622                    )
1623                    .all(&*tx)
1624                    .await?;
1625                room_participant::Entity::delete_many()
1626                    .filter(
1627                        room_participant::Column::Id
1628                            .is_in(called_participants.iter().map(|participant| participant.id)),
1629                    )
1630                    .exec(&*tx)
1631                    .await?;
1632                let canceled_calls_to_user_ids = called_participants
1633                    .into_iter()
1634                    .map(|participant| participant.user_id)
1635                    .collect();
1636
1637                // Detect left projects.
1638                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1639                enum QueryProjectIds {
1640                    ProjectId,
1641                }
1642                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1643                    .select_only()
1644                    .column_as(
1645                        project_collaborator::Column::ProjectId,
1646                        QueryProjectIds::ProjectId,
1647                    )
1648                    .filter(
1649                        Condition::all()
1650                            .add(
1651                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1652                            )
1653                            .add(
1654                                project_collaborator::Column::ConnectionServerId
1655                                    .eq(connection.owner_id as i32),
1656                            ),
1657                    )
1658                    .into_values::<_, QueryProjectIds>()
1659                    .all(&*tx)
1660                    .await?;
1661                let mut left_projects = HashMap::default();
1662                let mut collaborators = project_collaborator::Entity::find()
1663                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1664                    .stream(&*tx)
1665                    .await?;
1666                while let Some(collaborator) = collaborators.next().await {
1667                    let collaborator = collaborator?;
1668                    let left_project =
1669                        left_projects
1670                            .entry(collaborator.project_id)
1671                            .or_insert(LeftProject {
1672                                id: collaborator.project_id,
1673                                host_user_id: Default::default(),
1674                                connection_ids: Default::default(),
1675                                host_connection_id: Default::default(),
1676                            });
1677
1678                    let collaborator_connection_id = collaborator.connection();
1679                    if collaborator_connection_id != connection {
1680                        left_project.connection_ids.push(collaborator_connection_id);
1681                    }
1682
1683                    if collaborator.is_host {
1684                        left_project.host_user_id = collaborator.user_id;
1685                        left_project.host_connection_id = collaborator_connection_id;
1686                    }
1687                }
1688                drop(collaborators);
1689
1690                // Leave projects.
1691                project_collaborator::Entity::delete_many()
1692                    .filter(
1693                        Condition::all()
1694                            .add(
1695                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1696                            )
1697                            .add(
1698                                project_collaborator::Column::ConnectionServerId
1699                                    .eq(connection.owner_id as i32),
1700                            ),
1701                    )
1702                    .exec(&*tx)
1703                    .await?;
1704
1705                // Unshare projects.
1706                project::Entity::delete_many()
1707                    .filter(
1708                        Condition::all()
1709                            .add(project::Column::RoomId.eq(room_id))
1710                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1711                            .add(
1712                                project::Column::HostConnectionServerId
1713                                    .eq(connection.owner_id as i32),
1714                            ),
1715                    )
1716                    .exec(&*tx)
1717                    .await?;
1718
1719                let room = self.get_room(room_id, &tx).await?;
1720                if room.participants.is_empty() {
1721                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1722                }
1723
1724                let left_room = LeftRoom {
1725                    room,
1726                    left_projects,
1727                    canceled_calls_to_user_ids,
1728                };
1729
1730                if left_room.room.participants.is_empty() {
1731                    self.rooms.remove(&room_id);
1732                }
1733
1734                Ok(Some((room_id, left_room)))
1735            } else {
1736                Ok(None)
1737            }
1738        })
1739        .await
1740    }
1741
1742    pub async fn follow(
1743        &self,
1744        project_id: ProjectId,
1745        leader_connection: ConnectionId,
1746        follower_connection: ConnectionId,
1747    ) -> Result<RoomGuard<proto::Room>> {
1748        let room_id = self.room_id_for_project(project_id).await?;
1749        self.room_transaction(room_id, |tx| async move {
1750            follower::ActiveModel {
1751                room_id: ActiveValue::set(room_id),
1752                project_id: ActiveValue::set(project_id),
1753                leader_connection_server_id: ActiveValue::set(ServerId(
1754                    leader_connection.owner_id as i32,
1755                )),
1756                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1757                follower_connection_server_id: ActiveValue::set(ServerId(
1758                    follower_connection.owner_id as i32,
1759                )),
1760                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1761                ..Default::default()
1762            }
1763            .insert(&*tx)
1764            .await?;
1765
1766            let room = self.get_room(room_id, &*tx).await?;
1767            Ok(room)
1768        })
1769        .await
1770    }
1771
1772    pub async fn unfollow(
1773        &self,
1774        project_id: ProjectId,
1775        leader_connection: ConnectionId,
1776        follower_connection: ConnectionId,
1777    ) -> Result<RoomGuard<proto::Room>> {
1778        let room_id = self.room_id_for_project(project_id).await?;
1779        self.room_transaction(room_id, |tx| async move {
1780            follower::Entity::delete_many()
1781                .filter(
1782                    Condition::all()
1783                        .add(follower::Column::ProjectId.eq(project_id))
1784                        .add(
1785                            follower::Column::LeaderConnectionServerId
1786                                .eq(leader_connection.owner_id),
1787                        )
1788                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1789                        .add(
1790                            follower::Column::FollowerConnectionServerId
1791                                .eq(follower_connection.owner_id),
1792                        )
1793                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1794                )
1795                .exec(&*tx)
1796                .await?;
1797
1798            let room = self.get_room(room_id, &*tx).await?;
1799            Ok(room)
1800        })
1801        .await
1802    }
1803
1804    pub async fn update_room_participant_location(
1805        &self,
1806        room_id: RoomId,
1807        connection: ConnectionId,
1808        location: proto::ParticipantLocation,
1809    ) -> Result<RoomGuard<proto::Room>> {
1810        self.room_transaction(room_id, |tx| async {
1811            let tx = tx;
1812            let location_kind;
1813            let location_project_id;
1814            match location
1815                .variant
1816                .as_ref()
1817                .ok_or_else(|| anyhow!("invalid location"))?
1818            {
1819                proto::participant_location::Variant::SharedProject(project) => {
1820                    location_kind = 0;
1821                    location_project_id = Some(ProjectId::from_proto(project.id));
1822                }
1823                proto::participant_location::Variant::UnsharedProject(_) => {
1824                    location_kind = 1;
1825                    location_project_id = None;
1826                }
1827                proto::participant_location::Variant::External(_) => {
1828                    location_kind = 2;
1829                    location_project_id = None;
1830                }
1831            }
1832
1833            let result = room_participant::Entity::update_many()
1834                .filter(
1835                    Condition::all()
1836                        .add(room_participant::Column::RoomId.eq(room_id))
1837                        .add(
1838                            room_participant::Column::AnsweringConnectionId
1839                                .eq(connection.id as i32),
1840                        )
1841                        .add(
1842                            room_participant::Column::AnsweringConnectionServerId
1843                                .eq(connection.owner_id as i32),
1844                        ),
1845                )
1846                .set(room_participant::ActiveModel {
1847                    location_kind: ActiveValue::set(Some(location_kind)),
1848                    location_project_id: ActiveValue::set(location_project_id),
1849                    ..Default::default()
1850                })
1851                .exec(&*tx)
1852                .await?;
1853
1854            if result.rows_affected == 1 {
1855                let room = self.get_room(room_id, &tx).await?;
1856                Ok(room)
1857            } else {
1858                Err(anyhow!("could not update room participant location"))?
1859            }
1860        })
1861        .await
1862    }
1863
1864    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1865        self.transaction(|tx| async move {
1866            let participant = room_participant::Entity::find()
1867                .filter(
1868                    Condition::all()
1869                        .add(
1870                            room_participant::Column::AnsweringConnectionId
1871                                .eq(connection.id as i32),
1872                        )
1873                        .add(
1874                            room_participant::Column::AnsweringConnectionServerId
1875                                .eq(connection.owner_id as i32),
1876                        ),
1877                )
1878                .one(&*tx)
1879                .await?
1880                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1881
1882            room_participant::Entity::update(room_participant::ActiveModel {
1883                answering_connection_lost: ActiveValue::set(true),
1884                ..participant.into_active_model()
1885            })
1886            .exec(&*tx)
1887            .await?;
1888
1889            Ok(())
1890        })
1891        .await
1892    }
1893
1894    fn build_incoming_call(
1895        room: &proto::Room,
1896        called_user_id: UserId,
1897    ) -> Option<proto::IncomingCall> {
1898        let pending_participant = room
1899            .pending_participants
1900            .iter()
1901            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1902
1903        Some(proto::IncomingCall {
1904            room_id: room.id,
1905            calling_user_id: pending_participant.calling_user_id,
1906            participant_user_ids: room
1907                .participants
1908                .iter()
1909                .map(|participant| participant.user_id)
1910                .collect(),
1911            initial_project: room.participants.iter().find_map(|participant| {
1912                let initial_project_id = pending_participant.initial_project_id?;
1913                participant
1914                    .projects
1915                    .iter()
1916                    .find(|project| project.id == initial_project_id)
1917                    .cloned()
1918            }),
1919        })
1920    }
1921
1922    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1923        let db_room = room::Entity::find_by_id(room_id)
1924            .one(tx)
1925            .await?
1926            .ok_or_else(|| anyhow!("could not find room"))?;
1927
1928        let mut db_participants = db_room
1929            .find_related(room_participant::Entity)
1930            .stream(tx)
1931            .await?;
1932        let mut participants = HashMap::default();
1933        let mut pending_participants = Vec::new();
1934        while let Some(db_participant) = db_participants.next().await {
1935            let db_participant = db_participant?;
1936            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1937                .answering_connection_id
1938                .zip(db_participant.answering_connection_server_id)
1939            {
1940                let location = match (
1941                    db_participant.location_kind,
1942                    db_participant.location_project_id,
1943                ) {
1944                    (Some(0), Some(project_id)) => {
1945                        Some(proto::participant_location::Variant::SharedProject(
1946                            proto::participant_location::SharedProject {
1947                                id: project_id.to_proto(),
1948                            },
1949                        ))
1950                    }
1951                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1952                        Default::default(),
1953                    )),
1954                    _ => Some(proto::participant_location::Variant::External(
1955                        Default::default(),
1956                    )),
1957                };
1958
1959                let answering_connection = ConnectionId {
1960                    owner_id: answering_connection_server_id.0 as u32,
1961                    id: answering_connection_id as u32,
1962                };
1963                participants.insert(
1964                    answering_connection,
1965                    proto::Participant {
1966                        user_id: db_participant.user_id.to_proto(),
1967                        peer_id: Some(answering_connection.into()),
1968                        projects: Default::default(),
1969                        location: Some(proto::ParticipantLocation { variant: location }),
1970                    },
1971                );
1972            } else {
1973                pending_participants.push(proto::PendingParticipant {
1974                    user_id: db_participant.user_id.to_proto(),
1975                    calling_user_id: db_participant.calling_user_id.to_proto(),
1976                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1977                });
1978            }
1979        }
1980        drop(db_participants);
1981
1982        let mut db_projects = db_room
1983            .find_related(project::Entity)
1984            .find_with_related(worktree::Entity)
1985            .stream(tx)
1986            .await?;
1987
1988        while let Some(row) = db_projects.next().await {
1989            let (db_project, db_worktree) = row?;
1990            let host_connection = db_project.host_connection()?;
1991            if let Some(participant) = participants.get_mut(&host_connection) {
1992                let project = if let Some(project) = participant
1993                    .projects
1994                    .iter_mut()
1995                    .find(|project| project.id == db_project.id.to_proto())
1996                {
1997                    project
1998                } else {
1999                    participant.projects.push(proto::ParticipantProject {
2000                        id: db_project.id.to_proto(),
2001                        worktree_root_names: Default::default(),
2002                    });
2003                    participant.projects.last_mut().unwrap()
2004                };
2005
2006                if let Some(db_worktree) = db_worktree {
2007                    if db_worktree.visible {
2008                        project.worktree_root_names.push(db_worktree.root_name);
2009                    }
2010                }
2011            }
2012        }
2013        drop(db_projects);
2014
2015        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2016        let mut followers = Vec::new();
2017        while let Some(db_follower) = db_followers.next().await {
2018            let db_follower = db_follower?;
2019            followers.push(proto::Follower {
2020                leader_id: Some(db_follower.leader_connection().into()),
2021                follower_id: Some(db_follower.follower_connection().into()),
2022                project_id: db_follower.project_id.to_proto(),
2023            });
2024        }
2025
2026        Ok(proto::Room {
2027            id: db_room.id.to_proto(),
2028            live_kit_room: db_room.live_kit_room,
2029            participants: participants.into_values().collect(),
2030            pending_participants,
2031            followers,
2032        })
2033    }
2034
2035    // projects
2036
2037    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2038        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2039        enum QueryAs {
2040            Count,
2041        }
2042
2043        self.transaction(|tx| async move {
2044            Ok(project::Entity::find()
2045                .select_only()
2046                .column_as(project::Column::Id.count(), QueryAs::Count)
2047                .inner_join(user::Entity)
2048                .filter(user::Column::Admin.eq(false))
2049                .into_values::<_, QueryAs>()
2050                .one(&*tx)
2051                .await?
2052                .unwrap_or(0i64) as usize)
2053        })
2054        .await
2055    }
2056
2057    pub async fn share_project(
2058        &self,
2059        room_id: RoomId,
2060        connection: ConnectionId,
2061        worktrees: &[proto::WorktreeMetadata],
2062    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2063        self.room_transaction(room_id, |tx| async move {
2064            let participant = room_participant::Entity::find()
2065                .filter(
2066                    Condition::all()
2067                        .add(
2068                            room_participant::Column::AnsweringConnectionId
2069                                .eq(connection.id as i32),
2070                        )
2071                        .add(
2072                            room_participant::Column::AnsweringConnectionServerId
2073                                .eq(connection.owner_id as i32),
2074                        ),
2075                )
2076                .one(&*tx)
2077                .await?
2078                .ok_or_else(|| anyhow!("could not find participant"))?;
2079            if participant.room_id != room_id {
2080                return Err(anyhow!("shared project on unexpected room"))?;
2081            }
2082
2083            let project = project::ActiveModel {
2084                room_id: ActiveValue::set(participant.room_id),
2085                host_user_id: ActiveValue::set(participant.user_id),
2086                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2087                host_connection_server_id: ActiveValue::set(Some(ServerId(
2088                    connection.owner_id as i32,
2089                ))),
2090                ..Default::default()
2091            }
2092            .insert(&*tx)
2093            .await?;
2094
2095            if !worktrees.is_empty() {
2096                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2097                    worktree::ActiveModel {
2098                        id: ActiveValue::set(worktree.id as i64),
2099                        project_id: ActiveValue::set(project.id),
2100                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2101                        root_name: ActiveValue::set(worktree.root_name.clone()),
2102                        visible: ActiveValue::set(worktree.visible),
2103                        scan_id: ActiveValue::set(0),
2104                        completed_scan_id: ActiveValue::set(0),
2105                    }
2106                }))
2107                .exec(&*tx)
2108                .await?;
2109            }
2110
2111            project_collaborator::ActiveModel {
2112                project_id: ActiveValue::set(project.id),
2113                connection_id: ActiveValue::set(connection.id as i32),
2114                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2115                user_id: ActiveValue::set(participant.user_id),
2116                replica_id: ActiveValue::set(ReplicaId(0)),
2117                is_host: ActiveValue::set(true),
2118                ..Default::default()
2119            }
2120            .insert(&*tx)
2121            .await?;
2122
2123            let room = self.get_room(room_id, &tx).await?;
2124            Ok((project.id, room))
2125        })
2126        .await
2127    }
2128
2129    pub async fn unshare_project(
2130        &self,
2131        project_id: ProjectId,
2132        connection: ConnectionId,
2133    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2134        let room_id = self.room_id_for_project(project_id).await?;
2135        self.room_transaction(room_id, |tx| async move {
2136            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2137
2138            let project = project::Entity::find_by_id(project_id)
2139                .one(&*tx)
2140                .await?
2141                .ok_or_else(|| anyhow!("project not found"))?;
2142            if project.host_connection()? == connection {
2143                project::Entity::delete(project.into_active_model())
2144                    .exec(&*tx)
2145                    .await?;
2146                let room = self.get_room(room_id, &tx).await?;
2147                Ok((room, guest_connection_ids))
2148            } else {
2149                Err(anyhow!("cannot unshare a project hosted by another user"))?
2150            }
2151        })
2152        .await
2153    }
2154
2155    pub async fn update_project(
2156        &self,
2157        project_id: ProjectId,
2158        connection: ConnectionId,
2159        worktrees: &[proto::WorktreeMetadata],
2160    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2161        let room_id = self.room_id_for_project(project_id).await?;
2162        self.room_transaction(room_id, |tx| async move {
2163            let project = project::Entity::find_by_id(project_id)
2164                .filter(
2165                    Condition::all()
2166                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2167                        .add(
2168                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2169                        ),
2170                )
2171                .one(&*tx)
2172                .await?
2173                .ok_or_else(|| anyhow!("no such project"))?;
2174
2175            self.update_project_worktrees(project.id, worktrees, &tx)
2176                .await?;
2177
2178            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2179            let room = self.get_room(project.room_id, &tx).await?;
2180            Ok((room, guest_connection_ids))
2181        })
2182        .await
2183    }
2184
2185    async fn update_project_worktrees(
2186        &self,
2187        project_id: ProjectId,
2188        worktrees: &[proto::WorktreeMetadata],
2189        tx: &DatabaseTransaction,
2190    ) -> Result<()> {
2191        if !worktrees.is_empty() {
2192            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2193                id: ActiveValue::set(worktree.id as i64),
2194                project_id: ActiveValue::set(project_id),
2195                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2196                root_name: ActiveValue::set(worktree.root_name.clone()),
2197                visible: ActiveValue::set(worktree.visible),
2198                scan_id: ActiveValue::set(0),
2199                completed_scan_id: ActiveValue::set(0),
2200            }))
2201            .on_conflict(
2202                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2203                    .update_column(worktree::Column::RootName)
2204                    .to_owned(),
2205            )
2206            .exec(&*tx)
2207            .await?;
2208        }
2209
2210        worktree::Entity::delete_many()
2211            .filter(worktree::Column::ProjectId.eq(project_id).and(
2212                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2213            ))
2214            .exec(&*tx)
2215            .await?;
2216
2217        Ok(())
2218    }
2219
2220    pub async fn update_worktree(
2221        &self,
2222        update: &proto::UpdateWorktree,
2223        connection: ConnectionId,
2224    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2225        let project_id = ProjectId::from_proto(update.project_id);
2226        let worktree_id = update.worktree_id as i64;
2227        let room_id = self.room_id_for_project(project_id).await?;
2228        self.room_transaction(room_id, |tx| async move {
2229            // Ensure the update comes from the host.
2230            let _project = project::Entity::find_by_id(project_id)
2231                .filter(
2232                    Condition::all()
2233                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2234                        .add(
2235                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2236                        ),
2237                )
2238                .one(&*tx)
2239                .await?
2240                .ok_or_else(|| anyhow!("no such project"))?;
2241
2242            // Update metadata.
2243            worktree::Entity::update(worktree::ActiveModel {
2244                id: ActiveValue::set(worktree_id),
2245                project_id: ActiveValue::set(project_id),
2246                root_name: ActiveValue::set(update.root_name.clone()),
2247                scan_id: ActiveValue::set(update.scan_id as i64),
2248                completed_scan_id: if update.is_last_update {
2249                    ActiveValue::set(update.scan_id as i64)
2250                } else {
2251                    ActiveValue::default()
2252                },
2253                abs_path: ActiveValue::set(update.abs_path.clone()),
2254                ..Default::default()
2255            })
2256            .exec(&*tx)
2257            .await?;
2258
2259            if !update.updated_entries.is_empty() {
2260                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2261                    let mtime = entry.mtime.clone().unwrap_or_default();
2262                    worktree_entry::ActiveModel {
2263                        project_id: ActiveValue::set(project_id),
2264                        worktree_id: ActiveValue::set(worktree_id),
2265                        id: ActiveValue::set(entry.id as i64),
2266                        is_dir: ActiveValue::set(entry.is_dir),
2267                        path: ActiveValue::set(entry.path.clone()),
2268                        inode: ActiveValue::set(entry.inode as i64),
2269                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2270                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2271                        is_symlink: ActiveValue::set(entry.is_symlink),
2272                        is_ignored: ActiveValue::set(entry.is_ignored),
2273                        is_deleted: ActiveValue::set(false),
2274                        scan_id: ActiveValue::set(update.scan_id as i64),
2275                    }
2276                }))
2277                .on_conflict(
2278                    OnConflict::columns([
2279                        worktree_entry::Column::ProjectId,
2280                        worktree_entry::Column::WorktreeId,
2281                        worktree_entry::Column::Id,
2282                    ])
2283                    .update_columns([
2284                        worktree_entry::Column::IsDir,
2285                        worktree_entry::Column::Path,
2286                        worktree_entry::Column::Inode,
2287                        worktree_entry::Column::MtimeSeconds,
2288                        worktree_entry::Column::MtimeNanos,
2289                        worktree_entry::Column::IsSymlink,
2290                        worktree_entry::Column::IsIgnored,
2291                        worktree_entry::Column::ScanId,
2292                    ])
2293                    .to_owned(),
2294                )
2295                .exec(&*tx)
2296                .await?;
2297            }
2298
2299            if !update.removed_entries.is_empty() {
2300                worktree_entry::Entity::update_many()
2301                    .filter(
2302                        worktree_entry::Column::ProjectId
2303                            .eq(project_id)
2304                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2305                            .and(
2306                                worktree_entry::Column::Id
2307                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2308                            ),
2309                    )
2310                    .set(worktree_entry::ActiveModel {
2311                        is_deleted: ActiveValue::Set(true),
2312                        scan_id: ActiveValue::Set(update.scan_id as i64),
2313                        ..Default::default()
2314                    })
2315                    .exec(&*tx)
2316                    .await?;
2317            }
2318
2319            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2320            Ok(connection_ids)
2321        })
2322        .await
2323    }
2324
2325    pub async fn update_diagnostic_summary(
2326        &self,
2327        update: &proto::UpdateDiagnosticSummary,
2328        connection: ConnectionId,
2329    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2330        let project_id = ProjectId::from_proto(update.project_id);
2331        let worktree_id = update.worktree_id as i64;
2332        let room_id = self.room_id_for_project(project_id).await?;
2333        self.room_transaction(room_id, |tx| async move {
2334            let summary = update
2335                .summary
2336                .as_ref()
2337                .ok_or_else(|| anyhow!("invalid summary"))?;
2338
2339            // Ensure the update comes from the host.
2340            let project = project::Entity::find_by_id(project_id)
2341                .one(&*tx)
2342                .await?
2343                .ok_or_else(|| anyhow!("no such project"))?;
2344            if project.host_connection()? != connection {
2345                return Err(anyhow!("can't update a project hosted by someone else"))?;
2346            }
2347
2348            // Update summary.
2349            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2350                project_id: ActiveValue::set(project_id),
2351                worktree_id: ActiveValue::set(worktree_id),
2352                path: ActiveValue::set(summary.path.clone()),
2353                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2354                error_count: ActiveValue::set(summary.error_count as i32),
2355                warning_count: ActiveValue::set(summary.warning_count as i32),
2356                ..Default::default()
2357            })
2358            .on_conflict(
2359                OnConflict::columns([
2360                    worktree_diagnostic_summary::Column::ProjectId,
2361                    worktree_diagnostic_summary::Column::WorktreeId,
2362                    worktree_diagnostic_summary::Column::Path,
2363                ])
2364                .update_columns([
2365                    worktree_diagnostic_summary::Column::LanguageServerId,
2366                    worktree_diagnostic_summary::Column::ErrorCount,
2367                    worktree_diagnostic_summary::Column::WarningCount,
2368                ])
2369                .to_owned(),
2370            )
2371            .exec(&*tx)
2372            .await?;
2373
2374            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2375            Ok(connection_ids)
2376        })
2377        .await
2378    }
2379
2380    pub async fn start_language_server(
2381        &self,
2382        update: &proto::StartLanguageServer,
2383        connection: ConnectionId,
2384    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2385        let project_id = ProjectId::from_proto(update.project_id);
2386        let room_id = self.room_id_for_project(project_id).await?;
2387        self.room_transaction(room_id, |tx| async move {
2388            let server = update
2389                .server
2390                .as_ref()
2391                .ok_or_else(|| anyhow!("invalid language server"))?;
2392
2393            // Ensure the update comes from the host.
2394            let project = project::Entity::find_by_id(project_id)
2395                .one(&*tx)
2396                .await?
2397                .ok_or_else(|| anyhow!("no such project"))?;
2398            if project.host_connection()? != connection {
2399                return Err(anyhow!("can't update a project hosted by someone else"))?;
2400            }
2401
2402            // Add the newly-started language server.
2403            language_server::Entity::insert(language_server::ActiveModel {
2404                project_id: ActiveValue::set(project_id),
2405                id: ActiveValue::set(server.id as i64),
2406                name: ActiveValue::set(server.name.clone()),
2407                ..Default::default()
2408            })
2409            .on_conflict(
2410                OnConflict::columns([
2411                    language_server::Column::ProjectId,
2412                    language_server::Column::Id,
2413                ])
2414                .update_column(language_server::Column::Name)
2415                .to_owned(),
2416            )
2417            .exec(&*tx)
2418            .await?;
2419
2420            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2421            Ok(connection_ids)
2422        })
2423        .await
2424    }
2425
2426    pub async fn join_project(
2427        &self,
2428        project_id: ProjectId,
2429        connection: ConnectionId,
2430    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2431        let room_id = self.room_id_for_project(project_id).await?;
2432        self.room_transaction(room_id, |tx| async move {
2433            let participant = room_participant::Entity::find()
2434                .filter(
2435                    Condition::all()
2436                        .add(
2437                            room_participant::Column::AnsweringConnectionId
2438                                .eq(connection.id as i32),
2439                        )
2440                        .add(
2441                            room_participant::Column::AnsweringConnectionServerId
2442                                .eq(connection.owner_id as i32),
2443                        ),
2444                )
2445                .one(&*tx)
2446                .await?
2447                .ok_or_else(|| anyhow!("must join a room first"))?;
2448
2449            let project = project::Entity::find_by_id(project_id)
2450                .one(&*tx)
2451                .await?
2452                .ok_or_else(|| anyhow!("no such project"))?;
2453            if project.room_id != participant.room_id {
2454                return Err(anyhow!("no such project"))?;
2455            }
2456
2457            let mut collaborators = project
2458                .find_related(project_collaborator::Entity)
2459                .all(&*tx)
2460                .await?;
2461            let replica_ids = collaborators
2462                .iter()
2463                .map(|c| c.replica_id)
2464                .collect::<HashSet<_>>();
2465            let mut replica_id = ReplicaId(1);
2466            while replica_ids.contains(&replica_id) {
2467                replica_id.0 += 1;
2468            }
2469            let new_collaborator = project_collaborator::ActiveModel {
2470                project_id: ActiveValue::set(project_id),
2471                connection_id: ActiveValue::set(connection.id as i32),
2472                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2473                user_id: ActiveValue::set(participant.user_id),
2474                replica_id: ActiveValue::set(replica_id),
2475                is_host: ActiveValue::set(false),
2476                ..Default::default()
2477            }
2478            .insert(&*tx)
2479            .await?;
2480            collaborators.push(new_collaborator);
2481
2482            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2483            let mut worktrees = db_worktrees
2484                .into_iter()
2485                .map(|db_worktree| {
2486                    (
2487                        db_worktree.id as u64,
2488                        Worktree {
2489                            id: db_worktree.id as u64,
2490                            abs_path: db_worktree.abs_path,
2491                            root_name: db_worktree.root_name,
2492                            visible: db_worktree.visible,
2493                            entries: Default::default(),
2494                            diagnostic_summaries: Default::default(),
2495                            scan_id: db_worktree.scan_id as u64,
2496                            completed_scan_id: db_worktree.completed_scan_id as u64,
2497                        },
2498                    )
2499                })
2500                .collect::<BTreeMap<_, _>>();
2501
2502            // Populate worktree entries.
2503            {
2504                let mut db_entries = worktree_entry::Entity::find()
2505                    .filter(
2506                        Condition::all()
2507                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2508                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2509                    )
2510                    .stream(&*tx)
2511                    .await?;
2512                while let Some(db_entry) = db_entries.next().await {
2513                    let db_entry = db_entry?;
2514                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2515                        worktree.entries.push(proto::Entry {
2516                            id: db_entry.id as u64,
2517                            is_dir: db_entry.is_dir,
2518                            path: db_entry.path,
2519                            inode: db_entry.inode as u64,
2520                            mtime: Some(proto::Timestamp {
2521                                seconds: db_entry.mtime_seconds as u64,
2522                                nanos: db_entry.mtime_nanos as u32,
2523                            }),
2524                            is_symlink: db_entry.is_symlink,
2525                            is_ignored: db_entry.is_ignored,
2526                        });
2527                    }
2528                }
2529            }
2530
2531            // Populate worktree diagnostic summaries.
2532            {
2533                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2534                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2535                    .stream(&*tx)
2536                    .await?;
2537                while let Some(db_summary) = db_summaries.next().await {
2538                    let db_summary = db_summary?;
2539                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2540                        worktree
2541                            .diagnostic_summaries
2542                            .push(proto::DiagnosticSummary {
2543                                path: db_summary.path,
2544                                language_server_id: db_summary.language_server_id as u64,
2545                                error_count: db_summary.error_count as u32,
2546                                warning_count: db_summary.warning_count as u32,
2547                            });
2548                    }
2549                }
2550            }
2551
2552            // Populate language servers.
2553            let language_servers = project
2554                .find_related(language_server::Entity)
2555                .all(&*tx)
2556                .await?;
2557
2558            let project = Project {
2559                collaborators: collaborators
2560                    .into_iter()
2561                    .map(|collaborator| ProjectCollaborator {
2562                        connection_id: collaborator.connection(),
2563                        user_id: collaborator.user_id,
2564                        replica_id: collaborator.replica_id,
2565                        is_host: collaborator.is_host,
2566                    })
2567                    .collect(),
2568                worktrees,
2569                language_servers: language_servers
2570                    .into_iter()
2571                    .map(|language_server| proto::LanguageServer {
2572                        id: language_server.id as u64,
2573                        name: language_server.name,
2574                    })
2575                    .collect(),
2576            };
2577            Ok((project, replica_id as ReplicaId))
2578        })
2579        .await
2580    }
2581
2582    pub async fn leave_project(
2583        &self,
2584        project_id: ProjectId,
2585        connection: ConnectionId,
2586    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2587        let room_id = self.room_id_for_project(project_id).await?;
2588        self.room_transaction(room_id, |tx| async move {
2589            let result = project_collaborator::Entity::delete_many()
2590                .filter(
2591                    Condition::all()
2592                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2593                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2594                        .add(
2595                            project_collaborator::Column::ConnectionServerId
2596                                .eq(connection.owner_id as i32),
2597                        ),
2598                )
2599                .exec(&*tx)
2600                .await?;
2601            if result.rows_affected == 0 {
2602                Err(anyhow!("not a collaborator on this project"))?;
2603            }
2604
2605            let project = project::Entity::find_by_id(project_id)
2606                .one(&*tx)
2607                .await?
2608                .ok_or_else(|| anyhow!("no such project"))?;
2609            let collaborators = project
2610                .find_related(project_collaborator::Entity)
2611                .all(&*tx)
2612                .await?;
2613            let connection_ids = collaborators
2614                .into_iter()
2615                .map(|collaborator| collaborator.connection())
2616                .collect();
2617
2618            follower::Entity::delete_many()
2619                .filter(
2620                    Condition::any()
2621                        .add(
2622                            Condition::all()
2623                                .add(follower::Column::ProjectId.eq(project_id))
2624                                .add(
2625                                    follower::Column::LeaderConnectionServerId
2626                                        .eq(connection.owner_id),
2627                                )
2628                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2629                        )
2630                        .add(
2631                            Condition::all()
2632                                .add(follower::Column::ProjectId.eq(project_id))
2633                                .add(
2634                                    follower::Column::FollowerConnectionServerId
2635                                        .eq(connection.owner_id),
2636                                )
2637                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2638                        ),
2639                )
2640                .exec(&*tx)
2641                .await?;
2642
2643            let room = self.get_room(project.room_id, &tx).await?;
2644            let left_project = LeftProject {
2645                id: project_id,
2646                host_user_id: project.host_user_id,
2647                host_connection_id: project.host_connection()?,
2648                connection_ids,
2649            };
2650            Ok((room, left_project))
2651        })
2652        .await
2653    }
2654
2655    pub async fn project_collaborators(
2656        &self,
2657        project_id: ProjectId,
2658        connection_id: ConnectionId,
2659    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2660        let room_id = self.room_id_for_project(project_id).await?;
2661        self.room_transaction(room_id, |tx| async move {
2662            let collaborators = project_collaborator::Entity::find()
2663                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2664                .all(&*tx)
2665                .await?
2666                .into_iter()
2667                .map(|collaborator| ProjectCollaborator {
2668                    connection_id: collaborator.connection(),
2669                    user_id: collaborator.user_id,
2670                    replica_id: collaborator.replica_id,
2671                    is_host: collaborator.is_host,
2672                })
2673                .collect::<Vec<_>>();
2674
2675            if collaborators
2676                .iter()
2677                .any(|collaborator| collaborator.connection_id == connection_id)
2678            {
2679                Ok(collaborators)
2680            } else {
2681                Err(anyhow!("no such project"))?
2682            }
2683        })
2684        .await
2685    }
2686
2687    pub async fn project_connection_ids(
2688        &self,
2689        project_id: ProjectId,
2690        connection_id: ConnectionId,
2691    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2692        let room_id = self.room_id_for_project(project_id).await?;
2693        self.room_transaction(room_id, |tx| async move {
2694            let mut collaborators = project_collaborator::Entity::find()
2695                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2696                .stream(&*tx)
2697                .await?;
2698
2699            let mut connection_ids = HashSet::default();
2700            while let Some(collaborator) = collaborators.next().await {
2701                let collaborator = collaborator?;
2702                connection_ids.insert(collaborator.connection());
2703            }
2704
2705            if connection_ids.contains(&connection_id) {
2706                Ok(connection_ids)
2707            } else {
2708                Err(anyhow!("no such project"))?
2709            }
2710        })
2711        .await
2712    }
2713
2714    async fn project_guest_connection_ids(
2715        &self,
2716        project_id: ProjectId,
2717        tx: &DatabaseTransaction,
2718    ) -> Result<Vec<ConnectionId>> {
2719        let mut collaborators = project_collaborator::Entity::find()
2720            .filter(
2721                project_collaborator::Column::ProjectId
2722                    .eq(project_id)
2723                    .and(project_collaborator::Column::IsHost.eq(false)),
2724            )
2725            .stream(tx)
2726            .await?;
2727
2728        let mut guest_connection_ids = Vec::new();
2729        while let Some(collaborator) = collaborators.next().await {
2730            let collaborator = collaborator?;
2731            guest_connection_ids.push(collaborator.connection());
2732        }
2733        Ok(guest_connection_ids)
2734    }
2735
2736    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2737        self.transaction(|tx| async move {
2738            let project = project::Entity::find_by_id(project_id)
2739                .one(&*tx)
2740                .await?
2741                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2742            Ok(project.room_id)
2743        })
2744        .await
2745    }
2746
2747    // access tokens
2748
2749    pub async fn create_access_token(
2750        &self,
2751        user_id: UserId,
2752        access_token_hash: &str,
2753        max_access_token_count: usize,
2754    ) -> Result<AccessTokenId> {
2755        self.transaction(|tx| async {
2756            let tx = tx;
2757
2758            let token = access_token::ActiveModel {
2759                user_id: ActiveValue::set(user_id),
2760                hash: ActiveValue::set(access_token_hash.into()),
2761                ..Default::default()
2762            }
2763            .insert(&*tx)
2764            .await?;
2765
2766            access_token::Entity::delete_many()
2767                .filter(
2768                    access_token::Column::Id.in_subquery(
2769                        Query::select()
2770                            .column(access_token::Column::Id)
2771                            .from(access_token::Entity)
2772                            .and_where(access_token::Column::UserId.eq(user_id))
2773                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2774                            .limit(10000)
2775                            .offset(max_access_token_count as u64)
2776                            .to_owned(),
2777                    ),
2778                )
2779                .exec(&*tx)
2780                .await?;
2781            Ok(token.id)
2782        })
2783        .await
2784    }
2785
2786    pub async fn get_access_token(
2787        &self,
2788        access_token_id: AccessTokenId,
2789    ) -> Result<access_token::Model> {
2790        self.transaction(|tx| async move {
2791            Ok(access_token::Entity::find_by_id(access_token_id)
2792                .one(&*tx)
2793                .await?
2794                .ok_or_else(|| anyhow!("no such access token"))?)
2795        })
2796        .await
2797    }
2798
2799    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2800    where
2801        F: Send + Fn(TransactionHandle) -> Fut,
2802        Fut: Send + Future<Output = Result<T>>,
2803    {
2804        let body = async {
2805            let mut i = 0;
2806            loop {
2807                let (tx, result) = self.with_transaction(&f).await?;
2808                match result {
2809                    Ok(result) => match tx.commit().await.map_err(Into::into) {
2810                        Ok(()) => return Ok(result),
2811                        Err(error) => {
2812                            if !self.retry_on_serialization_error(&error, i).await {
2813                                return Err(error);
2814                            }
2815                        }
2816                    },
2817                    Err(error) => {
2818                        tx.rollback().await?;
2819                        if !self.retry_on_serialization_error(&error, i).await {
2820                            return Err(error);
2821                        }
2822                    }
2823                }
2824                i += 1;
2825            }
2826        };
2827
2828        self.run(body).await
2829    }
2830
2831    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2832    where
2833        F: Send + Fn(TransactionHandle) -> Fut,
2834        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2835    {
2836        let body = async {
2837            let mut i = 0;
2838            loop {
2839                let (tx, result) = self.with_transaction(&f).await?;
2840                match result {
2841                    Ok(Some((room_id, data))) => {
2842                        let lock = self.rooms.entry(room_id).or_default().clone();
2843                        let _guard = lock.lock_owned().await;
2844                        match tx.commit().await.map_err(Into::into) {
2845                            Ok(()) => {
2846                                return Ok(Some(RoomGuard {
2847                                    data,
2848                                    _guard,
2849                                    _not_send: PhantomData,
2850                                }));
2851                            }
2852                            Err(error) => {
2853                                if !self.retry_on_serialization_error(&error, i).await {
2854                                    return Err(error);
2855                                }
2856                            }
2857                        }
2858                    }
2859                    Ok(None) => match tx.commit().await.map_err(Into::into) {
2860                        Ok(()) => return Ok(None),
2861                        Err(error) => {
2862                            if !self.retry_on_serialization_error(&error, i).await {
2863                                return Err(error);
2864                            }
2865                        }
2866                    },
2867                    Err(error) => {
2868                        tx.rollback().await?;
2869                        if !self.retry_on_serialization_error(&error, i).await {
2870                            return Err(error);
2871                        }
2872                    }
2873                }
2874                i += 1;
2875            }
2876        };
2877
2878        self.run(body).await
2879    }
2880
2881    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
2882    where
2883        F: Send + Fn(TransactionHandle) -> Fut,
2884        Fut: Send + Future<Output = Result<T>>,
2885    {
2886        let body = async {
2887            let mut i = 0;
2888            loop {
2889                let lock = self.rooms.entry(room_id).or_default().clone();
2890                let _guard = lock.lock_owned().await;
2891                let (tx, result) = self.with_transaction(&f).await?;
2892                match result {
2893                    Ok(data) => match tx.commit().await.map_err(Into::into) {
2894                        Ok(()) => {
2895                            return Ok(RoomGuard {
2896                                data,
2897                                _guard,
2898                                _not_send: PhantomData,
2899                            });
2900                        }
2901                        Err(error) => {
2902                            if !self.retry_on_serialization_error(&error, i).await {
2903                                return Err(error);
2904                            }
2905                        }
2906                    },
2907                    Err(error) => {
2908                        tx.rollback().await?;
2909                        if !self.retry_on_serialization_error(&error, i).await {
2910                            return Err(error);
2911                        }
2912                    }
2913                }
2914                i += 1;
2915            }
2916        };
2917
2918        self.run(body).await
2919    }
2920
2921    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2922    where
2923        F: Send + Fn(TransactionHandle) -> Fut,
2924        Fut: Send + Future<Output = Result<T>>,
2925    {
2926        let tx = self
2927            .pool
2928            .begin_with_config(Some(IsolationLevel::Serializable), None)
2929            .await?;
2930
2931        let mut tx = Arc::new(Some(tx));
2932        let result = f(TransactionHandle(tx.clone())).await;
2933        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2934            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2935        };
2936
2937        Ok((tx, result))
2938    }
2939
2940    async fn run<F, T>(&self, future: F) -> Result<T>
2941    where
2942        F: Future<Output = Result<T>>,
2943    {
2944        #[cfg(test)]
2945        {
2946            if let Executor::Deterministic(executor) = &self.executor {
2947                executor.simulate_random_delay().await;
2948            }
2949
2950            self.runtime.as_ref().unwrap().block_on(future)
2951        }
2952
2953        #[cfg(not(test))]
2954        {
2955            future.await
2956        }
2957    }
2958
2959    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
2960        // If the error is due to a failure to serialize concurrent transactions, then retry
2961        // this transaction after a delay. With each subsequent retry, double the delay duration.
2962        // Also vary the delay randomly in order to ensure different database connections retry
2963        // at different times.
2964        if is_serialization_error(error) {
2965            let base_delay = 4_u64 << prev_attempt_count.min(16);
2966            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
2967            log::info!(
2968                "retrying transaction after serialization error. delay: {} ms.",
2969                randomized_delay
2970            );
2971            self.executor
2972                .sleep(Duration::from_millis(randomized_delay as u64))
2973                .await;
2974            true
2975        } else {
2976            false
2977        }
2978    }
2979}
2980
2981fn is_serialization_error(error: &Error) -> bool {
2982    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2983    match error {
2984        Error::Database(
2985            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2986            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2987        ) if error
2988            .as_database_error()
2989            .and_then(|error| error.code())
2990            .as_deref()
2991            == Some(SERIALIZATION_FAILURE_CODE) =>
2992        {
2993            true
2994        }
2995        _ => false,
2996    }
2997}
2998
2999struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3000
3001impl Deref for TransactionHandle {
3002    type Target = DatabaseTransaction;
3003
3004    fn deref(&self) -> &Self::Target {
3005        self.0.as_ref().as_ref().unwrap()
3006    }
3007}
3008
3009pub struct RoomGuard<T> {
3010    data: T,
3011    _guard: OwnedMutexGuard<()>,
3012    _not_send: PhantomData<Rc<()>>,
3013}
3014
3015impl<T> Deref for RoomGuard<T> {
3016    type Target = T;
3017
3018    fn deref(&self) -> &T {
3019        &self.data
3020    }
3021}
3022
3023impl<T> DerefMut for RoomGuard<T> {
3024    fn deref_mut(&mut self) -> &mut T {
3025        &mut self.data
3026    }
3027}
3028
3029#[derive(Debug, Serialize, Deserialize)]
3030pub struct NewUserParams {
3031    pub github_login: String,
3032    pub github_user_id: i32,
3033    pub invite_count: i32,
3034}
3035
3036#[derive(Debug)]
3037pub struct NewUserResult {
3038    pub user_id: UserId,
3039    pub metrics_id: String,
3040    pub inviting_user_id: Option<UserId>,
3041    pub signup_device_id: Option<String>,
3042}
3043
3044fn random_invite_code() -> String {
3045    nanoid::nanoid!(16)
3046}
3047
3048fn random_email_confirmation_code() -> String {
3049    nanoid::nanoid!(64)
3050}
3051
3052macro_rules! id_type {
3053    ($name:ident) => {
3054        #[derive(
3055            Clone,
3056            Copy,
3057            Debug,
3058            Default,
3059            PartialEq,
3060            Eq,
3061            PartialOrd,
3062            Ord,
3063            Hash,
3064            Serialize,
3065            Deserialize,
3066        )]
3067        #[serde(transparent)]
3068        pub struct $name(pub i32);
3069
3070        impl $name {
3071            #[allow(unused)]
3072            pub const MAX: Self = Self(i32::MAX);
3073
3074            #[allow(unused)]
3075            pub fn from_proto(value: u64) -> Self {
3076                Self(value as i32)
3077            }
3078
3079            #[allow(unused)]
3080            pub fn to_proto(self) -> u64 {
3081                self.0 as u64
3082            }
3083        }
3084
3085        impl std::fmt::Display for $name {
3086            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3087                self.0.fmt(f)
3088            }
3089        }
3090
3091        impl From<$name> for sea_query::Value {
3092            fn from(value: $name) -> Self {
3093                sea_query::Value::Int(Some(value.0))
3094            }
3095        }
3096
3097        impl sea_orm::TryGetable for $name {
3098            fn try_get(
3099                res: &sea_orm::QueryResult,
3100                pre: &str,
3101                col: &str,
3102            ) -> Result<Self, sea_orm::TryGetError> {
3103                Ok(Self(i32::try_get(res, pre, col)?))
3104            }
3105        }
3106
3107        impl sea_query::ValueType for $name {
3108            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3109                match v {
3110                    Value::TinyInt(Some(int)) => {
3111                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3112                    }
3113                    Value::SmallInt(Some(int)) => {
3114                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3115                    }
3116                    Value::Int(Some(int)) => {
3117                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3118                    }
3119                    Value::BigInt(Some(int)) => {
3120                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3121                    }
3122                    Value::TinyUnsigned(Some(int)) => {
3123                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3124                    }
3125                    Value::SmallUnsigned(Some(int)) => {
3126                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3127                    }
3128                    Value::Unsigned(Some(int)) => {
3129                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3130                    }
3131                    Value::BigUnsigned(Some(int)) => {
3132                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3133                    }
3134                    _ => Err(sea_query::ValueTypeErr),
3135                }
3136            }
3137
3138            fn type_name() -> String {
3139                stringify!($name).into()
3140            }
3141
3142            fn array_type() -> sea_query::ArrayType {
3143                sea_query::ArrayType::Int
3144            }
3145
3146            fn column_type() -> sea_query::ColumnType {
3147                sea_query::ColumnType::Integer(None)
3148            }
3149        }
3150
3151        impl sea_orm::TryFromU64 for $name {
3152            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3153                Ok(Self(n.try_into().map_err(|_| {
3154                    DbErr::ConvertFromU64(concat!(
3155                        "error converting ",
3156                        stringify!($name),
3157                        " to u64"
3158                    ))
3159                })?))
3160            }
3161        }
3162
3163        impl sea_query::Nullable for $name {
3164            fn null() -> Value {
3165                Value::Int(None)
3166            }
3167        }
3168    };
3169}
3170
3171id_type!(AccessTokenId);
3172id_type!(ContactId);
3173id_type!(FollowerId);
3174id_type!(RoomId);
3175id_type!(RoomParticipantId);
3176id_type!(ProjectId);
3177id_type!(ProjectCollaboratorId);
3178id_type!(ReplicaId);
3179id_type!(ServerId);
3180id_type!(SignupId);
3181id_type!(UserId);
3182
3183pub struct RejoinedRoom {
3184    pub room: proto::Room,
3185    pub rejoined_projects: Vec<RejoinedProject>,
3186    pub reshared_projects: Vec<ResharedProject>,
3187}
3188
3189pub struct ResharedProject {
3190    pub id: ProjectId,
3191    pub old_connection_id: ConnectionId,
3192    pub collaborators: Vec<ProjectCollaborator>,
3193    pub worktrees: Vec<proto::WorktreeMetadata>,
3194}
3195
3196pub struct RejoinedProject {
3197    pub id: ProjectId,
3198    pub old_connection_id: ConnectionId,
3199    pub collaborators: Vec<ProjectCollaborator>,
3200    pub worktrees: Vec<RejoinedWorktree>,
3201    pub language_servers: Vec<proto::LanguageServer>,
3202}
3203
3204#[derive(Debug)]
3205pub struct RejoinedWorktree {
3206    pub id: u64,
3207    pub abs_path: String,
3208    pub root_name: String,
3209    pub visible: bool,
3210    pub updated_entries: Vec<proto::Entry>,
3211    pub removed_entries: Vec<u64>,
3212    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3213    pub scan_id: u64,
3214    pub completed_scan_id: u64,
3215}
3216
3217pub struct LeftRoom {
3218    pub room: proto::Room,
3219    pub left_projects: HashMap<ProjectId, LeftProject>,
3220    pub canceled_calls_to_user_ids: Vec<UserId>,
3221}
3222
3223pub struct RefreshedRoom {
3224    pub room: proto::Room,
3225    pub stale_participant_user_ids: Vec<UserId>,
3226    pub canceled_calls_to_user_ids: Vec<UserId>,
3227}
3228
3229pub struct Project {
3230    pub collaborators: Vec<ProjectCollaborator>,
3231    pub worktrees: BTreeMap<u64, Worktree>,
3232    pub language_servers: Vec<proto::LanguageServer>,
3233}
3234
3235pub struct ProjectCollaborator {
3236    pub connection_id: ConnectionId,
3237    pub user_id: UserId,
3238    pub replica_id: ReplicaId,
3239    pub is_host: bool,
3240}
3241
3242impl ProjectCollaborator {
3243    pub fn to_proto(&self) -> proto::Collaborator {
3244        proto::Collaborator {
3245            peer_id: Some(self.connection_id.into()),
3246            replica_id: self.replica_id.0 as u32,
3247            user_id: self.user_id.to_proto(),
3248        }
3249    }
3250}
3251
3252#[derive(Debug)]
3253pub struct LeftProject {
3254    pub id: ProjectId,
3255    pub host_user_id: UserId,
3256    pub host_connection_id: ConnectionId,
3257    pub connection_ids: Vec<ConnectionId>,
3258}
3259
3260pub struct Worktree {
3261    pub id: u64,
3262    pub abs_path: String,
3263    pub root_name: String,
3264    pub visible: bool,
3265    pub entries: Vec<proto::Entry>,
3266    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3267    pub scan_id: u64,
3268    pub completed_scan_id: u64,
3269}
3270
3271#[cfg(test)]
3272pub use test::*;
3273
3274#[cfg(test)]
3275mod test {
3276    use super::*;
3277    use gpui::executor::Background;
3278    use lazy_static::lazy_static;
3279    use parking_lot::Mutex;
3280    use sea_orm::ConnectionTrait;
3281    use sqlx::migrate::MigrateDatabase;
3282    use std::sync::Arc;
3283
3284    pub struct TestDb {
3285        pub db: Option<Arc<Database>>,
3286        pub connection: Option<sqlx::AnyConnection>,
3287    }
3288
3289    impl TestDb {
3290        pub fn sqlite(background: Arc<Background>) -> Self {
3291            let url = format!("sqlite::memory:");
3292            let runtime = tokio::runtime::Builder::new_current_thread()
3293                .enable_io()
3294                .enable_time()
3295                .build()
3296                .unwrap();
3297
3298            let mut db = runtime.block_on(async {
3299                let mut options = ConnectOptions::new(url);
3300                options.max_connections(5);
3301                let db = Database::new(options, Executor::Deterministic(background))
3302                    .await
3303                    .unwrap();
3304                let sql = include_str!(concat!(
3305                    env!("CARGO_MANIFEST_DIR"),
3306                    "/migrations.sqlite/20221109000000_test_schema.sql"
3307                ));
3308                db.pool
3309                    .execute(sea_orm::Statement::from_string(
3310                        db.pool.get_database_backend(),
3311                        sql.into(),
3312                    ))
3313                    .await
3314                    .unwrap();
3315                db
3316            });
3317
3318            db.runtime = Some(runtime);
3319
3320            Self {
3321                db: Some(Arc::new(db)),
3322                connection: None,
3323            }
3324        }
3325
3326        pub fn postgres(background: Arc<Background>) -> Self {
3327            lazy_static! {
3328                static ref LOCK: Mutex<()> = Mutex::new(());
3329            }
3330
3331            let _guard = LOCK.lock();
3332            let mut rng = StdRng::from_entropy();
3333            let url = format!(
3334                "postgres://postgres@localhost/zed-test-{}",
3335                rng.gen::<u128>()
3336            );
3337            let runtime = tokio::runtime::Builder::new_current_thread()
3338                .enable_io()
3339                .enable_time()
3340                .build()
3341                .unwrap();
3342
3343            let mut db = runtime.block_on(async {
3344                sqlx::Postgres::create_database(&url)
3345                    .await
3346                    .expect("failed to create test db");
3347                let mut options = ConnectOptions::new(url);
3348                options
3349                    .max_connections(5)
3350                    .idle_timeout(Duration::from_secs(0));
3351                let db = Database::new(options, Executor::Deterministic(background))
3352                    .await
3353                    .unwrap();
3354                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3355                db.migrate(Path::new(migrations_path), false).await.unwrap();
3356                db
3357            });
3358
3359            db.runtime = Some(runtime);
3360
3361            Self {
3362                db: Some(Arc::new(db)),
3363                connection: None,
3364            }
3365        }
3366
3367        pub fn db(&self) -> &Arc<Database> {
3368            self.db.as_ref().unwrap()
3369        }
3370    }
3371
3372    impl Drop for TestDb {
3373        fn drop(&mut self) {
3374            let db = self.db.take().unwrap();
3375            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3376                db.runtime.as_ref().unwrap().block_on(async {
3377                    use util::ResultExt;
3378                    let query = "
3379                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3380                        FROM pg_stat_activity
3381                        WHERE
3382                            pg_stat_activity.datname = current_database() AND
3383                            pid <> pg_backend_pid();
3384                    ";
3385                    db.pool
3386                        .execute(sea_orm::Statement::from_string(
3387                            db.pool.get_database_backend(),
3388                            query.into(),
3389                        ))
3390                        .await
3391                        .log_err();
3392                    sqlx::Postgres::drop_database(db.options.get_url())
3393                        .await
3394                        .log_err();
3395                })
3396            }
3397        }
3398    }
3399}