db.rs

   1mod access_token;
   2mod contact;
   3mod language_server;
   4mod project;
   5mod project_collaborator;
   6mod room;
   7mod room_participant;
   8mod server;
   9mod signup;
  10#[cfg(test)]
  11mod tests;
  12mod user;
  13mod worktree;
  14mod worktree_diagnostic_summary;
  15mod worktree_entry;
  16
  17use crate::{Error, Result};
  18use anyhow::anyhow;
  19use collections::{BTreeMap, HashMap, HashSet};
  20pub use contact::Contact;
  21use dashmap::DashMap;
  22use futures::StreamExt;
  23use hyper::StatusCode;
  24use rpc::{proto, ConnectionId};
  25use sea_orm::Condition;
  26pub use sea_orm::ConnectOptions;
  27use sea_orm::{
  28    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  29    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  30    Statement, TransactionTrait,
  31};
  32use sea_query::{Alias, Expr, OnConflict, Query};
  33use serde::{Deserialize, Serialize};
  34pub use signup::{Invite, NewSignup, WaitlistSummary};
  35use sqlx::migrate::{Migrate, Migration, MigrationSource};
  36use sqlx::Connection;
  37use std::ops::{Deref, DerefMut};
  38use std::path::Path;
  39use std::time::Duration;
  40use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  41use tokio::sync::{Mutex, OwnedMutexGuard};
  42pub use user::Model as User;
  43
  44pub struct Database {
  45    options: ConnectOptions,
  46    pool: DatabaseConnection,
  47    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  48    #[cfg(test)]
  49    background: Option<std::sync::Arc<gpui::executor::Background>>,
  50    #[cfg(test)]
  51    runtime: Option<tokio::runtime::Runtime>,
  52}
  53
  54impl Database {
  55    pub async fn new(options: ConnectOptions) -> Result<Self> {
  56        Ok(Self {
  57            options: options.clone(),
  58            pool: sea_orm::Database::connect(options).await?,
  59            rooms: DashMap::with_capacity(16384),
  60            #[cfg(test)]
  61            background: None,
  62            #[cfg(test)]
  63            runtime: None,
  64        })
  65    }
  66
  67    #[cfg(test)]
  68    pub fn reset(&self) {
  69        self.rooms.clear();
  70    }
  71
  72    pub async fn migrate(
  73        &self,
  74        migrations_path: &Path,
  75        ignore_checksum_mismatch: bool,
  76    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  77        let migrations = MigrationSource::resolve(migrations_path)
  78            .await
  79            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  80
  81        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  82
  83        connection.ensure_migrations_table().await?;
  84        let applied_migrations: HashMap<_, _> = connection
  85            .list_applied_migrations()
  86            .await?
  87            .into_iter()
  88            .map(|m| (m.version, m))
  89            .collect();
  90
  91        let mut new_migrations = Vec::new();
  92        for migration in migrations {
  93            match applied_migrations.get(&migration.version) {
  94                Some(applied_migration) => {
  95                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  96                    {
  97                        Err(anyhow!(
  98                            "checksum mismatch for applied migration {}",
  99                            migration.description
 100                        ))?;
 101                    }
 102                }
 103                None => {
 104                    let elapsed = connection.apply(&migration).await?;
 105                    new_migrations.push((migration, elapsed));
 106                }
 107            }
 108        }
 109
 110        Ok(new_migrations)
 111    }
 112
 113    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 114        self.transaction(|tx| async move {
 115            let server = server::ActiveModel {
 116                environment: ActiveValue::set(environment.into()),
 117                ..Default::default()
 118            }
 119            .insert(&*tx)
 120            .await?;
 121            Ok(server.id)
 122        })
 123        .await
 124    }
 125
 126    pub async fn delete_stale_projects(
 127        &self,
 128        environment: &str,
 129        new_server_id: ServerId,
 130    ) -> Result<()> {
 131        self.transaction(|tx| async move {
 132            let stale_server_epochs = self
 133                .stale_server_ids(environment, new_server_id, &tx)
 134                .await?;
 135            project_collaborator::Entity::delete_many()
 136                .filter(
 137                    project_collaborator::Column::ConnectionServerId
 138                        .is_in(stale_server_epochs.iter().copied()),
 139                )
 140                .exec(&*tx)
 141                .await?;
 142            project::Entity::delete_many()
 143                .filter(
 144                    project::Column::HostConnectionServerId
 145                        .is_in(stale_server_epochs.iter().copied()),
 146                )
 147                .exec(&*tx)
 148                .await?;
 149            Ok(())
 150        })
 151        .await
 152    }
 153
 154    pub async fn stale_room_ids(
 155        &self,
 156        environment: &str,
 157        new_server_id: ServerId,
 158    ) -> Result<Vec<RoomId>> {
 159        self.transaction(|tx| async move {
 160            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 161            enum QueryAs {
 162                RoomId,
 163            }
 164
 165            let stale_server_epochs = self
 166                .stale_server_ids(environment, new_server_id, &tx)
 167                .await?;
 168            Ok(room_participant::Entity::find()
 169                .select_only()
 170                .column(room_participant::Column::RoomId)
 171                .distinct()
 172                .filter(
 173                    room_participant::Column::AnsweringConnectionServerId
 174                        .is_in(stale_server_epochs),
 175                )
 176                .into_values::<_, QueryAs>()
 177                .all(&*tx)
 178                .await?)
 179        })
 180        .await
 181    }
 182
 183    pub async fn refresh_room(
 184        &self,
 185        room_id: RoomId,
 186        new_server_id: ServerId,
 187    ) -> Result<RoomGuard<RefreshedRoom>> {
 188        self.room_transaction(|tx| async move {
 189            let stale_participant_filter = Condition::all()
 190                .add(room_participant::Column::RoomId.eq(room_id))
 191                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 192                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 193
 194            let stale_participant_user_ids = room_participant::Entity::find()
 195                .filter(stale_participant_filter.clone())
 196                .all(&*tx)
 197                .await?
 198                .into_iter()
 199                .map(|participant| participant.user_id)
 200                .collect::<Vec<_>>();
 201
 202            // Delete participants who failed to reconnect.
 203            room_participant::Entity::delete_many()
 204                .filter(stale_participant_filter)
 205                .exec(&*tx)
 206                .await?;
 207
 208            let room = self.get_room(room_id, &tx).await?;
 209            let mut canceled_calls_to_user_ids = Vec::new();
 210            // Delete the room if it becomes empty and cancel pending calls.
 211            if room.participants.is_empty() {
 212                canceled_calls_to_user_ids.extend(
 213                    room.pending_participants
 214                        .iter()
 215                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 216                );
 217                room_participant::Entity::delete_many()
 218                    .filter(room_participant::Column::RoomId.eq(room_id))
 219                    .exec(&*tx)
 220                    .await?;
 221                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 222            }
 223
 224            Ok((
 225                room_id,
 226                RefreshedRoom {
 227                    room,
 228                    stale_participant_user_ids,
 229                    canceled_calls_to_user_ids,
 230                },
 231            ))
 232        })
 233        .await
 234    }
 235
 236    pub async fn delete_stale_servers(
 237        &self,
 238        new_server_id: ServerId,
 239        environment: &str,
 240    ) -> Result<()> {
 241        self.transaction(|tx| async move {
 242            server::Entity::delete_many()
 243                .filter(
 244                    Condition::all()
 245                        .add(server::Column::Environment.eq(environment))
 246                        .add(server::Column::Id.ne(new_server_id)),
 247                )
 248                .exec(&*tx)
 249                .await?;
 250            Ok(())
 251        })
 252        .await
 253    }
 254
 255    async fn stale_server_ids(
 256        &self,
 257        environment: &str,
 258        new_server_id: ServerId,
 259        tx: &DatabaseTransaction,
 260    ) -> Result<Vec<ServerId>> {
 261        let stale_servers = server::Entity::find()
 262            .filter(
 263                Condition::all()
 264                    .add(server::Column::Environment.eq(environment))
 265                    .add(server::Column::Id.ne(new_server_id)),
 266            )
 267            .all(&*tx)
 268            .await?;
 269        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 270    }
 271
 272    // users
 273
 274    pub async fn create_user(
 275        &self,
 276        email_address: &str,
 277        admin: bool,
 278        params: NewUserParams,
 279    ) -> Result<NewUserResult> {
 280        self.transaction(|tx| async {
 281            let tx = tx;
 282            let user = user::Entity::insert(user::ActiveModel {
 283                email_address: ActiveValue::set(Some(email_address.into())),
 284                github_login: ActiveValue::set(params.github_login.clone()),
 285                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 286                admin: ActiveValue::set(admin),
 287                metrics_id: ActiveValue::set(Uuid::new_v4()),
 288                ..Default::default()
 289            })
 290            .on_conflict(
 291                OnConflict::column(user::Column::GithubLogin)
 292                    .update_column(user::Column::GithubLogin)
 293                    .to_owned(),
 294            )
 295            .exec_with_returning(&*tx)
 296            .await?;
 297
 298            Ok(NewUserResult {
 299                user_id: user.id,
 300                metrics_id: user.metrics_id.to_string(),
 301                signup_device_id: None,
 302                inviting_user_id: None,
 303            })
 304        })
 305        .await
 306    }
 307
 308    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 309        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 310            .await
 311    }
 312
 313    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 314        self.transaction(|tx| async {
 315            let tx = tx;
 316            Ok(user::Entity::find()
 317                .filter(user::Column::Id.is_in(ids.iter().copied()))
 318                .all(&*tx)
 319                .await?)
 320        })
 321        .await
 322    }
 323
 324    pub async fn get_user_by_github_account(
 325        &self,
 326        github_login: &str,
 327        github_user_id: Option<i32>,
 328    ) -> Result<Option<User>> {
 329        self.transaction(|tx| async move {
 330            let tx = &*tx;
 331            if let Some(github_user_id) = github_user_id {
 332                if let Some(user_by_github_user_id) = user::Entity::find()
 333                    .filter(user::Column::GithubUserId.eq(github_user_id))
 334                    .one(tx)
 335                    .await?
 336                {
 337                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 338                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 339                    Ok(Some(user_by_github_user_id.update(tx).await?))
 340                } else if let Some(user_by_github_login) = user::Entity::find()
 341                    .filter(user::Column::GithubLogin.eq(github_login))
 342                    .one(tx)
 343                    .await?
 344                {
 345                    let mut user_by_github_login = user_by_github_login.into_active_model();
 346                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 347                    Ok(Some(user_by_github_login.update(tx).await?))
 348                } else {
 349                    Ok(None)
 350                }
 351            } else {
 352                Ok(user::Entity::find()
 353                    .filter(user::Column::GithubLogin.eq(github_login))
 354                    .one(tx)
 355                    .await?)
 356            }
 357        })
 358        .await
 359    }
 360
 361    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 362        self.transaction(|tx| async move {
 363            Ok(user::Entity::find()
 364                .order_by_asc(user::Column::GithubLogin)
 365                .limit(limit as u64)
 366                .offset(page as u64 * limit as u64)
 367                .all(&*tx)
 368                .await?)
 369        })
 370        .await
 371    }
 372
 373    pub async fn get_users_with_no_invites(
 374        &self,
 375        invited_by_another_user: bool,
 376    ) -> Result<Vec<User>> {
 377        self.transaction(|tx| async move {
 378            Ok(user::Entity::find()
 379                .filter(
 380                    user::Column::InviteCount
 381                        .eq(0)
 382                        .and(if invited_by_another_user {
 383                            user::Column::InviterId.is_not_null()
 384                        } else {
 385                            user::Column::InviterId.is_null()
 386                        }),
 387                )
 388                .all(&*tx)
 389                .await?)
 390        })
 391        .await
 392    }
 393
 394    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 395        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 396        enum QueryAs {
 397            MetricsId,
 398        }
 399
 400        self.transaction(|tx| async move {
 401            let metrics_id: Uuid = user::Entity::find_by_id(id)
 402                .select_only()
 403                .column(user::Column::MetricsId)
 404                .into_values::<_, QueryAs>()
 405                .one(&*tx)
 406                .await?
 407                .ok_or_else(|| anyhow!("could not find user"))?;
 408            Ok(metrics_id.to_string())
 409        })
 410        .await
 411    }
 412
 413    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 414        self.transaction(|tx| async move {
 415            user::Entity::update_many()
 416                .filter(user::Column::Id.eq(id))
 417                .set(user::ActiveModel {
 418                    admin: ActiveValue::set(is_admin),
 419                    ..Default::default()
 420                })
 421                .exec(&*tx)
 422                .await?;
 423            Ok(())
 424        })
 425        .await
 426    }
 427
 428    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 429        self.transaction(|tx| async move {
 430            user::Entity::update_many()
 431                .filter(user::Column::Id.eq(id))
 432                .set(user::ActiveModel {
 433                    connected_once: ActiveValue::set(connected_once),
 434                    ..Default::default()
 435                })
 436                .exec(&*tx)
 437                .await?;
 438            Ok(())
 439        })
 440        .await
 441    }
 442
 443    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 444        self.transaction(|tx| async move {
 445            access_token::Entity::delete_many()
 446                .filter(access_token::Column::UserId.eq(id))
 447                .exec(&*tx)
 448                .await?;
 449            user::Entity::delete_by_id(id).exec(&*tx).await?;
 450            Ok(())
 451        })
 452        .await
 453    }
 454
 455    // contacts
 456
 457    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 458        #[derive(Debug, FromQueryResult)]
 459        struct ContactWithUserBusyStatuses {
 460            user_id_a: UserId,
 461            user_id_b: UserId,
 462            a_to_b: bool,
 463            accepted: bool,
 464            should_notify: bool,
 465            user_a_busy: bool,
 466            user_b_busy: bool,
 467        }
 468
 469        self.transaction(|tx| async move {
 470            let user_a_participant = Alias::new("user_a_participant");
 471            let user_b_participant = Alias::new("user_b_participant");
 472            let mut db_contacts = contact::Entity::find()
 473                .column_as(
 474                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 475                        .is_not_null(),
 476                    "user_a_busy",
 477                )
 478                .column_as(
 479                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 480                        .is_not_null(),
 481                    "user_b_busy",
 482                )
 483                .filter(
 484                    contact::Column::UserIdA
 485                        .eq(user_id)
 486                        .or(contact::Column::UserIdB.eq(user_id)),
 487                )
 488                .join_as(
 489                    JoinType::LeftJoin,
 490                    contact::Relation::UserARoomParticipant.def(),
 491                    user_a_participant,
 492                )
 493                .join_as(
 494                    JoinType::LeftJoin,
 495                    contact::Relation::UserBRoomParticipant.def(),
 496                    user_b_participant,
 497                )
 498                .into_model::<ContactWithUserBusyStatuses>()
 499                .stream(&*tx)
 500                .await?;
 501
 502            let mut contacts = Vec::new();
 503            while let Some(db_contact) = db_contacts.next().await {
 504                let db_contact = db_contact?;
 505                if db_contact.user_id_a == user_id {
 506                    if db_contact.accepted {
 507                        contacts.push(Contact::Accepted {
 508                            user_id: db_contact.user_id_b,
 509                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 510                            busy: db_contact.user_b_busy,
 511                        });
 512                    } else if db_contact.a_to_b {
 513                        contacts.push(Contact::Outgoing {
 514                            user_id: db_contact.user_id_b,
 515                        })
 516                    } else {
 517                        contacts.push(Contact::Incoming {
 518                            user_id: db_contact.user_id_b,
 519                            should_notify: db_contact.should_notify,
 520                        });
 521                    }
 522                } else if db_contact.accepted {
 523                    contacts.push(Contact::Accepted {
 524                        user_id: db_contact.user_id_a,
 525                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 526                        busy: db_contact.user_a_busy,
 527                    });
 528                } else if db_contact.a_to_b {
 529                    contacts.push(Contact::Incoming {
 530                        user_id: db_contact.user_id_a,
 531                        should_notify: db_contact.should_notify,
 532                    });
 533                } else {
 534                    contacts.push(Contact::Outgoing {
 535                        user_id: db_contact.user_id_a,
 536                    });
 537                }
 538            }
 539
 540            contacts.sort_unstable_by_key(|contact| contact.user_id());
 541
 542            Ok(contacts)
 543        })
 544        .await
 545    }
 546
 547    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 548        self.transaction(|tx| async move {
 549            let participant = room_participant::Entity::find()
 550                .filter(room_participant::Column::UserId.eq(user_id))
 551                .one(&*tx)
 552                .await?;
 553            Ok(participant.is_some())
 554        })
 555        .await
 556    }
 557
 558    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 559        self.transaction(|tx| async move {
 560            let (id_a, id_b) = if user_id_1 < user_id_2 {
 561                (user_id_1, user_id_2)
 562            } else {
 563                (user_id_2, user_id_1)
 564            };
 565
 566            Ok(contact::Entity::find()
 567                .filter(
 568                    contact::Column::UserIdA
 569                        .eq(id_a)
 570                        .and(contact::Column::UserIdB.eq(id_b))
 571                        .and(contact::Column::Accepted.eq(true)),
 572                )
 573                .one(&*tx)
 574                .await?
 575                .is_some())
 576        })
 577        .await
 578    }
 579
 580    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 581        self.transaction(|tx| async move {
 582            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 583                (sender_id, receiver_id, true)
 584            } else {
 585                (receiver_id, sender_id, false)
 586            };
 587
 588            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 589                user_id_a: ActiveValue::set(id_a),
 590                user_id_b: ActiveValue::set(id_b),
 591                a_to_b: ActiveValue::set(a_to_b),
 592                accepted: ActiveValue::set(false),
 593                should_notify: ActiveValue::set(true),
 594                ..Default::default()
 595            })
 596            .on_conflict(
 597                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 598                    .values([
 599                        (contact::Column::Accepted, true.into()),
 600                        (contact::Column::ShouldNotify, false.into()),
 601                    ])
 602                    .action_and_where(
 603                        contact::Column::Accepted.eq(false).and(
 604                            contact::Column::AToB
 605                                .eq(a_to_b)
 606                                .and(contact::Column::UserIdA.eq(id_b))
 607                                .or(contact::Column::AToB
 608                                    .ne(a_to_b)
 609                                    .and(contact::Column::UserIdA.eq(id_a))),
 610                        ),
 611                    )
 612                    .to_owned(),
 613            )
 614            .exec_without_returning(&*tx)
 615            .await?;
 616
 617            if rows_affected == 1 {
 618                Ok(())
 619            } else {
 620                Err(anyhow!("contact already requested"))?
 621            }
 622        })
 623        .await
 624    }
 625
 626    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 627        self.transaction(|tx| async move {
 628            let (id_a, id_b) = if responder_id < requester_id {
 629                (responder_id, requester_id)
 630            } else {
 631                (requester_id, responder_id)
 632            };
 633
 634            let result = contact::Entity::delete_many()
 635                .filter(
 636                    contact::Column::UserIdA
 637                        .eq(id_a)
 638                        .and(contact::Column::UserIdB.eq(id_b)),
 639                )
 640                .exec(&*tx)
 641                .await?;
 642
 643            if result.rows_affected == 1 {
 644                Ok(())
 645            } else {
 646                Err(anyhow!("no such contact"))?
 647            }
 648        })
 649        .await
 650    }
 651
 652    pub async fn dismiss_contact_notification(
 653        &self,
 654        user_id: UserId,
 655        contact_user_id: UserId,
 656    ) -> Result<()> {
 657        self.transaction(|tx| async move {
 658            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 659                (user_id, contact_user_id, true)
 660            } else {
 661                (contact_user_id, user_id, false)
 662            };
 663
 664            let result = contact::Entity::update_many()
 665                .set(contact::ActiveModel {
 666                    should_notify: ActiveValue::set(false),
 667                    ..Default::default()
 668                })
 669                .filter(
 670                    contact::Column::UserIdA
 671                        .eq(id_a)
 672                        .and(contact::Column::UserIdB.eq(id_b))
 673                        .and(
 674                            contact::Column::AToB
 675                                .eq(a_to_b)
 676                                .and(contact::Column::Accepted.eq(true))
 677                                .or(contact::Column::AToB
 678                                    .ne(a_to_b)
 679                                    .and(contact::Column::Accepted.eq(false))),
 680                        ),
 681                )
 682                .exec(&*tx)
 683                .await?;
 684            if result.rows_affected == 0 {
 685                Err(anyhow!("no such contact request"))?
 686            } else {
 687                Ok(())
 688            }
 689        })
 690        .await
 691    }
 692
 693    pub async fn respond_to_contact_request(
 694        &self,
 695        responder_id: UserId,
 696        requester_id: UserId,
 697        accept: bool,
 698    ) -> Result<()> {
 699        self.transaction(|tx| async move {
 700            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 701                (responder_id, requester_id, false)
 702            } else {
 703                (requester_id, responder_id, true)
 704            };
 705            let rows_affected = if accept {
 706                let result = contact::Entity::update_many()
 707                    .set(contact::ActiveModel {
 708                        accepted: ActiveValue::set(true),
 709                        should_notify: ActiveValue::set(true),
 710                        ..Default::default()
 711                    })
 712                    .filter(
 713                        contact::Column::UserIdA
 714                            .eq(id_a)
 715                            .and(contact::Column::UserIdB.eq(id_b))
 716                            .and(contact::Column::AToB.eq(a_to_b)),
 717                    )
 718                    .exec(&*tx)
 719                    .await?;
 720                result.rows_affected
 721            } else {
 722                let result = contact::Entity::delete_many()
 723                    .filter(
 724                        contact::Column::UserIdA
 725                            .eq(id_a)
 726                            .and(contact::Column::UserIdB.eq(id_b))
 727                            .and(contact::Column::AToB.eq(a_to_b))
 728                            .and(contact::Column::Accepted.eq(false)),
 729                    )
 730                    .exec(&*tx)
 731                    .await?;
 732
 733                result.rows_affected
 734            };
 735
 736            if rows_affected == 1 {
 737                Ok(())
 738            } else {
 739                Err(anyhow!("no such contact request"))?
 740            }
 741        })
 742        .await
 743    }
 744
 745    pub fn fuzzy_like_string(string: &str) -> String {
 746        let mut result = String::with_capacity(string.len() * 2 + 1);
 747        for c in string.chars() {
 748            if c.is_alphanumeric() {
 749                result.push('%');
 750                result.push(c);
 751            }
 752        }
 753        result.push('%');
 754        result
 755    }
 756
 757    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 758        self.transaction(|tx| async {
 759            let tx = tx;
 760            let like_string = Self::fuzzy_like_string(name_query);
 761            let query = "
 762                SELECT users.*
 763                FROM users
 764                WHERE github_login ILIKE $1
 765                ORDER BY github_login <-> $2
 766                LIMIT $3
 767            ";
 768
 769            Ok(user::Entity::find()
 770                .from_raw_sql(Statement::from_sql_and_values(
 771                    self.pool.get_database_backend(),
 772                    query.into(),
 773                    vec![like_string.into(), name_query.into(), limit.into()],
 774                ))
 775                .all(&*tx)
 776                .await?)
 777        })
 778        .await
 779    }
 780
 781    // signups
 782
 783    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 784        self.transaction(|tx| async move {
 785            signup::Entity::insert(signup::ActiveModel {
 786                email_address: ActiveValue::set(signup.email_address.clone()),
 787                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 788                email_confirmation_sent: ActiveValue::set(false),
 789                platform_mac: ActiveValue::set(signup.platform_mac),
 790                platform_windows: ActiveValue::set(signup.platform_windows),
 791                platform_linux: ActiveValue::set(signup.platform_linux),
 792                platform_unknown: ActiveValue::set(false),
 793                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 794                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 795                device_id: ActiveValue::set(signup.device_id.clone()),
 796                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 797                ..Default::default()
 798            })
 799            .on_conflict(
 800                OnConflict::column(signup::Column::EmailAddress)
 801                    .update_columns([
 802                        signup::Column::PlatformMac,
 803                        signup::Column::PlatformWindows,
 804                        signup::Column::PlatformLinux,
 805                        signup::Column::EditorFeatures,
 806                        signup::Column::ProgrammingLanguages,
 807                        signup::Column::DeviceId,
 808                        signup::Column::AddedToMailingList,
 809                    ])
 810                    .to_owned(),
 811            )
 812            .exec(&*tx)
 813            .await?;
 814            Ok(())
 815        })
 816        .await
 817    }
 818
 819    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 820        self.transaction(|tx| async move {
 821            let signup = signup::Entity::find()
 822                .filter(signup::Column::EmailAddress.eq(email_address))
 823                .one(&*tx)
 824                .await?
 825                .ok_or_else(|| {
 826                    anyhow!("signup with email address {} doesn't exist", email_address)
 827                })?;
 828
 829            Ok(signup)
 830        })
 831        .await
 832    }
 833
 834    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 835        self.transaction(|tx| async move {
 836            let query = "
 837                SELECT
 838                    COUNT(*) as count,
 839                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 840                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 841                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 842                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 843                FROM (
 844                    SELECT *
 845                    FROM signups
 846                    WHERE
 847                        NOT email_confirmation_sent
 848                ) AS unsent
 849            ";
 850            Ok(
 851                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 852                    self.pool.get_database_backend(),
 853                    query.into(),
 854                    vec![],
 855                ))
 856                .one(&*tx)
 857                .await?
 858                .ok_or_else(|| anyhow!("invalid result"))?,
 859            )
 860        })
 861        .await
 862    }
 863
 864    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 865        let emails = invites
 866            .iter()
 867            .map(|s| s.email_address.as_str())
 868            .collect::<Vec<_>>();
 869        self.transaction(|tx| async {
 870            let tx = tx;
 871            signup::Entity::update_many()
 872                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 873                .set(signup::ActiveModel {
 874                    email_confirmation_sent: ActiveValue::set(true),
 875                    ..Default::default()
 876                })
 877                .exec(&*tx)
 878                .await?;
 879            Ok(())
 880        })
 881        .await
 882    }
 883
 884    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 885        self.transaction(|tx| async move {
 886            Ok(signup::Entity::find()
 887                .select_only()
 888                .column(signup::Column::EmailAddress)
 889                .column(signup::Column::EmailConfirmationCode)
 890                .filter(
 891                    signup::Column::EmailConfirmationSent.eq(false).and(
 892                        signup::Column::PlatformMac
 893                            .eq(true)
 894                            .or(signup::Column::PlatformUnknown.eq(true)),
 895                    ),
 896                )
 897                .order_by_asc(signup::Column::CreatedAt)
 898                .limit(count as u64)
 899                .into_model()
 900                .all(&*tx)
 901                .await?)
 902        })
 903        .await
 904    }
 905
 906    // invite codes
 907
 908    pub async fn create_invite_from_code(
 909        &self,
 910        code: &str,
 911        email_address: &str,
 912        device_id: Option<&str>,
 913    ) -> Result<Invite> {
 914        self.transaction(|tx| async move {
 915            let existing_user = user::Entity::find()
 916                .filter(user::Column::EmailAddress.eq(email_address))
 917                .one(&*tx)
 918                .await?;
 919
 920            if existing_user.is_some() {
 921                Err(anyhow!("email address is already in use"))?;
 922            }
 923
 924            let inviting_user_with_invites = match user::Entity::find()
 925                .filter(
 926                    user::Column::InviteCode
 927                        .eq(code)
 928                        .and(user::Column::InviteCount.gt(0)),
 929                )
 930                .one(&*tx)
 931                .await?
 932            {
 933                Some(inviting_user) => inviting_user,
 934                None => {
 935                    return Err(Error::Http(
 936                        StatusCode::UNAUTHORIZED,
 937                        "unable to find an invite code with invites remaining".to_string(),
 938                    ))?
 939                }
 940            };
 941            user::Entity::update_many()
 942                .filter(
 943                    user::Column::Id
 944                        .eq(inviting_user_with_invites.id)
 945                        .and(user::Column::InviteCount.gt(0)),
 946                )
 947                .col_expr(
 948                    user::Column::InviteCount,
 949                    Expr::col(user::Column::InviteCount).sub(1),
 950                )
 951                .exec(&*tx)
 952                .await?;
 953
 954            let signup = signup::Entity::insert(signup::ActiveModel {
 955                email_address: ActiveValue::set(email_address.into()),
 956                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 957                email_confirmation_sent: ActiveValue::set(false),
 958                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 959                platform_linux: ActiveValue::set(false),
 960                platform_mac: ActiveValue::set(false),
 961                platform_windows: ActiveValue::set(false),
 962                platform_unknown: ActiveValue::set(true),
 963                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 964                ..Default::default()
 965            })
 966            .on_conflict(
 967                OnConflict::column(signup::Column::EmailAddress)
 968                    .update_column(signup::Column::InvitingUserId)
 969                    .to_owned(),
 970            )
 971            .exec_with_returning(&*tx)
 972            .await?;
 973
 974            Ok(Invite {
 975                email_address: signup.email_address,
 976                email_confirmation_code: signup.email_confirmation_code,
 977            })
 978        })
 979        .await
 980    }
 981
 982    pub async fn create_user_from_invite(
 983        &self,
 984        invite: &Invite,
 985        user: NewUserParams,
 986    ) -> Result<Option<NewUserResult>> {
 987        self.transaction(|tx| async {
 988            let tx = tx;
 989            let signup = signup::Entity::find()
 990                .filter(
 991                    signup::Column::EmailAddress
 992                        .eq(invite.email_address.as_str())
 993                        .and(
 994                            signup::Column::EmailConfirmationCode
 995                                .eq(invite.email_confirmation_code.as_str()),
 996                        ),
 997                )
 998                .one(&*tx)
 999                .await?
1000                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1001
1002            if signup.user_id.is_some() {
1003                return Ok(None);
1004            }
1005
1006            let user = user::Entity::insert(user::ActiveModel {
1007                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1008                github_login: ActiveValue::set(user.github_login.clone()),
1009                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1010                admin: ActiveValue::set(false),
1011                invite_count: ActiveValue::set(user.invite_count),
1012                invite_code: ActiveValue::set(Some(random_invite_code())),
1013                metrics_id: ActiveValue::set(Uuid::new_v4()),
1014                ..Default::default()
1015            })
1016            .on_conflict(
1017                OnConflict::column(user::Column::GithubLogin)
1018                    .update_columns([
1019                        user::Column::EmailAddress,
1020                        user::Column::GithubUserId,
1021                        user::Column::Admin,
1022                    ])
1023                    .to_owned(),
1024            )
1025            .exec_with_returning(&*tx)
1026            .await?;
1027
1028            let mut signup = signup.into_active_model();
1029            signup.user_id = ActiveValue::set(Some(user.id));
1030            let signup = signup.update(&*tx).await?;
1031
1032            if let Some(inviting_user_id) = signup.inviting_user_id {
1033                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1034                    (inviting_user_id, user.id, true)
1035                } else {
1036                    (user.id, inviting_user_id, false)
1037                };
1038
1039                contact::Entity::insert(contact::ActiveModel {
1040                    user_id_a: ActiveValue::set(user_id_a),
1041                    user_id_b: ActiveValue::set(user_id_b),
1042                    a_to_b: ActiveValue::set(a_to_b),
1043                    should_notify: ActiveValue::set(true),
1044                    accepted: ActiveValue::set(true),
1045                    ..Default::default()
1046                })
1047                .on_conflict(OnConflict::new().do_nothing().to_owned())
1048                .exec_without_returning(&*tx)
1049                .await?;
1050            }
1051
1052            Ok(Some(NewUserResult {
1053                user_id: user.id,
1054                metrics_id: user.metrics_id.to_string(),
1055                inviting_user_id: signup.inviting_user_id,
1056                signup_device_id: signup.device_id,
1057            }))
1058        })
1059        .await
1060    }
1061
1062    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1063        self.transaction(|tx| async move {
1064            if count > 0 {
1065                user::Entity::update_many()
1066                    .filter(
1067                        user::Column::Id
1068                            .eq(id)
1069                            .and(user::Column::InviteCode.is_null()),
1070                    )
1071                    .set(user::ActiveModel {
1072                        invite_code: ActiveValue::set(Some(random_invite_code())),
1073                        ..Default::default()
1074                    })
1075                    .exec(&*tx)
1076                    .await?;
1077            }
1078
1079            user::Entity::update_many()
1080                .filter(user::Column::Id.eq(id))
1081                .set(user::ActiveModel {
1082                    invite_count: ActiveValue::set(count),
1083                    ..Default::default()
1084                })
1085                .exec(&*tx)
1086                .await?;
1087            Ok(())
1088        })
1089        .await
1090    }
1091
1092    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1093        self.transaction(|tx| async move {
1094            match user::Entity::find_by_id(id).one(&*tx).await? {
1095                Some(user) if user.invite_code.is_some() => {
1096                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1097                }
1098                _ => Ok(None),
1099            }
1100        })
1101        .await
1102    }
1103
1104    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1105        self.transaction(|tx| async move {
1106            user::Entity::find()
1107                .filter(user::Column::InviteCode.eq(code))
1108                .one(&*tx)
1109                .await?
1110                .ok_or_else(|| {
1111                    Error::Http(
1112                        StatusCode::NOT_FOUND,
1113                        "that invite code does not exist".to_string(),
1114                    )
1115                })
1116        })
1117        .await
1118    }
1119
1120    // rooms
1121
1122    pub async fn incoming_call_for_user(
1123        &self,
1124        user_id: UserId,
1125    ) -> Result<Option<proto::IncomingCall>> {
1126        self.transaction(|tx| async move {
1127            let pending_participant = room_participant::Entity::find()
1128                .filter(
1129                    room_participant::Column::UserId
1130                        .eq(user_id)
1131                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1132                )
1133                .one(&*tx)
1134                .await?;
1135
1136            if let Some(pending_participant) = pending_participant {
1137                let room = self.get_room(pending_participant.room_id, &tx).await?;
1138                Ok(Self::build_incoming_call(&room, user_id))
1139            } else {
1140                Ok(None)
1141            }
1142        })
1143        .await
1144    }
1145
1146    pub async fn create_room(
1147        &self,
1148        user_id: UserId,
1149        connection: ConnectionId,
1150        live_kit_room: &str,
1151    ) -> Result<RoomGuard<proto::Room>> {
1152        self.room_transaction(|tx| async move {
1153            let room = room::ActiveModel {
1154                live_kit_room: ActiveValue::set(live_kit_room.into()),
1155                ..Default::default()
1156            }
1157            .insert(&*tx)
1158            .await?;
1159            let room_id = room.id;
1160
1161            room_participant::ActiveModel {
1162                room_id: ActiveValue::set(room_id),
1163                user_id: ActiveValue::set(user_id),
1164                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1165                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1166                    connection.owner_id as i32,
1167                ))),
1168                answering_connection_lost: ActiveValue::set(false),
1169                calling_user_id: ActiveValue::set(user_id),
1170                calling_connection_id: ActiveValue::set(connection.id as i32),
1171                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1172                    connection.owner_id as i32,
1173                ))),
1174                ..Default::default()
1175            }
1176            .insert(&*tx)
1177            .await?;
1178
1179            let room = self.get_room(room_id, &tx).await?;
1180            Ok((room_id, room))
1181        })
1182        .await
1183    }
1184
1185    pub async fn call(
1186        &self,
1187        room_id: RoomId,
1188        calling_user_id: UserId,
1189        calling_connection: ConnectionId,
1190        called_user_id: UserId,
1191        initial_project_id: Option<ProjectId>,
1192    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1193        self.room_transaction(|tx| async move {
1194            room_participant::ActiveModel {
1195                room_id: ActiveValue::set(room_id),
1196                user_id: ActiveValue::set(called_user_id),
1197                answering_connection_lost: ActiveValue::set(false),
1198                calling_user_id: ActiveValue::set(calling_user_id),
1199                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1200                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1201                    calling_connection.owner_id as i32,
1202                ))),
1203                initial_project_id: ActiveValue::set(initial_project_id),
1204                ..Default::default()
1205            }
1206            .insert(&*tx)
1207            .await?;
1208
1209            let room = self.get_room(room_id, &tx).await?;
1210            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1211                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1212            Ok((room_id, (room, incoming_call)))
1213        })
1214        .await
1215    }
1216
1217    pub async fn call_failed(
1218        &self,
1219        room_id: RoomId,
1220        called_user_id: UserId,
1221    ) -> Result<RoomGuard<proto::Room>> {
1222        self.room_transaction(|tx| async move {
1223            room_participant::Entity::delete_many()
1224                .filter(
1225                    room_participant::Column::RoomId
1226                        .eq(room_id)
1227                        .and(room_participant::Column::UserId.eq(called_user_id)),
1228                )
1229                .exec(&*tx)
1230                .await?;
1231            let room = self.get_room(room_id, &tx).await?;
1232            Ok((room_id, room))
1233        })
1234        .await
1235    }
1236
1237    pub async fn decline_call(
1238        &self,
1239        expected_room_id: Option<RoomId>,
1240        user_id: UserId,
1241    ) -> Result<Option<RoomGuard<proto::Room>>> {
1242        self.optional_room_transaction(|tx| async move {
1243            let mut filter = Condition::all()
1244                .add(room_participant::Column::UserId.eq(user_id))
1245                .add(room_participant::Column::AnsweringConnectionId.is_null());
1246            if let Some(room_id) = expected_room_id {
1247                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1248            }
1249            let participant = room_participant::Entity::find()
1250                .filter(filter)
1251                .one(&*tx)
1252                .await?;
1253
1254            let participant = if let Some(participant) = participant {
1255                participant
1256            } else if expected_room_id.is_some() {
1257                return Err(anyhow!("could not find call to decline"))?;
1258            } else {
1259                return Ok(None);
1260            };
1261
1262            let room_id = participant.room_id;
1263            room_participant::Entity::delete(participant.into_active_model())
1264                .exec(&*tx)
1265                .await?;
1266
1267            let room = self.get_room(room_id, &tx).await?;
1268            Ok(Some((room_id, room)))
1269        })
1270        .await
1271    }
1272
1273    pub async fn cancel_call(
1274        &self,
1275        room_id: RoomId,
1276        calling_connection: ConnectionId,
1277        called_user_id: UserId,
1278    ) -> Result<RoomGuard<proto::Room>> {
1279        self.room_transaction(|tx| async move {
1280            let participant = room_participant::Entity::find()
1281                .filter(
1282                    Condition::all()
1283                        .add(room_participant::Column::UserId.eq(called_user_id))
1284                        .add(room_participant::Column::RoomId.eq(room_id))
1285                        .add(
1286                            room_participant::Column::CallingConnectionId
1287                                .eq(calling_connection.id as i32),
1288                        )
1289                        .add(
1290                            room_participant::Column::CallingConnectionServerId
1291                                .eq(calling_connection.owner_id as i32),
1292                        )
1293                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1294                )
1295                .one(&*tx)
1296                .await?
1297                .ok_or_else(|| anyhow!("no call to cancel"))?;
1298            let room_id = participant.room_id;
1299
1300            room_participant::Entity::delete(participant.into_active_model())
1301                .exec(&*tx)
1302                .await?;
1303
1304            let room = self.get_room(room_id, &tx).await?;
1305            Ok((room_id, room))
1306        })
1307        .await
1308    }
1309
1310    pub async fn join_room(
1311        &self,
1312        room_id: RoomId,
1313        user_id: UserId,
1314        connection: ConnectionId,
1315    ) -> Result<RoomGuard<proto::Room>> {
1316        self.room_transaction(|tx| async move {
1317            let result = room_participant::Entity::update_many()
1318                .filter(
1319                    Condition::all()
1320                        .add(room_participant::Column::RoomId.eq(room_id))
1321                        .add(room_participant::Column::UserId.eq(user_id))
1322                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1323                )
1324                .set(room_participant::ActiveModel {
1325                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1326                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1327                        connection.owner_id as i32,
1328                    ))),
1329                    answering_connection_lost: ActiveValue::set(false),
1330                    ..Default::default()
1331                })
1332                .exec(&*tx)
1333                .await?;
1334            if result.rows_affected == 0 {
1335                Err(anyhow!("room does not exist or was already joined"))?
1336            } else {
1337                let room = self.get_room(room_id, &tx).await?;
1338                Ok((room_id, room))
1339            }
1340        })
1341        .await
1342    }
1343
1344    pub async fn rejoin_room(
1345        &self,
1346        rejoin_room: proto::RejoinRoom,
1347        user_id: UserId,
1348        connection: ConnectionId,
1349    ) -> Result<RoomGuard<RejoinedRoom>> {
1350        self.room_transaction(|tx| async {
1351            let tx = tx;
1352            let room_id = RoomId::from_proto(rejoin_room.id);
1353            let participant_update = room_participant::Entity::update_many()
1354                .filter(
1355                    Condition::all()
1356                        .add(room_participant::Column::RoomId.eq(room_id))
1357                        .add(room_participant::Column::UserId.eq(user_id))
1358                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1359                        .add(
1360                            Condition::any()
1361                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1362                                .add(
1363                                    room_participant::Column::AnsweringConnectionServerId
1364                                        .ne(connection.owner_id as i32),
1365                                ),
1366                        ),
1367                )
1368                .set(room_participant::ActiveModel {
1369                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1370                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1371                        connection.owner_id as i32,
1372                    ))),
1373                    answering_connection_lost: ActiveValue::set(false),
1374                    ..Default::default()
1375                })
1376                .exec(&*tx)
1377                .await?;
1378            if participant_update.rows_affected == 0 {
1379                Err(anyhow!("room does not exist or was already joined"))?
1380            } else {
1381                let mut reshared_projects = Vec::new();
1382                for reshared_project in &rejoin_room.reshared_projects {
1383                    let project_id = ProjectId::from_proto(reshared_project.project_id);
1384                    let project = project::Entity::find_by_id(project_id)
1385                        .one(&*tx)
1386                        .await?
1387                        .ok_or_else(|| anyhow!("project does not exist"))?;
1388                    if project.host_user_id != user_id {
1389                        return Err(anyhow!("no such project"))?;
1390                    }
1391
1392                    let mut collaborators = project
1393                        .find_related(project_collaborator::Entity)
1394                        .all(&*tx)
1395                        .await?;
1396                    let host_ix = collaborators
1397                        .iter()
1398                        .position(|collaborator| {
1399                            collaborator.user_id == user_id && collaborator.is_host
1400                        })
1401                        .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1402                    let host = collaborators.swap_remove(host_ix);
1403                    let old_connection_id = host.connection();
1404
1405                    project::Entity::update(project::ActiveModel {
1406                        host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1407                        host_connection_server_id: ActiveValue::set(Some(ServerId(
1408                            connection.owner_id as i32,
1409                        ))),
1410                        ..project.into_active_model()
1411                    })
1412                    .exec(&*tx)
1413                    .await?;
1414                    project_collaborator::Entity::update(project_collaborator::ActiveModel {
1415                        connection_id: ActiveValue::set(connection.id as i32),
1416                        connection_server_id: ActiveValue::set(ServerId(
1417                            connection.owner_id as i32,
1418                        )),
1419                        ..host.into_active_model()
1420                    })
1421                    .exec(&*tx)
1422                    .await?;
1423
1424                    self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1425                        .await?;
1426
1427                    reshared_projects.push(ResharedProject {
1428                        id: project_id,
1429                        old_connection_id,
1430                        collaborators: collaborators
1431                            .iter()
1432                            .map(|collaborator| ProjectCollaborator {
1433                                connection_id: collaborator.connection(),
1434                                user_id: collaborator.user_id,
1435                                replica_id: collaborator.replica_id,
1436                                is_host: collaborator.is_host,
1437                            })
1438                            .collect(),
1439                        worktrees: reshared_project.worktrees.clone(),
1440                    });
1441                }
1442
1443                project::Entity::delete_many()
1444                    .filter(
1445                        Condition::all()
1446                            .add(project::Column::RoomId.eq(room_id))
1447                            .add(project::Column::HostUserId.eq(user_id))
1448                            .add(
1449                                project::Column::Id
1450                                    .is_not_in(reshared_projects.iter().map(|project| project.id)),
1451                            ),
1452                    )
1453                    .exec(&*tx)
1454                    .await?;
1455
1456                let mut rejoined_projects = Vec::new();
1457                for rejoined_project in &rejoin_room.rejoined_projects {
1458                    let project_id = ProjectId::from_proto(rejoined_project.id);
1459                    let Some(project) = project::Entity::find_by_id(project_id)
1460                        .one(&*tx)
1461                        .await? else {
1462                            continue
1463                        };
1464
1465                    let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1466                    let mut worktrees = Vec::new();
1467                    for db_worktree in db_worktrees {
1468                        let mut worktree = RejoinedWorktree {
1469                            id: db_worktree.id as u64,
1470                            abs_path: db_worktree.abs_path,
1471                            root_name: db_worktree.root_name,
1472                            visible: db_worktree.visible,
1473                            updated_entries: Default::default(),
1474                            removed_entries: Default::default(),
1475                            diagnostic_summaries: Default::default(),
1476                            scan_id: db_worktree.scan_id as u64,
1477                            is_complete: db_worktree.is_complete,
1478                        };
1479
1480                        let rejoined_worktree = rejoined_project
1481                            .worktrees
1482                            .iter()
1483                            .find(|worktree| worktree.id == db_worktree.id as u64);
1484
1485                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1486                            Condition::all()
1487                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1488                                .add(worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id))
1489                        } else {
1490                            Condition::all()
1491                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1492                                .add(worktree_entry::Column::IsDeleted.eq(false))
1493                        };
1494
1495                        let mut db_entries = worktree_entry::Entity::find()
1496                            .filter(entry_filter)
1497                            .stream(&*tx)
1498                            .await?;
1499
1500                        while let Some(db_entry) = db_entries.next().await {
1501                            let db_entry = db_entry?;
1502
1503                            if db_entry.is_deleted {
1504                                worktree.removed_entries.push(db_entry.id as u64);
1505                            } else {
1506                                worktree.updated_entries.push(proto::Entry {
1507                                    id: db_entry.id as u64,
1508                                    is_dir: db_entry.is_dir,
1509                                    path: db_entry.path,
1510                                    inode: db_entry.inode as u64,
1511                                    mtime: Some(proto::Timestamp {
1512                                        seconds: db_entry.mtime_seconds as u64,
1513                                        nanos: db_entry.mtime_nanos as u32,
1514                                    }),
1515                                    is_symlink: db_entry.is_symlink,
1516                                    is_ignored: db_entry.is_ignored,
1517                                });
1518                            }
1519                        }
1520
1521                        worktrees.push(worktree);
1522                    }
1523
1524                    let language_servers = project
1525                        .find_related(language_server::Entity)
1526                        .all(&*tx)
1527                        .await?
1528                        .into_iter()
1529                        .map(|language_server| proto::LanguageServer {
1530                            id: language_server.id as u64,
1531                            name: language_server.name,
1532                        })
1533                        .collect::<Vec<_>>();
1534
1535                    let mut collaborators = project
1536                        .find_related(project_collaborator::Entity)
1537                        .all(&*tx)
1538                        .await?
1539                        .into_iter()
1540                        .map(|collaborator| ProjectCollaborator {
1541                            connection_id: collaborator.connection(),
1542                            user_id: collaborator.user_id,
1543                            replica_id: collaborator.replica_id,
1544                            is_host: collaborator.is_host,
1545                        })
1546                        .collect::<Vec<_>>();
1547
1548                    let old_connection_id;
1549                    if let Some(self_collaborator_ix) = collaborators
1550                        .iter()
1551                        .position(|collaborator| collaborator.user_id == user_id)
1552                    {
1553                        let self_collaborator = collaborators.swap_remove(self_collaborator_ix);
1554                        old_connection_id = self_collaborator.connection_id;
1555                    } else {
1556                        continue;
1557                    }
1558
1559                    rejoined_projects.push(RejoinedProject {
1560                        id: project_id,
1561                        old_connection_id,
1562                        collaborators,
1563                        worktrees,
1564                        language_servers,
1565                    });
1566                }
1567
1568                let room = self.get_room(room_id, &tx).await?;
1569                Ok((
1570                    room_id,
1571                    RejoinedRoom {
1572                        room,
1573                        rejoined_projects,
1574                        reshared_projects,
1575                    },
1576                ))
1577            }
1578        })
1579        .await
1580    }
1581
1582    pub async fn leave_room(
1583        &self,
1584        connection: ConnectionId,
1585    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1586        self.optional_room_transaction(|tx| async move {
1587            let leaving_participant = room_participant::Entity::find()
1588                .filter(
1589                    Condition::all()
1590                        .add(
1591                            room_participant::Column::AnsweringConnectionId
1592                                .eq(connection.id as i32),
1593                        )
1594                        .add(
1595                            room_participant::Column::AnsweringConnectionServerId
1596                                .eq(connection.owner_id as i32),
1597                        ),
1598                )
1599                .one(&*tx)
1600                .await?;
1601
1602            if let Some(leaving_participant) = leaving_participant {
1603                // Leave room.
1604                let room_id = leaving_participant.room_id;
1605                room_participant::Entity::delete_by_id(leaving_participant.id)
1606                    .exec(&*tx)
1607                    .await?;
1608
1609                // Cancel pending calls initiated by the leaving user.
1610                let called_participants = room_participant::Entity::find()
1611                    .filter(
1612                        Condition::all()
1613                            .add(
1614                                room_participant::Column::CallingConnectionId
1615                                    .eq(connection.id as i32),
1616                            )
1617                            .add(
1618                                room_participant::Column::CallingConnectionServerId
1619                                    .eq(connection.owner_id as i32),
1620                            )
1621                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1622                    )
1623                    .all(&*tx)
1624                    .await?;
1625                room_participant::Entity::delete_many()
1626                    .filter(
1627                        room_participant::Column::Id
1628                            .is_in(called_participants.iter().map(|participant| participant.id)),
1629                    )
1630                    .exec(&*tx)
1631                    .await?;
1632                let canceled_calls_to_user_ids = called_participants
1633                    .into_iter()
1634                    .map(|participant| participant.user_id)
1635                    .collect();
1636
1637                // Detect left projects.
1638                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1639                enum QueryProjectIds {
1640                    ProjectId,
1641                }
1642                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1643                    .select_only()
1644                    .column_as(
1645                        project_collaborator::Column::ProjectId,
1646                        QueryProjectIds::ProjectId,
1647                    )
1648                    .filter(
1649                        Condition::all()
1650                            .add(
1651                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1652                            )
1653                            .add(
1654                                project_collaborator::Column::ConnectionServerId
1655                                    .eq(connection.owner_id as i32),
1656                            ),
1657                    )
1658                    .into_values::<_, QueryProjectIds>()
1659                    .all(&*tx)
1660                    .await?;
1661                let mut left_projects = HashMap::default();
1662                let mut collaborators = project_collaborator::Entity::find()
1663                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1664                    .stream(&*tx)
1665                    .await?;
1666                while let Some(collaborator) = collaborators.next().await {
1667                    let collaborator = collaborator?;
1668                    let left_project =
1669                        left_projects
1670                            .entry(collaborator.project_id)
1671                            .or_insert(LeftProject {
1672                                id: collaborator.project_id,
1673                                host_user_id: Default::default(),
1674                                connection_ids: Default::default(),
1675                                host_connection_id: Default::default(),
1676                            });
1677
1678                    let collaborator_connection_id = collaborator.connection();
1679                    if collaborator_connection_id != connection {
1680                        left_project.connection_ids.push(collaborator_connection_id);
1681                    }
1682
1683                    if collaborator.is_host {
1684                        left_project.host_user_id = collaborator.user_id;
1685                        left_project.host_connection_id = collaborator_connection_id;
1686                    }
1687                }
1688                drop(collaborators);
1689
1690                // Leave projects.
1691                project_collaborator::Entity::delete_many()
1692                    .filter(
1693                        Condition::all()
1694                            .add(
1695                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1696                            )
1697                            .add(
1698                                project_collaborator::Column::ConnectionServerId
1699                                    .eq(connection.owner_id as i32),
1700                            ),
1701                    )
1702                    .exec(&*tx)
1703                    .await?;
1704
1705                // Unshare projects.
1706                project::Entity::delete_many()
1707                    .filter(
1708                        Condition::all()
1709                            .add(project::Column::RoomId.eq(room_id))
1710                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1711                            .add(
1712                                project::Column::HostConnectionServerId
1713                                    .eq(connection.owner_id as i32),
1714                            ),
1715                    )
1716                    .exec(&*tx)
1717                    .await?;
1718
1719                let room = self.get_room(room_id, &tx).await?;
1720                if room.participants.is_empty() {
1721                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1722                }
1723
1724                let left_room = LeftRoom {
1725                    room,
1726                    left_projects,
1727                    canceled_calls_to_user_ids,
1728                };
1729
1730                if left_room.room.participants.is_empty() {
1731                    self.rooms.remove(&room_id);
1732                }
1733
1734                Ok(Some((room_id, left_room)))
1735            } else {
1736                Ok(None)
1737            }
1738        })
1739        .await
1740    }
1741
1742    pub async fn update_room_participant_location(
1743        &self,
1744        room_id: RoomId,
1745        connection: ConnectionId,
1746        location: proto::ParticipantLocation,
1747    ) -> Result<RoomGuard<proto::Room>> {
1748        self.room_transaction(|tx| async {
1749            let tx = tx;
1750            let location_kind;
1751            let location_project_id;
1752            match location
1753                .variant
1754                .as_ref()
1755                .ok_or_else(|| anyhow!("invalid location"))?
1756            {
1757                proto::participant_location::Variant::SharedProject(project) => {
1758                    location_kind = 0;
1759                    location_project_id = Some(ProjectId::from_proto(project.id));
1760                }
1761                proto::participant_location::Variant::UnsharedProject(_) => {
1762                    location_kind = 1;
1763                    location_project_id = None;
1764                }
1765                proto::participant_location::Variant::External(_) => {
1766                    location_kind = 2;
1767                    location_project_id = None;
1768                }
1769            }
1770
1771            let result = room_participant::Entity::update_many()
1772                .filter(
1773                    Condition::all()
1774                        .add(room_participant::Column::RoomId.eq(room_id))
1775                        .add(
1776                            room_participant::Column::AnsweringConnectionId
1777                                .eq(connection.id as i32),
1778                        )
1779                        .add(
1780                            room_participant::Column::AnsweringConnectionServerId
1781                                .eq(connection.owner_id as i32),
1782                        ),
1783                )
1784                .set(room_participant::ActiveModel {
1785                    location_kind: ActiveValue::set(Some(location_kind)),
1786                    location_project_id: ActiveValue::set(location_project_id),
1787                    ..Default::default()
1788                })
1789                .exec(&*tx)
1790                .await?;
1791
1792            if result.rows_affected == 1 {
1793                let room = self.get_room(room_id, &tx).await?;
1794                Ok((room_id, room))
1795            } else {
1796                Err(anyhow!("could not update room participant location"))?
1797            }
1798        })
1799        .await
1800    }
1801
1802    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1803        self.transaction(|tx| async move {
1804            let participant = room_participant::Entity::find()
1805                .filter(
1806                    Condition::all()
1807                        .add(
1808                            room_participant::Column::AnsweringConnectionId
1809                                .eq(connection.id as i32),
1810                        )
1811                        .add(
1812                            room_participant::Column::AnsweringConnectionServerId
1813                                .eq(connection.owner_id as i32),
1814                        ),
1815                )
1816                .one(&*tx)
1817                .await?
1818                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1819
1820            room_participant::Entity::update(room_participant::ActiveModel {
1821                answering_connection_lost: ActiveValue::set(true),
1822                ..participant.into_active_model()
1823            })
1824            .exec(&*tx)
1825            .await?;
1826
1827            Ok(())
1828        })
1829        .await
1830    }
1831
1832    fn build_incoming_call(
1833        room: &proto::Room,
1834        called_user_id: UserId,
1835    ) -> Option<proto::IncomingCall> {
1836        let pending_participant = room
1837            .pending_participants
1838            .iter()
1839            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1840
1841        Some(proto::IncomingCall {
1842            room_id: room.id,
1843            calling_user_id: pending_participant.calling_user_id,
1844            participant_user_ids: room
1845                .participants
1846                .iter()
1847                .map(|participant| participant.user_id)
1848                .collect(),
1849            initial_project: room.participants.iter().find_map(|participant| {
1850                let initial_project_id = pending_participant.initial_project_id?;
1851                participant
1852                    .projects
1853                    .iter()
1854                    .find(|project| project.id == initial_project_id)
1855                    .cloned()
1856            }),
1857        })
1858    }
1859
1860    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1861        let db_room = room::Entity::find_by_id(room_id)
1862            .one(tx)
1863            .await?
1864            .ok_or_else(|| anyhow!("could not find room"))?;
1865
1866        let mut db_participants = db_room
1867            .find_related(room_participant::Entity)
1868            .stream(tx)
1869            .await?;
1870        let mut participants = HashMap::default();
1871        let mut pending_participants = Vec::new();
1872        while let Some(db_participant) = db_participants.next().await {
1873            let db_participant = db_participant?;
1874            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1875                .answering_connection_id
1876                .zip(db_participant.answering_connection_server_id)
1877            {
1878                let location = match (
1879                    db_participant.location_kind,
1880                    db_participant.location_project_id,
1881                ) {
1882                    (Some(0), Some(project_id)) => {
1883                        Some(proto::participant_location::Variant::SharedProject(
1884                            proto::participant_location::SharedProject {
1885                                id: project_id.to_proto(),
1886                            },
1887                        ))
1888                    }
1889                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1890                        Default::default(),
1891                    )),
1892                    _ => Some(proto::participant_location::Variant::External(
1893                        Default::default(),
1894                    )),
1895                };
1896
1897                let answering_connection = ConnectionId {
1898                    owner_id: answering_connection_server_id.0 as u32,
1899                    id: answering_connection_id as u32,
1900                };
1901                participants.insert(
1902                    answering_connection,
1903                    proto::Participant {
1904                        user_id: db_participant.user_id.to_proto(),
1905                        peer_id: Some(answering_connection.into()),
1906                        projects: Default::default(),
1907                        location: Some(proto::ParticipantLocation { variant: location }),
1908                    },
1909                );
1910            } else {
1911                pending_participants.push(proto::PendingParticipant {
1912                    user_id: db_participant.user_id.to_proto(),
1913                    calling_user_id: db_participant.calling_user_id.to_proto(),
1914                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1915                });
1916            }
1917        }
1918        drop(db_participants);
1919
1920        let mut db_projects = db_room
1921            .find_related(project::Entity)
1922            .find_with_related(worktree::Entity)
1923            .stream(tx)
1924            .await?;
1925
1926        while let Some(row) = db_projects.next().await {
1927            let (db_project, db_worktree) = row?;
1928            let host_connection = db_project.host_connection()?;
1929            if let Some(participant) = participants.get_mut(&host_connection) {
1930                let project = if let Some(project) = participant
1931                    .projects
1932                    .iter_mut()
1933                    .find(|project| project.id == db_project.id.to_proto())
1934                {
1935                    project
1936                } else {
1937                    participant.projects.push(proto::ParticipantProject {
1938                        id: db_project.id.to_proto(),
1939                        worktree_root_names: Default::default(),
1940                    });
1941                    participant.projects.last_mut().unwrap()
1942                };
1943
1944                if let Some(db_worktree) = db_worktree {
1945                    project.worktree_root_names.push(db_worktree.root_name);
1946                }
1947            }
1948        }
1949
1950        Ok(proto::Room {
1951            id: db_room.id.to_proto(),
1952            live_kit_room: db_room.live_kit_room,
1953            participants: participants.into_values().collect(),
1954            pending_participants,
1955        })
1956    }
1957
1958    // projects
1959
1960    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1961        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1962        enum QueryAs {
1963            Count,
1964        }
1965
1966        self.transaction(|tx| async move {
1967            Ok(project::Entity::find()
1968                .select_only()
1969                .column_as(project::Column::Id.count(), QueryAs::Count)
1970                .inner_join(user::Entity)
1971                .filter(user::Column::Admin.eq(false))
1972                .into_values::<_, QueryAs>()
1973                .one(&*tx)
1974                .await?
1975                .unwrap_or(0i64) as usize)
1976        })
1977        .await
1978    }
1979
1980    pub async fn share_project(
1981        &self,
1982        room_id: RoomId,
1983        connection: ConnectionId,
1984        worktrees: &[proto::WorktreeMetadata],
1985    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1986        self.room_transaction(|tx| async move {
1987            let participant = room_participant::Entity::find()
1988                .filter(
1989                    Condition::all()
1990                        .add(
1991                            room_participant::Column::AnsweringConnectionId
1992                                .eq(connection.id as i32),
1993                        )
1994                        .add(
1995                            room_participant::Column::AnsweringConnectionServerId
1996                                .eq(connection.owner_id as i32),
1997                        ),
1998                )
1999                .one(&*tx)
2000                .await?
2001                .ok_or_else(|| anyhow!("could not find participant"))?;
2002            if participant.room_id != room_id {
2003                return Err(anyhow!("shared project on unexpected room"))?;
2004            }
2005
2006            let project = project::ActiveModel {
2007                room_id: ActiveValue::set(participant.room_id),
2008                host_user_id: ActiveValue::set(participant.user_id),
2009                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2010                host_connection_server_id: ActiveValue::set(Some(ServerId(
2011                    connection.owner_id as i32,
2012                ))),
2013                ..Default::default()
2014            }
2015            .insert(&*tx)
2016            .await?;
2017
2018            if !worktrees.is_empty() {
2019                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2020                    worktree::ActiveModel {
2021                        id: ActiveValue::set(worktree.id as i64),
2022                        project_id: ActiveValue::set(project.id),
2023                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2024                        root_name: ActiveValue::set(worktree.root_name.clone()),
2025                        visible: ActiveValue::set(worktree.visible),
2026                        scan_id: ActiveValue::set(0),
2027                        is_complete: ActiveValue::set(false),
2028                    }
2029                }))
2030                .exec(&*tx)
2031                .await?;
2032            }
2033
2034            project_collaborator::ActiveModel {
2035                project_id: ActiveValue::set(project.id),
2036                connection_id: ActiveValue::set(connection.id as i32),
2037                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2038                user_id: ActiveValue::set(participant.user_id),
2039                replica_id: ActiveValue::set(ReplicaId(0)),
2040                is_host: ActiveValue::set(true),
2041                ..Default::default()
2042            }
2043            .insert(&*tx)
2044            .await?;
2045
2046            let room = self.get_room(room_id, &tx).await?;
2047            Ok((room_id, (project.id, room)))
2048        })
2049        .await
2050    }
2051
2052    pub async fn unshare_project(
2053        &self,
2054        project_id: ProjectId,
2055        connection: ConnectionId,
2056    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2057        self.room_transaction(|tx| async move {
2058            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2059
2060            let project = project::Entity::find_by_id(project_id)
2061                .one(&*tx)
2062                .await?
2063                .ok_or_else(|| anyhow!("project not found"))?;
2064            if project.host_connection()? == connection {
2065                let room_id = project.room_id;
2066                project::Entity::delete(project.into_active_model())
2067                    .exec(&*tx)
2068                    .await?;
2069                let room = self.get_room(room_id, &tx).await?;
2070                Ok((room_id, (room, guest_connection_ids)))
2071            } else {
2072                Err(anyhow!("cannot unshare a project hosted by another user"))?
2073            }
2074        })
2075        .await
2076    }
2077
2078    pub async fn update_project(
2079        &self,
2080        project_id: ProjectId,
2081        connection: ConnectionId,
2082        worktrees: &[proto::WorktreeMetadata],
2083    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2084        self.room_transaction(|tx| async move {
2085            let project = project::Entity::find_by_id(project_id)
2086                .filter(
2087                    Condition::all()
2088                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2089                        .add(
2090                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2091                        ),
2092                )
2093                .one(&*tx)
2094                .await?
2095                .ok_or_else(|| anyhow!("no such project"))?;
2096
2097            self.update_project_worktrees(project.id, worktrees, &tx)
2098                .await?;
2099
2100            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2101            let room = self.get_room(project.room_id, &tx).await?;
2102            Ok((project.room_id, (room, guest_connection_ids)))
2103        })
2104        .await
2105    }
2106
2107    async fn update_project_worktrees(
2108        &self,
2109        project_id: ProjectId,
2110        worktrees: &[proto::WorktreeMetadata],
2111        tx: &DatabaseTransaction,
2112    ) -> Result<()> {
2113        if !worktrees.is_empty() {
2114            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2115                id: ActiveValue::set(worktree.id as i64),
2116                project_id: ActiveValue::set(project_id),
2117                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2118                root_name: ActiveValue::set(worktree.root_name.clone()),
2119                visible: ActiveValue::set(worktree.visible),
2120                scan_id: ActiveValue::set(0),
2121                is_complete: ActiveValue::set(false),
2122            }))
2123            .on_conflict(
2124                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2125                    .update_column(worktree::Column::RootName)
2126                    .to_owned(),
2127            )
2128            .exec(&*tx)
2129            .await?;
2130        }
2131
2132        worktree::Entity::delete_many()
2133            .filter(worktree::Column::ProjectId.eq(project_id).and(
2134                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2135            ))
2136            .exec(&*tx)
2137            .await?;
2138
2139        Ok(())
2140    }
2141
2142    pub async fn update_worktree(
2143        &self,
2144        update: &proto::UpdateWorktree,
2145        connection: ConnectionId,
2146    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2147        self.room_transaction(|tx| async move {
2148            let project_id = ProjectId::from_proto(update.project_id);
2149            let worktree_id = update.worktree_id as i64;
2150
2151            // Ensure the update comes from the host.
2152            let project = project::Entity::find_by_id(project_id)
2153                .filter(
2154                    Condition::all()
2155                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2156                        .add(
2157                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2158                        ),
2159                )
2160                .one(&*tx)
2161                .await?
2162                .ok_or_else(|| anyhow!("no such project"))?;
2163            let room_id = project.room_id;
2164
2165            // Update metadata.
2166            worktree::Entity::update(worktree::ActiveModel {
2167                id: ActiveValue::set(worktree_id),
2168                project_id: ActiveValue::set(project_id),
2169                root_name: ActiveValue::set(update.root_name.clone()),
2170                scan_id: ActiveValue::set(update.scan_id as i64),
2171                is_complete: ActiveValue::set(update.is_last_update),
2172                abs_path: ActiveValue::set(update.abs_path.clone()),
2173                ..Default::default()
2174            })
2175            .exec(&*tx)
2176            .await?;
2177
2178            if !update.updated_entries.is_empty() {
2179                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2180                    let mtime = entry.mtime.clone().unwrap_or_default();
2181                    worktree_entry::ActiveModel {
2182                        project_id: ActiveValue::set(project_id),
2183                        worktree_id: ActiveValue::set(worktree_id),
2184                        id: ActiveValue::set(entry.id as i64),
2185                        is_dir: ActiveValue::set(entry.is_dir),
2186                        path: ActiveValue::set(entry.path.clone()),
2187                        inode: ActiveValue::set(entry.inode as i64),
2188                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2189                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2190                        is_symlink: ActiveValue::set(entry.is_symlink),
2191                        is_ignored: ActiveValue::set(entry.is_ignored),
2192                        is_deleted: ActiveValue::set(false),
2193                        scan_id: ActiveValue::set(update.scan_id as i64),
2194                    }
2195                }))
2196                .on_conflict(
2197                    OnConflict::columns([
2198                        worktree_entry::Column::ProjectId,
2199                        worktree_entry::Column::WorktreeId,
2200                        worktree_entry::Column::Id,
2201                    ])
2202                    .update_columns([
2203                        worktree_entry::Column::IsDir,
2204                        worktree_entry::Column::Path,
2205                        worktree_entry::Column::Inode,
2206                        worktree_entry::Column::MtimeSeconds,
2207                        worktree_entry::Column::MtimeNanos,
2208                        worktree_entry::Column::IsSymlink,
2209                        worktree_entry::Column::IsIgnored,
2210                    ])
2211                    .to_owned(),
2212                )
2213                .exec(&*tx)
2214                .await?;
2215            }
2216
2217            if !update.removed_entries.is_empty() {
2218                worktree_entry::Entity::update_many()
2219                    .filter(
2220                        worktree_entry::Column::ProjectId
2221                            .eq(project_id)
2222                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2223                            .and(
2224                                worktree_entry::Column::Id
2225                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2226                            ),
2227                    )
2228                    .set(worktree_entry::ActiveModel {
2229                        is_deleted: ActiveValue::Set(true),
2230                        scan_id: ActiveValue::Set(update.scan_id as i64),
2231                        ..Default::default()
2232                    })
2233                    .exec(&*tx)
2234                    .await?;
2235            }
2236
2237            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2238            Ok((room_id, connection_ids))
2239        })
2240        .await
2241    }
2242
2243    pub async fn update_diagnostic_summary(
2244        &self,
2245        update: &proto::UpdateDiagnosticSummary,
2246        connection: ConnectionId,
2247    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2248        self.room_transaction(|tx| async move {
2249            let project_id = ProjectId::from_proto(update.project_id);
2250            let worktree_id = update.worktree_id as i64;
2251            let summary = update
2252                .summary
2253                .as_ref()
2254                .ok_or_else(|| anyhow!("invalid summary"))?;
2255
2256            // Ensure the update comes from the host.
2257            let project = project::Entity::find_by_id(project_id)
2258                .one(&*tx)
2259                .await?
2260                .ok_or_else(|| anyhow!("no such project"))?;
2261            if project.host_connection()? != connection {
2262                return Err(anyhow!("can't update a project hosted by someone else"))?;
2263            }
2264
2265            // Update summary.
2266            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2267                project_id: ActiveValue::set(project_id),
2268                worktree_id: ActiveValue::set(worktree_id),
2269                path: ActiveValue::set(summary.path.clone()),
2270                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2271                error_count: ActiveValue::set(summary.error_count as i32),
2272                warning_count: ActiveValue::set(summary.warning_count as i32),
2273                ..Default::default()
2274            })
2275            .on_conflict(
2276                OnConflict::columns([
2277                    worktree_diagnostic_summary::Column::ProjectId,
2278                    worktree_diagnostic_summary::Column::WorktreeId,
2279                    worktree_diagnostic_summary::Column::Path,
2280                ])
2281                .update_columns([
2282                    worktree_diagnostic_summary::Column::LanguageServerId,
2283                    worktree_diagnostic_summary::Column::ErrorCount,
2284                    worktree_diagnostic_summary::Column::WarningCount,
2285                ])
2286                .to_owned(),
2287            )
2288            .exec(&*tx)
2289            .await?;
2290
2291            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2292            Ok((project.room_id, connection_ids))
2293        })
2294        .await
2295    }
2296
2297    pub async fn start_language_server(
2298        &self,
2299        update: &proto::StartLanguageServer,
2300        connection: ConnectionId,
2301    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2302        self.room_transaction(|tx| async move {
2303            let project_id = ProjectId::from_proto(update.project_id);
2304            let server = update
2305                .server
2306                .as_ref()
2307                .ok_or_else(|| anyhow!("invalid language server"))?;
2308
2309            // Ensure the update comes from the host.
2310            let project = project::Entity::find_by_id(project_id)
2311                .one(&*tx)
2312                .await?
2313                .ok_or_else(|| anyhow!("no such project"))?;
2314            if project.host_connection()? != connection {
2315                return Err(anyhow!("can't update a project hosted by someone else"))?;
2316            }
2317
2318            // Add the newly-started language server.
2319            language_server::Entity::insert(language_server::ActiveModel {
2320                project_id: ActiveValue::set(project_id),
2321                id: ActiveValue::set(server.id as i64),
2322                name: ActiveValue::set(server.name.clone()),
2323                ..Default::default()
2324            })
2325            .on_conflict(
2326                OnConflict::columns([
2327                    language_server::Column::ProjectId,
2328                    language_server::Column::Id,
2329                ])
2330                .update_column(language_server::Column::Name)
2331                .to_owned(),
2332            )
2333            .exec(&*tx)
2334            .await?;
2335
2336            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2337            Ok((project.room_id, connection_ids))
2338        })
2339        .await
2340    }
2341
2342    pub async fn join_project(
2343        &self,
2344        project_id: ProjectId,
2345        connection: ConnectionId,
2346    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2347        self.room_transaction(|tx| async move {
2348            let participant = room_participant::Entity::find()
2349                .filter(
2350                    Condition::all()
2351                        .add(
2352                            room_participant::Column::AnsweringConnectionId
2353                                .eq(connection.id as i32),
2354                        )
2355                        .add(
2356                            room_participant::Column::AnsweringConnectionServerId
2357                                .eq(connection.owner_id as i32),
2358                        ),
2359                )
2360                .one(&*tx)
2361                .await?
2362                .ok_or_else(|| anyhow!("must join a room first"))?;
2363
2364            let project = project::Entity::find_by_id(project_id)
2365                .one(&*tx)
2366                .await?
2367                .ok_or_else(|| anyhow!("no such project"))?;
2368            if project.room_id != participant.room_id {
2369                return Err(anyhow!("no such project"))?;
2370            }
2371
2372            let mut collaborators = project
2373                .find_related(project_collaborator::Entity)
2374                .all(&*tx)
2375                .await?;
2376            let replica_ids = collaborators
2377                .iter()
2378                .map(|c| c.replica_id)
2379                .collect::<HashSet<_>>();
2380            let mut replica_id = ReplicaId(1);
2381            while replica_ids.contains(&replica_id) {
2382                replica_id.0 += 1;
2383            }
2384            let new_collaborator = project_collaborator::ActiveModel {
2385                project_id: ActiveValue::set(project_id),
2386                connection_id: ActiveValue::set(connection.id as i32),
2387                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2388                user_id: ActiveValue::set(participant.user_id),
2389                replica_id: ActiveValue::set(replica_id),
2390                is_host: ActiveValue::set(false),
2391                ..Default::default()
2392            }
2393            .insert(&*tx)
2394            .await?;
2395            collaborators.push(new_collaborator);
2396
2397            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2398            let mut worktrees = db_worktrees
2399                .into_iter()
2400                .map(|db_worktree| {
2401                    (
2402                        db_worktree.id as u64,
2403                        Worktree {
2404                            id: db_worktree.id as u64,
2405                            abs_path: db_worktree.abs_path,
2406                            root_name: db_worktree.root_name,
2407                            visible: db_worktree.visible,
2408                            entries: Default::default(),
2409                            diagnostic_summaries: Default::default(),
2410                            scan_id: db_worktree.scan_id as u64,
2411                            is_complete: db_worktree.is_complete,
2412                        },
2413                    )
2414                })
2415                .collect::<BTreeMap<_, _>>();
2416
2417            // Populate worktree entries.
2418            {
2419                let mut db_entries = worktree_entry::Entity::find()
2420                    .filter(worktree_entry::Column::ProjectId.eq(project_id))
2421                    .stream(&*tx)
2422                    .await?;
2423                while let Some(db_entry) = db_entries.next().await {
2424                    let db_entry = db_entry?;
2425                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2426                        worktree.entries.push(proto::Entry {
2427                            id: db_entry.id as u64,
2428                            is_dir: db_entry.is_dir,
2429                            path: db_entry.path,
2430                            inode: db_entry.inode as u64,
2431                            mtime: Some(proto::Timestamp {
2432                                seconds: db_entry.mtime_seconds as u64,
2433                                nanos: db_entry.mtime_nanos as u32,
2434                            }),
2435                            is_symlink: db_entry.is_symlink,
2436                            is_ignored: db_entry.is_ignored,
2437                        });
2438                    }
2439                }
2440            }
2441
2442            // Populate worktree diagnostic summaries.
2443            {
2444                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2445                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2446                    .stream(&*tx)
2447                    .await?;
2448                while let Some(db_summary) = db_summaries.next().await {
2449                    let db_summary = db_summary?;
2450                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2451                        worktree
2452                            .diagnostic_summaries
2453                            .push(proto::DiagnosticSummary {
2454                                path: db_summary.path,
2455                                language_server_id: db_summary.language_server_id as u64,
2456                                error_count: db_summary.error_count as u32,
2457                                warning_count: db_summary.warning_count as u32,
2458                            });
2459                    }
2460                }
2461            }
2462
2463            // Populate language servers.
2464            let language_servers = project
2465                .find_related(language_server::Entity)
2466                .all(&*tx)
2467                .await?;
2468
2469            let room_id = project.room_id;
2470            let project = Project {
2471                collaborators: collaborators
2472                    .into_iter()
2473                    .map(|collaborator| ProjectCollaborator {
2474                        connection_id: collaborator.connection(),
2475                        user_id: collaborator.user_id,
2476                        replica_id: collaborator.replica_id,
2477                        is_host: collaborator.is_host,
2478                    })
2479                    .collect(),
2480                worktrees,
2481                language_servers: language_servers
2482                    .into_iter()
2483                    .map(|language_server| proto::LanguageServer {
2484                        id: language_server.id as u64,
2485                        name: language_server.name,
2486                    })
2487                    .collect(),
2488            };
2489            Ok((room_id, (project, replica_id as ReplicaId)))
2490        })
2491        .await
2492    }
2493
2494    pub async fn leave_project(
2495        &self,
2496        project_id: ProjectId,
2497        connection: ConnectionId,
2498    ) -> Result<RoomGuard<LeftProject>> {
2499        self.room_transaction(|tx| async move {
2500            let result = project_collaborator::Entity::delete_many()
2501                .filter(
2502                    Condition::all()
2503                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2504                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2505                        .add(
2506                            project_collaborator::Column::ConnectionServerId
2507                                .eq(connection.owner_id as i32),
2508                        ),
2509                )
2510                .exec(&*tx)
2511                .await?;
2512            if result.rows_affected == 0 {
2513                Err(anyhow!("not a collaborator on this project"))?;
2514            }
2515
2516            let project = project::Entity::find_by_id(project_id)
2517                .one(&*tx)
2518                .await?
2519                .ok_or_else(|| anyhow!("no such project"))?;
2520            let collaborators = project
2521                .find_related(project_collaborator::Entity)
2522                .all(&*tx)
2523                .await?;
2524            let connection_ids = collaborators
2525                .into_iter()
2526                .map(|collaborator| collaborator.connection())
2527                .collect();
2528
2529            let left_project = LeftProject {
2530                id: project_id,
2531                host_user_id: project.host_user_id,
2532                host_connection_id: project.host_connection()?,
2533                connection_ids,
2534            };
2535            Ok((project.room_id, left_project))
2536        })
2537        .await
2538    }
2539
2540    pub async fn project_collaborators(
2541        &self,
2542        project_id: ProjectId,
2543        connection_id: ConnectionId,
2544    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2545        self.room_transaction(|tx| async move {
2546            let project = project::Entity::find_by_id(project_id)
2547                .one(&*tx)
2548                .await?
2549                .ok_or_else(|| anyhow!("no such project"))?;
2550            let collaborators = project_collaborator::Entity::find()
2551                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2552                .all(&*tx)
2553                .await?
2554                .into_iter()
2555                .map(|collaborator| ProjectCollaborator {
2556                    connection_id: collaborator.connection(),
2557                    user_id: collaborator.user_id,
2558                    replica_id: collaborator.replica_id,
2559                    is_host: collaborator.is_host,
2560                })
2561                .collect::<Vec<_>>();
2562
2563            if collaborators
2564                .iter()
2565                .any(|collaborator| collaborator.connection_id == connection_id)
2566            {
2567                Ok((project.room_id, collaborators))
2568            } else {
2569                Err(anyhow!("no such project"))?
2570            }
2571        })
2572        .await
2573    }
2574
2575    pub async fn project_connection_ids(
2576        &self,
2577        project_id: ProjectId,
2578        connection_id: ConnectionId,
2579    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2580        self.room_transaction(|tx| async move {
2581            let project = project::Entity::find_by_id(project_id)
2582                .one(&*tx)
2583                .await?
2584                .ok_or_else(|| anyhow!("no such project"))?;
2585            let mut collaborators = project_collaborator::Entity::find()
2586                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2587                .stream(&*tx)
2588                .await?;
2589
2590            let mut connection_ids = HashSet::default();
2591            while let Some(collaborator) = collaborators.next().await {
2592                let collaborator = collaborator?;
2593                connection_ids.insert(collaborator.connection());
2594            }
2595
2596            if connection_ids.contains(&connection_id) {
2597                Ok((project.room_id, connection_ids))
2598            } else {
2599                Err(anyhow!("no such project"))?
2600            }
2601        })
2602        .await
2603    }
2604
2605    async fn project_guest_connection_ids(
2606        &self,
2607        project_id: ProjectId,
2608        tx: &DatabaseTransaction,
2609    ) -> Result<Vec<ConnectionId>> {
2610        let mut collaborators = project_collaborator::Entity::find()
2611            .filter(
2612                project_collaborator::Column::ProjectId
2613                    .eq(project_id)
2614                    .and(project_collaborator::Column::IsHost.eq(false)),
2615            )
2616            .stream(tx)
2617            .await?;
2618
2619        let mut guest_connection_ids = Vec::new();
2620        while let Some(collaborator) = collaborators.next().await {
2621            let collaborator = collaborator?;
2622            guest_connection_ids.push(collaborator.connection());
2623        }
2624        Ok(guest_connection_ids)
2625    }
2626
2627    // access tokens
2628
2629    pub async fn create_access_token_hash(
2630        &self,
2631        user_id: UserId,
2632        access_token_hash: &str,
2633        max_access_token_count: usize,
2634    ) -> Result<()> {
2635        self.transaction(|tx| async {
2636            let tx = tx;
2637
2638            access_token::ActiveModel {
2639                user_id: ActiveValue::set(user_id),
2640                hash: ActiveValue::set(access_token_hash.into()),
2641                ..Default::default()
2642            }
2643            .insert(&*tx)
2644            .await?;
2645
2646            access_token::Entity::delete_many()
2647                .filter(
2648                    access_token::Column::Id.in_subquery(
2649                        Query::select()
2650                            .column(access_token::Column::Id)
2651                            .from(access_token::Entity)
2652                            .and_where(access_token::Column::UserId.eq(user_id))
2653                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2654                            .limit(10000)
2655                            .offset(max_access_token_count as u64)
2656                            .to_owned(),
2657                    ),
2658                )
2659                .exec(&*tx)
2660                .await?;
2661            Ok(())
2662        })
2663        .await
2664    }
2665
2666    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2667        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2668        enum QueryAs {
2669            Hash,
2670        }
2671
2672        self.transaction(|tx| async move {
2673            Ok(access_token::Entity::find()
2674                .select_only()
2675                .column(access_token::Column::Hash)
2676                .filter(access_token::Column::UserId.eq(user_id))
2677                .order_by_desc(access_token::Column::Id)
2678                .into_values::<_, QueryAs>()
2679                .all(&*tx)
2680                .await?)
2681        })
2682        .await
2683    }
2684
2685    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2686    where
2687        F: Send + Fn(TransactionHandle) -> Fut,
2688        Fut: Send + Future<Output = Result<T>>,
2689    {
2690        let body = async {
2691            loop {
2692                let (tx, result) = self.with_transaction(&f).await?;
2693                match result {
2694                    Ok(result) => {
2695                        match tx.commit().await.map_err(Into::into) {
2696                            Ok(()) => return Ok(result),
2697                            Err(error) => {
2698                                if is_serialization_error(&error) {
2699                                    // Retry (don't break the loop)
2700                                } else {
2701                                    return Err(error);
2702                                }
2703                            }
2704                        }
2705                    }
2706                    Err(error) => {
2707                        tx.rollback().await?;
2708                        if is_serialization_error(&error) {
2709                            // Retry (don't break the loop)
2710                        } else {
2711                            return Err(error);
2712                        }
2713                    }
2714                }
2715            }
2716        };
2717
2718        self.run(body).await
2719    }
2720
2721    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2722    where
2723        F: Send + Fn(TransactionHandle) -> Fut,
2724        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2725    {
2726        let body = async {
2727            loop {
2728                let (tx, result) = self.with_transaction(&f).await?;
2729                match result {
2730                    Ok(Some((room_id, data))) => {
2731                        let lock = self.rooms.entry(room_id).or_default().clone();
2732                        let _guard = lock.lock_owned().await;
2733                        match tx.commit().await.map_err(Into::into) {
2734                            Ok(()) => {
2735                                return Ok(Some(RoomGuard {
2736                                    data,
2737                                    _guard,
2738                                    _not_send: PhantomData,
2739                                }));
2740                            }
2741                            Err(error) => {
2742                                if is_serialization_error(&error) {
2743                                    // Retry (don't break the loop)
2744                                } else {
2745                                    return Err(error);
2746                                }
2747                            }
2748                        }
2749                    }
2750                    Ok(None) => {
2751                        match tx.commit().await.map_err(Into::into) {
2752                            Ok(()) => return Ok(None),
2753                            Err(error) => {
2754                                if is_serialization_error(&error) {
2755                                    // Retry (don't break the loop)
2756                                } else {
2757                                    return Err(error);
2758                                }
2759                            }
2760                        }
2761                    }
2762                    Err(error) => {
2763                        tx.rollback().await?;
2764                        if is_serialization_error(&error) {
2765                            // Retry (don't break the loop)
2766                        } else {
2767                            return Err(error);
2768                        }
2769                    }
2770                }
2771            }
2772        };
2773
2774        self.run(body).await
2775    }
2776
2777    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2778    where
2779        F: Send + Fn(TransactionHandle) -> Fut,
2780        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2781    {
2782        let data = self
2783            .optional_room_transaction(move |tx| {
2784                let future = f(tx);
2785                async {
2786                    let data = future.await?;
2787                    Ok(Some(data))
2788                }
2789            })
2790            .await?;
2791        Ok(data.unwrap())
2792    }
2793
2794    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2795    where
2796        F: Send + Fn(TransactionHandle) -> Fut,
2797        Fut: Send + Future<Output = Result<T>>,
2798    {
2799        let tx = self
2800            .pool
2801            .begin_with_config(Some(IsolationLevel::Serializable), None)
2802            .await?;
2803
2804        let mut tx = Arc::new(Some(tx));
2805        let result = f(TransactionHandle(tx.clone())).await;
2806        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2807            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2808        };
2809
2810        Ok((tx, result))
2811    }
2812
2813    async fn run<F, T>(&self, future: F) -> T
2814    where
2815        F: Future<Output = T>,
2816    {
2817        #[cfg(test)]
2818        {
2819            if let Some(background) = self.background.as_ref() {
2820                background.simulate_random_delay().await;
2821            }
2822
2823            self.runtime.as_ref().unwrap().block_on(future)
2824        }
2825
2826        #[cfg(not(test))]
2827        {
2828            future.await
2829        }
2830    }
2831}
2832
2833fn is_serialization_error(error: &Error) -> bool {
2834    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2835    match error {
2836        Error::Database(
2837            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2838            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2839        ) if error
2840            .as_database_error()
2841            .and_then(|error| error.code())
2842            .as_deref()
2843            == Some(SERIALIZATION_FAILURE_CODE) =>
2844        {
2845            true
2846        }
2847        _ => false,
2848    }
2849}
2850
2851struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2852
2853impl Deref for TransactionHandle {
2854    type Target = DatabaseTransaction;
2855
2856    fn deref(&self) -> &Self::Target {
2857        self.0.as_ref().as_ref().unwrap()
2858    }
2859}
2860
2861pub struct RoomGuard<T> {
2862    data: T,
2863    _guard: OwnedMutexGuard<()>,
2864    _not_send: PhantomData<Rc<()>>,
2865}
2866
2867impl<T> Deref for RoomGuard<T> {
2868    type Target = T;
2869
2870    fn deref(&self) -> &T {
2871        &self.data
2872    }
2873}
2874
2875impl<T> DerefMut for RoomGuard<T> {
2876    fn deref_mut(&mut self) -> &mut T {
2877        &mut self.data
2878    }
2879}
2880
2881#[derive(Debug, Serialize, Deserialize)]
2882pub struct NewUserParams {
2883    pub github_login: String,
2884    pub github_user_id: i32,
2885    pub invite_count: i32,
2886}
2887
2888#[derive(Debug)]
2889pub struct NewUserResult {
2890    pub user_id: UserId,
2891    pub metrics_id: String,
2892    pub inviting_user_id: Option<UserId>,
2893    pub signup_device_id: Option<String>,
2894}
2895
2896fn random_invite_code() -> String {
2897    nanoid::nanoid!(16)
2898}
2899
2900fn random_email_confirmation_code() -> String {
2901    nanoid::nanoid!(64)
2902}
2903
2904macro_rules! id_type {
2905    ($name:ident) => {
2906        #[derive(
2907            Clone,
2908            Copy,
2909            Debug,
2910            Default,
2911            PartialEq,
2912            Eq,
2913            PartialOrd,
2914            Ord,
2915            Hash,
2916            Serialize,
2917            Deserialize,
2918        )]
2919        #[serde(transparent)]
2920        pub struct $name(pub i32);
2921
2922        impl $name {
2923            #[allow(unused)]
2924            pub const MAX: Self = Self(i32::MAX);
2925
2926            #[allow(unused)]
2927            pub fn from_proto(value: u64) -> Self {
2928                Self(value as i32)
2929            }
2930
2931            #[allow(unused)]
2932            pub fn to_proto(self) -> u64 {
2933                self.0 as u64
2934            }
2935        }
2936
2937        impl std::fmt::Display for $name {
2938            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2939                self.0.fmt(f)
2940            }
2941        }
2942
2943        impl From<$name> for sea_query::Value {
2944            fn from(value: $name) -> Self {
2945                sea_query::Value::Int(Some(value.0))
2946            }
2947        }
2948
2949        impl sea_orm::TryGetable for $name {
2950            fn try_get(
2951                res: &sea_orm::QueryResult,
2952                pre: &str,
2953                col: &str,
2954            ) -> Result<Self, sea_orm::TryGetError> {
2955                Ok(Self(i32::try_get(res, pre, col)?))
2956            }
2957        }
2958
2959        impl sea_query::ValueType for $name {
2960            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
2961                match v {
2962                    Value::TinyInt(Some(int)) => {
2963                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2964                    }
2965                    Value::SmallInt(Some(int)) => {
2966                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2967                    }
2968                    Value::Int(Some(int)) => {
2969                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2970                    }
2971                    Value::BigInt(Some(int)) => {
2972                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2973                    }
2974                    Value::TinyUnsigned(Some(int)) => {
2975                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2976                    }
2977                    Value::SmallUnsigned(Some(int)) => {
2978                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2979                    }
2980                    Value::Unsigned(Some(int)) => {
2981                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2982                    }
2983                    Value::BigUnsigned(Some(int)) => {
2984                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
2985                    }
2986                    _ => Err(sea_query::ValueTypeErr),
2987                }
2988            }
2989
2990            fn type_name() -> String {
2991                stringify!($name).into()
2992            }
2993
2994            fn array_type() -> sea_query::ArrayType {
2995                sea_query::ArrayType::Int
2996            }
2997
2998            fn column_type() -> sea_query::ColumnType {
2999                sea_query::ColumnType::Integer(None)
3000            }
3001        }
3002
3003        impl sea_orm::TryFromU64 for $name {
3004            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3005                Ok(Self(n.try_into().map_err(|_| {
3006                    DbErr::ConvertFromU64(concat!(
3007                        "error converting ",
3008                        stringify!($name),
3009                        " to u64"
3010                    ))
3011                })?))
3012            }
3013        }
3014
3015        impl sea_query::Nullable for $name {
3016            fn null() -> Value {
3017                Value::Int(None)
3018            }
3019        }
3020    };
3021}
3022
3023id_type!(AccessTokenId);
3024id_type!(ContactId);
3025id_type!(RoomId);
3026id_type!(RoomParticipantId);
3027id_type!(ProjectId);
3028id_type!(ProjectCollaboratorId);
3029id_type!(ReplicaId);
3030id_type!(ServerId);
3031id_type!(SignupId);
3032id_type!(UserId);
3033
3034pub struct RejoinedRoom {
3035    pub room: proto::Room,
3036    pub rejoined_projects: Vec<RejoinedProject>,
3037    pub reshared_projects: Vec<ResharedProject>,
3038}
3039
3040pub struct ResharedProject {
3041    pub id: ProjectId,
3042    pub old_connection_id: ConnectionId,
3043    pub collaborators: Vec<ProjectCollaborator>,
3044    pub worktrees: Vec<proto::WorktreeMetadata>,
3045}
3046
3047pub struct RejoinedProject {
3048    pub id: ProjectId,
3049    pub old_connection_id: ConnectionId,
3050    pub collaborators: Vec<ProjectCollaborator>,
3051    pub worktrees: Vec<RejoinedWorktree>,
3052    pub language_servers: Vec<proto::LanguageServer>,
3053}
3054
3055#[derive(Debug)]
3056pub struct RejoinedWorktree {
3057    pub id: u64,
3058    pub abs_path: String,
3059    pub root_name: String,
3060    pub visible: bool,
3061    pub updated_entries: Vec<proto::Entry>,
3062    pub removed_entries: Vec<u64>,
3063    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3064    pub scan_id: u64,
3065    pub is_complete: bool,
3066}
3067
3068pub struct LeftRoom {
3069    pub room: proto::Room,
3070    pub left_projects: HashMap<ProjectId, LeftProject>,
3071    pub canceled_calls_to_user_ids: Vec<UserId>,
3072}
3073
3074pub struct RefreshedRoom {
3075    pub room: proto::Room,
3076    pub stale_participant_user_ids: Vec<UserId>,
3077    pub canceled_calls_to_user_ids: Vec<UserId>,
3078}
3079
3080pub struct Project {
3081    pub collaborators: Vec<ProjectCollaborator>,
3082    pub worktrees: BTreeMap<u64, Worktree>,
3083    pub language_servers: Vec<proto::LanguageServer>,
3084}
3085
3086pub struct ProjectCollaborator {
3087    pub connection_id: ConnectionId,
3088    pub user_id: UserId,
3089    pub replica_id: ReplicaId,
3090    pub is_host: bool,
3091}
3092
3093impl ProjectCollaborator {
3094    pub fn to_proto(&self) -> proto::Collaborator {
3095        proto::Collaborator {
3096            peer_id: Some(self.connection_id.into()),
3097            replica_id: self.replica_id.0 as u32,
3098            user_id: self.user_id.to_proto(),
3099        }
3100    }
3101}
3102
3103#[derive(Debug)]
3104pub struct LeftProject {
3105    pub id: ProjectId,
3106    pub host_user_id: UserId,
3107    pub host_connection_id: ConnectionId,
3108    pub connection_ids: Vec<ConnectionId>,
3109}
3110
3111pub struct Worktree {
3112    pub id: u64,
3113    pub abs_path: String,
3114    pub root_name: String,
3115    pub visible: bool,
3116    pub entries: Vec<proto::Entry>,
3117    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3118    pub scan_id: u64,
3119    pub is_complete: bool,
3120}
3121
3122#[cfg(test)]
3123pub use test::*;
3124
3125#[cfg(test)]
3126mod test {
3127    use super::*;
3128    use gpui::executor::Background;
3129    use lazy_static::lazy_static;
3130    use parking_lot::Mutex;
3131    use rand::prelude::*;
3132    use sea_orm::ConnectionTrait;
3133    use sqlx::migrate::MigrateDatabase;
3134    use std::sync::Arc;
3135
3136    pub struct TestDb {
3137        pub db: Option<Arc<Database>>,
3138        pub connection: Option<sqlx::AnyConnection>,
3139    }
3140
3141    impl TestDb {
3142        pub fn sqlite(background: Arc<Background>) -> Self {
3143            let url = format!("sqlite::memory:");
3144            let runtime = tokio::runtime::Builder::new_current_thread()
3145                .enable_io()
3146                .enable_time()
3147                .build()
3148                .unwrap();
3149
3150            let mut db = runtime.block_on(async {
3151                let mut options = ConnectOptions::new(url);
3152                options.max_connections(5);
3153                let db = Database::new(options).await.unwrap();
3154                let sql = include_str!(concat!(
3155                    env!("CARGO_MANIFEST_DIR"),
3156                    "/migrations.sqlite/20221109000000_test_schema.sql"
3157                ));
3158                db.pool
3159                    .execute(sea_orm::Statement::from_string(
3160                        db.pool.get_database_backend(),
3161                        sql.into(),
3162                    ))
3163                    .await
3164                    .unwrap();
3165                db
3166            });
3167
3168            db.background = Some(background);
3169            db.runtime = Some(runtime);
3170
3171            Self {
3172                db: Some(Arc::new(db)),
3173                connection: None,
3174            }
3175        }
3176
3177        pub fn postgres(background: Arc<Background>) -> Self {
3178            lazy_static! {
3179                static ref LOCK: Mutex<()> = Mutex::new(());
3180            }
3181
3182            let _guard = LOCK.lock();
3183            let mut rng = StdRng::from_entropy();
3184            let url = format!(
3185                "postgres://postgres@localhost/zed-test-{}",
3186                rng.gen::<u128>()
3187            );
3188            let runtime = tokio::runtime::Builder::new_current_thread()
3189                .enable_io()
3190                .enable_time()
3191                .build()
3192                .unwrap();
3193
3194            let mut db = runtime.block_on(async {
3195                sqlx::Postgres::create_database(&url)
3196                    .await
3197                    .expect("failed to create test db");
3198                let mut options = ConnectOptions::new(url);
3199                options
3200                    .max_connections(5)
3201                    .idle_timeout(Duration::from_secs(0));
3202                let db = Database::new(options).await.unwrap();
3203                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3204                db.migrate(Path::new(migrations_path), false).await.unwrap();
3205                db
3206            });
3207
3208            db.background = Some(background);
3209            db.runtime = Some(runtime);
3210
3211            Self {
3212                db: Some(Arc::new(db)),
3213                connection: None,
3214            }
3215        }
3216
3217        pub fn db(&self) -> &Arc<Database> {
3218            self.db.as_ref().unwrap()
3219        }
3220    }
3221
3222    impl Drop for TestDb {
3223        fn drop(&mut self) {
3224            let db = self.db.take().unwrap();
3225            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3226                db.runtime.as_ref().unwrap().block_on(async {
3227                    use util::ResultExt;
3228                    let query = "
3229                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3230                        FROM pg_stat_activity
3231                        WHERE
3232                            pg_stat_activity.datname = current_database() AND
3233                            pid <> pg_backend_pid();
3234                    ";
3235                    db.pool
3236                        .execute(sea_orm::Statement::from_string(
3237                            db.pool.get_database_backend(),
3238                            query.into(),
3239                        ))
3240                        .await
3241                        .log_err();
3242                    sqlx::Postgres::drop_database(db.options.get_url())
3243                        .await
3244                        .log_err();
3245                })
3246            }
3247        }
3248    }
3249}