db.rs

   1mod access_token;
   2mod contact;
   3mod language_server;
   4mod project;
   5mod project_collaborator;
   6mod room;
   7mod room_participant;
   8mod server;
   9mod signup;
  10#[cfg(test)]
  11mod tests;
  12mod user;
  13mod worktree;
  14mod worktree_diagnostic_summary;
  15mod worktree_entry;
  16
  17use crate::{Error, Result};
  18use anyhow::anyhow;
  19use collections::{BTreeMap, HashMap, HashSet};
  20pub use contact::Contact;
  21use dashmap::DashMap;
  22use futures::StreamExt;
  23use hyper::StatusCode;
  24use rpc::{proto, ConnectionId};
  25use sea_orm::Condition;
  26pub use sea_orm::ConnectOptions;
  27use sea_orm::{
  28    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  29    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  30    Statement, TransactionTrait,
  31};
  32use sea_query::{Alias, Expr, OnConflict, Query};
  33use serde::{Deserialize, Serialize};
  34pub use signup::{Invite, NewSignup, WaitlistSummary};
  35use sqlx::migrate::{Migrate, Migration, MigrationSource};
  36use sqlx::Connection;
  37use std::ops::{Deref, DerefMut};
  38use std::path::Path;
  39use std::time::Duration;
  40use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  41use tokio::sync::{Mutex, OwnedMutexGuard};
  42pub use user::Model as User;
  43
/// Handle to the application's database.
///
/// Bundles the connection options, the `sea_orm` connection opened from
/// them, and a concurrent map holding one mutex per room id.
pub struct Database {
    /// Options the connection was opened with; `migrate` reads the URL from
    /// here to open its own `sqlx` connection.
    options: ConnectOptions,
    /// Connection returned by `sea_orm::Database::connect`.
    pool: DatabaseConnection,
    /// One shared async mutex per room id.
    /// NOTE(review): presumably used to serialize room transactions — the
    /// locking site is outside this part of the file; confirm there.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Test builds only; `new` initialises this to `None`.
    #[cfg(test)]
    background: Option<std::sync::Arc<gpui::executor::Background>>,
    /// Test builds only; `new` initialises this to `None`.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
  53
  54impl Database {
  55    pub async fn new(options: ConnectOptions) -> Result<Self> {
  56        Ok(Self {
  57            options: options.clone(),
  58            pool: sea_orm::Database::connect(options).await?,
  59            rooms: DashMap::with_capacity(16384),
  60            #[cfg(test)]
  61            background: None,
  62            #[cfg(test)]
  63            runtime: None,
  64        })
  65    }
  66
  67    #[cfg(test)]
  68    pub fn reset(&self) {
  69        self.rooms.clear();
  70    }
  71
  72    pub async fn migrate(
  73        &self,
  74        migrations_path: &Path,
  75        ignore_checksum_mismatch: bool,
  76    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  77        let migrations = MigrationSource::resolve(migrations_path)
  78            .await
  79            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  80
  81        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  82
  83        connection.ensure_migrations_table().await?;
  84        let applied_migrations: HashMap<_, _> = connection
  85            .list_applied_migrations()
  86            .await?
  87            .into_iter()
  88            .map(|m| (m.version, m))
  89            .collect();
  90
  91        let mut new_migrations = Vec::new();
  92        for migration in migrations {
  93            match applied_migrations.get(&migration.version) {
  94                Some(applied_migration) => {
  95                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  96                    {
  97                        Err(anyhow!(
  98                            "checksum mismatch for applied migration {}",
  99                            migration.description
 100                        ))?;
 101                    }
 102                }
 103                None => {
 104                    let elapsed = connection.apply(&migration).await?;
 105                    new_migrations.push((migration, elapsed));
 106                }
 107            }
 108        }
 109
 110        Ok(new_migrations)
 111    }
 112
 113    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 114        self.transaction(|tx| async move {
 115            let server = server::ActiveModel {
 116                environment: ActiveValue::set(environment.into()),
 117                ..Default::default()
 118            }
 119            .insert(&*tx)
 120            .await?;
 121            Ok(server.id)
 122        })
 123        .await
 124    }
 125
 126    pub async fn stale_room_ids(
 127        &self,
 128        environment: &str,
 129        new_server_id: ServerId,
 130    ) -> Result<Vec<RoomId>> {
 131        self.transaction(|tx| async move {
 132            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 133            enum QueryAs {
 134                RoomId,
 135            }
 136
 137            let stale_server_epochs = self
 138                .stale_server_ids(environment, new_server_id, &tx)
 139                .await?;
 140            Ok(room_participant::Entity::find()
 141                .select_only()
 142                .column(room_participant::Column::RoomId)
 143                .distinct()
 144                .filter(
 145                    room_participant::Column::AnsweringConnectionServerId
 146                        .is_in(stale_server_epochs),
 147                )
 148                .into_values::<_, QueryAs>()
 149                .all(&*tx)
 150                .await?)
 151        })
 152        .await
 153    }
 154
 155    pub async fn refresh_room(
 156        &self,
 157        room_id: RoomId,
 158        new_server_id: ServerId,
 159    ) -> Result<RoomGuard<RefreshedRoom>> {
 160        self.room_transaction(|tx| async move {
 161            let stale_participant_filter = Condition::all()
 162                .add(room_participant::Column::RoomId.eq(room_id))
 163                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 164                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 165
 166            let stale_participant_user_ids = room_participant::Entity::find()
 167                .filter(stale_participant_filter.clone())
 168                .all(&*tx)
 169                .await?
 170                .into_iter()
 171                .map(|participant| participant.user_id)
 172                .collect::<Vec<_>>();
 173
 174            // Delete participants who failed to reconnect.
 175            room_participant::Entity::delete_many()
 176                .filter(stale_participant_filter)
 177                .exec(&*tx)
 178                .await?;
 179
 180            let room = self.get_room(room_id, &tx).await?;
 181            let mut canceled_calls_to_user_ids = Vec::new();
 182            // Delete the room if it becomes empty and cancel pending calls.
 183            if room.participants.is_empty() {
 184                canceled_calls_to_user_ids.extend(
 185                    room.pending_participants
 186                        .iter()
 187                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 188                );
 189                room_participant::Entity::delete_many()
 190                    .filter(room_participant::Column::RoomId.eq(room_id))
 191                    .exec(&*tx)
 192                    .await?;
 193                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 194            }
 195
 196            Ok((
 197                room_id,
 198                RefreshedRoom {
 199                    room,
 200                    stale_participant_user_ids,
 201                    canceled_calls_to_user_ids,
 202                },
 203            ))
 204        })
 205        .await
 206    }
 207
 208    pub async fn delete_stale_servers(
 209        &self,
 210        environment: &str,
 211        new_server_id: ServerId,
 212    ) -> Result<()> {
 213        self.transaction(|tx| async move {
 214            server::Entity::delete_many()
 215                .filter(
 216                    Condition::all()
 217                        .add(server::Column::Environment.eq(environment))
 218                        .add(server::Column::Id.ne(new_server_id)),
 219                )
 220                .exec(&*tx)
 221                .await?;
 222            Ok(())
 223        })
 224        .await
 225    }
 226
 227    async fn stale_server_ids(
 228        &self,
 229        environment: &str,
 230        new_server_id: ServerId,
 231        tx: &DatabaseTransaction,
 232    ) -> Result<Vec<ServerId>> {
 233        let stale_servers = server::Entity::find()
 234            .filter(
 235                Condition::all()
 236                    .add(server::Column::Environment.eq(environment))
 237                    .add(server::Column::Id.ne(new_server_id)),
 238            )
 239            .all(&*tx)
 240            .await?;
 241        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 242    }
 243
 244    // users
 245
 246    pub async fn create_user(
 247        &self,
 248        email_address: &str,
 249        admin: bool,
 250        params: NewUserParams,
 251    ) -> Result<NewUserResult> {
 252        self.transaction(|tx| async {
 253            let tx = tx;
 254            let user = user::Entity::insert(user::ActiveModel {
 255                email_address: ActiveValue::set(Some(email_address.into())),
 256                github_login: ActiveValue::set(params.github_login.clone()),
 257                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 258                admin: ActiveValue::set(admin),
 259                metrics_id: ActiveValue::set(Uuid::new_v4()),
 260                ..Default::default()
 261            })
 262            .on_conflict(
 263                OnConflict::column(user::Column::GithubLogin)
 264                    .update_column(user::Column::GithubLogin)
 265                    .to_owned(),
 266            )
 267            .exec_with_returning(&*tx)
 268            .await?;
 269
 270            Ok(NewUserResult {
 271                user_id: user.id,
 272                metrics_id: user.metrics_id.to_string(),
 273                signup_device_id: None,
 274                inviting_user_id: None,
 275            })
 276        })
 277        .await
 278    }
 279
 280    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 281        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 282            .await
 283    }
 284
 285    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 286        self.transaction(|tx| async {
 287            let tx = tx;
 288            Ok(user::Entity::find()
 289                .filter(user::Column::Id.is_in(ids.iter().copied()))
 290                .all(&*tx)
 291                .await?)
 292        })
 293        .await
 294    }
 295
 296    pub async fn get_user_by_github_account(
 297        &self,
 298        github_login: &str,
 299        github_user_id: Option<i32>,
 300    ) -> Result<Option<User>> {
 301        self.transaction(|tx| async move {
 302            let tx = &*tx;
 303            if let Some(github_user_id) = github_user_id {
 304                if let Some(user_by_github_user_id) = user::Entity::find()
 305                    .filter(user::Column::GithubUserId.eq(github_user_id))
 306                    .one(tx)
 307                    .await?
 308                {
 309                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 310                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 311                    Ok(Some(user_by_github_user_id.update(tx).await?))
 312                } else if let Some(user_by_github_login) = user::Entity::find()
 313                    .filter(user::Column::GithubLogin.eq(github_login))
 314                    .one(tx)
 315                    .await?
 316                {
 317                    let mut user_by_github_login = user_by_github_login.into_active_model();
 318                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 319                    Ok(Some(user_by_github_login.update(tx).await?))
 320                } else {
 321                    Ok(None)
 322                }
 323            } else {
 324                Ok(user::Entity::find()
 325                    .filter(user::Column::GithubLogin.eq(github_login))
 326                    .one(tx)
 327                    .await?)
 328            }
 329        })
 330        .await
 331    }
 332
 333    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 334        self.transaction(|tx| async move {
 335            Ok(user::Entity::find()
 336                .order_by_asc(user::Column::GithubLogin)
 337                .limit(limit as u64)
 338                .offset(page as u64 * limit as u64)
 339                .all(&*tx)
 340                .await?)
 341        })
 342        .await
 343    }
 344
 345    pub async fn get_users_with_no_invites(
 346        &self,
 347        invited_by_another_user: bool,
 348    ) -> Result<Vec<User>> {
 349        self.transaction(|tx| async move {
 350            Ok(user::Entity::find()
 351                .filter(
 352                    user::Column::InviteCount
 353                        .eq(0)
 354                        .and(if invited_by_another_user {
 355                            user::Column::InviterId.is_not_null()
 356                        } else {
 357                            user::Column::InviterId.is_null()
 358                        }),
 359                )
 360                .all(&*tx)
 361                .await?)
 362        })
 363        .await
 364    }
 365
 366    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 367        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 368        enum QueryAs {
 369            MetricsId,
 370        }
 371
 372        self.transaction(|tx| async move {
 373            let metrics_id: Uuid = user::Entity::find_by_id(id)
 374                .select_only()
 375                .column(user::Column::MetricsId)
 376                .into_values::<_, QueryAs>()
 377                .one(&*tx)
 378                .await?
 379                .ok_or_else(|| anyhow!("could not find user"))?;
 380            Ok(metrics_id.to_string())
 381        })
 382        .await
 383    }
 384
 385    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 386        self.transaction(|tx| async move {
 387            user::Entity::update_many()
 388                .filter(user::Column::Id.eq(id))
 389                .set(user::ActiveModel {
 390                    admin: ActiveValue::set(is_admin),
 391                    ..Default::default()
 392                })
 393                .exec(&*tx)
 394                .await?;
 395            Ok(())
 396        })
 397        .await
 398    }
 399
 400    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 401        self.transaction(|tx| async move {
 402            user::Entity::update_many()
 403                .filter(user::Column::Id.eq(id))
 404                .set(user::ActiveModel {
 405                    connected_once: ActiveValue::set(connected_once),
 406                    ..Default::default()
 407                })
 408                .exec(&*tx)
 409                .await?;
 410            Ok(())
 411        })
 412        .await
 413    }
 414
 415    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 416        self.transaction(|tx| async move {
 417            access_token::Entity::delete_many()
 418                .filter(access_token::Column::UserId.eq(id))
 419                .exec(&*tx)
 420                .await?;
 421            user::Entity::delete_by_id(id).exec(&*tx).await?;
 422            Ok(())
 423        })
 424        .await
 425    }
 426
 427    // contacts
 428
 429    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 430        #[derive(Debug, FromQueryResult)]
 431        struct ContactWithUserBusyStatuses {
 432            user_id_a: UserId,
 433            user_id_b: UserId,
 434            a_to_b: bool,
 435            accepted: bool,
 436            should_notify: bool,
 437            user_a_busy: bool,
 438            user_b_busy: bool,
 439        }
 440
 441        self.transaction(|tx| async move {
 442            let user_a_participant = Alias::new("user_a_participant");
 443            let user_b_participant = Alias::new("user_b_participant");
 444            let mut db_contacts = contact::Entity::find()
 445                .column_as(
 446                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 447                        .is_not_null(),
 448                    "user_a_busy",
 449                )
 450                .column_as(
 451                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 452                        .is_not_null(),
 453                    "user_b_busy",
 454                )
 455                .filter(
 456                    contact::Column::UserIdA
 457                        .eq(user_id)
 458                        .or(contact::Column::UserIdB.eq(user_id)),
 459                )
 460                .join_as(
 461                    JoinType::LeftJoin,
 462                    contact::Relation::UserARoomParticipant.def(),
 463                    user_a_participant,
 464                )
 465                .join_as(
 466                    JoinType::LeftJoin,
 467                    contact::Relation::UserBRoomParticipant.def(),
 468                    user_b_participant,
 469                )
 470                .into_model::<ContactWithUserBusyStatuses>()
 471                .stream(&*tx)
 472                .await?;
 473
 474            let mut contacts = Vec::new();
 475            while let Some(db_contact) = db_contacts.next().await {
 476                let db_contact = db_contact?;
 477                if db_contact.user_id_a == user_id {
 478                    if db_contact.accepted {
 479                        contacts.push(Contact::Accepted {
 480                            user_id: db_contact.user_id_b,
 481                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 482                            busy: db_contact.user_b_busy,
 483                        });
 484                    } else if db_contact.a_to_b {
 485                        contacts.push(Contact::Outgoing {
 486                            user_id: db_contact.user_id_b,
 487                        })
 488                    } else {
 489                        contacts.push(Contact::Incoming {
 490                            user_id: db_contact.user_id_b,
 491                            should_notify: db_contact.should_notify,
 492                        });
 493                    }
 494                } else if db_contact.accepted {
 495                    contacts.push(Contact::Accepted {
 496                        user_id: db_contact.user_id_a,
 497                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 498                        busy: db_contact.user_a_busy,
 499                    });
 500                } else if db_contact.a_to_b {
 501                    contacts.push(Contact::Incoming {
 502                        user_id: db_contact.user_id_a,
 503                        should_notify: db_contact.should_notify,
 504                    });
 505                } else {
 506                    contacts.push(Contact::Outgoing {
 507                        user_id: db_contact.user_id_a,
 508                    });
 509                }
 510            }
 511
 512            contacts.sort_unstable_by_key(|contact| contact.user_id());
 513
 514            Ok(contacts)
 515        })
 516        .await
 517    }
 518
 519    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 520        self.transaction(|tx| async move {
 521            let participant = room_participant::Entity::find()
 522                .filter(room_participant::Column::UserId.eq(user_id))
 523                .one(&*tx)
 524                .await?;
 525            Ok(participant.is_some())
 526        })
 527        .await
 528    }
 529
 530    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 531        self.transaction(|tx| async move {
 532            let (id_a, id_b) = if user_id_1 < user_id_2 {
 533                (user_id_1, user_id_2)
 534            } else {
 535                (user_id_2, user_id_1)
 536            };
 537
 538            Ok(contact::Entity::find()
 539                .filter(
 540                    contact::Column::UserIdA
 541                        .eq(id_a)
 542                        .and(contact::Column::UserIdB.eq(id_b))
 543                        .and(contact::Column::Accepted.eq(true)),
 544                )
 545                .one(&*tx)
 546                .await?
 547                .is_some())
 548        })
 549        .await
 550    }
 551
 552    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 553        self.transaction(|tx| async move {
 554            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 555                (sender_id, receiver_id, true)
 556            } else {
 557                (receiver_id, sender_id, false)
 558            };
 559
 560            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 561                user_id_a: ActiveValue::set(id_a),
 562                user_id_b: ActiveValue::set(id_b),
 563                a_to_b: ActiveValue::set(a_to_b),
 564                accepted: ActiveValue::set(false),
 565                should_notify: ActiveValue::set(true),
 566                ..Default::default()
 567            })
 568            .on_conflict(
 569                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 570                    .values([
 571                        (contact::Column::Accepted, true.into()),
 572                        (contact::Column::ShouldNotify, false.into()),
 573                    ])
 574                    .action_and_where(
 575                        contact::Column::Accepted.eq(false).and(
 576                            contact::Column::AToB
 577                                .eq(a_to_b)
 578                                .and(contact::Column::UserIdA.eq(id_b))
 579                                .or(contact::Column::AToB
 580                                    .ne(a_to_b)
 581                                    .and(contact::Column::UserIdA.eq(id_a))),
 582                        ),
 583                    )
 584                    .to_owned(),
 585            )
 586            .exec_without_returning(&*tx)
 587            .await?;
 588
 589            if rows_affected == 1 {
 590                Ok(())
 591            } else {
 592                Err(anyhow!("contact already requested"))?
 593            }
 594        })
 595        .await
 596    }
 597
 598    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 599        self.transaction(|tx| async move {
 600            let (id_a, id_b) = if responder_id < requester_id {
 601                (responder_id, requester_id)
 602            } else {
 603                (requester_id, responder_id)
 604            };
 605
 606            let result = contact::Entity::delete_many()
 607                .filter(
 608                    contact::Column::UserIdA
 609                        .eq(id_a)
 610                        .and(contact::Column::UserIdB.eq(id_b)),
 611                )
 612                .exec(&*tx)
 613                .await?;
 614
 615            if result.rows_affected == 1 {
 616                Ok(())
 617            } else {
 618                Err(anyhow!("no such contact"))?
 619            }
 620        })
 621        .await
 622    }
 623
 624    pub async fn dismiss_contact_notification(
 625        &self,
 626        user_id: UserId,
 627        contact_user_id: UserId,
 628    ) -> Result<()> {
 629        self.transaction(|tx| async move {
 630            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 631                (user_id, contact_user_id, true)
 632            } else {
 633                (contact_user_id, user_id, false)
 634            };
 635
 636            let result = contact::Entity::update_many()
 637                .set(contact::ActiveModel {
 638                    should_notify: ActiveValue::set(false),
 639                    ..Default::default()
 640                })
 641                .filter(
 642                    contact::Column::UserIdA
 643                        .eq(id_a)
 644                        .and(contact::Column::UserIdB.eq(id_b))
 645                        .and(
 646                            contact::Column::AToB
 647                                .eq(a_to_b)
 648                                .and(contact::Column::Accepted.eq(true))
 649                                .or(contact::Column::AToB
 650                                    .ne(a_to_b)
 651                                    .and(contact::Column::Accepted.eq(false))),
 652                        ),
 653                )
 654                .exec(&*tx)
 655                .await?;
 656            if result.rows_affected == 0 {
 657                Err(anyhow!("no such contact request"))?
 658            } else {
 659                Ok(())
 660            }
 661        })
 662        .await
 663    }
 664
 665    pub async fn respond_to_contact_request(
 666        &self,
 667        responder_id: UserId,
 668        requester_id: UserId,
 669        accept: bool,
 670    ) -> Result<()> {
 671        self.transaction(|tx| async move {
 672            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 673                (responder_id, requester_id, false)
 674            } else {
 675                (requester_id, responder_id, true)
 676            };
 677            let rows_affected = if accept {
 678                let result = contact::Entity::update_many()
 679                    .set(contact::ActiveModel {
 680                        accepted: ActiveValue::set(true),
 681                        should_notify: ActiveValue::set(true),
 682                        ..Default::default()
 683                    })
 684                    .filter(
 685                        contact::Column::UserIdA
 686                            .eq(id_a)
 687                            .and(contact::Column::UserIdB.eq(id_b))
 688                            .and(contact::Column::AToB.eq(a_to_b)),
 689                    )
 690                    .exec(&*tx)
 691                    .await?;
 692                result.rows_affected
 693            } else {
 694                let result = contact::Entity::delete_many()
 695                    .filter(
 696                        contact::Column::UserIdA
 697                            .eq(id_a)
 698                            .and(contact::Column::UserIdB.eq(id_b))
 699                            .and(contact::Column::AToB.eq(a_to_b))
 700                            .and(contact::Column::Accepted.eq(false)),
 701                    )
 702                    .exec(&*tx)
 703                    .await?;
 704
 705                result.rows_affected
 706            };
 707
 708            if rows_affected == 1 {
 709                Ok(())
 710            } else {
 711                Err(anyhow!("no such contact request"))?
 712            }
 713        })
 714        .await
 715    }
 716
 717    pub fn fuzzy_like_string(string: &str) -> String {
 718        let mut result = String::with_capacity(string.len() * 2 + 1);
 719        for c in string.chars() {
 720            if c.is_alphanumeric() {
 721                result.push('%');
 722                result.push(c);
 723            }
 724        }
 725        result.push('%');
 726        result
 727    }
 728
 729    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 730        self.transaction(|tx| async {
 731            let tx = tx;
 732            let like_string = Self::fuzzy_like_string(name_query);
 733            let query = "
 734                SELECT users.*
 735                FROM users
 736                WHERE github_login ILIKE $1
 737                ORDER BY github_login <-> $2
 738                LIMIT $3
 739            ";
 740
 741            Ok(user::Entity::find()
 742                .from_raw_sql(Statement::from_sql_and_values(
 743                    self.pool.get_database_backend(),
 744                    query.into(),
 745                    vec![like_string.into(), name_query.into(), limit.into()],
 746                ))
 747                .all(&*tx)
 748                .await?)
 749        })
 750        .await
 751    }
 752
 753    // signups
 754
 755    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 756        self.transaction(|tx| async move {
 757            signup::Entity::insert(signup::ActiveModel {
 758                email_address: ActiveValue::set(signup.email_address.clone()),
 759                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 760                email_confirmation_sent: ActiveValue::set(false),
 761                platform_mac: ActiveValue::set(signup.platform_mac),
 762                platform_windows: ActiveValue::set(signup.platform_windows),
 763                platform_linux: ActiveValue::set(signup.platform_linux),
 764                platform_unknown: ActiveValue::set(false),
 765                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 766                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 767                device_id: ActiveValue::set(signup.device_id.clone()),
 768                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 769                ..Default::default()
 770            })
 771            .on_conflict(
 772                OnConflict::column(signup::Column::EmailAddress)
 773                    .update_columns([
 774                        signup::Column::PlatformMac,
 775                        signup::Column::PlatformWindows,
 776                        signup::Column::PlatformLinux,
 777                        signup::Column::EditorFeatures,
 778                        signup::Column::ProgrammingLanguages,
 779                        signup::Column::DeviceId,
 780                        signup::Column::AddedToMailingList,
 781                    ])
 782                    .to_owned(),
 783            )
 784            .exec(&*tx)
 785            .await?;
 786            Ok(())
 787        })
 788        .await
 789    }
 790
 791    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 792        self.transaction(|tx| async move {
 793            let signup = signup::Entity::find()
 794                .filter(signup::Column::EmailAddress.eq(email_address))
 795                .one(&*tx)
 796                .await?
 797                .ok_or_else(|| {
 798                    anyhow!("signup with email address {} doesn't exist", email_address)
 799                })?;
 800
 801            Ok(signup)
 802        })
 803        .await
 804    }
 805
 806    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 807        self.transaction(|tx| async move {
 808            let query = "
 809                SELECT
 810                    COUNT(*) as count,
 811                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 812                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 813                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 814                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 815                FROM (
 816                    SELECT *
 817                    FROM signups
 818                    WHERE
 819                        NOT email_confirmation_sent
 820                ) AS unsent
 821            ";
 822            Ok(
 823                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 824                    self.pool.get_database_backend(),
 825                    query.into(),
 826                    vec![],
 827                ))
 828                .one(&*tx)
 829                .await?
 830                .ok_or_else(|| anyhow!("invalid result"))?,
 831            )
 832        })
 833        .await
 834    }
 835
 836    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 837        let emails = invites
 838            .iter()
 839            .map(|s| s.email_address.as_str())
 840            .collect::<Vec<_>>();
 841        self.transaction(|tx| async {
 842            let tx = tx;
 843            signup::Entity::update_many()
 844                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 845                .set(signup::ActiveModel {
 846                    email_confirmation_sent: ActiveValue::set(true),
 847                    ..Default::default()
 848                })
 849                .exec(&*tx)
 850                .await?;
 851            Ok(())
 852        })
 853        .await
 854    }
 855
 856    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 857        self.transaction(|tx| async move {
 858            Ok(signup::Entity::find()
 859                .select_only()
 860                .column(signup::Column::EmailAddress)
 861                .column(signup::Column::EmailConfirmationCode)
 862                .filter(
 863                    signup::Column::EmailConfirmationSent.eq(false).and(
 864                        signup::Column::PlatformMac
 865                            .eq(true)
 866                            .or(signup::Column::PlatformUnknown.eq(true)),
 867                    ),
 868                )
 869                .order_by_asc(signup::Column::CreatedAt)
 870                .limit(count as u64)
 871                .into_model()
 872                .all(&*tx)
 873                .await?)
 874        })
 875        .await
 876    }
 877
 878    // invite codes
 879
 880    pub async fn create_invite_from_code(
 881        &self,
 882        code: &str,
 883        email_address: &str,
 884        device_id: Option<&str>,
 885        added_to_mailing_list: bool,
 886    ) -> Result<Invite> {
 887        self.transaction(|tx| async move {
 888            let existing_user = user::Entity::find()
 889                .filter(user::Column::EmailAddress.eq(email_address))
 890                .one(&*tx)
 891                .await?;
 892
 893            if existing_user.is_some() {
 894                Err(anyhow!("email address is already in use"))?;
 895            }
 896
 897            let inviting_user_with_invites = match user::Entity::find()
 898                .filter(
 899                    user::Column::InviteCode
 900                        .eq(code)
 901                        .and(user::Column::InviteCount.gt(0)),
 902                )
 903                .one(&*tx)
 904                .await?
 905            {
 906                Some(inviting_user) => inviting_user,
 907                None => {
 908                    return Err(Error::Http(
 909                        StatusCode::UNAUTHORIZED,
 910                        "unable to find an invite code with invites remaining".to_string(),
 911                    ))?
 912                }
 913            };
 914            user::Entity::update_many()
 915                .filter(
 916                    user::Column::Id
 917                        .eq(inviting_user_with_invites.id)
 918                        .and(user::Column::InviteCount.gt(0)),
 919                )
 920                .col_expr(
 921                    user::Column::InviteCount,
 922                    Expr::col(user::Column::InviteCount).sub(1),
 923                )
 924                .exec(&*tx)
 925                .await?;
 926
 927            let signup = signup::Entity::insert(signup::ActiveModel {
 928                email_address: ActiveValue::set(email_address.into()),
 929                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 930                email_confirmation_sent: ActiveValue::set(false),
 931                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 932                platform_linux: ActiveValue::set(false),
 933                platform_mac: ActiveValue::set(false),
 934                platform_windows: ActiveValue::set(false),
 935                platform_unknown: ActiveValue::set(true),
 936                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 937                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 938                ..Default::default()
 939            })
 940            .on_conflict(
 941                OnConflict::column(signup::Column::EmailAddress)
 942                    .update_column(signup::Column::InvitingUserId)
 943                    .to_owned(),
 944            )
 945            .exec_with_returning(&*tx)
 946            .await?;
 947
 948            Ok(Invite {
 949                email_address: signup.email_address,
 950                email_confirmation_code: signup.email_confirmation_code,
 951            })
 952        })
 953        .await
 954    }
 955
 956    pub async fn create_user_from_invite(
 957        &self,
 958        invite: &Invite,
 959        user: NewUserParams,
 960    ) -> Result<Option<NewUserResult>> {
 961        self.transaction(|tx| async {
 962            let tx = tx;
 963            let signup = signup::Entity::find()
 964                .filter(
 965                    signup::Column::EmailAddress
 966                        .eq(invite.email_address.as_str())
 967                        .and(
 968                            signup::Column::EmailConfirmationCode
 969                                .eq(invite.email_confirmation_code.as_str()),
 970                        ),
 971                )
 972                .one(&*tx)
 973                .await?
 974                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 975
 976            if signup.user_id.is_some() {
 977                return Ok(None);
 978            }
 979
 980            let user = user::Entity::insert(user::ActiveModel {
 981                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 982                github_login: ActiveValue::set(user.github_login.clone()),
 983                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 984                admin: ActiveValue::set(false),
 985                invite_count: ActiveValue::set(user.invite_count),
 986                invite_code: ActiveValue::set(Some(random_invite_code())),
 987                metrics_id: ActiveValue::set(Uuid::new_v4()),
 988                ..Default::default()
 989            })
 990            .on_conflict(
 991                OnConflict::column(user::Column::GithubLogin)
 992                    .update_columns([
 993                        user::Column::EmailAddress,
 994                        user::Column::GithubUserId,
 995                        user::Column::Admin,
 996                    ])
 997                    .to_owned(),
 998            )
 999            .exec_with_returning(&*tx)
1000            .await?;
1001
1002            let mut signup = signup.into_active_model();
1003            signup.user_id = ActiveValue::set(Some(user.id));
1004            let signup = signup.update(&*tx).await?;
1005
1006            if let Some(inviting_user_id) = signup.inviting_user_id {
1007                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1008                    (inviting_user_id, user.id, true)
1009                } else {
1010                    (user.id, inviting_user_id, false)
1011                };
1012
1013                contact::Entity::insert(contact::ActiveModel {
1014                    user_id_a: ActiveValue::set(user_id_a),
1015                    user_id_b: ActiveValue::set(user_id_b),
1016                    a_to_b: ActiveValue::set(a_to_b),
1017                    should_notify: ActiveValue::set(true),
1018                    accepted: ActiveValue::set(true),
1019                    ..Default::default()
1020                })
1021                .on_conflict(OnConflict::new().do_nothing().to_owned())
1022                .exec_without_returning(&*tx)
1023                .await?;
1024            }
1025
1026            Ok(Some(NewUserResult {
1027                user_id: user.id,
1028                metrics_id: user.metrics_id.to_string(),
1029                inviting_user_id: signup.inviting_user_id,
1030                signup_device_id: signup.device_id,
1031            }))
1032        })
1033        .await
1034    }
1035
1036    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1037        self.transaction(|tx| async move {
1038            if count > 0 {
1039                user::Entity::update_many()
1040                    .filter(
1041                        user::Column::Id
1042                            .eq(id)
1043                            .and(user::Column::InviteCode.is_null()),
1044                    )
1045                    .set(user::ActiveModel {
1046                        invite_code: ActiveValue::set(Some(random_invite_code())),
1047                        ..Default::default()
1048                    })
1049                    .exec(&*tx)
1050                    .await?;
1051            }
1052
1053            user::Entity::update_many()
1054                .filter(user::Column::Id.eq(id))
1055                .set(user::ActiveModel {
1056                    invite_count: ActiveValue::set(count),
1057                    ..Default::default()
1058                })
1059                .exec(&*tx)
1060                .await?;
1061            Ok(())
1062        })
1063        .await
1064    }
1065
1066    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1067        self.transaction(|tx| async move {
1068            match user::Entity::find_by_id(id).one(&*tx).await? {
1069                Some(user) if user.invite_code.is_some() => {
1070                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1071                }
1072                _ => Ok(None),
1073            }
1074        })
1075        .await
1076    }
1077
1078    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1079        self.transaction(|tx| async move {
1080            user::Entity::find()
1081                .filter(user::Column::InviteCode.eq(code))
1082                .one(&*tx)
1083                .await?
1084                .ok_or_else(|| {
1085                    Error::Http(
1086                        StatusCode::NOT_FOUND,
1087                        "that invite code does not exist".to_string(),
1088                    )
1089                })
1090        })
1091        .await
1092    }
1093
1094    // rooms
1095
1096    pub async fn incoming_call_for_user(
1097        &self,
1098        user_id: UserId,
1099    ) -> Result<Option<proto::IncomingCall>> {
1100        self.transaction(|tx| async move {
1101            let pending_participant = room_participant::Entity::find()
1102                .filter(
1103                    room_participant::Column::UserId
1104                        .eq(user_id)
1105                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1106                )
1107                .one(&*tx)
1108                .await?;
1109
1110            if let Some(pending_participant) = pending_participant {
1111                let room = self.get_room(pending_participant.room_id, &tx).await?;
1112                Ok(Self::build_incoming_call(&room, user_id))
1113            } else {
1114                Ok(None)
1115            }
1116        })
1117        .await
1118    }
1119
1120    pub async fn create_room(
1121        &self,
1122        user_id: UserId,
1123        connection: ConnectionId,
1124        live_kit_room: &str,
1125    ) -> Result<RoomGuard<proto::Room>> {
1126        self.room_transaction(|tx| async move {
1127            let room = room::ActiveModel {
1128                live_kit_room: ActiveValue::set(live_kit_room.into()),
1129                ..Default::default()
1130            }
1131            .insert(&*tx)
1132            .await?;
1133            let room_id = room.id;
1134
1135            room_participant::ActiveModel {
1136                room_id: ActiveValue::set(room_id),
1137                user_id: ActiveValue::set(user_id),
1138                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1139                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1140                    connection.owner_id as i32,
1141                ))),
1142                answering_connection_lost: ActiveValue::set(false),
1143                calling_user_id: ActiveValue::set(user_id),
1144                calling_connection_id: ActiveValue::set(connection.id as i32),
1145                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1146                    connection.owner_id as i32,
1147                ))),
1148                ..Default::default()
1149            }
1150            .insert(&*tx)
1151            .await?;
1152
1153            let room = self.get_room(room_id, &tx).await?;
1154            Ok((room_id, room))
1155        })
1156        .await
1157    }
1158
1159    pub async fn call(
1160        &self,
1161        room_id: RoomId,
1162        calling_user_id: UserId,
1163        calling_connection: ConnectionId,
1164        called_user_id: UserId,
1165        initial_project_id: Option<ProjectId>,
1166    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1167        self.room_transaction(|tx| async move {
1168            room_participant::ActiveModel {
1169                room_id: ActiveValue::set(room_id),
1170                user_id: ActiveValue::set(called_user_id),
1171                answering_connection_lost: ActiveValue::set(false),
1172                calling_user_id: ActiveValue::set(calling_user_id),
1173                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1174                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1175                    calling_connection.owner_id as i32,
1176                ))),
1177                initial_project_id: ActiveValue::set(initial_project_id),
1178                ..Default::default()
1179            }
1180            .insert(&*tx)
1181            .await?;
1182
1183            let room = self.get_room(room_id, &tx).await?;
1184            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1185                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1186            Ok((room_id, (room, incoming_call)))
1187        })
1188        .await
1189    }
1190
1191    pub async fn call_failed(
1192        &self,
1193        room_id: RoomId,
1194        called_user_id: UserId,
1195    ) -> Result<RoomGuard<proto::Room>> {
1196        self.room_transaction(|tx| async move {
1197            room_participant::Entity::delete_many()
1198                .filter(
1199                    room_participant::Column::RoomId
1200                        .eq(room_id)
1201                        .and(room_participant::Column::UserId.eq(called_user_id)),
1202                )
1203                .exec(&*tx)
1204                .await?;
1205            let room = self.get_room(room_id, &tx).await?;
1206            Ok((room_id, room))
1207        })
1208        .await
1209    }
1210
1211    pub async fn decline_call(
1212        &self,
1213        expected_room_id: Option<RoomId>,
1214        user_id: UserId,
1215    ) -> Result<Option<RoomGuard<proto::Room>>> {
1216        self.optional_room_transaction(|tx| async move {
1217            let mut filter = Condition::all()
1218                .add(room_participant::Column::UserId.eq(user_id))
1219                .add(room_participant::Column::AnsweringConnectionId.is_null());
1220            if let Some(room_id) = expected_room_id {
1221                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1222            }
1223            let participant = room_participant::Entity::find()
1224                .filter(filter)
1225                .one(&*tx)
1226                .await?;
1227
1228            let participant = if let Some(participant) = participant {
1229                participant
1230            } else if expected_room_id.is_some() {
1231                return Err(anyhow!("could not find call to decline"))?;
1232            } else {
1233                return Ok(None);
1234            };
1235
1236            let room_id = participant.room_id;
1237            room_participant::Entity::delete(participant.into_active_model())
1238                .exec(&*tx)
1239                .await?;
1240
1241            let room = self.get_room(room_id, &tx).await?;
1242            Ok(Some((room_id, room)))
1243        })
1244        .await
1245    }
1246
1247    pub async fn cancel_call(
1248        &self,
1249        room_id: RoomId,
1250        calling_connection: ConnectionId,
1251        called_user_id: UserId,
1252    ) -> Result<RoomGuard<proto::Room>> {
1253        self.room_transaction(|tx| async move {
1254            let participant = room_participant::Entity::find()
1255                .filter(
1256                    Condition::all()
1257                        .add(room_participant::Column::UserId.eq(called_user_id))
1258                        .add(room_participant::Column::RoomId.eq(room_id))
1259                        .add(
1260                            room_participant::Column::CallingConnectionId
1261                                .eq(calling_connection.id as i32),
1262                        )
1263                        .add(
1264                            room_participant::Column::CallingConnectionServerId
1265                                .eq(calling_connection.owner_id as i32),
1266                        )
1267                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1268                )
1269                .one(&*tx)
1270                .await?
1271                .ok_or_else(|| anyhow!("no call to cancel"))?;
1272            let room_id = participant.room_id;
1273
1274            room_participant::Entity::delete(participant.into_active_model())
1275                .exec(&*tx)
1276                .await?;
1277
1278            let room = self.get_room(room_id, &tx).await?;
1279            Ok((room_id, room))
1280        })
1281        .await
1282    }
1283
1284    pub async fn join_room(
1285        &self,
1286        room_id: RoomId,
1287        user_id: UserId,
1288        connection: ConnectionId,
1289    ) -> Result<RoomGuard<proto::Room>> {
1290        self.room_transaction(|tx| async move {
1291            let result = room_participant::Entity::update_many()
1292                .filter(
1293                    Condition::all()
1294                        .add(room_participant::Column::RoomId.eq(room_id))
1295                        .add(room_participant::Column::UserId.eq(user_id))
1296                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1297                )
1298                .set(room_participant::ActiveModel {
1299                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1300                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1301                        connection.owner_id as i32,
1302                    ))),
1303                    answering_connection_lost: ActiveValue::set(false),
1304                    ..Default::default()
1305                })
1306                .exec(&*tx)
1307                .await?;
1308            if result.rows_affected == 0 {
1309                Err(anyhow!("room does not exist or was already joined"))?
1310            } else {
1311                let room = self.get_room(room_id, &tx).await?;
1312                Ok((room_id, room))
1313            }
1314        })
1315        .await
1316    }
1317
1318    pub async fn rejoin_room(
1319        &self,
1320        rejoin_room: proto::RejoinRoom,
1321        user_id: UserId,
1322        connection: ConnectionId,
1323    ) -> Result<RoomGuard<RejoinedRoom>> {
1324        self.room_transaction(|tx| async {
1325            let tx = tx;
1326            let room_id = RoomId::from_proto(rejoin_room.id);
1327            let participant_update = room_participant::Entity::update_many()
1328                .filter(
1329                    Condition::all()
1330                        .add(room_participant::Column::RoomId.eq(room_id))
1331                        .add(room_participant::Column::UserId.eq(user_id))
1332                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1333                        .add(
1334                            Condition::any()
1335                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1336                                .add(
1337                                    room_participant::Column::AnsweringConnectionServerId
1338                                        .ne(connection.owner_id as i32),
1339                                ),
1340                        ),
1341                )
1342                .set(room_participant::ActiveModel {
1343                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1344                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1345                        connection.owner_id as i32,
1346                    ))),
1347                    answering_connection_lost: ActiveValue::set(false),
1348                    ..Default::default()
1349                })
1350                .exec(&*tx)
1351                .await?;
1352            if participant_update.rows_affected == 0 {
1353                return Err(anyhow!("room does not exist or was already joined"))?;
1354            }
1355
1356            let mut reshared_projects = Vec::new();
1357            for reshared_project in &rejoin_room.reshared_projects {
1358                let project_id = ProjectId::from_proto(reshared_project.project_id);
1359                let project = project::Entity::find_by_id(project_id)
1360                    .one(&*tx)
1361                    .await?
1362                    .ok_or_else(|| anyhow!("project does not exist"))?;
1363                if project.host_user_id != user_id {
1364                    return Err(anyhow!("no such project"))?;
1365                }
1366
1367                let mut collaborators = project
1368                    .find_related(project_collaborator::Entity)
1369                    .all(&*tx)
1370                    .await?;
1371                let host_ix = collaborators
1372                    .iter()
1373                    .position(|collaborator| {
1374                        collaborator.user_id == user_id && collaborator.is_host
1375                    })
1376                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1377                let host = collaborators.swap_remove(host_ix);
1378                let old_connection_id = host.connection();
1379
1380                project::Entity::update(project::ActiveModel {
1381                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1382                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1383                        connection.owner_id as i32,
1384                    ))),
1385                    ..project.into_active_model()
1386                })
1387                .exec(&*tx)
1388                .await?;
1389                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1390                    connection_id: ActiveValue::set(connection.id as i32),
1391                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1392                    ..host.into_active_model()
1393                })
1394                .exec(&*tx)
1395                .await?;
1396
1397                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1398                    .await?;
1399
1400                reshared_projects.push(ResharedProject {
1401                    id: project_id,
1402                    old_connection_id,
1403                    collaborators: collaborators
1404                        .iter()
1405                        .map(|collaborator| ProjectCollaborator {
1406                            connection_id: collaborator.connection(),
1407                            user_id: collaborator.user_id,
1408                            replica_id: collaborator.replica_id,
1409                            is_host: collaborator.is_host,
1410                        })
1411                        .collect(),
1412                    worktrees: reshared_project.worktrees.clone(),
1413                });
1414            }
1415
1416            project::Entity::delete_many()
1417                .filter(
1418                    Condition::all()
1419                        .add(project::Column::RoomId.eq(room_id))
1420                        .add(project::Column::HostUserId.eq(user_id))
1421                        .add(
1422                            project::Column::Id
1423                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1424                        ),
1425                )
1426                .exec(&*tx)
1427                .await?;
1428
1429            let mut rejoined_projects = Vec::new();
1430            for rejoined_project in &rejoin_room.rejoined_projects {
1431                let project_id = ProjectId::from_proto(rejoined_project.id);
1432                let Some(project) = project::Entity::find_by_id(project_id)
1433                    .one(&*tx)
1434                    .await? else { continue };
1435
1436                let mut worktrees = Vec::new();
1437                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1438                for db_worktree in db_worktrees {
1439                    let mut worktree = RejoinedWorktree {
1440                        id: db_worktree.id as u64,
1441                        abs_path: db_worktree.abs_path,
1442                        root_name: db_worktree.root_name,
1443                        visible: db_worktree.visible,
1444                        updated_entries: Default::default(),
1445                        removed_entries: Default::default(),
1446                        diagnostic_summaries: Default::default(),
1447                        scan_id: db_worktree.scan_id as u64,
1448                        completed_scan_id: db_worktree.completed_scan_id as u64,
1449                    };
1450
1451                    let rejoined_worktree = rejoined_project
1452                        .worktrees
1453                        .iter()
1454                        .find(|worktree| worktree.id == db_worktree.id as u64);
1455                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1456                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1457                    } else {
1458                        worktree_entry::Column::IsDeleted.eq(false)
1459                    };
1460
1461                    let mut db_entries = worktree_entry::Entity::find()
1462                        .filter(
1463                            Condition::all()
1464                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1465                                .add(entry_filter),
1466                        )
1467                        .stream(&*tx)
1468                        .await?;
1469
1470                    while let Some(db_entry) = db_entries.next().await {
1471                        let db_entry = db_entry?;
1472                        if db_entry.is_deleted {
1473                            worktree.removed_entries.push(db_entry.id as u64);
1474                        } else {
1475                            worktree.updated_entries.push(proto::Entry {
1476                                id: db_entry.id as u64,
1477                                is_dir: db_entry.is_dir,
1478                                path: db_entry.path,
1479                                inode: db_entry.inode as u64,
1480                                mtime: Some(proto::Timestamp {
1481                                    seconds: db_entry.mtime_seconds as u64,
1482                                    nanos: db_entry.mtime_nanos as u32,
1483                                }),
1484                                is_symlink: db_entry.is_symlink,
1485                                is_ignored: db_entry.is_ignored,
1486                            });
1487                        }
1488                    }
1489
1490                    worktrees.push(worktree);
1491                }
1492
1493                let language_servers = project
1494                    .find_related(language_server::Entity)
1495                    .all(&*tx)
1496                    .await?
1497                    .into_iter()
1498                    .map(|language_server| proto::LanguageServer {
1499                        id: language_server.id as u64,
1500                        name: language_server.name,
1501                    })
1502                    .collect::<Vec<_>>();
1503
1504                let mut collaborators = project
1505                    .find_related(project_collaborator::Entity)
1506                    .all(&*tx)
1507                    .await?;
1508                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1509                    .iter()
1510                    .position(|collaborator| collaborator.user_id == user_id)
1511                {
1512                    collaborators.swap_remove(self_collaborator_ix)
1513                } else {
1514                    continue;
1515                };
1516                let old_connection_id = self_collaborator.connection();
1517                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1518                    connection_id: ActiveValue::set(connection.id as i32),
1519                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1520                    ..self_collaborator.into_active_model()
1521                })
1522                .exec(&*tx)
1523                .await?;
1524
1525                let collaborators = collaborators
1526                    .into_iter()
1527                    .map(|collaborator| ProjectCollaborator {
1528                        connection_id: collaborator.connection(),
1529                        user_id: collaborator.user_id,
1530                        replica_id: collaborator.replica_id,
1531                        is_host: collaborator.is_host,
1532                    })
1533                    .collect::<Vec<_>>();
1534
1535                rejoined_projects.push(RejoinedProject {
1536                    id: project_id,
1537                    old_connection_id,
1538                    collaborators,
1539                    worktrees,
1540                    language_servers,
1541                });
1542            }
1543
1544            let room = self.get_room(room_id, &tx).await?;
1545            Ok((
1546                room_id,
1547                RejoinedRoom {
1548                    room,
1549                    rejoined_projects,
1550                    reshared_projects,
1551                },
1552            ))
1553        })
1554        .await
1555    }
1556
1557    pub async fn leave_room(
1558        &self,
1559        connection: ConnectionId,
1560    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1561        self.optional_room_transaction(|tx| async move {
1562            let leaving_participant = room_participant::Entity::find()
1563                .filter(
1564                    Condition::all()
1565                        .add(
1566                            room_participant::Column::AnsweringConnectionId
1567                                .eq(connection.id as i32),
1568                        )
1569                        .add(
1570                            room_participant::Column::AnsweringConnectionServerId
1571                                .eq(connection.owner_id as i32),
1572                        ),
1573                )
1574                .one(&*tx)
1575                .await?;
1576
1577            if let Some(leaving_participant) = leaving_participant {
1578                // Leave room.
1579                let room_id = leaving_participant.room_id;
1580                room_participant::Entity::delete_by_id(leaving_participant.id)
1581                    .exec(&*tx)
1582                    .await?;
1583
1584                // Cancel pending calls initiated by the leaving user.
1585                let called_participants = room_participant::Entity::find()
1586                    .filter(
1587                        Condition::all()
1588                            .add(
1589                                room_participant::Column::CallingConnectionId
1590                                    .eq(connection.id as i32),
1591                            )
1592                            .add(
1593                                room_participant::Column::CallingConnectionServerId
1594                                    .eq(connection.owner_id as i32),
1595                            )
1596                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1597                    )
1598                    .all(&*tx)
1599                    .await?;
1600                room_participant::Entity::delete_many()
1601                    .filter(
1602                        room_participant::Column::Id
1603                            .is_in(called_participants.iter().map(|participant| participant.id)),
1604                    )
1605                    .exec(&*tx)
1606                    .await?;
1607                let canceled_calls_to_user_ids = called_participants
1608                    .into_iter()
1609                    .map(|participant| participant.user_id)
1610                    .collect();
1611
1612                // Detect left projects.
1613                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1614                enum QueryProjectIds {
1615                    ProjectId,
1616                }
1617                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1618                    .select_only()
1619                    .column_as(
1620                        project_collaborator::Column::ProjectId,
1621                        QueryProjectIds::ProjectId,
1622                    )
1623                    .filter(
1624                        Condition::all()
1625                            .add(
1626                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1627                            )
1628                            .add(
1629                                project_collaborator::Column::ConnectionServerId
1630                                    .eq(connection.owner_id as i32),
1631                            ),
1632                    )
1633                    .into_values::<_, QueryProjectIds>()
1634                    .all(&*tx)
1635                    .await?;
1636                let mut left_projects = HashMap::default();
1637                let mut collaborators = project_collaborator::Entity::find()
1638                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1639                    .stream(&*tx)
1640                    .await?;
1641                while let Some(collaborator) = collaborators.next().await {
1642                    let collaborator = collaborator?;
1643                    let left_project =
1644                        left_projects
1645                            .entry(collaborator.project_id)
1646                            .or_insert(LeftProject {
1647                                id: collaborator.project_id,
1648                                host_user_id: Default::default(),
1649                                connection_ids: Default::default(),
1650                                host_connection_id: Default::default(),
1651                            });
1652
1653                    let collaborator_connection_id = collaborator.connection();
1654                    if collaborator_connection_id != connection {
1655                        left_project.connection_ids.push(collaborator_connection_id);
1656                    }
1657
1658                    if collaborator.is_host {
1659                        left_project.host_user_id = collaborator.user_id;
1660                        left_project.host_connection_id = collaborator_connection_id;
1661                    }
1662                }
1663                drop(collaborators);
1664
1665                // Leave projects.
1666                project_collaborator::Entity::delete_many()
1667                    .filter(
1668                        Condition::all()
1669                            .add(
1670                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1671                            )
1672                            .add(
1673                                project_collaborator::Column::ConnectionServerId
1674                                    .eq(connection.owner_id as i32),
1675                            ),
1676                    )
1677                    .exec(&*tx)
1678                    .await?;
1679
1680                // Unshare projects.
1681                project::Entity::delete_many()
1682                    .filter(
1683                        Condition::all()
1684                            .add(project::Column::RoomId.eq(room_id))
1685                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1686                            .add(
1687                                project::Column::HostConnectionServerId
1688                                    .eq(connection.owner_id as i32),
1689                            ),
1690                    )
1691                    .exec(&*tx)
1692                    .await?;
1693
1694                let room = self.get_room(room_id, &tx).await?;
1695                if room.participants.is_empty() {
1696                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1697                }
1698
1699                let left_room = LeftRoom {
1700                    room,
1701                    left_projects,
1702                    canceled_calls_to_user_ids,
1703                };
1704
1705                if left_room.room.participants.is_empty() {
1706                    self.rooms.remove(&room_id);
1707                }
1708
1709                Ok(Some((room_id, left_room)))
1710            } else {
1711                Ok(None)
1712            }
1713        })
1714        .await
1715    }
1716
1717    pub async fn update_room_participant_location(
1718        &self,
1719        room_id: RoomId,
1720        connection: ConnectionId,
1721        location: proto::ParticipantLocation,
1722    ) -> Result<RoomGuard<proto::Room>> {
1723        self.room_transaction(|tx| async {
1724            let tx = tx;
1725            let location_kind;
1726            let location_project_id;
1727            match location
1728                .variant
1729                .as_ref()
1730                .ok_or_else(|| anyhow!("invalid location"))?
1731            {
1732                proto::participant_location::Variant::SharedProject(project) => {
1733                    location_kind = 0;
1734                    location_project_id = Some(ProjectId::from_proto(project.id));
1735                }
1736                proto::participant_location::Variant::UnsharedProject(_) => {
1737                    location_kind = 1;
1738                    location_project_id = None;
1739                }
1740                proto::participant_location::Variant::External(_) => {
1741                    location_kind = 2;
1742                    location_project_id = None;
1743                }
1744            }
1745
1746            let result = room_participant::Entity::update_many()
1747                .filter(
1748                    Condition::all()
1749                        .add(room_participant::Column::RoomId.eq(room_id))
1750                        .add(
1751                            room_participant::Column::AnsweringConnectionId
1752                                .eq(connection.id as i32),
1753                        )
1754                        .add(
1755                            room_participant::Column::AnsweringConnectionServerId
1756                                .eq(connection.owner_id as i32),
1757                        ),
1758                )
1759                .set(room_participant::ActiveModel {
1760                    location_kind: ActiveValue::set(Some(location_kind)),
1761                    location_project_id: ActiveValue::set(location_project_id),
1762                    ..Default::default()
1763                })
1764                .exec(&*tx)
1765                .await?;
1766
1767            if result.rows_affected == 1 {
1768                let room = self.get_room(room_id, &tx).await?;
1769                Ok((room_id, room))
1770            } else {
1771                Err(anyhow!("could not update room participant location"))?
1772            }
1773        })
1774        .await
1775    }
1776
1777    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1778        self.transaction(|tx| async move {
1779            let participant = room_participant::Entity::find()
1780                .filter(
1781                    Condition::all()
1782                        .add(
1783                            room_participant::Column::AnsweringConnectionId
1784                                .eq(connection.id as i32),
1785                        )
1786                        .add(
1787                            room_participant::Column::AnsweringConnectionServerId
1788                                .eq(connection.owner_id as i32),
1789                        ),
1790                )
1791                .one(&*tx)
1792                .await?
1793                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1794
1795            room_participant::Entity::update(room_participant::ActiveModel {
1796                answering_connection_lost: ActiveValue::set(true),
1797                ..participant.into_active_model()
1798            })
1799            .exec(&*tx)
1800            .await?;
1801
1802            Ok(())
1803        })
1804        .await
1805    }
1806
1807    fn build_incoming_call(
1808        room: &proto::Room,
1809        called_user_id: UserId,
1810    ) -> Option<proto::IncomingCall> {
1811        let pending_participant = room
1812            .pending_participants
1813            .iter()
1814            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1815
1816        Some(proto::IncomingCall {
1817            room_id: room.id,
1818            calling_user_id: pending_participant.calling_user_id,
1819            participant_user_ids: room
1820                .participants
1821                .iter()
1822                .map(|participant| participant.user_id)
1823                .collect(),
1824            initial_project: room.participants.iter().find_map(|participant| {
1825                let initial_project_id = pending_participant.initial_project_id?;
1826                participant
1827                    .projects
1828                    .iter()
1829                    .find(|project| project.id == initial_project_id)
1830                    .cloned()
1831            }),
1832        })
1833    }
1834
1835    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1836        let db_room = room::Entity::find_by_id(room_id)
1837            .one(tx)
1838            .await?
1839            .ok_or_else(|| anyhow!("could not find room"))?;
1840
1841        let mut db_participants = db_room
1842            .find_related(room_participant::Entity)
1843            .stream(tx)
1844            .await?;
1845        let mut participants = HashMap::default();
1846        let mut pending_participants = Vec::new();
1847        while let Some(db_participant) = db_participants.next().await {
1848            let db_participant = db_participant?;
1849            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1850                .answering_connection_id
1851                .zip(db_participant.answering_connection_server_id)
1852            {
1853                let location = match (
1854                    db_participant.location_kind,
1855                    db_participant.location_project_id,
1856                ) {
1857                    (Some(0), Some(project_id)) => {
1858                        Some(proto::participant_location::Variant::SharedProject(
1859                            proto::participant_location::SharedProject {
1860                                id: project_id.to_proto(),
1861                            },
1862                        ))
1863                    }
1864                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1865                        Default::default(),
1866                    )),
1867                    _ => Some(proto::participant_location::Variant::External(
1868                        Default::default(),
1869                    )),
1870                };
1871
1872                let answering_connection = ConnectionId {
1873                    owner_id: answering_connection_server_id.0 as u32,
1874                    id: answering_connection_id as u32,
1875                };
1876                participants.insert(
1877                    answering_connection,
1878                    proto::Participant {
1879                        user_id: db_participant.user_id.to_proto(),
1880                        peer_id: Some(answering_connection.into()),
1881                        projects: Default::default(),
1882                        location: Some(proto::ParticipantLocation { variant: location }),
1883                    },
1884                );
1885            } else {
1886                pending_participants.push(proto::PendingParticipant {
1887                    user_id: db_participant.user_id.to_proto(),
1888                    calling_user_id: db_participant.calling_user_id.to_proto(),
1889                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1890                });
1891            }
1892        }
1893        drop(db_participants);
1894
1895        let mut db_projects = db_room
1896            .find_related(project::Entity)
1897            .find_with_related(worktree::Entity)
1898            .stream(tx)
1899            .await?;
1900
1901        while let Some(row) = db_projects.next().await {
1902            let (db_project, db_worktree) = row?;
1903            let host_connection = db_project.host_connection()?;
1904            if let Some(participant) = participants.get_mut(&host_connection) {
1905                let project = if let Some(project) = participant
1906                    .projects
1907                    .iter_mut()
1908                    .find(|project| project.id == db_project.id.to_proto())
1909                {
1910                    project
1911                } else {
1912                    participant.projects.push(proto::ParticipantProject {
1913                        id: db_project.id.to_proto(),
1914                        worktree_root_names: Default::default(),
1915                    });
1916                    participant.projects.last_mut().unwrap()
1917                };
1918
1919                if let Some(db_worktree) = db_worktree {
1920                    project.worktree_root_names.push(db_worktree.root_name);
1921                }
1922            }
1923        }
1924
1925        Ok(proto::Room {
1926            id: db_room.id.to_proto(),
1927            live_kit_room: db_room.live_kit_room,
1928            participants: participants.into_values().collect(),
1929            pending_participants,
1930        })
1931    }
1932
1933    // projects
1934
1935    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1936        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1937        enum QueryAs {
1938            Count,
1939        }
1940
1941        self.transaction(|tx| async move {
1942            Ok(project::Entity::find()
1943                .select_only()
1944                .column_as(project::Column::Id.count(), QueryAs::Count)
1945                .inner_join(user::Entity)
1946                .filter(user::Column::Admin.eq(false))
1947                .into_values::<_, QueryAs>()
1948                .one(&*tx)
1949                .await?
1950                .unwrap_or(0i64) as usize)
1951        })
1952        .await
1953    }
1954
1955    pub async fn share_project(
1956        &self,
1957        room_id: RoomId,
1958        connection: ConnectionId,
1959        worktrees: &[proto::WorktreeMetadata],
1960    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1961        self.room_transaction(|tx| async move {
1962            let participant = room_participant::Entity::find()
1963                .filter(
1964                    Condition::all()
1965                        .add(
1966                            room_participant::Column::AnsweringConnectionId
1967                                .eq(connection.id as i32),
1968                        )
1969                        .add(
1970                            room_participant::Column::AnsweringConnectionServerId
1971                                .eq(connection.owner_id as i32),
1972                        ),
1973                )
1974                .one(&*tx)
1975                .await?
1976                .ok_or_else(|| anyhow!("could not find participant"))?;
1977            if participant.room_id != room_id {
1978                return Err(anyhow!("shared project on unexpected room"))?;
1979            }
1980
1981            let project = project::ActiveModel {
1982                room_id: ActiveValue::set(participant.room_id),
1983                host_user_id: ActiveValue::set(participant.user_id),
1984                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1985                host_connection_server_id: ActiveValue::set(Some(ServerId(
1986                    connection.owner_id as i32,
1987                ))),
1988                ..Default::default()
1989            }
1990            .insert(&*tx)
1991            .await?;
1992
1993            if !worktrees.is_empty() {
1994                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1995                    worktree::ActiveModel {
1996                        id: ActiveValue::set(worktree.id as i64),
1997                        project_id: ActiveValue::set(project.id),
1998                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1999                        root_name: ActiveValue::set(worktree.root_name.clone()),
2000                        visible: ActiveValue::set(worktree.visible),
2001                        scan_id: ActiveValue::set(0),
2002                        completed_scan_id: ActiveValue::set(0),
2003                    }
2004                }))
2005                .exec(&*tx)
2006                .await?;
2007            }
2008
2009            project_collaborator::ActiveModel {
2010                project_id: ActiveValue::set(project.id),
2011                connection_id: ActiveValue::set(connection.id as i32),
2012                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2013                user_id: ActiveValue::set(participant.user_id),
2014                replica_id: ActiveValue::set(ReplicaId(0)),
2015                is_host: ActiveValue::set(true),
2016                ..Default::default()
2017            }
2018            .insert(&*tx)
2019            .await?;
2020
2021            let room = self.get_room(room_id, &tx).await?;
2022            Ok((room_id, (project.id, room)))
2023        })
2024        .await
2025    }
2026
2027    pub async fn unshare_project(
2028        &self,
2029        project_id: ProjectId,
2030        connection: ConnectionId,
2031    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2032        self.room_transaction(|tx| async move {
2033            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2034
2035            let project = project::Entity::find_by_id(project_id)
2036                .one(&*tx)
2037                .await?
2038                .ok_or_else(|| anyhow!("project not found"))?;
2039            if project.host_connection()? == connection {
2040                let room_id = project.room_id;
2041                project::Entity::delete(project.into_active_model())
2042                    .exec(&*tx)
2043                    .await?;
2044                let room = self.get_room(room_id, &tx).await?;
2045                Ok((room_id, (room, guest_connection_ids)))
2046            } else {
2047                Err(anyhow!("cannot unshare a project hosted by another user"))?
2048            }
2049        })
2050        .await
2051    }
2052
2053    pub async fn update_project(
2054        &self,
2055        project_id: ProjectId,
2056        connection: ConnectionId,
2057        worktrees: &[proto::WorktreeMetadata],
2058    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2059        self.room_transaction(|tx| async move {
2060            let project = project::Entity::find_by_id(project_id)
2061                .filter(
2062                    Condition::all()
2063                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2064                        .add(
2065                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2066                        ),
2067                )
2068                .one(&*tx)
2069                .await?
2070                .ok_or_else(|| anyhow!("no such project"))?;
2071
2072            self.update_project_worktrees(project.id, worktrees, &tx)
2073                .await?;
2074
2075            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2076            let room = self.get_room(project.room_id, &tx).await?;
2077            Ok((project.room_id, (room, guest_connection_ids)))
2078        })
2079        .await
2080    }
2081
2082    async fn update_project_worktrees(
2083        &self,
2084        project_id: ProjectId,
2085        worktrees: &[proto::WorktreeMetadata],
2086        tx: &DatabaseTransaction,
2087    ) -> Result<()> {
2088        if !worktrees.is_empty() {
2089            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2090                id: ActiveValue::set(worktree.id as i64),
2091                project_id: ActiveValue::set(project_id),
2092                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2093                root_name: ActiveValue::set(worktree.root_name.clone()),
2094                visible: ActiveValue::set(worktree.visible),
2095                scan_id: ActiveValue::set(0),
2096                completed_scan_id: ActiveValue::set(0),
2097            }))
2098            .on_conflict(
2099                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2100                    .update_column(worktree::Column::RootName)
2101                    .to_owned(),
2102            )
2103            .exec(&*tx)
2104            .await?;
2105        }
2106
2107        worktree::Entity::delete_many()
2108            .filter(worktree::Column::ProjectId.eq(project_id).and(
2109                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2110            ))
2111            .exec(&*tx)
2112            .await?;
2113
2114        Ok(())
2115    }
2116
2117    pub async fn update_worktree(
2118        &self,
2119        update: &proto::UpdateWorktree,
2120        connection: ConnectionId,
2121    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2122        self.room_transaction(|tx| async move {
2123            let project_id = ProjectId::from_proto(update.project_id);
2124            let worktree_id = update.worktree_id as i64;
2125
2126            // Ensure the update comes from the host.
2127            let project = project::Entity::find_by_id(project_id)
2128                .filter(
2129                    Condition::all()
2130                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2131                        .add(
2132                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2133                        ),
2134                )
2135                .one(&*tx)
2136                .await?
2137                .ok_or_else(|| anyhow!("no such project"))?;
2138            let room_id = project.room_id;
2139
2140            // Update metadata.
2141            worktree::Entity::update(worktree::ActiveModel {
2142                id: ActiveValue::set(worktree_id),
2143                project_id: ActiveValue::set(project_id),
2144                root_name: ActiveValue::set(update.root_name.clone()),
2145                scan_id: ActiveValue::set(update.scan_id as i64),
2146                completed_scan_id: if update.is_last_update {
2147                    ActiveValue::set(update.scan_id as i64)
2148                } else {
2149                    ActiveValue::default()
2150                },
2151                abs_path: ActiveValue::set(update.abs_path.clone()),
2152                ..Default::default()
2153            })
2154            .exec(&*tx)
2155            .await?;
2156
2157            if !update.updated_entries.is_empty() {
2158                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2159                    let mtime = entry.mtime.clone().unwrap_or_default();
2160                    worktree_entry::ActiveModel {
2161                        project_id: ActiveValue::set(project_id),
2162                        worktree_id: ActiveValue::set(worktree_id),
2163                        id: ActiveValue::set(entry.id as i64),
2164                        is_dir: ActiveValue::set(entry.is_dir),
2165                        path: ActiveValue::set(entry.path.clone()),
2166                        inode: ActiveValue::set(entry.inode as i64),
2167                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2168                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2169                        is_symlink: ActiveValue::set(entry.is_symlink),
2170                        is_ignored: ActiveValue::set(entry.is_ignored),
2171                        is_deleted: ActiveValue::set(false),
2172                        scan_id: ActiveValue::set(update.scan_id as i64),
2173                    }
2174                }))
2175                .on_conflict(
2176                    OnConflict::columns([
2177                        worktree_entry::Column::ProjectId,
2178                        worktree_entry::Column::WorktreeId,
2179                        worktree_entry::Column::Id,
2180                    ])
2181                    .update_columns([
2182                        worktree_entry::Column::IsDir,
2183                        worktree_entry::Column::Path,
2184                        worktree_entry::Column::Inode,
2185                        worktree_entry::Column::MtimeSeconds,
2186                        worktree_entry::Column::MtimeNanos,
2187                        worktree_entry::Column::IsSymlink,
2188                        worktree_entry::Column::IsIgnored,
2189                        worktree_entry::Column::ScanId,
2190                    ])
2191                    .to_owned(),
2192                )
2193                .exec(&*tx)
2194                .await?;
2195            }
2196
2197            if !update.removed_entries.is_empty() {
2198                worktree_entry::Entity::update_many()
2199                    .filter(
2200                        worktree_entry::Column::ProjectId
2201                            .eq(project_id)
2202                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2203                            .and(
2204                                worktree_entry::Column::Id
2205                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2206                            ),
2207                    )
2208                    .set(worktree_entry::ActiveModel {
2209                        is_deleted: ActiveValue::Set(true),
2210                        scan_id: ActiveValue::Set(update.scan_id as i64),
2211                        ..Default::default()
2212                    })
2213                    .exec(&*tx)
2214                    .await?;
2215            }
2216
2217            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2218            Ok((room_id, connection_ids))
2219        })
2220        .await
2221    }
2222
2223    pub async fn update_diagnostic_summary(
2224        &self,
2225        update: &proto::UpdateDiagnosticSummary,
2226        connection: ConnectionId,
2227    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2228        self.room_transaction(|tx| async move {
2229            let project_id = ProjectId::from_proto(update.project_id);
2230            let worktree_id = update.worktree_id as i64;
2231            let summary = update
2232                .summary
2233                .as_ref()
2234                .ok_or_else(|| anyhow!("invalid summary"))?;
2235
2236            // Ensure the update comes from the host.
2237            let project = project::Entity::find_by_id(project_id)
2238                .one(&*tx)
2239                .await?
2240                .ok_or_else(|| anyhow!("no such project"))?;
2241            if project.host_connection()? != connection {
2242                return Err(anyhow!("can't update a project hosted by someone else"))?;
2243            }
2244
2245            // Update summary.
2246            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2247                project_id: ActiveValue::set(project_id),
2248                worktree_id: ActiveValue::set(worktree_id),
2249                path: ActiveValue::set(summary.path.clone()),
2250                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2251                error_count: ActiveValue::set(summary.error_count as i32),
2252                warning_count: ActiveValue::set(summary.warning_count as i32),
2253                ..Default::default()
2254            })
2255            .on_conflict(
2256                OnConflict::columns([
2257                    worktree_diagnostic_summary::Column::ProjectId,
2258                    worktree_diagnostic_summary::Column::WorktreeId,
2259                    worktree_diagnostic_summary::Column::Path,
2260                ])
2261                .update_columns([
2262                    worktree_diagnostic_summary::Column::LanguageServerId,
2263                    worktree_diagnostic_summary::Column::ErrorCount,
2264                    worktree_diagnostic_summary::Column::WarningCount,
2265                ])
2266                .to_owned(),
2267            )
2268            .exec(&*tx)
2269            .await?;
2270
2271            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2272            Ok((project.room_id, connection_ids))
2273        })
2274        .await
2275    }
2276
2277    pub async fn start_language_server(
2278        &self,
2279        update: &proto::StartLanguageServer,
2280        connection: ConnectionId,
2281    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2282        self.room_transaction(|tx| async move {
2283            let project_id = ProjectId::from_proto(update.project_id);
2284            let server = update
2285                .server
2286                .as_ref()
2287                .ok_or_else(|| anyhow!("invalid language server"))?;
2288
2289            // Ensure the update comes from the host.
2290            let project = project::Entity::find_by_id(project_id)
2291                .one(&*tx)
2292                .await?
2293                .ok_or_else(|| anyhow!("no such project"))?;
2294            if project.host_connection()? != connection {
2295                return Err(anyhow!("can't update a project hosted by someone else"))?;
2296            }
2297
2298            // Add the newly-started language server.
2299            language_server::Entity::insert(language_server::ActiveModel {
2300                project_id: ActiveValue::set(project_id),
2301                id: ActiveValue::set(server.id as i64),
2302                name: ActiveValue::set(server.name.clone()),
2303                ..Default::default()
2304            })
2305            .on_conflict(
2306                OnConflict::columns([
2307                    language_server::Column::ProjectId,
2308                    language_server::Column::Id,
2309                ])
2310                .update_column(language_server::Column::Name)
2311                .to_owned(),
2312            )
2313            .exec(&*tx)
2314            .await?;
2315
2316            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2317            Ok((project.room_id, connection_ids))
2318        })
2319        .await
2320    }
2321
2322    pub async fn join_project(
2323        &self,
2324        project_id: ProjectId,
2325        connection: ConnectionId,
2326    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2327        self.room_transaction(|tx| async move {
2328            let participant = room_participant::Entity::find()
2329                .filter(
2330                    Condition::all()
2331                        .add(
2332                            room_participant::Column::AnsweringConnectionId
2333                                .eq(connection.id as i32),
2334                        )
2335                        .add(
2336                            room_participant::Column::AnsweringConnectionServerId
2337                                .eq(connection.owner_id as i32),
2338                        ),
2339                )
2340                .one(&*tx)
2341                .await?
2342                .ok_or_else(|| anyhow!("must join a room first"))?;
2343
2344            let project = project::Entity::find_by_id(project_id)
2345                .one(&*tx)
2346                .await?
2347                .ok_or_else(|| anyhow!("no such project"))?;
2348            if project.room_id != participant.room_id {
2349                return Err(anyhow!("no such project"))?;
2350            }
2351
2352            let mut collaborators = project
2353                .find_related(project_collaborator::Entity)
2354                .all(&*tx)
2355                .await?;
2356            let replica_ids = collaborators
2357                .iter()
2358                .map(|c| c.replica_id)
2359                .collect::<HashSet<_>>();
2360            let mut replica_id = ReplicaId(1);
2361            while replica_ids.contains(&replica_id) {
2362                replica_id.0 += 1;
2363            }
2364            let new_collaborator = project_collaborator::ActiveModel {
2365                project_id: ActiveValue::set(project_id),
2366                connection_id: ActiveValue::set(connection.id as i32),
2367                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2368                user_id: ActiveValue::set(participant.user_id),
2369                replica_id: ActiveValue::set(replica_id),
2370                is_host: ActiveValue::set(false),
2371                ..Default::default()
2372            }
2373            .insert(&*tx)
2374            .await?;
2375            collaborators.push(new_collaborator);
2376
2377            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2378            let mut worktrees = db_worktrees
2379                .into_iter()
2380                .map(|db_worktree| {
2381                    (
2382                        db_worktree.id as u64,
2383                        Worktree {
2384                            id: db_worktree.id as u64,
2385                            abs_path: db_worktree.abs_path,
2386                            root_name: db_worktree.root_name,
2387                            visible: db_worktree.visible,
2388                            entries: Default::default(),
2389                            diagnostic_summaries: Default::default(),
2390                            scan_id: db_worktree.scan_id as u64,
2391                            completed_scan_id: db_worktree.completed_scan_id as u64,
2392                        },
2393                    )
2394                })
2395                .collect::<BTreeMap<_, _>>();
2396
2397            // Populate worktree entries.
2398            {
2399                let mut db_entries = worktree_entry::Entity::find()
2400                    .filter(
2401                        Condition::all()
2402                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2403                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2404                    )
2405                    .stream(&*tx)
2406                    .await?;
2407                while let Some(db_entry) = db_entries.next().await {
2408                    let db_entry = db_entry?;
2409                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2410                        worktree.entries.push(proto::Entry {
2411                            id: db_entry.id as u64,
2412                            is_dir: db_entry.is_dir,
2413                            path: db_entry.path,
2414                            inode: db_entry.inode as u64,
2415                            mtime: Some(proto::Timestamp {
2416                                seconds: db_entry.mtime_seconds as u64,
2417                                nanos: db_entry.mtime_nanos as u32,
2418                            }),
2419                            is_symlink: db_entry.is_symlink,
2420                            is_ignored: db_entry.is_ignored,
2421                        });
2422                    }
2423                }
2424            }
2425
2426            // Populate worktree diagnostic summaries.
2427            {
2428                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2429                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2430                    .stream(&*tx)
2431                    .await?;
2432                while let Some(db_summary) = db_summaries.next().await {
2433                    let db_summary = db_summary?;
2434                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2435                        worktree
2436                            .diagnostic_summaries
2437                            .push(proto::DiagnosticSummary {
2438                                path: db_summary.path,
2439                                language_server_id: db_summary.language_server_id as u64,
2440                                error_count: db_summary.error_count as u32,
2441                                warning_count: db_summary.warning_count as u32,
2442                            });
2443                    }
2444                }
2445            }
2446
2447            // Populate language servers.
2448            let language_servers = project
2449                .find_related(language_server::Entity)
2450                .all(&*tx)
2451                .await?;
2452
2453            let room_id = project.room_id;
2454            let project = Project {
2455                collaborators: collaborators
2456                    .into_iter()
2457                    .map(|collaborator| ProjectCollaborator {
2458                        connection_id: collaborator.connection(),
2459                        user_id: collaborator.user_id,
2460                        replica_id: collaborator.replica_id,
2461                        is_host: collaborator.is_host,
2462                    })
2463                    .collect(),
2464                worktrees,
2465                language_servers: language_servers
2466                    .into_iter()
2467                    .map(|language_server| proto::LanguageServer {
2468                        id: language_server.id as u64,
2469                        name: language_server.name,
2470                    })
2471                    .collect(),
2472            };
2473            Ok((room_id, (project, replica_id as ReplicaId)))
2474        })
2475        .await
2476    }
2477
2478    pub async fn leave_project(
2479        &self,
2480        project_id: ProjectId,
2481        connection: ConnectionId,
2482    ) -> Result<RoomGuard<LeftProject>> {
2483        self.room_transaction(|tx| async move {
2484            let result = project_collaborator::Entity::delete_many()
2485                .filter(
2486                    Condition::all()
2487                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2488                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2489                        .add(
2490                            project_collaborator::Column::ConnectionServerId
2491                                .eq(connection.owner_id as i32),
2492                        ),
2493                )
2494                .exec(&*tx)
2495                .await?;
2496            if result.rows_affected == 0 {
2497                Err(anyhow!("not a collaborator on this project"))?;
2498            }
2499
2500            let project = project::Entity::find_by_id(project_id)
2501                .one(&*tx)
2502                .await?
2503                .ok_or_else(|| anyhow!("no such project"))?;
2504            let collaborators = project
2505                .find_related(project_collaborator::Entity)
2506                .all(&*tx)
2507                .await?;
2508            let connection_ids = collaborators
2509                .into_iter()
2510                .map(|collaborator| collaborator.connection())
2511                .collect();
2512
2513            let left_project = LeftProject {
2514                id: project_id,
2515                host_user_id: project.host_user_id,
2516                host_connection_id: project.host_connection()?,
2517                connection_ids,
2518            };
2519            Ok((project.room_id, left_project))
2520        })
2521        .await
2522    }
2523
2524    pub async fn project_collaborators(
2525        &self,
2526        project_id: ProjectId,
2527        connection_id: ConnectionId,
2528    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2529        self.room_transaction(|tx| async move {
2530            let project = project::Entity::find_by_id(project_id)
2531                .one(&*tx)
2532                .await?
2533                .ok_or_else(|| anyhow!("no such project"))?;
2534            let collaborators = project_collaborator::Entity::find()
2535                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2536                .all(&*tx)
2537                .await?
2538                .into_iter()
2539                .map(|collaborator| ProjectCollaborator {
2540                    connection_id: collaborator.connection(),
2541                    user_id: collaborator.user_id,
2542                    replica_id: collaborator.replica_id,
2543                    is_host: collaborator.is_host,
2544                })
2545                .collect::<Vec<_>>();
2546
2547            if collaborators
2548                .iter()
2549                .any(|collaborator| collaborator.connection_id == connection_id)
2550            {
2551                Ok((project.room_id, collaborators))
2552            } else {
2553                Err(anyhow!("no such project"))?
2554            }
2555        })
2556        .await
2557    }
2558
2559    pub async fn project_connection_ids(
2560        &self,
2561        project_id: ProjectId,
2562        connection_id: ConnectionId,
2563    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2564        self.room_transaction(|tx| async move {
2565            let project = project::Entity::find_by_id(project_id)
2566                .one(&*tx)
2567                .await?
2568                .ok_or_else(|| anyhow!("no such project"))?;
2569            let mut collaborators = project_collaborator::Entity::find()
2570                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2571                .stream(&*tx)
2572                .await?;
2573
2574            let mut connection_ids = HashSet::default();
2575            while let Some(collaborator) = collaborators.next().await {
2576                let collaborator = collaborator?;
2577                connection_ids.insert(collaborator.connection());
2578            }
2579
2580            if connection_ids.contains(&connection_id) {
2581                Ok((project.room_id, connection_ids))
2582            } else {
2583                Err(anyhow!("no such project"))?
2584            }
2585        })
2586        .await
2587    }
2588
2589    async fn project_guest_connection_ids(
2590        &self,
2591        project_id: ProjectId,
2592        tx: &DatabaseTransaction,
2593    ) -> Result<Vec<ConnectionId>> {
2594        let mut collaborators = project_collaborator::Entity::find()
2595            .filter(
2596                project_collaborator::Column::ProjectId
2597                    .eq(project_id)
2598                    .and(project_collaborator::Column::IsHost.eq(false)),
2599            )
2600            .stream(tx)
2601            .await?;
2602
2603        let mut guest_connection_ids = Vec::new();
2604        while let Some(collaborator) = collaborators.next().await {
2605            let collaborator = collaborator?;
2606            guest_connection_ids.push(collaborator.connection());
2607        }
2608        Ok(guest_connection_ids)
2609    }
2610
2611    // access tokens
2612
2613    pub async fn create_access_token_hash(
2614        &self,
2615        user_id: UserId,
2616        access_token_hash: &str,
2617        max_access_token_count: usize,
2618    ) -> Result<()> {
2619        self.transaction(|tx| async {
2620            let tx = tx;
2621
2622            access_token::ActiveModel {
2623                user_id: ActiveValue::set(user_id),
2624                hash: ActiveValue::set(access_token_hash.into()),
2625                ..Default::default()
2626            }
2627            .insert(&*tx)
2628            .await?;
2629
2630            access_token::Entity::delete_many()
2631                .filter(
2632                    access_token::Column::Id.in_subquery(
2633                        Query::select()
2634                            .column(access_token::Column::Id)
2635                            .from(access_token::Entity)
2636                            .and_where(access_token::Column::UserId.eq(user_id))
2637                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2638                            .limit(10000)
2639                            .offset(max_access_token_count as u64)
2640                            .to_owned(),
2641                    ),
2642                )
2643                .exec(&*tx)
2644                .await?;
2645            Ok(())
2646        })
2647        .await
2648    }
2649
2650    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2651        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2652        enum QueryAs {
2653            Hash,
2654        }
2655
2656        self.transaction(|tx| async move {
2657            Ok(access_token::Entity::find()
2658                .select_only()
2659                .column(access_token::Column::Hash)
2660                .filter(access_token::Column::UserId.eq(user_id))
2661                .order_by_desc(access_token::Column::Id)
2662                .into_values::<_, QueryAs>()
2663                .all(&*tx)
2664                .await?)
2665        })
2666        .await
2667    }
2668
2669    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2670    where
2671        F: Send + Fn(TransactionHandle) -> Fut,
2672        Fut: Send + Future<Output = Result<T>>,
2673    {
2674        let body = async {
2675            loop {
2676                let (tx, result) = self.with_transaction(&f).await?;
2677                match result {
2678                    Ok(result) => {
2679                        match tx.commit().await.map_err(Into::into) {
2680                            Ok(()) => return Ok(result),
2681                            Err(error) => {
2682                                if is_serialization_error(&error) {
2683                                    // Retry (don't break the loop)
2684                                } else {
2685                                    return Err(error);
2686                                }
2687                            }
2688                        }
2689                    }
2690                    Err(error) => {
2691                        tx.rollback().await?;
2692                        if is_serialization_error(&error) {
2693                            // Retry (don't break the loop)
2694                        } else {
2695                            return Err(error);
2696                        }
2697                    }
2698                }
2699            }
2700        };
2701
2702        self.run(body).await
2703    }
2704
        /// Runs `f` inside a database transaction and, when it yields a room id, returns
        /// its data wrapped in a `RoomGuard` that holds that room's lock.
        ///
        /// Every attempt obtains a fresh transaction from `with_transaction`. When `f`
        /// yields `Some((room_id, data))`, the room's mutex (created in `self.rooms` on
        /// first use) is locked before the transaction is committed, and the lock guard
        /// is handed to the caller inside the returned `RoomGuard`. When `f` yields
        /// `None`, the transaction is committed and `None` is returned. When `f` fails,
        /// the transaction is rolled back.
        ///
        /// Whenever the error from `f` or from the commit is a serialization failure,
        /// the whole attempt is repeated, with no retry limit; any other error is
        /// returned to the caller.
2705    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2706    where
2707        F: Send + Fn(TransactionHandle) -> Fut,
2708        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2709    {
2710        let body = async {
2711            loop {
2712                let (tx, result) = self.with_transaction(&f).await?;
2713                match result {
2714                    Ok(Some((room_id, data))) => {
                            // Lock the room before committing. If the commit fails, the
                            // guard is dropped at the end of this arm, releasing the lock
                            // before any retry.
2715                        let lock = self.rooms.entry(room_id).or_default().clone();
2716                        let _guard = lock.lock_owned().await;
2717                        match tx.commit().await.map_err(Into::into) {
2718                            Ok(()) => {
2719                                return Ok(Some(RoomGuard {
2720                                    data,
2721                                    _guard,
2722                                    _not_send: PhantomData,
2723                                }));
2724                            }
2725                            Err(error) => {
2726                                if is_serialization_error(&error) {
2727                                    // Retry (don't break the loop)
2728                                } else {
2729                                    return Err(error);
2730                                }
2731                            }
2732                        }
2733                    }
2734                    Ok(None) => {
                            // No room is involved, so no lock is taken.
2735                        match tx.commit().await.map_err(Into::into) {
2736                            Ok(()) => return Ok(None),
2737                            Err(error) => {
2738                                if is_serialization_error(&error) {
2739                                    // Retry (don't break the loop)
2740                                } else {
2741                                    return Err(error);
2742                                }
2743                            }
2744                        }
2745                    }
2746                    Err(error) => {
                            // A failing rollback is returned immediately via `?`.
2747                        tx.rollback().await?;
2748                        if is_serialization_error(&error) {
2749                            // Retry (don't break the loop)
2750                        } else {
2751                            return Err(error);
2752                        }
2753                    }
2754                }
2755            }
2756        };
2757
2758        self.run(body).await
2759    }
2760
2761    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2762    where
2763        F: Send + Fn(TransactionHandle) -> Fut,
2764        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2765    {
2766        let data = self
2767            .optional_room_transaction(move |tx| {
2768                let future = f(tx);
2769                async {
2770                    let data = future.await?;
2771                    Ok(Some(data))
2772                }
2773            })
2774            .await?;
2775        Ok(data.unwrap())
2776    }
2777
2778    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2779    where
2780        F: Send + Fn(TransactionHandle) -> Fut,
2781        Fut: Send + Future<Output = Result<T>>,
2782    {
2783        let tx = self
2784            .pool
2785            .begin_with_config(Some(IsolationLevel::Serializable), None)
2786            .await?;
2787
2788        let mut tx = Arc::new(Some(tx));
2789        let result = f(TransactionHandle(tx.clone())).await;
2790        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2791            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2792        };
2793
2794        Ok((tx, result))
2795    }
2796
        /// Drives `future` to completion and returns its output.
        ///
        /// In test builds, a random delay is first simulated on the background executor
        /// (when one is set), and the future is then run with `block_on` on the
        /// database's own runtime; `unwrap` panics if that runtime is `None`. In
        /// non-test builds the future is simply awaited.
2797    async fn run<F, T>(&self, future: F) -> T
2798    where
2799        F: Future<Output = T>,
2800    {
            // Exactly one of the two cfg-gated blocks below is compiled in, and it
            // forms the tail expression of this function.
2801        #[cfg(test)]
2802        {
2803            if let Some(background) = self.background.as_ref() {
2804                background.simulate_random_delay().await;
2805            }
2806
2807            self.runtime.as_ref().unwrap().block_on(future)
2808        }
2809
2810        #[cfg(not(test))]
2811        {
2812            future.await
2813        }
2814    }
2815}
2816
2817fn is_serialization_error(error: &Error) -> bool {
2818    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2819    match error {
2820        Error::Database(
2821            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2822            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2823        ) if error
2824            .as_database_error()
2825            .and_then(|error| error.code())
2826            .as_deref()
2827            == Some(SERIALIZATION_FAILURE_CODE) =>
2828        {
2829            true
2830        }
2831        _ => false,
2832    }
2833}
2834
2835struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2836
2837impl Deref for TransactionHandle {
2838    type Target = DatabaseTransaction;
2839
2840    fn deref(&self) -> &Self::Target {
2841        self.0.as_ref().as_ref().unwrap()
2842    }
2843}
2844
2845pub struct RoomGuard<T> {
2846    data: T,
2847    _guard: OwnedMutexGuard<()>,
2848    _not_send: PhantomData<Rc<()>>,
2849}
2850
2851impl<T> Deref for RoomGuard<T> {
2852    type Target = T;
2853
2854    fn deref(&self) -> &T {
2855        &self.data
2856    }
2857}
2858
2859impl<T> DerefMut for RoomGuard<T> {
2860    fn deref_mut(&mut self) -> &mut T {
2861        &mut self.data
2862    }
2863}
2864
    /// Serializable set of values describing a new user: a GitHub login, a GitHub user
    /// id and an invite count.
    // NOTE(review): not used in this part of the file; presumably the input of the
    // user-creation code. Confirm against the caller.
2865#[derive(Debug, Serialize, Deserialize)]
2866pub struct NewUserParams {
2867    pub github_login: String,
2868    pub github_user_id: i32,
2869    pub invite_count: i32,
2870}
2871
    /// Values describing a created user: its id and metrics id, plus an optional
    /// inviting user and an optional signup device id.
    // NOTE(review): not constructed in this part of the file; presumably the output of
    // the user-creation code. Confirm against the caller.
2872#[derive(Debug)]
2873pub struct NewUserResult {
2874    pub user_id: UserId,
2875    pub metrics_id: String,
2876    pub inviting_user_id: Option<UserId>,
2877    pub signup_device_id: Option<String>,
2878}
2879
2880fn random_invite_code() -> String {
2881    nanoid::nanoid!(16)
2882}
2883
2884fn random_email_confirmation_code() -> String {
2885    nanoid::nanoid!(64)
2886}
2887
2888macro_rules! id_type {
2889    ($name:ident) => {
2890        #[derive(
2891            Clone,
2892            Copy,
2893            Debug,
2894            Default,
2895            PartialEq,
2896            Eq,
2897            PartialOrd,
2898            Ord,
2899            Hash,
2900            Serialize,
2901            Deserialize,
2902        )]
2903        #[serde(transparent)]
2904        pub struct $name(pub i32);
2905
2906        impl $name {
2907            #[allow(unused)]
2908            pub const MAX: Self = Self(i32::MAX);
2909
2910            #[allow(unused)]
2911            pub fn from_proto(value: u64) -> Self {
2912                Self(value as i32)
2913            }
2914
2915            #[allow(unused)]
2916            pub fn to_proto(self) -> u64 {
2917                self.0 as u64
2918            }
2919        }
2920
2921        impl std::fmt::Display for $name {
2922            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2923                self.0.fmt(f)
2924            }
2925        }
2926
2927        impl From<$name> for sea_query::Value {
2928            fn from(value: $name) -> Self {
2929                sea_query::Value::Int(Some(value.0))
2930            }
2931        }
2932
2933        impl sea_orm::TryGetable for $name {
2934            fn try_get(
2935                res: &sea_orm::QueryResult,
2936                pre: &str,
2937                col: &str,
2938            ) -> Result<Self, sea_orm::TryGetError> {
2939                Ok(Self(i32::try_get(res, pre, col)?))
2940            }
2941        }
2942
2943        impl sea_query::ValueType for $name {
2944            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
2945                match v {
2946                    Value::TinyInt(Some(int)) => {
2947                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2948                    }
2949                    Value::SmallInt(Some(int)) => {
2950                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2951                    }
2952                    Value::Int(Some(int)) => {
2953                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2954                    }
2955                    Value::BigInt(Some(int)) => {
2956                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2957                    }
2958                    Value::TinyUnsigned(Some(int)) => {
2959                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2960                    }
2961                    Value::SmallUnsigned(Some(int)) => {
2962                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2963                    }
2964                    Value::Unsigned(Some(int)) => {
2965                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2966                    }
2967                    Value::BigUnsigned(Some(int)) => {
2968                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2969                    }
2970                    _ => Err(sea_query::ValueTypeErr),
2971                }
2972            }
2973
2974            fn type_name() -> String {
2975                stringify!($name).into()
2976            }
2977
2978            fn array_type() -> sea_query::ArrayType {
2979                sea_query::ArrayType::Int
2980            }
2981
2982            fn column_type() -> sea_query::ColumnType {
2983                sea_query::ColumnType::Integer(None)
2984            }
2985        }
2986
2987        impl sea_orm::TryFromU64 for $name {
2988            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
2989                Ok(Self(n.try_into().map_err(|_| {
2990                    DbErr::ConvertFromU64(concat!(
2991                        "error converting ",
2992                        stringify!($name),
2993                        " to u64"
2994                    ))
2995                })?))
2996            }
2997        }
2998
2999        impl sea_query::Nullable for $name {
3000            fn null() -> Value {
3001                Value::Int(None)
3002            }
3003        }
3004    };
3005}
3006
    // `i32`-backed id newtypes generated by `id_type!`, one per kind of id.
3007id_type!(AccessTokenId);
3008id_type!(ContactId);
3009id_type!(RoomId);
3010id_type!(RoomParticipantId);
3011id_type!(ProjectId);
3012id_type!(ProjectCollaboratorId);
3013id_type!(ReplicaId);
3014id_type!(ServerId);
3015id_type!(SignupId);
3016id_type!(UserId);
3017
    /// A room's protobuf state together with lists of rejoined and reshared projects.
    // NOTE(review): not constructed in this part of the file; presumably the result of
    // a room-rejoin operation. Confirm against the producer.
3018pub struct RejoinedRoom {
3019    pub room: proto::Room,
3020    pub rejoined_projects: Vec<RejoinedProject>,
3021    pub reshared_projects: Vec<ResharedProject>,
3022}
3023
    /// A project id with an old connection id, the project's collaborators and its
    /// worktree metadata.
    // NOTE(review): not constructed in this part of the file; presumably describes a
    // project that was shared again after a reconnect. Confirm against the producer.
3024pub struct ResharedProject {
3025    pub id: ProjectId,
3026    pub old_connection_id: ConnectionId,
3027    pub collaborators: Vec<ProjectCollaborator>,
3028    pub worktrees: Vec<proto::WorktreeMetadata>,
3029}
3030
    /// A project id with an old connection id, the project's collaborators, its
    /// worktrees (as `RejoinedWorktree`s) and its language servers.
    // NOTE(review): not constructed in this part of the file; presumably describes a
    // project that a guest rejoined after a reconnect. Confirm against the producer.
3031pub struct RejoinedProject {
3032    pub id: ProjectId,
3033    pub old_connection_id: ConnectionId,
3034    pub collaborators: Vec<ProjectCollaborator>,
3035    pub worktrees: Vec<RejoinedWorktree>,
3036    pub language_servers: Vec<proto::LanguageServer>,
3037}
3038
    /// Worktree metadata plus lists of updated entries, removed entry ids and
    /// diagnostic summaries.
    // NOTE(review): not constructed in this part of the file; presumably the changes a
    // rejoining guest has to apply to a worktree. Confirm against the producer.
3039#[derive(Debug)]
3040pub struct RejoinedWorktree {
3041    pub id: u64,
3042    pub abs_path: String,
3043    pub root_name: String,
3044    pub visible: bool,
3045    pub updated_entries: Vec<proto::Entry>,
3046    pub removed_entries: Vec<u64>,
3047    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3048    pub scan_id: u64,
3049    pub completed_scan_id: u64,
3050}
3051
    /// A room's protobuf state, a map of `LeftProject`s keyed by project id, and a list
    /// of user ids whose calls were canceled.
    // NOTE(review): not constructed in this part of the file; presumably the result of
    // leaving a room. Confirm against the producer.
3052pub struct LeftRoom {
3053    pub room: proto::Room,
3054    pub left_projects: HashMap<ProjectId, LeftProject>,
3055    pub canceled_calls_to_user_ids: Vec<UserId>,
3056}
3057
    /// A room's protobuf state with the user ids of stale participants and of users
    /// whose calls were canceled.
    // NOTE(review): not constructed in this part of the file; presumably the result of
    // refreshing a room. Confirm against the producer.
3058pub struct RefreshedRoom {
3059    pub room: proto::Room,
3060    pub stale_participant_user_ids: Vec<UserId>,
3061    pub canceled_calls_to_user_ids: Vec<UserId>,
3062}
3063
    /// Snapshot of a project as returned by `join_project`.
3064pub struct Project {
        /// The project's collaborators.
3065    pub collaborators: Vec<ProjectCollaborator>,
        /// The project's worktrees, keyed by worktree id.
3066    pub worktrees: BTreeMap<u64, Worktree>,
        /// The project's language servers.
3067    pub language_servers: Vec<proto::LanguageServer>,
3068}
3069
3070pub struct ProjectCollaborator {
3071    pub connection_id: ConnectionId,
3072    pub user_id: UserId,
3073    pub replica_id: ReplicaId,
3074    pub is_host: bool,
3075}
3076
3077impl ProjectCollaborator {
3078    pub fn to_proto(&self) -> proto::Collaborator {
3079        proto::Collaborator {
3080            peer_id: Some(self.connection_id.into()),
3081            replica_id: self.replica_id.0 as u32,
3082            user_id: self.user_id.to_proto(),
3083        }
3084    }
3085}
3086
3087#[derive(Debug)]
3088pub struct LeftProject {
3089    pub id: ProjectId,
3090    pub host_user_id: UserId,
3091    pub host_connection_id: ConnectionId,
3092    pub connection_ids: Vec<ConnectionId>,
3093}
3094
3095pub struct Worktree {
3096    pub id: u64,
3097    pub abs_path: String,
3098    pub root_name: String,
3099    pub visible: bool,
3100    pub entries: Vec<proto::Entry>,
3101    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3102    pub scan_id: u64,
3103    pub completed_scan_id: u64,
3104}
3105
3106#[cfg(test)]
3107pub use test::*;
3108
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use lazy_static::lazy_static;
    use parking_lot::Mutex;
    use rand::prelude::*;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A throwaway [`Database`] for tests.
    ///
    /// Build one with [`TestDb::sqlite`] or [`TestDb::postgres`]. When a
    /// Postgres-backed `TestDb` is dropped, the database it created is dropped too.
    pub struct TestDb {
        /// The database handle. `Some` from construction until `Drop` takes it.
        pub db: Option<Arc<Database>>,
        /// Both constructors in this module initialise this to `None`.
        pub connection: Option<sqlx::AnyConnection>,
    }

    /// Builds the single-threaded tokio runtime (I/O and time drivers enabled)
    /// that both constructors use to drive database futures.
    ///
    /// # Panics
    /// Panics if the runtime cannot be built.
    fn current_thread_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build()
            .unwrap()
    }

    impl TestDb {
        /// Creates a test database backed by in-memory SQLite.
        ///
        /// Connects to `sqlite::memory:` with at most 5 connections, then executes
        /// the bundled `migrations.sqlite/20221109000000_test_schema.sql` script.
        /// The given `background` executor and the runtime used for setup are
        /// stored on the resulting [`Database`].
        ///
        /// # Panics
        /// Panics if the runtime cannot be built, the connection fails, or the
        /// schema script fails to execute.
        pub fn sqlite(background: Arc<Background>) -> Self {
            // Nothing to interpolate, so a plain owned string replaces `format!`.
            let url = String::from("sqlite::memory:");
            let runtime = current_thread_runtime();

            let mut db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options).await.unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            db.background = Some(background);
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a test database on a local Postgres server.
        ///
        /// A database named `zed-test-<random u128>` is created at
        /// `postgres://postgres@localhost/`, connected to with at most 5
        /// connections and a zero idle timeout, and migrated from the crate's
        /// `migrations` directory. Calls are serialized by a process-wide lock
        /// that is held until this function returns.
        ///
        /// # Panics
        /// Panics if the runtime cannot be built, the database cannot be created
        /// (`"failed to create test db"`), the connection fails, or the migrations
        /// fail.
        pub fn postgres(background: Arc<Background>) -> Self {
            lazy_static! {
                static ref LOCK: Mutex<()> = Mutex::new(());
            }

            // Held for the rest of the function so only one test database is set
            // up at a time.
            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = current_thread_runtime();

            let mut db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options).await.unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                // NOTE(review): the meaning of the `false` flag is defined by
                // `Database::migrate`, which is outside this section.
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            db.background = Some(background);
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Returns the wrapped database handle.
        ///
        /// # Panics
        /// Panics if the `db` field has been emptied.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        /// Cleans up a Postgres-backed test database; does nothing for other
        /// backends.
        ///
        /// For Postgres, every other backend connected to the current database is
        /// terminated and the database at the original connection URL is dropped.
        /// Failures of either step are passed to `log_err` instead of being
        /// propagated.
        fn drop(&mut self) {
            // A destructor must not panic: if this runs while another panic is
            // unwinding, a second panic aborts the whole test process. `db` is a
            // public `Option`, so tolerate it having been emptied already.
            let db = match self.db.take() {
                Some(db) => db,
                None => return,
            };

            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                // Both constructors store a runtime; without one there is no way
                // to drive the cleanup futures, so skip cleanup rather than panic.
                let runtime = match db.runtime.as_ref() {
                    Some(runtime) => runtime,
                    None => return,
                };

                runtime.block_on(async {
                    use util::ResultExt;
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                });
            }
        }
    }
}