db.rs

   1mod access_token;
   2mod contact;
   3mod language_server;
   4mod project;
   5mod project_collaborator;
   6mod room;
   7mod room_participant;
   8mod server;
   9mod signup;
  10#[cfg(test)]
  11mod tests;
  12mod user;
  13mod worktree;
  14mod worktree_diagnostic_summary;
  15mod worktree_entry;
  16
  17use crate::{Error, Result};
  18use anyhow::anyhow;
  19use collections::{BTreeMap, HashMap, HashSet};
  20pub use contact::Contact;
  21use dashmap::DashMap;
  22use futures::StreamExt;
  23use hyper::StatusCode;
  24use rpc::{proto, ConnectionId};
  25use sea_orm::Condition;
  26pub use sea_orm::ConnectOptions;
  27use sea_orm::{
  28    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  29    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  30    Statement, TransactionTrait,
  31};
  32use sea_query::{Alias, Expr, OnConflict, Query};
  33use serde::{Deserialize, Serialize};
  34pub use signup::{Invite, NewSignup, WaitlistSummary};
  35use sqlx::migrate::{Migrate, Migration, MigrationSource};
  36use sqlx::Connection;
  37use std::ops::{Deref, DerefMut};
  38use std::path::Path;
  39use std::time::Duration;
  40use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  41use tokio::sync::{Mutex, OwnedMutexGuard};
  42pub use user::Model as User;
  43
  44pub struct Database {
  45    options: ConnectOptions,
  46    pool: DatabaseConnection,
  47    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  48    #[cfg(test)]
  49    background: Option<std::sync::Arc<gpui::executor::Background>>,
  50    #[cfg(test)]
  51    runtime: Option<tokio::runtime::Runtime>,
  52}
  53
  54impl Database {
  55    pub async fn new(options: ConnectOptions) -> Result<Self> {
  56        Ok(Self {
  57            options: options.clone(),
  58            pool: sea_orm::Database::connect(options).await?,
  59            rooms: DashMap::with_capacity(16384),
  60            #[cfg(test)]
  61            background: None,
  62            #[cfg(test)]
  63            runtime: None,
  64        })
  65    }
  66
  67    #[cfg(test)]
  68    pub fn reset(&self) {
  69        self.rooms.clear();
  70    }
  71
  72    pub async fn migrate(
  73        &self,
  74        migrations_path: &Path,
  75        ignore_checksum_mismatch: bool,
  76    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  77        let migrations = MigrationSource::resolve(migrations_path)
  78            .await
  79            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  80
  81        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  82
  83        connection.ensure_migrations_table().await?;
  84        let applied_migrations: HashMap<_, _> = connection
  85            .list_applied_migrations()
  86            .await?
  87            .into_iter()
  88            .map(|m| (m.version, m))
  89            .collect();
  90
  91        let mut new_migrations = Vec::new();
  92        for migration in migrations {
  93            match applied_migrations.get(&migration.version) {
  94                Some(applied_migration) => {
  95                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  96                    {
  97                        Err(anyhow!(
  98                            "checksum mismatch for applied migration {}",
  99                            migration.description
 100                        ))?;
 101                    }
 102                }
 103                None => {
 104                    let elapsed = connection.apply(&migration).await?;
 105                    new_migrations.push((migration, elapsed));
 106                }
 107            }
 108        }
 109
 110        Ok(new_migrations)
 111    }
 112
 113    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 114        self.transaction(|tx| async move {
 115            let server = server::ActiveModel {
 116                environment: ActiveValue::set(environment.into()),
 117                ..Default::default()
 118            }
 119            .insert(&*tx)
 120            .await?;
 121            Ok(server.id)
 122        })
 123        .await
 124    }
 125
 126    pub async fn stale_room_ids(
 127        &self,
 128        environment: &str,
 129        new_server_id: ServerId,
 130    ) -> Result<Vec<RoomId>> {
 131        self.transaction(|tx| async move {
 132            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 133            enum QueryAs {
 134                RoomId,
 135            }
 136
 137            let stale_server_epochs = self
 138                .stale_server_ids(environment, new_server_id, &tx)
 139                .await?;
 140            Ok(room_participant::Entity::find()
 141                .select_only()
 142                .column(room_participant::Column::RoomId)
 143                .distinct()
 144                .filter(
 145                    room_participant::Column::AnsweringConnectionServerId
 146                        .is_in(stale_server_epochs),
 147                )
 148                .into_values::<_, QueryAs>()
 149                .all(&*tx)
 150                .await?)
 151        })
 152        .await
 153    }
 154
 155    pub async fn refresh_room(
 156        &self,
 157        room_id: RoomId,
 158        new_server_id: ServerId,
 159    ) -> Result<RoomGuard<RefreshedRoom>> {
 160        self.room_transaction(|tx| async move {
 161            let stale_participant_filter = Condition::all()
 162                .add(room_participant::Column::RoomId.eq(room_id))
 163                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 164                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 165
 166            let stale_participant_user_ids = room_participant::Entity::find()
 167                .filter(stale_participant_filter.clone())
 168                .all(&*tx)
 169                .await?
 170                .into_iter()
 171                .map(|participant| participant.user_id)
 172                .collect::<Vec<_>>();
 173
 174            // Delete participants who failed to reconnect.
 175            room_participant::Entity::delete_many()
 176                .filter(stale_participant_filter)
 177                .exec(&*tx)
 178                .await?;
 179
 180            let room = self.get_room(room_id, &tx).await?;
 181            let mut canceled_calls_to_user_ids = Vec::new();
 182            // Delete the room if it becomes empty and cancel pending calls.
 183            if room.participants.is_empty() {
 184                canceled_calls_to_user_ids.extend(
 185                    room.pending_participants
 186                        .iter()
 187                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 188                );
 189                room_participant::Entity::delete_many()
 190                    .filter(room_participant::Column::RoomId.eq(room_id))
 191                    .exec(&*tx)
 192                    .await?;
 193                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 194            }
 195
 196            Ok((
 197                room_id,
 198                RefreshedRoom {
 199                    room,
 200                    stale_participant_user_ids,
 201                    canceled_calls_to_user_ids,
 202                },
 203            ))
 204        })
 205        .await
 206    }
 207
 208    pub async fn delete_stale_servers(
 209        &self,
 210        environment: &str,
 211        new_server_id: ServerId,
 212    ) -> Result<()> {
 213        self.transaction(|tx| async move {
 214            server::Entity::delete_many()
 215                .filter(
 216                    Condition::all()
 217                        .add(server::Column::Environment.eq(environment))
 218                        .add(server::Column::Id.ne(new_server_id)),
 219                )
 220                .exec(&*tx)
 221                .await?;
 222            Ok(())
 223        })
 224        .await
 225    }
 226
 227    async fn stale_server_ids(
 228        &self,
 229        environment: &str,
 230        new_server_id: ServerId,
 231        tx: &DatabaseTransaction,
 232    ) -> Result<Vec<ServerId>> {
 233        let stale_servers = server::Entity::find()
 234            .filter(
 235                Condition::all()
 236                    .add(server::Column::Environment.eq(environment))
 237                    .add(server::Column::Id.ne(new_server_id)),
 238            )
 239            .all(&*tx)
 240            .await?;
 241        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 242    }
 243
 244    // users
 245
 246    pub async fn create_user(
 247        &self,
 248        email_address: &str,
 249        admin: bool,
 250        params: NewUserParams,
 251    ) -> Result<NewUserResult> {
 252        self.transaction(|tx| async {
 253            let tx = tx;
 254            let user = user::Entity::insert(user::ActiveModel {
 255                email_address: ActiveValue::set(Some(email_address.into())),
 256                github_login: ActiveValue::set(params.github_login.clone()),
 257                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 258                admin: ActiveValue::set(admin),
 259                metrics_id: ActiveValue::set(Uuid::new_v4()),
 260                ..Default::default()
 261            })
 262            .on_conflict(
 263                OnConflict::column(user::Column::GithubLogin)
 264                    .update_column(user::Column::GithubLogin)
 265                    .to_owned(),
 266            )
 267            .exec_with_returning(&*tx)
 268            .await?;
 269
 270            Ok(NewUserResult {
 271                user_id: user.id,
 272                metrics_id: user.metrics_id.to_string(),
 273                signup_device_id: None,
 274                inviting_user_id: None,
 275            })
 276        })
 277        .await
 278    }
 279
 280    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 281        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 282            .await
 283    }
 284
 285    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 286        self.transaction(|tx| async {
 287            let tx = tx;
 288            Ok(user::Entity::find()
 289                .filter(user::Column::Id.is_in(ids.iter().copied()))
 290                .all(&*tx)
 291                .await?)
 292        })
 293        .await
 294    }
 295
 296    pub async fn get_user_by_github_account(
 297        &self,
 298        github_login: &str,
 299        github_user_id: Option<i32>,
 300    ) -> Result<Option<User>> {
 301        self.transaction(|tx| async move {
 302            let tx = &*tx;
 303            if let Some(github_user_id) = github_user_id {
 304                if let Some(user_by_github_user_id) = user::Entity::find()
 305                    .filter(user::Column::GithubUserId.eq(github_user_id))
 306                    .one(tx)
 307                    .await?
 308                {
 309                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 310                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 311                    Ok(Some(user_by_github_user_id.update(tx).await?))
 312                } else if let Some(user_by_github_login) = user::Entity::find()
 313                    .filter(user::Column::GithubLogin.eq(github_login))
 314                    .one(tx)
 315                    .await?
 316                {
 317                    let mut user_by_github_login = user_by_github_login.into_active_model();
 318                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 319                    Ok(Some(user_by_github_login.update(tx).await?))
 320                } else {
 321                    Ok(None)
 322                }
 323            } else {
 324                Ok(user::Entity::find()
 325                    .filter(user::Column::GithubLogin.eq(github_login))
 326                    .one(tx)
 327                    .await?)
 328            }
 329        })
 330        .await
 331    }
 332
 333    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 334        self.transaction(|tx| async move {
 335            Ok(user::Entity::find()
 336                .order_by_asc(user::Column::GithubLogin)
 337                .limit(limit as u64)
 338                .offset(page as u64 * limit as u64)
 339                .all(&*tx)
 340                .await?)
 341        })
 342        .await
 343    }
 344
 345    pub async fn get_users_with_no_invites(
 346        &self,
 347        invited_by_another_user: bool,
 348    ) -> Result<Vec<User>> {
 349        self.transaction(|tx| async move {
 350            Ok(user::Entity::find()
 351                .filter(
 352                    user::Column::InviteCount
 353                        .eq(0)
 354                        .and(if invited_by_another_user {
 355                            user::Column::InviterId.is_not_null()
 356                        } else {
 357                            user::Column::InviterId.is_null()
 358                        }),
 359                )
 360                .all(&*tx)
 361                .await?)
 362        })
 363        .await
 364    }
 365
 366    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 367        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 368        enum QueryAs {
 369            MetricsId,
 370        }
 371
 372        self.transaction(|tx| async move {
 373            let metrics_id: Uuid = user::Entity::find_by_id(id)
 374                .select_only()
 375                .column(user::Column::MetricsId)
 376                .into_values::<_, QueryAs>()
 377                .one(&*tx)
 378                .await?
 379                .ok_or_else(|| anyhow!("could not find user"))?;
 380            Ok(metrics_id.to_string())
 381        })
 382        .await
 383    }
 384
 385    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 386        self.transaction(|tx| async move {
 387            user::Entity::update_many()
 388                .filter(user::Column::Id.eq(id))
 389                .set(user::ActiveModel {
 390                    admin: ActiveValue::set(is_admin),
 391                    ..Default::default()
 392                })
 393                .exec(&*tx)
 394                .await?;
 395            Ok(())
 396        })
 397        .await
 398    }
 399
 400    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 401        self.transaction(|tx| async move {
 402            user::Entity::update_many()
 403                .filter(user::Column::Id.eq(id))
 404                .set(user::ActiveModel {
 405                    connected_once: ActiveValue::set(connected_once),
 406                    ..Default::default()
 407                })
 408                .exec(&*tx)
 409                .await?;
 410            Ok(())
 411        })
 412        .await
 413    }
 414
 415    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 416        self.transaction(|tx| async move {
 417            access_token::Entity::delete_many()
 418                .filter(access_token::Column::UserId.eq(id))
 419                .exec(&*tx)
 420                .await?;
 421            user::Entity::delete_by_id(id).exec(&*tx).await?;
 422            Ok(())
 423        })
 424        .await
 425    }
 426
 427    // contacts
 428
 429    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 430        #[derive(Debug, FromQueryResult)]
 431        struct ContactWithUserBusyStatuses {
 432            user_id_a: UserId,
 433            user_id_b: UserId,
 434            a_to_b: bool,
 435            accepted: bool,
 436            should_notify: bool,
 437            user_a_busy: bool,
 438            user_b_busy: bool,
 439        }
 440
 441        self.transaction(|tx| async move {
 442            let user_a_participant = Alias::new("user_a_participant");
 443            let user_b_participant = Alias::new("user_b_participant");
 444            let mut db_contacts = contact::Entity::find()
 445                .column_as(
 446                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 447                        .is_not_null(),
 448                    "user_a_busy",
 449                )
 450                .column_as(
 451                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 452                        .is_not_null(),
 453                    "user_b_busy",
 454                )
 455                .filter(
 456                    contact::Column::UserIdA
 457                        .eq(user_id)
 458                        .or(contact::Column::UserIdB.eq(user_id)),
 459                )
 460                .join_as(
 461                    JoinType::LeftJoin,
 462                    contact::Relation::UserARoomParticipant.def(),
 463                    user_a_participant,
 464                )
 465                .join_as(
 466                    JoinType::LeftJoin,
 467                    contact::Relation::UserBRoomParticipant.def(),
 468                    user_b_participant,
 469                )
 470                .into_model::<ContactWithUserBusyStatuses>()
 471                .stream(&*tx)
 472                .await?;
 473
 474            let mut contacts = Vec::new();
 475            while let Some(db_contact) = db_contacts.next().await {
 476                let db_contact = db_contact?;
 477                if db_contact.user_id_a == user_id {
 478                    if db_contact.accepted {
 479                        contacts.push(Contact::Accepted {
 480                            user_id: db_contact.user_id_b,
 481                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 482                            busy: db_contact.user_b_busy,
 483                        });
 484                    } else if db_contact.a_to_b {
 485                        contacts.push(Contact::Outgoing {
 486                            user_id: db_contact.user_id_b,
 487                        })
 488                    } else {
 489                        contacts.push(Contact::Incoming {
 490                            user_id: db_contact.user_id_b,
 491                            should_notify: db_contact.should_notify,
 492                        });
 493                    }
 494                } else if db_contact.accepted {
 495                    contacts.push(Contact::Accepted {
 496                        user_id: db_contact.user_id_a,
 497                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 498                        busy: db_contact.user_a_busy,
 499                    });
 500                } else if db_contact.a_to_b {
 501                    contacts.push(Contact::Incoming {
 502                        user_id: db_contact.user_id_a,
 503                        should_notify: db_contact.should_notify,
 504                    });
 505                } else {
 506                    contacts.push(Contact::Outgoing {
 507                        user_id: db_contact.user_id_a,
 508                    });
 509                }
 510            }
 511
 512            contacts.sort_unstable_by_key(|contact| contact.user_id());
 513
 514            Ok(contacts)
 515        })
 516        .await
 517    }
 518
 519    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 520        self.transaction(|tx| async move {
 521            let participant = room_participant::Entity::find()
 522                .filter(room_participant::Column::UserId.eq(user_id))
 523                .one(&*tx)
 524                .await?;
 525            Ok(participant.is_some())
 526        })
 527        .await
 528    }
 529
 530    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 531        self.transaction(|tx| async move {
 532            let (id_a, id_b) = if user_id_1 < user_id_2 {
 533                (user_id_1, user_id_2)
 534            } else {
 535                (user_id_2, user_id_1)
 536            };
 537
 538            Ok(contact::Entity::find()
 539                .filter(
 540                    contact::Column::UserIdA
 541                        .eq(id_a)
 542                        .and(contact::Column::UserIdB.eq(id_b))
 543                        .and(contact::Column::Accepted.eq(true)),
 544                )
 545                .one(&*tx)
 546                .await?
 547                .is_some())
 548        })
 549        .await
 550    }
 551
 552    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 553        self.transaction(|tx| async move {
 554            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 555                (sender_id, receiver_id, true)
 556            } else {
 557                (receiver_id, sender_id, false)
 558            };
 559
 560            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 561                user_id_a: ActiveValue::set(id_a),
 562                user_id_b: ActiveValue::set(id_b),
 563                a_to_b: ActiveValue::set(a_to_b),
 564                accepted: ActiveValue::set(false),
 565                should_notify: ActiveValue::set(true),
 566                ..Default::default()
 567            })
 568            .on_conflict(
 569                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 570                    .values([
 571                        (contact::Column::Accepted, true.into()),
 572                        (contact::Column::ShouldNotify, false.into()),
 573                    ])
 574                    .action_and_where(
 575                        contact::Column::Accepted.eq(false).and(
 576                            contact::Column::AToB
 577                                .eq(a_to_b)
 578                                .and(contact::Column::UserIdA.eq(id_b))
 579                                .or(contact::Column::AToB
 580                                    .ne(a_to_b)
 581                                    .and(contact::Column::UserIdA.eq(id_a))),
 582                        ),
 583                    )
 584                    .to_owned(),
 585            )
 586            .exec_without_returning(&*tx)
 587            .await?;
 588
 589            if rows_affected == 1 {
 590                Ok(())
 591            } else {
 592                Err(anyhow!("contact already requested"))?
 593            }
 594        })
 595        .await
 596    }
 597
 598    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 599        self.transaction(|tx| async move {
 600            let (id_a, id_b) = if responder_id < requester_id {
 601                (responder_id, requester_id)
 602            } else {
 603                (requester_id, responder_id)
 604            };
 605
 606            let result = contact::Entity::delete_many()
 607                .filter(
 608                    contact::Column::UserIdA
 609                        .eq(id_a)
 610                        .and(contact::Column::UserIdB.eq(id_b)),
 611                )
 612                .exec(&*tx)
 613                .await?;
 614
 615            if result.rows_affected == 1 {
 616                Ok(())
 617            } else {
 618                Err(anyhow!("no such contact"))?
 619            }
 620        })
 621        .await
 622    }
 623
 624    pub async fn dismiss_contact_notification(
 625        &self,
 626        user_id: UserId,
 627        contact_user_id: UserId,
 628    ) -> Result<()> {
 629        self.transaction(|tx| async move {
 630            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 631                (user_id, contact_user_id, true)
 632            } else {
 633                (contact_user_id, user_id, false)
 634            };
 635
 636            let result = contact::Entity::update_many()
 637                .set(contact::ActiveModel {
 638                    should_notify: ActiveValue::set(false),
 639                    ..Default::default()
 640                })
 641                .filter(
 642                    contact::Column::UserIdA
 643                        .eq(id_a)
 644                        .and(contact::Column::UserIdB.eq(id_b))
 645                        .and(
 646                            contact::Column::AToB
 647                                .eq(a_to_b)
 648                                .and(contact::Column::Accepted.eq(true))
 649                                .or(contact::Column::AToB
 650                                    .ne(a_to_b)
 651                                    .and(contact::Column::Accepted.eq(false))),
 652                        ),
 653                )
 654                .exec(&*tx)
 655                .await?;
 656            if result.rows_affected == 0 {
 657                Err(anyhow!("no such contact request"))?
 658            } else {
 659                Ok(())
 660            }
 661        })
 662        .await
 663    }
 664
 665    pub async fn respond_to_contact_request(
 666        &self,
 667        responder_id: UserId,
 668        requester_id: UserId,
 669        accept: bool,
 670    ) -> Result<()> {
 671        self.transaction(|tx| async move {
 672            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 673                (responder_id, requester_id, false)
 674            } else {
 675                (requester_id, responder_id, true)
 676            };
 677            let rows_affected = if accept {
 678                let result = contact::Entity::update_many()
 679                    .set(contact::ActiveModel {
 680                        accepted: ActiveValue::set(true),
 681                        should_notify: ActiveValue::set(true),
 682                        ..Default::default()
 683                    })
 684                    .filter(
 685                        contact::Column::UserIdA
 686                            .eq(id_a)
 687                            .and(contact::Column::UserIdB.eq(id_b))
 688                            .and(contact::Column::AToB.eq(a_to_b)),
 689                    )
 690                    .exec(&*tx)
 691                    .await?;
 692                result.rows_affected
 693            } else {
 694                let result = contact::Entity::delete_many()
 695                    .filter(
 696                        contact::Column::UserIdA
 697                            .eq(id_a)
 698                            .and(contact::Column::UserIdB.eq(id_b))
 699                            .and(contact::Column::AToB.eq(a_to_b))
 700                            .and(contact::Column::Accepted.eq(false)),
 701                    )
 702                    .exec(&*tx)
 703                    .await?;
 704
 705                result.rows_affected
 706            };
 707
 708            if rows_affected == 1 {
 709                Ok(())
 710            } else {
 711                Err(anyhow!("no such contact request"))?
 712            }
 713        })
 714        .await
 715    }
 716
 717    pub fn fuzzy_like_string(string: &str) -> String {
 718        let mut result = String::with_capacity(string.len() * 2 + 1);
 719        for c in string.chars() {
 720            if c.is_alphanumeric() {
 721                result.push('%');
 722                result.push(c);
 723            }
 724        }
 725        result.push('%');
 726        result
 727    }
 728
 729    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 730        self.transaction(|tx| async {
 731            let tx = tx;
 732            let like_string = Self::fuzzy_like_string(name_query);
 733            let query = "
 734                SELECT users.*
 735                FROM users
 736                WHERE github_login ILIKE $1
 737                ORDER BY github_login <-> $2
 738                LIMIT $3
 739            ";
 740
 741            Ok(user::Entity::find()
 742                .from_raw_sql(Statement::from_sql_and_values(
 743                    self.pool.get_database_backend(),
 744                    query.into(),
 745                    vec![like_string.into(), name_query.into(), limit.into()],
 746                ))
 747                .all(&*tx)
 748                .await?)
 749        })
 750        .await
 751    }
 752
 753    // signups
 754
 755    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 756        self.transaction(|tx| async move {
 757            signup::Entity::insert(signup::ActiveModel {
 758                email_address: ActiveValue::set(signup.email_address.clone()),
 759                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 760                email_confirmation_sent: ActiveValue::set(false),
 761                platform_mac: ActiveValue::set(signup.platform_mac),
 762                platform_windows: ActiveValue::set(signup.platform_windows),
 763                platform_linux: ActiveValue::set(signup.platform_linux),
 764                platform_unknown: ActiveValue::set(false),
 765                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 766                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 767                device_id: ActiveValue::set(signup.device_id.clone()),
 768                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 769                ..Default::default()
 770            })
 771            .on_conflict(
 772                OnConflict::column(signup::Column::EmailAddress)
 773                    .update_columns([
 774                        signup::Column::PlatformMac,
 775                        signup::Column::PlatformWindows,
 776                        signup::Column::PlatformLinux,
 777                        signup::Column::EditorFeatures,
 778                        signup::Column::ProgrammingLanguages,
 779                        signup::Column::DeviceId,
 780                        signup::Column::AddedToMailingList,
 781                    ])
 782                    .to_owned(),
 783            )
 784            .exec(&*tx)
 785            .await?;
 786            Ok(())
 787        })
 788        .await
 789    }
 790
 791    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 792        self.transaction(|tx| async move {
 793            let signup = signup::Entity::find()
 794                .filter(signup::Column::EmailAddress.eq(email_address))
 795                .one(&*tx)
 796                .await?
 797                .ok_or_else(|| {
 798                    anyhow!("signup with email address {} doesn't exist", email_address)
 799                })?;
 800
 801            Ok(signup)
 802        })
 803        .await
 804    }
 805
 806    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 807        self.transaction(|tx| async move {
 808            let query = "
 809                SELECT
 810                    COUNT(*) as count,
 811                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 812                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 813                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 814                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 815                FROM (
 816                    SELECT *
 817                    FROM signups
 818                    WHERE
 819                        NOT email_confirmation_sent
 820                ) AS unsent
 821            ";
 822            Ok(
 823                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 824                    self.pool.get_database_backend(),
 825                    query.into(),
 826                    vec![],
 827                ))
 828                .one(&*tx)
 829                .await?
 830                .ok_or_else(|| anyhow!("invalid result"))?,
 831            )
 832        })
 833        .await
 834    }
 835
 836    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 837        let emails = invites
 838            .iter()
 839            .map(|s| s.email_address.as_str())
 840            .collect::<Vec<_>>();
 841        self.transaction(|tx| async {
 842            let tx = tx;
 843            signup::Entity::update_many()
 844                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 845                .set(signup::ActiveModel {
 846                    email_confirmation_sent: ActiveValue::set(true),
 847                    ..Default::default()
 848                })
 849                .exec(&*tx)
 850                .await?;
 851            Ok(())
 852        })
 853        .await
 854    }
 855
 856    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 857        self.transaction(|tx| async move {
 858            Ok(signup::Entity::find()
 859                .select_only()
 860                .column(signup::Column::EmailAddress)
 861                .column(signup::Column::EmailConfirmationCode)
 862                .filter(
 863                    signup::Column::EmailConfirmationSent.eq(false).and(
 864                        signup::Column::PlatformMac
 865                            .eq(true)
 866                            .or(signup::Column::PlatformUnknown.eq(true)),
 867                    ),
 868                )
 869                .order_by_asc(signup::Column::CreatedAt)
 870                .limit(count as u64)
 871                .into_model()
 872                .all(&*tx)
 873                .await?)
 874        })
 875        .await
 876    }
 877
 878    // invite codes
 879
 880    pub async fn create_invite_from_code(
 881        &self,
 882        code: &str,
 883        email_address: &str,
 884        device_id: Option<&str>,
 885    ) -> Result<Invite> {
 886        self.transaction(|tx| async move {
 887            let existing_user = user::Entity::find()
 888                .filter(user::Column::EmailAddress.eq(email_address))
 889                .one(&*tx)
 890                .await?;
 891
 892            if existing_user.is_some() {
 893                Err(anyhow!("email address is already in use"))?;
 894            }
 895
 896            let inviting_user_with_invites = match user::Entity::find()
 897                .filter(
 898                    user::Column::InviteCode
 899                        .eq(code)
 900                        .and(user::Column::InviteCount.gt(0)),
 901                )
 902                .one(&*tx)
 903                .await?
 904            {
 905                Some(inviting_user) => inviting_user,
 906                None => {
 907                    return Err(Error::Http(
 908                        StatusCode::UNAUTHORIZED,
 909                        "unable to find an invite code with invites remaining".to_string(),
 910                    ))?
 911                }
 912            };
 913            user::Entity::update_many()
 914                .filter(
 915                    user::Column::Id
 916                        .eq(inviting_user_with_invites.id)
 917                        .and(user::Column::InviteCount.gt(0)),
 918                )
 919                .col_expr(
 920                    user::Column::InviteCount,
 921                    Expr::col(user::Column::InviteCount).sub(1),
 922                )
 923                .exec(&*tx)
 924                .await?;
 925
 926            let signup = signup::Entity::insert(signup::ActiveModel {
 927                email_address: ActiveValue::set(email_address.into()),
 928                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 929                email_confirmation_sent: ActiveValue::set(false),
 930                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 931                platform_linux: ActiveValue::set(false),
 932                platform_mac: ActiveValue::set(false),
 933                platform_windows: ActiveValue::set(false),
 934                platform_unknown: ActiveValue::set(true),
 935                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 936                ..Default::default()
 937            })
 938            .on_conflict(
 939                OnConflict::column(signup::Column::EmailAddress)
 940                    .update_column(signup::Column::InvitingUserId)
 941                    .to_owned(),
 942            )
 943            .exec_with_returning(&*tx)
 944            .await?;
 945
 946            Ok(Invite {
 947                email_address: signup.email_address,
 948                email_confirmation_code: signup.email_confirmation_code,
 949            })
 950        })
 951        .await
 952    }
 953
 954    pub async fn create_user_from_invite(
 955        &self,
 956        invite: &Invite,
 957        user: NewUserParams,
 958    ) -> Result<Option<NewUserResult>> {
 959        self.transaction(|tx| async {
 960            let tx = tx;
 961            let signup = signup::Entity::find()
 962                .filter(
 963                    signup::Column::EmailAddress
 964                        .eq(invite.email_address.as_str())
 965                        .and(
 966                            signup::Column::EmailConfirmationCode
 967                                .eq(invite.email_confirmation_code.as_str()),
 968                        ),
 969                )
 970                .one(&*tx)
 971                .await?
 972                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 973
 974            if signup.user_id.is_some() {
 975                return Ok(None);
 976            }
 977
 978            let user = user::Entity::insert(user::ActiveModel {
 979                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 980                github_login: ActiveValue::set(user.github_login.clone()),
 981                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 982                admin: ActiveValue::set(false),
 983                invite_count: ActiveValue::set(user.invite_count),
 984                invite_code: ActiveValue::set(Some(random_invite_code())),
 985                metrics_id: ActiveValue::set(Uuid::new_v4()),
 986                ..Default::default()
 987            })
 988            .on_conflict(
 989                OnConflict::column(user::Column::GithubLogin)
 990                    .update_columns([
 991                        user::Column::EmailAddress,
 992                        user::Column::GithubUserId,
 993                        user::Column::Admin,
 994                    ])
 995                    .to_owned(),
 996            )
 997            .exec_with_returning(&*tx)
 998            .await?;
 999
1000            let mut signup = signup.into_active_model();
1001            signup.user_id = ActiveValue::set(Some(user.id));
1002            let signup = signup.update(&*tx).await?;
1003
1004            if let Some(inviting_user_id) = signup.inviting_user_id {
1005                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1006                    (inviting_user_id, user.id, true)
1007                } else {
1008                    (user.id, inviting_user_id, false)
1009                };
1010
1011                contact::Entity::insert(contact::ActiveModel {
1012                    user_id_a: ActiveValue::set(user_id_a),
1013                    user_id_b: ActiveValue::set(user_id_b),
1014                    a_to_b: ActiveValue::set(a_to_b),
1015                    should_notify: ActiveValue::set(true),
1016                    accepted: ActiveValue::set(true),
1017                    ..Default::default()
1018                })
1019                .on_conflict(OnConflict::new().do_nothing().to_owned())
1020                .exec_without_returning(&*tx)
1021                .await?;
1022            }
1023
1024            Ok(Some(NewUserResult {
1025                user_id: user.id,
1026                metrics_id: user.metrics_id.to_string(),
1027                inviting_user_id: signup.inviting_user_id,
1028                signup_device_id: signup.device_id,
1029            }))
1030        })
1031        .await
1032    }
1033
1034    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1035        self.transaction(|tx| async move {
1036            if count > 0 {
1037                user::Entity::update_many()
1038                    .filter(
1039                        user::Column::Id
1040                            .eq(id)
1041                            .and(user::Column::InviteCode.is_null()),
1042                    )
1043                    .set(user::ActiveModel {
1044                        invite_code: ActiveValue::set(Some(random_invite_code())),
1045                        ..Default::default()
1046                    })
1047                    .exec(&*tx)
1048                    .await?;
1049            }
1050
1051            user::Entity::update_many()
1052                .filter(user::Column::Id.eq(id))
1053                .set(user::ActiveModel {
1054                    invite_count: ActiveValue::set(count),
1055                    ..Default::default()
1056                })
1057                .exec(&*tx)
1058                .await?;
1059            Ok(())
1060        })
1061        .await
1062    }
1063
1064    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1065        self.transaction(|tx| async move {
1066            match user::Entity::find_by_id(id).one(&*tx).await? {
1067                Some(user) if user.invite_code.is_some() => {
1068                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1069                }
1070                _ => Ok(None),
1071            }
1072        })
1073        .await
1074    }
1075
1076    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1077        self.transaction(|tx| async move {
1078            user::Entity::find()
1079                .filter(user::Column::InviteCode.eq(code))
1080                .one(&*tx)
1081                .await?
1082                .ok_or_else(|| {
1083                    Error::Http(
1084                        StatusCode::NOT_FOUND,
1085                        "that invite code does not exist".to_string(),
1086                    )
1087                })
1088        })
1089        .await
1090    }
1091
1092    // rooms
1093
1094    pub async fn incoming_call_for_user(
1095        &self,
1096        user_id: UserId,
1097    ) -> Result<Option<proto::IncomingCall>> {
1098        self.transaction(|tx| async move {
1099            let pending_participant = room_participant::Entity::find()
1100                .filter(
1101                    room_participant::Column::UserId
1102                        .eq(user_id)
1103                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1104                )
1105                .one(&*tx)
1106                .await?;
1107
1108            if let Some(pending_participant) = pending_participant {
1109                let room = self.get_room(pending_participant.room_id, &tx).await?;
1110                Ok(Self::build_incoming_call(&room, user_id))
1111            } else {
1112                Ok(None)
1113            }
1114        })
1115        .await
1116    }
1117
1118    pub async fn create_room(
1119        &self,
1120        user_id: UserId,
1121        connection: ConnectionId,
1122        live_kit_room: &str,
1123    ) -> Result<RoomGuard<proto::Room>> {
1124        self.room_transaction(|tx| async move {
1125            let room = room::ActiveModel {
1126                live_kit_room: ActiveValue::set(live_kit_room.into()),
1127                ..Default::default()
1128            }
1129            .insert(&*tx)
1130            .await?;
1131            let room_id = room.id;
1132
1133            room_participant::ActiveModel {
1134                room_id: ActiveValue::set(room_id),
1135                user_id: ActiveValue::set(user_id),
1136                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1137                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1138                    connection.owner_id as i32,
1139                ))),
1140                answering_connection_lost: ActiveValue::set(false),
1141                calling_user_id: ActiveValue::set(user_id),
1142                calling_connection_id: ActiveValue::set(connection.id as i32),
1143                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1144                    connection.owner_id as i32,
1145                ))),
1146                ..Default::default()
1147            }
1148            .insert(&*tx)
1149            .await?;
1150
1151            let room = self.get_room(room_id, &tx).await?;
1152            Ok((room_id, room))
1153        })
1154        .await
1155    }
1156
1157    pub async fn call(
1158        &self,
1159        room_id: RoomId,
1160        calling_user_id: UserId,
1161        calling_connection: ConnectionId,
1162        called_user_id: UserId,
1163        initial_project_id: Option<ProjectId>,
1164    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1165        self.room_transaction(|tx| async move {
1166            room_participant::ActiveModel {
1167                room_id: ActiveValue::set(room_id),
1168                user_id: ActiveValue::set(called_user_id),
1169                answering_connection_lost: ActiveValue::set(false),
1170                calling_user_id: ActiveValue::set(calling_user_id),
1171                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1172                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1173                    calling_connection.owner_id as i32,
1174                ))),
1175                initial_project_id: ActiveValue::set(initial_project_id),
1176                ..Default::default()
1177            }
1178            .insert(&*tx)
1179            .await?;
1180
1181            let room = self.get_room(room_id, &tx).await?;
1182            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1183                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1184            Ok((room_id, (room, incoming_call)))
1185        })
1186        .await
1187    }
1188
1189    pub async fn call_failed(
1190        &self,
1191        room_id: RoomId,
1192        called_user_id: UserId,
1193    ) -> Result<RoomGuard<proto::Room>> {
1194        self.room_transaction(|tx| async move {
1195            room_participant::Entity::delete_many()
1196                .filter(
1197                    room_participant::Column::RoomId
1198                        .eq(room_id)
1199                        .and(room_participant::Column::UserId.eq(called_user_id)),
1200                )
1201                .exec(&*tx)
1202                .await?;
1203            let room = self.get_room(room_id, &tx).await?;
1204            Ok((room_id, room))
1205        })
1206        .await
1207    }
1208
1209    pub async fn decline_call(
1210        &self,
1211        expected_room_id: Option<RoomId>,
1212        user_id: UserId,
1213    ) -> Result<Option<RoomGuard<proto::Room>>> {
1214        self.optional_room_transaction(|tx| async move {
1215            let mut filter = Condition::all()
1216                .add(room_participant::Column::UserId.eq(user_id))
1217                .add(room_participant::Column::AnsweringConnectionId.is_null());
1218            if let Some(room_id) = expected_room_id {
1219                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1220            }
1221            let participant = room_participant::Entity::find()
1222                .filter(filter)
1223                .one(&*tx)
1224                .await?;
1225
1226            let participant = if let Some(participant) = participant {
1227                participant
1228            } else if expected_room_id.is_some() {
1229                return Err(anyhow!("could not find call to decline"))?;
1230            } else {
1231                return Ok(None);
1232            };
1233
1234            let room_id = participant.room_id;
1235            room_participant::Entity::delete(participant.into_active_model())
1236                .exec(&*tx)
1237                .await?;
1238
1239            let room = self.get_room(room_id, &tx).await?;
1240            Ok(Some((room_id, room)))
1241        })
1242        .await
1243    }
1244
1245    pub async fn cancel_call(
1246        &self,
1247        room_id: RoomId,
1248        calling_connection: ConnectionId,
1249        called_user_id: UserId,
1250    ) -> Result<RoomGuard<proto::Room>> {
1251        self.room_transaction(|tx| async move {
1252            let participant = room_participant::Entity::find()
1253                .filter(
1254                    Condition::all()
1255                        .add(room_participant::Column::UserId.eq(called_user_id))
1256                        .add(room_participant::Column::RoomId.eq(room_id))
1257                        .add(
1258                            room_participant::Column::CallingConnectionId
1259                                .eq(calling_connection.id as i32),
1260                        )
1261                        .add(
1262                            room_participant::Column::CallingConnectionServerId
1263                                .eq(calling_connection.owner_id as i32),
1264                        )
1265                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1266                )
1267                .one(&*tx)
1268                .await?
1269                .ok_or_else(|| anyhow!("no call to cancel"))?;
1270            let room_id = participant.room_id;
1271
1272            room_participant::Entity::delete(participant.into_active_model())
1273                .exec(&*tx)
1274                .await?;
1275
1276            let room = self.get_room(room_id, &tx).await?;
1277            Ok((room_id, room))
1278        })
1279        .await
1280    }
1281
1282    pub async fn join_room(
1283        &self,
1284        room_id: RoomId,
1285        user_id: UserId,
1286        connection: ConnectionId,
1287    ) -> Result<RoomGuard<proto::Room>> {
1288        self.room_transaction(|tx| async move {
1289            let result = room_participant::Entity::update_many()
1290                .filter(
1291                    Condition::all()
1292                        .add(room_participant::Column::RoomId.eq(room_id))
1293                        .add(room_participant::Column::UserId.eq(user_id))
1294                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1295                )
1296                .set(room_participant::ActiveModel {
1297                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1298                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1299                        connection.owner_id as i32,
1300                    ))),
1301                    answering_connection_lost: ActiveValue::set(false),
1302                    ..Default::default()
1303                })
1304                .exec(&*tx)
1305                .await?;
1306            if result.rows_affected == 0 {
1307                Err(anyhow!("room does not exist or was already joined"))?
1308            } else {
1309                let room = self.get_room(room_id, &tx).await?;
1310                Ok((room_id, room))
1311            }
1312        })
1313        .await
1314    }
1315
1316    pub async fn rejoin_room(
1317        &self,
1318        rejoin_room: proto::RejoinRoom,
1319        user_id: UserId,
1320        connection: ConnectionId,
1321    ) -> Result<RoomGuard<RejoinedRoom>> {
1322        self.room_transaction(|tx| async {
1323            let tx = tx;
1324            let room_id = RoomId::from_proto(rejoin_room.id);
1325            let participant_update = room_participant::Entity::update_many()
1326                .filter(
1327                    Condition::all()
1328                        .add(room_participant::Column::RoomId.eq(room_id))
1329                        .add(room_participant::Column::UserId.eq(user_id))
1330                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1331                        .add(
1332                            Condition::any()
1333                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1334                                .add(
1335                                    room_participant::Column::AnsweringConnectionServerId
1336                                        .ne(connection.owner_id as i32),
1337                                ),
1338                        ),
1339                )
1340                .set(room_participant::ActiveModel {
1341                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1342                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1343                        connection.owner_id as i32,
1344                    ))),
1345                    answering_connection_lost: ActiveValue::set(false),
1346                    ..Default::default()
1347                })
1348                .exec(&*tx)
1349                .await?;
1350            if participant_update.rows_affected == 0 {
1351                return Err(anyhow!("room does not exist or was already joined"))?;
1352            }
1353
1354            let mut reshared_projects = Vec::new();
1355            for reshared_project in &rejoin_room.reshared_projects {
1356                let project_id = ProjectId::from_proto(reshared_project.project_id);
1357                let project = project::Entity::find_by_id(project_id)
1358                    .one(&*tx)
1359                    .await?
1360                    .ok_or_else(|| anyhow!("project does not exist"))?;
1361                if project.host_user_id != user_id {
1362                    return Err(anyhow!("no such project"))?;
1363                }
1364
1365                let mut collaborators = project
1366                    .find_related(project_collaborator::Entity)
1367                    .all(&*tx)
1368                    .await?;
1369                let host_ix = collaborators
1370                    .iter()
1371                    .position(|collaborator| {
1372                        collaborator.user_id == user_id && collaborator.is_host
1373                    })
1374                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1375                let host = collaborators.swap_remove(host_ix);
1376                let old_connection_id = host.connection();
1377
1378                project::Entity::update(project::ActiveModel {
1379                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1380                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1381                        connection.owner_id as i32,
1382                    ))),
1383                    ..project.into_active_model()
1384                })
1385                .exec(&*tx)
1386                .await?;
1387                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1388                    connection_id: ActiveValue::set(connection.id as i32),
1389                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1390                    ..host.into_active_model()
1391                })
1392                .exec(&*tx)
1393                .await?;
1394
1395                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1396                    .await?;
1397
1398                reshared_projects.push(ResharedProject {
1399                    id: project_id,
1400                    old_connection_id,
1401                    collaborators: collaborators
1402                        .iter()
1403                        .map(|collaborator| ProjectCollaborator {
1404                            connection_id: collaborator.connection(),
1405                            user_id: collaborator.user_id,
1406                            replica_id: collaborator.replica_id,
1407                            is_host: collaborator.is_host,
1408                        })
1409                        .collect(),
1410                    worktrees: reshared_project.worktrees.clone(),
1411                });
1412            }
1413
1414            project::Entity::delete_many()
1415                .filter(
1416                    Condition::all()
1417                        .add(project::Column::RoomId.eq(room_id))
1418                        .add(project::Column::HostUserId.eq(user_id))
1419                        .add(
1420                            project::Column::Id
1421                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1422                        ),
1423                )
1424                .exec(&*tx)
1425                .await?;
1426
1427            let mut rejoined_projects = Vec::new();
1428            for rejoined_project in &rejoin_room.rejoined_projects {
1429                let project_id = ProjectId::from_proto(rejoined_project.id);
1430                let Some(project) = project::Entity::find_by_id(project_id)
1431                    .one(&*tx)
1432                    .await? else { continue };
1433
1434                let mut worktrees = Vec::new();
1435                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1436                for db_worktree in db_worktrees {
1437                    let mut worktree = RejoinedWorktree {
1438                        id: db_worktree.id as u64,
1439                        abs_path: db_worktree.abs_path,
1440                        root_name: db_worktree.root_name,
1441                        visible: db_worktree.visible,
1442                        updated_entries: Default::default(),
1443                        removed_entries: Default::default(),
1444                        diagnostic_summaries: Default::default(),
1445                        scan_id: db_worktree.scan_id as u64,
1446                        is_complete: db_worktree.is_complete,
1447                    };
1448
1449                    let rejoined_worktree = rejoined_project
1450                        .worktrees
1451                        .iter()
1452                        .find(|worktree| worktree.id == db_worktree.id as u64);
1453                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1454                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1455                    } else {
1456                        worktree_entry::Column::IsDeleted.eq(false)
1457                    };
1458
1459                    let mut db_entries = worktree_entry::Entity::find()
1460                        .filter(
1461                            Condition::all()
1462                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1463                                .add(entry_filter),
1464                        )
1465                        .stream(&*tx)
1466                        .await?;
1467
1468                    while let Some(db_entry) = db_entries.next().await {
1469                        let db_entry = db_entry?;
1470                        if db_entry.is_deleted {
1471                            worktree.removed_entries.push(db_entry.id as u64);
1472                        } else {
1473                            worktree.updated_entries.push(proto::Entry {
1474                                id: db_entry.id as u64,
1475                                is_dir: db_entry.is_dir,
1476                                path: db_entry.path,
1477                                inode: db_entry.inode as u64,
1478                                mtime: Some(proto::Timestamp {
1479                                    seconds: db_entry.mtime_seconds as u64,
1480                                    nanos: db_entry.mtime_nanos as u32,
1481                                }),
1482                                is_symlink: db_entry.is_symlink,
1483                                is_ignored: db_entry.is_ignored,
1484                            });
1485                        }
1486                    }
1487
1488                    worktrees.push(worktree);
1489                }
1490
1491                let language_servers = project
1492                    .find_related(language_server::Entity)
1493                    .all(&*tx)
1494                    .await?
1495                    .into_iter()
1496                    .map(|language_server| proto::LanguageServer {
1497                        id: language_server.id as u64,
1498                        name: language_server.name,
1499                    })
1500                    .collect::<Vec<_>>();
1501
1502                let mut collaborators = project
1503                    .find_related(project_collaborator::Entity)
1504                    .all(&*tx)
1505                    .await?;
1506                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1507                    .iter()
1508                    .position(|collaborator| collaborator.user_id == user_id)
1509                {
1510                    collaborators.swap_remove(self_collaborator_ix)
1511                } else {
1512                    continue;
1513                };
1514                let old_connection_id = self_collaborator.connection();
1515                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1516                    connection_id: ActiveValue::set(connection.id as i32),
1517                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1518                    ..self_collaborator.into_active_model()
1519                })
1520                .exec(&*tx)
1521                .await?;
1522
1523                let collaborators = collaborators
1524                    .into_iter()
1525                    .map(|collaborator| ProjectCollaborator {
1526                        connection_id: collaborator.connection(),
1527                        user_id: collaborator.user_id,
1528                        replica_id: collaborator.replica_id,
1529                        is_host: collaborator.is_host,
1530                    })
1531                    .collect::<Vec<_>>();
1532
1533                rejoined_projects.push(RejoinedProject {
1534                    id: project_id,
1535                    old_connection_id,
1536                    collaborators,
1537                    worktrees,
1538                    language_servers,
1539                });
1540            }
1541
1542            let room = self.get_room(room_id, &tx).await?;
1543            Ok((
1544                room_id,
1545                RejoinedRoom {
1546                    room,
1547                    rejoined_projects,
1548                    reshared_projects,
1549                },
1550            ))
1551        })
1552        .await
1553    }
1554
1555    pub async fn leave_room(
1556        &self,
1557        connection: ConnectionId,
1558    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1559        self.optional_room_transaction(|tx| async move {
1560            let leaving_participant = room_participant::Entity::find()
1561                .filter(
1562                    Condition::all()
1563                        .add(
1564                            room_participant::Column::AnsweringConnectionId
1565                                .eq(connection.id as i32),
1566                        )
1567                        .add(
1568                            room_participant::Column::AnsweringConnectionServerId
1569                                .eq(connection.owner_id as i32),
1570                        ),
1571                )
1572                .one(&*tx)
1573                .await?;
1574
1575            if let Some(leaving_participant) = leaving_participant {
1576                // Leave room.
1577                let room_id = leaving_participant.room_id;
1578                room_participant::Entity::delete_by_id(leaving_participant.id)
1579                    .exec(&*tx)
1580                    .await?;
1581
1582                // Cancel pending calls initiated by the leaving user.
1583                let called_participants = room_participant::Entity::find()
1584                    .filter(
1585                        Condition::all()
1586                            .add(
1587                                room_participant::Column::CallingConnectionId
1588                                    .eq(connection.id as i32),
1589                            )
1590                            .add(
1591                                room_participant::Column::CallingConnectionServerId
1592                                    .eq(connection.owner_id as i32),
1593                            )
1594                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1595                    )
1596                    .all(&*tx)
1597                    .await?;
1598                room_participant::Entity::delete_many()
1599                    .filter(
1600                        room_participant::Column::Id
1601                            .is_in(called_participants.iter().map(|participant| participant.id)),
1602                    )
1603                    .exec(&*tx)
1604                    .await?;
1605                let canceled_calls_to_user_ids = called_participants
1606                    .into_iter()
1607                    .map(|participant| participant.user_id)
1608                    .collect();
1609
1610                // Detect left projects.
1611                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1612                enum QueryProjectIds {
1613                    ProjectId,
1614                }
1615                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1616                    .select_only()
1617                    .column_as(
1618                        project_collaborator::Column::ProjectId,
1619                        QueryProjectIds::ProjectId,
1620                    )
1621                    .filter(
1622                        Condition::all()
1623                            .add(
1624                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1625                            )
1626                            .add(
1627                                project_collaborator::Column::ConnectionServerId
1628                                    .eq(connection.owner_id as i32),
1629                            ),
1630                    )
1631                    .into_values::<_, QueryProjectIds>()
1632                    .all(&*tx)
1633                    .await?;
1634                let mut left_projects = HashMap::default();
1635                let mut collaborators = project_collaborator::Entity::find()
1636                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1637                    .stream(&*tx)
1638                    .await?;
1639                while let Some(collaborator) = collaborators.next().await {
1640                    let collaborator = collaborator?;
1641                    let left_project =
1642                        left_projects
1643                            .entry(collaborator.project_id)
1644                            .or_insert(LeftProject {
1645                                id: collaborator.project_id,
1646                                host_user_id: Default::default(),
1647                                connection_ids: Default::default(),
1648                                host_connection_id: Default::default(),
1649                            });
1650
1651                    let collaborator_connection_id = collaborator.connection();
1652                    if collaborator_connection_id != connection {
1653                        left_project.connection_ids.push(collaborator_connection_id);
1654                    }
1655
1656                    if collaborator.is_host {
1657                        left_project.host_user_id = collaborator.user_id;
1658                        left_project.host_connection_id = collaborator_connection_id;
1659                    }
1660                }
1661                drop(collaborators);
1662
1663                // Leave projects.
1664                project_collaborator::Entity::delete_many()
1665                    .filter(
1666                        Condition::all()
1667                            .add(
1668                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1669                            )
1670                            .add(
1671                                project_collaborator::Column::ConnectionServerId
1672                                    .eq(connection.owner_id as i32),
1673                            ),
1674                    )
1675                    .exec(&*tx)
1676                    .await?;
1677
1678                // Unshare projects.
1679                project::Entity::delete_many()
1680                    .filter(
1681                        Condition::all()
1682                            .add(project::Column::RoomId.eq(room_id))
1683                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1684                            .add(
1685                                project::Column::HostConnectionServerId
1686                                    .eq(connection.owner_id as i32),
1687                            ),
1688                    )
1689                    .exec(&*tx)
1690                    .await?;
1691
1692                let room = self.get_room(room_id, &tx).await?;
1693                if room.participants.is_empty() {
1694                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1695                }
1696
1697                let left_room = LeftRoom {
1698                    room,
1699                    left_projects,
1700                    canceled_calls_to_user_ids,
1701                };
1702
1703                if left_room.room.participants.is_empty() {
1704                    self.rooms.remove(&room_id);
1705                }
1706
1707                Ok(Some((room_id, left_room)))
1708            } else {
1709                Ok(None)
1710            }
1711        })
1712        .await
1713    }
1714
1715    pub async fn update_room_participant_location(
1716        &self,
1717        room_id: RoomId,
1718        connection: ConnectionId,
1719        location: proto::ParticipantLocation,
1720    ) -> Result<RoomGuard<proto::Room>> {
1721        self.room_transaction(|tx| async {
1722            let tx = tx;
1723            let location_kind;
1724            let location_project_id;
1725            match location
1726                .variant
1727                .as_ref()
1728                .ok_or_else(|| anyhow!("invalid location"))?
1729            {
1730                proto::participant_location::Variant::SharedProject(project) => {
1731                    location_kind = 0;
1732                    location_project_id = Some(ProjectId::from_proto(project.id));
1733                }
1734                proto::participant_location::Variant::UnsharedProject(_) => {
1735                    location_kind = 1;
1736                    location_project_id = None;
1737                }
1738                proto::participant_location::Variant::External(_) => {
1739                    location_kind = 2;
1740                    location_project_id = None;
1741                }
1742            }
1743
1744            let result = room_participant::Entity::update_many()
1745                .filter(
1746                    Condition::all()
1747                        .add(room_participant::Column::RoomId.eq(room_id))
1748                        .add(
1749                            room_participant::Column::AnsweringConnectionId
1750                                .eq(connection.id as i32),
1751                        )
1752                        .add(
1753                            room_participant::Column::AnsweringConnectionServerId
1754                                .eq(connection.owner_id as i32),
1755                        ),
1756                )
1757                .set(room_participant::ActiveModel {
1758                    location_kind: ActiveValue::set(Some(location_kind)),
1759                    location_project_id: ActiveValue::set(location_project_id),
1760                    ..Default::default()
1761                })
1762                .exec(&*tx)
1763                .await?;
1764
1765            if result.rows_affected == 1 {
1766                let room = self.get_room(room_id, &tx).await?;
1767                Ok((room_id, room))
1768            } else {
1769                Err(anyhow!("could not update room participant location"))?
1770            }
1771        })
1772        .await
1773    }
1774
1775    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1776        self.transaction(|tx| async move {
1777            let participant = room_participant::Entity::find()
1778                .filter(
1779                    Condition::all()
1780                        .add(
1781                            room_participant::Column::AnsweringConnectionId
1782                                .eq(connection.id as i32),
1783                        )
1784                        .add(
1785                            room_participant::Column::AnsweringConnectionServerId
1786                                .eq(connection.owner_id as i32),
1787                        ),
1788                )
1789                .one(&*tx)
1790                .await?
1791                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1792
1793            room_participant::Entity::update(room_participant::ActiveModel {
1794                answering_connection_lost: ActiveValue::set(true),
1795                ..participant.into_active_model()
1796            })
1797            .exec(&*tx)
1798            .await?;
1799
1800            Ok(())
1801        })
1802        .await
1803    }
1804
1805    fn build_incoming_call(
1806        room: &proto::Room,
1807        called_user_id: UserId,
1808    ) -> Option<proto::IncomingCall> {
1809        let pending_participant = room
1810            .pending_participants
1811            .iter()
1812            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1813
1814        Some(proto::IncomingCall {
1815            room_id: room.id,
1816            calling_user_id: pending_participant.calling_user_id,
1817            participant_user_ids: room
1818                .participants
1819                .iter()
1820                .map(|participant| participant.user_id)
1821                .collect(),
1822            initial_project: room.participants.iter().find_map(|participant| {
1823                let initial_project_id = pending_participant.initial_project_id?;
1824                participant
1825                    .projects
1826                    .iter()
1827                    .find(|project| project.id == initial_project_id)
1828                    .cloned()
1829            }),
1830        })
1831    }
1832
1833    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1834        let db_room = room::Entity::find_by_id(room_id)
1835            .one(tx)
1836            .await?
1837            .ok_or_else(|| anyhow!("could not find room"))?;
1838
1839        let mut db_participants = db_room
1840            .find_related(room_participant::Entity)
1841            .stream(tx)
1842            .await?;
1843        let mut participants = HashMap::default();
1844        let mut pending_participants = Vec::new();
1845        while let Some(db_participant) = db_participants.next().await {
1846            let db_participant = db_participant?;
1847            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1848                .answering_connection_id
1849                .zip(db_participant.answering_connection_server_id)
1850            {
1851                let location = match (
1852                    db_participant.location_kind,
1853                    db_participant.location_project_id,
1854                ) {
1855                    (Some(0), Some(project_id)) => {
1856                        Some(proto::participant_location::Variant::SharedProject(
1857                            proto::participant_location::SharedProject {
1858                                id: project_id.to_proto(),
1859                            },
1860                        ))
1861                    }
1862                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1863                        Default::default(),
1864                    )),
1865                    _ => Some(proto::participant_location::Variant::External(
1866                        Default::default(),
1867                    )),
1868                };
1869
1870                let answering_connection = ConnectionId {
1871                    owner_id: answering_connection_server_id.0 as u32,
1872                    id: answering_connection_id as u32,
1873                };
1874                participants.insert(
1875                    answering_connection,
1876                    proto::Participant {
1877                        user_id: db_participant.user_id.to_proto(),
1878                        peer_id: Some(answering_connection.into()),
1879                        projects: Default::default(),
1880                        location: Some(proto::ParticipantLocation { variant: location }),
1881                    },
1882                );
1883            } else {
1884                pending_participants.push(proto::PendingParticipant {
1885                    user_id: db_participant.user_id.to_proto(),
1886                    calling_user_id: db_participant.calling_user_id.to_proto(),
1887                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1888                });
1889            }
1890        }
1891        drop(db_participants);
1892
1893        let mut db_projects = db_room
1894            .find_related(project::Entity)
1895            .find_with_related(worktree::Entity)
1896            .stream(tx)
1897            .await?;
1898
1899        while let Some(row) = db_projects.next().await {
1900            let (db_project, db_worktree) = row?;
1901            let host_connection = db_project.host_connection()?;
1902            if let Some(participant) = participants.get_mut(&host_connection) {
1903                let project = if let Some(project) = participant
1904                    .projects
1905                    .iter_mut()
1906                    .find(|project| project.id == db_project.id.to_proto())
1907                {
1908                    project
1909                } else {
1910                    participant.projects.push(proto::ParticipantProject {
1911                        id: db_project.id.to_proto(),
1912                        worktree_root_names: Default::default(),
1913                    });
1914                    participant.projects.last_mut().unwrap()
1915                };
1916
1917                if let Some(db_worktree) = db_worktree {
1918                    project.worktree_root_names.push(db_worktree.root_name);
1919                }
1920            }
1921        }
1922
1923        Ok(proto::Room {
1924            id: db_room.id.to_proto(),
1925            live_kit_room: db_room.live_kit_room,
1926            participants: participants.into_values().collect(),
1927            pending_participants,
1928        })
1929    }
1930
1931    // projects
1932
1933    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1934        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1935        enum QueryAs {
1936            Count,
1937        }
1938
1939        self.transaction(|tx| async move {
1940            Ok(project::Entity::find()
1941                .select_only()
1942                .column_as(project::Column::Id.count(), QueryAs::Count)
1943                .inner_join(user::Entity)
1944                .filter(user::Column::Admin.eq(false))
1945                .into_values::<_, QueryAs>()
1946                .one(&*tx)
1947                .await?
1948                .unwrap_or(0i64) as usize)
1949        })
1950        .await
1951    }
1952
1953    pub async fn share_project(
1954        &self,
1955        room_id: RoomId,
1956        connection: ConnectionId,
1957        worktrees: &[proto::WorktreeMetadata],
1958    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1959        self.room_transaction(|tx| async move {
1960            let participant = room_participant::Entity::find()
1961                .filter(
1962                    Condition::all()
1963                        .add(
1964                            room_participant::Column::AnsweringConnectionId
1965                                .eq(connection.id as i32),
1966                        )
1967                        .add(
1968                            room_participant::Column::AnsweringConnectionServerId
1969                                .eq(connection.owner_id as i32),
1970                        ),
1971                )
1972                .one(&*tx)
1973                .await?
1974                .ok_or_else(|| anyhow!("could not find participant"))?;
1975            if participant.room_id != room_id {
1976                return Err(anyhow!("shared project on unexpected room"))?;
1977            }
1978
1979            let project = project::ActiveModel {
1980                room_id: ActiveValue::set(participant.room_id),
1981                host_user_id: ActiveValue::set(participant.user_id),
1982                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1983                host_connection_server_id: ActiveValue::set(Some(ServerId(
1984                    connection.owner_id as i32,
1985                ))),
1986                ..Default::default()
1987            }
1988            .insert(&*tx)
1989            .await?;
1990
1991            if !worktrees.is_empty() {
1992                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1993                    worktree::ActiveModel {
1994                        id: ActiveValue::set(worktree.id as i64),
1995                        project_id: ActiveValue::set(project.id),
1996                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1997                        root_name: ActiveValue::set(worktree.root_name.clone()),
1998                        visible: ActiveValue::set(worktree.visible),
1999                        scan_id: ActiveValue::set(0),
2000                        is_complete: ActiveValue::set(false),
2001                    }
2002                }))
2003                .exec(&*tx)
2004                .await?;
2005            }
2006
2007            project_collaborator::ActiveModel {
2008                project_id: ActiveValue::set(project.id),
2009                connection_id: ActiveValue::set(connection.id as i32),
2010                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2011                user_id: ActiveValue::set(participant.user_id),
2012                replica_id: ActiveValue::set(ReplicaId(0)),
2013                is_host: ActiveValue::set(true),
2014                ..Default::default()
2015            }
2016            .insert(&*tx)
2017            .await?;
2018
2019            let room = self.get_room(room_id, &tx).await?;
2020            Ok((room_id, (project.id, room)))
2021        })
2022        .await
2023    }
2024
2025    pub async fn unshare_project(
2026        &self,
2027        project_id: ProjectId,
2028        connection: ConnectionId,
2029    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2030        self.room_transaction(|tx| async move {
2031            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2032
2033            let project = project::Entity::find_by_id(project_id)
2034                .one(&*tx)
2035                .await?
2036                .ok_or_else(|| anyhow!("project not found"))?;
2037            if project.host_connection()? == connection {
2038                let room_id = project.room_id;
2039                project::Entity::delete(project.into_active_model())
2040                    .exec(&*tx)
2041                    .await?;
2042                let room = self.get_room(room_id, &tx).await?;
2043                Ok((room_id, (room, guest_connection_ids)))
2044            } else {
2045                Err(anyhow!("cannot unshare a project hosted by another user"))?
2046            }
2047        })
2048        .await
2049    }
2050
2051    pub async fn update_project(
2052        &self,
2053        project_id: ProjectId,
2054        connection: ConnectionId,
2055        worktrees: &[proto::WorktreeMetadata],
2056    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2057        self.room_transaction(|tx| async move {
2058            let project = project::Entity::find_by_id(project_id)
2059                .filter(
2060                    Condition::all()
2061                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2062                        .add(
2063                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2064                        ),
2065                )
2066                .one(&*tx)
2067                .await?
2068                .ok_or_else(|| anyhow!("no such project"))?;
2069
2070            self.update_project_worktrees(project.id, worktrees, &tx)
2071                .await?;
2072
2073            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2074            let room = self.get_room(project.room_id, &tx).await?;
2075            Ok((project.room_id, (room, guest_connection_ids)))
2076        })
2077        .await
2078    }
2079
2080    async fn update_project_worktrees(
2081        &self,
2082        project_id: ProjectId,
2083        worktrees: &[proto::WorktreeMetadata],
2084        tx: &DatabaseTransaction,
2085    ) -> Result<()> {
2086        if !worktrees.is_empty() {
2087            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2088                id: ActiveValue::set(worktree.id as i64),
2089                project_id: ActiveValue::set(project_id),
2090                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2091                root_name: ActiveValue::set(worktree.root_name.clone()),
2092                visible: ActiveValue::set(worktree.visible),
2093                scan_id: ActiveValue::set(0),
2094                is_complete: ActiveValue::set(false),
2095            }))
2096            .on_conflict(
2097                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2098                    .update_column(worktree::Column::RootName)
2099                    .to_owned(),
2100            )
2101            .exec(&*tx)
2102            .await?;
2103        }
2104
2105        worktree::Entity::delete_many()
2106            .filter(worktree::Column::ProjectId.eq(project_id).and(
2107                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2108            ))
2109            .exec(&*tx)
2110            .await?;
2111
2112        Ok(())
2113    }
2114
2115    pub async fn update_worktree(
2116        &self,
2117        update: &proto::UpdateWorktree,
2118        connection: ConnectionId,
2119    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2120        self.room_transaction(|tx| async move {
2121            let project_id = ProjectId::from_proto(update.project_id);
2122            let worktree_id = update.worktree_id as i64;
2123
2124            // Ensure the update comes from the host.
2125            let project = project::Entity::find_by_id(project_id)
2126                .filter(
2127                    Condition::all()
2128                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2129                        .add(
2130                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2131                        ),
2132                )
2133                .one(&*tx)
2134                .await?
2135                .ok_or_else(|| anyhow!("no such project"))?;
2136            let room_id = project.room_id;
2137
2138            // Update metadata.
2139            worktree::Entity::update(worktree::ActiveModel {
2140                id: ActiveValue::set(worktree_id),
2141                project_id: ActiveValue::set(project_id),
2142                root_name: ActiveValue::set(update.root_name.clone()),
2143                scan_id: ActiveValue::set(update.scan_id as i64),
2144                is_complete: ActiveValue::set(update.is_last_update),
2145                abs_path: ActiveValue::set(update.abs_path.clone()),
2146                ..Default::default()
2147            })
2148            .exec(&*tx)
2149            .await?;
2150
2151            if !update.updated_entries.is_empty() {
2152                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2153                    let mtime = entry.mtime.clone().unwrap_or_default();
2154                    worktree_entry::ActiveModel {
2155                        project_id: ActiveValue::set(project_id),
2156                        worktree_id: ActiveValue::set(worktree_id),
2157                        id: ActiveValue::set(entry.id as i64),
2158                        is_dir: ActiveValue::set(entry.is_dir),
2159                        path: ActiveValue::set(entry.path.clone()),
2160                        inode: ActiveValue::set(entry.inode as i64),
2161                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2162                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2163                        is_symlink: ActiveValue::set(entry.is_symlink),
2164                        is_ignored: ActiveValue::set(entry.is_ignored),
2165                        is_deleted: ActiveValue::set(false),
2166                        scan_id: ActiveValue::set(update.scan_id as i64),
2167                    }
2168                }))
2169                .on_conflict(
2170                    OnConflict::columns([
2171                        worktree_entry::Column::ProjectId,
2172                        worktree_entry::Column::WorktreeId,
2173                        worktree_entry::Column::Id,
2174                    ])
2175                    .update_columns([
2176                        worktree_entry::Column::IsDir,
2177                        worktree_entry::Column::Path,
2178                        worktree_entry::Column::Inode,
2179                        worktree_entry::Column::MtimeSeconds,
2180                        worktree_entry::Column::MtimeNanos,
2181                        worktree_entry::Column::IsSymlink,
2182                        worktree_entry::Column::IsIgnored,
2183                    ])
2184                    .to_owned(),
2185                )
2186                .exec(&*tx)
2187                .await?;
2188            }
2189
2190            if !update.removed_entries.is_empty() {
2191                worktree_entry::Entity::update_many()
2192                    .filter(
2193                        worktree_entry::Column::ProjectId
2194                            .eq(project_id)
2195                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2196                            .and(
2197                                worktree_entry::Column::Id
2198                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2199                            ),
2200                    )
2201                    .set(worktree_entry::ActiveModel {
2202                        is_deleted: ActiveValue::Set(true),
2203                        scan_id: ActiveValue::Set(update.scan_id as i64),
2204                        ..Default::default()
2205                    })
2206                    .exec(&*tx)
2207                    .await?;
2208            }
2209
2210            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2211            Ok((room_id, connection_ids))
2212        })
2213        .await
2214    }
2215
2216    pub async fn update_diagnostic_summary(
2217        &self,
2218        update: &proto::UpdateDiagnosticSummary,
2219        connection: ConnectionId,
2220    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2221        self.room_transaction(|tx| async move {
2222            let project_id = ProjectId::from_proto(update.project_id);
2223            let worktree_id = update.worktree_id as i64;
2224            let summary = update
2225                .summary
2226                .as_ref()
2227                .ok_or_else(|| anyhow!("invalid summary"))?;
2228
2229            // Ensure the update comes from the host.
2230            let project = project::Entity::find_by_id(project_id)
2231                .one(&*tx)
2232                .await?
2233                .ok_or_else(|| anyhow!("no such project"))?;
2234            if project.host_connection()? != connection {
2235                return Err(anyhow!("can't update a project hosted by someone else"))?;
2236            }
2237
2238            // Update summary.
2239            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2240                project_id: ActiveValue::set(project_id),
2241                worktree_id: ActiveValue::set(worktree_id),
2242                path: ActiveValue::set(summary.path.clone()),
2243                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2244                error_count: ActiveValue::set(summary.error_count as i32),
2245                warning_count: ActiveValue::set(summary.warning_count as i32),
2246                ..Default::default()
2247            })
2248            .on_conflict(
2249                OnConflict::columns([
2250                    worktree_diagnostic_summary::Column::ProjectId,
2251                    worktree_diagnostic_summary::Column::WorktreeId,
2252                    worktree_diagnostic_summary::Column::Path,
2253                ])
2254                .update_columns([
2255                    worktree_diagnostic_summary::Column::LanguageServerId,
2256                    worktree_diagnostic_summary::Column::ErrorCount,
2257                    worktree_diagnostic_summary::Column::WarningCount,
2258                ])
2259                .to_owned(),
2260            )
2261            .exec(&*tx)
2262            .await?;
2263
2264            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2265            Ok((project.room_id, connection_ids))
2266        })
2267        .await
2268    }
2269
2270    pub async fn start_language_server(
2271        &self,
2272        update: &proto::StartLanguageServer,
2273        connection: ConnectionId,
2274    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2275        self.room_transaction(|tx| async move {
2276            let project_id = ProjectId::from_proto(update.project_id);
2277            let server = update
2278                .server
2279                .as_ref()
2280                .ok_or_else(|| anyhow!("invalid language server"))?;
2281
2282            // Ensure the update comes from the host.
2283            let project = project::Entity::find_by_id(project_id)
2284                .one(&*tx)
2285                .await?
2286                .ok_or_else(|| anyhow!("no such project"))?;
2287            if project.host_connection()? != connection {
2288                return Err(anyhow!("can't update a project hosted by someone else"))?;
2289            }
2290
2291            // Add the newly-started language server.
2292            language_server::Entity::insert(language_server::ActiveModel {
2293                project_id: ActiveValue::set(project_id),
2294                id: ActiveValue::set(server.id as i64),
2295                name: ActiveValue::set(server.name.clone()),
2296                ..Default::default()
2297            })
2298            .on_conflict(
2299                OnConflict::columns([
2300                    language_server::Column::ProjectId,
2301                    language_server::Column::Id,
2302                ])
2303                .update_column(language_server::Column::Name)
2304                .to_owned(),
2305            )
2306            .exec(&*tx)
2307            .await?;
2308
2309            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2310            Ok((project.room_id, connection_ids))
2311        })
2312        .await
2313    }
2314
2315    pub async fn join_project(
2316        &self,
2317        project_id: ProjectId,
2318        connection: ConnectionId,
2319    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2320        self.room_transaction(|tx| async move {
2321            let participant = room_participant::Entity::find()
2322                .filter(
2323                    Condition::all()
2324                        .add(
2325                            room_participant::Column::AnsweringConnectionId
2326                                .eq(connection.id as i32),
2327                        )
2328                        .add(
2329                            room_participant::Column::AnsweringConnectionServerId
2330                                .eq(connection.owner_id as i32),
2331                        ),
2332                )
2333                .one(&*tx)
2334                .await?
2335                .ok_or_else(|| anyhow!("must join a room first"))?;
2336
2337            let project = project::Entity::find_by_id(project_id)
2338                .one(&*tx)
2339                .await?
2340                .ok_or_else(|| anyhow!("no such project"))?;
2341            if project.room_id != participant.room_id {
2342                return Err(anyhow!("no such project"))?;
2343            }
2344
2345            let mut collaborators = project
2346                .find_related(project_collaborator::Entity)
2347                .all(&*tx)
2348                .await?;
2349            let replica_ids = collaborators
2350                .iter()
2351                .map(|c| c.replica_id)
2352                .collect::<HashSet<_>>();
2353            let mut replica_id = ReplicaId(1);
2354            while replica_ids.contains(&replica_id) {
2355                replica_id.0 += 1;
2356            }
2357            let new_collaborator = project_collaborator::ActiveModel {
2358                project_id: ActiveValue::set(project_id),
2359                connection_id: ActiveValue::set(connection.id as i32),
2360                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2361                user_id: ActiveValue::set(participant.user_id),
2362                replica_id: ActiveValue::set(replica_id),
2363                is_host: ActiveValue::set(false),
2364                ..Default::default()
2365            }
2366            .insert(&*tx)
2367            .await?;
2368            collaborators.push(new_collaborator);
2369
2370            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2371            let mut worktrees = db_worktrees
2372                .into_iter()
2373                .map(|db_worktree| {
2374                    (
2375                        db_worktree.id as u64,
2376                        Worktree {
2377                            id: db_worktree.id as u64,
2378                            abs_path: db_worktree.abs_path,
2379                            root_name: db_worktree.root_name,
2380                            visible: db_worktree.visible,
2381                            entries: Default::default(),
2382                            diagnostic_summaries: Default::default(),
2383                            scan_id: db_worktree.scan_id as u64,
2384                            is_complete: db_worktree.is_complete,
2385                        },
2386                    )
2387                })
2388                .collect::<BTreeMap<_, _>>();
2389
2390            // Populate worktree entries.
2391            {
2392                let mut db_entries = worktree_entry::Entity::find()
2393                    .filter(worktree_entry::Column::ProjectId.eq(project_id))
2394                    .stream(&*tx)
2395                    .await?;
2396                while let Some(db_entry) = db_entries.next().await {
2397                    let db_entry = db_entry?;
2398                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2399                        worktree.entries.push(proto::Entry {
2400                            id: db_entry.id as u64,
2401                            is_dir: db_entry.is_dir,
2402                            path: db_entry.path,
2403                            inode: db_entry.inode as u64,
2404                            mtime: Some(proto::Timestamp {
2405                                seconds: db_entry.mtime_seconds as u64,
2406                                nanos: db_entry.mtime_nanos as u32,
2407                            }),
2408                            is_symlink: db_entry.is_symlink,
2409                            is_ignored: db_entry.is_ignored,
2410                        });
2411                    }
2412                }
2413            }
2414
2415            // Populate worktree diagnostic summaries.
2416            {
2417                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2418                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2419                    .stream(&*tx)
2420                    .await?;
2421                while let Some(db_summary) = db_summaries.next().await {
2422                    let db_summary = db_summary?;
2423                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2424                        worktree
2425                            .diagnostic_summaries
2426                            .push(proto::DiagnosticSummary {
2427                                path: db_summary.path,
2428                                language_server_id: db_summary.language_server_id as u64,
2429                                error_count: db_summary.error_count as u32,
2430                                warning_count: db_summary.warning_count as u32,
2431                            });
2432                    }
2433                }
2434            }
2435
2436            // Populate language servers.
2437            let language_servers = project
2438                .find_related(language_server::Entity)
2439                .all(&*tx)
2440                .await?;
2441
2442            let room_id = project.room_id;
2443            let project = Project {
2444                collaborators: collaborators
2445                    .into_iter()
2446                    .map(|collaborator| ProjectCollaborator {
2447                        connection_id: collaborator.connection(),
2448                        user_id: collaborator.user_id,
2449                        replica_id: collaborator.replica_id,
2450                        is_host: collaborator.is_host,
2451                    })
2452                    .collect(),
2453                worktrees,
2454                language_servers: language_servers
2455                    .into_iter()
2456                    .map(|language_server| proto::LanguageServer {
2457                        id: language_server.id as u64,
2458                        name: language_server.name,
2459                    })
2460                    .collect(),
2461            };
2462            Ok((room_id, (project, replica_id as ReplicaId)))
2463        })
2464        .await
2465    }
2466
2467    pub async fn leave_project(
2468        &self,
2469        project_id: ProjectId,
2470        connection: ConnectionId,
2471    ) -> Result<RoomGuard<LeftProject>> {
2472        self.room_transaction(|tx| async move {
2473            let result = project_collaborator::Entity::delete_many()
2474                .filter(
2475                    Condition::all()
2476                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2477                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2478                        .add(
2479                            project_collaborator::Column::ConnectionServerId
2480                                .eq(connection.owner_id as i32),
2481                        ),
2482                )
2483                .exec(&*tx)
2484                .await?;
2485            if result.rows_affected == 0 {
2486                Err(anyhow!("not a collaborator on this project"))?;
2487            }
2488
2489            let project = project::Entity::find_by_id(project_id)
2490                .one(&*tx)
2491                .await?
2492                .ok_or_else(|| anyhow!("no such project"))?;
2493            let collaborators = project
2494                .find_related(project_collaborator::Entity)
2495                .all(&*tx)
2496                .await?;
2497            let connection_ids = collaborators
2498                .into_iter()
2499                .map(|collaborator| collaborator.connection())
2500                .collect();
2501
2502            let left_project = LeftProject {
2503                id: project_id,
2504                host_user_id: project.host_user_id,
2505                host_connection_id: project.host_connection()?,
2506                connection_ids,
2507            };
2508            Ok((project.room_id, left_project))
2509        })
2510        .await
2511    }
2512
2513    pub async fn project_collaborators(
2514        &self,
2515        project_id: ProjectId,
2516        connection_id: ConnectionId,
2517    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2518        self.room_transaction(|tx| async move {
2519            let project = project::Entity::find_by_id(project_id)
2520                .one(&*tx)
2521                .await?
2522                .ok_or_else(|| anyhow!("no such project"))?;
2523            let collaborators = project_collaborator::Entity::find()
2524                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2525                .all(&*tx)
2526                .await?
2527                .into_iter()
2528                .map(|collaborator| ProjectCollaborator {
2529                    connection_id: collaborator.connection(),
2530                    user_id: collaborator.user_id,
2531                    replica_id: collaborator.replica_id,
2532                    is_host: collaborator.is_host,
2533                })
2534                .collect::<Vec<_>>();
2535
2536            if collaborators
2537                .iter()
2538                .any(|collaborator| collaborator.connection_id == connection_id)
2539            {
2540                Ok((project.room_id, collaborators))
2541            } else {
2542                Err(anyhow!("no such project"))?
2543            }
2544        })
2545        .await
2546    }
2547
2548    pub async fn project_connection_ids(
2549        &self,
2550        project_id: ProjectId,
2551        connection_id: ConnectionId,
2552    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2553        self.room_transaction(|tx| async move {
2554            let project = project::Entity::find_by_id(project_id)
2555                .one(&*tx)
2556                .await?
2557                .ok_or_else(|| anyhow!("no such project"))?;
2558            let mut collaborators = project_collaborator::Entity::find()
2559                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2560                .stream(&*tx)
2561                .await?;
2562
2563            let mut connection_ids = HashSet::default();
2564            while let Some(collaborator) = collaborators.next().await {
2565                let collaborator = collaborator?;
2566                connection_ids.insert(collaborator.connection());
2567            }
2568
2569            if connection_ids.contains(&connection_id) {
2570                Ok((project.room_id, connection_ids))
2571            } else {
2572                Err(anyhow!("no such project"))?
2573            }
2574        })
2575        .await
2576    }
2577
2578    async fn project_guest_connection_ids(
2579        &self,
2580        project_id: ProjectId,
2581        tx: &DatabaseTransaction,
2582    ) -> Result<Vec<ConnectionId>> {
2583        let mut collaborators = project_collaborator::Entity::find()
2584            .filter(
2585                project_collaborator::Column::ProjectId
2586                    .eq(project_id)
2587                    .and(project_collaborator::Column::IsHost.eq(false)),
2588            )
2589            .stream(tx)
2590            .await?;
2591
2592        let mut guest_connection_ids = Vec::new();
2593        while let Some(collaborator) = collaborators.next().await {
2594            let collaborator = collaborator?;
2595            guest_connection_ids.push(collaborator.connection());
2596        }
2597        Ok(guest_connection_ids)
2598    }
2599
2600    // access tokens
2601
2602    pub async fn create_access_token_hash(
2603        &self,
2604        user_id: UserId,
2605        access_token_hash: &str,
2606        max_access_token_count: usize,
2607    ) -> Result<()> {
2608        self.transaction(|tx| async {
2609            let tx = tx;
2610
2611            access_token::ActiveModel {
2612                user_id: ActiveValue::set(user_id),
2613                hash: ActiveValue::set(access_token_hash.into()),
2614                ..Default::default()
2615            }
2616            .insert(&*tx)
2617            .await?;
2618
2619            access_token::Entity::delete_many()
2620                .filter(
2621                    access_token::Column::Id.in_subquery(
2622                        Query::select()
2623                            .column(access_token::Column::Id)
2624                            .from(access_token::Entity)
2625                            .and_where(access_token::Column::UserId.eq(user_id))
2626                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2627                            .limit(10000)
2628                            .offset(max_access_token_count as u64)
2629                            .to_owned(),
2630                    ),
2631                )
2632                .exec(&*tx)
2633                .await?;
2634            Ok(())
2635        })
2636        .await
2637    }
2638
2639    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2640        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2641        enum QueryAs {
2642            Hash,
2643        }
2644
2645        self.transaction(|tx| async move {
2646            Ok(access_token::Entity::find()
2647                .select_only()
2648                .column(access_token::Column::Hash)
2649                .filter(access_token::Column::UserId.eq(user_id))
2650                .order_by_desc(access_token::Column::Id)
2651                .into_values::<_, QueryAs>()
2652                .all(&*tx)
2653                .await?)
2654        })
2655        .await
2656    }
2657
2658    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2659    where
2660        F: Send + Fn(TransactionHandle) -> Fut,
2661        Fut: Send + Future<Output = Result<T>>,
2662    {
2663        let body = async {
2664            loop {
2665                let (tx, result) = self.with_transaction(&f).await?;
2666                match result {
2667                    Ok(result) => {
2668                        match tx.commit().await.map_err(Into::into) {
2669                            Ok(()) => return Ok(result),
2670                            Err(error) => {
2671                                if is_serialization_error(&error) {
2672                                    // Retry (don't break the loop)
2673                                } else {
2674                                    return Err(error);
2675                                }
2676                            }
2677                        }
2678                    }
2679                    Err(error) => {
2680                        tx.rollback().await?;
2681                        if is_serialization_error(&error) {
2682                            // Retry (don't break the loop)
2683                        } else {
2684                            return Err(error);
2685                        }
2686                    }
2687                }
2688            }
2689        };
2690
2691        self.run(body).await
2692    }
2693
2694    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2695    where
2696        F: Send + Fn(TransactionHandle) -> Fut,
2697        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2698    {
2699        let body = async {
2700            loop {
2701                let (tx, result) = self.with_transaction(&f).await?;
2702                match result {
2703                    Ok(Some((room_id, data))) => {
2704                        let lock = self.rooms.entry(room_id).or_default().clone();
2705                        let _guard = lock.lock_owned().await;
2706                        match tx.commit().await.map_err(Into::into) {
2707                            Ok(()) => {
2708                                return Ok(Some(RoomGuard {
2709                                    data,
2710                                    _guard,
2711                                    _not_send: PhantomData,
2712                                }));
2713                            }
2714                            Err(error) => {
2715                                if is_serialization_error(&error) {
2716                                    // Retry (don't break the loop)
2717                                } else {
2718                                    return Err(error);
2719                                }
2720                            }
2721                        }
2722                    }
2723                    Ok(None) => {
2724                        match tx.commit().await.map_err(Into::into) {
2725                            Ok(()) => return Ok(None),
2726                            Err(error) => {
2727                                if is_serialization_error(&error) {
2728                                    // Retry (don't break the loop)
2729                                } else {
2730                                    return Err(error);
2731                                }
2732                            }
2733                        }
2734                    }
2735                    Err(error) => {
2736                        tx.rollback().await?;
2737                        if is_serialization_error(&error) {
2738                            // Retry (don't break the loop)
2739                        } else {
2740                            return Err(error);
2741                        }
2742                    }
2743                }
2744            }
2745        };
2746
2747        self.run(body).await
2748    }
2749
2750    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2751    where
2752        F: Send + Fn(TransactionHandle) -> Fut,
2753        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2754    {
2755        let data = self
2756            .optional_room_transaction(move |tx| {
2757                let future = f(tx);
2758                async {
2759                    let data = future.await?;
2760                    Ok(Some(data))
2761                }
2762            })
2763            .await?;
2764        Ok(data.unwrap())
2765    }
2766
2767    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2768    where
2769        F: Send + Fn(TransactionHandle) -> Fut,
2770        Fut: Send + Future<Output = Result<T>>,
2771    {
2772        let tx = self
2773            .pool
2774            .begin_with_config(Some(IsolationLevel::Serializable), None)
2775            .await?;
2776
2777        let mut tx = Arc::new(Some(tx));
2778        let result = f(TransactionHandle(tx.clone())).await;
2779        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2780            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2781        };
2782
2783        Ok((tx, result))
2784    }
2785
2786    async fn run<F, T>(&self, future: F) -> T
2787    where
2788        F: Future<Output = T>,
2789    {
2790        #[cfg(test)]
2791        {
2792            if let Some(background) = self.background.as_ref() {
2793                background.simulate_random_delay().await;
2794            }
2795
2796            self.runtime.as_ref().unwrap().block_on(future)
2797        }
2798
2799        #[cfg(not(test))]
2800        {
2801            future.await
2802        }
2803    }
2804}
2805
2806fn is_serialization_error(error: &Error) -> bool {
2807    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2808    match error {
2809        Error::Database(
2810            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2811            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2812        ) if error
2813            .as_database_error()
2814            .and_then(|error| error.code())
2815            .as_deref()
2816            == Some(SERIALIZATION_FAILURE_CODE) =>
2817        {
2818            true
2819        }
2820        _ => false,
2821    }
2822}
2823
2824struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2825
2826impl Deref for TransactionHandle {
2827    type Target = DatabaseTransaction;
2828
2829    fn deref(&self) -> &Self::Target {
2830        self.0.as_ref().as_ref().unwrap()
2831    }
2832}
2833
2834pub struct RoomGuard<T> {
2835    data: T,
2836    _guard: OwnedMutexGuard<()>,
2837    _not_send: PhantomData<Rc<()>>,
2838}
2839
2840impl<T> Deref for RoomGuard<T> {
2841    type Target = T;
2842
2843    fn deref(&self) -> &T {
2844        &self.data
2845    }
2846}
2847
2848impl<T> DerefMut for RoomGuard<T> {
2849    fn deref_mut(&mut self) -> &mut T {
2850        &mut self.data
2851    }
2852}
2853
2854#[derive(Debug, Serialize, Deserialize)]
2855pub struct NewUserParams {
2856    pub github_login: String,
2857    pub github_user_id: i32,
2858    pub invite_count: i32,
2859}
2860
2861#[derive(Debug)]
2862pub struct NewUserResult {
2863    pub user_id: UserId,
2864    pub metrics_id: String,
2865    pub inviting_user_id: Option<UserId>,
2866    pub signup_device_id: Option<String>,
2867}
2868
2869fn random_invite_code() -> String {
2870    nanoid::nanoid!(16)
2871}
2872
2873fn random_email_confirmation_code() -> String {
2874    nanoid::nanoid!(64)
2875}
2876
/// Declares a public newtype around `i32` for use as a database id.
///
/// The generated type derives the common value traits, serializes
/// transparently as its inner integer, implements `Display`, and implements
/// the `sea_query` / `sea_orm` conversion traits needed to use it directly as
/// an integer column value.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds an id from its protobuf representation.
            ///
            /// Uses an `as` cast, so values that do not fit in `i32` are
            /// truncated rather than rejected.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts the id to its protobuf representation with an `as`
            /// cast (negative ids are sign-extended).
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            /// Accepts any non-null integer value that fits in `i32`; null and
            /// non-integer values are rejected.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `ConvertFromU64` carries the name of the target type; the
                // error's own message supplies the surrounding wording.
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(stringify!($name))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
2995
2996id_type!(AccessTokenId);
2997id_type!(ContactId);
2998id_type!(RoomId);
2999id_type!(RoomParticipantId);
3000id_type!(ProjectId);
3001id_type!(ProjectCollaboratorId);
3002id_type!(ReplicaId);
3003id_type!(ServerId);
3004id_type!(SignupId);
3005id_type!(UserId);
3006
3007pub struct RejoinedRoom {
3008    pub room: proto::Room,
3009    pub rejoined_projects: Vec<RejoinedProject>,
3010    pub reshared_projects: Vec<ResharedProject>,
3011}
3012
3013pub struct ResharedProject {
3014    pub id: ProjectId,
3015    pub old_connection_id: ConnectionId,
3016    pub collaborators: Vec<ProjectCollaborator>,
3017    pub worktrees: Vec<proto::WorktreeMetadata>,
3018}
3019
3020pub struct RejoinedProject {
3021    pub id: ProjectId,
3022    pub old_connection_id: ConnectionId,
3023    pub collaborators: Vec<ProjectCollaborator>,
3024    pub worktrees: Vec<RejoinedWorktree>,
3025    pub language_servers: Vec<proto::LanguageServer>,
3026}
3027
3028#[derive(Debug)]
3029pub struct RejoinedWorktree {
3030    pub id: u64,
3031    pub abs_path: String,
3032    pub root_name: String,
3033    pub visible: bool,
3034    pub updated_entries: Vec<proto::Entry>,
3035    pub removed_entries: Vec<u64>,
3036    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3037    pub scan_id: u64,
3038    pub is_complete: bool,
3039}
3040
3041pub struct LeftRoom {
3042    pub room: proto::Room,
3043    pub left_projects: HashMap<ProjectId, LeftProject>,
3044    pub canceled_calls_to_user_ids: Vec<UserId>,
3045}
3046
3047pub struct RefreshedRoom {
3048    pub room: proto::Room,
3049    pub stale_participant_user_ids: Vec<UserId>,
3050    pub canceled_calls_to_user_ids: Vec<UserId>,
3051}
3052
3053pub struct Project {
3054    pub collaborators: Vec<ProjectCollaborator>,
3055    pub worktrees: BTreeMap<u64, Worktree>,
3056    pub language_servers: Vec<proto::LanguageServer>,
3057}
3058
3059pub struct ProjectCollaborator {
3060    pub connection_id: ConnectionId,
3061    pub user_id: UserId,
3062    pub replica_id: ReplicaId,
3063    pub is_host: bool,
3064}
3065
3066impl ProjectCollaborator {
3067    pub fn to_proto(&self) -> proto::Collaborator {
3068        proto::Collaborator {
3069            peer_id: Some(self.connection_id.into()),
3070            replica_id: self.replica_id.0 as u32,
3071            user_id: self.user_id.to_proto(),
3072        }
3073    }
3074}
3075
3076#[derive(Debug)]
3077pub struct LeftProject {
3078    pub id: ProjectId,
3079    pub host_user_id: UserId,
3080    pub host_connection_id: ConnectionId,
3081    pub connection_ids: Vec<ConnectionId>,
3082}
3083
3084pub struct Worktree {
3085    pub id: u64,
3086    pub abs_path: String,
3087    pub root_name: String,
3088    pub visible: bool,
3089    pub entries: Vec<proto::Entry>,
3090    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3091    pub scan_id: u64,
3092    pub is_complete: bool,
3093}
3094
3095#[cfg(test)]
3096pub use test::*;
3097
3098#[cfg(test)]
3099mod test {
3100    use super::*;
3101    use gpui::executor::Background;
3102    use lazy_static::lazy_static;
3103    use parking_lot::Mutex;
3104    use rand::prelude::*;
3105    use sea_orm::ConnectionTrait;
3106    use sqlx::migrate::MigrateDatabase;
3107    use std::sync::Arc;
3108
    /// Owns a throwaway [`Database`] for tests.
    ///
    /// Built with `TestDb::sqlite` or `TestDb::postgres`; the `Drop`
    /// implementation cleans up Postgres-backed databases.
    pub struct TestDb {
        /// The database under test. Wrapped in `Option` so that `Drop` can
        /// take ownership of it; both constructors set it to `Some`.
        pub db: Option<Arc<Database>>,
        /// Both constructors in this module leave this as `None`.
        pub connection: Option<sqlx::AnyConnection>,
    }
3113
3114    impl TestDb {
3115        pub fn sqlite(background: Arc<Background>) -> Self {
3116            let url = format!("sqlite::memory:");
3117            let runtime = tokio::runtime::Builder::new_current_thread()
3118                .enable_io()
3119                .enable_time()
3120                .build()
3121                .unwrap();
3122
3123            let mut db = runtime.block_on(async {
3124                let mut options = ConnectOptions::new(url);
3125                options.max_connections(5);
3126                let db = Database::new(options).await.unwrap();
3127                let sql = include_str!(concat!(
3128                    env!("CARGO_MANIFEST_DIR"),
3129                    "/migrations.sqlite/20221109000000_test_schema.sql"
3130                ));
3131                db.pool
3132                    .execute(sea_orm::Statement::from_string(
3133                        db.pool.get_database_backend(),
3134                        sql.into(),
3135                    ))
3136                    .await
3137                    .unwrap();
3138                db
3139            });
3140
3141            db.background = Some(background);
3142            db.runtime = Some(runtime);
3143
3144            Self {
3145                db: Some(Arc::new(db)),
3146                connection: None,
3147            }
3148        }
3149
3150        pub fn postgres(background: Arc<Background>) -> Self {
3151            lazy_static! {
3152                static ref LOCK: Mutex<()> = Mutex::new(());
3153            }
3154
3155            let _guard = LOCK.lock();
3156            let mut rng = StdRng::from_entropy();
3157            let url = format!(
3158                "postgres://postgres@localhost/zed-test-{}",
3159                rng.gen::<u128>()
3160            );
3161            let runtime = tokio::runtime::Builder::new_current_thread()
3162                .enable_io()
3163                .enable_time()
3164                .build()
3165                .unwrap();
3166
3167            let mut db = runtime.block_on(async {
3168                sqlx::Postgres::create_database(&url)
3169                    .await
3170                    .expect("failed to create test db");
3171                let mut options = ConnectOptions::new(url);
3172                options
3173                    .max_connections(5)
3174                    .idle_timeout(Duration::from_secs(0));
3175                let db = Database::new(options).await.unwrap();
3176                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3177                db.migrate(Path::new(migrations_path), false).await.unwrap();
3178                db
3179            });
3180
3181            db.background = Some(background);
3182            db.runtime = Some(runtime);
3183
3184            Self {
3185                db: Some(Arc::new(db)),
3186                connection: None,
3187            }
3188        }
3189
3190        pub fn db(&self) -> &Arc<Database> {
3191            self.db.as_ref().unwrap()
3192        }
3193    }
3194
3195    impl Drop for TestDb {
3196        fn drop(&mut self) {
3197            let db = self.db.take().unwrap();
3198            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3199                db.runtime.as_ref().unwrap().block_on(async {
3200                    use util::ResultExt;
3201                    let query = "
3202                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3203                        FROM pg_stat_activity
3204                        WHERE
3205                            pg_stat_activity.datname = current_database() AND
3206                            pid <> pg_backend_pid();
3207                    ";
3208                    db.pool
3209                        .execute(sea_orm::Statement::from_string(
3210                            db.pool.get_database_backend(),
3211                            query.into(),
3212                        ))
3213                        .await
3214                        .log_err();
3215                    sqlx::Postgres::drop_database(db.options.get_url())
3216                        .await
3217                        .log_err();
3218                })
3219            }
3220        }
3221    }
3222}