db.rs

   1mod access_token;
   2mod contact;
   3mod language_server;
   4mod project;
   5mod project_collaborator;
   6mod room;
   7mod room_participant;
   8mod server;
   9mod signup;
  10#[cfg(test)]
  11mod tests;
  12mod user;
  13mod worktree;
  14mod worktree_diagnostic_summary;
  15mod worktree_entry;
  16
  17use crate::{Error, Result};
  18use anyhow::anyhow;
  19use collections::{BTreeMap, HashMap, HashSet};
  20pub use contact::Contact;
  21use dashmap::DashMap;
  22use futures::StreamExt;
  23use hyper::StatusCode;
  24use rpc::{proto, ConnectionId};
  25use sea_orm::Condition;
  26pub use sea_orm::ConnectOptions;
  27use sea_orm::{
  28    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  29    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  30    Statement, TransactionTrait,
  31};
  32use sea_query::{Alias, Expr, OnConflict, Query};
  33use serde::{Deserialize, Serialize};
  34pub use signup::{Invite, NewSignup, WaitlistSummary};
  35use sqlx::migrate::{Migrate, Migration, MigrationSource};
  36use sqlx::Connection;
  37use std::ops::{Deref, DerefMut};
  38use std::path::Path;
  39use std::time::Duration;
  40use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  41use tokio::sync::{Mutex, OwnedMutexGuard};
  42pub use user::Model as User;
  43
  44pub struct Database {
  45    options: ConnectOptions,
  46    pool: DatabaseConnection,
  47    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  48    #[cfg(test)]
  49    background: Option<std::sync::Arc<gpui::executor::Background>>,
  50    #[cfg(test)]
  51    runtime: Option<tokio::runtime::Runtime>,
  52}
  53
  54impl Database {
  55    pub async fn new(options: ConnectOptions) -> Result<Self> {
  56        Ok(Self {
  57            options: options.clone(),
  58            pool: sea_orm::Database::connect(options).await?,
  59            rooms: DashMap::with_capacity(16384),
  60            #[cfg(test)]
  61            background: None,
  62            #[cfg(test)]
  63            runtime: None,
  64        })
  65    }
  66
  67    #[cfg(test)]
  68    pub fn reset(&self) {
  69        self.rooms.clear();
  70    }
  71
  72    pub async fn migrate(
  73        &self,
  74        migrations_path: &Path,
  75        ignore_checksum_mismatch: bool,
  76    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  77        let migrations = MigrationSource::resolve(migrations_path)
  78            .await
  79            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  80
  81        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  82
  83        connection.ensure_migrations_table().await?;
  84        let applied_migrations: HashMap<_, _> = connection
  85            .list_applied_migrations()
  86            .await?
  87            .into_iter()
  88            .map(|m| (m.version, m))
  89            .collect();
  90
  91        let mut new_migrations = Vec::new();
  92        for migration in migrations {
  93            match applied_migrations.get(&migration.version) {
  94                Some(applied_migration) => {
  95                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  96                    {
  97                        Err(anyhow!(
  98                            "checksum mismatch for applied migration {}",
  99                            migration.description
 100                        ))?;
 101                    }
 102                }
 103                None => {
 104                    let elapsed = connection.apply(&migration).await?;
 105                    new_migrations.push((migration, elapsed));
 106                }
 107            }
 108        }
 109
 110        Ok(new_migrations)
 111    }
 112
 113    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 114        self.transaction(|tx| async move {
 115            let server = server::ActiveModel {
 116                environment: ActiveValue::set(environment.into()),
 117                ..Default::default()
 118            }
 119            .insert(&*tx)
 120            .await?;
 121            Ok(server.id)
 122        })
 123        .await
 124    }
 125
 126    pub async fn stale_room_ids(
 127        &self,
 128        environment: &str,
 129        new_server_id: ServerId,
 130    ) -> Result<Vec<RoomId>> {
 131        self.transaction(|tx| async move {
 132            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 133            enum QueryAs {
 134                RoomId,
 135            }
 136
 137            let stale_server_epochs = self
 138                .stale_server_ids(environment, new_server_id, &tx)
 139                .await?;
 140            Ok(room_participant::Entity::find()
 141                .select_only()
 142                .column(room_participant::Column::RoomId)
 143                .distinct()
 144                .filter(
 145                    room_participant::Column::AnsweringConnectionServerId
 146                        .is_in(stale_server_epochs),
 147                )
 148                .into_values::<_, QueryAs>()
 149                .all(&*tx)
 150                .await?)
 151        })
 152        .await
 153    }
 154
 155    pub async fn refresh_room(
 156        &self,
 157        room_id: RoomId,
 158        new_server_id: ServerId,
 159    ) -> Result<RoomGuard<RefreshedRoom>> {
 160        self.room_transaction(|tx| async move {
 161            let stale_participant_filter = Condition::all()
 162                .add(room_participant::Column::RoomId.eq(room_id))
 163                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 164                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 165
 166            let stale_participant_user_ids = room_participant::Entity::find()
 167                .filter(stale_participant_filter.clone())
 168                .all(&*tx)
 169                .await?
 170                .into_iter()
 171                .map(|participant| participant.user_id)
 172                .collect::<Vec<_>>();
 173
 174            // Delete participants who failed to reconnect.
 175            room_participant::Entity::delete_many()
 176                .filter(stale_participant_filter)
 177                .exec(&*tx)
 178                .await?;
 179
 180            let room = self.get_room(room_id, &tx).await?;
 181            let mut canceled_calls_to_user_ids = Vec::new();
 182            // Delete the room if it becomes empty and cancel pending calls.
 183            if room.participants.is_empty() {
 184                canceled_calls_to_user_ids.extend(
 185                    room.pending_participants
 186                        .iter()
 187                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 188                );
 189                room_participant::Entity::delete_many()
 190                    .filter(room_participant::Column::RoomId.eq(room_id))
 191                    .exec(&*tx)
 192                    .await?;
 193                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 194            }
 195
 196            Ok((
 197                room_id,
 198                RefreshedRoom {
 199                    room,
 200                    stale_participant_user_ids,
 201                    canceled_calls_to_user_ids,
 202                },
 203            ))
 204        })
 205        .await
 206    }
 207
 208    pub async fn delete_stale_servers(
 209        &self,
 210        environment: &str,
 211        new_server_id: ServerId,
 212    ) -> Result<()> {
 213        self.transaction(|tx| async move {
 214            server::Entity::delete_many()
 215                .filter(
 216                    Condition::all()
 217                        .add(server::Column::Environment.eq(environment))
 218                        .add(server::Column::Id.ne(new_server_id)),
 219                )
 220                .exec(&*tx)
 221                .await?;
 222            Ok(())
 223        })
 224        .await
 225    }
 226
 227    async fn stale_server_ids(
 228        &self,
 229        environment: &str,
 230        new_server_id: ServerId,
 231        tx: &DatabaseTransaction,
 232    ) -> Result<Vec<ServerId>> {
 233        let stale_servers = server::Entity::find()
 234            .filter(
 235                Condition::all()
 236                    .add(server::Column::Environment.eq(environment))
 237                    .add(server::Column::Id.ne(new_server_id)),
 238            )
 239            .all(&*tx)
 240            .await?;
 241        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 242    }
 243
 244    // users
 245
 246    pub async fn create_user(
 247        &self,
 248        email_address: &str,
 249        admin: bool,
 250        params: NewUserParams,
 251    ) -> Result<NewUserResult> {
 252        self.transaction(|tx| async {
 253            let tx = tx;
 254            let user = user::Entity::insert(user::ActiveModel {
 255                email_address: ActiveValue::set(Some(email_address.into())),
 256                github_login: ActiveValue::set(params.github_login.clone()),
 257                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 258                admin: ActiveValue::set(admin),
 259                metrics_id: ActiveValue::set(Uuid::new_v4()),
 260                ..Default::default()
 261            })
 262            .on_conflict(
 263                OnConflict::column(user::Column::GithubLogin)
 264                    .update_column(user::Column::GithubLogin)
 265                    .to_owned(),
 266            )
 267            .exec_with_returning(&*tx)
 268            .await?;
 269
 270            Ok(NewUserResult {
 271                user_id: user.id,
 272                metrics_id: user.metrics_id.to_string(),
 273                signup_device_id: None,
 274                inviting_user_id: None,
 275            })
 276        })
 277        .await
 278    }
 279
 280    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 281        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 282            .await
 283    }
 284
 285    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 286        self.transaction(|tx| async {
 287            let tx = tx;
 288            Ok(user::Entity::find()
 289                .filter(user::Column::Id.is_in(ids.iter().copied()))
 290                .all(&*tx)
 291                .await?)
 292        })
 293        .await
 294    }
 295
 296    pub async fn get_user_by_github_account(
 297        &self,
 298        github_login: &str,
 299        github_user_id: Option<i32>,
 300    ) -> Result<Option<User>> {
 301        self.transaction(|tx| async move {
 302            let tx = &*tx;
 303            if let Some(github_user_id) = github_user_id {
 304                if let Some(user_by_github_user_id) = user::Entity::find()
 305                    .filter(user::Column::GithubUserId.eq(github_user_id))
 306                    .one(tx)
 307                    .await?
 308                {
 309                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 310                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 311                    Ok(Some(user_by_github_user_id.update(tx).await?))
 312                } else if let Some(user_by_github_login) = user::Entity::find()
 313                    .filter(user::Column::GithubLogin.eq(github_login))
 314                    .one(tx)
 315                    .await?
 316                {
 317                    let mut user_by_github_login = user_by_github_login.into_active_model();
 318                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 319                    Ok(Some(user_by_github_login.update(tx).await?))
 320                } else {
 321                    Ok(None)
 322                }
 323            } else {
 324                Ok(user::Entity::find()
 325                    .filter(user::Column::GithubLogin.eq(github_login))
 326                    .one(tx)
 327                    .await?)
 328            }
 329        })
 330        .await
 331    }
 332
 333    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 334        self.transaction(|tx| async move {
 335            Ok(user::Entity::find()
 336                .order_by_asc(user::Column::GithubLogin)
 337                .limit(limit as u64)
 338                .offset(page as u64 * limit as u64)
 339                .all(&*tx)
 340                .await?)
 341        })
 342        .await
 343    }
 344
 345    pub async fn get_users_with_no_invites(
 346        &self,
 347        invited_by_another_user: bool,
 348    ) -> Result<Vec<User>> {
 349        self.transaction(|tx| async move {
 350            Ok(user::Entity::find()
 351                .filter(
 352                    user::Column::InviteCount
 353                        .eq(0)
 354                        .and(if invited_by_another_user {
 355                            user::Column::InviterId.is_not_null()
 356                        } else {
 357                            user::Column::InviterId.is_null()
 358                        }),
 359                )
 360                .all(&*tx)
 361                .await?)
 362        })
 363        .await
 364    }
 365
 366    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 367        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 368        enum QueryAs {
 369            MetricsId,
 370        }
 371
 372        self.transaction(|tx| async move {
 373            let metrics_id: Uuid = user::Entity::find_by_id(id)
 374                .select_only()
 375                .column(user::Column::MetricsId)
 376                .into_values::<_, QueryAs>()
 377                .one(&*tx)
 378                .await?
 379                .ok_or_else(|| anyhow!("could not find user"))?;
 380            Ok(metrics_id.to_string())
 381        })
 382        .await
 383    }
 384
 385    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 386        self.transaction(|tx| async move {
 387            user::Entity::update_many()
 388                .filter(user::Column::Id.eq(id))
 389                .set(user::ActiveModel {
 390                    admin: ActiveValue::set(is_admin),
 391                    ..Default::default()
 392                })
 393                .exec(&*tx)
 394                .await?;
 395            Ok(())
 396        })
 397        .await
 398    }
 399
 400    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 401        self.transaction(|tx| async move {
 402            user::Entity::update_many()
 403                .filter(user::Column::Id.eq(id))
 404                .set(user::ActiveModel {
 405                    connected_once: ActiveValue::set(connected_once),
 406                    ..Default::default()
 407                })
 408                .exec(&*tx)
 409                .await?;
 410            Ok(())
 411        })
 412        .await
 413    }
 414
 415    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 416        self.transaction(|tx| async move {
 417            access_token::Entity::delete_many()
 418                .filter(access_token::Column::UserId.eq(id))
 419                .exec(&*tx)
 420                .await?;
 421            user::Entity::delete_by_id(id).exec(&*tx).await?;
 422            Ok(())
 423        })
 424        .await
 425    }
 426
 427    // contacts
 428
 429    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 430        #[derive(Debug, FromQueryResult)]
 431        struct ContactWithUserBusyStatuses {
 432            user_id_a: UserId,
 433            user_id_b: UserId,
 434            a_to_b: bool,
 435            accepted: bool,
 436            should_notify: bool,
 437            user_a_busy: bool,
 438            user_b_busy: bool,
 439        }
 440
 441        self.transaction(|tx| async move {
 442            let user_a_participant = Alias::new("user_a_participant");
 443            let user_b_participant = Alias::new("user_b_participant");
 444            let mut db_contacts = contact::Entity::find()
 445                .column_as(
 446                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 447                        .is_not_null(),
 448                    "user_a_busy",
 449                )
 450                .column_as(
 451                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 452                        .is_not_null(),
 453                    "user_b_busy",
 454                )
 455                .filter(
 456                    contact::Column::UserIdA
 457                        .eq(user_id)
 458                        .or(contact::Column::UserIdB.eq(user_id)),
 459                )
 460                .join_as(
 461                    JoinType::LeftJoin,
 462                    contact::Relation::UserARoomParticipant.def(),
 463                    user_a_participant,
 464                )
 465                .join_as(
 466                    JoinType::LeftJoin,
 467                    contact::Relation::UserBRoomParticipant.def(),
 468                    user_b_participant,
 469                )
 470                .into_model::<ContactWithUserBusyStatuses>()
 471                .stream(&*tx)
 472                .await?;
 473
 474            let mut contacts = Vec::new();
 475            while let Some(db_contact) = db_contacts.next().await {
 476                let db_contact = db_contact?;
 477                if db_contact.user_id_a == user_id {
 478                    if db_contact.accepted {
 479                        contacts.push(Contact::Accepted {
 480                            user_id: db_contact.user_id_b,
 481                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 482                            busy: db_contact.user_b_busy,
 483                        });
 484                    } else if db_contact.a_to_b {
 485                        contacts.push(Contact::Outgoing {
 486                            user_id: db_contact.user_id_b,
 487                        })
 488                    } else {
 489                        contacts.push(Contact::Incoming {
 490                            user_id: db_contact.user_id_b,
 491                            should_notify: db_contact.should_notify,
 492                        });
 493                    }
 494                } else if db_contact.accepted {
 495                    contacts.push(Contact::Accepted {
 496                        user_id: db_contact.user_id_a,
 497                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 498                        busy: db_contact.user_a_busy,
 499                    });
 500                } else if db_contact.a_to_b {
 501                    contacts.push(Contact::Incoming {
 502                        user_id: db_contact.user_id_a,
 503                        should_notify: db_contact.should_notify,
 504                    });
 505                } else {
 506                    contacts.push(Contact::Outgoing {
 507                        user_id: db_contact.user_id_a,
 508                    });
 509                }
 510            }
 511
 512            contacts.sort_unstable_by_key(|contact| contact.user_id());
 513
 514            Ok(contacts)
 515        })
 516        .await
 517    }
 518
 519    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 520        self.transaction(|tx| async move {
 521            let participant = room_participant::Entity::find()
 522                .filter(room_participant::Column::UserId.eq(user_id))
 523                .one(&*tx)
 524                .await?;
 525            Ok(participant.is_some())
 526        })
 527        .await
 528    }
 529
 530    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 531        self.transaction(|tx| async move {
 532            let (id_a, id_b) = if user_id_1 < user_id_2 {
 533                (user_id_1, user_id_2)
 534            } else {
 535                (user_id_2, user_id_1)
 536            };
 537
 538            Ok(contact::Entity::find()
 539                .filter(
 540                    contact::Column::UserIdA
 541                        .eq(id_a)
 542                        .and(contact::Column::UserIdB.eq(id_b))
 543                        .and(contact::Column::Accepted.eq(true)),
 544                )
 545                .one(&*tx)
 546                .await?
 547                .is_some())
 548        })
 549        .await
 550    }
 551
 552    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 553        self.transaction(|tx| async move {
 554            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 555                (sender_id, receiver_id, true)
 556            } else {
 557                (receiver_id, sender_id, false)
 558            };
 559
 560            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 561                user_id_a: ActiveValue::set(id_a),
 562                user_id_b: ActiveValue::set(id_b),
 563                a_to_b: ActiveValue::set(a_to_b),
 564                accepted: ActiveValue::set(false),
 565                should_notify: ActiveValue::set(true),
 566                ..Default::default()
 567            })
 568            .on_conflict(
 569                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 570                    .values([
 571                        (contact::Column::Accepted, true.into()),
 572                        (contact::Column::ShouldNotify, false.into()),
 573                    ])
 574                    .action_and_where(
 575                        contact::Column::Accepted.eq(false).and(
 576                            contact::Column::AToB
 577                                .eq(a_to_b)
 578                                .and(contact::Column::UserIdA.eq(id_b))
 579                                .or(contact::Column::AToB
 580                                    .ne(a_to_b)
 581                                    .and(contact::Column::UserIdA.eq(id_a))),
 582                        ),
 583                    )
 584                    .to_owned(),
 585            )
 586            .exec_without_returning(&*tx)
 587            .await?;
 588
 589            if rows_affected == 1 {
 590                Ok(())
 591            } else {
 592                Err(anyhow!("contact already requested"))?
 593            }
 594        })
 595        .await
 596    }
 597
 598    /// Returns a bool indicating whether the removed contact had originally accepted or not
 599    ///
 600    /// Deletes the contact identified by the requester and responder ids, and then returns
 601    /// whether the deleted contact had originally accepted or was a pending contact request.
 602    ///
 603    /// # Arguments
 604    ///
 605    /// * `requester_id` - The user that initiates this request
 606    /// * `responder_id` - The user that will be removed
 607    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 608        self.transaction(|tx| async move {
 609            let (id_a, id_b) = if responder_id < requester_id {
 610                (responder_id, requester_id)
 611            } else {
 612                (requester_id, responder_id)
 613            };
 614
 615            let contact = contact::Entity::find()
 616                .filter(
 617                    contact::Column::UserIdA
 618                        .eq(id_a)
 619                        .and(contact::Column::UserIdB.eq(id_b)),
 620                )
 621                .one(&*tx)
 622                .await?
 623                .ok_or_else(|| anyhow!("no such contact"))?;
 624
 625            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 626            Ok(contact.accepted)
 627        })
 628        .await
 629    }
 630
 631    pub async fn dismiss_contact_notification(
 632        &self,
 633        user_id: UserId,
 634        contact_user_id: UserId,
 635    ) -> Result<()> {
 636        self.transaction(|tx| async move {
 637            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 638                (user_id, contact_user_id, true)
 639            } else {
 640                (contact_user_id, user_id, false)
 641            };
 642
 643            let result = contact::Entity::update_many()
 644                .set(contact::ActiveModel {
 645                    should_notify: ActiveValue::set(false),
 646                    ..Default::default()
 647                })
 648                .filter(
 649                    contact::Column::UserIdA
 650                        .eq(id_a)
 651                        .and(contact::Column::UserIdB.eq(id_b))
 652                        .and(
 653                            contact::Column::AToB
 654                                .eq(a_to_b)
 655                                .and(contact::Column::Accepted.eq(true))
 656                                .or(contact::Column::AToB
 657                                    .ne(a_to_b)
 658                                    .and(contact::Column::Accepted.eq(false))),
 659                        ),
 660                )
 661                .exec(&*tx)
 662                .await?;
 663            if result.rows_affected == 0 {
 664                Err(anyhow!("no such contact request"))?
 665            } else {
 666                Ok(())
 667            }
 668        })
 669        .await
 670    }
 671
 672    pub async fn respond_to_contact_request(
 673        &self,
 674        responder_id: UserId,
 675        requester_id: UserId,
 676        accept: bool,
 677    ) -> Result<()> {
 678        self.transaction(|tx| async move {
 679            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 680                (responder_id, requester_id, false)
 681            } else {
 682                (requester_id, responder_id, true)
 683            };
 684            let rows_affected = if accept {
 685                let result = contact::Entity::update_many()
 686                    .set(contact::ActiveModel {
 687                        accepted: ActiveValue::set(true),
 688                        should_notify: ActiveValue::set(true),
 689                        ..Default::default()
 690                    })
 691                    .filter(
 692                        contact::Column::UserIdA
 693                            .eq(id_a)
 694                            .and(contact::Column::UserIdB.eq(id_b))
 695                            .and(contact::Column::AToB.eq(a_to_b)),
 696                    )
 697                    .exec(&*tx)
 698                    .await?;
 699                result.rows_affected
 700            } else {
 701                let result = contact::Entity::delete_many()
 702                    .filter(
 703                        contact::Column::UserIdA
 704                            .eq(id_a)
 705                            .and(contact::Column::UserIdB.eq(id_b))
 706                            .and(contact::Column::AToB.eq(a_to_b))
 707                            .and(contact::Column::Accepted.eq(false)),
 708                    )
 709                    .exec(&*tx)
 710                    .await?;
 711
 712                result.rows_affected
 713            };
 714
 715            if rows_affected == 1 {
 716                Ok(())
 717            } else {
 718                Err(anyhow!("no such contact request"))?
 719            }
 720        })
 721        .await
 722    }
 723
 724    pub fn fuzzy_like_string(string: &str) -> String {
 725        let mut result = String::with_capacity(string.len() * 2 + 1);
 726        for c in string.chars() {
 727            if c.is_alphanumeric() {
 728                result.push('%');
 729                result.push(c);
 730            }
 731        }
 732        result.push('%');
 733        result
 734    }
 735
 736    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 737        self.transaction(|tx| async {
 738            let tx = tx;
 739            let like_string = Self::fuzzy_like_string(name_query);
 740            let query = "
 741                SELECT users.*
 742                FROM users
 743                WHERE github_login ILIKE $1
 744                ORDER BY github_login <-> $2
 745                LIMIT $3
 746            ";
 747
 748            Ok(user::Entity::find()
 749                .from_raw_sql(Statement::from_sql_and_values(
 750                    self.pool.get_database_backend(),
 751                    query.into(),
 752                    vec![like_string.into(), name_query.into(), limit.into()],
 753                ))
 754                .all(&*tx)
 755                .await?)
 756        })
 757        .await
 758    }
 759
 760    // signups
 761
 762    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 763        self.transaction(|tx| async move {
 764            signup::Entity::insert(signup::ActiveModel {
 765                email_address: ActiveValue::set(signup.email_address.clone()),
 766                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 767                email_confirmation_sent: ActiveValue::set(false),
 768                platform_mac: ActiveValue::set(signup.platform_mac),
 769                platform_windows: ActiveValue::set(signup.platform_windows),
 770                platform_linux: ActiveValue::set(signup.platform_linux),
 771                platform_unknown: ActiveValue::set(false),
 772                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 773                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 774                device_id: ActiveValue::set(signup.device_id.clone()),
 775                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 776                ..Default::default()
 777            })
 778            .on_conflict(
 779                OnConflict::column(signup::Column::EmailAddress)
 780                    .update_columns([
 781                        signup::Column::PlatformMac,
 782                        signup::Column::PlatformWindows,
 783                        signup::Column::PlatformLinux,
 784                        signup::Column::EditorFeatures,
 785                        signup::Column::ProgrammingLanguages,
 786                        signup::Column::DeviceId,
 787                        signup::Column::AddedToMailingList,
 788                    ])
 789                    .to_owned(),
 790            )
 791            .exec(&*tx)
 792            .await?;
 793            Ok(())
 794        })
 795        .await
 796    }
 797
 798    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 799        self.transaction(|tx| async move {
 800            let signup = signup::Entity::find()
 801                .filter(signup::Column::EmailAddress.eq(email_address))
 802                .one(&*tx)
 803                .await?
 804                .ok_or_else(|| {
 805                    anyhow!("signup with email address {} doesn't exist", email_address)
 806                })?;
 807
 808            Ok(signup)
 809        })
 810        .await
 811    }
 812
 813    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 814        self.transaction(|tx| async move {
 815            let query = "
 816                SELECT
 817                    COUNT(*) as count,
 818                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 819                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 820                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 821                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 822                FROM (
 823                    SELECT *
 824                    FROM signups
 825                    WHERE
 826                        NOT email_confirmation_sent
 827                ) AS unsent
 828            ";
 829            Ok(
 830                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 831                    self.pool.get_database_backend(),
 832                    query.into(),
 833                    vec![],
 834                ))
 835                .one(&*tx)
 836                .await?
 837                .ok_or_else(|| anyhow!("invalid result"))?,
 838            )
 839        })
 840        .await
 841    }
 842
 843    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 844        let emails = invites
 845            .iter()
 846            .map(|s| s.email_address.as_str())
 847            .collect::<Vec<_>>();
 848        self.transaction(|tx| async {
 849            let tx = tx;
 850            signup::Entity::update_many()
 851                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 852                .set(signup::ActiveModel {
 853                    email_confirmation_sent: ActiveValue::set(true),
 854                    ..Default::default()
 855                })
 856                .exec(&*tx)
 857                .await?;
 858            Ok(())
 859        })
 860        .await
 861    }
 862
 863    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 864        self.transaction(|tx| async move {
 865            Ok(signup::Entity::find()
 866                .select_only()
 867                .column(signup::Column::EmailAddress)
 868                .column(signup::Column::EmailConfirmationCode)
 869                .filter(
 870                    signup::Column::EmailConfirmationSent.eq(false).and(
 871                        signup::Column::PlatformMac
 872                            .eq(true)
 873                            .or(signup::Column::PlatformUnknown.eq(true)),
 874                    ),
 875                )
 876                .order_by_asc(signup::Column::CreatedAt)
 877                .limit(count as u64)
 878                .into_model()
 879                .all(&*tx)
 880                .await?)
 881        })
 882        .await
 883    }
 884
 885    // invite codes
 886
 887    pub async fn create_invite_from_code(
 888        &self,
 889        code: &str,
 890        email_address: &str,
 891        device_id: Option<&str>,
 892        added_to_mailing_list: bool,
 893    ) -> Result<Invite> {
 894        self.transaction(|tx| async move {
 895            let existing_user = user::Entity::find()
 896                .filter(user::Column::EmailAddress.eq(email_address))
 897                .one(&*tx)
 898                .await?;
 899
 900            if existing_user.is_some() {
 901                Err(anyhow!("email address is already in use"))?;
 902            }
 903
 904            let inviting_user_with_invites = match user::Entity::find()
 905                .filter(
 906                    user::Column::InviteCode
 907                        .eq(code)
 908                        .and(user::Column::InviteCount.gt(0)),
 909                )
 910                .one(&*tx)
 911                .await?
 912            {
 913                Some(inviting_user) => inviting_user,
 914                None => {
 915                    return Err(Error::Http(
 916                        StatusCode::UNAUTHORIZED,
 917                        "unable to find an invite code with invites remaining".to_string(),
 918                    ))?
 919                }
 920            };
 921            user::Entity::update_many()
 922                .filter(
 923                    user::Column::Id
 924                        .eq(inviting_user_with_invites.id)
 925                        .and(user::Column::InviteCount.gt(0)),
 926                )
 927                .col_expr(
 928                    user::Column::InviteCount,
 929                    Expr::col(user::Column::InviteCount).sub(1),
 930                )
 931                .exec(&*tx)
 932                .await?;
 933
 934            let signup = signup::Entity::insert(signup::ActiveModel {
 935                email_address: ActiveValue::set(email_address.into()),
 936                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 937                email_confirmation_sent: ActiveValue::set(false),
 938                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 939                platform_linux: ActiveValue::set(false),
 940                platform_mac: ActiveValue::set(false),
 941                platform_windows: ActiveValue::set(false),
 942                platform_unknown: ActiveValue::set(true),
 943                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 944                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 945                ..Default::default()
 946            })
 947            .on_conflict(
 948                OnConflict::column(signup::Column::EmailAddress)
 949                    .update_column(signup::Column::InvitingUserId)
 950                    .to_owned(),
 951            )
 952            .exec_with_returning(&*tx)
 953            .await?;
 954
 955            Ok(Invite {
 956                email_address: signup.email_address,
 957                email_confirmation_code: signup.email_confirmation_code,
 958            })
 959        })
 960        .await
 961    }
 962
 963    pub async fn create_user_from_invite(
 964        &self,
 965        invite: &Invite,
 966        user: NewUserParams,
 967    ) -> Result<Option<NewUserResult>> {
 968        self.transaction(|tx| async {
 969            let tx = tx;
 970            let signup = signup::Entity::find()
 971                .filter(
 972                    signup::Column::EmailAddress
 973                        .eq(invite.email_address.as_str())
 974                        .and(
 975                            signup::Column::EmailConfirmationCode
 976                                .eq(invite.email_confirmation_code.as_str()),
 977                        ),
 978                )
 979                .one(&*tx)
 980                .await?
 981                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 982
 983            if signup.user_id.is_some() {
 984                return Ok(None);
 985            }
 986
 987            let user = user::Entity::insert(user::ActiveModel {
 988                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 989                github_login: ActiveValue::set(user.github_login.clone()),
 990                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 991                admin: ActiveValue::set(false),
 992                invite_count: ActiveValue::set(user.invite_count),
 993                invite_code: ActiveValue::set(Some(random_invite_code())),
 994                metrics_id: ActiveValue::set(Uuid::new_v4()),
 995                ..Default::default()
 996            })
 997            .on_conflict(
 998                OnConflict::column(user::Column::GithubLogin)
 999                    .update_columns([
1000                        user::Column::EmailAddress,
1001                        user::Column::GithubUserId,
1002                        user::Column::Admin,
1003                    ])
1004                    .to_owned(),
1005            )
1006            .exec_with_returning(&*tx)
1007            .await?;
1008
1009            let mut signup = signup.into_active_model();
1010            signup.user_id = ActiveValue::set(Some(user.id));
1011            let signup = signup.update(&*tx).await?;
1012
1013            if let Some(inviting_user_id) = signup.inviting_user_id {
1014                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1015                    (inviting_user_id, user.id, true)
1016                } else {
1017                    (user.id, inviting_user_id, false)
1018                };
1019
1020                contact::Entity::insert(contact::ActiveModel {
1021                    user_id_a: ActiveValue::set(user_id_a),
1022                    user_id_b: ActiveValue::set(user_id_b),
1023                    a_to_b: ActiveValue::set(a_to_b),
1024                    should_notify: ActiveValue::set(true),
1025                    accepted: ActiveValue::set(true),
1026                    ..Default::default()
1027                })
1028                .on_conflict(OnConflict::new().do_nothing().to_owned())
1029                .exec_without_returning(&*tx)
1030                .await?;
1031            }
1032
1033            Ok(Some(NewUserResult {
1034                user_id: user.id,
1035                metrics_id: user.metrics_id.to_string(),
1036                inviting_user_id: signup.inviting_user_id,
1037                signup_device_id: signup.device_id,
1038            }))
1039        })
1040        .await
1041    }
1042
1043    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1044        self.transaction(|tx| async move {
1045            if count > 0 {
1046                user::Entity::update_many()
1047                    .filter(
1048                        user::Column::Id
1049                            .eq(id)
1050                            .and(user::Column::InviteCode.is_null()),
1051                    )
1052                    .set(user::ActiveModel {
1053                        invite_code: ActiveValue::set(Some(random_invite_code())),
1054                        ..Default::default()
1055                    })
1056                    .exec(&*tx)
1057                    .await?;
1058            }
1059
1060            user::Entity::update_many()
1061                .filter(user::Column::Id.eq(id))
1062                .set(user::ActiveModel {
1063                    invite_count: ActiveValue::set(count),
1064                    ..Default::default()
1065                })
1066                .exec(&*tx)
1067                .await?;
1068            Ok(())
1069        })
1070        .await
1071    }
1072
1073    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1074        self.transaction(|tx| async move {
1075            match user::Entity::find_by_id(id).one(&*tx).await? {
1076                Some(user) if user.invite_code.is_some() => {
1077                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1078                }
1079                _ => Ok(None),
1080            }
1081        })
1082        .await
1083    }
1084
1085    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1086        self.transaction(|tx| async move {
1087            user::Entity::find()
1088                .filter(user::Column::InviteCode.eq(code))
1089                .one(&*tx)
1090                .await?
1091                .ok_or_else(|| {
1092                    Error::Http(
1093                        StatusCode::NOT_FOUND,
1094                        "that invite code does not exist".to_string(),
1095                    )
1096                })
1097        })
1098        .await
1099    }
1100
1101    // rooms
1102
1103    pub async fn incoming_call_for_user(
1104        &self,
1105        user_id: UserId,
1106    ) -> Result<Option<proto::IncomingCall>> {
1107        self.transaction(|tx| async move {
1108            let pending_participant = room_participant::Entity::find()
1109                .filter(
1110                    room_participant::Column::UserId
1111                        .eq(user_id)
1112                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1113                )
1114                .one(&*tx)
1115                .await?;
1116
1117            if let Some(pending_participant) = pending_participant {
1118                let room = self.get_room(pending_participant.room_id, &tx).await?;
1119                Ok(Self::build_incoming_call(&room, user_id))
1120            } else {
1121                Ok(None)
1122            }
1123        })
1124        .await
1125    }
1126
1127    pub async fn create_room(
1128        &self,
1129        user_id: UserId,
1130        connection: ConnectionId,
1131        live_kit_room: &str,
1132    ) -> Result<RoomGuard<proto::Room>> {
1133        self.room_transaction(|tx| async move {
1134            let room = room::ActiveModel {
1135                live_kit_room: ActiveValue::set(live_kit_room.into()),
1136                ..Default::default()
1137            }
1138            .insert(&*tx)
1139            .await?;
1140            let room_id = room.id;
1141
1142            room_participant::ActiveModel {
1143                room_id: ActiveValue::set(room_id),
1144                user_id: ActiveValue::set(user_id),
1145                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1146                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1147                    connection.owner_id as i32,
1148                ))),
1149                answering_connection_lost: ActiveValue::set(false),
1150                calling_user_id: ActiveValue::set(user_id),
1151                calling_connection_id: ActiveValue::set(connection.id as i32),
1152                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1153                    connection.owner_id as i32,
1154                ))),
1155                ..Default::default()
1156            }
1157            .insert(&*tx)
1158            .await?;
1159
1160            let room = self.get_room(room_id, &tx).await?;
1161            Ok((room_id, room))
1162        })
1163        .await
1164    }
1165
1166    pub async fn call(
1167        &self,
1168        room_id: RoomId,
1169        calling_user_id: UserId,
1170        calling_connection: ConnectionId,
1171        called_user_id: UserId,
1172        initial_project_id: Option<ProjectId>,
1173    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1174        self.room_transaction(|tx| async move {
1175            room_participant::ActiveModel {
1176                room_id: ActiveValue::set(room_id),
1177                user_id: ActiveValue::set(called_user_id),
1178                answering_connection_lost: ActiveValue::set(false),
1179                calling_user_id: ActiveValue::set(calling_user_id),
1180                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1181                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1182                    calling_connection.owner_id as i32,
1183                ))),
1184                initial_project_id: ActiveValue::set(initial_project_id),
1185                ..Default::default()
1186            }
1187            .insert(&*tx)
1188            .await?;
1189
1190            let room = self.get_room(room_id, &tx).await?;
1191            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1192                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1193            Ok((room_id, (room, incoming_call)))
1194        })
1195        .await
1196    }
1197
1198    pub async fn call_failed(
1199        &self,
1200        room_id: RoomId,
1201        called_user_id: UserId,
1202    ) -> Result<RoomGuard<proto::Room>> {
1203        self.room_transaction(|tx| async move {
1204            room_participant::Entity::delete_many()
1205                .filter(
1206                    room_participant::Column::RoomId
1207                        .eq(room_id)
1208                        .and(room_participant::Column::UserId.eq(called_user_id)),
1209                )
1210                .exec(&*tx)
1211                .await?;
1212            let room = self.get_room(room_id, &tx).await?;
1213            Ok((room_id, room))
1214        })
1215        .await
1216    }
1217
1218    pub async fn decline_call(
1219        &self,
1220        expected_room_id: Option<RoomId>,
1221        user_id: UserId,
1222    ) -> Result<Option<RoomGuard<proto::Room>>> {
1223        self.optional_room_transaction(|tx| async move {
1224            let mut filter = Condition::all()
1225                .add(room_participant::Column::UserId.eq(user_id))
1226                .add(room_participant::Column::AnsweringConnectionId.is_null());
1227            if let Some(room_id) = expected_room_id {
1228                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1229            }
1230            let participant = room_participant::Entity::find()
1231                .filter(filter)
1232                .one(&*tx)
1233                .await?;
1234
1235            let participant = if let Some(participant) = participant {
1236                participant
1237            } else if expected_room_id.is_some() {
1238                return Err(anyhow!("could not find call to decline"))?;
1239            } else {
1240                return Ok(None);
1241            };
1242
1243            let room_id = participant.room_id;
1244            room_participant::Entity::delete(participant.into_active_model())
1245                .exec(&*tx)
1246                .await?;
1247
1248            let room = self.get_room(room_id, &tx).await?;
1249            Ok(Some((room_id, room)))
1250        })
1251        .await
1252    }
1253
1254    pub async fn cancel_call(
1255        &self,
1256        room_id: RoomId,
1257        calling_connection: ConnectionId,
1258        called_user_id: UserId,
1259    ) -> Result<RoomGuard<proto::Room>> {
1260        self.room_transaction(|tx| async move {
1261            let participant = room_participant::Entity::find()
1262                .filter(
1263                    Condition::all()
1264                        .add(room_participant::Column::UserId.eq(called_user_id))
1265                        .add(room_participant::Column::RoomId.eq(room_id))
1266                        .add(
1267                            room_participant::Column::CallingConnectionId
1268                                .eq(calling_connection.id as i32),
1269                        )
1270                        .add(
1271                            room_participant::Column::CallingConnectionServerId
1272                                .eq(calling_connection.owner_id as i32),
1273                        )
1274                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1275                )
1276                .one(&*tx)
1277                .await?
1278                .ok_or_else(|| anyhow!("no call to cancel"))?;
1279            let room_id = participant.room_id;
1280
1281            room_participant::Entity::delete(participant.into_active_model())
1282                .exec(&*tx)
1283                .await?;
1284
1285            let room = self.get_room(room_id, &tx).await?;
1286            Ok((room_id, room))
1287        })
1288        .await
1289    }
1290
1291    pub async fn join_room(
1292        &self,
1293        room_id: RoomId,
1294        user_id: UserId,
1295        connection: ConnectionId,
1296    ) -> Result<RoomGuard<proto::Room>> {
1297        self.room_transaction(|tx| async move {
1298            let result = room_participant::Entity::update_many()
1299                .filter(
1300                    Condition::all()
1301                        .add(room_participant::Column::RoomId.eq(room_id))
1302                        .add(room_participant::Column::UserId.eq(user_id))
1303                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1304                )
1305                .set(room_participant::ActiveModel {
1306                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1307                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1308                        connection.owner_id as i32,
1309                    ))),
1310                    answering_connection_lost: ActiveValue::set(false),
1311                    ..Default::default()
1312                })
1313                .exec(&*tx)
1314                .await?;
1315            if result.rows_affected == 0 {
1316                Err(anyhow!("room does not exist or was already joined"))?
1317            } else {
1318                let room = self.get_room(room_id, &tx).await?;
1319                Ok((room_id, room))
1320            }
1321        })
1322        .await
1323    }
1324
1325    pub async fn rejoin_room(
1326        &self,
1327        rejoin_room: proto::RejoinRoom,
1328        user_id: UserId,
1329        connection: ConnectionId,
1330    ) -> Result<RoomGuard<RejoinedRoom>> {
1331        self.room_transaction(|tx| async {
1332            let tx = tx;
1333            let room_id = RoomId::from_proto(rejoin_room.id);
1334            let participant_update = room_participant::Entity::update_many()
1335                .filter(
1336                    Condition::all()
1337                        .add(room_participant::Column::RoomId.eq(room_id))
1338                        .add(room_participant::Column::UserId.eq(user_id))
1339                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1340                        .add(
1341                            Condition::any()
1342                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1343                                .add(
1344                                    room_participant::Column::AnsweringConnectionServerId
1345                                        .ne(connection.owner_id as i32),
1346                                ),
1347                        ),
1348                )
1349                .set(room_participant::ActiveModel {
1350                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1351                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1352                        connection.owner_id as i32,
1353                    ))),
1354                    answering_connection_lost: ActiveValue::set(false),
1355                    ..Default::default()
1356                })
1357                .exec(&*tx)
1358                .await?;
1359            if participant_update.rows_affected == 0 {
1360                return Err(anyhow!("room does not exist or was already joined"))?;
1361            }
1362
1363            let mut reshared_projects = Vec::new();
1364            for reshared_project in &rejoin_room.reshared_projects {
1365                let project_id = ProjectId::from_proto(reshared_project.project_id);
1366                let project = project::Entity::find_by_id(project_id)
1367                    .one(&*tx)
1368                    .await?
1369                    .ok_or_else(|| anyhow!("project does not exist"))?;
1370                if project.host_user_id != user_id {
1371                    return Err(anyhow!("no such project"))?;
1372                }
1373
1374                let mut collaborators = project
1375                    .find_related(project_collaborator::Entity)
1376                    .all(&*tx)
1377                    .await?;
1378                let host_ix = collaborators
1379                    .iter()
1380                    .position(|collaborator| {
1381                        collaborator.user_id == user_id && collaborator.is_host
1382                    })
1383                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1384                let host = collaborators.swap_remove(host_ix);
1385                let old_connection_id = host.connection();
1386
1387                project::Entity::update(project::ActiveModel {
1388                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1389                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1390                        connection.owner_id as i32,
1391                    ))),
1392                    ..project.into_active_model()
1393                })
1394                .exec(&*tx)
1395                .await?;
1396                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1397                    connection_id: ActiveValue::set(connection.id as i32),
1398                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1399                    ..host.into_active_model()
1400                })
1401                .exec(&*tx)
1402                .await?;
1403
1404                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1405                    .await?;
1406
1407                reshared_projects.push(ResharedProject {
1408                    id: project_id,
1409                    old_connection_id,
1410                    collaborators: collaborators
1411                        .iter()
1412                        .map(|collaborator| ProjectCollaborator {
1413                            connection_id: collaborator.connection(),
1414                            user_id: collaborator.user_id,
1415                            replica_id: collaborator.replica_id,
1416                            is_host: collaborator.is_host,
1417                        })
1418                        .collect(),
1419                    worktrees: reshared_project.worktrees.clone(),
1420                });
1421            }
1422
1423            project::Entity::delete_many()
1424                .filter(
1425                    Condition::all()
1426                        .add(project::Column::RoomId.eq(room_id))
1427                        .add(project::Column::HostUserId.eq(user_id))
1428                        .add(
1429                            project::Column::Id
1430                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1431                        ),
1432                )
1433                .exec(&*tx)
1434                .await?;
1435
1436            let mut rejoined_projects = Vec::new();
1437            for rejoined_project in &rejoin_room.rejoined_projects {
1438                let project_id = ProjectId::from_proto(rejoined_project.id);
1439                let Some(project) = project::Entity::find_by_id(project_id)
1440                    .one(&*tx)
1441                    .await? else { continue };
1442
1443                let mut worktrees = Vec::new();
1444                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1445                for db_worktree in db_worktrees {
1446                    let mut worktree = RejoinedWorktree {
1447                        id: db_worktree.id as u64,
1448                        abs_path: db_worktree.abs_path,
1449                        root_name: db_worktree.root_name,
1450                        visible: db_worktree.visible,
1451                        updated_entries: Default::default(),
1452                        removed_entries: Default::default(),
1453                        diagnostic_summaries: Default::default(),
1454                        scan_id: db_worktree.scan_id as u64,
1455                        completed_scan_id: db_worktree.completed_scan_id as u64,
1456                    };
1457
1458                    let rejoined_worktree = rejoined_project
1459                        .worktrees
1460                        .iter()
1461                        .find(|worktree| worktree.id == db_worktree.id as u64);
1462                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1463                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1464                    } else {
1465                        worktree_entry::Column::IsDeleted.eq(false)
1466                    };
1467
1468                    let mut db_entries = worktree_entry::Entity::find()
1469                        .filter(
1470                            Condition::all()
1471                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1472                                .add(entry_filter),
1473                        )
1474                        .stream(&*tx)
1475                        .await?;
1476
1477                    while let Some(db_entry) = db_entries.next().await {
1478                        let db_entry = db_entry?;
1479                        if db_entry.is_deleted {
1480                            worktree.removed_entries.push(db_entry.id as u64);
1481                        } else {
1482                            worktree.updated_entries.push(proto::Entry {
1483                                id: db_entry.id as u64,
1484                                is_dir: db_entry.is_dir,
1485                                path: db_entry.path,
1486                                inode: db_entry.inode as u64,
1487                                mtime: Some(proto::Timestamp {
1488                                    seconds: db_entry.mtime_seconds as u64,
1489                                    nanos: db_entry.mtime_nanos as u32,
1490                                }),
1491                                is_symlink: db_entry.is_symlink,
1492                                is_ignored: db_entry.is_ignored,
1493                            });
1494                        }
1495                    }
1496
1497                    worktrees.push(worktree);
1498                }
1499
1500                let language_servers = project
1501                    .find_related(language_server::Entity)
1502                    .all(&*tx)
1503                    .await?
1504                    .into_iter()
1505                    .map(|language_server| proto::LanguageServer {
1506                        id: language_server.id as u64,
1507                        name: language_server.name,
1508                    })
1509                    .collect::<Vec<_>>();
1510
1511                let mut collaborators = project
1512                    .find_related(project_collaborator::Entity)
1513                    .all(&*tx)
1514                    .await?;
1515                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1516                    .iter()
1517                    .position(|collaborator| collaborator.user_id == user_id)
1518                {
1519                    collaborators.swap_remove(self_collaborator_ix)
1520                } else {
1521                    continue;
1522                };
1523                let old_connection_id = self_collaborator.connection();
1524                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1525                    connection_id: ActiveValue::set(connection.id as i32),
1526                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1527                    ..self_collaborator.into_active_model()
1528                })
1529                .exec(&*tx)
1530                .await?;
1531
1532                let collaborators = collaborators
1533                    .into_iter()
1534                    .map(|collaborator| ProjectCollaborator {
1535                        connection_id: collaborator.connection(),
1536                        user_id: collaborator.user_id,
1537                        replica_id: collaborator.replica_id,
1538                        is_host: collaborator.is_host,
1539                    })
1540                    .collect::<Vec<_>>();
1541
1542                rejoined_projects.push(RejoinedProject {
1543                    id: project_id,
1544                    old_connection_id,
1545                    collaborators,
1546                    worktrees,
1547                    language_servers,
1548                });
1549            }
1550
1551            let room = self.get_room(room_id, &tx).await?;
1552            Ok((
1553                room_id,
1554                RejoinedRoom {
1555                    room,
1556                    rejoined_projects,
1557                    reshared_projects,
1558                },
1559            ))
1560        })
1561        .await
1562    }
1563
1564    pub async fn leave_room(
1565        &self,
1566        connection: ConnectionId,
1567    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1568        self.optional_room_transaction(|tx| async move {
1569            let leaving_participant = room_participant::Entity::find()
1570                .filter(
1571                    Condition::all()
1572                        .add(
1573                            room_participant::Column::AnsweringConnectionId
1574                                .eq(connection.id as i32),
1575                        )
1576                        .add(
1577                            room_participant::Column::AnsweringConnectionServerId
1578                                .eq(connection.owner_id as i32),
1579                        ),
1580                )
1581                .one(&*tx)
1582                .await?;
1583
1584            if let Some(leaving_participant) = leaving_participant {
1585                // Leave room.
1586                let room_id = leaving_participant.room_id;
1587                room_participant::Entity::delete_by_id(leaving_participant.id)
1588                    .exec(&*tx)
1589                    .await?;
1590
1591                // Cancel pending calls initiated by the leaving user.
1592                let called_participants = room_participant::Entity::find()
1593                    .filter(
1594                        Condition::all()
1595                            .add(
1596                                room_participant::Column::CallingConnectionId
1597                                    .eq(connection.id as i32),
1598                            )
1599                            .add(
1600                                room_participant::Column::CallingConnectionServerId
1601                                    .eq(connection.owner_id as i32),
1602                            )
1603                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1604                    )
1605                    .all(&*tx)
1606                    .await?;
1607                room_participant::Entity::delete_many()
1608                    .filter(
1609                        room_participant::Column::Id
1610                            .is_in(called_participants.iter().map(|participant| participant.id)),
1611                    )
1612                    .exec(&*tx)
1613                    .await?;
1614                let canceled_calls_to_user_ids = called_participants
1615                    .into_iter()
1616                    .map(|participant| participant.user_id)
1617                    .collect();
1618
1619                // Detect left projects.
1620                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1621                enum QueryProjectIds {
1622                    ProjectId,
1623                }
1624                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1625                    .select_only()
1626                    .column_as(
1627                        project_collaborator::Column::ProjectId,
1628                        QueryProjectIds::ProjectId,
1629                    )
1630                    .filter(
1631                        Condition::all()
1632                            .add(
1633                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1634                            )
1635                            .add(
1636                                project_collaborator::Column::ConnectionServerId
1637                                    .eq(connection.owner_id as i32),
1638                            ),
1639                    )
1640                    .into_values::<_, QueryProjectIds>()
1641                    .all(&*tx)
1642                    .await?;
1643                let mut left_projects = HashMap::default();
1644                let mut collaborators = project_collaborator::Entity::find()
1645                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1646                    .stream(&*tx)
1647                    .await?;
1648                while let Some(collaborator) = collaborators.next().await {
1649                    let collaborator = collaborator?;
1650                    let left_project =
1651                        left_projects
1652                            .entry(collaborator.project_id)
1653                            .or_insert(LeftProject {
1654                                id: collaborator.project_id,
1655                                host_user_id: Default::default(),
1656                                connection_ids: Default::default(),
1657                                host_connection_id: Default::default(),
1658                            });
1659
1660                    let collaborator_connection_id = collaborator.connection();
1661                    if collaborator_connection_id != connection {
1662                        left_project.connection_ids.push(collaborator_connection_id);
1663                    }
1664
1665                    if collaborator.is_host {
1666                        left_project.host_user_id = collaborator.user_id;
1667                        left_project.host_connection_id = collaborator_connection_id;
1668                    }
1669                }
1670                drop(collaborators);
1671
1672                // Leave projects.
1673                project_collaborator::Entity::delete_many()
1674                    .filter(
1675                        Condition::all()
1676                            .add(
1677                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1678                            )
1679                            .add(
1680                                project_collaborator::Column::ConnectionServerId
1681                                    .eq(connection.owner_id as i32),
1682                            ),
1683                    )
1684                    .exec(&*tx)
1685                    .await?;
1686
1687                // Unshare projects.
1688                project::Entity::delete_many()
1689                    .filter(
1690                        Condition::all()
1691                            .add(project::Column::RoomId.eq(room_id))
1692                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1693                            .add(
1694                                project::Column::HostConnectionServerId
1695                                    .eq(connection.owner_id as i32),
1696                            ),
1697                    )
1698                    .exec(&*tx)
1699                    .await?;
1700
1701                let room = self.get_room(room_id, &tx).await?;
1702                if room.participants.is_empty() {
1703                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1704                }
1705
1706                let left_room = LeftRoom {
1707                    room,
1708                    left_projects,
1709                    canceled_calls_to_user_ids,
1710                };
1711
1712                if left_room.room.participants.is_empty() {
1713                    self.rooms.remove(&room_id);
1714                }
1715
1716                Ok(Some((room_id, left_room)))
1717            } else {
1718                Ok(None)
1719            }
1720        })
1721        .await
1722    }
1723
1724    pub async fn update_room_participant_location(
1725        &self,
1726        room_id: RoomId,
1727        connection: ConnectionId,
1728        location: proto::ParticipantLocation,
1729    ) -> Result<RoomGuard<proto::Room>> {
1730        self.room_transaction(|tx| async {
1731            let tx = tx;
1732            let location_kind;
1733            let location_project_id;
1734            match location
1735                .variant
1736                .as_ref()
1737                .ok_or_else(|| anyhow!("invalid location"))?
1738            {
1739                proto::participant_location::Variant::SharedProject(project) => {
1740                    location_kind = 0;
1741                    location_project_id = Some(ProjectId::from_proto(project.id));
1742                }
1743                proto::participant_location::Variant::UnsharedProject(_) => {
1744                    location_kind = 1;
1745                    location_project_id = None;
1746                }
1747                proto::participant_location::Variant::External(_) => {
1748                    location_kind = 2;
1749                    location_project_id = None;
1750                }
1751            }
1752
1753            let result = room_participant::Entity::update_many()
1754                .filter(
1755                    Condition::all()
1756                        .add(room_participant::Column::RoomId.eq(room_id))
1757                        .add(
1758                            room_participant::Column::AnsweringConnectionId
1759                                .eq(connection.id as i32),
1760                        )
1761                        .add(
1762                            room_participant::Column::AnsweringConnectionServerId
1763                                .eq(connection.owner_id as i32),
1764                        ),
1765                )
1766                .set(room_participant::ActiveModel {
1767                    location_kind: ActiveValue::set(Some(location_kind)),
1768                    location_project_id: ActiveValue::set(location_project_id),
1769                    ..Default::default()
1770                })
1771                .exec(&*tx)
1772                .await?;
1773
1774            if result.rows_affected == 1 {
1775                let room = self.get_room(room_id, &tx).await?;
1776                Ok((room_id, room))
1777            } else {
1778                Err(anyhow!("could not update room participant location"))?
1779            }
1780        })
1781        .await
1782    }
1783
1784    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1785        self.transaction(|tx| async move {
1786            let participant = room_participant::Entity::find()
1787                .filter(
1788                    Condition::all()
1789                        .add(
1790                            room_participant::Column::AnsweringConnectionId
1791                                .eq(connection.id as i32),
1792                        )
1793                        .add(
1794                            room_participant::Column::AnsweringConnectionServerId
1795                                .eq(connection.owner_id as i32),
1796                        ),
1797                )
1798                .one(&*tx)
1799                .await?
1800                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1801
1802            room_participant::Entity::update(room_participant::ActiveModel {
1803                answering_connection_lost: ActiveValue::set(true),
1804                ..participant.into_active_model()
1805            })
1806            .exec(&*tx)
1807            .await?;
1808
1809            Ok(())
1810        })
1811        .await
1812    }
1813
1814    fn build_incoming_call(
1815        room: &proto::Room,
1816        called_user_id: UserId,
1817    ) -> Option<proto::IncomingCall> {
1818        let pending_participant = room
1819            .pending_participants
1820            .iter()
1821            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1822
1823        Some(proto::IncomingCall {
1824            room_id: room.id,
1825            calling_user_id: pending_participant.calling_user_id,
1826            participant_user_ids: room
1827                .participants
1828                .iter()
1829                .map(|participant| participant.user_id)
1830                .collect(),
1831            initial_project: room.participants.iter().find_map(|participant| {
1832                let initial_project_id = pending_participant.initial_project_id?;
1833                participant
1834                    .projects
1835                    .iter()
1836                    .find(|project| project.id == initial_project_id)
1837                    .cloned()
1838            }),
1839        })
1840    }
1841
1842    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1843        let db_room = room::Entity::find_by_id(room_id)
1844            .one(tx)
1845            .await?
1846            .ok_or_else(|| anyhow!("could not find room"))?;
1847
1848        let mut db_participants = db_room
1849            .find_related(room_participant::Entity)
1850            .stream(tx)
1851            .await?;
1852        let mut participants = HashMap::default();
1853        let mut pending_participants = Vec::new();
1854        while let Some(db_participant) = db_participants.next().await {
1855            let db_participant = db_participant?;
1856            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1857                .answering_connection_id
1858                .zip(db_participant.answering_connection_server_id)
1859            {
1860                let location = match (
1861                    db_participant.location_kind,
1862                    db_participant.location_project_id,
1863                ) {
1864                    (Some(0), Some(project_id)) => {
1865                        Some(proto::participant_location::Variant::SharedProject(
1866                            proto::participant_location::SharedProject {
1867                                id: project_id.to_proto(),
1868                            },
1869                        ))
1870                    }
1871                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1872                        Default::default(),
1873                    )),
1874                    _ => Some(proto::participant_location::Variant::External(
1875                        Default::default(),
1876                    )),
1877                };
1878
1879                let answering_connection = ConnectionId {
1880                    owner_id: answering_connection_server_id.0 as u32,
1881                    id: answering_connection_id as u32,
1882                };
1883                participants.insert(
1884                    answering_connection,
1885                    proto::Participant {
1886                        user_id: db_participant.user_id.to_proto(),
1887                        peer_id: Some(answering_connection.into()),
1888                        projects: Default::default(),
1889                        location: Some(proto::ParticipantLocation { variant: location }),
1890                    },
1891                );
1892            } else {
1893                pending_participants.push(proto::PendingParticipant {
1894                    user_id: db_participant.user_id.to_proto(),
1895                    calling_user_id: db_participant.calling_user_id.to_proto(),
1896                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1897                });
1898            }
1899        }
1900        drop(db_participants);
1901
1902        let mut db_projects = db_room
1903            .find_related(project::Entity)
1904            .find_with_related(worktree::Entity)
1905            .stream(tx)
1906            .await?;
1907
1908        while let Some(row) = db_projects.next().await {
1909            let (db_project, db_worktree) = row?;
1910            let host_connection = db_project.host_connection()?;
1911            if let Some(participant) = participants.get_mut(&host_connection) {
1912                let project = if let Some(project) = participant
1913                    .projects
1914                    .iter_mut()
1915                    .find(|project| project.id == db_project.id.to_proto())
1916                {
1917                    project
1918                } else {
1919                    participant.projects.push(proto::ParticipantProject {
1920                        id: db_project.id.to_proto(),
1921                        worktree_root_names: Default::default(),
1922                    });
1923                    participant.projects.last_mut().unwrap()
1924                };
1925
1926                if let Some(db_worktree) = db_worktree {
1927                    if db_worktree.visible {
1928                        project.worktree_root_names.push(db_worktree.root_name);
1929                    }
1930                }
1931            }
1932        }
1933
1934        Ok(proto::Room {
1935            id: db_room.id.to_proto(),
1936            live_kit_room: db_room.live_kit_room,
1937            participants: participants.into_values().collect(),
1938            pending_participants,
1939        })
1940    }
1941
1942    // projects
1943
1944    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1945        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1946        enum QueryAs {
1947            Count,
1948        }
1949
1950        self.transaction(|tx| async move {
1951            Ok(project::Entity::find()
1952                .select_only()
1953                .column_as(project::Column::Id.count(), QueryAs::Count)
1954                .inner_join(user::Entity)
1955                .filter(user::Column::Admin.eq(false))
1956                .into_values::<_, QueryAs>()
1957                .one(&*tx)
1958                .await?
1959                .unwrap_or(0i64) as usize)
1960        })
1961        .await
1962    }
1963
1964    pub async fn share_project(
1965        &self,
1966        room_id: RoomId,
1967        connection: ConnectionId,
1968        worktrees: &[proto::WorktreeMetadata],
1969    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1970        self.room_transaction(|tx| async move {
1971            let participant = room_participant::Entity::find()
1972                .filter(
1973                    Condition::all()
1974                        .add(
1975                            room_participant::Column::AnsweringConnectionId
1976                                .eq(connection.id as i32),
1977                        )
1978                        .add(
1979                            room_participant::Column::AnsweringConnectionServerId
1980                                .eq(connection.owner_id as i32),
1981                        ),
1982                )
1983                .one(&*tx)
1984                .await?
1985                .ok_or_else(|| anyhow!("could not find participant"))?;
1986            if participant.room_id != room_id {
1987                return Err(anyhow!("shared project on unexpected room"))?;
1988            }
1989
1990            let project = project::ActiveModel {
1991                room_id: ActiveValue::set(participant.room_id),
1992                host_user_id: ActiveValue::set(participant.user_id),
1993                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1994                host_connection_server_id: ActiveValue::set(Some(ServerId(
1995                    connection.owner_id as i32,
1996                ))),
1997                ..Default::default()
1998            }
1999            .insert(&*tx)
2000            .await?;
2001
2002            if !worktrees.is_empty() {
2003                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2004                    worktree::ActiveModel {
2005                        id: ActiveValue::set(worktree.id as i64),
2006                        project_id: ActiveValue::set(project.id),
2007                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2008                        root_name: ActiveValue::set(worktree.root_name.clone()),
2009                        visible: ActiveValue::set(worktree.visible),
2010                        scan_id: ActiveValue::set(0),
2011                        completed_scan_id: ActiveValue::set(0),
2012                    }
2013                }))
2014                .exec(&*tx)
2015                .await?;
2016            }
2017
2018            project_collaborator::ActiveModel {
2019                project_id: ActiveValue::set(project.id),
2020                connection_id: ActiveValue::set(connection.id as i32),
2021                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2022                user_id: ActiveValue::set(participant.user_id),
2023                replica_id: ActiveValue::set(ReplicaId(0)),
2024                is_host: ActiveValue::set(true),
2025                ..Default::default()
2026            }
2027            .insert(&*tx)
2028            .await?;
2029
2030            let room = self.get_room(room_id, &tx).await?;
2031            Ok((room_id, (project.id, room)))
2032        })
2033        .await
2034    }
2035
2036    pub async fn unshare_project(
2037        &self,
2038        project_id: ProjectId,
2039        connection: ConnectionId,
2040    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2041        self.room_transaction(|tx| async move {
2042            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2043
2044            let project = project::Entity::find_by_id(project_id)
2045                .one(&*tx)
2046                .await?
2047                .ok_or_else(|| anyhow!("project not found"))?;
2048            if project.host_connection()? == connection {
2049                let room_id = project.room_id;
2050                project::Entity::delete(project.into_active_model())
2051                    .exec(&*tx)
2052                    .await?;
2053                let room = self.get_room(room_id, &tx).await?;
2054                Ok((room_id, (room, guest_connection_ids)))
2055            } else {
2056                Err(anyhow!("cannot unshare a project hosted by another user"))?
2057            }
2058        })
2059        .await
2060    }
2061
2062    pub async fn update_project(
2063        &self,
2064        project_id: ProjectId,
2065        connection: ConnectionId,
2066        worktrees: &[proto::WorktreeMetadata],
2067    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2068        self.room_transaction(|tx| async move {
2069            let project = project::Entity::find_by_id(project_id)
2070                .filter(
2071                    Condition::all()
2072                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2073                        .add(
2074                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2075                        ),
2076                )
2077                .one(&*tx)
2078                .await?
2079                .ok_or_else(|| anyhow!("no such project"))?;
2080
2081            self.update_project_worktrees(project.id, worktrees, &tx)
2082                .await?;
2083
2084            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2085            let room = self.get_room(project.room_id, &tx).await?;
2086            Ok((project.room_id, (room, guest_connection_ids)))
2087        })
2088        .await
2089    }
2090
2091    async fn update_project_worktrees(
2092        &self,
2093        project_id: ProjectId,
2094        worktrees: &[proto::WorktreeMetadata],
2095        tx: &DatabaseTransaction,
2096    ) -> Result<()> {
2097        if !worktrees.is_empty() {
2098            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2099                id: ActiveValue::set(worktree.id as i64),
2100                project_id: ActiveValue::set(project_id),
2101                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2102                root_name: ActiveValue::set(worktree.root_name.clone()),
2103                visible: ActiveValue::set(worktree.visible),
2104                scan_id: ActiveValue::set(0),
2105                completed_scan_id: ActiveValue::set(0),
2106            }))
2107            .on_conflict(
2108                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2109                    .update_column(worktree::Column::RootName)
2110                    .to_owned(),
2111            )
2112            .exec(&*tx)
2113            .await?;
2114        }
2115
2116        worktree::Entity::delete_many()
2117            .filter(worktree::Column::ProjectId.eq(project_id).and(
2118                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2119            ))
2120            .exec(&*tx)
2121            .await?;
2122
2123        Ok(())
2124    }
2125
2126    pub async fn update_worktree(
2127        &self,
2128        update: &proto::UpdateWorktree,
2129        connection: ConnectionId,
2130    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2131        self.room_transaction(|tx| async move {
2132            let project_id = ProjectId::from_proto(update.project_id);
2133            let worktree_id = update.worktree_id as i64;
2134
2135            // Ensure the update comes from the host.
2136            let project = project::Entity::find_by_id(project_id)
2137                .filter(
2138                    Condition::all()
2139                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2140                        .add(
2141                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2142                        ),
2143                )
2144                .one(&*tx)
2145                .await?
2146                .ok_or_else(|| anyhow!("no such project"))?;
2147            let room_id = project.room_id;
2148
2149            // Update metadata.
2150            worktree::Entity::update(worktree::ActiveModel {
2151                id: ActiveValue::set(worktree_id),
2152                project_id: ActiveValue::set(project_id),
2153                root_name: ActiveValue::set(update.root_name.clone()),
2154                scan_id: ActiveValue::set(update.scan_id as i64),
2155                completed_scan_id: if update.is_last_update {
2156                    ActiveValue::set(update.scan_id as i64)
2157                } else {
2158                    ActiveValue::default()
2159                },
2160                abs_path: ActiveValue::set(update.abs_path.clone()),
2161                ..Default::default()
2162            })
2163            .exec(&*tx)
2164            .await?;
2165
2166            if !update.updated_entries.is_empty() {
2167                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2168                    let mtime = entry.mtime.clone().unwrap_or_default();
2169                    worktree_entry::ActiveModel {
2170                        project_id: ActiveValue::set(project_id),
2171                        worktree_id: ActiveValue::set(worktree_id),
2172                        id: ActiveValue::set(entry.id as i64),
2173                        is_dir: ActiveValue::set(entry.is_dir),
2174                        path: ActiveValue::set(entry.path.clone()),
2175                        inode: ActiveValue::set(entry.inode as i64),
2176                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2177                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2178                        is_symlink: ActiveValue::set(entry.is_symlink),
2179                        is_ignored: ActiveValue::set(entry.is_ignored),
2180                        is_deleted: ActiveValue::set(false),
2181                        scan_id: ActiveValue::set(update.scan_id as i64),
2182                    }
2183                }))
2184                .on_conflict(
2185                    OnConflict::columns([
2186                        worktree_entry::Column::ProjectId,
2187                        worktree_entry::Column::WorktreeId,
2188                        worktree_entry::Column::Id,
2189                    ])
2190                    .update_columns([
2191                        worktree_entry::Column::IsDir,
2192                        worktree_entry::Column::Path,
2193                        worktree_entry::Column::Inode,
2194                        worktree_entry::Column::MtimeSeconds,
2195                        worktree_entry::Column::MtimeNanos,
2196                        worktree_entry::Column::IsSymlink,
2197                        worktree_entry::Column::IsIgnored,
2198                        worktree_entry::Column::ScanId,
2199                    ])
2200                    .to_owned(),
2201                )
2202                .exec(&*tx)
2203                .await?;
2204            }
2205
2206            if !update.removed_entries.is_empty() {
2207                worktree_entry::Entity::update_many()
2208                    .filter(
2209                        worktree_entry::Column::ProjectId
2210                            .eq(project_id)
2211                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2212                            .and(
2213                                worktree_entry::Column::Id
2214                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2215                            ),
2216                    )
2217                    .set(worktree_entry::ActiveModel {
2218                        is_deleted: ActiveValue::Set(true),
2219                        scan_id: ActiveValue::Set(update.scan_id as i64),
2220                        ..Default::default()
2221                    })
2222                    .exec(&*tx)
2223                    .await?;
2224            }
2225
2226            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2227            Ok((room_id, connection_ids))
2228        })
2229        .await
2230    }
2231
2232    pub async fn update_diagnostic_summary(
2233        &self,
2234        update: &proto::UpdateDiagnosticSummary,
2235        connection: ConnectionId,
2236    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2237        self.room_transaction(|tx| async move {
2238            let project_id = ProjectId::from_proto(update.project_id);
2239            let worktree_id = update.worktree_id as i64;
2240            let summary = update
2241                .summary
2242                .as_ref()
2243                .ok_or_else(|| anyhow!("invalid summary"))?;
2244
2245            // Ensure the update comes from the host.
2246            let project = project::Entity::find_by_id(project_id)
2247                .one(&*tx)
2248                .await?
2249                .ok_or_else(|| anyhow!("no such project"))?;
2250            if project.host_connection()? != connection {
2251                return Err(anyhow!("can't update a project hosted by someone else"))?;
2252            }
2253
2254            // Update summary.
2255            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2256                project_id: ActiveValue::set(project_id),
2257                worktree_id: ActiveValue::set(worktree_id),
2258                path: ActiveValue::set(summary.path.clone()),
2259                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2260                error_count: ActiveValue::set(summary.error_count as i32),
2261                warning_count: ActiveValue::set(summary.warning_count as i32),
2262                ..Default::default()
2263            })
2264            .on_conflict(
2265                OnConflict::columns([
2266                    worktree_diagnostic_summary::Column::ProjectId,
2267                    worktree_diagnostic_summary::Column::WorktreeId,
2268                    worktree_diagnostic_summary::Column::Path,
2269                ])
2270                .update_columns([
2271                    worktree_diagnostic_summary::Column::LanguageServerId,
2272                    worktree_diagnostic_summary::Column::ErrorCount,
2273                    worktree_diagnostic_summary::Column::WarningCount,
2274                ])
2275                .to_owned(),
2276            )
2277            .exec(&*tx)
2278            .await?;
2279
2280            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2281            Ok((project.room_id, connection_ids))
2282        })
2283        .await
2284    }
2285
2286    pub async fn start_language_server(
2287        &self,
2288        update: &proto::StartLanguageServer,
2289        connection: ConnectionId,
2290    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2291        self.room_transaction(|tx| async move {
2292            let project_id = ProjectId::from_proto(update.project_id);
2293            let server = update
2294                .server
2295                .as_ref()
2296                .ok_or_else(|| anyhow!("invalid language server"))?;
2297
2298            // Ensure the update comes from the host.
2299            let project = project::Entity::find_by_id(project_id)
2300                .one(&*tx)
2301                .await?
2302                .ok_or_else(|| anyhow!("no such project"))?;
2303            if project.host_connection()? != connection {
2304                return Err(anyhow!("can't update a project hosted by someone else"))?;
2305            }
2306
2307            // Add the newly-started language server.
2308            language_server::Entity::insert(language_server::ActiveModel {
2309                project_id: ActiveValue::set(project_id),
2310                id: ActiveValue::set(server.id as i64),
2311                name: ActiveValue::set(server.name.clone()),
2312                ..Default::default()
2313            })
2314            .on_conflict(
2315                OnConflict::columns([
2316                    language_server::Column::ProjectId,
2317                    language_server::Column::Id,
2318                ])
2319                .update_column(language_server::Column::Name)
2320                .to_owned(),
2321            )
2322            .exec(&*tx)
2323            .await?;
2324
2325            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2326            Ok((project.room_id, connection_ids))
2327        })
2328        .await
2329    }
2330
2331    pub async fn join_project(
2332        &self,
2333        project_id: ProjectId,
2334        connection: ConnectionId,
2335    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2336        self.room_transaction(|tx| async move {
2337            let participant = room_participant::Entity::find()
2338                .filter(
2339                    Condition::all()
2340                        .add(
2341                            room_participant::Column::AnsweringConnectionId
2342                                .eq(connection.id as i32),
2343                        )
2344                        .add(
2345                            room_participant::Column::AnsweringConnectionServerId
2346                                .eq(connection.owner_id as i32),
2347                        ),
2348                )
2349                .one(&*tx)
2350                .await?
2351                .ok_or_else(|| anyhow!("must join a room first"))?;
2352
2353            let project = project::Entity::find_by_id(project_id)
2354                .one(&*tx)
2355                .await?
2356                .ok_or_else(|| anyhow!("no such project"))?;
2357            if project.room_id != participant.room_id {
2358                return Err(anyhow!("no such project"))?;
2359            }
2360
2361            let mut collaborators = project
2362                .find_related(project_collaborator::Entity)
2363                .all(&*tx)
2364                .await?;
2365            let replica_ids = collaborators
2366                .iter()
2367                .map(|c| c.replica_id)
2368                .collect::<HashSet<_>>();
2369            let mut replica_id = ReplicaId(1);
2370            while replica_ids.contains(&replica_id) {
2371                replica_id.0 += 1;
2372            }
2373            let new_collaborator = project_collaborator::ActiveModel {
2374                project_id: ActiveValue::set(project_id),
2375                connection_id: ActiveValue::set(connection.id as i32),
2376                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2377                user_id: ActiveValue::set(participant.user_id),
2378                replica_id: ActiveValue::set(replica_id),
2379                is_host: ActiveValue::set(false),
2380                ..Default::default()
2381            }
2382            .insert(&*tx)
2383            .await?;
2384            collaborators.push(new_collaborator);
2385
2386            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2387            let mut worktrees = db_worktrees
2388                .into_iter()
2389                .map(|db_worktree| {
2390                    (
2391                        db_worktree.id as u64,
2392                        Worktree {
2393                            id: db_worktree.id as u64,
2394                            abs_path: db_worktree.abs_path,
2395                            root_name: db_worktree.root_name,
2396                            visible: db_worktree.visible,
2397                            entries: Default::default(),
2398                            diagnostic_summaries: Default::default(),
2399                            scan_id: db_worktree.scan_id as u64,
2400                            completed_scan_id: db_worktree.completed_scan_id as u64,
2401                        },
2402                    )
2403                })
2404                .collect::<BTreeMap<_, _>>();
2405
2406            // Populate worktree entries.
2407            {
2408                let mut db_entries = worktree_entry::Entity::find()
2409                    .filter(
2410                        Condition::all()
2411                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2412                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2413                    )
2414                    .stream(&*tx)
2415                    .await?;
2416                while let Some(db_entry) = db_entries.next().await {
2417                    let db_entry = db_entry?;
2418                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2419                        worktree.entries.push(proto::Entry {
2420                            id: db_entry.id as u64,
2421                            is_dir: db_entry.is_dir,
2422                            path: db_entry.path,
2423                            inode: db_entry.inode as u64,
2424                            mtime: Some(proto::Timestamp {
2425                                seconds: db_entry.mtime_seconds as u64,
2426                                nanos: db_entry.mtime_nanos as u32,
2427                            }),
2428                            is_symlink: db_entry.is_symlink,
2429                            is_ignored: db_entry.is_ignored,
2430                        });
2431                    }
2432                }
2433            }
2434
2435            // Populate worktree diagnostic summaries.
2436            {
2437                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2438                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2439                    .stream(&*tx)
2440                    .await?;
2441                while let Some(db_summary) = db_summaries.next().await {
2442                    let db_summary = db_summary?;
2443                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2444                        worktree
2445                            .diagnostic_summaries
2446                            .push(proto::DiagnosticSummary {
2447                                path: db_summary.path,
2448                                language_server_id: db_summary.language_server_id as u64,
2449                                error_count: db_summary.error_count as u32,
2450                                warning_count: db_summary.warning_count as u32,
2451                            });
2452                    }
2453                }
2454            }
2455
2456            // Populate language servers.
2457            let language_servers = project
2458                .find_related(language_server::Entity)
2459                .all(&*tx)
2460                .await?;
2461
2462            let room_id = project.room_id;
2463            let project = Project {
2464                collaborators: collaborators
2465                    .into_iter()
2466                    .map(|collaborator| ProjectCollaborator {
2467                        connection_id: collaborator.connection(),
2468                        user_id: collaborator.user_id,
2469                        replica_id: collaborator.replica_id,
2470                        is_host: collaborator.is_host,
2471                    })
2472                    .collect(),
2473                worktrees,
2474                language_servers: language_servers
2475                    .into_iter()
2476                    .map(|language_server| proto::LanguageServer {
2477                        id: language_server.id as u64,
2478                        name: language_server.name,
2479                    })
2480                    .collect(),
2481            };
2482            Ok((room_id, (project, replica_id as ReplicaId)))
2483        })
2484        .await
2485    }
2486
2487    pub async fn leave_project(
2488        &self,
2489        project_id: ProjectId,
2490        connection: ConnectionId,
2491    ) -> Result<RoomGuard<LeftProject>> {
2492        self.room_transaction(|tx| async move {
2493            let result = project_collaborator::Entity::delete_many()
2494                .filter(
2495                    Condition::all()
2496                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2497                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2498                        .add(
2499                            project_collaborator::Column::ConnectionServerId
2500                                .eq(connection.owner_id as i32),
2501                        ),
2502                )
2503                .exec(&*tx)
2504                .await?;
2505            if result.rows_affected == 0 {
2506                Err(anyhow!("not a collaborator on this project"))?;
2507            }
2508
2509            let project = project::Entity::find_by_id(project_id)
2510                .one(&*tx)
2511                .await?
2512                .ok_or_else(|| anyhow!("no such project"))?;
2513            let collaborators = project
2514                .find_related(project_collaborator::Entity)
2515                .all(&*tx)
2516                .await?;
2517            let connection_ids = collaborators
2518                .into_iter()
2519                .map(|collaborator| collaborator.connection())
2520                .collect();
2521
2522            let left_project = LeftProject {
2523                id: project_id,
2524                host_user_id: project.host_user_id,
2525                host_connection_id: project.host_connection()?,
2526                connection_ids,
2527            };
2528            Ok((project.room_id, left_project))
2529        })
2530        .await
2531    }
2532
2533    pub async fn project_collaborators(
2534        &self,
2535        project_id: ProjectId,
2536        connection_id: ConnectionId,
2537    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2538        self.room_transaction(|tx| async move {
2539            let project = project::Entity::find_by_id(project_id)
2540                .one(&*tx)
2541                .await?
2542                .ok_or_else(|| anyhow!("no such project"))?;
2543            let collaborators = project_collaborator::Entity::find()
2544                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2545                .all(&*tx)
2546                .await?
2547                .into_iter()
2548                .map(|collaborator| ProjectCollaborator {
2549                    connection_id: collaborator.connection(),
2550                    user_id: collaborator.user_id,
2551                    replica_id: collaborator.replica_id,
2552                    is_host: collaborator.is_host,
2553                })
2554                .collect::<Vec<_>>();
2555
2556            if collaborators
2557                .iter()
2558                .any(|collaborator| collaborator.connection_id == connection_id)
2559            {
2560                Ok((project.room_id, collaborators))
2561            } else {
2562                Err(anyhow!("no such project"))?
2563            }
2564        })
2565        .await
2566    }
2567
2568    pub async fn project_connection_ids(
2569        &self,
2570        project_id: ProjectId,
2571        connection_id: ConnectionId,
2572    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2573        self.room_transaction(|tx| async move {
2574            let project = project::Entity::find_by_id(project_id)
2575                .one(&*tx)
2576                .await?
2577                .ok_or_else(|| anyhow!("no such project"))?;
2578            let mut collaborators = project_collaborator::Entity::find()
2579                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2580                .stream(&*tx)
2581                .await?;
2582
2583            let mut connection_ids = HashSet::default();
2584            while let Some(collaborator) = collaborators.next().await {
2585                let collaborator = collaborator?;
2586                connection_ids.insert(collaborator.connection());
2587            }
2588
2589            if connection_ids.contains(&connection_id) {
2590                Ok((project.room_id, connection_ids))
2591            } else {
2592                Err(anyhow!("no such project"))?
2593            }
2594        })
2595        .await
2596    }
2597
2598    async fn project_guest_connection_ids(
2599        &self,
2600        project_id: ProjectId,
2601        tx: &DatabaseTransaction,
2602    ) -> Result<Vec<ConnectionId>> {
2603        let mut collaborators = project_collaborator::Entity::find()
2604            .filter(
2605                project_collaborator::Column::ProjectId
2606                    .eq(project_id)
2607                    .and(project_collaborator::Column::IsHost.eq(false)),
2608            )
2609            .stream(tx)
2610            .await?;
2611
2612        let mut guest_connection_ids = Vec::new();
2613        while let Some(collaborator) = collaborators.next().await {
2614            let collaborator = collaborator?;
2615            guest_connection_ids.push(collaborator.connection());
2616        }
2617        Ok(guest_connection_ids)
2618    }
2619
2620    // access tokens
2621
2622    pub async fn create_access_token_hash(
2623        &self,
2624        user_id: UserId,
2625        access_token_hash: &str,
2626        max_access_token_count: usize,
2627    ) -> Result<()> {
2628        self.transaction(|tx| async {
2629            let tx = tx;
2630
2631            access_token::ActiveModel {
2632                user_id: ActiveValue::set(user_id),
2633                hash: ActiveValue::set(access_token_hash.into()),
2634                ..Default::default()
2635            }
2636            .insert(&*tx)
2637            .await?;
2638
2639            access_token::Entity::delete_many()
2640                .filter(
2641                    access_token::Column::Id.in_subquery(
2642                        Query::select()
2643                            .column(access_token::Column::Id)
2644                            .from(access_token::Entity)
2645                            .and_where(access_token::Column::UserId.eq(user_id))
2646                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2647                            .limit(10000)
2648                            .offset(max_access_token_count as u64)
2649                            .to_owned(),
2650                    ),
2651                )
2652                .exec(&*tx)
2653                .await?;
2654            Ok(())
2655        })
2656        .await
2657    }
2658
2659    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2660        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2661        enum QueryAs {
2662            Hash,
2663        }
2664
2665        self.transaction(|tx| async move {
2666            Ok(access_token::Entity::find()
2667                .select_only()
2668                .column(access_token::Column::Hash)
2669                .filter(access_token::Column::UserId.eq(user_id))
2670                .order_by_desc(access_token::Column::Id)
2671                .into_values::<_, QueryAs>()
2672                .all(&*tx)
2673                .await?)
2674        })
2675        .await
2676    }
2677
2678    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2679    where
2680        F: Send + Fn(TransactionHandle) -> Fut,
2681        Fut: Send + Future<Output = Result<T>>,
2682    {
2683        let body = async {
2684            loop {
2685                let (tx, result) = self.with_transaction(&f).await?;
2686                match result {
2687                    Ok(result) => {
2688                        match tx.commit().await.map_err(Into::into) {
2689                            Ok(()) => return Ok(result),
2690                            Err(error) => {
2691                                if is_serialization_error(&error) {
2692                                    // Retry (don't break the loop)
2693                                } else {
2694                                    return Err(error);
2695                                }
2696                            }
2697                        }
2698                    }
2699                    Err(error) => {
2700                        tx.rollback().await?;
2701                        if is_serialization_error(&error) {
2702                            // Retry (don't break the loop)
2703                        } else {
2704                            return Err(error);
2705                        }
2706                    }
2707                }
2708            }
2709        };
2710
2711        self.run(body).await
2712    }
2713
2714    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2715    where
2716        F: Send + Fn(TransactionHandle) -> Fut,
2717        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2718    {
2719        let body = async {
2720            loop {
2721                let (tx, result) = self.with_transaction(&f).await?;
2722                match result {
2723                    Ok(Some((room_id, data))) => {
2724                        let lock = self.rooms.entry(room_id).or_default().clone();
2725                        let _guard = lock.lock_owned().await;
2726                        match tx.commit().await.map_err(Into::into) {
2727                            Ok(()) => {
2728                                return Ok(Some(RoomGuard {
2729                                    data,
2730                                    _guard,
2731                                    _not_send: PhantomData,
2732                                }));
2733                            }
2734                            Err(error) => {
2735                                if is_serialization_error(&error) {
2736                                    // Retry (don't break the loop)
2737                                } else {
2738                                    return Err(error);
2739                                }
2740                            }
2741                        }
2742                    }
2743                    Ok(None) => {
2744                        match tx.commit().await.map_err(Into::into) {
2745                            Ok(()) => return Ok(None),
2746                            Err(error) => {
2747                                if is_serialization_error(&error) {
2748                                    // Retry (don't break the loop)
2749                                } else {
2750                                    return Err(error);
2751                                }
2752                            }
2753                        }
2754                    }
2755                    Err(error) => {
2756                        tx.rollback().await?;
2757                        if is_serialization_error(&error) {
2758                            // Retry (don't break the loop)
2759                        } else {
2760                            return Err(error);
2761                        }
2762                    }
2763                }
2764            }
2765        };
2766
2767        self.run(body).await
2768    }
2769
2770    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2771    where
2772        F: Send + Fn(TransactionHandle) -> Fut,
2773        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2774    {
2775        let data = self
2776            .optional_room_transaction(move |tx| {
2777                let future = f(tx);
2778                async {
2779                    let data = future.await?;
2780                    Ok(Some(data))
2781                }
2782            })
2783            .await?;
2784        Ok(data.unwrap())
2785    }
2786
2787    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2788    where
2789        F: Send + Fn(TransactionHandle) -> Fut,
2790        Fut: Send + Future<Output = Result<T>>,
2791    {
2792        let tx = self
2793            .pool
2794            .begin_with_config(Some(IsolationLevel::Serializable), None)
2795            .await?;
2796
2797        let mut tx = Arc::new(Some(tx));
2798        let result = f(TransactionHandle(tx.clone())).await;
2799        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2800            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2801        };
2802
2803        Ok((tx, result))
2804    }
2805
2806    async fn run<F, T>(&self, future: F) -> T
2807    where
2808        F: Future<Output = T>,
2809    {
2810        #[cfg(test)]
2811        {
2812            if let Some(background) = self.background.as_ref() {
2813                background.simulate_random_delay().await;
2814            }
2815
2816            self.runtime.as_ref().unwrap().block_on(future)
2817        }
2818
2819        #[cfg(not(test))]
2820        {
2821            future.await
2822        }
2823    }
2824}
2825
2826fn is_serialization_error(error: &Error) -> bool {
2827    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2828    match error {
2829        Error::Database(
2830            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2831            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2832        ) if error
2833            .as_database_error()
2834            .and_then(|error| error.code())
2835            .as_deref()
2836            == Some(SERIALIZATION_FAILURE_CODE) =>
2837        {
2838            true
2839        }
2840        _ => false,
2841    }
2842}
2843
2844struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2845
2846impl Deref for TransactionHandle {
2847    type Target = DatabaseTransaction;
2848
2849    fn deref(&self) -> &Self::Target {
2850        self.0.as_ref().as_ref().unwrap()
2851    }
2852}
2853
2854pub struct RoomGuard<T> {
2855    data: T,
2856    _guard: OwnedMutexGuard<()>,
2857    _not_send: PhantomData<Rc<()>>,
2858}
2859
2860impl<T> Deref for RoomGuard<T> {
2861    type Target = T;
2862
2863    fn deref(&self) -> &T {
2864        &self.data
2865    }
2866}
2867
2868impl<T> DerefMut for RoomGuard<T> {
2869    fn deref_mut(&mut self) -> &mut T {
2870        &mut self.data
2871    }
2872}
2873
2874#[derive(Debug, Serialize, Deserialize)]
2875pub struct NewUserParams {
2876    pub github_login: String,
2877    pub github_user_id: i32,
2878    pub invite_count: i32,
2879}
2880
2881#[derive(Debug)]
2882pub struct NewUserResult {
2883    pub user_id: UserId,
2884    pub metrics_id: String,
2885    pub inviting_user_id: Option<UserId>,
2886    pub signup_device_id: Option<String>,
2887}
2888
2889fn random_invite_code() -> String {
2890    nanoid::nanoid!(16)
2891}
2892
2893fn random_email_confirmation_code() -> String {
2894    nanoid::nanoid!(64)
2895}
2896
2897macro_rules! id_type {
2898    ($name:ident) => {
2899        #[derive(
2900            Clone,
2901            Copy,
2902            Debug,
2903            Default,
2904            PartialEq,
2905            Eq,
2906            PartialOrd,
2907            Ord,
2908            Hash,
2909            Serialize,
2910            Deserialize,
2911        )]
2912        #[serde(transparent)]
2913        pub struct $name(pub i32);
2914
2915        impl $name {
2916            #[allow(unused)]
2917            pub const MAX: Self = Self(i32::MAX);
2918
2919            #[allow(unused)]
2920            pub fn from_proto(value: u64) -> Self {
2921                Self(value as i32)
2922            }
2923
2924            #[allow(unused)]
2925            pub fn to_proto(self) -> u64 {
2926                self.0 as u64
2927            }
2928        }
2929
2930        impl std::fmt::Display for $name {
2931            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2932                self.0.fmt(f)
2933            }
2934        }
2935
2936        impl From<$name> for sea_query::Value {
2937            fn from(value: $name) -> Self {
2938                sea_query::Value::Int(Some(value.0))
2939            }
2940        }
2941
2942        impl sea_orm::TryGetable for $name {
2943            fn try_get(
2944                res: &sea_orm::QueryResult,
2945                pre: &str,
2946                col: &str,
2947            ) -> Result<Self, sea_orm::TryGetError> {
2948                Ok(Self(i32::try_get(res, pre, col)?))
2949            }
2950        }
2951
2952        impl sea_query::ValueType for $name {
2953            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
2954                match v {
2955                    Value::TinyInt(Some(int)) => {
2956                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2957                    }
2958                    Value::SmallInt(Some(int)) => {
2959                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2960                    }
2961                    Value::Int(Some(int)) => {
2962                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2963                    }
2964                    Value::BigInt(Some(int)) => {
2965                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2966                    }
2967                    Value::TinyUnsigned(Some(int)) => {
2968                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2969                    }
2970                    Value::SmallUnsigned(Some(int)) => {
2971                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2972                    }
2973                    Value::Unsigned(Some(int)) => {
2974                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2975                    }
2976                    Value::BigUnsigned(Some(int)) => {
2977                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2978                    }
2979                    _ => Err(sea_query::ValueTypeErr),
2980                }
2981            }
2982
2983            fn type_name() -> String {
2984                stringify!($name).into()
2985            }
2986
2987            fn array_type() -> sea_query::ArrayType {
2988                sea_query::ArrayType::Int
2989            }
2990
2991            fn column_type() -> sea_query::ColumnType {
2992                sea_query::ColumnType::Integer(None)
2993            }
2994        }
2995
2996        impl sea_orm::TryFromU64 for $name {
2997            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
2998                Ok(Self(n.try_into().map_err(|_| {
2999                    DbErr::ConvertFromU64(concat!(
3000                        "error converting ",
3001                        stringify!($name),
3002                        " to u64"
3003                    ))
3004                })?))
3005            }
3006        }
3007
3008        impl sea_query::Nullable for $name {
3009            fn null() -> Value {
3010                Value::Int(None)
3011            }
3012        }
3013    };
3014}
3015
3016id_type!(AccessTokenId);
3017id_type!(ContactId);
3018id_type!(RoomId);
3019id_type!(RoomParticipantId);
3020id_type!(ProjectId);
3021id_type!(ProjectCollaboratorId);
3022id_type!(ReplicaId);
3023id_type!(ServerId);
3024id_type!(SignupId);
3025id_type!(UserId);
3026
3027pub struct RejoinedRoom {
3028    pub room: proto::Room,
3029    pub rejoined_projects: Vec<RejoinedProject>,
3030    pub reshared_projects: Vec<ResharedProject>,
3031}
3032
3033pub struct ResharedProject {
3034    pub id: ProjectId,
3035    pub old_connection_id: ConnectionId,
3036    pub collaborators: Vec<ProjectCollaborator>,
3037    pub worktrees: Vec<proto::WorktreeMetadata>,
3038}
3039
3040pub struct RejoinedProject {
3041    pub id: ProjectId,
3042    pub old_connection_id: ConnectionId,
3043    pub collaborators: Vec<ProjectCollaborator>,
3044    pub worktrees: Vec<RejoinedWorktree>,
3045    pub language_servers: Vec<proto::LanguageServer>,
3046}
3047
3048#[derive(Debug)]
3049pub struct RejoinedWorktree {
3050    pub id: u64,
3051    pub abs_path: String,
3052    pub root_name: String,
3053    pub visible: bool,
3054    pub updated_entries: Vec<proto::Entry>,
3055    pub removed_entries: Vec<u64>,
3056    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3057    pub scan_id: u64,
3058    pub completed_scan_id: u64,
3059}
3060
3061pub struct LeftRoom {
3062    pub room: proto::Room,
3063    pub left_projects: HashMap<ProjectId, LeftProject>,
3064    pub canceled_calls_to_user_ids: Vec<UserId>,
3065}
3066
3067pub struct RefreshedRoom {
3068    pub room: proto::Room,
3069    pub stale_participant_user_ids: Vec<UserId>,
3070    pub canceled_calls_to_user_ids: Vec<UserId>,
3071}
3072
3073pub struct Project {
3074    pub collaborators: Vec<ProjectCollaborator>,
3075    pub worktrees: BTreeMap<u64, Worktree>,
3076    pub language_servers: Vec<proto::LanguageServer>,
3077}
3078
3079pub struct ProjectCollaborator {
3080    pub connection_id: ConnectionId,
3081    pub user_id: UserId,
3082    pub replica_id: ReplicaId,
3083    pub is_host: bool,
3084}
3085
3086impl ProjectCollaborator {
3087    pub fn to_proto(&self) -> proto::Collaborator {
3088        proto::Collaborator {
3089            peer_id: Some(self.connection_id.into()),
3090            replica_id: self.replica_id.0 as u32,
3091            user_id: self.user_id.to_proto(),
3092        }
3093    }
3094}
3095
3096#[derive(Debug)]
3097pub struct LeftProject {
3098    pub id: ProjectId,
3099    pub host_user_id: UserId,
3100    pub host_connection_id: ConnectionId,
3101    pub connection_ids: Vec<ConnectionId>,
3102}
3103
3104pub struct Worktree {
3105    pub id: u64,
3106    pub abs_path: String,
3107    pub root_name: String,
3108    pub visible: bool,
3109    pub entries: Vec<proto::Entry>,
3110    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3111    pub scan_id: u64,
3112    pub completed_scan_id: u64,
3113}
3114
3115#[cfg(test)]
3116pub use test::*;
3117
3118#[cfg(test)]
3119mod test {
3120    use super::*;
3121    use gpui::executor::Background;
3122    use lazy_static::lazy_static;
3123    use parking_lot::Mutex;
3124    use rand::prelude::*;
3125    use sea_orm::ConnectionTrait;
3126    use sqlx::migrate::MigrateDatabase;
3127    use std::sync::Arc;
3128
3129    pub struct TestDb {
3130        pub db: Option<Arc<Database>>,
3131        pub connection: Option<sqlx::AnyConnection>,
3132    }
3133
3134    impl TestDb {
3135        pub fn sqlite(background: Arc<Background>) -> Self {
3136            let url = format!("sqlite::memory:");
3137            let runtime = tokio::runtime::Builder::new_current_thread()
3138                .enable_io()
3139                .enable_time()
3140                .build()
3141                .unwrap();
3142
3143            let mut db = runtime.block_on(async {
3144                let mut options = ConnectOptions::new(url);
3145                options.max_connections(5);
3146                let db = Database::new(options).await.unwrap();
3147                let sql = include_str!(concat!(
3148                    env!("CARGO_MANIFEST_DIR"),
3149                    "/migrations.sqlite/20221109000000_test_schema.sql"
3150                ));
3151                db.pool
3152                    .execute(sea_orm::Statement::from_string(
3153                        db.pool.get_database_backend(),
3154                        sql.into(),
3155                    ))
3156                    .await
3157                    .unwrap();
3158                db
3159            });
3160
3161            db.background = Some(background);
3162            db.runtime = Some(runtime);
3163
3164            Self {
3165                db: Some(Arc::new(db)),
3166                connection: None,
3167            }
3168        }
3169
3170        pub fn postgres(background: Arc<Background>) -> Self {
3171            lazy_static! {
3172                static ref LOCK: Mutex<()> = Mutex::new(());
3173            }
3174
3175            let _guard = LOCK.lock();
3176            let mut rng = StdRng::from_entropy();
3177            let url = format!(
3178                "postgres://postgres@localhost/zed-test-{}",
3179                rng.gen::<u128>()
3180            );
3181            let runtime = tokio::runtime::Builder::new_current_thread()
3182                .enable_io()
3183                .enable_time()
3184                .build()
3185                .unwrap();
3186
3187            let mut db = runtime.block_on(async {
3188                sqlx::Postgres::create_database(&url)
3189                    .await
3190                    .expect("failed to create test db");
3191                let mut options = ConnectOptions::new(url);
3192                options
3193                    .max_connections(5)
3194                    .idle_timeout(Duration::from_secs(0));
3195                let db = Database::new(options).await.unwrap();
3196                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3197                db.migrate(Path::new(migrations_path), false).await.unwrap();
3198                db
3199            });
3200
3201            db.background = Some(background);
3202            db.runtime = Some(runtime);
3203
3204            Self {
3205                db: Some(Arc::new(db)),
3206                connection: None,
3207            }
3208        }
3209
3210        pub fn db(&self) -> &Arc<Database> {
3211            self.db.as_ref().unwrap()
3212        }
3213    }
3214
3215    impl Drop for TestDb {
3216        fn drop(&mut self) {
3217            let db = self.db.take().unwrap();
3218            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3219                db.runtime.as_ref().unwrap().block_on(async {
3220                    use util::ResultExt;
3221                    let query = "
3222                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3223                        FROM pg_stat_activity
3224                        WHERE
3225                            pg_stat_activity.datname = current_database() AND
3226                            pid <> pg_backend_pid();
3227                    ";
3228                    db.pool
3229                        .execute(sea_orm::Statement::from_string(
3230                            db.pool.get_database_backend(),
3231                            query.into(),
3232                        ))
3233                        .await
3234                        .log_err();
3235                    sqlx::Postgres::drop_database(db.options.get_url())
3236                        .await
3237                        .log_err();
3238                })
3239            }
3240        }
3241    }
3242}