db.rs

   1mod access_token;
   2mod contact;
   3mod language_server;
   4mod project;
   5mod project_collaborator;
   6mod room;
   7mod room_participant;
   8mod server;
   9mod signup;
  10#[cfg(test)]
  11mod tests;
  12mod user;
  13mod worktree;
  14mod worktree_diagnostic_summary;
  15mod worktree_entry;
  16
  17use crate::{Error, Result};
  18use anyhow::anyhow;
  19use collections::{BTreeMap, HashMap, HashSet};
  20pub use contact::Contact;
  21use dashmap::DashMap;
  22use futures::StreamExt;
  23use hyper::StatusCode;
  24use rpc::{proto, ConnectionId};
  25use sea_orm::Condition;
  26pub use sea_orm::ConnectOptions;
  27use sea_orm::{
  28    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  29    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  30    Statement, TransactionTrait,
  31};
  32use sea_query::{Alias, Expr, OnConflict, Query};
  33use serde::{Deserialize, Serialize};
  34pub use signup::{Invite, NewSignup, WaitlistSummary};
  35use sqlx::migrate::{Migrate, Migration, MigrationSource};
  36use sqlx::Connection;
  37use std::ops::{Deref, DerefMut};
  38use std::path::Path;
  39use std::time::Duration;
  40use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  41use tokio::sync::{Mutex, OwnedMutexGuard};
  42pub use user::Model as User;
  43
  44pub struct Database {
  45    options: ConnectOptions,
  46    pool: DatabaseConnection,
  47    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  48    #[cfg(test)]
  49    background: Option<std::sync::Arc<gpui::executor::Background>>,
  50    #[cfg(test)]
  51    runtime: Option<tokio::runtime::Runtime>,
  52}
  53
  54impl Database {
  55    pub async fn new(options: ConnectOptions) -> Result<Self> {
  56        Ok(Self {
  57            options: options.clone(),
  58            pool: sea_orm::Database::connect(options).await?,
  59            rooms: DashMap::with_capacity(16384),
  60            #[cfg(test)]
  61            background: None,
  62            #[cfg(test)]
  63            runtime: None,
  64        })
  65    }
  66
  67    #[cfg(test)]
  68    pub fn reset(&self) {
  69        self.rooms.clear();
  70    }
  71
  72    pub async fn migrate(
  73        &self,
  74        migrations_path: &Path,
  75        ignore_checksum_mismatch: bool,
  76    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  77        let migrations = MigrationSource::resolve(migrations_path)
  78            .await
  79            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  80
  81        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  82
  83        connection.ensure_migrations_table().await?;
  84        let applied_migrations: HashMap<_, _> = connection
  85            .list_applied_migrations()
  86            .await?
  87            .into_iter()
  88            .map(|m| (m.version, m))
  89            .collect();
  90
  91        let mut new_migrations = Vec::new();
  92        for migration in migrations {
  93            match applied_migrations.get(&migration.version) {
  94                Some(applied_migration) => {
  95                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  96                    {
  97                        Err(anyhow!(
  98                            "checksum mismatch for applied migration {}",
  99                            migration.description
 100                        ))?;
 101                    }
 102                }
 103                None => {
 104                    let elapsed = connection.apply(&migration).await?;
 105                    new_migrations.push((migration, elapsed));
 106                }
 107            }
 108        }
 109
 110        Ok(new_migrations)
 111    }
 112
 113    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 114        self.transaction(|tx| async move {
 115            let server = server::ActiveModel {
 116                environment: ActiveValue::set(environment.into()),
 117                ..Default::default()
 118            }
 119            .insert(&*tx)
 120            .await?;
 121            Ok(server.id)
 122        })
 123        .await
 124    }
 125
 126    pub async fn delete_stale_projects(
 127        &self,
 128        new_epoch: ServerId,
 129        environment: &str,
 130    ) -> Result<()> {
 131        self.transaction(|tx| async move {
 132            let stale_server_epochs = self.stale_server_ids(environment, new_epoch, &tx).await?;
 133            project_collaborator::Entity::delete_many()
 134                .filter(
 135                    project_collaborator::Column::ConnectionServerId
 136                        .is_in(stale_server_epochs.iter().copied()),
 137                )
 138                .exec(&*tx)
 139                .await?;
 140            project::Entity::delete_many()
 141                .filter(
 142                    project::Column::HostConnectionServerId
 143                        .is_in(stale_server_epochs.iter().copied()),
 144                )
 145                .exec(&*tx)
 146                .await?;
 147            Ok(())
 148        })
 149        .await
 150    }
 151
 152    pub async fn stale_room_ids(
 153        &self,
 154        new_epoch: ServerId,
 155        environment: &str,
 156    ) -> Result<Vec<RoomId>> {
 157        self.transaction(|tx| async move {
 158            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 159            enum QueryAs {
 160                RoomId,
 161            }
 162
 163            let stale_server_epochs = self.stale_server_ids(environment, new_epoch, &tx).await?;
 164            Ok(room_participant::Entity::find()
 165                .select_only()
 166                .column(room_participant::Column::RoomId)
 167                .distinct()
 168                .filter(
 169                    room_participant::Column::AnsweringConnectionServerId
 170                        .is_in(stale_server_epochs),
 171                )
 172                .into_values::<_, QueryAs>()
 173                .all(&*tx)
 174                .await?)
 175        })
 176        .await
 177    }
 178
 179    pub async fn refresh_room(
 180        &self,
 181        room_id: RoomId,
 182        new_epoch: ServerId,
 183    ) -> Result<RoomGuard<RefreshedRoom>> {
 184        self.room_transaction(|tx| async move {
 185            let stale_participant_filter = Condition::all()
 186                .add(room_participant::Column::RoomId.eq(room_id))
 187                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 188                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_epoch));
 189
 190            let stale_participant_user_ids = room_participant::Entity::find()
 191                .filter(stale_participant_filter.clone())
 192                .all(&*tx)
 193                .await?
 194                .into_iter()
 195                .map(|participant| participant.user_id)
 196                .collect::<Vec<_>>();
 197
 198            // Delete participants who failed to reconnect.
 199            room_participant::Entity::delete_many()
 200                .filter(stale_participant_filter)
 201                .exec(&*tx)
 202                .await?;
 203
 204            let room = self.get_room(room_id, &tx).await?;
 205            let mut canceled_calls_to_user_ids = Vec::new();
 206            // Delete the room if it becomes empty and cancel pending calls.
 207            if room.participants.is_empty() {
 208                canceled_calls_to_user_ids.extend(
 209                    room.pending_participants
 210                        .iter()
 211                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 212                );
 213                room_participant::Entity::delete_many()
 214                    .filter(room_participant::Column::RoomId.eq(room_id))
 215                    .exec(&*tx)
 216                    .await?;
 217                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 218            }
 219
 220            Ok((
 221                room_id,
 222                RefreshedRoom {
 223                    room,
 224                    stale_participant_user_ids,
 225                    canceled_calls_to_user_ids,
 226                },
 227            ))
 228        })
 229        .await
 230    }
 231
 232    pub async fn delete_stale_servers(&self, new_epoch: ServerId, environment: &str) -> Result<()> {
 233        self.transaction(|tx| async move {
 234            server::Entity::delete_many()
 235                .filter(
 236                    Condition::all()
 237                        .add(server::Column::Environment.eq(environment))
 238                        .add(server::Column::Id.ne(new_epoch)),
 239                )
 240                .exec(&*tx)
 241                .await?;
 242            Ok(())
 243        })
 244        .await
 245    }
 246
 247    async fn stale_server_ids(
 248        &self,
 249        environment: &str,
 250        new_epoch: ServerId,
 251        tx: &DatabaseTransaction,
 252    ) -> Result<Vec<ServerId>> {
 253        let stale_servers = server::Entity::find()
 254            .filter(
 255                Condition::all()
 256                    .add(server::Column::Environment.eq(environment))
 257                    .add(server::Column::Id.ne(new_epoch)),
 258            )
 259            .all(&*tx)
 260            .await?;
 261        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 262    }
 263
 264    // users
 265
 266    pub async fn create_user(
 267        &self,
 268        email_address: &str,
 269        admin: bool,
 270        params: NewUserParams,
 271    ) -> Result<NewUserResult> {
 272        self.transaction(|tx| async {
 273            let tx = tx;
 274            let user = user::Entity::insert(user::ActiveModel {
 275                email_address: ActiveValue::set(Some(email_address.into())),
 276                github_login: ActiveValue::set(params.github_login.clone()),
 277                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 278                admin: ActiveValue::set(admin),
 279                metrics_id: ActiveValue::set(Uuid::new_v4()),
 280                ..Default::default()
 281            })
 282            .on_conflict(
 283                OnConflict::column(user::Column::GithubLogin)
 284                    .update_column(user::Column::GithubLogin)
 285                    .to_owned(),
 286            )
 287            .exec_with_returning(&*tx)
 288            .await?;
 289
 290            Ok(NewUserResult {
 291                user_id: user.id,
 292                metrics_id: user.metrics_id.to_string(),
 293                signup_device_id: None,
 294                inviting_user_id: None,
 295            })
 296        })
 297        .await
 298    }
 299
 300    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 301        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 302            .await
 303    }
 304
 305    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 306        self.transaction(|tx| async {
 307            let tx = tx;
 308            Ok(user::Entity::find()
 309                .filter(user::Column::Id.is_in(ids.iter().copied()))
 310                .all(&*tx)
 311                .await?)
 312        })
 313        .await
 314    }
 315
 316    pub async fn get_user_by_github_account(
 317        &self,
 318        github_login: &str,
 319        github_user_id: Option<i32>,
 320    ) -> Result<Option<User>> {
 321        self.transaction(|tx| async move {
 322            let tx = &*tx;
 323            if let Some(github_user_id) = github_user_id {
 324                if let Some(user_by_github_user_id) = user::Entity::find()
 325                    .filter(user::Column::GithubUserId.eq(github_user_id))
 326                    .one(tx)
 327                    .await?
 328                {
 329                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 330                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 331                    Ok(Some(user_by_github_user_id.update(tx).await?))
 332                } else if let Some(user_by_github_login) = user::Entity::find()
 333                    .filter(user::Column::GithubLogin.eq(github_login))
 334                    .one(tx)
 335                    .await?
 336                {
 337                    let mut user_by_github_login = user_by_github_login.into_active_model();
 338                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 339                    Ok(Some(user_by_github_login.update(tx).await?))
 340                } else {
 341                    Ok(None)
 342                }
 343            } else {
 344                Ok(user::Entity::find()
 345                    .filter(user::Column::GithubLogin.eq(github_login))
 346                    .one(tx)
 347                    .await?)
 348            }
 349        })
 350        .await
 351    }
 352
 353    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 354        self.transaction(|tx| async move {
 355            Ok(user::Entity::find()
 356                .order_by_asc(user::Column::GithubLogin)
 357                .limit(limit as u64)
 358                .offset(page as u64 * limit as u64)
 359                .all(&*tx)
 360                .await?)
 361        })
 362        .await
 363    }
 364
 365    pub async fn get_users_with_no_invites(
 366        &self,
 367        invited_by_another_user: bool,
 368    ) -> Result<Vec<User>> {
 369        self.transaction(|tx| async move {
 370            Ok(user::Entity::find()
 371                .filter(
 372                    user::Column::InviteCount
 373                        .eq(0)
 374                        .and(if invited_by_another_user {
 375                            user::Column::InviterId.is_not_null()
 376                        } else {
 377                            user::Column::InviterId.is_null()
 378                        }),
 379                )
 380                .all(&*tx)
 381                .await?)
 382        })
 383        .await
 384    }
 385
 386    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 387        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 388        enum QueryAs {
 389            MetricsId,
 390        }
 391
 392        self.transaction(|tx| async move {
 393            let metrics_id: Uuid = user::Entity::find_by_id(id)
 394                .select_only()
 395                .column(user::Column::MetricsId)
 396                .into_values::<_, QueryAs>()
 397                .one(&*tx)
 398                .await?
 399                .ok_or_else(|| anyhow!("could not find user"))?;
 400            Ok(metrics_id.to_string())
 401        })
 402        .await
 403    }
 404
 405    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 406        self.transaction(|tx| async move {
 407            user::Entity::update_many()
 408                .filter(user::Column::Id.eq(id))
 409                .set(user::ActiveModel {
 410                    admin: ActiveValue::set(is_admin),
 411                    ..Default::default()
 412                })
 413                .exec(&*tx)
 414                .await?;
 415            Ok(())
 416        })
 417        .await
 418    }
 419
 420    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 421        self.transaction(|tx| async move {
 422            user::Entity::update_many()
 423                .filter(user::Column::Id.eq(id))
 424                .set(user::ActiveModel {
 425                    connected_once: ActiveValue::set(connected_once),
 426                    ..Default::default()
 427                })
 428                .exec(&*tx)
 429                .await?;
 430            Ok(())
 431        })
 432        .await
 433    }
 434
 435    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 436        self.transaction(|tx| async move {
 437            access_token::Entity::delete_many()
 438                .filter(access_token::Column::UserId.eq(id))
 439                .exec(&*tx)
 440                .await?;
 441            user::Entity::delete_by_id(id).exec(&*tx).await?;
 442            Ok(())
 443        })
 444        .await
 445    }
 446
 447    // contacts
 448
 449    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 450        #[derive(Debug, FromQueryResult)]
 451        struct ContactWithUserBusyStatuses {
 452            user_id_a: UserId,
 453            user_id_b: UserId,
 454            a_to_b: bool,
 455            accepted: bool,
 456            should_notify: bool,
 457            user_a_busy: bool,
 458            user_b_busy: bool,
 459        }
 460
 461        self.transaction(|tx| async move {
 462            let user_a_participant = Alias::new("user_a_participant");
 463            let user_b_participant = Alias::new("user_b_participant");
 464            let mut db_contacts = contact::Entity::find()
 465                .column_as(
 466                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 467                        .is_not_null(),
 468                    "user_a_busy",
 469                )
 470                .column_as(
 471                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 472                        .is_not_null(),
 473                    "user_b_busy",
 474                )
 475                .filter(
 476                    contact::Column::UserIdA
 477                        .eq(user_id)
 478                        .or(contact::Column::UserIdB.eq(user_id)),
 479                )
 480                .join_as(
 481                    JoinType::LeftJoin,
 482                    contact::Relation::UserARoomParticipant.def(),
 483                    user_a_participant,
 484                )
 485                .join_as(
 486                    JoinType::LeftJoin,
 487                    contact::Relation::UserBRoomParticipant.def(),
 488                    user_b_participant,
 489                )
 490                .into_model::<ContactWithUserBusyStatuses>()
 491                .stream(&*tx)
 492                .await?;
 493
 494            let mut contacts = Vec::new();
 495            while let Some(db_contact) = db_contacts.next().await {
 496                let db_contact = db_contact?;
 497                if db_contact.user_id_a == user_id {
 498                    if db_contact.accepted {
 499                        contacts.push(Contact::Accepted {
 500                            user_id: db_contact.user_id_b,
 501                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 502                            busy: db_contact.user_b_busy,
 503                        });
 504                    } else if db_contact.a_to_b {
 505                        contacts.push(Contact::Outgoing {
 506                            user_id: db_contact.user_id_b,
 507                        })
 508                    } else {
 509                        contacts.push(Contact::Incoming {
 510                            user_id: db_contact.user_id_b,
 511                            should_notify: db_contact.should_notify,
 512                        });
 513                    }
 514                } else if db_contact.accepted {
 515                    contacts.push(Contact::Accepted {
 516                        user_id: db_contact.user_id_a,
 517                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 518                        busy: db_contact.user_a_busy,
 519                    });
 520                } else if db_contact.a_to_b {
 521                    contacts.push(Contact::Incoming {
 522                        user_id: db_contact.user_id_a,
 523                        should_notify: db_contact.should_notify,
 524                    });
 525                } else {
 526                    contacts.push(Contact::Outgoing {
 527                        user_id: db_contact.user_id_a,
 528                    });
 529                }
 530            }
 531
 532            contacts.sort_unstable_by_key(|contact| contact.user_id());
 533
 534            Ok(contacts)
 535        })
 536        .await
 537    }
 538
 539    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 540        self.transaction(|tx| async move {
 541            let participant = room_participant::Entity::find()
 542                .filter(room_participant::Column::UserId.eq(user_id))
 543                .one(&*tx)
 544                .await?;
 545            Ok(participant.is_some())
 546        })
 547        .await
 548    }
 549
 550    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 551        self.transaction(|tx| async move {
 552            let (id_a, id_b) = if user_id_1 < user_id_2 {
 553                (user_id_1, user_id_2)
 554            } else {
 555                (user_id_2, user_id_1)
 556            };
 557
 558            Ok(contact::Entity::find()
 559                .filter(
 560                    contact::Column::UserIdA
 561                        .eq(id_a)
 562                        .and(contact::Column::UserIdB.eq(id_b))
 563                        .and(contact::Column::Accepted.eq(true)),
 564                )
 565                .one(&*tx)
 566                .await?
 567                .is_some())
 568        })
 569        .await
 570    }
 571
 572    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 573        self.transaction(|tx| async move {
 574            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 575                (sender_id, receiver_id, true)
 576            } else {
 577                (receiver_id, sender_id, false)
 578            };
 579
 580            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 581                user_id_a: ActiveValue::set(id_a),
 582                user_id_b: ActiveValue::set(id_b),
 583                a_to_b: ActiveValue::set(a_to_b),
 584                accepted: ActiveValue::set(false),
 585                should_notify: ActiveValue::set(true),
 586                ..Default::default()
 587            })
 588            .on_conflict(
 589                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 590                    .values([
 591                        (contact::Column::Accepted, true.into()),
 592                        (contact::Column::ShouldNotify, false.into()),
 593                    ])
 594                    .action_and_where(
 595                        contact::Column::Accepted.eq(false).and(
 596                            contact::Column::AToB
 597                                .eq(a_to_b)
 598                                .and(contact::Column::UserIdA.eq(id_b))
 599                                .or(contact::Column::AToB
 600                                    .ne(a_to_b)
 601                                    .and(contact::Column::UserIdA.eq(id_a))),
 602                        ),
 603                    )
 604                    .to_owned(),
 605            )
 606            .exec_without_returning(&*tx)
 607            .await?;
 608
 609            if rows_affected == 1 {
 610                Ok(())
 611            } else {
 612                Err(anyhow!("contact already requested"))?
 613            }
 614        })
 615        .await
 616    }
 617
 618    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 619        self.transaction(|tx| async move {
 620            let (id_a, id_b) = if responder_id < requester_id {
 621                (responder_id, requester_id)
 622            } else {
 623                (requester_id, responder_id)
 624            };
 625
 626            let result = contact::Entity::delete_many()
 627                .filter(
 628                    contact::Column::UserIdA
 629                        .eq(id_a)
 630                        .and(contact::Column::UserIdB.eq(id_b)),
 631                )
 632                .exec(&*tx)
 633                .await?;
 634
 635            if result.rows_affected == 1 {
 636                Ok(())
 637            } else {
 638                Err(anyhow!("no such contact"))?
 639            }
 640        })
 641        .await
 642    }
 643
 644    pub async fn dismiss_contact_notification(
 645        &self,
 646        user_id: UserId,
 647        contact_user_id: UserId,
 648    ) -> Result<()> {
 649        self.transaction(|tx| async move {
 650            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 651                (user_id, contact_user_id, true)
 652            } else {
 653                (contact_user_id, user_id, false)
 654            };
 655
 656            let result = contact::Entity::update_many()
 657                .set(contact::ActiveModel {
 658                    should_notify: ActiveValue::set(false),
 659                    ..Default::default()
 660                })
 661                .filter(
 662                    contact::Column::UserIdA
 663                        .eq(id_a)
 664                        .and(contact::Column::UserIdB.eq(id_b))
 665                        .and(
 666                            contact::Column::AToB
 667                                .eq(a_to_b)
 668                                .and(contact::Column::Accepted.eq(true))
 669                                .or(contact::Column::AToB
 670                                    .ne(a_to_b)
 671                                    .and(contact::Column::Accepted.eq(false))),
 672                        ),
 673                )
 674                .exec(&*tx)
 675                .await?;
 676            if result.rows_affected == 0 {
 677                Err(anyhow!("no such contact request"))?
 678            } else {
 679                Ok(())
 680            }
 681        })
 682        .await
 683    }
 684
 685    pub async fn respond_to_contact_request(
 686        &self,
 687        responder_id: UserId,
 688        requester_id: UserId,
 689        accept: bool,
 690    ) -> Result<()> {
 691        self.transaction(|tx| async move {
 692            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 693                (responder_id, requester_id, false)
 694            } else {
 695                (requester_id, responder_id, true)
 696            };
 697            let rows_affected = if accept {
 698                let result = contact::Entity::update_many()
 699                    .set(contact::ActiveModel {
 700                        accepted: ActiveValue::set(true),
 701                        should_notify: ActiveValue::set(true),
 702                        ..Default::default()
 703                    })
 704                    .filter(
 705                        contact::Column::UserIdA
 706                            .eq(id_a)
 707                            .and(contact::Column::UserIdB.eq(id_b))
 708                            .and(contact::Column::AToB.eq(a_to_b)),
 709                    )
 710                    .exec(&*tx)
 711                    .await?;
 712                result.rows_affected
 713            } else {
 714                let result = contact::Entity::delete_many()
 715                    .filter(
 716                        contact::Column::UserIdA
 717                            .eq(id_a)
 718                            .and(contact::Column::UserIdB.eq(id_b))
 719                            .and(contact::Column::AToB.eq(a_to_b))
 720                            .and(contact::Column::Accepted.eq(false)),
 721                    )
 722                    .exec(&*tx)
 723                    .await?;
 724
 725                result.rows_affected
 726            };
 727
 728            if rows_affected == 1 {
 729                Ok(())
 730            } else {
 731                Err(anyhow!("no such contact request"))?
 732            }
 733        })
 734        .await
 735    }
 736
 737    pub fn fuzzy_like_string(string: &str) -> String {
 738        let mut result = String::with_capacity(string.len() * 2 + 1);
 739        for c in string.chars() {
 740            if c.is_alphanumeric() {
 741                result.push('%');
 742                result.push(c);
 743            }
 744        }
 745        result.push('%');
 746        result
 747    }
 748
 749    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 750        self.transaction(|tx| async {
 751            let tx = tx;
 752            let like_string = Self::fuzzy_like_string(name_query);
 753            let query = "
 754                SELECT users.*
 755                FROM users
 756                WHERE github_login ILIKE $1
 757                ORDER BY github_login <-> $2
 758                LIMIT $3
 759            ";
 760
 761            Ok(user::Entity::find()
 762                .from_raw_sql(Statement::from_sql_and_values(
 763                    self.pool.get_database_backend(),
 764                    query.into(),
 765                    vec![like_string.into(), name_query.into(), limit.into()],
 766                ))
 767                .all(&*tx)
 768                .await?)
 769        })
 770        .await
 771    }
 772
 773    // signups
 774
 775    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 776        self.transaction(|tx| async move {
 777            signup::Entity::insert(signup::ActiveModel {
 778                email_address: ActiveValue::set(signup.email_address.clone()),
 779                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 780                email_confirmation_sent: ActiveValue::set(false),
 781                platform_mac: ActiveValue::set(signup.platform_mac),
 782                platform_windows: ActiveValue::set(signup.platform_windows),
 783                platform_linux: ActiveValue::set(signup.platform_linux),
 784                platform_unknown: ActiveValue::set(false),
 785                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 786                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 787                device_id: ActiveValue::set(signup.device_id.clone()),
 788                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 789                ..Default::default()
 790            })
 791            .on_conflict(
 792                OnConflict::column(signup::Column::EmailAddress)
 793                    .update_columns([
 794                        signup::Column::PlatformMac,
 795                        signup::Column::PlatformWindows,
 796                        signup::Column::PlatformLinux,
 797                        signup::Column::EditorFeatures,
 798                        signup::Column::ProgrammingLanguages,
 799                        signup::Column::DeviceId,
 800                        signup::Column::AddedToMailingList,
 801                    ])
 802                    .to_owned(),
 803            )
 804            .exec(&*tx)
 805            .await?;
 806            Ok(())
 807        })
 808        .await
 809    }
 810
 811    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 812        self.transaction(|tx| async move {
 813            let signup = signup::Entity::find()
 814                .filter(signup::Column::EmailAddress.eq(email_address))
 815                .one(&*tx)
 816                .await?
 817                .ok_or_else(|| {
 818                    anyhow!("signup with email address {} doesn't exist", email_address)
 819                })?;
 820
 821            Ok(signup)
 822        })
 823        .await
 824    }
 825
 826    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 827        self.transaction(|tx| async move {
 828            let query = "
 829                SELECT
 830                    COUNT(*) as count,
 831                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 832                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 833                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 834                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 835                FROM (
 836                    SELECT *
 837                    FROM signups
 838                    WHERE
 839                        NOT email_confirmation_sent
 840                ) AS unsent
 841            ";
 842            Ok(
 843                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 844                    self.pool.get_database_backend(),
 845                    query.into(),
 846                    vec![],
 847                ))
 848                .one(&*tx)
 849                .await?
 850                .ok_or_else(|| anyhow!("invalid result"))?,
 851            )
 852        })
 853        .await
 854    }
 855
 856    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 857        let emails = invites
 858            .iter()
 859            .map(|s| s.email_address.as_str())
 860            .collect::<Vec<_>>();
 861        self.transaction(|tx| async {
 862            let tx = tx;
 863            signup::Entity::update_many()
 864                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 865                .set(signup::ActiveModel {
 866                    email_confirmation_sent: ActiveValue::set(true),
 867                    ..Default::default()
 868                })
 869                .exec(&*tx)
 870                .await?;
 871            Ok(())
 872        })
 873        .await
 874    }
 875
 876    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 877        self.transaction(|tx| async move {
 878            Ok(signup::Entity::find()
 879                .select_only()
 880                .column(signup::Column::EmailAddress)
 881                .column(signup::Column::EmailConfirmationCode)
 882                .filter(
 883                    signup::Column::EmailConfirmationSent.eq(false).and(
 884                        signup::Column::PlatformMac
 885                            .eq(true)
 886                            .or(signup::Column::PlatformUnknown.eq(true)),
 887                    ),
 888                )
 889                .order_by_asc(signup::Column::CreatedAt)
 890                .limit(count as u64)
 891                .into_model()
 892                .all(&*tx)
 893                .await?)
 894        })
 895        .await
 896    }
 897
 898    // invite codes
 899
 900    pub async fn create_invite_from_code(
 901        &self,
 902        code: &str,
 903        email_address: &str,
 904        device_id: Option<&str>,
 905    ) -> Result<Invite> {
 906        self.transaction(|tx| async move {
 907            let existing_user = user::Entity::find()
 908                .filter(user::Column::EmailAddress.eq(email_address))
 909                .one(&*tx)
 910                .await?;
 911
 912            if existing_user.is_some() {
 913                Err(anyhow!("email address is already in use"))?;
 914            }
 915
 916            let inviting_user_with_invites = match user::Entity::find()
 917                .filter(
 918                    user::Column::InviteCode
 919                        .eq(code)
 920                        .and(user::Column::InviteCount.gt(0)),
 921                )
 922                .one(&*tx)
 923                .await?
 924            {
 925                Some(inviting_user) => inviting_user,
 926                None => {
 927                    return Err(Error::Http(
 928                        StatusCode::UNAUTHORIZED,
 929                        "unable to find an invite code with invites remaining".to_string(),
 930                    ))?
 931                }
 932            };
 933            user::Entity::update_many()
 934                .filter(
 935                    user::Column::Id
 936                        .eq(inviting_user_with_invites.id)
 937                        .and(user::Column::InviteCount.gt(0)),
 938                )
 939                .col_expr(
 940                    user::Column::InviteCount,
 941                    Expr::col(user::Column::InviteCount).sub(1),
 942                )
 943                .exec(&*tx)
 944                .await?;
 945
 946            let signup = signup::Entity::insert(signup::ActiveModel {
 947                email_address: ActiveValue::set(email_address.into()),
 948                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 949                email_confirmation_sent: ActiveValue::set(false),
 950                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 951                platform_linux: ActiveValue::set(false),
 952                platform_mac: ActiveValue::set(false),
 953                platform_windows: ActiveValue::set(false),
 954                platform_unknown: ActiveValue::set(true),
 955                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 956                ..Default::default()
 957            })
 958            .on_conflict(
 959                OnConflict::column(signup::Column::EmailAddress)
 960                    .update_column(signup::Column::InvitingUserId)
 961                    .to_owned(),
 962            )
 963            .exec_with_returning(&*tx)
 964            .await?;
 965
 966            Ok(Invite {
 967                email_address: signup.email_address,
 968                email_confirmation_code: signup.email_confirmation_code,
 969            })
 970        })
 971        .await
 972    }
 973
 974    pub async fn create_user_from_invite(
 975        &self,
 976        invite: &Invite,
 977        user: NewUserParams,
 978    ) -> Result<Option<NewUserResult>> {
 979        self.transaction(|tx| async {
 980            let tx = tx;
 981            let signup = signup::Entity::find()
 982                .filter(
 983                    signup::Column::EmailAddress
 984                        .eq(invite.email_address.as_str())
 985                        .and(
 986                            signup::Column::EmailConfirmationCode
 987                                .eq(invite.email_confirmation_code.as_str()),
 988                        ),
 989                )
 990                .one(&*tx)
 991                .await?
 992                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 993
 994            if signup.user_id.is_some() {
 995                return Ok(None);
 996            }
 997
 998            let user = user::Entity::insert(user::ActiveModel {
 999                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1000                github_login: ActiveValue::set(user.github_login.clone()),
1001                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1002                admin: ActiveValue::set(false),
1003                invite_count: ActiveValue::set(user.invite_count),
1004                invite_code: ActiveValue::set(Some(random_invite_code())),
1005                metrics_id: ActiveValue::set(Uuid::new_v4()),
1006                ..Default::default()
1007            })
1008            .on_conflict(
1009                OnConflict::column(user::Column::GithubLogin)
1010                    .update_columns([
1011                        user::Column::EmailAddress,
1012                        user::Column::GithubUserId,
1013                        user::Column::Admin,
1014                    ])
1015                    .to_owned(),
1016            )
1017            .exec_with_returning(&*tx)
1018            .await?;
1019
1020            let mut signup = signup.into_active_model();
1021            signup.user_id = ActiveValue::set(Some(user.id));
1022            let signup = signup.update(&*tx).await?;
1023
1024            if let Some(inviting_user_id) = signup.inviting_user_id {
1025                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1026                    (inviting_user_id, user.id, true)
1027                } else {
1028                    (user.id, inviting_user_id, false)
1029                };
1030
1031                contact::Entity::insert(contact::ActiveModel {
1032                    user_id_a: ActiveValue::set(user_id_a),
1033                    user_id_b: ActiveValue::set(user_id_b),
1034                    a_to_b: ActiveValue::set(a_to_b),
1035                    should_notify: ActiveValue::set(true),
1036                    accepted: ActiveValue::set(true),
1037                    ..Default::default()
1038                })
1039                .on_conflict(OnConflict::new().do_nothing().to_owned())
1040                .exec_without_returning(&*tx)
1041                .await?;
1042            }
1043
1044            Ok(Some(NewUserResult {
1045                user_id: user.id,
1046                metrics_id: user.metrics_id.to_string(),
1047                inviting_user_id: signup.inviting_user_id,
1048                signup_device_id: signup.device_id,
1049            }))
1050        })
1051        .await
1052    }
1053
1054    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1055        self.transaction(|tx| async move {
1056            if count > 0 {
1057                user::Entity::update_many()
1058                    .filter(
1059                        user::Column::Id
1060                            .eq(id)
1061                            .and(user::Column::InviteCode.is_null()),
1062                    )
1063                    .set(user::ActiveModel {
1064                        invite_code: ActiveValue::set(Some(random_invite_code())),
1065                        ..Default::default()
1066                    })
1067                    .exec(&*tx)
1068                    .await?;
1069            }
1070
1071            user::Entity::update_many()
1072                .filter(user::Column::Id.eq(id))
1073                .set(user::ActiveModel {
1074                    invite_count: ActiveValue::set(count),
1075                    ..Default::default()
1076                })
1077                .exec(&*tx)
1078                .await?;
1079            Ok(())
1080        })
1081        .await
1082    }
1083
1084    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1085        self.transaction(|tx| async move {
1086            match user::Entity::find_by_id(id).one(&*tx).await? {
1087                Some(user) if user.invite_code.is_some() => {
1088                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1089                }
1090                _ => Ok(None),
1091            }
1092        })
1093        .await
1094    }
1095
1096    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1097        self.transaction(|tx| async move {
1098            user::Entity::find()
1099                .filter(user::Column::InviteCode.eq(code))
1100                .one(&*tx)
1101                .await?
1102                .ok_or_else(|| {
1103                    Error::Http(
1104                        StatusCode::NOT_FOUND,
1105                        "that invite code does not exist".to_string(),
1106                    )
1107                })
1108        })
1109        .await
1110    }
1111
1112    // rooms
1113
1114    pub async fn incoming_call_for_user(
1115        &self,
1116        user_id: UserId,
1117    ) -> Result<Option<proto::IncomingCall>> {
1118        self.transaction(|tx| async move {
1119            let pending_participant = room_participant::Entity::find()
1120                .filter(
1121                    room_participant::Column::UserId
1122                        .eq(user_id)
1123                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1124                )
1125                .one(&*tx)
1126                .await?;
1127
1128            if let Some(pending_participant) = pending_participant {
1129                let room = self.get_room(pending_participant.room_id, &tx).await?;
1130                Ok(Self::build_incoming_call(&room, user_id))
1131            } else {
1132                Ok(None)
1133            }
1134        })
1135        .await
1136    }
1137
1138    pub async fn create_room(
1139        &self,
1140        user_id: UserId,
1141        connection: ConnectionId,
1142        live_kit_room: &str,
1143    ) -> Result<RoomGuard<proto::Room>> {
1144        self.room_transaction(|tx| async move {
1145            let room = room::ActiveModel {
1146                live_kit_room: ActiveValue::set(live_kit_room.into()),
1147                ..Default::default()
1148            }
1149            .insert(&*tx)
1150            .await?;
1151            let room_id = room.id;
1152
1153            room_participant::ActiveModel {
1154                room_id: ActiveValue::set(room_id),
1155                user_id: ActiveValue::set(user_id),
1156                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1157                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1158                    connection.epoch as i32,
1159                ))),
1160                answering_connection_lost: ActiveValue::set(false),
1161                calling_user_id: ActiveValue::set(user_id),
1162                calling_connection_id: ActiveValue::set(connection.id as i32),
1163                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1164                    connection.epoch as i32,
1165                ))),
1166                ..Default::default()
1167            }
1168            .insert(&*tx)
1169            .await?;
1170
1171            let room = self.get_room(room_id, &tx).await?;
1172            Ok((room_id, room))
1173        })
1174        .await
1175    }
1176
1177    pub async fn call(
1178        &self,
1179        room_id: RoomId,
1180        calling_user_id: UserId,
1181        calling_connection: ConnectionId,
1182        called_user_id: UserId,
1183        initial_project_id: Option<ProjectId>,
1184    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1185        self.room_transaction(|tx| async move {
1186            room_participant::ActiveModel {
1187                room_id: ActiveValue::set(room_id),
1188                user_id: ActiveValue::set(called_user_id),
1189                answering_connection_lost: ActiveValue::set(false),
1190                calling_user_id: ActiveValue::set(calling_user_id),
1191                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1192                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1193                    calling_connection.epoch as i32,
1194                ))),
1195                initial_project_id: ActiveValue::set(initial_project_id),
1196                ..Default::default()
1197            }
1198            .insert(&*tx)
1199            .await?;
1200
1201            let room = self.get_room(room_id, &tx).await?;
1202            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1203                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1204            Ok((room_id, (room, incoming_call)))
1205        })
1206        .await
1207    }
1208
1209    pub async fn call_failed(
1210        &self,
1211        room_id: RoomId,
1212        called_user_id: UserId,
1213    ) -> Result<RoomGuard<proto::Room>> {
1214        self.room_transaction(|tx| async move {
1215            room_participant::Entity::delete_many()
1216                .filter(
1217                    room_participant::Column::RoomId
1218                        .eq(room_id)
1219                        .and(room_participant::Column::UserId.eq(called_user_id)),
1220                )
1221                .exec(&*tx)
1222                .await?;
1223            let room = self.get_room(room_id, &tx).await?;
1224            Ok((room_id, room))
1225        })
1226        .await
1227    }
1228
1229    pub async fn decline_call(
1230        &self,
1231        expected_room_id: Option<RoomId>,
1232        user_id: UserId,
1233    ) -> Result<RoomGuard<proto::Room>> {
1234        self.room_transaction(|tx| async move {
1235            let participant = room_participant::Entity::find()
1236                .filter(
1237                    room_participant::Column::UserId
1238                        .eq(user_id)
1239                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1240                )
1241                .one(&*tx)
1242                .await?
1243                .ok_or_else(|| anyhow!("could not decline call"))?;
1244            let room_id = participant.room_id;
1245
1246            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1247                return Err(anyhow!("declining call on unexpected room"))?;
1248            }
1249
1250            room_participant::Entity::delete(participant.into_active_model())
1251                .exec(&*tx)
1252                .await?;
1253
1254            let room = self.get_room(room_id, &tx).await?;
1255            Ok((room_id, room))
1256        })
1257        .await
1258    }
1259
1260    pub async fn cancel_call(
1261        &self,
1262        expected_room_id: Option<RoomId>,
1263        calling_connection: ConnectionId,
1264        called_user_id: UserId,
1265    ) -> Result<RoomGuard<proto::Room>> {
1266        self.room_transaction(|tx| async move {
1267            let participant = room_participant::Entity::find()
1268                .filter(
1269                    Condition::all()
1270                        .add(room_participant::Column::UserId.eq(called_user_id))
1271                        .add(
1272                            room_participant::Column::CallingConnectionId
1273                                .eq(calling_connection.id as i32),
1274                        )
1275                        .add(
1276                            room_participant::Column::CallingConnectionServerId
1277                                .eq(calling_connection.epoch as i32),
1278                        )
1279                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1280                )
1281                .one(&*tx)
1282                .await?
1283                .ok_or_else(|| anyhow!("could not cancel call"))?;
1284            let room_id = participant.room_id;
1285            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1286                return Err(anyhow!("canceling call on unexpected room"))?;
1287            }
1288
1289            room_participant::Entity::delete(participant.into_active_model())
1290                .exec(&*tx)
1291                .await?;
1292
1293            let room = self.get_room(room_id, &tx).await?;
1294            Ok((room_id, room))
1295        })
1296        .await
1297    }
1298
1299    pub async fn join_room(
1300        &self,
1301        room_id: RoomId,
1302        user_id: UserId,
1303        connection: ConnectionId,
1304    ) -> Result<RoomGuard<proto::Room>> {
1305        self.room_transaction(|tx| async move {
1306            let result = room_participant::Entity::update_many()
1307                .filter(
1308                    Condition::all()
1309                        .add(room_participant::Column::RoomId.eq(room_id))
1310                        .add(room_participant::Column::UserId.eq(user_id))
1311                        .add(
1312                            Condition::any()
1313                                .add(room_participant::Column::AnsweringConnectionId.is_null())
1314                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1315                                .add(
1316                                    room_participant::Column::AnsweringConnectionServerId
1317                                        .ne(connection.epoch as i32),
1318                                ),
1319                        ),
1320                )
1321                .set(room_participant::ActiveModel {
1322                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1323                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1324                        connection.epoch as i32,
1325                    ))),
1326                    answering_connection_lost: ActiveValue::set(false),
1327                    ..Default::default()
1328                })
1329                .exec(&*tx)
1330                .await?;
1331            if result.rows_affected == 0 {
1332                Err(anyhow!("room does not exist or was already joined"))?
1333            } else {
1334                let room = self.get_room(room_id, &tx).await?;
1335                Ok((room_id, room))
1336            }
1337        })
1338        .await
1339    }
1340
1341    pub async fn leave_room(&self, connection: ConnectionId) -> Result<RoomGuard<LeftRoom>> {
1342        self.room_transaction(|tx| async move {
1343            let leaving_participant = room_participant::Entity::find()
1344                .filter(
1345                    Condition::all()
1346                        .add(
1347                            room_participant::Column::AnsweringConnectionId
1348                                .eq(connection.id as i32),
1349                        )
1350                        .add(
1351                            room_participant::Column::AnsweringConnectionServerId
1352                                .eq(connection.epoch as i32),
1353                        ),
1354                )
1355                .one(&*tx)
1356                .await?;
1357
1358            if let Some(leaving_participant) = leaving_participant {
1359                // Leave room.
1360                let room_id = leaving_participant.room_id;
1361                room_participant::Entity::delete_by_id(leaving_participant.id)
1362                    .exec(&*tx)
1363                    .await?;
1364
1365                // Cancel pending calls initiated by the leaving user.
1366                let called_participants = room_participant::Entity::find()
1367                    .filter(
1368                        Condition::all()
1369                            .add(
1370                                room_participant::Column::CallingConnectionId
1371                                    .eq(connection.id as i32),
1372                            )
1373                            .add(
1374                                room_participant::Column::CallingConnectionServerId
1375                                    .eq(connection.epoch as i32),
1376                            )
1377                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1378                    )
1379                    .all(&*tx)
1380                    .await?;
1381                room_participant::Entity::delete_many()
1382                    .filter(
1383                        room_participant::Column::Id
1384                            .is_in(called_participants.iter().map(|participant| participant.id)),
1385                    )
1386                    .exec(&*tx)
1387                    .await?;
1388                let canceled_calls_to_user_ids = called_participants
1389                    .into_iter()
1390                    .map(|participant| participant.user_id)
1391                    .collect();
1392
1393                // Detect left projects.
1394                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1395                enum QueryProjectIds {
1396                    ProjectId,
1397                }
1398                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1399                    .select_only()
1400                    .column_as(
1401                        project_collaborator::Column::ProjectId,
1402                        QueryProjectIds::ProjectId,
1403                    )
1404                    .filter(
1405                        Condition::all()
1406                            .add(
1407                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1408                            )
1409                            .add(
1410                                project_collaborator::Column::ConnectionServerId
1411                                    .eq(connection.epoch as i32),
1412                            ),
1413                    )
1414                    .into_values::<_, QueryProjectIds>()
1415                    .all(&*tx)
1416                    .await?;
1417                let mut left_projects = HashMap::default();
1418                let mut collaborators = project_collaborator::Entity::find()
1419                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1420                    .stream(&*tx)
1421                    .await?;
1422                while let Some(collaborator) = collaborators.next().await {
1423                    let collaborator = collaborator?;
1424                    let left_project =
1425                        left_projects
1426                            .entry(collaborator.project_id)
1427                            .or_insert(LeftProject {
1428                                id: collaborator.project_id,
1429                                host_user_id: Default::default(),
1430                                connection_ids: Default::default(),
1431                                host_connection_id: Default::default(),
1432                            });
1433
1434                    let collaborator_connection_id = ConnectionId {
1435                        epoch: collaborator.connection_server_id.0 as u32,
1436                        id: collaborator.connection_id as u32,
1437                    };
1438                    if collaborator_connection_id != connection {
1439                        left_project.connection_ids.push(collaborator_connection_id);
1440                    }
1441
1442                    if collaborator.is_host {
1443                        left_project.host_user_id = collaborator.user_id;
1444                        left_project.host_connection_id = collaborator_connection_id;
1445                    }
1446                }
1447                drop(collaborators);
1448
1449                // Leave projects.
1450                project_collaborator::Entity::delete_many()
1451                    .filter(
1452                        Condition::all()
1453                            .add(
1454                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1455                            )
1456                            .add(
1457                                project_collaborator::Column::ConnectionServerId
1458                                    .eq(connection.epoch as i32),
1459                            ),
1460                    )
1461                    .exec(&*tx)
1462                    .await?;
1463
1464                // Unshare projects.
1465                project::Entity::delete_many()
1466                    .filter(
1467                        Condition::all()
1468                            .add(project::Column::RoomId.eq(room_id))
1469                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1470                            .add(
1471                                project::Column::HostConnectionServerId.eq(connection.epoch as i32),
1472                            ),
1473                    )
1474                    .exec(&*tx)
1475                    .await?;
1476
1477                let room = self.get_room(room_id, &tx).await?;
1478                if room.participants.is_empty() {
1479                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1480                }
1481
1482                let left_room = LeftRoom {
1483                    room,
1484                    left_projects,
1485                    canceled_calls_to_user_ids,
1486                };
1487
1488                if left_room.room.participants.is_empty() {
1489                    self.rooms.remove(&room_id);
1490                }
1491
1492                Ok((room_id, left_room))
1493            } else {
1494                Err(anyhow!("could not leave room"))?
1495            }
1496        })
1497        .await
1498    }
1499
1500    pub async fn update_room_participant_location(
1501        &self,
1502        room_id: RoomId,
1503        connection: ConnectionId,
1504        location: proto::ParticipantLocation,
1505    ) -> Result<RoomGuard<proto::Room>> {
1506        self.room_transaction(|tx| async {
1507            let tx = tx;
1508            let location_kind;
1509            let location_project_id;
1510            match location
1511                .variant
1512                .as_ref()
1513                .ok_or_else(|| anyhow!("invalid location"))?
1514            {
1515                proto::participant_location::Variant::SharedProject(project) => {
1516                    location_kind = 0;
1517                    location_project_id = Some(ProjectId::from_proto(project.id));
1518                }
1519                proto::participant_location::Variant::UnsharedProject(_) => {
1520                    location_kind = 1;
1521                    location_project_id = None;
1522                }
1523                proto::participant_location::Variant::External(_) => {
1524                    location_kind = 2;
1525                    location_project_id = None;
1526                }
1527            }
1528
1529            let result = room_participant::Entity::update_many()
1530                .filter(
1531                    Condition::all()
1532                        .add(room_participant::Column::RoomId.eq(room_id))
1533                        .add(
1534                            room_participant::Column::AnsweringConnectionId
1535                                .eq(connection.id as i32),
1536                        )
1537                        .add(
1538                            room_participant::Column::AnsweringConnectionServerId
1539                                .eq(connection.epoch as i32),
1540                        ),
1541                )
1542                .set(room_participant::ActiveModel {
1543                    location_kind: ActiveValue::set(Some(location_kind)),
1544                    location_project_id: ActiveValue::set(location_project_id),
1545                    ..Default::default()
1546                })
1547                .exec(&*tx)
1548                .await?;
1549
1550            if result.rows_affected == 1 {
1551                let room = self.get_room(room_id, &tx).await?;
1552                Ok((room_id, room))
1553            } else {
1554                Err(anyhow!("could not update room participant location"))?
1555            }
1556        })
1557        .await
1558    }
1559
1560    pub async fn connection_lost(
1561        &self,
1562        connection: ConnectionId,
1563    ) -> Result<RoomGuard<Vec<LeftProject>>> {
1564        self.room_transaction(|tx| async move {
1565            let participant = room_participant::Entity::find()
1566                .filter(
1567                    Condition::all()
1568                        .add(
1569                            room_participant::Column::AnsweringConnectionId
1570                                .eq(connection.id as i32),
1571                        )
1572                        .add(
1573                            room_participant::Column::AnsweringConnectionServerId
1574                                .eq(connection.epoch as i32),
1575                        ),
1576                )
1577                .one(&*tx)
1578                .await?
1579                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1580            let room_id = participant.room_id;
1581
1582            room_participant::Entity::update(room_participant::ActiveModel {
1583                answering_connection_lost: ActiveValue::set(true),
1584                ..participant.into_active_model()
1585            })
1586            .exec(&*tx)
1587            .await?;
1588
1589            let collaborator_on_projects = project_collaborator::Entity::find()
1590                .find_also_related(project::Entity)
1591                .filter(
1592                    Condition::all()
1593                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1594                        .add(
1595                            project_collaborator::Column::ConnectionServerId
1596                                .eq(connection.epoch as i32),
1597                        ),
1598                )
1599                .all(&*tx)
1600                .await?;
1601            project_collaborator::Entity::delete_many()
1602                .filter(
1603                    Condition::all()
1604                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1605                        .add(
1606                            project_collaborator::Column::ConnectionServerId
1607                                .eq(connection.epoch as i32),
1608                        ),
1609                )
1610                .exec(&*tx)
1611                .await?;
1612
1613            let mut left_projects = Vec::new();
1614            for (_, project) in collaborator_on_projects {
1615                if let Some(project) = project {
1616                    let collaborators = project
1617                        .find_related(project_collaborator::Entity)
1618                        .all(&*tx)
1619                        .await?;
1620                    let connection_ids = collaborators
1621                        .into_iter()
1622                        .map(|collaborator| ConnectionId {
1623                            id: collaborator.connection_id as u32,
1624                            epoch: collaborator.connection_server_id.0 as u32,
1625                        })
1626                        .collect();
1627
1628                    left_projects.push(LeftProject {
1629                        id: project.id,
1630                        host_user_id: project.host_user_id,
1631                        host_connection_id: ConnectionId {
1632                            id: project.host_connection_id as u32,
1633                            epoch: project.host_connection_server_id.0 as u32,
1634                        },
1635                        connection_ids,
1636                    });
1637                }
1638            }
1639
1640            project::Entity::delete_many()
1641                .filter(
1642                    Condition::all()
1643                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
1644                        .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
1645                )
1646                .exec(&*tx)
1647                .await?;
1648
1649            Ok((room_id, left_projects))
1650        })
1651        .await
1652    }
1653
1654    fn build_incoming_call(
1655        room: &proto::Room,
1656        called_user_id: UserId,
1657    ) -> Option<proto::IncomingCall> {
1658        let pending_participant = room
1659            .pending_participants
1660            .iter()
1661            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1662
1663        Some(proto::IncomingCall {
1664            room_id: room.id,
1665            calling_user_id: pending_participant.calling_user_id,
1666            participant_user_ids: room
1667                .participants
1668                .iter()
1669                .map(|participant| participant.user_id)
1670                .collect(),
1671            initial_project: room.participants.iter().find_map(|participant| {
1672                let initial_project_id = pending_participant.initial_project_id?;
1673                participant
1674                    .projects
1675                    .iter()
1676                    .find(|project| project.id == initial_project_id)
1677                    .cloned()
1678            }),
1679        })
1680    }
1681
1682    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1683        let db_room = room::Entity::find_by_id(room_id)
1684            .one(tx)
1685            .await?
1686            .ok_or_else(|| anyhow!("could not find room"))?;
1687
1688        let mut db_participants = db_room
1689            .find_related(room_participant::Entity)
1690            .stream(tx)
1691            .await?;
1692        let mut participants = HashMap::default();
1693        let mut pending_participants = Vec::new();
1694        while let Some(db_participant) = db_participants.next().await {
1695            let db_participant = db_participant?;
1696            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1697                .answering_connection_id
1698                .zip(db_participant.answering_connection_server_id)
1699            {
1700                let location = match (
1701                    db_participant.location_kind,
1702                    db_participant.location_project_id,
1703                ) {
1704                    (Some(0), Some(project_id)) => {
1705                        Some(proto::participant_location::Variant::SharedProject(
1706                            proto::participant_location::SharedProject {
1707                                id: project_id.to_proto(),
1708                            },
1709                        ))
1710                    }
1711                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1712                        Default::default(),
1713                    )),
1714                    _ => Some(proto::participant_location::Variant::External(
1715                        Default::default(),
1716                    )),
1717                };
1718
1719                let answering_connection = ConnectionId {
1720                    epoch: answering_connection_server_id.0 as u32,
1721                    id: answering_connection_id as u32,
1722                };
1723                participants.insert(
1724                    answering_connection,
1725                    proto::Participant {
1726                        user_id: db_participant.user_id.to_proto(),
1727                        peer_id: Some(answering_connection.into()),
1728                        projects: Default::default(),
1729                        location: Some(proto::ParticipantLocation { variant: location }),
1730                    },
1731                );
1732            } else {
1733                pending_participants.push(proto::PendingParticipant {
1734                    user_id: db_participant.user_id.to_proto(),
1735                    calling_user_id: db_participant.calling_user_id.to_proto(),
1736                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1737                });
1738            }
1739        }
1740        drop(db_participants);
1741
1742        let mut db_projects = db_room
1743            .find_related(project::Entity)
1744            .find_with_related(worktree::Entity)
1745            .stream(tx)
1746            .await?;
1747
1748        while let Some(row) = db_projects.next().await {
1749            let (db_project, db_worktree) = row?;
1750            let host_connection = ConnectionId {
1751                epoch: db_project.host_connection_server_id.0 as u32,
1752                id: db_project.host_connection_id as u32,
1753            };
1754            if let Some(participant) = participants.get_mut(&host_connection) {
1755                let project = if let Some(project) = participant
1756                    .projects
1757                    .iter_mut()
1758                    .find(|project| project.id == db_project.id.to_proto())
1759                {
1760                    project
1761                } else {
1762                    participant.projects.push(proto::ParticipantProject {
1763                        id: db_project.id.to_proto(),
1764                        worktree_root_names: Default::default(),
1765                    });
1766                    participant.projects.last_mut().unwrap()
1767                };
1768
1769                if let Some(db_worktree) = db_worktree {
1770                    project.worktree_root_names.push(db_worktree.root_name);
1771                }
1772            }
1773        }
1774
1775        Ok(proto::Room {
1776            id: db_room.id.to_proto(),
1777            live_kit_room: db_room.live_kit_room,
1778            participants: participants.into_values().collect(),
1779            pending_participants,
1780        })
1781    }
1782
1783    // projects
1784
1785    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1786        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1787        enum QueryAs {
1788            Count,
1789        }
1790
1791        self.transaction(|tx| async move {
1792            Ok(project::Entity::find()
1793                .select_only()
1794                .column_as(project::Column::Id.count(), QueryAs::Count)
1795                .inner_join(user::Entity)
1796                .filter(user::Column::Admin.eq(false))
1797                .into_values::<_, QueryAs>()
1798                .one(&*tx)
1799                .await?
1800                .unwrap_or(0i64) as usize)
1801        })
1802        .await
1803    }
1804
1805    pub async fn share_project(
1806        &self,
1807        room_id: RoomId,
1808        connection: ConnectionId,
1809        worktrees: &[proto::WorktreeMetadata],
1810    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1811        self.room_transaction(|tx| async move {
1812            let participant = room_participant::Entity::find()
1813                .filter(
1814                    Condition::all()
1815                        .add(
1816                            room_participant::Column::AnsweringConnectionId
1817                                .eq(connection.id as i32),
1818                        )
1819                        .add(
1820                            room_participant::Column::AnsweringConnectionServerId
1821                                .eq(connection.epoch as i32),
1822                        ),
1823                )
1824                .one(&*tx)
1825                .await?
1826                .ok_or_else(|| anyhow!("could not find participant"))?;
1827            if participant.room_id != room_id {
1828                return Err(anyhow!("shared project on unexpected room"))?;
1829            }
1830
1831            let project = project::ActiveModel {
1832                room_id: ActiveValue::set(participant.room_id),
1833                host_user_id: ActiveValue::set(participant.user_id),
1834                host_connection_id: ActiveValue::set(connection.id as i32),
1835                host_connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
1836                ..Default::default()
1837            }
1838            .insert(&*tx)
1839            .await?;
1840
1841            if !worktrees.is_empty() {
1842                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1843                    worktree::ActiveModel {
1844                        id: ActiveValue::set(worktree.id as i64),
1845                        project_id: ActiveValue::set(project.id),
1846                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1847                        root_name: ActiveValue::set(worktree.root_name.clone()),
1848                        visible: ActiveValue::set(worktree.visible),
1849                        scan_id: ActiveValue::set(0),
1850                        is_complete: ActiveValue::set(false),
1851                    }
1852                }))
1853                .exec(&*tx)
1854                .await?;
1855            }
1856
1857            project_collaborator::ActiveModel {
1858                project_id: ActiveValue::set(project.id),
1859                connection_id: ActiveValue::set(connection.id as i32),
1860                connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
1861                user_id: ActiveValue::set(participant.user_id),
1862                replica_id: ActiveValue::set(ReplicaId(0)),
1863                is_host: ActiveValue::set(true),
1864                ..Default::default()
1865            }
1866            .insert(&*tx)
1867            .await?;
1868
1869            let room = self.get_room(room_id, &tx).await?;
1870            Ok((room_id, (project.id, room)))
1871        })
1872        .await
1873    }
1874
1875    pub async fn unshare_project(
1876        &self,
1877        project_id: ProjectId,
1878        connection: ConnectionId,
1879    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1880        self.room_transaction(|tx| async move {
1881            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1882
1883            let project = project::Entity::find_by_id(project_id)
1884                .one(&*tx)
1885                .await?
1886                .ok_or_else(|| anyhow!("project not found"))?;
1887            let host_connection = ConnectionId {
1888                epoch: project.host_connection_server_id.0 as u32,
1889                id: project.host_connection_id as u32,
1890            };
1891            if host_connection == connection {
1892                let room_id = project.room_id;
1893                project::Entity::delete(project.into_active_model())
1894                    .exec(&*tx)
1895                    .await?;
1896                let room = self.get_room(room_id, &tx).await?;
1897                Ok((room_id, (room, guest_connection_ids)))
1898            } else {
1899                Err(anyhow!("cannot unshare a project hosted by another user"))?
1900            }
1901        })
1902        .await
1903    }
1904
1905    pub async fn update_project(
1906        &self,
1907        project_id: ProjectId,
1908        connection: ConnectionId,
1909        worktrees: &[proto::WorktreeMetadata],
1910    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1911        self.room_transaction(|tx| async move {
1912            let project = project::Entity::find_by_id(project_id)
1913                .filter(
1914                    Condition::all()
1915                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
1916                        .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
1917                )
1918                .one(&*tx)
1919                .await?
1920                .ok_or_else(|| anyhow!("no such project"))?;
1921
1922            if !worktrees.is_empty() {
1923                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1924                    worktree::ActiveModel {
1925                        id: ActiveValue::set(worktree.id as i64),
1926                        project_id: ActiveValue::set(project.id),
1927                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
1928                        root_name: ActiveValue::set(worktree.root_name.clone()),
1929                        visible: ActiveValue::set(worktree.visible),
1930                        scan_id: ActiveValue::set(0),
1931                        is_complete: ActiveValue::set(false),
1932                    }
1933                }))
1934                .on_conflict(
1935                    OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
1936                        .update_column(worktree::Column::RootName)
1937                        .to_owned(),
1938                )
1939                .exec(&*tx)
1940                .await?;
1941            }
1942
1943            worktree::Entity::delete_many()
1944                .filter(
1945                    worktree::Column::ProjectId.eq(project.id).and(
1946                        worktree::Column::Id
1947                            .is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
1948                    ),
1949                )
1950                .exec(&*tx)
1951                .await?;
1952
1953            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
1954            let room = self.get_room(project.room_id, &tx).await?;
1955            Ok((project.room_id, (room, guest_connection_ids)))
1956        })
1957        .await
1958    }
1959
1960    pub async fn update_worktree(
1961        &self,
1962        update: &proto::UpdateWorktree,
1963        connection: ConnectionId,
1964    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1965        self.room_transaction(|tx| async move {
1966            let project_id = ProjectId::from_proto(update.project_id);
1967            let worktree_id = update.worktree_id as i64;
1968
1969            // Ensure the update comes from the host.
1970            let project = project::Entity::find_by_id(project_id)
1971                .filter(
1972                    Condition::all()
1973                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
1974                        .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
1975                )
1976                .one(&*tx)
1977                .await?
1978                .ok_or_else(|| anyhow!("no such project"))?;
1979            let room_id = project.room_id;
1980
1981            // Update metadata.
1982            worktree::Entity::update(worktree::ActiveModel {
1983                id: ActiveValue::set(worktree_id),
1984                project_id: ActiveValue::set(project_id),
1985                root_name: ActiveValue::set(update.root_name.clone()),
1986                scan_id: ActiveValue::set(update.scan_id as i64),
1987                is_complete: ActiveValue::set(update.is_last_update),
1988                abs_path: ActiveValue::set(update.abs_path.clone()),
1989                ..Default::default()
1990            })
1991            .exec(&*tx)
1992            .await?;
1993
1994            if !update.updated_entries.is_empty() {
1995                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
1996                    let mtime = entry.mtime.clone().unwrap_or_default();
1997                    worktree_entry::ActiveModel {
1998                        project_id: ActiveValue::set(project_id),
1999                        worktree_id: ActiveValue::set(worktree_id),
2000                        id: ActiveValue::set(entry.id as i64),
2001                        is_dir: ActiveValue::set(entry.is_dir),
2002                        path: ActiveValue::set(entry.path.clone()),
2003                        inode: ActiveValue::set(entry.inode as i64),
2004                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2005                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2006                        is_symlink: ActiveValue::set(entry.is_symlink),
2007                        is_ignored: ActiveValue::set(entry.is_ignored),
2008                    }
2009                }))
2010                .on_conflict(
2011                    OnConflict::columns([
2012                        worktree_entry::Column::ProjectId,
2013                        worktree_entry::Column::WorktreeId,
2014                        worktree_entry::Column::Id,
2015                    ])
2016                    .update_columns([
2017                        worktree_entry::Column::IsDir,
2018                        worktree_entry::Column::Path,
2019                        worktree_entry::Column::Inode,
2020                        worktree_entry::Column::MtimeSeconds,
2021                        worktree_entry::Column::MtimeNanos,
2022                        worktree_entry::Column::IsSymlink,
2023                        worktree_entry::Column::IsIgnored,
2024                    ])
2025                    .to_owned(),
2026                )
2027                .exec(&*tx)
2028                .await?;
2029            }
2030
2031            if !update.removed_entries.is_empty() {
2032                worktree_entry::Entity::delete_many()
2033                    .filter(
2034                        worktree_entry::Column::ProjectId
2035                            .eq(project_id)
2036                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2037                            .and(
2038                                worktree_entry::Column::Id
2039                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2040                            ),
2041                    )
2042                    .exec(&*tx)
2043                    .await?;
2044            }
2045
2046            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2047            Ok((room_id, connection_ids))
2048        })
2049        .await
2050    }
2051
2052    pub async fn update_diagnostic_summary(
2053        &self,
2054        update: &proto::UpdateDiagnosticSummary,
2055        connection: ConnectionId,
2056    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2057        self.room_transaction(|tx| async move {
2058            let project_id = ProjectId::from_proto(update.project_id);
2059            let worktree_id = update.worktree_id as i64;
2060            let summary = update
2061                .summary
2062                .as_ref()
2063                .ok_or_else(|| anyhow!("invalid summary"))?;
2064
2065            // Ensure the update comes from the host.
2066            let project = project::Entity::find_by_id(project_id)
2067                .one(&*tx)
2068                .await?
2069                .ok_or_else(|| anyhow!("no such project"))?;
2070            let host_connection = ConnectionId {
2071                epoch: project.host_connection_server_id.0 as u32,
2072                id: project.host_connection_id as u32,
2073            };
2074            if host_connection != connection {
2075                return Err(anyhow!("can't update a project hosted by someone else"))?;
2076            }
2077
2078            // Update summary.
2079            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2080                project_id: ActiveValue::set(project_id),
2081                worktree_id: ActiveValue::set(worktree_id),
2082                path: ActiveValue::set(summary.path.clone()),
2083                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2084                error_count: ActiveValue::set(summary.error_count as i32),
2085                warning_count: ActiveValue::set(summary.warning_count as i32),
2086                ..Default::default()
2087            })
2088            .on_conflict(
2089                OnConflict::columns([
2090                    worktree_diagnostic_summary::Column::ProjectId,
2091                    worktree_diagnostic_summary::Column::WorktreeId,
2092                    worktree_diagnostic_summary::Column::Path,
2093                ])
2094                .update_columns([
2095                    worktree_diagnostic_summary::Column::LanguageServerId,
2096                    worktree_diagnostic_summary::Column::ErrorCount,
2097                    worktree_diagnostic_summary::Column::WarningCount,
2098                ])
2099                .to_owned(),
2100            )
2101            .exec(&*tx)
2102            .await?;
2103
2104            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2105            Ok((project.room_id, connection_ids))
2106        })
2107        .await
2108    }
2109
2110    pub async fn start_language_server(
2111        &self,
2112        update: &proto::StartLanguageServer,
2113        connection: ConnectionId,
2114    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2115        self.room_transaction(|tx| async move {
2116            let project_id = ProjectId::from_proto(update.project_id);
2117            let server = update
2118                .server
2119                .as_ref()
2120                .ok_or_else(|| anyhow!("invalid language server"))?;
2121
2122            // Ensure the update comes from the host.
2123            let project = project::Entity::find_by_id(project_id)
2124                .one(&*tx)
2125                .await?
2126                .ok_or_else(|| anyhow!("no such project"))?;
2127            let host_connection = ConnectionId {
2128                epoch: project.host_connection_server_id.0 as u32,
2129                id: project.host_connection_id as u32,
2130            };
2131            if host_connection != connection {
2132                return Err(anyhow!("can't update a project hosted by someone else"))?;
2133            }
2134
2135            // Add the newly-started language server.
2136            language_server::Entity::insert(language_server::ActiveModel {
2137                project_id: ActiveValue::set(project_id),
2138                id: ActiveValue::set(server.id as i64),
2139                name: ActiveValue::set(server.name.clone()),
2140                ..Default::default()
2141            })
2142            .on_conflict(
2143                OnConflict::columns([
2144                    language_server::Column::ProjectId,
2145                    language_server::Column::Id,
2146                ])
2147                .update_column(language_server::Column::Name)
2148                .to_owned(),
2149            )
2150            .exec(&*tx)
2151            .await?;
2152
2153            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2154            Ok((project.room_id, connection_ids))
2155        })
2156        .await
2157    }
2158
2159    pub async fn join_project(
2160        &self,
2161        project_id: ProjectId,
2162        connection: ConnectionId,
2163    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2164        self.room_transaction(|tx| async move {
2165            let participant = room_participant::Entity::find()
2166                .filter(
2167                    Condition::all()
2168                        .add(
2169                            room_participant::Column::AnsweringConnectionId
2170                                .eq(connection.id as i32),
2171                        )
2172                        .add(
2173                            room_participant::Column::AnsweringConnectionServerId
2174                                .eq(connection.epoch as i32),
2175                        ),
2176                )
2177                .one(&*tx)
2178                .await?
2179                .ok_or_else(|| anyhow!("must join a room first"))?;
2180
2181            let project = project::Entity::find_by_id(project_id)
2182                .one(&*tx)
2183                .await?
2184                .ok_or_else(|| anyhow!("no such project"))?;
2185            if project.room_id != participant.room_id {
2186                return Err(anyhow!("no such project"))?;
2187            }
2188
2189            let mut collaborators = project
2190                .find_related(project_collaborator::Entity)
2191                .all(&*tx)
2192                .await?;
2193            let replica_ids = collaborators
2194                .iter()
2195                .map(|c| c.replica_id)
2196                .collect::<HashSet<_>>();
2197            let mut replica_id = ReplicaId(1);
2198            while replica_ids.contains(&replica_id) {
2199                replica_id.0 += 1;
2200            }
2201            let new_collaborator = project_collaborator::ActiveModel {
2202                project_id: ActiveValue::set(project_id),
2203                connection_id: ActiveValue::set(connection.id as i32),
2204                connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
2205                user_id: ActiveValue::set(participant.user_id),
2206                replica_id: ActiveValue::set(replica_id),
2207                is_host: ActiveValue::set(false),
2208                ..Default::default()
2209            }
2210            .insert(&*tx)
2211            .await?;
2212            collaborators.push(new_collaborator);
2213
2214            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2215            let mut worktrees = db_worktrees
2216                .into_iter()
2217                .map(|db_worktree| {
2218                    (
2219                        db_worktree.id as u64,
2220                        Worktree {
2221                            id: db_worktree.id as u64,
2222                            abs_path: db_worktree.abs_path,
2223                            root_name: db_worktree.root_name,
2224                            visible: db_worktree.visible,
2225                            entries: Default::default(),
2226                            diagnostic_summaries: Default::default(),
2227                            scan_id: db_worktree.scan_id as u64,
2228                            is_complete: db_worktree.is_complete,
2229                        },
2230                    )
2231                })
2232                .collect::<BTreeMap<_, _>>();
2233
2234            // Populate worktree entries.
2235            {
2236                let mut db_entries = worktree_entry::Entity::find()
2237                    .filter(worktree_entry::Column::ProjectId.eq(project_id))
2238                    .stream(&*tx)
2239                    .await?;
2240                while let Some(db_entry) = db_entries.next().await {
2241                    let db_entry = db_entry?;
2242                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2243                        worktree.entries.push(proto::Entry {
2244                            id: db_entry.id as u64,
2245                            is_dir: db_entry.is_dir,
2246                            path: db_entry.path,
2247                            inode: db_entry.inode as u64,
2248                            mtime: Some(proto::Timestamp {
2249                                seconds: db_entry.mtime_seconds as u64,
2250                                nanos: db_entry.mtime_nanos as u32,
2251                            }),
2252                            is_symlink: db_entry.is_symlink,
2253                            is_ignored: db_entry.is_ignored,
2254                        });
2255                    }
2256                }
2257            }
2258
2259            // Populate worktree diagnostic summaries.
2260            {
2261                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2262                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2263                    .stream(&*tx)
2264                    .await?;
2265                while let Some(db_summary) = db_summaries.next().await {
2266                    let db_summary = db_summary?;
2267                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2268                        worktree
2269                            .diagnostic_summaries
2270                            .push(proto::DiagnosticSummary {
2271                                path: db_summary.path,
2272                                language_server_id: db_summary.language_server_id as u64,
2273                                error_count: db_summary.error_count as u32,
2274                                warning_count: db_summary.warning_count as u32,
2275                            });
2276                    }
2277                }
2278            }
2279
2280            // Populate language servers.
2281            let language_servers = project
2282                .find_related(language_server::Entity)
2283                .all(&*tx)
2284                .await?;
2285
2286            let room_id = project.room_id;
2287            let project = Project {
2288                collaborators,
2289                worktrees,
2290                language_servers: language_servers
2291                    .into_iter()
2292                    .map(|language_server| proto::LanguageServer {
2293                        id: language_server.id as u64,
2294                        name: language_server.name,
2295                    })
2296                    .collect(),
2297            };
2298            Ok((room_id, (project, replica_id as ReplicaId)))
2299        })
2300        .await
2301    }
2302
2303    pub async fn leave_project(
2304        &self,
2305        project_id: ProjectId,
2306        connection: ConnectionId,
2307    ) -> Result<RoomGuard<LeftProject>> {
2308        self.room_transaction(|tx| async move {
2309            let result = project_collaborator::Entity::delete_many()
2310                .filter(
2311                    Condition::all()
2312                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2313                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2314                        .add(
2315                            project_collaborator::Column::ConnectionServerId
2316                                .eq(connection.epoch as i32),
2317                        ),
2318                )
2319                .exec(&*tx)
2320                .await?;
2321            if result.rows_affected == 0 {
2322                Err(anyhow!("not a collaborator on this project"))?;
2323            }
2324
2325            let project = project::Entity::find_by_id(project_id)
2326                .one(&*tx)
2327                .await?
2328                .ok_or_else(|| anyhow!("no such project"))?;
2329            let collaborators = project
2330                .find_related(project_collaborator::Entity)
2331                .all(&*tx)
2332                .await?;
2333            let connection_ids = collaborators
2334                .into_iter()
2335                .map(|collaborator| ConnectionId {
2336                    epoch: collaborator.connection_server_id.0 as u32,
2337                    id: collaborator.connection_id as u32,
2338                })
2339                .collect();
2340
2341            let left_project = LeftProject {
2342                id: project_id,
2343                host_user_id: project.host_user_id,
2344                host_connection_id: ConnectionId {
2345                    epoch: project.host_connection_server_id.0 as u32,
2346                    id: project.host_connection_id as u32,
2347                },
2348                connection_ids,
2349            };
2350            Ok((project.room_id, left_project))
2351        })
2352        .await
2353    }
2354
2355    pub async fn project_collaborators(
2356        &self,
2357        project_id: ProjectId,
2358        connection: ConnectionId,
2359    ) -> Result<RoomGuard<Vec<project_collaborator::Model>>> {
2360        self.room_transaction(|tx| async move {
2361            let project = project::Entity::find_by_id(project_id)
2362                .one(&*tx)
2363                .await?
2364                .ok_or_else(|| anyhow!("no such project"))?;
2365            let collaborators = project_collaborator::Entity::find()
2366                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2367                .all(&*tx)
2368                .await?;
2369
2370            if collaborators.iter().any(|collaborator| {
2371                let collaborator_connection = ConnectionId {
2372                    epoch: collaborator.connection_server_id.0 as u32,
2373                    id: collaborator.connection_id as u32,
2374                };
2375                collaborator_connection == connection
2376            }) {
2377                Ok((project.room_id, collaborators))
2378            } else {
2379                Err(anyhow!("no such project"))?
2380            }
2381        })
2382        .await
2383    }
2384
2385    pub async fn project_connection_ids(
2386        &self,
2387        project_id: ProjectId,
2388        connection_id: ConnectionId,
2389    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2390        self.room_transaction(|tx| async move {
2391            let project = project::Entity::find_by_id(project_id)
2392                .one(&*tx)
2393                .await?
2394                .ok_or_else(|| anyhow!("no such project"))?;
2395            let mut participants = project_collaborator::Entity::find()
2396                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2397                .stream(&*tx)
2398                .await?;
2399
2400            let mut connection_ids = HashSet::default();
2401            while let Some(participant) = participants.next().await {
2402                let participant = participant?;
2403                connection_ids.insert(ConnectionId {
2404                    epoch: participant.connection_server_id.0 as u32,
2405                    id: participant.connection_id as u32,
2406                });
2407            }
2408
2409            if connection_ids.contains(&connection_id) {
2410                Ok((project.room_id, connection_ids))
2411            } else {
2412                Err(anyhow!("no such project"))?
2413            }
2414        })
2415        .await
2416    }
2417
2418    async fn project_guest_connection_ids(
2419        &self,
2420        project_id: ProjectId,
2421        tx: &DatabaseTransaction,
2422    ) -> Result<Vec<ConnectionId>> {
2423        let mut participants = project_collaborator::Entity::find()
2424            .filter(
2425                project_collaborator::Column::ProjectId
2426                    .eq(project_id)
2427                    .and(project_collaborator::Column::IsHost.eq(false)),
2428            )
2429            .stream(tx)
2430            .await?;
2431
2432        let mut guest_connection_ids = Vec::new();
2433        while let Some(participant) = participants.next().await {
2434            let participant = participant?;
2435            guest_connection_ids.push(ConnectionId {
2436                epoch: participant.connection_server_id.0 as u32,
2437                id: participant.connection_id as u32,
2438            });
2439        }
2440        Ok(guest_connection_ids)
2441    }
2442
2443    // access tokens
2444
2445    pub async fn create_access_token_hash(
2446        &self,
2447        user_id: UserId,
2448        access_token_hash: &str,
2449        max_access_token_count: usize,
2450    ) -> Result<()> {
2451        self.transaction(|tx| async {
2452            let tx = tx;
2453
2454            access_token::ActiveModel {
2455                user_id: ActiveValue::set(user_id),
2456                hash: ActiveValue::set(access_token_hash.into()),
2457                ..Default::default()
2458            }
2459            .insert(&*tx)
2460            .await?;
2461
2462            access_token::Entity::delete_many()
2463                .filter(
2464                    access_token::Column::Id.in_subquery(
2465                        Query::select()
2466                            .column(access_token::Column::Id)
2467                            .from(access_token::Entity)
2468                            .and_where(access_token::Column::UserId.eq(user_id))
2469                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2470                            .limit(10000)
2471                            .offset(max_access_token_count as u64)
2472                            .to_owned(),
2473                    ),
2474                )
2475                .exec(&*tx)
2476                .await?;
2477            Ok(())
2478        })
2479        .await
2480    }
2481
2482    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2483        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2484        enum QueryAs {
2485            Hash,
2486        }
2487
2488        self.transaction(|tx| async move {
2489            Ok(access_token::Entity::find()
2490                .select_only()
2491                .column(access_token::Column::Hash)
2492                .filter(access_token::Column::UserId.eq(user_id))
2493                .order_by_desc(access_token::Column::Id)
2494                .into_values::<_, QueryAs>()
2495                .all(&*tx)
2496                .await?)
2497        })
2498        .await
2499    }
2500
2501    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2502    where
2503        F: Send + Fn(TransactionHandle) -> Fut,
2504        Fut: Send + Future<Output = Result<T>>,
2505    {
2506        let body = async {
2507            loop {
2508                let (tx, result) = self.with_transaction(&f).await?;
2509                match result {
2510                    Ok(result) => {
2511                        match tx.commit().await.map_err(Into::into) {
2512                            Ok(()) => return Ok(result),
2513                            Err(error) => {
2514                                if is_serialization_error(&error) {
2515                                    // Retry (don't break the loop)
2516                                } else {
2517                                    return Err(error);
2518                                }
2519                            }
2520                        }
2521                    }
2522                    Err(error) => {
2523                        tx.rollback().await?;
2524                        if is_serialization_error(&error) {
2525                            // Retry (don't break the loop)
2526                        } else {
2527                            return Err(error);
2528                        }
2529                    }
2530                }
2531            }
2532        };
2533
2534        self.run(body).await
2535    }
2536
2537    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2538    where
2539        F: Send + Fn(TransactionHandle) -> Fut,
2540        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2541    {
2542        let body = async {
2543            loop {
2544                let (tx, result) = self.with_transaction(&f).await?;
2545                match result {
2546                    Ok((room_id, data)) => {
2547                        let lock = self.rooms.entry(room_id).or_default().clone();
2548                        let _guard = lock.lock_owned().await;
2549                        match tx.commit().await.map_err(Into::into) {
2550                            Ok(()) => {
2551                                return Ok(RoomGuard {
2552                                    data,
2553                                    _guard,
2554                                    _not_send: PhantomData,
2555                                });
2556                            }
2557                            Err(error) => {
2558                                if is_serialization_error(&error) {
2559                                    // Retry (don't break the loop)
2560                                } else {
2561                                    return Err(error);
2562                                }
2563                            }
2564                        }
2565                    }
2566                    Err(error) => {
2567                        tx.rollback().await?;
2568                        if is_serialization_error(&error) {
2569                            // Retry (don't break the loop)
2570                        } else {
2571                            return Err(error);
2572                        }
2573                    }
2574                }
2575            }
2576        };
2577
2578        self.run(body).await
2579    }
2580
2581    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2582    where
2583        F: Send + Fn(TransactionHandle) -> Fut,
2584        Fut: Send + Future<Output = Result<T>>,
2585    {
2586        let tx = self
2587            .pool
2588            .begin_with_config(Some(IsolationLevel::Serializable), None)
2589            .await?;
2590
2591        let mut tx = Arc::new(Some(tx));
2592        let result = f(TransactionHandle(tx.clone())).await;
2593        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2594            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2595        };
2596
2597        Ok((tx, result))
2598    }
2599
2600    async fn run<F, T>(&self, future: F) -> T
2601    where
2602        F: Future<Output = T>,
2603    {
2604        #[cfg(test)]
2605        {
2606            if let Some(background) = self.background.as_ref() {
2607                background.simulate_random_delay().await;
2608            }
2609
2610            self.runtime.as_ref().unwrap().block_on(future)
2611        }
2612
2613        #[cfg(not(test))]
2614        {
2615            future.await
2616        }
2617    }
2618}
2619
2620fn is_serialization_error(error: &Error) -> bool {
2621    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2622    match error {
2623        Error::Database(
2624            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2625            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2626        ) if error
2627            .as_database_error()
2628            .and_then(|error| error.code())
2629            .as_deref()
2630            == Some(SERIALIZATION_FAILURE_CODE) =>
2631        {
2632            true
2633        }
2634        _ => false,
2635    }
2636}
2637
2638struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2639
2640impl Deref for TransactionHandle {
2641    type Target = DatabaseTransaction;
2642
2643    fn deref(&self) -> &Self::Target {
2644        self.0.as_ref().as_ref().unwrap()
2645    }
2646}
2647
2648pub struct RoomGuard<T> {
2649    data: T,
2650    _guard: OwnedMutexGuard<()>,
2651    _not_send: PhantomData<Rc<()>>,
2652}
2653
2654impl<T> Deref for RoomGuard<T> {
2655    type Target = T;
2656
2657    fn deref(&self) -> &T {
2658        &self.data
2659    }
2660}
2661
2662impl<T> DerefMut for RoomGuard<T> {
2663    fn deref_mut(&mut self) -> &mut T {
2664        &mut self.data
2665    }
2666}
2667
2668#[derive(Debug, Serialize, Deserialize)]
2669pub struct NewUserParams {
2670    pub github_login: String,
2671    pub github_user_id: i32,
2672    pub invite_count: i32,
2673}
2674
2675#[derive(Debug)]
2676pub struct NewUserResult {
2677    pub user_id: UserId,
2678    pub metrics_id: String,
2679    pub inviting_user_id: Option<UserId>,
2680    pub signup_device_id: Option<String>,
2681}
2682
2683fn random_invite_code() -> String {
2684    nanoid::nanoid!(16)
2685}
2686
2687fn random_email_confirmation_code() -> String {
2688    nanoid::nanoid!(64)
2689}
2690
2691macro_rules! id_type {
2692    ($name:ident) => {
2693        #[derive(
2694            Clone,
2695            Copy,
2696            Debug,
2697            Default,
2698            PartialEq,
2699            Eq,
2700            PartialOrd,
2701            Ord,
2702            Hash,
2703            Serialize,
2704            Deserialize,
2705        )]
2706        #[serde(transparent)]
2707        pub struct $name(pub i32);
2708
2709        impl $name {
2710            #[allow(unused)]
2711            pub const MAX: Self = Self(i32::MAX);
2712
2713            #[allow(unused)]
2714            pub fn from_proto(value: u64) -> Self {
2715                Self(value as i32)
2716            }
2717
2718            #[allow(unused)]
2719            pub fn to_proto(self) -> u64 {
2720                self.0 as u64
2721            }
2722        }
2723
2724        impl std::fmt::Display for $name {
2725            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2726                self.0.fmt(f)
2727            }
2728        }
2729
2730        impl From<$name> for sea_query::Value {
2731            fn from(value: $name) -> Self {
2732                sea_query::Value::Int(Some(value.0))
2733            }
2734        }
2735
2736        impl sea_orm::TryGetable for $name {
2737            fn try_get(
2738                res: &sea_orm::QueryResult,
2739                pre: &str,
2740                col: &str,
2741            ) -> Result<Self, sea_orm::TryGetError> {
2742                Ok(Self(i32::try_get(res, pre, col)?))
2743            }
2744        }
2745
2746        impl sea_query::ValueType for $name {
2747            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
2748                match v {
2749                    Value::TinyInt(Some(int)) => {
2750                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2751                    }
2752                    Value::SmallInt(Some(int)) => {
2753                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2754                    }
2755                    Value::Int(Some(int)) => {
2756                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2757                    }
2758                    Value::BigInt(Some(int)) => {
2759                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2760                    }
2761                    Value::TinyUnsigned(Some(int)) => {
2762                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2763                    }
2764                    Value::SmallUnsigned(Some(int)) => {
2765                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2766                    }
2767                    Value::Unsigned(Some(int)) => {
2768                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2769                    }
2770                    Value::BigUnsigned(Some(int)) => {
2771                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2772                    }
2773                    _ => Err(sea_query::ValueTypeErr),
2774                }
2775            }
2776
2777            fn type_name() -> String {
2778                stringify!($name).into()
2779            }
2780
2781            fn array_type() -> sea_query::ArrayType {
2782                sea_query::ArrayType::Int
2783            }
2784
2785            fn column_type() -> sea_query::ColumnType {
2786                sea_query::ColumnType::Integer(None)
2787            }
2788        }
2789
2790        impl sea_orm::TryFromU64 for $name {
2791            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
2792                Ok(Self(n.try_into().map_err(|_| {
2793                    DbErr::ConvertFromU64(concat!(
2794                        "error converting ",
2795                        stringify!($name),
2796                        " to u64"
2797                    ))
2798                })?))
2799            }
2800        }
2801
2802        impl sea_query::Nullable for $name {
2803            fn null() -> Value {
2804                Value::Int(None)
2805            }
2806        }
2807    };
2808}
2809
2810id_type!(AccessTokenId);
2811id_type!(ContactId);
2812id_type!(RoomId);
2813id_type!(RoomParticipantId);
2814id_type!(ProjectId);
2815id_type!(ProjectCollaboratorId);
2816id_type!(ReplicaId);
2817id_type!(ServerId);
2818id_type!(SignupId);
2819id_type!(UserId);
2820
2821pub struct LeftRoom {
2822    pub room: proto::Room,
2823    pub left_projects: HashMap<ProjectId, LeftProject>,
2824    pub canceled_calls_to_user_ids: Vec<UserId>,
2825}
2826
2827pub struct RefreshedRoom {
2828    pub room: proto::Room,
2829    pub stale_participant_user_ids: Vec<UserId>,
2830    pub canceled_calls_to_user_ids: Vec<UserId>,
2831}
2832
2833pub struct Project {
2834    pub collaborators: Vec<project_collaborator::Model>,
2835    pub worktrees: BTreeMap<u64, Worktree>,
2836    pub language_servers: Vec<proto::LanguageServer>,
2837}
2838
2839pub struct LeftProject {
2840    pub id: ProjectId,
2841    pub host_user_id: UserId,
2842    pub host_connection_id: ConnectionId,
2843    pub connection_ids: Vec<ConnectionId>,
2844}
2845
2846pub struct Worktree {
2847    pub id: u64,
2848    pub abs_path: String,
2849    pub root_name: String,
2850    pub visible: bool,
2851    pub entries: Vec<proto::Entry>,
2852    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
2853    pub scan_id: u64,
2854    pub is_complete: bool,
2855}
2856
2857#[cfg(test)]
2858pub use test::*;
2859
2860#[cfg(test)]
2861mod test {
2862    use super::*;
2863    use gpui::executor::Background;
2864    use lazy_static::lazy_static;
2865    use parking_lot::Mutex;
2866    use rand::prelude::*;
2867    use sea_orm::ConnectionTrait;
2868    use sqlx::migrate::MigrateDatabase;
2869    use std::sync::Arc;
2870
2871    pub struct TestDb {
2872        pub db: Option<Arc<Database>>,
2873        pub connection: Option<sqlx::AnyConnection>,
2874    }
2875
2876    impl TestDb {
2877        pub fn sqlite(background: Arc<Background>) -> Self {
2878            let url = format!("sqlite::memory:");
2879            let runtime = tokio::runtime::Builder::new_current_thread()
2880                .enable_io()
2881                .enable_time()
2882                .build()
2883                .unwrap();
2884
2885            let mut db = runtime.block_on(async {
2886                let mut options = ConnectOptions::new(url);
2887                options.max_connections(5);
2888                let db = Database::new(options).await.unwrap();
2889                let sql = include_str!(concat!(
2890                    env!("CARGO_MANIFEST_DIR"),
2891                    "/migrations.sqlite/20221109000000_test_schema.sql"
2892                ));
2893                db.pool
2894                    .execute(sea_orm::Statement::from_string(
2895                        db.pool.get_database_backend(),
2896                        sql.into(),
2897                    ))
2898                    .await
2899                    .unwrap();
2900                db
2901            });
2902
2903            db.background = Some(background);
2904            db.runtime = Some(runtime);
2905
2906            Self {
2907                db: Some(Arc::new(db)),
2908                connection: None,
2909            }
2910        }
2911
2912        pub fn postgres(background: Arc<Background>) -> Self {
2913            lazy_static! {
2914                static ref LOCK: Mutex<()> = Mutex::new(());
2915            }
2916
2917            let _guard = LOCK.lock();
2918            let mut rng = StdRng::from_entropy();
2919            let url = format!(
2920                "postgres://postgres@localhost/zed-test-{}",
2921                rng.gen::<u128>()
2922            );
2923            let runtime = tokio::runtime::Builder::new_current_thread()
2924                .enable_io()
2925                .enable_time()
2926                .build()
2927                .unwrap();
2928
2929            let mut db = runtime.block_on(async {
2930                sqlx::Postgres::create_database(&url)
2931                    .await
2932                    .expect("failed to create test db");
2933                let mut options = ConnectOptions::new(url);
2934                options
2935                    .max_connections(5)
2936                    .idle_timeout(Duration::from_secs(0));
2937                let db = Database::new(options).await.unwrap();
2938                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
2939                db.migrate(Path::new(migrations_path), false).await.unwrap();
2940                db
2941            });
2942
2943            db.background = Some(background);
2944            db.runtime = Some(runtime);
2945
2946            Self {
2947                db: Some(Arc::new(db)),
2948                connection: None,
2949            }
2950        }
2951
2952        pub fn db(&self) -> &Arc<Database> {
2953            self.db.as_ref().unwrap()
2954        }
2955    }
2956
2957    impl Drop for TestDb {
2958        fn drop(&mut self) {
2959            let db = self.db.take().unwrap();
2960            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
2961                db.runtime.as_ref().unwrap().block_on(async {
2962                    use util::ResultExt;
2963                    let query = "
2964                        SELECT pg_terminate_backend(pg_stat_activity.pid)
2965                        FROM pg_stat_activity
2966                        WHERE
2967                            pg_stat_activity.datname = current_database() AND
2968                            pid <> pg_backend_pid();
2969                    ";
2970                    db.pool
2971                        .execute(sea_orm::Statement::from_string(
2972                            db.pool.get_database_backend(),
2973                            query.into(),
2974                        ))
2975                        .await
2976                        .log_err();
2977                    sqlx::Postgres::drop_database(db.options.get_url())
2978                        .await
2979                        .log_err();
2980                })
2981            }
2982        }
2983    }
2984}