db.rs

   1mod access_token;
   2mod contact;
   3mod follower;
   4mod language_server;
   5mod project;
   6mod project_collaborator;
   7mod room;
   8mod room_participant;
   9mod server;
  10mod signup;
  11#[cfg(test)]
  12mod tests;
  13mod user;
  14mod worktree;
  15mod worktree_diagnostic_summary;
  16mod worktree_entry;
  17
  18use crate::{Error, Result};
  19use anyhow::anyhow;
  20use collections::{BTreeMap, HashMap, HashSet};
  21pub use contact::Contact;
  22use dashmap::DashMap;
  23use futures::StreamExt;
  24use hyper::StatusCode;
  25use rpc::{proto, ConnectionId};
  26use sea_orm::Condition;
  27pub use sea_orm::ConnectOptions;
  28use sea_orm::{
  29    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  30    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  31    Statement, TransactionTrait,
  32};
  33use sea_query::{Alias, Expr, OnConflict, Query};
  34use serde::{Deserialize, Serialize};
  35pub use signup::{Invite, NewSignup, WaitlistSummary};
  36use sqlx::migrate::{Migrate, Migration, MigrationSource};
  37use sqlx::Connection;
  38use std::ops::{Deref, DerefMut};
  39use std::path::Path;
  40use std::time::Duration;
  41use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  42use tokio::sync::{Mutex, OwnedMutexGuard};
  43pub use user::Model as User;
  44
  45pub struct Database {
  46    options: ConnectOptions,
  47    pool: DatabaseConnection,
  48    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  49    #[cfg(test)]
  50    background: Option<std::sync::Arc<gpui::executor::Background>>,
  51    #[cfg(test)]
  52    runtime: Option<tokio::runtime::Runtime>,
  53}
  54
  55impl Database {
  56    pub async fn new(options: ConnectOptions) -> Result<Self> {
  57        Ok(Self {
  58            options: options.clone(),
  59            pool: sea_orm::Database::connect(options).await?,
  60            rooms: DashMap::with_capacity(16384),
  61            #[cfg(test)]
  62            background: None,
  63            #[cfg(test)]
  64            runtime: None,
  65        })
  66    }
  67
  68    #[cfg(test)]
  69    pub fn reset(&self) {
  70        self.rooms.clear();
  71    }
  72
  73    pub async fn migrate(
  74        &self,
  75        migrations_path: &Path,
  76        ignore_checksum_mismatch: bool,
  77    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  78        let migrations = MigrationSource::resolve(migrations_path)
  79            .await
  80            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  81
  82        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  83
  84        connection.ensure_migrations_table().await?;
  85        let applied_migrations: HashMap<_, _> = connection
  86            .list_applied_migrations()
  87            .await?
  88            .into_iter()
  89            .map(|m| (m.version, m))
  90            .collect();
  91
  92        let mut new_migrations = Vec::new();
  93        for migration in migrations {
  94            match applied_migrations.get(&migration.version) {
  95                Some(applied_migration) => {
  96                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  97                    {
  98                        Err(anyhow!(
  99                            "checksum mismatch for applied migration {}",
 100                            migration.description
 101                        ))?;
 102                    }
 103                }
 104                None => {
 105                    let elapsed = connection.apply(&migration).await?;
 106                    new_migrations.push((migration, elapsed));
 107                }
 108            }
 109        }
 110
 111        Ok(new_migrations)
 112    }
 113
 114    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 115        self.transaction(|tx| async move {
 116            let server = server::ActiveModel {
 117                environment: ActiveValue::set(environment.into()),
 118                ..Default::default()
 119            }
 120            .insert(&*tx)
 121            .await?;
 122            Ok(server.id)
 123        })
 124        .await
 125    }
 126
 127    pub async fn stale_room_ids(
 128        &self,
 129        environment: &str,
 130        new_server_id: ServerId,
 131    ) -> Result<Vec<RoomId>> {
 132        self.transaction(|tx| async move {
 133            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 134            enum QueryAs {
 135                RoomId,
 136            }
 137
 138            let stale_server_epochs = self
 139                .stale_server_ids(environment, new_server_id, &tx)
 140                .await?;
 141            Ok(room_participant::Entity::find()
 142                .select_only()
 143                .column(room_participant::Column::RoomId)
 144                .distinct()
 145                .filter(
 146                    room_participant::Column::AnsweringConnectionServerId
 147                        .is_in(stale_server_epochs),
 148                )
 149                .into_values::<_, QueryAs>()
 150                .all(&*tx)
 151                .await?)
 152        })
 153        .await
 154    }
 155
 156    pub async fn refresh_room(
 157        &self,
 158        room_id: RoomId,
 159        new_server_id: ServerId,
 160    ) -> Result<RoomGuard<RefreshedRoom>> {
 161        self.room_transaction(room_id, |tx| async move {
 162            let stale_participant_filter = Condition::all()
 163                .add(room_participant::Column::RoomId.eq(room_id))
 164                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 165                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 166
 167            let stale_participant_user_ids = room_participant::Entity::find()
 168                .filter(stale_participant_filter.clone())
 169                .all(&*tx)
 170                .await?
 171                .into_iter()
 172                .map(|participant| participant.user_id)
 173                .collect::<Vec<_>>();
 174
 175            // Delete participants who failed to reconnect.
 176            room_participant::Entity::delete_many()
 177                .filter(stale_participant_filter)
 178                .exec(&*tx)
 179                .await?;
 180
 181            let room = self.get_room(room_id, &tx).await?;
 182            let mut canceled_calls_to_user_ids = Vec::new();
 183            // Delete the room if it becomes empty and cancel pending calls.
 184            if room.participants.is_empty() {
 185                canceled_calls_to_user_ids.extend(
 186                    room.pending_participants
 187                        .iter()
 188                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 189                );
 190                room_participant::Entity::delete_many()
 191                    .filter(room_participant::Column::RoomId.eq(room_id))
 192                    .exec(&*tx)
 193                    .await?;
 194                project::Entity::delete_many()
 195                    .filter(project::Column::RoomId.eq(room_id))
 196                    .exec(&*tx)
 197                    .await?;
 198                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 199            }
 200
 201            Ok(RefreshedRoom {
 202                room,
 203                stale_participant_user_ids,
 204                canceled_calls_to_user_ids,
 205            })
 206        })
 207        .await
 208    }
 209
 210    pub async fn delete_stale_servers(
 211        &self,
 212        environment: &str,
 213        new_server_id: ServerId,
 214    ) -> Result<()> {
 215        self.transaction(|tx| async move {
 216            server::Entity::delete_many()
 217                .filter(
 218                    Condition::all()
 219                        .add(server::Column::Environment.eq(environment))
 220                        .add(server::Column::Id.ne(new_server_id)),
 221                )
 222                .exec(&*tx)
 223                .await?;
 224            Ok(())
 225        })
 226        .await
 227    }
 228
 229    async fn stale_server_ids(
 230        &self,
 231        environment: &str,
 232        new_server_id: ServerId,
 233        tx: &DatabaseTransaction,
 234    ) -> Result<Vec<ServerId>> {
 235        let stale_servers = server::Entity::find()
 236            .filter(
 237                Condition::all()
 238                    .add(server::Column::Environment.eq(environment))
 239                    .add(server::Column::Id.ne(new_server_id)),
 240            )
 241            .all(&*tx)
 242            .await?;
 243        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 244    }
 245
 246    // users
 247
 248    pub async fn create_user(
 249        &self,
 250        email_address: &str,
 251        admin: bool,
 252        params: NewUserParams,
 253    ) -> Result<NewUserResult> {
 254        self.transaction(|tx| async {
 255            let tx = tx;
 256            let user = user::Entity::insert(user::ActiveModel {
 257                email_address: ActiveValue::set(Some(email_address.into())),
 258                github_login: ActiveValue::set(params.github_login.clone()),
 259                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 260                admin: ActiveValue::set(admin),
 261                metrics_id: ActiveValue::set(Uuid::new_v4()),
 262                ..Default::default()
 263            })
 264            .on_conflict(
 265                OnConflict::column(user::Column::GithubLogin)
 266                    .update_column(user::Column::GithubLogin)
 267                    .to_owned(),
 268            )
 269            .exec_with_returning(&*tx)
 270            .await?;
 271
 272            Ok(NewUserResult {
 273                user_id: user.id,
 274                metrics_id: user.metrics_id.to_string(),
 275                signup_device_id: None,
 276                inviting_user_id: None,
 277            })
 278        })
 279        .await
 280    }
 281
 282    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 283        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 284            .await
 285    }
 286
 287    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 288        self.transaction(|tx| async {
 289            let tx = tx;
 290            Ok(user::Entity::find()
 291                .filter(user::Column::Id.is_in(ids.iter().copied()))
 292                .all(&*tx)
 293                .await?)
 294        })
 295        .await
 296    }
 297
 298    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 299        self.transaction(|tx| async move {
 300            Ok(user::Entity::find()
 301                .filter(user::Column::GithubLogin.eq(github_login))
 302                .one(&*tx)
 303                .await?)
 304        })
 305        .await
 306    }
 307
 308    pub async fn get_or_create_user_by_github_account(
 309        &self,
 310        github_login: &str,
 311        github_user_id: Option<i32>,
 312        github_email: Option<&str>,
 313    ) -> Result<Option<User>> {
 314        self.transaction(|tx| async move {
 315            let tx = &*tx;
 316            if let Some(github_user_id) = github_user_id {
 317                if let Some(user_by_github_user_id) = user::Entity::find()
 318                    .filter(user::Column::GithubUserId.eq(github_user_id))
 319                    .one(tx)
 320                    .await?
 321                {
 322                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 323                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 324                    Ok(Some(user_by_github_user_id.update(tx).await?))
 325                } else if let Some(user_by_github_login) = user::Entity::find()
 326                    .filter(user::Column::GithubLogin.eq(github_login))
 327                    .one(tx)
 328                    .await?
 329                {
 330                    let mut user_by_github_login = user_by_github_login.into_active_model();
 331                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 332                    Ok(Some(user_by_github_login.update(tx).await?))
 333                } else {
 334                    let user = user::Entity::insert(user::ActiveModel {
 335                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 336                        github_login: ActiveValue::set(github_login.into()),
 337                        github_user_id: ActiveValue::set(Some(github_user_id)),
 338                        admin: ActiveValue::set(false),
 339                        invite_count: ActiveValue::set(0),
 340                        invite_code: ActiveValue::set(None),
 341                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 342                        ..Default::default()
 343                    })
 344                    .exec_with_returning(&*tx)
 345                    .await?;
 346                    Ok(Some(user))
 347                }
 348            } else {
 349                Ok(user::Entity::find()
 350                    .filter(user::Column::GithubLogin.eq(github_login))
 351                    .one(tx)
 352                    .await?)
 353            }
 354        })
 355        .await
 356    }
 357
 358    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 359        self.transaction(|tx| async move {
 360            Ok(user::Entity::find()
 361                .order_by_asc(user::Column::GithubLogin)
 362                .limit(limit as u64)
 363                .offset(page as u64 * limit as u64)
 364                .all(&*tx)
 365                .await?)
 366        })
 367        .await
 368    }
 369
 370    pub async fn get_users_with_no_invites(
 371        &self,
 372        invited_by_another_user: bool,
 373    ) -> Result<Vec<User>> {
 374        self.transaction(|tx| async move {
 375            Ok(user::Entity::find()
 376                .filter(
 377                    user::Column::InviteCount
 378                        .eq(0)
 379                        .and(if invited_by_another_user {
 380                            user::Column::InviterId.is_not_null()
 381                        } else {
 382                            user::Column::InviterId.is_null()
 383                        }),
 384                )
 385                .all(&*tx)
 386                .await?)
 387        })
 388        .await
 389    }
 390
 391    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 392        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 393        enum QueryAs {
 394            MetricsId,
 395        }
 396
 397        self.transaction(|tx| async move {
 398            let metrics_id: Uuid = user::Entity::find_by_id(id)
 399                .select_only()
 400                .column(user::Column::MetricsId)
 401                .into_values::<_, QueryAs>()
 402                .one(&*tx)
 403                .await?
 404                .ok_or_else(|| anyhow!("could not find user"))?;
 405            Ok(metrics_id.to_string())
 406        })
 407        .await
 408    }
 409
 410    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 411        self.transaction(|tx| async move {
 412            user::Entity::update_many()
 413                .filter(user::Column::Id.eq(id))
 414                .set(user::ActiveModel {
 415                    admin: ActiveValue::set(is_admin),
 416                    ..Default::default()
 417                })
 418                .exec(&*tx)
 419                .await?;
 420            Ok(())
 421        })
 422        .await
 423    }
 424
 425    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 426        self.transaction(|tx| async move {
 427            user::Entity::update_many()
 428                .filter(user::Column::Id.eq(id))
 429                .set(user::ActiveModel {
 430                    connected_once: ActiveValue::set(connected_once),
 431                    ..Default::default()
 432                })
 433                .exec(&*tx)
 434                .await?;
 435            Ok(())
 436        })
 437        .await
 438    }
 439
 440    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 441        self.transaction(|tx| async move {
 442            access_token::Entity::delete_many()
 443                .filter(access_token::Column::UserId.eq(id))
 444                .exec(&*tx)
 445                .await?;
 446            user::Entity::delete_by_id(id).exec(&*tx).await?;
 447            Ok(())
 448        })
 449        .await
 450    }
 451
 452    // contacts
 453
 454    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 455        #[derive(Debug, FromQueryResult)]
 456        struct ContactWithUserBusyStatuses {
 457            user_id_a: UserId,
 458            user_id_b: UserId,
 459            a_to_b: bool,
 460            accepted: bool,
 461            should_notify: bool,
 462            user_a_busy: bool,
 463            user_b_busy: bool,
 464        }
 465
 466        self.transaction(|tx| async move {
 467            let user_a_participant = Alias::new("user_a_participant");
 468            let user_b_participant = Alias::new("user_b_participant");
 469            let mut db_contacts = contact::Entity::find()
 470                .column_as(
 471                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 472                        .is_not_null(),
 473                    "user_a_busy",
 474                )
 475                .column_as(
 476                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 477                        .is_not_null(),
 478                    "user_b_busy",
 479                )
 480                .filter(
 481                    contact::Column::UserIdA
 482                        .eq(user_id)
 483                        .or(contact::Column::UserIdB.eq(user_id)),
 484                )
 485                .join_as(
 486                    JoinType::LeftJoin,
 487                    contact::Relation::UserARoomParticipant.def(),
 488                    user_a_participant,
 489                )
 490                .join_as(
 491                    JoinType::LeftJoin,
 492                    contact::Relation::UserBRoomParticipant.def(),
 493                    user_b_participant,
 494                )
 495                .into_model::<ContactWithUserBusyStatuses>()
 496                .stream(&*tx)
 497                .await?;
 498
 499            let mut contacts = Vec::new();
 500            while let Some(db_contact) = db_contacts.next().await {
 501                let db_contact = db_contact?;
 502                if db_contact.user_id_a == user_id {
 503                    if db_contact.accepted {
 504                        contacts.push(Contact::Accepted {
 505                            user_id: db_contact.user_id_b,
 506                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 507                            busy: db_contact.user_b_busy,
 508                        });
 509                    } else if db_contact.a_to_b {
 510                        contacts.push(Contact::Outgoing {
 511                            user_id: db_contact.user_id_b,
 512                        })
 513                    } else {
 514                        contacts.push(Contact::Incoming {
 515                            user_id: db_contact.user_id_b,
 516                            should_notify: db_contact.should_notify,
 517                        });
 518                    }
 519                } else if db_contact.accepted {
 520                    contacts.push(Contact::Accepted {
 521                        user_id: db_contact.user_id_a,
 522                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 523                        busy: db_contact.user_a_busy,
 524                    });
 525                } else if db_contact.a_to_b {
 526                    contacts.push(Contact::Incoming {
 527                        user_id: db_contact.user_id_a,
 528                        should_notify: db_contact.should_notify,
 529                    });
 530                } else {
 531                    contacts.push(Contact::Outgoing {
 532                        user_id: db_contact.user_id_a,
 533                    });
 534                }
 535            }
 536
 537            contacts.sort_unstable_by_key(|contact| contact.user_id());
 538
 539            Ok(contacts)
 540        })
 541        .await
 542    }
 543
 544    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 545        self.transaction(|tx| async move {
 546            let participant = room_participant::Entity::find()
 547                .filter(room_participant::Column::UserId.eq(user_id))
 548                .one(&*tx)
 549                .await?;
 550            Ok(participant.is_some())
 551        })
 552        .await
 553    }
 554
 555    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 556        self.transaction(|tx| async move {
 557            let (id_a, id_b) = if user_id_1 < user_id_2 {
 558                (user_id_1, user_id_2)
 559            } else {
 560                (user_id_2, user_id_1)
 561            };
 562
 563            Ok(contact::Entity::find()
 564                .filter(
 565                    contact::Column::UserIdA
 566                        .eq(id_a)
 567                        .and(contact::Column::UserIdB.eq(id_b))
 568                        .and(contact::Column::Accepted.eq(true)),
 569                )
 570                .one(&*tx)
 571                .await?
 572                .is_some())
 573        })
 574        .await
 575    }
 576
 577    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 578        self.transaction(|tx| async move {
 579            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 580                (sender_id, receiver_id, true)
 581            } else {
 582                (receiver_id, sender_id, false)
 583            };
 584
 585            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 586                user_id_a: ActiveValue::set(id_a),
 587                user_id_b: ActiveValue::set(id_b),
 588                a_to_b: ActiveValue::set(a_to_b),
 589                accepted: ActiveValue::set(false),
 590                should_notify: ActiveValue::set(true),
 591                ..Default::default()
 592            })
 593            .on_conflict(
 594                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 595                    .values([
 596                        (contact::Column::Accepted, true.into()),
 597                        (contact::Column::ShouldNotify, false.into()),
 598                    ])
 599                    .action_and_where(
 600                        contact::Column::Accepted.eq(false).and(
 601                            contact::Column::AToB
 602                                .eq(a_to_b)
 603                                .and(contact::Column::UserIdA.eq(id_b))
 604                                .or(contact::Column::AToB
 605                                    .ne(a_to_b)
 606                                    .and(contact::Column::UserIdA.eq(id_a))),
 607                        ),
 608                    )
 609                    .to_owned(),
 610            )
 611            .exec_without_returning(&*tx)
 612            .await?;
 613
 614            if rows_affected == 1 {
 615                Ok(())
 616            } else {
 617                Err(anyhow!("contact already requested"))?
 618            }
 619        })
 620        .await
 621    }
 622
 623    /// Returns a bool indicating whether the removed contact had originally accepted or not
 624    ///
 625    /// Deletes the contact identified by the requester and responder ids, and then returns
 626    /// whether the deleted contact had originally accepted or was a pending contact request.
 627    ///
 628    /// # Arguments
 629    ///
 630    /// * `requester_id` - The user that initiates this request
 631    /// * `responder_id` - The user that will be removed
 632    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 633        self.transaction(|tx| async move {
 634            let (id_a, id_b) = if responder_id < requester_id {
 635                (responder_id, requester_id)
 636            } else {
 637                (requester_id, responder_id)
 638            };
 639
 640            let contact = contact::Entity::find()
 641                .filter(
 642                    contact::Column::UserIdA
 643                        .eq(id_a)
 644                        .and(contact::Column::UserIdB.eq(id_b)),
 645                )
 646                .one(&*tx)
 647                .await?
 648                .ok_or_else(|| anyhow!("no such contact"))?;
 649
 650            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 651            Ok(contact.accepted)
 652        })
 653        .await
 654    }
 655
 656    pub async fn dismiss_contact_notification(
 657        &self,
 658        user_id: UserId,
 659        contact_user_id: UserId,
 660    ) -> Result<()> {
 661        self.transaction(|tx| async move {
 662            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 663                (user_id, contact_user_id, true)
 664            } else {
 665                (contact_user_id, user_id, false)
 666            };
 667
 668            let result = contact::Entity::update_many()
 669                .set(contact::ActiveModel {
 670                    should_notify: ActiveValue::set(false),
 671                    ..Default::default()
 672                })
 673                .filter(
 674                    contact::Column::UserIdA
 675                        .eq(id_a)
 676                        .and(contact::Column::UserIdB.eq(id_b))
 677                        .and(
 678                            contact::Column::AToB
 679                                .eq(a_to_b)
 680                                .and(contact::Column::Accepted.eq(true))
 681                                .or(contact::Column::AToB
 682                                    .ne(a_to_b)
 683                                    .and(contact::Column::Accepted.eq(false))),
 684                        ),
 685                )
 686                .exec(&*tx)
 687                .await?;
 688            if result.rows_affected == 0 {
 689                Err(anyhow!("no such contact request"))?
 690            } else {
 691                Ok(())
 692            }
 693        })
 694        .await
 695    }
 696
 697    pub async fn respond_to_contact_request(
 698        &self,
 699        responder_id: UserId,
 700        requester_id: UserId,
 701        accept: bool,
 702    ) -> Result<()> {
 703        self.transaction(|tx| async move {
 704            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 705                (responder_id, requester_id, false)
 706            } else {
 707                (requester_id, responder_id, true)
 708            };
 709            let rows_affected = if accept {
 710                let result = contact::Entity::update_many()
 711                    .set(contact::ActiveModel {
 712                        accepted: ActiveValue::set(true),
 713                        should_notify: ActiveValue::set(true),
 714                        ..Default::default()
 715                    })
 716                    .filter(
 717                        contact::Column::UserIdA
 718                            .eq(id_a)
 719                            .and(contact::Column::UserIdB.eq(id_b))
 720                            .and(contact::Column::AToB.eq(a_to_b)),
 721                    )
 722                    .exec(&*tx)
 723                    .await?;
 724                result.rows_affected
 725            } else {
 726                let result = contact::Entity::delete_many()
 727                    .filter(
 728                        contact::Column::UserIdA
 729                            .eq(id_a)
 730                            .and(contact::Column::UserIdB.eq(id_b))
 731                            .and(contact::Column::AToB.eq(a_to_b))
 732                            .and(contact::Column::Accepted.eq(false)),
 733                    )
 734                    .exec(&*tx)
 735                    .await?;
 736
 737                result.rows_affected
 738            };
 739
 740            if rows_affected == 1 {
 741                Ok(())
 742            } else {
 743                Err(anyhow!("no such contact request"))?
 744            }
 745        })
 746        .await
 747    }
 748
 749    pub fn fuzzy_like_string(string: &str) -> String {
 750        let mut result = String::with_capacity(string.len() * 2 + 1);
 751        for c in string.chars() {
 752            if c.is_alphanumeric() {
 753                result.push('%');
 754                result.push(c);
 755            }
 756        }
 757        result.push('%');
 758        result
 759    }
 760
 761    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 762        self.transaction(|tx| async {
 763            let tx = tx;
 764            let like_string = Self::fuzzy_like_string(name_query);
 765            let query = "
 766                SELECT users.*
 767                FROM users
 768                WHERE github_login ILIKE $1
 769                ORDER BY github_login <-> $2
 770                LIMIT $3
 771            ";
 772
 773            Ok(user::Entity::find()
 774                .from_raw_sql(Statement::from_sql_and_values(
 775                    self.pool.get_database_backend(),
 776                    query.into(),
 777                    vec![like_string.into(), name_query.into(), limit.into()],
 778                ))
 779                .all(&*tx)
 780                .await?)
 781        })
 782        .await
 783    }
 784
 785    // signups
 786
 787    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 788        self.transaction(|tx| async move {
 789            signup::Entity::insert(signup::ActiveModel {
 790                email_address: ActiveValue::set(signup.email_address.clone()),
 791                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 792                email_confirmation_sent: ActiveValue::set(false),
 793                platform_mac: ActiveValue::set(signup.platform_mac),
 794                platform_windows: ActiveValue::set(signup.platform_windows),
 795                platform_linux: ActiveValue::set(signup.platform_linux),
 796                platform_unknown: ActiveValue::set(false),
 797                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 798                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 799                device_id: ActiveValue::set(signup.device_id.clone()),
 800                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 801                ..Default::default()
 802            })
 803            .on_conflict(
 804                OnConflict::column(signup::Column::EmailAddress)
 805                    .update_columns([
 806                        signup::Column::PlatformMac,
 807                        signup::Column::PlatformWindows,
 808                        signup::Column::PlatformLinux,
 809                        signup::Column::EditorFeatures,
 810                        signup::Column::ProgrammingLanguages,
 811                        signup::Column::DeviceId,
 812                        signup::Column::AddedToMailingList,
 813                    ])
 814                    .to_owned(),
 815            )
 816            .exec(&*tx)
 817            .await?;
 818            Ok(())
 819        })
 820        .await
 821    }
 822
 823    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 824        self.transaction(|tx| async move {
 825            let signup = signup::Entity::find()
 826                .filter(signup::Column::EmailAddress.eq(email_address))
 827                .one(&*tx)
 828                .await?
 829                .ok_or_else(|| {
 830                    anyhow!("signup with email address {} doesn't exist", email_address)
 831                })?;
 832
 833            Ok(signup)
 834        })
 835        .await
 836    }
 837
 838    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 839        self.transaction(|tx| async move {
 840            let query = "
 841                SELECT
 842                    COUNT(*) as count,
 843                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 844                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 845                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 846                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 847                FROM (
 848                    SELECT *
 849                    FROM signups
 850                    WHERE
 851                        NOT email_confirmation_sent
 852                ) AS unsent
 853            ";
 854            Ok(
 855                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 856                    self.pool.get_database_backend(),
 857                    query.into(),
 858                    vec![],
 859                ))
 860                .one(&*tx)
 861                .await?
 862                .ok_or_else(|| anyhow!("invalid result"))?,
 863            )
 864        })
 865        .await
 866    }
 867
 868    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 869        let emails = invites
 870            .iter()
 871            .map(|s| s.email_address.as_str())
 872            .collect::<Vec<_>>();
 873        self.transaction(|tx| async {
 874            let tx = tx;
 875            signup::Entity::update_many()
 876                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 877                .set(signup::ActiveModel {
 878                    email_confirmation_sent: ActiveValue::set(true),
 879                    ..Default::default()
 880                })
 881                .exec(&*tx)
 882                .await?;
 883            Ok(())
 884        })
 885        .await
 886    }
 887
 888    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 889        self.transaction(|tx| async move {
 890            Ok(signup::Entity::find()
 891                .select_only()
 892                .column(signup::Column::EmailAddress)
 893                .column(signup::Column::EmailConfirmationCode)
 894                .filter(
 895                    signup::Column::EmailConfirmationSent.eq(false).and(
 896                        signup::Column::PlatformMac
 897                            .eq(true)
 898                            .or(signup::Column::PlatformUnknown.eq(true)),
 899                    ),
 900                )
 901                .order_by_asc(signup::Column::CreatedAt)
 902                .limit(count as u64)
 903                .into_model()
 904                .all(&*tx)
 905                .await?)
 906        })
 907        .await
 908    }
 909
 910    // invite codes
 911
 912    pub async fn create_invite_from_code(
 913        &self,
 914        code: &str,
 915        email_address: &str,
 916        device_id: Option<&str>,
 917        added_to_mailing_list: bool,
 918    ) -> Result<Invite> {
 919        self.transaction(|tx| async move {
 920            let existing_user = user::Entity::find()
 921                .filter(user::Column::EmailAddress.eq(email_address))
 922                .one(&*tx)
 923                .await?;
 924
 925            if existing_user.is_some() {
 926                Err(anyhow!("email address is already in use"))?;
 927            }
 928
 929            let inviting_user_with_invites = match user::Entity::find()
 930                .filter(
 931                    user::Column::InviteCode
 932                        .eq(code)
 933                        .and(user::Column::InviteCount.gt(0)),
 934                )
 935                .one(&*tx)
 936                .await?
 937            {
 938                Some(inviting_user) => inviting_user,
 939                None => {
 940                    return Err(Error::Http(
 941                        StatusCode::UNAUTHORIZED,
 942                        "unable to find an invite code with invites remaining".to_string(),
 943                    ))?
 944                }
 945            };
 946            user::Entity::update_many()
 947                .filter(
 948                    user::Column::Id
 949                        .eq(inviting_user_with_invites.id)
 950                        .and(user::Column::InviteCount.gt(0)),
 951                )
 952                .col_expr(
 953                    user::Column::InviteCount,
 954                    Expr::col(user::Column::InviteCount).sub(1),
 955                )
 956                .exec(&*tx)
 957                .await?;
 958
 959            let signup = signup::Entity::insert(signup::ActiveModel {
 960                email_address: ActiveValue::set(email_address.into()),
 961                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 962                email_confirmation_sent: ActiveValue::set(false),
 963                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 964                platform_linux: ActiveValue::set(false),
 965                platform_mac: ActiveValue::set(false),
 966                platform_windows: ActiveValue::set(false),
 967                platform_unknown: ActiveValue::set(true),
 968                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 969                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 970                ..Default::default()
 971            })
 972            .on_conflict(
 973                OnConflict::column(signup::Column::EmailAddress)
 974                    .update_column(signup::Column::InvitingUserId)
 975                    .to_owned(),
 976            )
 977            .exec_with_returning(&*tx)
 978            .await?;
 979
 980            Ok(Invite {
 981                email_address: signup.email_address,
 982                email_confirmation_code: signup.email_confirmation_code,
 983            })
 984        })
 985        .await
 986    }
 987
 988    pub async fn create_user_from_invite(
 989        &self,
 990        invite: &Invite,
 991        user: NewUserParams,
 992    ) -> Result<Option<NewUserResult>> {
 993        self.transaction(|tx| async {
 994            let tx = tx;
 995            let signup = signup::Entity::find()
 996                .filter(
 997                    signup::Column::EmailAddress
 998                        .eq(invite.email_address.as_str())
 999                        .and(
1000                            signup::Column::EmailConfirmationCode
1001                                .eq(invite.email_confirmation_code.as_str()),
1002                        ),
1003                )
1004                .one(&*tx)
1005                .await?
1006                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1007
1008            if signup.user_id.is_some() {
1009                return Ok(None);
1010            }
1011
1012            let user = user::Entity::insert(user::ActiveModel {
1013                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1014                github_login: ActiveValue::set(user.github_login.clone()),
1015                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1016                admin: ActiveValue::set(false),
1017                invite_count: ActiveValue::set(user.invite_count),
1018                invite_code: ActiveValue::set(Some(random_invite_code())),
1019                metrics_id: ActiveValue::set(Uuid::new_v4()),
1020                ..Default::default()
1021            })
1022            .on_conflict(
1023                OnConflict::column(user::Column::GithubLogin)
1024                    .update_columns([
1025                        user::Column::EmailAddress,
1026                        user::Column::GithubUserId,
1027                        user::Column::Admin,
1028                    ])
1029                    .to_owned(),
1030            )
1031            .exec_with_returning(&*tx)
1032            .await?;
1033
1034            let mut signup = signup.into_active_model();
1035            signup.user_id = ActiveValue::set(Some(user.id));
1036            let signup = signup.update(&*tx).await?;
1037
1038            if let Some(inviting_user_id) = signup.inviting_user_id {
1039                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1040                    (inviting_user_id, user.id, true)
1041                } else {
1042                    (user.id, inviting_user_id, false)
1043                };
1044
1045                contact::Entity::insert(contact::ActiveModel {
1046                    user_id_a: ActiveValue::set(user_id_a),
1047                    user_id_b: ActiveValue::set(user_id_b),
1048                    a_to_b: ActiveValue::set(a_to_b),
1049                    should_notify: ActiveValue::set(true),
1050                    accepted: ActiveValue::set(true),
1051                    ..Default::default()
1052                })
1053                .on_conflict(OnConflict::new().do_nothing().to_owned())
1054                .exec_without_returning(&*tx)
1055                .await?;
1056            }
1057
1058            Ok(Some(NewUserResult {
1059                user_id: user.id,
1060                metrics_id: user.metrics_id.to_string(),
1061                inviting_user_id: signup.inviting_user_id,
1062                signup_device_id: signup.device_id,
1063            }))
1064        })
1065        .await
1066    }
1067
1068    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1069        self.transaction(|tx| async move {
1070            if count > 0 {
1071                user::Entity::update_many()
1072                    .filter(
1073                        user::Column::Id
1074                            .eq(id)
1075                            .and(user::Column::InviteCode.is_null()),
1076                    )
1077                    .set(user::ActiveModel {
1078                        invite_code: ActiveValue::set(Some(random_invite_code())),
1079                        ..Default::default()
1080                    })
1081                    .exec(&*tx)
1082                    .await?;
1083            }
1084
1085            user::Entity::update_many()
1086                .filter(user::Column::Id.eq(id))
1087                .set(user::ActiveModel {
1088                    invite_count: ActiveValue::set(count),
1089                    ..Default::default()
1090                })
1091                .exec(&*tx)
1092                .await?;
1093            Ok(())
1094        })
1095        .await
1096    }
1097
1098    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1099        self.transaction(|tx| async move {
1100            match user::Entity::find_by_id(id).one(&*tx).await? {
1101                Some(user) if user.invite_code.is_some() => {
1102                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1103                }
1104                _ => Ok(None),
1105            }
1106        })
1107        .await
1108    }
1109
1110    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1111        self.transaction(|tx| async move {
1112            user::Entity::find()
1113                .filter(user::Column::InviteCode.eq(code))
1114                .one(&*tx)
1115                .await?
1116                .ok_or_else(|| {
1117                    Error::Http(
1118                        StatusCode::NOT_FOUND,
1119                        "that invite code does not exist".to_string(),
1120                    )
1121                })
1122        })
1123        .await
1124    }
1125
1126    // rooms
1127
1128    pub async fn incoming_call_for_user(
1129        &self,
1130        user_id: UserId,
1131    ) -> Result<Option<proto::IncomingCall>> {
1132        self.transaction(|tx| async move {
1133            let pending_participant = room_participant::Entity::find()
1134                .filter(
1135                    room_participant::Column::UserId
1136                        .eq(user_id)
1137                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1138                )
1139                .one(&*tx)
1140                .await?;
1141
1142            if let Some(pending_participant) = pending_participant {
1143                let room = self.get_room(pending_participant.room_id, &tx).await?;
1144                Ok(Self::build_incoming_call(&room, user_id))
1145            } else {
1146                Ok(None)
1147            }
1148        })
1149        .await
1150    }
1151
1152    pub async fn create_room(
1153        &self,
1154        user_id: UserId,
1155        connection: ConnectionId,
1156        live_kit_room: &str,
1157    ) -> Result<proto::Room> {
1158        self.transaction(|tx| async move {
1159            let room = room::ActiveModel {
1160                live_kit_room: ActiveValue::set(live_kit_room.into()),
1161                ..Default::default()
1162            }
1163            .insert(&*tx)
1164            .await?;
1165            room_participant::ActiveModel {
1166                room_id: ActiveValue::set(room.id),
1167                user_id: ActiveValue::set(user_id),
1168                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1169                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1170                    connection.owner_id as i32,
1171                ))),
1172                answering_connection_lost: ActiveValue::set(false),
1173                calling_user_id: ActiveValue::set(user_id),
1174                calling_connection_id: ActiveValue::set(connection.id as i32),
1175                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1176                    connection.owner_id as i32,
1177                ))),
1178                ..Default::default()
1179            }
1180            .insert(&*tx)
1181            .await?;
1182
1183            let room = self.get_room(room.id, &tx).await?;
1184            Ok(room)
1185        })
1186        .await
1187    }
1188
1189    pub async fn call(
1190        &self,
1191        room_id: RoomId,
1192        calling_user_id: UserId,
1193        calling_connection: ConnectionId,
1194        called_user_id: UserId,
1195        initial_project_id: Option<ProjectId>,
1196    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1197        self.room_transaction(room_id, |tx| async move {
1198            room_participant::ActiveModel {
1199                room_id: ActiveValue::set(room_id),
1200                user_id: ActiveValue::set(called_user_id),
1201                answering_connection_lost: ActiveValue::set(false),
1202                calling_user_id: ActiveValue::set(calling_user_id),
1203                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1204                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1205                    calling_connection.owner_id as i32,
1206                ))),
1207                initial_project_id: ActiveValue::set(initial_project_id),
1208                ..Default::default()
1209            }
1210            .insert(&*tx)
1211            .await?;
1212
1213            let room = self.get_room(room_id, &tx).await?;
1214            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1215                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1216            Ok((room, incoming_call))
1217        })
1218        .await
1219    }
1220
1221    pub async fn call_failed(
1222        &self,
1223        room_id: RoomId,
1224        called_user_id: UserId,
1225    ) -> Result<RoomGuard<proto::Room>> {
1226        self.room_transaction(room_id, |tx| async move {
1227            room_participant::Entity::delete_many()
1228                .filter(
1229                    room_participant::Column::RoomId
1230                        .eq(room_id)
1231                        .and(room_participant::Column::UserId.eq(called_user_id)),
1232                )
1233                .exec(&*tx)
1234                .await?;
1235            let room = self.get_room(room_id, &tx).await?;
1236            Ok(room)
1237        })
1238        .await
1239    }
1240
1241    pub async fn decline_call(
1242        &self,
1243        expected_room_id: Option<RoomId>,
1244        user_id: UserId,
1245    ) -> Result<Option<RoomGuard<proto::Room>>> {
1246        self.optional_room_transaction(|tx| async move {
1247            let mut filter = Condition::all()
1248                .add(room_participant::Column::UserId.eq(user_id))
1249                .add(room_participant::Column::AnsweringConnectionId.is_null());
1250            if let Some(room_id) = expected_room_id {
1251                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1252            }
1253            let participant = room_participant::Entity::find()
1254                .filter(filter)
1255                .one(&*tx)
1256                .await?;
1257
1258            let participant = if let Some(participant) = participant {
1259                participant
1260            } else if expected_room_id.is_some() {
1261                return Err(anyhow!("could not find call to decline"))?;
1262            } else {
1263                return Ok(None);
1264            };
1265
1266            let room_id = participant.room_id;
1267            room_participant::Entity::delete(participant.into_active_model())
1268                .exec(&*tx)
1269                .await?;
1270
1271            let room = self.get_room(room_id, &tx).await?;
1272            Ok(Some((room_id, room)))
1273        })
1274        .await
1275    }
1276
1277    pub async fn cancel_call(
1278        &self,
1279        room_id: RoomId,
1280        calling_connection: ConnectionId,
1281        called_user_id: UserId,
1282    ) -> Result<RoomGuard<proto::Room>> {
1283        self.room_transaction(room_id, |tx| async move {
1284            let participant = room_participant::Entity::find()
1285                .filter(
1286                    Condition::all()
1287                        .add(room_participant::Column::UserId.eq(called_user_id))
1288                        .add(room_participant::Column::RoomId.eq(room_id))
1289                        .add(
1290                            room_participant::Column::CallingConnectionId
1291                                .eq(calling_connection.id as i32),
1292                        )
1293                        .add(
1294                            room_participant::Column::CallingConnectionServerId
1295                                .eq(calling_connection.owner_id as i32),
1296                        )
1297                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1298                )
1299                .one(&*tx)
1300                .await?
1301                .ok_or_else(|| anyhow!("no call to cancel"))?;
1302
1303            room_participant::Entity::delete(participant.into_active_model())
1304                .exec(&*tx)
1305                .await?;
1306
1307            let room = self.get_room(room_id, &tx).await?;
1308            Ok(room)
1309        })
1310        .await
1311    }
1312
1313    pub async fn join_room(
1314        &self,
1315        room_id: RoomId,
1316        user_id: UserId,
1317        connection: ConnectionId,
1318    ) -> Result<RoomGuard<proto::Room>> {
1319        self.room_transaction(room_id, |tx| async move {
1320            let result = room_participant::Entity::update_many()
1321                .filter(
1322                    Condition::all()
1323                        .add(room_participant::Column::RoomId.eq(room_id))
1324                        .add(room_participant::Column::UserId.eq(user_id))
1325                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1326                )
1327                .set(room_participant::ActiveModel {
1328                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1329                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1330                        connection.owner_id as i32,
1331                    ))),
1332                    answering_connection_lost: ActiveValue::set(false),
1333                    ..Default::default()
1334                })
1335                .exec(&*tx)
1336                .await?;
1337            if result.rows_affected == 0 {
1338                Err(anyhow!("room does not exist or was already joined"))?
1339            } else {
1340                let room = self.get_room(room_id, &tx).await?;
1341                Ok(room)
1342            }
1343        })
1344        .await
1345    }
1346
1347    pub async fn rejoin_room(
1348        &self,
1349        rejoin_room: proto::RejoinRoom,
1350        user_id: UserId,
1351        connection: ConnectionId,
1352    ) -> Result<RoomGuard<RejoinedRoom>> {
1353        let room_id = RoomId::from_proto(rejoin_room.id);
1354        self.room_transaction(room_id, |tx| async {
1355            let tx = tx;
1356            let participant_update = room_participant::Entity::update_many()
1357                .filter(
1358                    Condition::all()
1359                        .add(room_participant::Column::RoomId.eq(room_id))
1360                        .add(room_participant::Column::UserId.eq(user_id))
1361                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1362                        .add(
1363                            Condition::any()
1364                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1365                                .add(
1366                                    room_participant::Column::AnsweringConnectionServerId
1367                                        .ne(connection.owner_id as i32),
1368                                ),
1369                        ),
1370                )
1371                .set(room_participant::ActiveModel {
1372                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1373                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1374                        connection.owner_id as i32,
1375                    ))),
1376                    answering_connection_lost: ActiveValue::set(false),
1377                    ..Default::default()
1378                })
1379                .exec(&*tx)
1380                .await?;
1381            if participant_update.rows_affected == 0 {
1382                return Err(anyhow!("room does not exist or was already joined"))?;
1383            }
1384
1385            let mut reshared_projects = Vec::new();
1386            for reshared_project in &rejoin_room.reshared_projects {
1387                let project_id = ProjectId::from_proto(reshared_project.project_id);
1388                let project = project::Entity::find_by_id(project_id)
1389                    .one(&*tx)
1390                    .await?
1391                    .ok_or_else(|| anyhow!("project does not exist"))?;
1392                if project.host_user_id != user_id {
1393                    return Err(anyhow!("no such project"))?;
1394                }
1395
1396                let mut collaborators = project
1397                    .find_related(project_collaborator::Entity)
1398                    .all(&*tx)
1399                    .await?;
1400                let host_ix = collaborators
1401                    .iter()
1402                    .position(|collaborator| {
1403                        collaborator.user_id == user_id && collaborator.is_host
1404                    })
1405                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1406                let host = collaborators.swap_remove(host_ix);
1407                let old_connection_id = host.connection();
1408
1409                project::Entity::update(project::ActiveModel {
1410                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1411                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1412                        connection.owner_id as i32,
1413                    ))),
1414                    ..project.into_active_model()
1415                })
1416                .exec(&*tx)
1417                .await?;
1418                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1419                    connection_id: ActiveValue::set(connection.id as i32),
1420                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1421                    ..host.into_active_model()
1422                })
1423                .exec(&*tx)
1424                .await?;
1425
1426                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1427                    .await?;
1428
1429                reshared_projects.push(ResharedProject {
1430                    id: project_id,
1431                    old_connection_id,
1432                    collaborators: collaborators
1433                        .iter()
1434                        .map(|collaborator| ProjectCollaborator {
1435                            connection_id: collaborator.connection(),
1436                            user_id: collaborator.user_id,
1437                            replica_id: collaborator.replica_id,
1438                            is_host: collaborator.is_host,
1439                        })
1440                        .collect(),
1441                    worktrees: reshared_project.worktrees.clone(),
1442                });
1443            }
1444
1445            project::Entity::delete_many()
1446                .filter(
1447                    Condition::all()
1448                        .add(project::Column::RoomId.eq(room_id))
1449                        .add(project::Column::HostUserId.eq(user_id))
1450                        .add(
1451                            project::Column::Id
1452                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1453                        ),
1454                )
1455                .exec(&*tx)
1456                .await?;
1457
1458            let mut rejoined_projects = Vec::new();
1459            for rejoined_project in &rejoin_room.rejoined_projects {
1460                let project_id = ProjectId::from_proto(rejoined_project.id);
1461                let Some(project) = project::Entity::find_by_id(project_id)
1462                    .one(&*tx)
1463                    .await? else { continue };
1464
1465                let mut worktrees = Vec::new();
1466                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1467                for db_worktree in db_worktrees {
1468                    let mut worktree = RejoinedWorktree {
1469                        id: db_worktree.id as u64,
1470                        abs_path: db_worktree.abs_path,
1471                        root_name: db_worktree.root_name,
1472                        visible: db_worktree.visible,
1473                        updated_entries: Default::default(),
1474                        removed_entries: Default::default(),
1475                        diagnostic_summaries: Default::default(),
1476                        scan_id: db_worktree.scan_id as u64,
1477                        completed_scan_id: db_worktree.completed_scan_id as u64,
1478                    };
1479
1480                    let rejoined_worktree = rejoined_project
1481                        .worktrees
1482                        .iter()
1483                        .find(|worktree| worktree.id == db_worktree.id as u64);
1484                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1485                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1486                    } else {
1487                        worktree_entry::Column::IsDeleted.eq(false)
1488                    };
1489
1490                    let mut db_entries = worktree_entry::Entity::find()
1491                        .filter(
1492                            Condition::all()
1493                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1494                                .add(entry_filter),
1495                        )
1496                        .stream(&*tx)
1497                        .await?;
1498
1499                    while let Some(db_entry) = db_entries.next().await {
1500                        let db_entry = db_entry?;
1501                        if db_entry.is_deleted {
1502                            worktree.removed_entries.push(db_entry.id as u64);
1503                        } else {
1504                            worktree.updated_entries.push(proto::Entry {
1505                                id: db_entry.id as u64,
1506                                is_dir: db_entry.is_dir,
1507                                path: db_entry.path,
1508                                inode: db_entry.inode as u64,
1509                                mtime: Some(proto::Timestamp {
1510                                    seconds: db_entry.mtime_seconds as u64,
1511                                    nanos: db_entry.mtime_nanos as u32,
1512                                }),
1513                                is_symlink: db_entry.is_symlink,
1514                                is_ignored: db_entry.is_ignored,
1515                            });
1516                        }
1517                    }
1518
1519                    worktrees.push(worktree);
1520                }
1521
1522                let language_servers = project
1523                    .find_related(language_server::Entity)
1524                    .all(&*tx)
1525                    .await?
1526                    .into_iter()
1527                    .map(|language_server| proto::LanguageServer {
1528                        id: language_server.id as u64,
1529                        name: language_server.name,
1530                    })
1531                    .collect::<Vec<_>>();
1532
1533                let mut collaborators = project
1534                    .find_related(project_collaborator::Entity)
1535                    .all(&*tx)
1536                    .await?;
1537                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1538                    .iter()
1539                    .position(|collaborator| collaborator.user_id == user_id)
1540                {
1541                    collaborators.swap_remove(self_collaborator_ix)
1542                } else {
1543                    continue;
1544                };
1545                let old_connection_id = self_collaborator.connection();
1546                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1547                    connection_id: ActiveValue::set(connection.id as i32),
1548                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1549                    ..self_collaborator.into_active_model()
1550                })
1551                .exec(&*tx)
1552                .await?;
1553
1554                let collaborators = collaborators
1555                    .into_iter()
1556                    .map(|collaborator| ProjectCollaborator {
1557                        connection_id: collaborator.connection(),
1558                        user_id: collaborator.user_id,
1559                        replica_id: collaborator.replica_id,
1560                        is_host: collaborator.is_host,
1561                    })
1562                    .collect::<Vec<_>>();
1563
1564                rejoined_projects.push(RejoinedProject {
1565                    id: project_id,
1566                    old_connection_id,
1567                    collaborators,
1568                    worktrees,
1569                    language_servers,
1570                });
1571            }
1572
1573            let room = self.get_room(room_id, &tx).await?;
1574            Ok(RejoinedRoom {
1575                room,
1576                rejoined_projects,
1577                reshared_projects,
1578            })
1579        })
1580        .await
1581    }
1582
1583    pub async fn leave_room(
1584        &self,
1585        connection: ConnectionId,
1586    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1587        self.optional_room_transaction(|tx| async move {
1588            let leaving_participant = room_participant::Entity::find()
1589                .filter(
1590                    Condition::all()
1591                        .add(
1592                            room_participant::Column::AnsweringConnectionId
1593                                .eq(connection.id as i32),
1594                        )
1595                        .add(
1596                            room_participant::Column::AnsweringConnectionServerId
1597                                .eq(connection.owner_id as i32),
1598                        ),
1599                )
1600                .one(&*tx)
1601                .await?;
1602
1603            if let Some(leaving_participant) = leaving_participant {
1604                // Leave room.
1605                let room_id = leaving_participant.room_id;
1606                room_participant::Entity::delete_by_id(leaving_participant.id)
1607                    .exec(&*tx)
1608                    .await?;
1609
1610                // Cancel pending calls initiated by the leaving user.
1611                let called_participants = room_participant::Entity::find()
1612                    .filter(
1613                        Condition::all()
1614                            .add(
1615                                room_participant::Column::CallingUserId
1616                                    .eq(leaving_participant.user_id),
1617                            )
1618                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1619                    )
1620                    .all(&*tx)
1621                    .await?;
1622                room_participant::Entity::delete_many()
1623                    .filter(
1624                        room_participant::Column::Id
1625                            .is_in(called_participants.iter().map(|participant| participant.id)),
1626                    )
1627                    .exec(&*tx)
1628                    .await?;
1629                let canceled_calls_to_user_ids = called_participants
1630                    .into_iter()
1631                    .map(|participant| participant.user_id)
1632                    .collect();
1633
1634                // Detect left projects.
1635                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1636                enum QueryProjectIds {
1637                    ProjectId,
1638                }
1639                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1640                    .select_only()
1641                    .column_as(
1642                        project_collaborator::Column::ProjectId,
1643                        QueryProjectIds::ProjectId,
1644                    )
1645                    .filter(
1646                        Condition::all()
1647                            .add(
1648                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1649                            )
1650                            .add(
1651                                project_collaborator::Column::ConnectionServerId
1652                                    .eq(connection.owner_id as i32),
1653                            ),
1654                    )
1655                    .into_values::<_, QueryProjectIds>()
1656                    .all(&*tx)
1657                    .await?;
1658                let mut left_projects = HashMap::default();
1659                let mut collaborators = project_collaborator::Entity::find()
1660                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1661                    .stream(&*tx)
1662                    .await?;
1663                while let Some(collaborator) = collaborators.next().await {
1664                    let collaborator = collaborator?;
1665                    let left_project =
1666                        left_projects
1667                            .entry(collaborator.project_id)
1668                            .or_insert(LeftProject {
1669                                id: collaborator.project_id,
1670                                host_user_id: Default::default(),
1671                                connection_ids: Default::default(),
1672                                host_connection_id: Default::default(),
1673                            });
1674
1675                    let collaborator_connection_id = collaborator.connection();
1676                    if collaborator_connection_id != connection {
1677                        left_project.connection_ids.push(collaborator_connection_id);
1678                    }
1679
1680                    if collaborator.is_host {
1681                        left_project.host_user_id = collaborator.user_id;
1682                        left_project.host_connection_id = collaborator_connection_id;
1683                    }
1684                }
1685                drop(collaborators);
1686
1687                // Leave projects.
1688                project_collaborator::Entity::delete_many()
1689                    .filter(
1690                        Condition::all()
1691                            .add(
1692                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1693                            )
1694                            .add(
1695                                project_collaborator::Column::ConnectionServerId
1696                                    .eq(connection.owner_id as i32),
1697                            ),
1698                    )
1699                    .exec(&*tx)
1700                    .await?;
1701
1702                // Unshare projects.
1703                project::Entity::delete_many()
1704                    .filter(
1705                        Condition::all()
1706                            .add(project::Column::RoomId.eq(room_id))
1707                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1708                            .add(
1709                                project::Column::HostConnectionServerId
1710                                    .eq(connection.owner_id as i32),
1711                            ),
1712                    )
1713                    .exec(&*tx)
1714                    .await?;
1715
1716                let room = self.get_room(room_id, &tx).await?;
1717                if room.participants.is_empty() {
1718                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1719                }
1720
1721                let left_room = LeftRoom {
1722                    room,
1723                    left_projects,
1724                    canceled_calls_to_user_ids,
1725                };
1726
1727                if left_room.room.participants.is_empty() {
1728                    self.rooms.remove(&room_id);
1729                }
1730
1731                Ok(Some((room_id, left_room)))
1732            } else {
1733                Ok(None)
1734            }
1735        })
1736        .await
1737    }
1738
1739    pub async fn follow(
1740        &self,
1741        project_id: ProjectId,
1742        leader_connection: ConnectionId,
1743        follower_connection: ConnectionId,
1744    ) -> Result<RoomGuard<proto::Room>> {
1745        let room_id = self.room_id_for_project(project_id).await?;
1746        self.room_transaction(room_id, |tx| async move {
1747            follower::ActiveModel {
1748                room_id: ActiveValue::set(room_id),
1749                project_id: ActiveValue::set(project_id),
1750                leader_connection_server_id: ActiveValue::set(ServerId(
1751                    leader_connection.owner_id as i32,
1752                )),
1753                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1754                follower_connection_server_id: ActiveValue::set(ServerId(
1755                    follower_connection.owner_id as i32,
1756                )),
1757                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1758                ..Default::default()
1759            }
1760            .insert(&*tx)
1761            .await?;
1762
1763            let room = self.get_room(room_id, &*tx).await?;
1764            Ok(room)
1765        })
1766        .await
1767    }
1768
1769    pub async fn unfollow(
1770        &self,
1771        project_id: ProjectId,
1772        leader_connection: ConnectionId,
1773        follower_connection: ConnectionId,
1774    ) -> Result<RoomGuard<proto::Room>> {
1775        let room_id = self.room_id_for_project(project_id).await?;
1776        self.room_transaction(room_id, |tx| async move {
1777            follower::Entity::delete_many()
1778                .filter(
1779                    Condition::all()
1780                        .add(follower::Column::ProjectId.eq(project_id))
1781                        .add(
1782                            follower::Column::LeaderConnectionServerId
1783                                .eq(leader_connection.owner_id),
1784                        )
1785                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1786                        .add(
1787                            follower::Column::FollowerConnectionServerId
1788                                .eq(follower_connection.owner_id),
1789                        )
1790                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1791                )
1792                .exec(&*tx)
1793                .await?;
1794
1795            let room = self.get_room(room_id, &*tx).await?;
1796            Ok(room)
1797        })
1798        .await
1799    }
1800
1801    pub async fn update_room_participant_location(
1802        &self,
1803        room_id: RoomId,
1804        connection: ConnectionId,
1805        location: proto::ParticipantLocation,
1806    ) -> Result<RoomGuard<proto::Room>> {
1807        self.room_transaction(room_id, |tx| async {
1808            let tx = tx;
1809            let location_kind;
1810            let location_project_id;
1811            match location
1812                .variant
1813                .as_ref()
1814                .ok_or_else(|| anyhow!("invalid location"))?
1815            {
1816                proto::participant_location::Variant::SharedProject(project) => {
1817                    location_kind = 0;
1818                    location_project_id = Some(ProjectId::from_proto(project.id));
1819                }
1820                proto::participant_location::Variant::UnsharedProject(_) => {
1821                    location_kind = 1;
1822                    location_project_id = None;
1823                }
1824                proto::participant_location::Variant::External(_) => {
1825                    location_kind = 2;
1826                    location_project_id = None;
1827                }
1828            }
1829
1830            let result = room_participant::Entity::update_many()
1831                .filter(
1832                    Condition::all()
1833                        .add(room_participant::Column::RoomId.eq(room_id))
1834                        .add(
1835                            room_participant::Column::AnsweringConnectionId
1836                                .eq(connection.id as i32),
1837                        )
1838                        .add(
1839                            room_participant::Column::AnsweringConnectionServerId
1840                                .eq(connection.owner_id as i32),
1841                        ),
1842                )
1843                .set(room_participant::ActiveModel {
1844                    location_kind: ActiveValue::set(Some(location_kind)),
1845                    location_project_id: ActiveValue::set(location_project_id),
1846                    ..Default::default()
1847                })
1848                .exec(&*tx)
1849                .await?;
1850
1851            if result.rows_affected == 1 {
1852                let room = self.get_room(room_id, &tx).await?;
1853                Ok(room)
1854            } else {
1855                Err(anyhow!("could not update room participant location"))?
1856            }
1857        })
1858        .await
1859    }
1860
1861    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1862        self.transaction(|tx| async move {
1863            let participant = room_participant::Entity::find()
1864                .filter(
1865                    Condition::all()
1866                        .add(
1867                            room_participant::Column::AnsweringConnectionId
1868                                .eq(connection.id as i32),
1869                        )
1870                        .add(
1871                            room_participant::Column::AnsweringConnectionServerId
1872                                .eq(connection.owner_id as i32),
1873                        ),
1874                )
1875                .one(&*tx)
1876                .await?
1877                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1878
1879            room_participant::Entity::update(room_participant::ActiveModel {
1880                answering_connection_lost: ActiveValue::set(true),
1881                ..participant.into_active_model()
1882            })
1883            .exec(&*tx)
1884            .await?;
1885
1886            Ok(())
1887        })
1888        .await
1889    }
1890
1891    fn build_incoming_call(
1892        room: &proto::Room,
1893        called_user_id: UserId,
1894    ) -> Option<proto::IncomingCall> {
1895        let pending_participant = room
1896            .pending_participants
1897            .iter()
1898            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1899
1900        Some(proto::IncomingCall {
1901            room_id: room.id,
1902            calling_user_id: pending_participant.calling_user_id,
1903            participant_user_ids: room
1904                .participants
1905                .iter()
1906                .map(|participant| participant.user_id)
1907                .collect(),
1908            initial_project: room.participants.iter().find_map(|participant| {
1909                let initial_project_id = pending_participant.initial_project_id?;
1910                participant
1911                    .projects
1912                    .iter()
1913                    .find(|project| project.id == initial_project_id)
1914                    .cloned()
1915            }),
1916        })
1917    }
1918
1919    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1920        let db_room = room::Entity::find_by_id(room_id)
1921            .one(tx)
1922            .await?
1923            .ok_or_else(|| anyhow!("could not find room"))?;
1924
1925        let mut db_participants = db_room
1926            .find_related(room_participant::Entity)
1927            .stream(tx)
1928            .await?;
1929        let mut participants = HashMap::default();
1930        let mut pending_participants = Vec::new();
1931        while let Some(db_participant) = db_participants.next().await {
1932            let db_participant = db_participant?;
1933            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1934                .answering_connection_id
1935                .zip(db_participant.answering_connection_server_id)
1936            {
1937                let location = match (
1938                    db_participant.location_kind,
1939                    db_participant.location_project_id,
1940                ) {
1941                    (Some(0), Some(project_id)) => {
1942                        Some(proto::participant_location::Variant::SharedProject(
1943                            proto::participant_location::SharedProject {
1944                                id: project_id.to_proto(),
1945                            },
1946                        ))
1947                    }
1948                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1949                        Default::default(),
1950                    )),
1951                    _ => Some(proto::participant_location::Variant::External(
1952                        Default::default(),
1953                    )),
1954                };
1955
1956                let answering_connection = ConnectionId {
1957                    owner_id: answering_connection_server_id.0 as u32,
1958                    id: answering_connection_id as u32,
1959                };
1960                participants.insert(
1961                    answering_connection,
1962                    proto::Participant {
1963                        user_id: db_participant.user_id.to_proto(),
1964                        peer_id: Some(answering_connection.into()),
1965                        projects: Default::default(),
1966                        location: Some(proto::ParticipantLocation { variant: location }),
1967                    },
1968                );
1969            } else {
1970                pending_participants.push(proto::PendingParticipant {
1971                    user_id: db_participant.user_id.to_proto(),
1972                    calling_user_id: db_participant.calling_user_id.to_proto(),
1973                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1974                });
1975            }
1976        }
1977        drop(db_participants);
1978
1979        let mut db_projects = db_room
1980            .find_related(project::Entity)
1981            .find_with_related(worktree::Entity)
1982            .stream(tx)
1983            .await?;
1984
1985        while let Some(row) = db_projects.next().await {
1986            let (db_project, db_worktree) = row?;
1987            let host_connection = db_project.host_connection()?;
1988            if let Some(participant) = participants.get_mut(&host_connection) {
1989                let project = if let Some(project) = participant
1990                    .projects
1991                    .iter_mut()
1992                    .find(|project| project.id == db_project.id.to_proto())
1993                {
1994                    project
1995                } else {
1996                    participant.projects.push(proto::ParticipantProject {
1997                        id: db_project.id.to_proto(),
1998                        worktree_root_names: Default::default(),
1999                    });
2000                    participant.projects.last_mut().unwrap()
2001                };
2002
2003                if let Some(db_worktree) = db_worktree {
2004                    if db_worktree.visible {
2005                        project.worktree_root_names.push(db_worktree.root_name);
2006                    }
2007                }
2008            }
2009        }
2010        drop(db_projects);
2011
2012        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2013        let mut followers = Vec::new();
2014        while let Some(db_follower) = db_followers.next().await {
2015            let db_follower = db_follower?;
2016            followers.push(proto::Follower {
2017                leader_id: Some(db_follower.leader_connection().into()),
2018                follower_id: Some(db_follower.follower_connection().into()),
2019                project_id: db_follower.project_id.to_proto(),
2020            });
2021        }
2022
2023        Ok(proto::Room {
2024            id: db_room.id.to_proto(),
2025            live_kit_room: db_room.live_kit_room,
2026            participants: participants.into_values().collect(),
2027            pending_participants,
2028            followers,
2029        })
2030    }
2031
2032    // projects
2033
2034    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2035        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2036        enum QueryAs {
2037            Count,
2038        }
2039
2040        self.transaction(|tx| async move {
2041            Ok(project::Entity::find()
2042                .select_only()
2043                .column_as(project::Column::Id.count(), QueryAs::Count)
2044                .inner_join(user::Entity)
2045                .filter(user::Column::Admin.eq(false))
2046                .into_values::<_, QueryAs>()
2047                .one(&*tx)
2048                .await?
2049                .unwrap_or(0i64) as usize)
2050        })
2051        .await
2052    }
2053
2054    pub async fn share_project(
2055        &self,
2056        room_id: RoomId,
2057        connection: ConnectionId,
2058        worktrees: &[proto::WorktreeMetadata],
2059    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2060        self.room_transaction(room_id, |tx| async move {
2061            let participant = room_participant::Entity::find()
2062                .filter(
2063                    Condition::all()
2064                        .add(
2065                            room_participant::Column::AnsweringConnectionId
2066                                .eq(connection.id as i32),
2067                        )
2068                        .add(
2069                            room_participant::Column::AnsweringConnectionServerId
2070                                .eq(connection.owner_id as i32),
2071                        ),
2072                )
2073                .one(&*tx)
2074                .await?
2075                .ok_or_else(|| anyhow!("could not find participant"))?;
2076            if participant.room_id != room_id {
2077                return Err(anyhow!("shared project on unexpected room"))?;
2078            }
2079
2080            let project = project::ActiveModel {
2081                room_id: ActiveValue::set(participant.room_id),
2082                host_user_id: ActiveValue::set(participant.user_id),
2083                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2084                host_connection_server_id: ActiveValue::set(Some(ServerId(
2085                    connection.owner_id as i32,
2086                ))),
2087                ..Default::default()
2088            }
2089            .insert(&*tx)
2090            .await?;
2091
2092            if !worktrees.is_empty() {
2093                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2094                    worktree::ActiveModel {
2095                        id: ActiveValue::set(worktree.id as i64),
2096                        project_id: ActiveValue::set(project.id),
2097                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2098                        root_name: ActiveValue::set(worktree.root_name.clone()),
2099                        visible: ActiveValue::set(worktree.visible),
2100                        scan_id: ActiveValue::set(0),
2101                        completed_scan_id: ActiveValue::set(0),
2102                    }
2103                }))
2104                .exec(&*tx)
2105                .await?;
2106            }
2107
2108            project_collaborator::ActiveModel {
2109                project_id: ActiveValue::set(project.id),
2110                connection_id: ActiveValue::set(connection.id as i32),
2111                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2112                user_id: ActiveValue::set(participant.user_id),
2113                replica_id: ActiveValue::set(ReplicaId(0)),
2114                is_host: ActiveValue::set(true),
2115                ..Default::default()
2116            }
2117            .insert(&*tx)
2118            .await?;
2119
2120            let room = self.get_room(room_id, &tx).await?;
2121            Ok((project.id, room))
2122        })
2123        .await
2124    }
2125
2126    pub async fn unshare_project(
2127        &self,
2128        project_id: ProjectId,
2129        connection: ConnectionId,
2130    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2131        let room_id = self.room_id_for_project(project_id).await?;
2132        self.room_transaction(room_id, |tx| async move {
2133            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2134
2135            let project = project::Entity::find_by_id(project_id)
2136                .one(&*tx)
2137                .await?
2138                .ok_or_else(|| anyhow!("project not found"))?;
2139            if project.host_connection()? == connection {
2140                project::Entity::delete(project.into_active_model())
2141                    .exec(&*tx)
2142                    .await?;
2143                let room = self.get_room(room_id, &tx).await?;
2144                Ok((room, guest_connection_ids))
2145            } else {
2146                Err(anyhow!("cannot unshare a project hosted by another user"))?
2147            }
2148        })
2149        .await
2150    }
2151
2152    pub async fn update_project(
2153        &self,
2154        project_id: ProjectId,
2155        connection: ConnectionId,
2156        worktrees: &[proto::WorktreeMetadata],
2157    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2158        let room_id = self.room_id_for_project(project_id).await?;
2159        self.room_transaction(room_id, |tx| async move {
2160            let project = project::Entity::find_by_id(project_id)
2161                .filter(
2162                    Condition::all()
2163                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2164                        .add(
2165                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2166                        ),
2167                )
2168                .one(&*tx)
2169                .await?
2170                .ok_or_else(|| anyhow!("no such project"))?;
2171
2172            self.update_project_worktrees(project.id, worktrees, &tx)
2173                .await?;
2174
2175            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2176            let room = self.get_room(project.room_id, &tx).await?;
2177            Ok((room, guest_connection_ids))
2178        })
2179        .await
2180    }
2181
2182    async fn update_project_worktrees(
2183        &self,
2184        project_id: ProjectId,
2185        worktrees: &[proto::WorktreeMetadata],
2186        tx: &DatabaseTransaction,
2187    ) -> Result<()> {
2188        if !worktrees.is_empty() {
2189            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2190                id: ActiveValue::set(worktree.id as i64),
2191                project_id: ActiveValue::set(project_id),
2192                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2193                root_name: ActiveValue::set(worktree.root_name.clone()),
2194                visible: ActiveValue::set(worktree.visible),
2195                scan_id: ActiveValue::set(0),
2196                completed_scan_id: ActiveValue::set(0),
2197            }))
2198            .on_conflict(
2199                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2200                    .update_column(worktree::Column::RootName)
2201                    .to_owned(),
2202            )
2203            .exec(&*tx)
2204            .await?;
2205        }
2206
2207        worktree::Entity::delete_many()
2208            .filter(worktree::Column::ProjectId.eq(project_id).and(
2209                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2210            ))
2211            .exec(&*tx)
2212            .await?;
2213
2214        Ok(())
2215    }
2216
2217    pub async fn update_worktree(
2218        &self,
2219        update: &proto::UpdateWorktree,
2220        connection: ConnectionId,
2221    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2222        let project_id = ProjectId::from_proto(update.project_id);
2223        let worktree_id = update.worktree_id as i64;
2224        let room_id = self.room_id_for_project(project_id).await?;
2225        self.room_transaction(room_id, |tx| async move {
2226            // Ensure the update comes from the host.
2227            let _project = project::Entity::find_by_id(project_id)
2228                .filter(
2229                    Condition::all()
2230                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2231                        .add(
2232                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2233                        ),
2234                )
2235                .one(&*tx)
2236                .await?
2237                .ok_or_else(|| anyhow!("no such project"))?;
2238
2239            // Update metadata.
2240            worktree::Entity::update(worktree::ActiveModel {
2241                id: ActiveValue::set(worktree_id),
2242                project_id: ActiveValue::set(project_id),
2243                root_name: ActiveValue::set(update.root_name.clone()),
2244                scan_id: ActiveValue::set(update.scan_id as i64),
2245                completed_scan_id: if update.is_last_update {
2246                    ActiveValue::set(update.scan_id as i64)
2247                } else {
2248                    ActiveValue::default()
2249                },
2250                abs_path: ActiveValue::set(update.abs_path.clone()),
2251                ..Default::default()
2252            })
2253            .exec(&*tx)
2254            .await?;
2255
2256            if !update.updated_entries.is_empty() {
2257                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2258                    let mtime = entry.mtime.clone().unwrap_or_default();
2259                    worktree_entry::ActiveModel {
2260                        project_id: ActiveValue::set(project_id),
2261                        worktree_id: ActiveValue::set(worktree_id),
2262                        id: ActiveValue::set(entry.id as i64),
2263                        is_dir: ActiveValue::set(entry.is_dir),
2264                        path: ActiveValue::set(entry.path.clone()),
2265                        inode: ActiveValue::set(entry.inode as i64),
2266                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2267                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2268                        is_symlink: ActiveValue::set(entry.is_symlink),
2269                        is_ignored: ActiveValue::set(entry.is_ignored),
2270                        is_deleted: ActiveValue::set(false),
2271                        scan_id: ActiveValue::set(update.scan_id as i64),
2272                    }
2273                }))
2274                .on_conflict(
2275                    OnConflict::columns([
2276                        worktree_entry::Column::ProjectId,
2277                        worktree_entry::Column::WorktreeId,
2278                        worktree_entry::Column::Id,
2279                    ])
2280                    .update_columns([
2281                        worktree_entry::Column::IsDir,
2282                        worktree_entry::Column::Path,
2283                        worktree_entry::Column::Inode,
2284                        worktree_entry::Column::MtimeSeconds,
2285                        worktree_entry::Column::MtimeNanos,
2286                        worktree_entry::Column::IsSymlink,
2287                        worktree_entry::Column::IsIgnored,
2288                        worktree_entry::Column::ScanId,
2289                    ])
2290                    .to_owned(),
2291                )
2292                .exec(&*tx)
2293                .await?;
2294            }
2295
2296            if !update.removed_entries.is_empty() {
2297                worktree_entry::Entity::update_many()
2298                    .filter(
2299                        worktree_entry::Column::ProjectId
2300                            .eq(project_id)
2301                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2302                            .and(
2303                                worktree_entry::Column::Id
2304                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2305                            ),
2306                    )
2307                    .set(worktree_entry::ActiveModel {
2308                        is_deleted: ActiveValue::Set(true),
2309                        scan_id: ActiveValue::Set(update.scan_id as i64),
2310                        ..Default::default()
2311                    })
2312                    .exec(&*tx)
2313                    .await?;
2314            }
2315
2316            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2317            Ok(connection_ids)
2318        })
2319        .await
2320    }
2321
2322    pub async fn update_diagnostic_summary(
2323        &self,
2324        update: &proto::UpdateDiagnosticSummary,
2325        connection: ConnectionId,
2326    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2327        let project_id = ProjectId::from_proto(update.project_id);
2328        let worktree_id = update.worktree_id as i64;
2329        let room_id = self.room_id_for_project(project_id).await?;
2330        self.room_transaction(room_id, |tx| async move {
2331            let summary = update
2332                .summary
2333                .as_ref()
2334                .ok_or_else(|| anyhow!("invalid summary"))?;
2335
2336            // Ensure the update comes from the host.
2337            let project = project::Entity::find_by_id(project_id)
2338                .one(&*tx)
2339                .await?
2340                .ok_or_else(|| anyhow!("no such project"))?;
2341            if project.host_connection()? != connection {
2342                return Err(anyhow!("can't update a project hosted by someone else"))?;
2343            }
2344
2345            // Update summary.
2346            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2347                project_id: ActiveValue::set(project_id),
2348                worktree_id: ActiveValue::set(worktree_id),
2349                path: ActiveValue::set(summary.path.clone()),
2350                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2351                error_count: ActiveValue::set(summary.error_count as i32),
2352                warning_count: ActiveValue::set(summary.warning_count as i32),
2353                ..Default::default()
2354            })
2355            .on_conflict(
2356                OnConflict::columns([
2357                    worktree_diagnostic_summary::Column::ProjectId,
2358                    worktree_diagnostic_summary::Column::WorktreeId,
2359                    worktree_diagnostic_summary::Column::Path,
2360                ])
2361                .update_columns([
2362                    worktree_diagnostic_summary::Column::LanguageServerId,
2363                    worktree_diagnostic_summary::Column::ErrorCount,
2364                    worktree_diagnostic_summary::Column::WarningCount,
2365                ])
2366                .to_owned(),
2367            )
2368            .exec(&*tx)
2369            .await?;
2370
2371            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2372            Ok(connection_ids)
2373        })
2374        .await
2375    }
2376
2377    pub async fn start_language_server(
2378        &self,
2379        update: &proto::StartLanguageServer,
2380        connection: ConnectionId,
2381    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2382        let project_id = ProjectId::from_proto(update.project_id);
2383        let room_id = self.room_id_for_project(project_id).await?;
2384        self.room_transaction(room_id, |tx| async move {
2385            let server = update
2386                .server
2387                .as_ref()
2388                .ok_or_else(|| anyhow!("invalid language server"))?;
2389
2390            // Ensure the update comes from the host.
2391            let project = project::Entity::find_by_id(project_id)
2392                .one(&*tx)
2393                .await?
2394                .ok_or_else(|| anyhow!("no such project"))?;
2395            if project.host_connection()? != connection {
2396                return Err(anyhow!("can't update a project hosted by someone else"))?;
2397            }
2398
2399            // Add the newly-started language server.
2400            language_server::Entity::insert(language_server::ActiveModel {
2401                project_id: ActiveValue::set(project_id),
2402                id: ActiveValue::set(server.id as i64),
2403                name: ActiveValue::set(server.name.clone()),
2404                ..Default::default()
2405            })
2406            .on_conflict(
2407                OnConflict::columns([
2408                    language_server::Column::ProjectId,
2409                    language_server::Column::Id,
2410                ])
2411                .update_column(language_server::Column::Name)
2412                .to_owned(),
2413            )
2414            .exec(&*tx)
2415            .await?;
2416
2417            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2418            Ok(connection_ids)
2419        })
2420        .await
2421    }
2422
2423    pub async fn join_project(
2424        &self,
2425        project_id: ProjectId,
2426        connection: ConnectionId,
2427    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2428        let room_id = self.room_id_for_project(project_id).await?;
2429        self.room_transaction(room_id, |tx| async move {
2430            let participant = room_participant::Entity::find()
2431                .filter(
2432                    Condition::all()
2433                        .add(
2434                            room_participant::Column::AnsweringConnectionId
2435                                .eq(connection.id as i32),
2436                        )
2437                        .add(
2438                            room_participant::Column::AnsweringConnectionServerId
2439                                .eq(connection.owner_id as i32),
2440                        ),
2441                )
2442                .one(&*tx)
2443                .await?
2444                .ok_or_else(|| anyhow!("must join a room first"))?;
2445
2446            let project = project::Entity::find_by_id(project_id)
2447                .one(&*tx)
2448                .await?
2449                .ok_or_else(|| anyhow!("no such project"))?;
2450            if project.room_id != participant.room_id {
2451                return Err(anyhow!("no such project"))?;
2452            }
2453
2454            let mut collaborators = project
2455                .find_related(project_collaborator::Entity)
2456                .all(&*tx)
2457                .await?;
2458            let replica_ids = collaborators
2459                .iter()
2460                .map(|c| c.replica_id)
2461                .collect::<HashSet<_>>();
2462            let mut replica_id = ReplicaId(1);
2463            while replica_ids.contains(&replica_id) {
2464                replica_id.0 += 1;
2465            }
2466            let new_collaborator = project_collaborator::ActiveModel {
2467                project_id: ActiveValue::set(project_id),
2468                connection_id: ActiveValue::set(connection.id as i32),
2469                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2470                user_id: ActiveValue::set(participant.user_id),
2471                replica_id: ActiveValue::set(replica_id),
2472                is_host: ActiveValue::set(false),
2473                ..Default::default()
2474            }
2475            .insert(&*tx)
2476            .await?;
2477            collaborators.push(new_collaborator);
2478
2479            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2480            let mut worktrees = db_worktrees
2481                .into_iter()
2482                .map(|db_worktree| {
2483                    (
2484                        db_worktree.id as u64,
2485                        Worktree {
2486                            id: db_worktree.id as u64,
2487                            abs_path: db_worktree.abs_path,
2488                            root_name: db_worktree.root_name,
2489                            visible: db_worktree.visible,
2490                            entries: Default::default(),
2491                            diagnostic_summaries: Default::default(),
2492                            scan_id: db_worktree.scan_id as u64,
2493                            completed_scan_id: db_worktree.completed_scan_id as u64,
2494                        },
2495                    )
2496                })
2497                .collect::<BTreeMap<_, _>>();
2498
2499            // Populate worktree entries.
2500            {
2501                let mut db_entries = worktree_entry::Entity::find()
2502                    .filter(
2503                        Condition::all()
2504                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2505                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2506                    )
2507                    .stream(&*tx)
2508                    .await?;
2509                while let Some(db_entry) = db_entries.next().await {
2510                    let db_entry = db_entry?;
2511                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2512                        worktree.entries.push(proto::Entry {
2513                            id: db_entry.id as u64,
2514                            is_dir: db_entry.is_dir,
2515                            path: db_entry.path,
2516                            inode: db_entry.inode as u64,
2517                            mtime: Some(proto::Timestamp {
2518                                seconds: db_entry.mtime_seconds as u64,
2519                                nanos: db_entry.mtime_nanos as u32,
2520                            }),
2521                            is_symlink: db_entry.is_symlink,
2522                            is_ignored: db_entry.is_ignored,
2523                        });
2524                    }
2525                }
2526            }
2527
2528            // Populate worktree diagnostic summaries.
2529            {
2530                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2531                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2532                    .stream(&*tx)
2533                    .await?;
2534                while let Some(db_summary) = db_summaries.next().await {
2535                    let db_summary = db_summary?;
2536                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2537                        worktree
2538                            .diagnostic_summaries
2539                            .push(proto::DiagnosticSummary {
2540                                path: db_summary.path,
2541                                language_server_id: db_summary.language_server_id as u64,
2542                                error_count: db_summary.error_count as u32,
2543                                warning_count: db_summary.warning_count as u32,
2544                            });
2545                    }
2546                }
2547            }
2548
2549            // Populate language servers.
2550            let language_servers = project
2551                .find_related(language_server::Entity)
2552                .all(&*tx)
2553                .await?;
2554
2555            let project = Project {
2556                collaborators: collaborators
2557                    .into_iter()
2558                    .map(|collaborator| ProjectCollaborator {
2559                        connection_id: collaborator.connection(),
2560                        user_id: collaborator.user_id,
2561                        replica_id: collaborator.replica_id,
2562                        is_host: collaborator.is_host,
2563                    })
2564                    .collect(),
2565                worktrees,
2566                language_servers: language_servers
2567                    .into_iter()
2568                    .map(|language_server| proto::LanguageServer {
2569                        id: language_server.id as u64,
2570                        name: language_server.name,
2571                    })
2572                    .collect(),
2573            };
2574            Ok((project, replica_id as ReplicaId))
2575        })
2576        .await
2577    }
2578
2579    pub async fn leave_project(
2580        &self,
2581        project_id: ProjectId,
2582        connection: ConnectionId,
2583    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2584        let room_id = self.room_id_for_project(project_id).await?;
2585        self.room_transaction(room_id, |tx| async move {
2586            let result = project_collaborator::Entity::delete_many()
2587                .filter(
2588                    Condition::all()
2589                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2590                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2591                        .add(
2592                            project_collaborator::Column::ConnectionServerId
2593                                .eq(connection.owner_id as i32),
2594                        ),
2595                )
2596                .exec(&*tx)
2597                .await?;
2598            if result.rows_affected == 0 {
2599                Err(anyhow!("not a collaborator on this project"))?;
2600            }
2601
2602            let project = project::Entity::find_by_id(project_id)
2603                .one(&*tx)
2604                .await?
2605                .ok_or_else(|| anyhow!("no such project"))?;
2606            let collaborators = project
2607                .find_related(project_collaborator::Entity)
2608                .all(&*tx)
2609                .await?;
2610            let connection_ids = collaborators
2611                .into_iter()
2612                .map(|collaborator| collaborator.connection())
2613                .collect();
2614
2615            follower::Entity::delete_many()
2616                .filter(
2617                    Condition::any()
2618                        .add(
2619                            Condition::all()
2620                                .add(follower::Column::ProjectId.eq(project_id))
2621                                .add(
2622                                    follower::Column::LeaderConnectionServerId
2623                                        .eq(connection.owner_id),
2624                                )
2625                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2626                        )
2627                        .add(
2628                            Condition::all()
2629                                .add(follower::Column::ProjectId.eq(project_id))
2630                                .add(
2631                                    follower::Column::FollowerConnectionServerId
2632                                        .eq(connection.owner_id),
2633                                )
2634                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2635                        ),
2636                )
2637                .exec(&*tx)
2638                .await?;
2639
2640            let room = self.get_room(project.room_id, &tx).await?;
2641            let left_project = LeftProject {
2642                id: project_id,
2643                host_user_id: project.host_user_id,
2644                host_connection_id: project.host_connection()?,
2645                connection_ids,
2646            };
2647            Ok((room, left_project))
2648        })
2649        .await
2650    }
2651
2652    pub async fn project_collaborators(
2653        &self,
2654        project_id: ProjectId,
2655        connection_id: ConnectionId,
2656    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2657        let room_id = self.room_id_for_project(project_id).await?;
2658        self.room_transaction(room_id, |tx| async move {
2659            let collaborators = project_collaborator::Entity::find()
2660                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2661                .all(&*tx)
2662                .await?
2663                .into_iter()
2664                .map(|collaborator| ProjectCollaborator {
2665                    connection_id: collaborator.connection(),
2666                    user_id: collaborator.user_id,
2667                    replica_id: collaborator.replica_id,
2668                    is_host: collaborator.is_host,
2669                })
2670                .collect::<Vec<_>>();
2671
2672            if collaborators
2673                .iter()
2674                .any(|collaborator| collaborator.connection_id == connection_id)
2675            {
2676                Ok(collaborators)
2677            } else {
2678                Err(anyhow!("no such project"))?
2679            }
2680        })
2681        .await
2682    }
2683
2684    pub async fn project_connection_ids(
2685        &self,
2686        project_id: ProjectId,
2687        connection_id: ConnectionId,
2688    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2689        let room_id = self.room_id_for_project(project_id).await?;
2690        self.room_transaction(room_id, |tx| async move {
2691            let mut collaborators = project_collaborator::Entity::find()
2692                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2693                .stream(&*tx)
2694                .await?;
2695
2696            let mut connection_ids = HashSet::default();
2697            while let Some(collaborator) = collaborators.next().await {
2698                let collaborator = collaborator?;
2699                connection_ids.insert(collaborator.connection());
2700            }
2701
2702            if connection_ids.contains(&connection_id) {
2703                Ok(connection_ids)
2704            } else {
2705                Err(anyhow!("no such project"))?
2706            }
2707        })
2708        .await
2709    }
2710
2711    async fn project_guest_connection_ids(
2712        &self,
2713        project_id: ProjectId,
2714        tx: &DatabaseTransaction,
2715    ) -> Result<Vec<ConnectionId>> {
2716        let mut collaborators = project_collaborator::Entity::find()
2717            .filter(
2718                project_collaborator::Column::ProjectId
2719                    .eq(project_id)
2720                    .and(project_collaborator::Column::IsHost.eq(false)),
2721            )
2722            .stream(tx)
2723            .await?;
2724
2725        let mut guest_connection_ids = Vec::new();
2726        while let Some(collaborator) = collaborators.next().await {
2727            let collaborator = collaborator?;
2728            guest_connection_ids.push(collaborator.connection());
2729        }
2730        Ok(guest_connection_ids)
2731    }
2732
2733    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2734        self.transaction(|tx| async move {
2735            let project = project::Entity::find_by_id(project_id)
2736                .one(&*tx)
2737                .await?
2738                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2739            Ok(project.room_id)
2740        })
2741        .await
2742    }
2743
2744    // access tokens
2745
2746    pub async fn create_access_token_hash(
2747        &self,
2748        user_id: UserId,
2749        access_token_hash: &str,
2750        max_access_token_count: usize,
2751    ) -> Result<()> {
2752        self.transaction(|tx| async {
2753            let tx = tx;
2754
2755            access_token::ActiveModel {
2756                user_id: ActiveValue::set(user_id),
2757                hash: ActiveValue::set(access_token_hash.into()),
2758                ..Default::default()
2759            }
2760            .insert(&*tx)
2761            .await?;
2762
2763            access_token::Entity::delete_many()
2764                .filter(
2765                    access_token::Column::Id.in_subquery(
2766                        Query::select()
2767                            .column(access_token::Column::Id)
2768                            .from(access_token::Entity)
2769                            .and_where(access_token::Column::UserId.eq(user_id))
2770                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2771                            .limit(10000)
2772                            .offset(max_access_token_count as u64)
2773                            .to_owned(),
2774                    ),
2775                )
2776                .exec(&*tx)
2777                .await?;
2778            Ok(())
2779        })
2780        .await
2781    }
2782
2783    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2784        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2785        enum QueryAs {
2786            Hash,
2787        }
2788
2789        self.transaction(|tx| async move {
2790            Ok(access_token::Entity::find()
2791                .select_only()
2792                .column(access_token::Column::Hash)
2793                .filter(access_token::Column::UserId.eq(user_id))
2794                .order_by_desc(access_token::Column::Id)
2795                .into_values::<_, QueryAs>()
2796                .all(&*tx)
2797                .await?)
2798        })
2799        .await
2800    }
2801
2802    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2803    where
2804        F: Send + Fn(TransactionHandle) -> Fut,
2805        Fut: Send + Future<Output = Result<T>>,
2806    {
2807        let body = async {
2808            loop {
2809                let (tx, result) = self.with_transaction(&f).await?;
2810                match result {
2811                    Ok(result) => {
2812                        match tx.commit().await.map_err(Into::into) {
2813                            Ok(()) => return Ok(result),
2814                            Err(error) => {
2815                                if is_serialization_error(&error) {
2816                                    // Retry (don't break the loop)
2817                                } else {
2818                                    return Err(error);
2819                                }
2820                            }
2821                        }
2822                    }
2823                    Err(error) => {
2824                        tx.rollback().await?;
2825                        if is_serialization_error(&error) {
2826                            // Retry (don't break the loop)
2827                        } else {
2828                            return Err(error);
2829                        }
2830                    }
2831                }
2832            }
2833        };
2834
2835        self.run(body).await
2836    }
2837
2838    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2839    where
2840        F: Send + Fn(TransactionHandle) -> Fut,
2841        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2842    {
2843        let body = async {
2844            loop {
2845                let (tx, result) = self.with_transaction(&f).await?;
2846                match result {
2847                    Ok(Some((room_id, data))) => {
2848                        let lock = self.rooms.entry(room_id).or_default().clone();
2849                        let _guard = lock.lock_owned().await;
2850                        match tx.commit().await.map_err(Into::into) {
2851                            Ok(()) => {
2852                                return Ok(Some(RoomGuard {
2853                                    data,
2854                                    _guard,
2855                                    _not_send: PhantomData,
2856                                }));
2857                            }
2858                            Err(error) => {
2859                                if is_serialization_error(&error) {
2860                                    // Retry (don't break the loop)
2861                                } else {
2862                                    return Err(error);
2863                                }
2864                            }
2865                        }
2866                    }
2867                    Ok(None) => {
2868                        match tx.commit().await.map_err(Into::into) {
2869                            Ok(()) => return Ok(None),
2870                            Err(error) => {
2871                                if is_serialization_error(&error) {
2872                                    // Retry (don't break the loop)
2873                                } else {
2874                                    return Err(error);
2875                                }
2876                            }
2877                        }
2878                    }
2879                    Err(error) => {
2880                        tx.rollback().await?;
2881                        if is_serialization_error(&error) {
2882                            // Retry (don't break the loop)
2883                        } else {
2884                            return Err(error);
2885                        }
2886                    }
2887                }
2888            }
2889        };
2890
2891        self.run(body).await
2892    }
2893
2894    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
2895    where
2896        F: Send + Fn(TransactionHandle) -> Fut,
2897        Fut: Send + Future<Output = Result<T>>,
2898    {
2899        let body = async {
2900            loop {
2901                let lock = self.rooms.entry(room_id).or_default().clone();
2902                let _guard = lock.lock_owned().await;
2903                let (tx, result) = self.with_transaction(&f).await?;
2904                match result {
2905                    Ok(data) => {
2906                        match tx.commit().await.map_err(Into::into) {
2907                            Ok(()) => {
2908                                return Ok(RoomGuard {
2909                                    data,
2910                                    _guard,
2911                                    _not_send: PhantomData,
2912                                });
2913                            }
2914                            Err(error) => {
2915                                if is_serialization_error(&error) {
2916                                    // Retry (don't break the loop)
2917                                } else {
2918                                    return Err(error);
2919                                }
2920                            }
2921                        }
2922                    }
2923                    Err(error) => {
2924                        tx.rollback().await?;
2925                        if is_serialization_error(&error) {
2926                            // Retry (don't break the loop)
2927                        } else {
2928                            return Err(error);
2929                        }
2930                    }
2931                }
2932            }
2933        };
2934
2935        self.run(body).await
2936    }
2937
2938    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2939    where
2940        F: Send + Fn(TransactionHandle) -> Fut,
2941        Fut: Send + Future<Output = Result<T>>,
2942    {
2943        let tx = self
2944            .pool
2945            .begin_with_config(Some(IsolationLevel::Serializable), None)
2946            .await?;
2947
2948        let mut tx = Arc::new(Some(tx));
2949        let result = f(TransactionHandle(tx.clone())).await;
2950        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2951            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2952        };
2953
2954        Ok((tx, result))
2955    }
2956
2957    async fn run<F, T>(&self, future: F) -> T
2958    where
2959        F: Future<Output = T>,
2960    {
2961        #[cfg(test)]
2962        {
2963            if let Some(background) = self.background.as_ref() {
2964                background.simulate_random_delay().await;
2965            }
2966
2967            self.runtime.as_ref().unwrap().block_on(future)
2968        }
2969
2970        #[cfg(not(test))]
2971        {
2972            future.await
2973        }
2974    }
2975}
2976
2977fn is_serialization_error(error: &Error) -> bool {
2978    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2979    match error {
2980        Error::Database(
2981            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2982            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2983        ) if error
2984            .as_database_error()
2985            .and_then(|error| error.code())
2986            .as_deref()
2987            == Some(SERIALIZATION_FAILURE_CODE) =>
2988        {
2989            true
2990        }
2991        _ => false,
2992    }
2993}
2994
2995struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2996
2997impl Deref for TransactionHandle {
2998    type Target = DatabaseTransaction;
2999
3000    fn deref(&self) -> &Self::Target {
3001        self.0.as_ref().as_ref().unwrap()
3002    }
3003}
3004
3005pub struct RoomGuard<T> {
3006    data: T,
3007    _guard: OwnedMutexGuard<()>,
3008    _not_send: PhantomData<Rc<()>>,
3009}
3010
3011impl<T> Deref for RoomGuard<T> {
3012    type Target = T;
3013
3014    fn deref(&self) -> &T {
3015        &self.data
3016    }
3017}
3018
3019impl<T> DerefMut for RoomGuard<T> {
3020    fn deref_mut(&mut self) -> &mut T {
3021        &mut self.data
3022    }
3023}
3024
3025#[derive(Debug, Serialize, Deserialize)]
3026pub struct NewUserParams {
3027    pub github_login: String,
3028    pub github_user_id: i32,
3029    pub invite_count: i32,
3030}
3031
3032#[derive(Debug)]
3033pub struct NewUserResult {
3034    pub user_id: UserId,
3035    pub metrics_id: String,
3036    pub inviting_user_id: Option<UserId>,
3037    pub signup_device_id: Option<String>,
3038}
3039
3040fn random_invite_code() -> String {
3041    nanoid::nanoid!(16)
3042}
3043
3044fn random_email_confirmation_code() -> String {
3045    nanoid::nanoid!(64)
3046}
3047
3048macro_rules! id_type {
3049    ($name:ident) => {
3050        #[derive(
3051            Clone,
3052            Copy,
3053            Debug,
3054            Default,
3055            PartialEq,
3056            Eq,
3057            PartialOrd,
3058            Ord,
3059            Hash,
3060            Serialize,
3061            Deserialize,
3062        )]
3063        #[serde(transparent)]
3064        pub struct $name(pub i32);
3065
3066        impl $name {
3067            #[allow(unused)]
3068            pub const MAX: Self = Self(i32::MAX);
3069
3070            #[allow(unused)]
3071            pub fn from_proto(value: u64) -> Self {
3072                Self(value as i32)
3073            }
3074
3075            #[allow(unused)]
3076            pub fn to_proto(self) -> u64 {
3077                self.0 as u64
3078            }
3079        }
3080
3081        impl std::fmt::Display for $name {
3082            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3083                self.0.fmt(f)
3084            }
3085        }
3086
3087        impl From<$name> for sea_query::Value {
3088            fn from(value: $name) -> Self {
3089                sea_query::Value::Int(Some(value.0))
3090            }
3091        }
3092
3093        impl sea_orm::TryGetable for $name {
3094            fn try_get(
3095                res: &sea_orm::QueryResult,
3096                pre: &str,
3097                col: &str,
3098            ) -> Result<Self, sea_orm::TryGetError> {
3099                Ok(Self(i32::try_get(res, pre, col)?))
3100            }
3101        }
3102
3103        impl sea_query::ValueType for $name {
3104            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3105                match v {
3106                    Value::TinyInt(Some(int)) => {
3107                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3108                    }
3109                    Value::SmallInt(Some(int)) => {
3110                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3111                    }
3112                    Value::Int(Some(int)) => {
3113                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3114                    }
3115                    Value::BigInt(Some(int)) => {
3116                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3117                    }
3118                    Value::TinyUnsigned(Some(int)) => {
3119                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3120                    }
3121                    Value::SmallUnsigned(Some(int)) => {
3122                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3123                    }
3124                    Value::Unsigned(Some(int)) => {
3125                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3126                    }
3127                    Value::BigUnsigned(Some(int)) => {
3128                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3129                    }
3130                    _ => Err(sea_query::ValueTypeErr),
3131                }
3132            }
3133
3134            fn type_name() -> String {
3135                stringify!($name).into()
3136            }
3137
3138            fn array_type() -> sea_query::ArrayType {
3139                sea_query::ArrayType::Int
3140            }
3141
3142            fn column_type() -> sea_query::ColumnType {
3143                sea_query::ColumnType::Integer(None)
3144            }
3145        }
3146
3147        impl sea_orm::TryFromU64 for $name {
3148            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3149                Ok(Self(n.try_into().map_err(|_| {
3150                    DbErr::ConvertFromU64(concat!(
3151                        "error converting ",
3152                        stringify!($name),
3153                        " to u64"
3154                    ))
3155                })?))
3156            }
3157        }
3158
3159        impl sea_query::Nullable for $name {
3160            fn null() -> Value {
3161                Value::Int(None)
3162            }
3163        }
3164    };
3165}
3166
3167id_type!(AccessTokenId);
3168id_type!(ContactId);
3169id_type!(FollowerId);
3170id_type!(RoomId);
3171id_type!(RoomParticipantId);
3172id_type!(ProjectId);
3173id_type!(ProjectCollaboratorId);
3174id_type!(ReplicaId);
3175id_type!(ServerId);
3176id_type!(SignupId);
3177id_type!(UserId);
3178
3179pub struct RejoinedRoom {
3180    pub room: proto::Room,
3181    pub rejoined_projects: Vec<RejoinedProject>,
3182    pub reshared_projects: Vec<ResharedProject>,
3183}
3184
3185pub struct ResharedProject {
3186    pub id: ProjectId,
3187    pub old_connection_id: ConnectionId,
3188    pub collaborators: Vec<ProjectCollaborator>,
3189    pub worktrees: Vec<proto::WorktreeMetadata>,
3190}
3191
3192pub struct RejoinedProject {
3193    pub id: ProjectId,
3194    pub old_connection_id: ConnectionId,
3195    pub collaborators: Vec<ProjectCollaborator>,
3196    pub worktrees: Vec<RejoinedWorktree>,
3197    pub language_servers: Vec<proto::LanguageServer>,
3198}
3199
3200#[derive(Debug)]
3201pub struct RejoinedWorktree {
3202    pub id: u64,
3203    pub abs_path: String,
3204    pub root_name: String,
3205    pub visible: bool,
3206    pub updated_entries: Vec<proto::Entry>,
3207    pub removed_entries: Vec<u64>,
3208    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3209    pub scan_id: u64,
3210    pub completed_scan_id: u64,
3211}
3212
3213pub struct LeftRoom {
3214    pub room: proto::Room,
3215    pub left_projects: HashMap<ProjectId, LeftProject>,
3216    pub canceled_calls_to_user_ids: Vec<UserId>,
3217}
3218
3219pub struct RefreshedRoom {
3220    pub room: proto::Room,
3221    pub stale_participant_user_ids: Vec<UserId>,
3222    pub canceled_calls_to_user_ids: Vec<UserId>,
3223}
3224
3225pub struct Project {
3226    pub collaborators: Vec<ProjectCollaborator>,
3227    pub worktrees: BTreeMap<u64, Worktree>,
3228    pub language_servers: Vec<proto::LanguageServer>,
3229}
3230
3231pub struct ProjectCollaborator {
3232    pub connection_id: ConnectionId,
3233    pub user_id: UserId,
3234    pub replica_id: ReplicaId,
3235    pub is_host: bool,
3236}
3237
3238impl ProjectCollaborator {
3239    pub fn to_proto(&self) -> proto::Collaborator {
3240        proto::Collaborator {
3241            peer_id: Some(self.connection_id.into()),
3242            replica_id: self.replica_id.0 as u32,
3243            user_id: self.user_id.to_proto(),
3244        }
3245    }
3246}
3247
3248#[derive(Debug)]
3249pub struct LeftProject {
3250    pub id: ProjectId,
3251    pub host_user_id: UserId,
3252    pub host_connection_id: ConnectionId,
3253    pub connection_ids: Vec<ConnectionId>,
3254}
3255
3256pub struct Worktree {
3257    pub id: u64,
3258    pub abs_path: String,
3259    pub root_name: String,
3260    pub visible: bool,
3261    pub entries: Vec<proto::Entry>,
3262    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3263    pub scan_id: u64,
3264    pub completed_scan_id: u64,
3265}
3266
3267#[cfg(test)]
3268pub use test::*;
3269
3270#[cfg(test)]
3271mod test {
3272    use super::*;
3273    use gpui::executor::Background;
3274    use lazy_static::lazy_static;
3275    use parking_lot::Mutex;
3276    use rand::prelude::*;
3277    use sea_orm::ConnectionTrait;
3278    use sqlx::migrate::MigrateDatabase;
3279    use std::sync::Arc;
3280
3281    pub struct TestDb {
3282        pub db: Option<Arc<Database>>,
3283        pub connection: Option<sqlx::AnyConnection>,
3284    }
3285
3286    impl TestDb {
3287        pub fn sqlite(background: Arc<Background>) -> Self {
3288            let url = format!("sqlite::memory:");
3289            let runtime = tokio::runtime::Builder::new_current_thread()
3290                .enable_io()
3291                .enable_time()
3292                .build()
3293                .unwrap();
3294
3295            let mut db = runtime.block_on(async {
3296                let mut options = ConnectOptions::new(url);
3297                options.max_connections(5);
3298                let db = Database::new(options).await.unwrap();
3299                let sql = include_str!(concat!(
3300                    env!("CARGO_MANIFEST_DIR"),
3301                    "/migrations.sqlite/20221109000000_test_schema.sql"
3302                ));
3303                db.pool
3304                    .execute(sea_orm::Statement::from_string(
3305                        db.pool.get_database_backend(),
3306                        sql.into(),
3307                    ))
3308                    .await
3309                    .unwrap();
3310                db
3311            });
3312
3313            db.background = Some(background);
3314            db.runtime = Some(runtime);
3315
3316            Self {
3317                db: Some(Arc::new(db)),
3318                connection: None,
3319            }
3320        }
3321
3322        pub fn postgres(background: Arc<Background>) -> Self {
3323            lazy_static! {
3324                static ref LOCK: Mutex<()> = Mutex::new(());
3325            }
3326
3327            let _guard = LOCK.lock();
3328            let mut rng = StdRng::from_entropy();
3329            let url = format!(
3330                "postgres://postgres@localhost/zed-test-{}",
3331                rng.gen::<u128>()
3332            );
3333            let runtime = tokio::runtime::Builder::new_current_thread()
3334                .enable_io()
3335                .enable_time()
3336                .build()
3337                .unwrap();
3338
3339            let mut db = runtime.block_on(async {
3340                sqlx::Postgres::create_database(&url)
3341                    .await
3342                    .expect("failed to create test db");
3343                let mut options = ConnectOptions::new(url);
3344                options
3345                    .max_connections(5)
3346                    .idle_timeout(Duration::from_secs(0));
3347                let db = Database::new(options).await.unwrap();
3348                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3349                db.migrate(Path::new(migrations_path), false).await.unwrap();
3350                db
3351            });
3352
3353            db.background = Some(background);
3354            db.runtime = Some(runtime);
3355
3356            Self {
3357                db: Some(Arc::new(db)),
3358                connection: None,
3359            }
3360        }
3361
3362        pub fn db(&self) -> &Arc<Database> {
3363            self.db.as_ref().unwrap()
3364        }
3365    }
3366
3367    impl Drop for TestDb {
3368        fn drop(&mut self) {
3369            let db = self.db.take().unwrap();
3370            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3371                db.runtime.as_ref().unwrap().block_on(async {
3372                    use util::ResultExt;
3373                    let query = "
3374                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3375                        FROM pg_stat_activity
3376                        WHERE
3377                            pg_stat_activity.datname = current_database() AND
3378                            pid <> pg_backend_pid();
3379                    ";
3380                    db.pool
3381                        .execute(sea_orm::Statement::from_string(
3382                            db.pool.get_database_backend(),
3383                            query.into(),
3384                        ))
3385                        .await
3386                        .log_err();
3387                    sqlx::Postgres::drop_database(db.options.get_url())
3388                        .await
3389                        .log_err();
3390                })
3391            }
3392        }
3393    }
3394}