db.rs

   1mod access_token;
   2mod contact;
   3mod follower;
   4mod language_server;
   5mod project;
   6mod project_collaborator;
   7mod room;
   8mod room_participant;
   9mod server;
  10mod signup;
  11#[cfg(test)]
  12mod tests;
  13mod user;
  14mod worktree;
  15mod worktree_diagnostic_summary;
  16mod worktree_entry;
  17mod worktree_repository;
  18mod worktree_repository_statuses;
  19mod worktree_settings_file;
  20
  21use crate::executor::Executor;
  22use crate::{Error, Result};
  23use anyhow::anyhow;
  24use collections::{BTreeMap, HashMap, HashSet};
  25pub use contact::Contact;
  26use dashmap::DashMap;
  27use futures::StreamExt;
  28use hyper::StatusCode;
  29use rand::prelude::StdRng;
  30use rand::{Rng, SeedableRng};
  31use rpc::{proto, ConnectionId};
  32use sea_orm::Condition;
  33pub use sea_orm::ConnectOptions;
  34use sea_orm::{
  35    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  36    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  37    Statement, TransactionTrait,
  38};
  39use sea_query::{Alias, Expr, OnConflict, Query};
  40use serde::{Deserialize, Serialize};
  41pub use signup::{Invite, NewSignup, WaitlistSummary};
  42use sqlx::migrate::{Migrate, Migration, MigrationSource};
  43use sqlx::Connection;
  44use std::ops::{Deref, DerefMut};
  45use std::path::Path;
  46use std::time::Duration;
  47use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  48use tokio::sync::{Mutex, OwnedMutexGuard};
  49pub use user::Model as User;
  50
/// Handle to the application's database: a sea-orm connection plus the auxiliary state
/// that the methods of this type rely on.
pub struct Database {
    /// Options this instance was created with; `migrate` reads the connection URL from them.
    options: ConnectOptions,
    /// Connection returned by `sea_orm::Database::connect` in `new`.
    pool: DatabaseConnection,
    /// One async mutex per room id.
    /// NOTE(review): presumably serializes room-scoped transactions — confirm against
    /// `room_transaction`.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Random number generator; `new` seeds it with the constant `0`.
    rng: Mutex<StdRng>,
    /// Executor handed to `new` by the caller.
    executor: Executor,
    /// Only present in test builds; `new` initializes it to `None`.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
  60
  61impl Database {
  62    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  63        Ok(Self {
  64            options: options.clone(),
  65            pool: sea_orm::Database::connect(options).await?,
  66            rooms: DashMap::with_capacity(16384),
  67            rng: Mutex::new(StdRng::seed_from_u64(0)),
  68            executor,
  69            #[cfg(test)]
  70            runtime: None,
  71        })
  72    }
  73
  74    #[cfg(test)]
  75    pub fn reset(&self) {
  76        self.rooms.clear();
  77    }
  78
  79    pub async fn migrate(
  80        &self,
  81        migrations_path: &Path,
  82        ignore_checksum_mismatch: bool,
  83    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  84        let migrations = MigrationSource::resolve(migrations_path)
  85            .await
  86            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  87
  88        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  89
  90        connection.ensure_migrations_table().await?;
  91        let applied_migrations: HashMap<_, _> = connection
  92            .list_applied_migrations()
  93            .await?
  94            .into_iter()
  95            .map(|m| (m.version, m))
  96            .collect();
  97
  98        let mut new_migrations = Vec::new();
  99        for migration in migrations {
 100            match applied_migrations.get(&migration.version) {
 101                Some(applied_migration) => {
 102                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 103                    {
 104                        Err(anyhow!(
 105                            "checksum mismatch for applied migration {}",
 106                            migration.description
 107                        ))?;
 108                    }
 109                }
 110                None => {
 111                    let elapsed = connection.apply(&migration).await?;
 112                    new_migrations.push((migration, elapsed));
 113                }
 114            }
 115        }
 116
 117        Ok(new_migrations)
 118    }
 119
 120    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 121        self.transaction(|tx| async move {
 122            let server = server::ActiveModel {
 123                environment: ActiveValue::set(environment.into()),
 124                ..Default::default()
 125            }
 126            .insert(&*tx)
 127            .await?;
 128            Ok(server.id)
 129        })
 130        .await
 131    }
 132
 133    pub async fn stale_room_ids(
 134        &self,
 135        environment: &str,
 136        new_server_id: ServerId,
 137    ) -> Result<Vec<RoomId>> {
 138        self.transaction(|tx| async move {
 139            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 140            enum QueryAs {
 141                RoomId,
 142            }
 143
 144            let stale_server_epochs = self
 145                .stale_server_ids(environment, new_server_id, &tx)
 146                .await?;
 147            Ok(room_participant::Entity::find()
 148                .select_only()
 149                .column(room_participant::Column::RoomId)
 150                .distinct()
 151                .filter(
 152                    room_participant::Column::AnsweringConnectionServerId
 153                        .is_in(stale_server_epochs),
 154                )
 155                .into_values::<_, QueryAs>()
 156                .all(&*tx)
 157                .await?)
 158        })
 159        .await
 160    }
 161
 162    pub async fn refresh_room(
 163        &self,
 164        room_id: RoomId,
 165        new_server_id: ServerId,
 166    ) -> Result<RoomGuard<RefreshedRoom>> {
 167        self.room_transaction(room_id, |tx| async move {
 168            let stale_participant_filter = Condition::all()
 169                .add(room_participant::Column::RoomId.eq(room_id))
 170                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 171                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 172
 173            let stale_participant_user_ids = room_participant::Entity::find()
 174                .filter(stale_participant_filter.clone())
 175                .all(&*tx)
 176                .await?
 177                .into_iter()
 178                .map(|participant| participant.user_id)
 179                .collect::<Vec<_>>();
 180
 181            // Delete participants who failed to reconnect and cancel their calls.
 182            let mut canceled_calls_to_user_ids = Vec::new();
 183            room_participant::Entity::delete_many()
 184                .filter(stale_participant_filter)
 185                .exec(&*tx)
 186                .await?;
 187            let called_participants = room_participant::Entity::find()
 188                .filter(
 189                    Condition::all()
 190                        .add(
 191                            room_participant::Column::CallingUserId
 192                                .is_in(stale_participant_user_ids.iter().copied()),
 193                        )
 194                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 195                )
 196                .all(&*tx)
 197                .await?;
 198            room_participant::Entity::delete_many()
 199                .filter(
 200                    room_participant::Column::Id
 201                        .is_in(called_participants.iter().map(|participant| participant.id)),
 202                )
 203                .exec(&*tx)
 204                .await?;
 205            canceled_calls_to_user_ids.extend(
 206                called_participants
 207                    .into_iter()
 208                    .map(|participant| participant.user_id),
 209            );
 210
 211            let room = self.get_room(room_id, &tx).await?;
 212            // Delete the room if it becomes empty.
 213            if room.participants.is_empty() {
 214                project::Entity::delete_many()
 215                    .filter(project::Column::RoomId.eq(room_id))
 216                    .exec(&*tx)
 217                    .await?;
 218                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 219            }
 220
 221            Ok(RefreshedRoom {
 222                room,
 223                stale_participant_user_ids,
 224                canceled_calls_to_user_ids,
 225            })
 226        })
 227        .await
 228    }
 229
 230    pub async fn delete_stale_servers(
 231        &self,
 232        environment: &str,
 233        new_server_id: ServerId,
 234    ) -> Result<()> {
 235        self.transaction(|tx| async move {
 236            server::Entity::delete_many()
 237                .filter(
 238                    Condition::all()
 239                        .add(server::Column::Environment.eq(environment))
 240                        .add(server::Column::Id.ne(new_server_id)),
 241                )
 242                .exec(&*tx)
 243                .await?;
 244            Ok(())
 245        })
 246        .await
 247    }
 248
 249    async fn stale_server_ids(
 250        &self,
 251        environment: &str,
 252        new_server_id: ServerId,
 253        tx: &DatabaseTransaction,
 254    ) -> Result<Vec<ServerId>> {
 255        let stale_servers = server::Entity::find()
 256            .filter(
 257                Condition::all()
 258                    .add(server::Column::Environment.eq(environment))
 259                    .add(server::Column::Id.ne(new_server_id)),
 260            )
 261            .all(&*tx)
 262            .await?;
 263        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 264    }
 265
 266    // users
 267
 268    pub async fn create_user(
 269        &self,
 270        email_address: &str,
 271        admin: bool,
 272        params: NewUserParams,
 273    ) -> Result<NewUserResult> {
 274        self.transaction(|tx| async {
 275            let tx = tx;
 276            let user = user::Entity::insert(user::ActiveModel {
 277                email_address: ActiveValue::set(Some(email_address.into())),
 278                github_login: ActiveValue::set(params.github_login.clone()),
 279                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 280                admin: ActiveValue::set(admin),
 281                metrics_id: ActiveValue::set(Uuid::new_v4()),
 282                ..Default::default()
 283            })
 284            .on_conflict(
 285                OnConflict::column(user::Column::GithubLogin)
 286                    .update_column(user::Column::GithubLogin)
 287                    .to_owned(),
 288            )
 289            .exec_with_returning(&*tx)
 290            .await?;
 291
 292            Ok(NewUserResult {
 293                user_id: user.id,
 294                metrics_id: user.metrics_id.to_string(),
 295                signup_device_id: None,
 296                inviting_user_id: None,
 297            })
 298        })
 299        .await
 300    }
 301
 302    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 303        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 304            .await
 305    }
 306
 307    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 308        self.transaction(|tx| async {
 309            let tx = tx;
 310            Ok(user::Entity::find()
 311                .filter(user::Column::Id.is_in(ids.iter().copied()))
 312                .all(&*tx)
 313                .await?)
 314        })
 315        .await
 316    }
 317
 318    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 319        self.transaction(|tx| async move {
 320            Ok(user::Entity::find()
 321                .filter(user::Column::GithubLogin.eq(github_login))
 322                .one(&*tx)
 323                .await?)
 324        })
 325        .await
 326    }
 327
 328    pub async fn get_or_create_user_by_github_account(
 329        &self,
 330        github_login: &str,
 331        github_user_id: Option<i32>,
 332        github_email: Option<&str>,
 333    ) -> Result<Option<User>> {
 334        self.transaction(|tx| async move {
 335            let tx = &*tx;
 336            if let Some(github_user_id) = github_user_id {
 337                if let Some(user_by_github_user_id) = user::Entity::find()
 338                    .filter(user::Column::GithubUserId.eq(github_user_id))
 339                    .one(tx)
 340                    .await?
 341                {
 342                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 343                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 344                    Ok(Some(user_by_github_user_id.update(tx).await?))
 345                } else if let Some(user_by_github_login) = user::Entity::find()
 346                    .filter(user::Column::GithubLogin.eq(github_login))
 347                    .one(tx)
 348                    .await?
 349                {
 350                    let mut user_by_github_login = user_by_github_login.into_active_model();
 351                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 352                    Ok(Some(user_by_github_login.update(tx).await?))
 353                } else {
 354                    let user = user::Entity::insert(user::ActiveModel {
 355                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 356                        github_login: ActiveValue::set(github_login.into()),
 357                        github_user_id: ActiveValue::set(Some(github_user_id)),
 358                        admin: ActiveValue::set(false),
 359                        invite_count: ActiveValue::set(0),
 360                        invite_code: ActiveValue::set(None),
 361                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 362                        ..Default::default()
 363                    })
 364                    .exec_with_returning(&*tx)
 365                    .await?;
 366                    Ok(Some(user))
 367                }
 368            } else {
 369                Ok(user::Entity::find()
 370                    .filter(user::Column::GithubLogin.eq(github_login))
 371                    .one(tx)
 372                    .await?)
 373            }
 374        })
 375        .await
 376    }
 377
 378    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 379        self.transaction(|tx| async move {
 380            Ok(user::Entity::find()
 381                .order_by_asc(user::Column::GithubLogin)
 382                .limit(limit as u64)
 383                .offset(page as u64 * limit as u64)
 384                .all(&*tx)
 385                .await?)
 386        })
 387        .await
 388    }
 389
 390    pub async fn get_users_with_no_invites(
 391        &self,
 392        invited_by_another_user: bool,
 393    ) -> Result<Vec<User>> {
 394        self.transaction(|tx| async move {
 395            Ok(user::Entity::find()
 396                .filter(
 397                    user::Column::InviteCount
 398                        .eq(0)
 399                        .and(if invited_by_another_user {
 400                            user::Column::InviterId.is_not_null()
 401                        } else {
 402                            user::Column::InviterId.is_null()
 403                        }),
 404                )
 405                .all(&*tx)
 406                .await?)
 407        })
 408        .await
 409    }
 410
 411    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 412        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 413        enum QueryAs {
 414            MetricsId,
 415        }
 416
 417        self.transaction(|tx| async move {
 418            let metrics_id: Uuid = user::Entity::find_by_id(id)
 419                .select_only()
 420                .column(user::Column::MetricsId)
 421                .into_values::<_, QueryAs>()
 422                .one(&*tx)
 423                .await?
 424                .ok_or_else(|| anyhow!("could not find user"))?;
 425            Ok(metrics_id.to_string())
 426        })
 427        .await
 428    }
 429
 430    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 431        self.transaction(|tx| async move {
 432            user::Entity::update_many()
 433                .filter(user::Column::Id.eq(id))
 434                .set(user::ActiveModel {
 435                    admin: ActiveValue::set(is_admin),
 436                    ..Default::default()
 437                })
 438                .exec(&*tx)
 439                .await?;
 440            Ok(())
 441        })
 442        .await
 443    }
 444
 445    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 446        self.transaction(|tx| async move {
 447            user::Entity::update_many()
 448                .filter(user::Column::Id.eq(id))
 449                .set(user::ActiveModel {
 450                    connected_once: ActiveValue::set(connected_once),
 451                    ..Default::default()
 452                })
 453                .exec(&*tx)
 454                .await?;
 455            Ok(())
 456        })
 457        .await
 458    }
 459
 460    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 461        self.transaction(|tx| async move {
 462            access_token::Entity::delete_many()
 463                .filter(access_token::Column::UserId.eq(id))
 464                .exec(&*tx)
 465                .await?;
 466            user::Entity::delete_by_id(id).exec(&*tx).await?;
 467            Ok(())
 468        })
 469        .await
 470    }
 471
 472    // contacts
 473
 474    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 475        #[derive(Debug, FromQueryResult)]
 476        struct ContactWithUserBusyStatuses {
 477            user_id_a: UserId,
 478            user_id_b: UserId,
 479            a_to_b: bool,
 480            accepted: bool,
 481            should_notify: bool,
 482            user_a_busy: bool,
 483            user_b_busy: bool,
 484        }
 485
 486        self.transaction(|tx| async move {
 487            let user_a_participant = Alias::new("user_a_participant");
 488            let user_b_participant = Alias::new("user_b_participant");
 489            let mut db_contacts = contact::Entity::find()
 490                .column_as(
 491                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 492                        .is_not_null(),
 493                    "user_a_busy",
 494                )
 495                .column_as(
 496                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 497                        .is_not_null(),
 498                    "user_b_busy",
 499                )
 500                .filter(
 501                    contact::Column::UserIdA
 502                        .eq(user_id)
 503                        .or(contact::Column::UserIdB.eq(user_id)),
 504                )
 505                .join_as(
 506                    JoinType::LeftJoin,
 507                    contact::Relation::UserARoomParticipant.def(),
 508                    user_a_participant,
 509                )
 510                .join_as(
 511                    JoinType::LeftJoin,
 512                    contact::Relation::UserBRoomParticipant.def(),
 513                    user_b_participant,
 514                )
 515                .into_model::<ContactWithUserBusyStatuses>()
 516                .stream(&*tx)
 517                .await?;
 518
 519            let mut contacts = Vec::new();
 520            while let Some(db_contact) = db_contacts.next().await {
 521                let db_contact = db_contact?;
 522                if db_contact.user_id_a == user_id {
 523                    if db_contact.accepted {
 524                        contacts.push(Contact::Accepted {
 525                            user_id: db_contact.user_id_b,
 526                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 527                            busy: db_contact.user_b_busy,
 528                        });
 529                    } else if db_contact.a_to_b {
 530                        contacts.push(Contact::Outgoing {
 531                            user_id: db_contact.user_id_b,
 532                        })
 533                    } else {
 534                        contacts.push(Contact::Incoming {
 535                            user_id: db_contact.user_id_b,
 536                            should_notify: db_contact.should_notify,
 537                        });
 538                    }
 539                } else if db_contact.accepted {
 540                    contacts.push(Contact::Accepted {
 541                        user_id: db_contact.user_id_a,
 542                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 543                        busy: db_contact.user_a_busy,
 544                    });
 545                } else if db_contact.a_to_b {
 546                    contacts.push(Contact::Incoming {
 547                        user_id: db_contact.user_id_a,
 548                        should_notify: db_contact.should_notify,
 549                    });
 550                } else {
 551                    contacts.push(Contact::Outgoing {
 552                        user_id: db_contact.user_id_a,
 553                    });
 554                }
 555            }
 556
 557            contacts.sort_unstable_by_key(|contact| contact.user_id());
 558
 559            Ok(contacts)
 560        })
 561        .await
 562    }
 563
 564    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 565        self.transaction(|tx| async move {
 566            let participant = room_participant::Entity::find()
 567                .filter(room_participant::Column::UserId.eq(user_id))
 568                .one(&*tx)
 569                .await?;
 570            Ok(participant.is_some())
 571        })
 572        .await
 573    }
 574
 575    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 576        self.transaction(|tx| async move {
 577            let (id_a, id_b) = if user_id_1 < user_id_2 {
 578                (user_id_1, user_id_2)
 579            } else {
 580                (user_id_2, user_id_1)
 581            };
 582
 583            Ok(contact::Entity::find()
 584                .filter(
 585                    contact::Column::UserIdA
 586                        .eq(id_a)
 587                        .and(contact::Column::UserIdB.eq(id_b))
 588                        .and(contact::Column::Accepted.eq(true)),
 589                )
 590                .one(&*tx)
 591                .await?
 592                .is_some())
 593        })
 594        .await
 595    }
 596
 597    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 598        self.transaction(|tx| async move {
 599            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 600                (sender_id, receiver_id, true)
 601            } else {
 602                (receiver_id, sender_id, false)
 603            };
 604
 605            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 606                user_id_a: ActiveValue::set(id_a),
 607                user_id_b: ActiveValue::set(id_b),
 608                a_to_b: ActiveValue::set(a_to_b),
 609                accepted: ActiveValue::set(false),
 610                should_notify: ActiveValue::set(true),
 611                ..Default::default()
 612            })
 613            .on_conflict(
 614                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 615                    .values([
 616                        (contact::Column::Accepted, true.into()),
 617                        (contact::Column::ShouldNotify, false.into()),
 618                    ])
 619                    .action_and_where(
 620                        contact::Column::Accepted.eq(false).and(
 621                            contact::Column::AToB
 622                                .eq(a_to_b)
 623                                .and(contact::Column::UserIdA.eq(id_b))
 624                                .or(contact::Column::AToB
 625                                    .ne(a_to_b)
 626                                    .and(contact::Column::UserIdA.eq(id_a))),
 627                        ),
 628                    )
 629                    .to_owned(),
 630            )
 631            .exec_without_returning(&*tx)
 632            .await?;
 633
 634            if rows_affected == 1 {
 635                Ok(())
 636            } else {
 637                Err(anyhow!("contact already requested"))?
 638            }
 639        })
 640        .await
 641    }
 642
 643    /// Returns a bool indicating whether the removed contact had originally accepted or not
 644    ///
 645    /// Deletes the contact identified by the requester and responder ids, and then returns
 646    /// whether the deleted contact had originally accepted or was a pending contact request.
 647    ///
 648    /// # Arguments
 649    ///
 650    /// * `requester_id` - The user that initiates this request
 651    /// * `responder_id` - The user that will be removed
 652    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 653        self.transaction(|tx| async move {
 654            let (id_a, id_b) = if responder_id < requester_id {
 655                (responder_id, requester_id)
 656            } else {
 657                (requester_id, responder_id)
 658            };
 659
 660            let contact = contact::Entity::find()
 661                .filter(
 662                    contact::Column::UserIdA
 663                        .eq(id_a)
 664                        .and(contact::Column::UserIdB.eq(id_b)),
 665                )
 666                .one(&*tx)
 667                .await?
 668                .ok_or_else(|| anyhow!("no such contact"))?;
 669
 670            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 671            Ok(contact.accepted)
 672        })
 673        .await
 674    }
 675
 676    pub async fn dismiss_contact_notification(
 677        &self,
 678        user_id: UserId,
 679        contact_user_id: UserId,
 680    ) -> Result<()> {
 681        self.transaction(|tx| async move {
 682            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 683                (user_id, contact_user_id, true)
 684            } else {
 685                (contact_user_id, user_id, false)
 686            };
 687
 688            let result = contact::Entity::update_many()
 689                .set(contact::ActiveModel {
 690                    should_notify: ActiveValue::set(false),
 691                    ..Default::default()
 692                })
 693                .filter(
 694                    contact::Column::UserIdA
 695                        .eq(id_a)
 696                        .and(contact::Column::UserIdB.eq(id_b))
 697                        .and(
 698                            contact::Column::AToB
 699                                .eq(a_to_b)
 700                                .and(contact::Column::Accepted.eq(true))
 701                                .or(contact::Column::AToB
 702                                    .ne(a_to_b)
 703                                    .and(contact::Column::Accepted.eq(false))),
 704                        ),
 705                )
 706                .exec(&*tx)
 707                .await?;
 708            if result.rows_affected == 0 {
 709                Err(anyhow!("no such contact request"))?
 710            } else {
 711                Ok(())
 712            }
 713        })
 714        .await
 715    }
 716
 717    pub async fn respond_to_contact_request(
 718        &self,
 719        responder_id: UserId,
 720        requester_id: UserId,
 721        accept: bool,
 722    ) -> Result<()> {
 723        self.transaction(|tx| async move {
 724            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 725                (responder_id, requester_id, false)
 726            } else {
 727                (requester_id, responder_id, true)
 728            };
 729            let rows_affected = if accept {
 730                let result = contact::Entity::update_many()
 731                    .set(contact::ActiveModel {
 732                        accepted: ActiveValue::set(true),
 733                        should_notify: ActiveValue::set(true),
 734                        ..Default::default()
 735                    })
 736                    .filter(
 737                        contact::Column::UserIdA
 738                            .eq(id_a)
 739                            .and(contact::Column::UserIdB.eq(id_b))
 740                            .and(contact::Column::AToB.eq(a_to_b)),
 741                    )
 742                    .exec(&*tx)
 743                    .await?;
 744                result.rows_affected
 745            } else {
 746                let result = contact::Entity::delete_many()
 747                    .filter(
 748                        contact::Column::UserIdA
 749                            .eq(id_a)
 750                            .and(contact::Column::UserIdB.eq(id_b))
 751                            .and(contact::Column::AToB.eq(a_to_b))
 752                            .and(contact::Column::Accepted.eq(false)),
 753                    )
 754                    .exec(&*tx)
 755                    .await?;
 756
 757                result.rows_affected
 758            };
 759
 760            if rows_affected == 1 {
 761                Ok(())
 762            } else {
 763                Err(anyhow!("no such contact request"))?
 764            }
 765        })
 766        .await
 767    }
 768
 769    pub fn fuzzy_like_string(string: &str) -> String {
 770        let mut result = String::with_capacity(string.len() * 2 + 1);
 771        for c in string.chars() {
 772            if c.is_alphanumeric() {
 773                result.push('%');
 774                result.push(c);
 775            }
 776        }
 777        result.push('%');
 778        result
 779    }
 780
 781    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 782        self.transaction(|tx| async {
 783            let tx = tx;
 784            let like_string = Self::fuzzy_like_string(name_query);
 785            let query = "
 786                SELECT users.*
 787                FROM users
 788                WHERE github_login ILIKE $1
 789                ORDER BY github_login <-> $2
 790                LIMIT $3
 791            ";
 792
 793            Ok(user::Entity::find()
 794                .from_raw_sql(Statement::from_sql_and_values(
 795                    self.pool.get_database_backend(),
 796                    query.into(),
 797                    vec![like_string.into(), name_query.into(), limit.into()],
 798                ))
 799                .all(&*tx)
 800                .await?)
 801        })
 802        .await
 803    }
 804
 805    // signups
 806
 807    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 808        self.transaction(|tx| async move {
 809            signup::Entity::insert(signup::ActiveModel {
 810                email_address: ActiveValue::set(signup.email_address.clone()),
 811                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 812                email_confirmation_sent: ActiveValue::set(false),
 813                platform_mac: ActiveValue::set(signup.platform_mac),
 814                platform_windows: ActiveValue::set(signup.platform_windows),
 815                platform_linux: ActiveValue::set(signup.platform_linux),
 816                platform_unknown: ActiveValue::set(false),
 817                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 818                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 819                device_id: ActiveValue::set(signup.device_id.clone()),
 820                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 821                ..Default::default()
 822            })
 823            .on_conflict(
 824                OnConflict::column(signup::Column::EmailAddress)
 825                    .update_columns([
 826                        signup::Column::PlatformMac,
 827                        signup::Column::PlatformWindows,
 828                        signup::Column::PlatformLinux,
 829                        signup::Column::EditorFeatures,
 830                        signup::Column::ProgrammingLanguages,
 831                        signup::Column::DeviceId,
 832                        signup::Column::AddedToMailingList,
 833                    ])
 834                    .to_owned(),
 835            )
 836            .exec(&*tx)
 837            .await?;
 838            Ok(())
 839        })
 840        .await
 841    }
 842
 843    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 844        self.transaction(|tx| async move {
 845            let signup = signup::Entity::find()
 846                .filter(signup::Column::EmailAddress.eq(email_address))
 847                .one(&*tx)
 848                .await?
 849                .ok_or_else(|| {
 850                    anyhow!("signup with email address {} doesn't exist", email_address)
 851                })?;
 852
 853            Ok(signup)
 854        })
 855        .await
 856    }
 857
 858    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 859        self.transaction(|tx| async move {
 860            let query = "
 861                SELECT
 862                    COUNT(*) as count,
 863                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 864                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 865                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 866                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 867                FROM (
 868                    SELECT *
 869                    FROM signups
 870                    WHERE
 871                        NOT email_confirmation_sent
 872                ) AS unsent
 873            ";
 874            Ok(
 875                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 876                    self.pool.get_database_backend(),
 877                    query.into(),
 878                    vec![],
 879                ))
 880                .one(&*tx)
 881                .await?
 882                .ok_or_else(|| anyhow!("invalid result"))?,
 883            )
 884        })
 885        .await
 886    }
 887
 888    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 889        let emails = invites
 890            .iter()
 891            .map(|s| s.email_address.as_str())
 892            .collect::<Vec<_>>();
 893        self.transaction(|tx| async {
 894            let tx = tx;
 895            signup::Entity::update_many()
 896                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 897                .set(signup::ActiveModel {
 898                    email_confirmation_sent: ActiveValue::set(true),
 899                    ..Default::default()
 900                })
 901                .exec(&*tx)
 902                .await?;
 903            Ok(())
 904        })
 905        .await
 906    }
 907
 908    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 909        self.transaction(|tx| async move {
 910            Ok(signup::Entity::find()
 911                .select_only()
 912                .column(signup::Column::EmailAddress)
 913                .column(signup::Column::EmailConfirmationCode)
 914                .filter(
 915                    signup::Column::EmailConfirmationSent.eq(false).and(
 916                        signup::Column::PlatformMac
 917                            .eq(true)
 918                            .or(signup::Column::PlatformUnknown.eq(true)),
 919                    ),
 920                )
 921                .order_by_asc(signup::Column::CreatedAt)
 922                .limit(count as u64)
 923                .into_model()
 924                .all(&*tx)
 925                .await?)
 926        })
 927        .await
 928    }
 929
 930    // invite codes
 931
 932    pub async fn create_invite_from_code(
 933        &self,
 934        code: &str,
 935        email_address: &str,
 936        device_id: Option<&str>,
 937        added_to_mailing_list: bool,
 938    ) -> Result<Invite> {
 939        self.transaction(|tx| async move {
 940            let existing_user = user::Entity::find()
 941                .filter(user::Column::EmailAddress.eq(email_address))
 942                .one(&*tx)
 943                .await?;
 944
 945            if existing_user.is_some() {
 946                Err(anyhow!("email address is already in use"))?;
 947            }
 948
 949            let inviting_user_with_invites = match user::Entity::find()
 950                .filter(
 951                    user::Column::InviteCode
 952                        .eq(code)
 953                        .and(user::Column::InviteCount.gt(0)),
 954                )
 955                .one(&*tx)
 956                .await?
 957            {
 958                Some(inviting_user) => inviting_user,
 959                None => {
 960                    return Err(Error::Http(
 961                        StatusCode::UNAUTHORIZED,
 962                        "unable to find an invite code with invites remaining".to_string(),
 963                    ))?
 964                }
 965            };
 966            user::Entity::update_many()
 967                .filter(
 968                    user::Column::Id
 969                        .eq(inviting_user_with_invites.id)
 970                        .and(user::Column::InviteCount.gt(0)),
 971                )
 972                .col_expr(
 973                    user::Column::InviteCount,
 974                    Expr::col(user::Column::InviteCount).sub(1),
 975                )
 976                .exec(&*tx)
 977                .await?;
 978
 979            let signup = signup::Entity::insert(signup::ActiveModel {
 980                email_address: ActiveValue::set(email_address.into()),
 981                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 982                email_confirmation_sent: ActiveValue::set(false),
 983                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 984                platform_linux: ActiveValue::set(false),
 985                platform_mac: ActiveValue::set(false),
 986                platform_windows: ActiveValue::set(false),
 987                platform_unknown: ActiveValue::set(true),
 988                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 989                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 990                ..Default::default()
 991            })
 992            .on_conflict(
 993                OnConflict::column(signup::Column::EmailAddress)
 994                    .update_column(signup::Column::InvitingUserId)
 995                    .to_owned(),
 996            )
 997            .exec_with_returning(&*tx)
 998            .await?;
 999
1000            Ok(Invite {
1001                email_address: signup.email_address,
1002                email_confirmation_code: signup.email_confirmation_code,
1003            })
1004        })
1005        .await
1006    }
1007
1008    pub async fn create_user_from_invite(
1009        &self,
1010        invite: &Invite,
1011        user: NewUserParams,
1012    ) -> Result<Option<NewUserResult>> {
1013        self.transaction(|tx| async {
1014            let tx = tx;
1015            let signup = signup::Entity::find()
1016                .filter(
1017                    signup::Column::EmailAddress
1018                        .eq(invite.email_address.as_str())
1019                        .and(
1020                            signup::Column::EmailConfirmationCode
1021                                .eq(invite.email_confirmation_code.as_str()),
1022                        ),
1023                )
1024                .one(&*tx)
1025                .await?
1026                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1027
1028            if signup.user_id.is_some() {
1029                return Ok(None);
1030            }
1031
1032            let user = user::Entity::insert(user::ActiveModel {
1033                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1034                github_login: ActiveValue::set(user.github_login.clone()),
1035                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1036                admin: ActiveValue::set(false),
1037                invite_count: ActiveValue::set(user.invite_count),
1038                invite_code: ActiveValue::set(Some(random_invite_code())),
1039                metrics_id: ActiveValue::set(Uuid::new_v4()),
1040                ..Default::default()
1041            })
1042            .on_conflict(
1043                OnConflict::column(user::Column::GithubLogin)
1044                    .update_columns([
1045                        user::Column::EmailAddress,
1046                        user::Column::GithubUserId,
1047                        user::Column::Admin,
1048                    ])
1049                    .to_owned(),
1050            )
1051            .exec_with_returning(&*tx)
1052            .await?;
1053
1054            let mut signup = signup.into_active_model();
1055            signup.user_id = ActiveValue::set(Some(user.id));
1056            let signup = signup.update(&*tx).await?;
1057
1058            if let Some(inviting_user_id) = signup.inviting_user_id {
1059                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1060                    (inviting_user_id, user.id, true)
1061                } else {
1062                    (user.id, inviting_user_id, false)
1063                };
1064
1065                contact::Entity::insert(contact::ActiveModel {
1066                    user_id_a: ActiveValue::set(user_id_a),
1067                    user_id_b: ActiveValue::set(user_id_b),
1068                    a_to_b: ActiveValue::set(a_to_b),
1069                    should_notify: ActiveValue::set(true),
1070                    accepted: ActiveValue::set(true),
1071                    ..Default::default()
1072                })
1073                .on_conflict(OnConflict::new().do_nothing().to_owned())
1074                .exec_without_returning(&*tx)
1075                .await?;
1076            }
1077
1078            Ok(Some(NewUserResult {
1079                user_id: user.id,
1080                metrics_id: user.metrics_id.to_string(),
1081                inviting_user_id: signup.inviting_user_id,
1082                signup_device_id: signup.device_id,
1083            }))
1084        })
1085        .await
1086    }
1087
1088    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1089        self.transaction(|tx| async move {
1090            if count > 0 {
1091                user::Entity::update_many()
1092                    .filter(
1093                        user::Column::Id
1094                            .eq(id)
1095                            .and(user::Column::InviteCode.is_null()),
1096                    )
1097                    .set(user::ActiveModel {
1098                        invite_code: ActiveValue::set(Some(random_invite_code())),
1099                        ..Default::default()
1100                    })
1101                    .exec(&*tx)
1102                    .await?;
1103            }
1104
1105            user::Entity::update_many()
1106                .filter(user::Column::Id.eq(id))
1107                .set(user::ActiveModel {
1108                    invite_count: ActiveValue::set(count),
1109                    ..Default::default()
1110                })
1111                .exec(&*tx)
1112                .await?;
1113            Ok(())
1114        })
1115        .await
1116    }
1117
1118    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1119        self.transaction(|tx| async move {
1120            match user::Entity::find_by_id(id).one(&*tx).await? {
1121                Some(user) if user.invite_code.is_some() => {
1122                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1123                }
1124                _ => Ok(None),
1125            }
1126        })
1127        .await
1128    }
1129
1130    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1131        self.transaction(|tx| async move {
1132            user::Entity::find()
1133                .filter(user::Column::InviteCode.eq(code))
1134                .one(&*tx)
1135                .await?
1136                .ok_or_else(|| {
1137                    Error::Http(
1138                        StatusCode::NOT_FOUND,
1139                        "that invite code does not exist".to_string(),
1140                    )
1141                })
1142        })
1143        .await
1144    }
1145
1146    // rooms
1147
1148    pub async fn incoming_call_for_user(
1149        &self,
1150        user_id: UserId,
1151    ) -> Result<Option<proto::IncomingCall>> {
1152        self.transaction(|tx| async move {
1153            let pending_participant = room_participant::Entity::find()
1154                .filter(
1155                    room_participant::Column::UserId
1156                        .eq(user_id)
1157                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1158                )
1159                .one(&*tx)
1160                .await?;
1161
1162            if let Some(pending_participant) = pending_participant {
1163                let room = self.get_room(pending_participant.room_id, &tx).await?;
1164                Ok(Self::build_incoming_call(&room, user_id))
1165            } else {
1166                Ok(None)
1167            }
1168        })
1169        .await
1170    }
1171
1172    pub async fn create_room(
1173        &self,
1174        user_id: UserId,
1175        connection: ConnectionId,
1176        live_kit_room: &str,
1177    ) -> Result<proto::Room> {
1178        self.transaction(|tx| async move {
1179            let room = room::ActiveModel {
1180                live_kit_room: ActiveValue::set(live_kit_room.into()),
1181                ..Default::default()
1182            }
1183            .insert(&*tx)
1184            .await?;
1185            room_participant::ActiveModel {
1186                room_id: ActiveValue::set(room.id),
1187                user_id: ActiveValue::set(user_id),
1188                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1189                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1190                    connection.owner_id as i32,
1191                ))),
1192                answering_connection_lost: ActiveValue::set(false),
1193                calling_user_id: ActiveValue::set(user_id),
1194                calling_connection_id: ActiveValue::set(connection.id as i32),
1195                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1196                    connection.owner_id as i32,
1197                ))),
1198                ..Default::default()
1199            }
1200            .insert(&*tx)
1201            .await?;
1202
1203            let room = self.get_room(room.id, &tx).await?;
1204            Ok(room)
1205        })
1206        .await
1207    }
1208
1209    pub async fn call(
1210        &self,
1211        room_id: RoomId,
1212        calling_user_id: UserId,
1213        calling_connection: ConnectionId,
1214        called_user_id: UserId,
1215        initial_project_id: Option<ProjectId>,
1216    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1217        self.room_transaction(room_id, |tx| async move {
1218            room_participant::ActiveModel {
1219                room_id: ActiveValue::set(room_id),
1220                user_id: ActiveValue::set(called_user_id),
1221                answering_connection_lost: ActiveValue::set(false),
1222                calling_user_id: ActiveValue::set(calling_user_id),
1223                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1224                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1225                    calling_connection.owner_id as i32,
1226                ))),
1227                initial_project_id: ActiveValue::set(initial_project_id),
1228                ..Default::default()
1229            }
1230            .insert(&*tx)
1231            .await?;
1232
1233            let room = self.get_room(room_id, &tx).await?;
1234            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1235                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1236            Ok((room, incoming_call))
1237        })
1238        .await
1239    }
1240
1241    pub async fn call_failed(
1242        &self,
1243        room_id: RoomId,
1244        called_user_id: UserId,
1245    ) -> Result<RoomGuard<proto::Room>> {
1246        self.room_transaction(room_id, |tx| async move {
1247            room_participant::Entity::delete_many()
1248                .filter(
1249                    room_participant::Column::RoomId
1250                        .eq(room_id)
1251                        .and(room_participant::Column::UserId.eq(called_user_id)),
1252                )
1253                .exec(&*tx)
1254                .await?;
1255            let room = self.get_room(room_id, &tx).await?;
1256            Ok(room)
1257        })
1258        .await
1259    }
1260
1261    pub async fn decline_call(
1262        &self,
1263        expected_room_id: Option<RoomId>,
1264        user_id: UserId,
1265    ) -> Result<Option<RoomGuard<proto::Room>>> {
1266        self.optional_room_transaction(|tx| async move {
1267            let mut filter = Condition::all()
1268                .add(room_participant::Column::UserId.eq(user_id))
1269                .add(room_participant::Column::AnsweringConnectionId.is_null());
1270            if let Some(room_id) = expected_room_id {
1271                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1272            }
1273            let participant = room_participant::Entity::find()
1274                .filter(filter)
1275                .one(&*tx)
1276                .await?;
1277
1278            let participant = if let Some(participant) = participant {
1279                participant
1280            } else if expected_room_id.is_some() {
1281                return Err(anyhow!("could not find call to decline"))?;
1282            } else {
1283                return Ok(None);
1284            };
1285
1286            let room_id = participant.room_id;
1287            room_participant::Entity::delete(participant.into_active_model())
1288                .exec(&*tx)
1289                .await?;
1290
1291            let room = self.get_room(room_id, &tx).await?;
1292            Ok(Some((room_id, room)))
1293        })
1294        .await
1295    }
1296
1297    pub async fn cancel_call(
1298        &self,
1299        room_id: RoomId,
1300        calling_connection: ConnectionId,
1301        called_user_id: UserId,
1302    ) -> Result<RoomGuard<proto::Room>> {
1303        self.room_transaction(room_id, |tx| async move {
1304            let participant = room_participant::Entity::find()
1305                .filter(
1306                    Condition::all()
1307                        .add(room_participant::Column::UserId.eq(called_user_id))
1308                        .add(room_participant::Column::RoomId.eq(room_id))
1309                        .add(
1310                            room_participant::Column::CallingConnectionId
1311                                .eq(calling_connection.id as i32),
1312                        )
1313                        .add(
1314                            room_participant::Column::CallingConnectionServerId
1315                                .eq(calling_connection.owner_id as i32),
1316                        )
1317                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1318                )
1319                .one(&*tx)
1320                .await?
1321                .ok_or_else(|| anyhow!("no call to cancel"))?;
1322
1323            room_participant::Entity::delete(participant.into_active_model())
1324                .exec(&*tx)
1325                .await?;
1326
1327            let room = self.get_room(room_id, &tx).await?;
1328            Ok(room)
1329        })
1330        .await
1331    }
1332
1333    pub async fn join_room(
1334        &self,
1335        room_id: RoomId,
1336        user_id: UserId,
1337        connection: ConnectionId,
1338    ) -> Result<RoomGuard<proto::Room>> {
1339        self.room_transaction(room_id, |tx| async move {
1340            let result = room_participant::Entity::update_many()
1341                .filter(
1342                    Condition::all()
1343                        .add(room_participant::Column::RoomId.eq(room_id))
1344                        .add(room_participant::Column::UserId.eq(user_id))
1345                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1346                )
1347                .set(room_participant::ActiveModel {
1348                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1349                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1350                        connection.owner_id as i32,
1351                    ))),
1352                    answering_connection_lost: ActiveValue::set(false),
1353                    ..Default::default()
1354                })
1355                .exec(&*tx)
1356                .await?;
1357            if result.rows_affected == 0 {
1358                Err(anyhow!("room does not exist or was already joined"))?
1359            } else {
1360                let room = self.get_room(room_id, &tx).await?;
1361                Ok(room)
1362            }
1363        })
1364        .await
1365    }
1366
1367    pub async fn rejoin_room(
1368        &self,
1369        rejoin_room: proto::RejoinRoom,
1370        user_id: UserId,
1371        connection: ConnectionId,
1372    ) -> Result<RoomGuard<RejoinedRoom>> {
1373        let room_id = RoomId::from_proto(rejoin_room.id);
1374        self.room_transaction(room_id, |tx| async {
1375            let tx = tx;
1376            let participant_update = room_participant::Entity::update_many()
1377                .filter(
1378                    Condition::all()
1379                        .add(room_participant::Column::RoomId.eq(room_id))
1380                        .add(room_participant::Column::UserId.eq(user_id))
1381                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1382                        .add(
1383                            Condition::any()
1384                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1385                                .add(
1386                                    room_participant::Column::AnsweringConnectionServerId
1387                                        .ne(connection.owner_id as i32),
1388                                ),
1389                        ),
1390                )
1391                .set(room_participant::ActiveModel {
1392                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1393                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1394                        connection.owner_id as i32,
1395                    ))),
1396                    answering_connection_lost: ActiveValue::set(false),
1397                    ..Default::default()
1398                })
1399                .exec(&*tx)
1400                .await?;
1401            if participant_update.rows_affected == 0 {
1402                return Err(anyhow!("room does not exist or was already joined"))?;
1403            }
1404
1405            let mut reshared_projects = Vec::new();
1406            for reshared_project in &rejoin_room.reshared_projects {
1407                let project_id = ProjectId::from_proto(reshared_project.project_id);
1408                let project = project::Entity::find_by_id(project_id)
1409                    .one(&*tx)
1410                    .await?
1411                    .ok_or_else(|| anyhow!("project does not exist"))?;
1412                if project.host_user_id != user_id {
1413                    return Err(anyhow!("no such project"))?;
1414                }
1415
1416                let mut collaborators = project
1417                    .find_related(project_collaborator::Entity)
1418                    .all(&*tx)
1419                    .await?;
1420                let host_ix = collaborators
1421                    .iter()
1422                    .position(|collaborator| {
1423                        collaborator.user_id == user_id && collaborator.is_host
1424                    })
1425                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1426                let host = collaborators.swap_remove(host_ix);
1427                let old_connection_id = host.connection();
1428
1429                project::Entity::update(project::ActiveModel {
1430                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1431                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1432                        connection.owner_id as i32,
1433                    ))),
1434                    ..project.into_active_model()
1435                })
1436                .exec(&*tx)
1437                .await?;
1438                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1439                    connection_id: ActiveValue::set(connection.id as i32),
1440                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1441                    ..host.into_active_model()
1442                })
1443                .exec(&*tx)
1444                .await?;
1445
1446                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1447                    .await?;
1448
1449                reshared_projects.push(ResharedProject {
1450                    id: project_id,
1451                    old_connection_id,
1452                    collaborators: collaborators
1453                        .iter()
1454                        .map(|collaborator| ProjectCollaborator {
1455                            connection_id: collaborator.connection(),
1456                            user_id: collaborator.user_id,
1457                            replica_id: collaborator.replica_id,
1458                            is_host: collaborator.is_host,
1459                        })
1460                        .collect(),
1461                    worktrees: reshared_project.worktrees.clone(),
1462                });
1463            }
1464
1465            project::Entity::delete_many()
1466                .filter(
1467                    Condition::all()
1468                        .add(project::Column::RoomId.eq(room_id))
1469                        .add(project::Column::HostUserId.eq(user_id))
1470                        .add(
1471                            project::Column::Id
1472                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1473                        ),
1474                )
1475                .exec(&*tx)
1476                .await?;
1477
1478            let mut rejoined_projects = Vec::new();
1479            for rejoined_project in &rejoin_room.rejoined_projects {
1480                let project_id = ProjectId::from_proto(rejoined_project.id);
1481                let Some(project) = project::Entity::find_by_id(project_id)
1482                    .one(&*tx)
1483                    .await? else { continue };
1484
1485                let mut worktrees = Vec::new();
1486                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1487                for db_worktree in db_worktrees {
1488                    let mut worktree = RejoinedWorktree {
1489                        id: db_worktree.id as u64,
1490                        abs_path: db_worktree.abs_path,
1491                        root_name: db_worktree.root_name,
1492                        visible: db_worktree.visible,
1493                        updated_entries: Default::default(),
1494                        removed_entries: Default::default(),
1495                        updated_repositories: Default::default(),
1496                        removed_repositories: Default::default(),
1497                        diagnostic_summaries: Default::default(),
1498                        settings_files: Default::default(),
1499                        scan_id: db_worktree.scan_id as u64,
1500                        completed_scan_id: db_worktree.completed_scan_id as u64,
1501                    };
1502
1503                    let rejoined_worktree = rejoined_project
1504                        .worktrees
1505                        .iter()
1506                        .find(|worktree| worktree.id == db_worktree.id as u64);
1507
1508                    // File entries
1509                    {
1510                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1511                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1512                        } else {
1513                            worktree_entry::Column::IsDeleted.eq(false)
1514                        };
1515
1516                        let mut db_entries = worktree_entry::Entity::find()
1517                            .filter(
1518                                Condition::all()
1519                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
1520                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1521                                    .add(entry_filter),
1522                            )
1523                            .stream(&*tx)
1524                            .await?;
1525
1526                        while let Some(db_entry) = db_entries.next().await {
1527                            let db_entry = db_entry?;
1528                            if db_entry.is_deleted {
1529                                worktree.removed_entries.push(db_entry.id as u64);
1530                            } else {
1531                                worktree.updated_entries.push(proto::Entry {
1532                                    id: db_entry.id as u64,
1533                                    is_dir: db_entry.is_dir,
1534                                    path: db_entry.path,
1535                                    inode: db_entry.inode as u64,
1536                                    mtime: Some(proto::Timestamp {
1537                                        seconds: db_entry.mtime_seconds as u64,
1538                                        nanos: db_entry.mtime_nanos as u32,
1539                                    }),
1540                                    is_symlink: db_entry.is_symlink,
1541                                    is_ignored: db_entry.is_ignored,
1542                                });
1543                            }
1544                        }
1545                    }
1546
1547                    // Repository Entries
1548                    {
1549                        let repository_entry_filter =
1550                            if let Some(rejoined_worktree) = rejoined_worktree {
1551                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1552                            } else {
1553                                worktree_repository::Column::IsDeleted.eq(false)
1554                            };
1555
1556                        let mut db_repositories = worktree_repository::Entity::find()
1557                            .filter(
1558                                Condition::all()
1559                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
1560                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1561                                    .add(repository_entry_filter),
1562                            )
1563                            .stream(&*tx)
1564                            .await?;
1565
1566                        while let Some(db_repository) = db_repositories.next().await {
1567                            let db_repository = db_repository?;
1568                            if db_repository.is_deleted {
1569                                worktree
1570                                    .removed_repositories
1571                                    .push(db_repository.work_directory_id as u64);
1572                            } else {
1573                                worktree.updated_repositories.push(proto::RepositoryEntry {
1574                                    work_directory_id: db_repository.work_directory_id as u64,
1575                                    branch: db_repository.branch,
1576                                    removed_repo_paths: Default::default(),
1577                                    updated_statuses: Default::default(),
1578                                });
1579                            }
1580                        }
1581                    }
1582
1583                    // Repository Status Entries
1584                    for repository in worktree.updated_repositories.iter_mut() {
1585                        let repository_status_entry_filter =
1586                            if let Some(rejoined_worktree) = rejoined_worktree {
1587                                worktree_repository_statuses::Column::ScanId
1588                                    .gt(rejoined_worktree.scan_id)
1589                            } else {
1590                                worktree_repository_statuses::Column::IsDeleted.eq(false)
1591                            };
1592
1593                        let mut db_repository_statuses =
1594                            worktree_repository_statuses::Entity::find()
1595                                .filter(
1596                                    Condition::all()
1597                                        .add(
1598                                            worktree_repository_statuses::Column::ProjectId
1599                                                .eq(project.id),
1600                                        )
1601                                        .add(
1602                                            worktree_repository_statuses::Column::WorktreeId
1603                                                .eq(worktree.id),
1604                                        )
1605                                        .add(
1606                                            worktree_repository_statuses::Column::WorkDirectoryId
1607                                                .eq(repository.work_directory_id),
1608                                        )
1609                                        .add(repository_status_entry_filter),
1610                                )
1611                                .stream(&*tx)
1612                                .await?;
1613
1614                        while let Some(db_status_entry) = db_repository_statuses.next().await {
1615                            let db_status_entry = db_status_entry?;
1616                            if db_status_entry.is_deleted {
1617                                repository
1618                                    .removed_repo_paths
1619                                    .push(db_status_entry.repo_path);
1620                            } else {
1621                                repository.updated_statuses.push(proto::StatusEntry {
1622                                    repo_path: db_status_entry.repo_path,
1623                                    status: db_status_entry.status as i32,
1624                                });
1625                            }
1626                        }
1627                    }
1628
1629                    worktrees.push(worktree);
1630                }
1631
1632                let language_servers = project
1633                    .find_related(language_server::Entity)
1634                    .all(&*tx)
1635                    .await?
1636                    .into_iter()
1637                    .map(|language_server| proto::LanguageServer {
1638                        id: language_server.id as u64,
1639                        name: language_server.name,
1640                    })
1641                    .collect::<Vec<_>>();
1642
1643                {
1644                    let mut db_settings_files = worktree_settings_file::Entity::find()
1645                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1646                        .stream(&*tx)
1647                        .await?;
1648                    while let Some(db_settings_file) = db_settings_files.next().await {
1649                        let db_settings_file = db_settings_file?;
1650                        if let Some(worktree) = worktrees
1651                            .iter_mut()
1652                            .find(|w| w.id == db_settings_file.worktree_id as u64)
1653                        {
1654                            worktree.settings_files.push(WorktreeSettingsFile {
1655                                path: db_settings_file.path,
1656                                content: db_settings_file.content,
1657                            });
1658                        }
1659                    }
1660                }
1661
1662                let mut collaborators = project
1663                    .find_related(project_collaborator::Entity)
1664                    .all(&*tx)
1665                    .await?;
1666                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1667                    .iter()
1668                    .position(|collaborator| collaborator.user_id == user_id)
1669                {
1670                    collaborators.swap_remove(self_collaborator_ix)
1671                } else {
1672                    continue;
1673                };
1674                let old_connection_id = self_collaborator.connection();
1675                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1676                    connection_id: ActiveValue::set(connection.id as i32),
1677                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1678                    ..self_collaborator.into_active_model()
1679                })
1680                .exec(&*tx)
1681                .await?;
1682
1683                let collaborators = collaborators
1684                    .into_iter()
1685                    .map(|collaborator| ProjectCollaborator {
1686                        connection_id: collaborator.connection(),
1687                        user_id: collaborator.user_id,
1688                        replica_id: collaborator.replica_id,
1689                        is_host: collaborator.is_host,
1690                    })
1691                    .collect::<Vec<_>>();
1692
1693                rejoined_projects.push(RejoinedProject {
1694                    id: project_id,
1695                    old_connection_id,
1696                    collaborators,
1697                    worktrees,
1698                    language_servers,
1699                });
1700            }
1701
1702            let room = self.get_room(room_id, &tx).await?;
1703            Ok(RejoinedRoom {
1704                room,
1705                rejoined_projects,
1706                reshared_projects,
1707            })
1708        })
1709        .await
1710    }
1711
1712    pub async fn leave_room(
1713        &self,
1714        connection: ConnectionId,
1715    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1716        self.optional_room_transaction(|tx| async move {
1717            let leaving_participant = room_participant::Entity::find()
1718                .filter(
1719                    Condition::all()
1720                        .add(
1721                            room_participant::Column::AnsweringConnectionId
1722                                .eq(connection.id as i32),
1723                        )
1724                        .add(
1725                            room_participant::Column::AnsweringConnectionServerId
1726                                .eq(connection.owner_id as i32),
1727                        ),
1728                )
1729                .one(&*tx)
1730                .await?;
1731
1732            if let Some(leaving_participant) = leaving_participant {
1733                // Leave room.
1734                let room_id = leaving_participant.room_id;
1735                room_participant::Entity::delete_by_id(leaving_participant.id)
1736                    .exec(&*tx)
1737                    .await?;
1738
1739                // Cancel pending calls initiated by the leaving user.
1740                let called_participants = room_participant::Entity::find()
1741                    .filter(
1742                        Condition::all()
1743                            .add(
1744                                room_participant::Column::CallingUserId
1745                                    .eq(leaving_participant.user_id),
1746                            )
1747                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1748                    )
1749                    .all(&*tx)
1750                    .await?;
1751                room_participant::Entity::delete_many()
1752                    .filter(
1753                        room_participant::Column::Id
1754                            .is_in(called_participants.iter().map(|participant| participant.id)),
1755                    )
1756                    .exec(&*tx)
1757                    .await?;
1758                let canceled_calls_to_user_ids = called_participants
1759                    .into_iter()
1760                    .map(|participant| participant.user_id)
1761                    .collect();
1762
1763                // Detect left projects.
1764                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1765                enum QueryProjectIds {
1766                    ProjectId,
1767                }
1768                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1769                    .select_only()
1770                    .column_as(
1771                        project_collaborator::Column::ProjectId,
1772                        QueryProjectIds::ProjectId,
1773                    )
1774                    .filter(
1775                        Condition::all()
1776                            .add(
1777                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1778                            )
1779                            .add(
1780                                project_collaborator::Column::ConnectionServerId
1781                                    .eq(connection.owner_id as i32),
1782                            ),
1783                    )
1784                    .into_values::<_, QueryProjectIds>()
1785                    .all(&*tx)
1786                    .await?;
1787                let mut left_projects = HashMap::default();
1788                let mut collaborators = project_collaborator::Entity::find()
1789                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1790                    .stream(&*tx)
1791                    .await?;
1792                while let Some(collaborator) = collaborators.next().await {
1793                    let collaborator = collaborator?;
1794                    let left_project =
1795                        left_projects
1796                            .entry(collaborator.project_id)
1797                            .or_insert(LeftProject {
1798                                id: collaborator.project_id,
1799                                host_user_id: Default::default(),
1800                                connection_ids: Default::default(),
1801                                host_connection_id: Default::default(),
1802                            });
1803
1804                    let collaborator_connection_id = collaborator.connection();
1805                    if collaborator_connection_id != connection {
1806                        left_project.connection_ids.push(collaborator_connection_id);
1807                    }
1808
1809                    if collaborator.is_host {
1810                        left_project.host_user_id = collaborator.user_id;
1811                        left_project.host_connection_id = collaborator_connection_id;
1812                    }
1813                }
1814                drop(collaborators);
1815
1816                // Leave projects.
1817                project_collaborator::Entity::delete_many()
1818                    .filter(
1819                        Condition::all()
1820                            .add(
1821                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1822                            )
1823                            .add(
1824                                project_collaborator::Column::ConnectionServerId
1825                                    .eq(connection.owner_id as i32),
1826                            ),
1827                    )
1828                    .exec(&*tx)
1829                    .await?;
1830
1831                // Unshare projects.
1832                project::Entity::delete_many()
1833                    .filter(
1834                        Condition::all()
1835                            .add(project::Column::RoomId.eq(room_id))
1836                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1837                            .add(
1838                                project::Column::HostConnectionServerId
1839                                    .eq(connection.owner_id as i32),
1840                            ),
1841                    )
1842                    .exec(&*tx)
1843                    .await?;
1844
1845                let room = self.get_room(room_id, &tx).await?;
1846                if room.participants.is_empty() {
1847                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1848                }
1849
1850                let left_room = LeftRoom {
1851                    room,
1852                    left_projects,
1853                    canceled_calls_to_user_ids,
1854                };
1855
1856                if left_room.room.participants.is_empty() {
1857                    self.rooms.remove(&room_id);
1858                }
1859
1860                Ok(Some((room_id, left_room)))
1861            } else {
1862                Ok(None)
1863            }
1864        })
1865        .await
1866    }
1867
1868    pub async fn follow(
1869        &self,
1870        project_id: ProjectId,
1871        leader_connection: ConnectionId,
1872        follower_connection: ConnectionId,
1873    ) -> Result<RoomGuard<proto::Room>> {
1874        let room_id = self.room_id_for_project(project_id).await?;
1875        self.room_transaction(room_id, |tx| async move {
1876            follower::ActiveModel {
1877                room_id: ActiveValue::set(room_id),
1878                project_id: ActiveValue::set(project_id),
1879                leader_connection_server_id: ActiveValue::set(ServerId(
1880                    leader_connection.owner_id as i32,
1881                )),
1882                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1883                follower_connection_server_id: ActiveValue::set(ServerId(
1884                    follower_connection.owner_id as i32,
1885                )),
1886                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1887                ..Default::default()
1888            }
1889            .insert(&*tx)
1890            .await?;
1891
1892            let room = self.get_room(room_id, &*tx).await?;
1893            Ok(room)
1894        })
1895        .await
1896    }
1897
1898    pub async fn unfollow(
1899        &self,
1900        project_id: ProjectId,
1901        leader_connection: ConnectionId,
1902        follower_connection: ConnectionId,
1903    ) -> Result<RoomGuard<proto::Room>> {
1904        let room_id = self.room_id_for_project(project_id).await?;
1905        self.room_transaction(room_id, |tx| async move {
1906            follower::Entity::delete_many()
1907                .filter(
1908                    Condition::all()
1909                        .add(follower::Column::ProjectId.eq(project_id))
1910                        .add(
1911                            follower::Column::LeaderConnectionServerId
1912                                .eq(leader_connection.owner_id),
1913                        )
1914                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1915                        .add(
1916                            follower::Column::FollowerConnectionServerId
1917                                .eq(follower_connection.owner_id),
1918                        )
1919                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1920                )
1921                .exec(&*tx)
1922                .await?;
1923
1924            let room = self.get_room(room_id, &*tx).await?;
1925            Ok(room)
1926        })
1927        .await
1928    }
1929
1930    pub async fn update_room_participant_location(
1931        &self,
1932        room_id: RoomId,
1933        connection: ConnectionId,
1934        location: proto::ParticipantLocation,
1935    ) -> Result<RoomGuard<proto::Room>> {
1936        self.room_transaction(room_id, |tx| async {
1937            let tx = tx;
1938            let location_kind;
1939            let location_project_id;
1940            match location
1941                .variant
1942                .as_ref()
1943                .ok_or_else(|| anyhow!("invalid location"))?
1944            {
1945                proto::participant_location::Variant::SharedProject(project) => {
1946                    location_kind = 0;
1947                    location_project_id = Some(ProjectId::from_proto(project.id));
1948                }
1949                proto::participant_location::Variant::UnsharedProject(_) => {
1950                    location_kind = 1;
1951                    location_project_id = None;
1952                }
1953                proto::participant_location::Variant::External(_) => {
1954                    location_kind = 2;
1955                    location_project_id = None;
1956                }
1957            }
1958
1959            let result = room_participant::Entity::update_many()
1960                .filter(
1961                    Condition::all()
1962                        .add(room_participant::Column::RoomId.eq(room_id))
1963                        .add(
1964                            room_participant::Column::AnsweringConnectionId
1965                                .eq(connection.id as i32),
1966                        )
1967                        .add(
1968                            room_participant::Column::AnsweringConnectionServerId
1969                                .eq(connection.owner_id as i32),
1970                        ),
1971                )
1972                .set(room_participant::ActiveModel {
1973                    location_kind: ActiveValue::set(Some(location_kind)),
1974                    location_project_id: ActiveValue::set(location_project_id),
1975                    ..Default::default()
1976                })
1977                .exec(&*tx)
1978                .await?;
1979
1980            if result.rows_affected == 1 {
1981                let room = self.get_room(room_id, &tx).await?;
1982                Ok(room)
1983            } else {
1984                Err(anyhow!("could not update room participant location"))?
1985            }
1986        })
1987        .await
1988    }
1989
1990    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1991        self.transaction(|tx| async move {
1992            let participant = room_participant::Entity::find()
1993                .filter(
1994                    Condition::all()
1995                        .add(
1996                            room_participant::Column::AnsweringConnectionId
1997                                .eq(connection.id as i32),
1998                        )
1999                        .add(
2000                            room_participant::Column::AnsweringConnectionServerId
2001                                .eq(connection.owner_id as i32),
2002                        ),
2003                )
2004                .one(&*tx)
2005                .await?
2006                .ok_or_else(|| anyhow!("not a participant in any room"))?;
2007
2008            room_participant::Entity::update(room_participant::ActiveModel {
2009                answering_connection_lost: ActiveValue::set(true),
2010                ..participant.into_active_model()
2011            })
2012            .exec(&*tx)
2013            .await?;
2014
2015            Ok(())
2016        })
2017        .await
2018    }
2019
2020    fn build_incoming_call(
2021        room: &proto::Room,
2022        called_user_id: UserId,
2023    ) -> Option<proto::IncomingCall> {
2024        let pending_participant = room
2025            .pending_participants
2026            .iter()
2027            .find(|participant| participant.user_id == called_user_id.to_proto())?;
2028
2029        Some(proto::IncomingCall {
2030            room_id: room.id,
2031            calling_user_id: pending_participant.calling_user_id,
2032            participant_user_ids: room
2033                .participants
2034                .iter()
2035                .map(|participant| participant.user_id)
2036                .collect(),
2037            initial_project: room.participants.iter().find_map(|participant| {
2038                let initial_project_id = pending_participant.initial_project_id?;
2039                participant
2040                    .projects
2041                    .iter()
2042                    .find(|project| project.id == initial_project_id)
2043                    .cloned()
2044            }),
2045        })
2046    }
2047
2048    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2049        let db_room = room::Entity::find_by_id(room_id)
2050            .one(tx)
2051            .await?
2052            .ok_or_else(|| anyhow!("could not find room"))?;
2053
2054        let mut db_participants = db_room
2055            .find_related(room_participant::Entity)
2056            .stream(tx)
2057            .await?;
2058        let mut participants = HashMap::default();
2059        let mut pending_participants = Vec::new();
2060        while let Some(db_participant) = db_participants.next().await {
2061            let db_participant = db_participant?;
2062            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2063                .answering_connection_id
2064                .zip(db_participant.answering_connection_server_id)
2065            {
2066                let location = match (
2067                    db_participant.location_kind,
2068                    db_participant.location_project_id,
2069                ) {
2070                    (Some(0), Some(project_id)) => {
2071                        Some(proto::participant_location::Variant::SharedProject(
2072                            proto::participant_location::SharedProject {
2073                                id: project_id.to_proto(),
2074                            },
2075                        ))
2076                    }
2077                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2078                        Default::default(),
2079                    )),
2080                    _ => Some(proto::participant_location::Variant::External(
2081                        Default::default(),
2082                    )),
2083                };
2084
2085                let answering_connection = ConnectionId {
2086                    owner_id: answering_connection_server_id.0 as u32,
2087                    id: answering_connection_id as u32,
2088                };
2089                participants.insert(
2090                    answering_connection,
2091                    proto::Participant {
2092                        user_id: db_participant.user_id.to_proto(),
2093                        peer_id: Some(answering_connection.into()),
2094                        projects: Default::default(),
2095                        location: Some(proto::ParticipantLocation { variant: location }),
2096                    },
2097                );
2098            } else {
2099                pending_participants.push(proto::PendingParticipant {
2100                    user_id: db_participant.user_id.to_proto(),
2101                    calling_user_id: db_participant.calling_user_id.to_proto(),
2102                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2103                });
2104            }
2105        }
2106        drop(db_participants);
2107
2108        let mut db_projects = db_room
2109            .find_related(project::Entity)
2110            .find_with_related(worktree::Entity)
2111            .stream(tx)
2112            .await?;
2113
2114        while let Some(row) = db_projects.next().await {
2115            let (db_project, db_worktree) = row?;
2116            let host_connection = db_project.host_connection()?;
2117            if let Some(participant) = participants.get_mut(&host_connection) {
2118                let project = if let Some(project) = participant
2119                    .projects
2120                    .iter_mut()
2121                    .find(|project| project.id == db_project.id.to_proto())
2122                {
2123                    project
2124                } else {
2125                    participant.projects.push(proto::ParticipantProject {
2126                        id: db_project.id.to_proto(),
2127                        worktree_root_names: Default::default(),
2128                    });
2129                    participant.projects.last_mut().unwrap()
2130                };
2131
2132                if let Some(db_worktree) = db_worktree {
2133                    if db_worktree.visible {
2134                        project.worktree_root_names.push(db_worktree.root_name);
2135                    }
2136                }
2137            }
2138        }
2139        drop(db_projects);
2140
2141        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2142        let mut followers = Vec::new();
2143        while let Some(db_follower) = db_followers.next().await {
2144            let db_follower = db_follower?;
2145            followers.push(proto::Follower {
2146                leader_id: Some(db_follower.leader_connection().into()),
2147                follower_id: Some(db_follower.follower_connection().into()),
2148                project_id: db_follower.project_id.to_proto(),
2149            });
2150        }
2151
2152        Ok(proto::Room {
2153            id: db_room.id.to_proto(),
2154            live_kit_room: db_room.live_kit_room,
2155            participants: participants.into_values().collect(),
2156            pending_participants,
2157            followers,
2158        })
2159    }
2160
2161    // projects
2162
2163    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2164        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2165        enum QueryAs {
2166            Count,
2167        }
2168
2169        self.transaction(|tx| async move {
2170            Ok(project::Entity::find()
2171                .select_only()
2172                .column_as(project::Column::Id.count(), QueryAs::Count)
2173                .inner_join(user::Entity)
2174                .filter(user::Column::Admin.eq(false))
2175                .into_values::<_, QueryAs>()
2176                .one(&*tx)
2177                .await?
2178                .unwrap_or(0i64) as usize)
2179        })
2180        .await
2181    }
2182
2183    pub async fn share_project(
2184        &self,
2185        room_id: RoomId,
2186        connection: ConnectionId,
2187        worktrees: &[proto::WorktreeMetadata],
2188    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2189        self.room_transaction(room_id, |tx| async move {
2190            let participant = room_participant::Entity::find()
2191                .filter(
2192                    Condition::all()
2193                        .add(
2194                            room_participant::Column::AnsweringConnectionId
2195                                .eq(connection.id as i32),
2196                        )
2197                        .add(
2198                            room_participant::Column::AnsweringConnectionServerId
2199                                .eq(connection.owner_id as i32),
2200                        ),
2201                )
2202                .one(&*tx)
2203                .await?
2204                .ok_or_else(|| anyhow!("could not find participant"))?;
2205            if participant.room_id != room_id {
2206                return Err(anyhow!("shared project on unexpected room"))?;
2207            }
2208
2209            let project = project::ActiveModel {
2210                room_id: ActiveValue::set(participant.room_id),
2211                host_user_id: ActiveValue::set(participant.user_id),
2212                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2213                host_connection_server_id: ActiveValue::set(Some(ServerId(
2214                    connection.owner_id as i32,
2215                ))),
2216                ..Default::default()
2217            }
2218            .insert(&*tx)
2219            .await?;
2220
2221            if !worktrees.is_empty() {
2222                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2223                    worktree::ActiveModel {
2224                        id: ActiveValue::set(worktree.id as i64),
2225                        project_id: ActiveValue::set(project.id),
2226                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2227                        root_name: ActiveValue::set(worktree.root_name.clone()),
2228                        visible: ActiveValue::set(worktree.visible),
2229                        scan_id: ActiveValue::set(0),
2230                        completed_scan_id: ActiveValue::set(0),
2231                    }
2232                }))
2233                .exec(&*tx)
2234                .await?;
2235            }
2236
2237            project_collaborator::ActiveModel {
2238                project_id: ActiveValue::set(project.id),
2239                connection_id: ActiveValue::set(connection.id as i32),
2240                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2241                user_id: ActiveValue::set(participant.user_id),
2242                replica_id: ActiveValue::set(ReplicaId(0)),
2243                is_host: ActiveValue::set(true),
2244                ..Default::default()
2245            }
2246            .insert(&*tx)
2247            .await?;
2248
2249            let room = self.get_room(room_id, &tx).await?;
2250            Ok((project.id, room))
2251        })
2252        .await
2253    }
2254
2255    pub async fn unshare_project(
2256        &self,
2257        project_id: ProjectId,
2258        connection: ConnectionId,
2259    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2260        let room_id = self.room_id_for_project(project_id).await?;
2261        self.room_transaction(room_id, |tx| async move {
2262            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2263
2264            let project = project::Entity::find_by_id(project_id)
2265                .one(&*tx)
2266                .await?
2267                .ok_or_else(|| anyhow!("project not found"))?;
2268            if project.host_connection()? == connection {
2269                project::Entity::delete(project.into_active_model())
2270                    .exec(&*tx)
2271                    .await?;
2272                let room = self.get_room(room_id, &tx).await?;
2273                Ok((room, guest_connection_ids))
2274            } else {
2275                Err(anyhow!("cannot unshare a project hosted by another user"))?
2276            }
2277        })
2278        .await
2279    }
2280
2281    pub async fn update_project(
2282        &self,
2283        project_id: ProjectId,
2284        connection: ConnectionId,
2285        worktrees: &[proto::WorktreeMetadata],
2286    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2287        let room_id = self.room_id_for_project(project_id).await?;
2288        self.room_transaction(room_id, |tx| async move {
2289            let project = project::Entity::find_by_id(project_id)
2290                .filter(
2291                    Condition::all()
2292                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2293                        .add(
2294                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2295                        ),
2296                )
2297                .one(&*tx)
2298                .await?
2299                .ok_or_else(|| anyhow!("no such project"))?;
2300
2301            self.update_project_worktrees(project.id, worktrees, &tx)
2302                .await?;
2303
2304            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2305            let room = self.get_room(project.room_id, &tx).await?;
2306            Ok((room, guest_connection_ids))
2307        })
2308        .await
2309    }
2310
2311    async fn update_project_worktrees(
2312        &self,
2313        project_id: ProjectId,
2314        worktrees: &[proto::WorktreeMetadata],
2315        tx: &DatabaseTransaction,
2316    ) -> Result<()> {
2317        if !worktrees.is_empty() {
2318            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2319                id: ActiveValue::set(worktree.id as i64),
2320                project_id: ActiveValue::set(project_id),
2321                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2322                root_name: ActiveValue::set(worktree.root_name.clone()),
2323                visible: ActiveValue::set(worktree.visible),
2324                scan_id: ActiveValue::set(0),
2325                completed_scan_id: ActiveValue::set(0),
2326            }))
2327            .on_conflict(
2328                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2329                    .update_column(worktree::Column::RootName)
2330                    .to_owned(),
2331            )
2332            .exec(&*tx)
2333            .await?;
2334        }
2335
2336        worktree::Entity::delete_many()
2337            .filter(worktree::Column::ProjectId.eq(project_id).and(
2338                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2339            ))
2340            .exec(&*tx)
2341            .await?;
2342
2343        Ok(())
2344    }
2345
2346    pub async fn update_worktree(
2347        &self,
2348        update: &proto::UpdateWorktree,
2349        connection: ConnectionId,
2350    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2351        let project_id = ProjectId::from_proto(update.project_id);
2352        let worktree_id = update.worktree_id as i64;
2353        let room_id = self.room_id_for_project(project_id).await?;
2354        self.room_transaction(room_id, |tx| async move {
2355            // Ensure the update comes from the host.
2356            let _project = project::Entity::find_by_id(project_id)
2357                .filter(
2358                    Condition::all()
2359                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2360                        .add(
2361                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2362                        ),
2363                )
2364                .one(&*tx)
2365                .await?
2366                .ok_or_else(|| anyhow!("no such project"))?;
2367
2368            // Update metadata.
2369            worktree::Entity::update(worktree::ActiveModel {
2370                id: ActiveValue::set(worktree_id),
2371                project_id: ActiveValue::set(project_id),
2372                root_name: ActiveValue::set(update.root_name.clone()),
2373                scan_id: ActiveValue::set(update.scan_id as i64),
2374                completed_scan_id: if update.is_last_update {
2375                    ActiveValue::set(update.scan_id as i64)
2376                } else {
2377                    ActiveValue::default()
2378                },
2379                abs_path: ActiveValue::set(update.abs_path.clone()),
2380                ..Default::default()
2381            })
2382            .exec(&*tx)
2383            .await?;
2384
2385            if !update.updated_entries.is_empty() {
2386                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2387                    let mtime = entry.mtime.clone().unwrap_or_default();
2388                    worktree_entry::ActiveModel {
2389                        project_id: ActiveValue::set(project_id),
2390                        worktree_id: ActiveValue::set(worktree_id),
2391                        id: ActiveValue::set(entry.id as i64),
2392                        is_dir: ActiveValue::set(entry.is_dir),
2393                        path: ActiveValue::set(entry.path.clone()),
2394                        inode: ActiveValue::set(entry.inode as i64),
2395                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2396                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2397                        is_symlink: ActiveValue::set(entry.is_symlink),
2398                        is_ignored: ActiveValue::set(entry.is_ignored),
2399                        is_deleted: ActiveValue::set(false),
2400                        scan_id: ActiveValue::set(update.scan_id as i64),
2401                    }
2402                }))
2403                .on_conflict(
2404                    OnConflict::columns([
2405                        worktree_entry::Column::ProjectId,
2406                        worktree_entry::Column::WorktreeId,
2407                        worktree_entry::Column::Id,
2408                    ])
2409                    .update_columns([
2410                        worktree_entry::Column::IsDir,
2411                        worktree_entry::Column::Path,
2412                        worktree_entry::Column::Inode,
2413                        worktree_entry::Column::MtimeSeconds,
2414                        worktree_entry::Column::MtimeNanos,
2415                        worktree_entry::Column::IsSymlink,
2416                        worktree_entry::Column::IsIgnored,
2417                        worktree_entry::Column::ScanId,
2418                    ])
2419                    .to_owned(),
2420                )
2421                .exec(&*tx)
2422                .await?;
2423            }
2424
2425            if !update.removed_entries.is_empty() {
2426                worktree_entry::Entity::update_many()
2427                    .filter(
2428                        worktree_entry::Column::ProjectId
2429                            .eq(project_id)
2430                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2431                            .and(
2432                                worktree_entry::Column::Id
2433                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2434                            ),
2435                    )
2436                    .set(worktree_entry::ActiveModel {
2437                        is_deleted: ActiveValue::Set(true),
2438                        scan_id: ActiveValue::Set(update.scan_id as i64),
2439                        ..Default::default()
2440                    })
2441                    .exec(&*tx)
2442                    .await?;
2443            }
2444
2445            if !update.updated_repositories.is_empty() {
2446                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2447                    |repository| worktree_repository::ActiveModel {
2448                        project_id: ActiveValue::set(project_id),
2449                        worktree_id: ActiveValue::set(worktree_id),
2450                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2451                        scan_id: ActiveValue::set(update.scan_id as i64),
2452                        branch: ActiveValue::set(repository.branch.clone()),
2453                        is_deleted: ActiveValue::set(false),
2454                    },
2455                ))
2456                .on_conflict(
2457                    OnConflict::columns([
2458                        worktree_repository::Column::ProjectId,
2459                        worktree_repository::Column::WorktreeId,
2460                        worktree_repository::Column::WorkDirectoryId,
2461                    ])
2462                    .update_columns([
2463                        worktree_repository::Column::ScanId,
2464                        worktree_repository::Column::Branch,
2465                    ])
2466                    .to_owned(),
2467                )
2468                .exec(&*tx)
2469                .await?;
2470
2471                for repository in update.updated_repositories.iter() {
2472                    if !repository.updated_statuses.is_empty() {
2473                        worktree_repository_statuses::Entity::insert_many(
2474                            repository.updated_statuses.iter().map(|status_entry| {
2475                                worktree_repository_statuses::ActiveModel {
2476                                    project_id: ActiveValue::set(project_id),
2477                                    worktree_id: ActiveValue::set(worktree_id),
2478                                    work_directory_id: ActiveValue::set(
2479                                        repository.work_directory_id as i64,
2480                                    ),
2481                                    repo_path: ActiveValue::set(status_entry.repo_path.clone()),
2482                                    status: ActiveValue::set(status_entry.status as i64),
2483                                    scan_id: ActiveValue::set(update.scan_id as i64),
2484                                    is_deleted: ActiveValue::set(false),
2485                                }
2486                            }),
2487                        )
2488                        .on_conflict(
2489                            OnConflict::columns([
2490                                worktree_repository_statuses::Column::ProjectId,
2491                                worktree_repository_statuses::Column::WorktreeId,
2492                                worktree_repository_statuses::Column::WorkDirectoryId,
2493                                worktree_repository_statuses::Column::RepoPath,
2494                            ])
2495                            .update_columns([
2496                                worktree_repository_statuses::Column::ScanId,
2497                                worktree_repository_statuses::Column::Status,
2498                                worktree_repository_statuses::Column::IsDeleted,
2499                            ])
2500                            .to_owned(),
2501                        )
2502                        .exec(&*tx)
2503                        .await?;
2504                    }
2505
2506                    if !repository.removed_repo_paths.is_empty() {
2507                        worktree_repository_statuses::Entity::update_many()
2508                            .filter(
2509                                worktree_repository_statuses::Column::ProjectId
2510                                    .eq(project_id)
2511                                    .and(
2512                                        worktree_repository_statuses::Column::WorktreeId
2513                                            .eq(worktree_id),
2514                                    )
2515                                    .and(
2516                                        worktree_repository_statuses::Column::WorkDirectoryId
2517                                            .eq(repository.work_directory_id as i64),
2518                                    )
2519                                    .and(worktree_repository_statuses::Column::RepoPath.is_in(
2520                                        repository.removed_repo_paths.iter().map(String::as_str),
2521                                    )),
2522                            )
2523                            .set(worktree_repository_statuses::ActiveModel {
2524                                is_deleted: ActiveValue::Set(true),
2525                                scan_id: ActiveValue::Set(update.scan_id as i64),
2526                                ..Default::default()
2527                            })
2528                            .exec(&*tx)
2529                            .await?;
2530                    }
2531                }
2532            }
2533
2534            if !update.removed_repositories.is_empty() {
2535                worktree_repository::Entity::update_many()
2536                    .filter(
2537                        worktree_repository::Column::ProjectId
2538                            .eq(project_id)
2539                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2540                            .and(
2541                                worktree_repository::Column::WorkDirectoryId
2542                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2543                            ),
2544                    )
2545                    .set(worktree_repository::ActiveModel {
2546                        is_deleted: ActiveValue::Set(true),
2547                        scan_id: ActiveValue::Set(update.scan_id as i64),
2548                        ..Default::default()
2549                    })
2550                    .exec(&*tx)
2551                    .await?;
2552            }
2553
2554            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2555            Ok(connection_ids)
2556        })
2557        .await
2558    }
2559
2560    pub async fn update_diagnostic_summary(
2561        &self,
2562        update: &proto::UpdateDiagnosticSummary,
2563        connection: ConnectionId,
2564    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2565        let project_id = ProjectId::from_proto(update.project_id);
2566        let worktree_id = update.worktree_id as i64;
2567        let room_id = self.room_id_for_project(project_id).await?;
2568        self.room_transaction(room_id, |tx| async move {
2569            let summary = update
2570                .summary
2571                .as_ref()
2572                .ok_or_else(|| anyhow!("invalid summary"))?;
2573
2574            // Ensure the update comes from the host.
2575            let project = project::Entity::find_by_id(project_id)
2576                .one(&*tx)
2577                .await?
2578                .ok_or_else(|| anyhow!("no such project"))?;
2579            if project.host_connection()? != connection {
2580                return Err(anyhow!("can't update a project hosted by someone else"))?;
2581            }
2582
2583            // Update summary.
2584            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2585                project_id: ActiveValue::set(project_id),
2586                worktree_id: ActiveValue::set(worktree_id),
2587                path: ActiveValue::set(summary.path.clone()),
2588                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2589                error_count: ActiveValue::set(summary.error_count as i32),
2590                warning_count: ActiveValue::set(summary.warning_count as i32),
2591                ..Default::default()
2592            })
2593            .on_conflict(
2594                OnConflict::columns([
2595                    worktree_diagnostic_summary::Column::ProjectId,
2596                    worktree_diagnostic_summary::Column::WorktreeId,
2597                    worktree_diagnostic_summary::Column::Path,
2598                ])
2599                .update_columns([
2600                    worktree_diagnostic_summary::Column::LanguageServerId,
2601                    worktree_diagnostic_summary::Column::ErrorCount,
2602                    worktree_diagnostic_summary::Column::WarningCount,
2603                ])
2604                .to_owned(),
2605            )
2606            .exec(&*tx)
2607            .await?;
2608
2609            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2610            Ok(connection_ids)
2611        })
2612        .await
2613    }
2614
2615    pub async fn start_language_server(
2616        &self,
2617        update: &proto::StartLanguageServer,
2618        connection: ConnectionId,
2619    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2620        let project_id = ProjectId::from_proto(update.project_id);
2621        let room_id = self.room_id_for_project(project_id).await?;
2622        self.room_transaction(room_id, |tx| async move {
2623            let server = update
2624                .server
2625                .as_ref()
2626                .ok_or_else(|| anyhow!("invalid language server"))?;
2627
2628            // Ensure the update comes from the host.
2629            let project = project::Entity::find_by_id(project_id)
2630                .one(&*tx)
2631                .await?
2632                .ok_or_else(|| anyhow!("no such project"))?;
2633            if project.host_connection()? != connection {
2634                return Err(anyhow!("can't update a project hosted by someone else"))?;
2635            }
2636
2637            // Add the newly-started language server.
2638            language_server::Entity::insert(language_server::ActiveModel {
2639                project_id: ActiveValue::set(project_id),
2640                id: ActiveValue::set(server.id as i64),
2641                name: ActiveValue::set(server.name.clone()),
2642                ..Default::default()
2643            })
2644            .on_conflict(
2645                OnConflict::columns([
2646                    language_server::Column::ProjectId,
2647                    language_server::Column::Id,
2648                ])
2649                .update_column(language_server::Column::Name)
2650                .to_owned(),
2651            )
2652            .exec(&*tx)
2653            .await?;
2654
2655            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2656            Ok(connection_ids)
2657        })
2658        .await
2659    }
2660
2661    pub async fn update_worktree_settings(
2662        &self,
2663        update: &proto::UpdateWorktreeSettings,
2664        connection: ConnectionId,
2665    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2666        let project_id = ProjectId::from_proto(update.project_id);
2667        let room_id = self.room_id_for_project(project_id).await?;
2668        self.room_transaction(room_id, |tx| async move {
2669            // Ensure the update comes from the host.
2670            let project = project::Entity::find_by_id(project_id)
2671                .one(&*tx)
2672                .await?
2673                .ok_or_else(|| anyhow!("no such project"))?;
2674            if project.host_connection()? != connection {
2675                return Err(anyhow!("can't update a project hosted by someone else"))?;
2676            }
2677
2678            if let Some(content) = &update.content {
2679                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2680                    project_id: ActiveValue::Set(project_id),
2681                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2682                    path: ActiveValue::Set(update.path.clone()),
2683                    content: ActiveValue::Set(content.clone()),
2684                })
2685                .on_conflict(
2686                    OnConflict::columns([
2687                        worktree_settings_file::Column::ProjectId,
2688                        worktree_settings_file::Column::WorktreeId,
2689                        worktree_settings_file::Column::Path,
2690                    ])
2691                    .update_column(worktree_settings_file::Column::Content)
2692                    .to_owned(),
2693                )
2694                .exec(&*tx)
2695                .await?;
2696            } else {
2697                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2698                    project_id: ActiveValue::Set(project_id),
2699                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2700                    path: ActiveValue::Set(update.path.clone()),
2701                    ..Default::default()
2702                })
2703                .exec(&*tx)
2704                .await?;
2705            }
2706
2707            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2708            Ok(connection_ids)
2709        })
2710        .await
2711    }
2712
2713    pub async fn join_project(
2714        &self,
2715        project_id: ProjectId,
2716        connection: ConnectionId,
2717    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2718        let room_id = self.room_id_for_project(project_id).await?;
2719        self.room_transaction(room_id, |tx| async move {
2720            let participant = room_participant::Entity::find()
2721                .filter(
2722                    Condition::all()
2723                        .add(
2724                            room_participant::Column::AnsweringConnectionId
2725                                .eq(connection.id as i32),
2726                        )
2727                        .add(
2728                            room_participant::Column::AnsweringConnectionServerId
2729                                .eq(connection.owner_id as i32),
2730                        ),
2731                )
2732                .one(&*tx)
2733                .await?
2734                .ok_or_else(|| anyhow!("must join a room first"))?;
2735
2736            let project = project::Entity::find_by_id(project_id)
2737                .one(&*tx)
2738                .await?
2739                .ok_or_else(|| anyhow!("no such project"))?;
2740            if project.room_id != participant.room_id {
2741                return Err(anyhow!("no such project"))?;
2742            }
2743
2744            let mut collaborators = project
2745                .find_related(project_collaborator::Entity)
2746                .all(&*tx)
2747                .await?;
2748            let replica_ids = collaborators
2749                .iter()
2750                .map(|c| c.replica_id)
2751                .collect::<HashSet<_>>();
2752            let mut replica_id = ReplicaId(1);
2753            while replica_ids.contains(&replica_id) {
2754                replica_id.0 += 1;
2755            }
2756            let new_collaborator = project_collaborator::ActiveModel {
2757                project_id: ActiveValue::set(project_id),
2758                connection_id: ActiveValue::set(connection.id as i32),
2759                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2760                user_id: ActiveValue::set(participant.user_id),
2761                replica_id: ActiveValue::set(replica_id),
2762                is_host: ActiveValue::set(false),
2763                ..Default::default()
2764            }
2765            .insert(&*tx)
2766            .await?;
2767            collaborators.push(new_collaborator);
2768
2769            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2770            let mut worktrees = db_worktrees
2771                .into_iter()
2772                .map(|db_worktree| {
2773                    (
2774                        db_worktree.id as u64,
2775                        Worktree {
2776                            id: db_worktree.id as u64,
2777                            abs_path: db_worktree.abs_path,
2778                            root_name: db_worktree.root_name,
2779                            visible: db_worktree.visible,
2780                            entries: Default::default(),
2781                            repository_entries: Default::default(),
2782                            diagnostic_summaries: Default::default(),
2783                            settings_files: Default::default(),
2784                            scan_id: db_worktree.scan_id as u64,
2785                            completed_scan_id: db_worktree.completed_scan_id as u64,
2786                        },
2787                    )
2788                })
2789                .collect::<BTreeMap<_, _>>();
2790
2791            // Populate worktree entries.
2792            {
2793                let mut db_entries = worktree_entry::Entity::find()
2794                    .filter(
2795                        Condition::all()
2796                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2797                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2798                    )
2799                    .stream(&*tx)
2800                    .await?;
2801                while let Some(db_entry) = db_entries.next().await {
2802                    let db_entry = db_entry?;
2803                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2804                        worktree.entries.push(proto::Entry {
2805                            id: db_entry.id as u64,
2806                            is_dir: db_entry.is_dir,
2807                            path: db_entry.path,
2808                            inode: db_entry.inode as u64,
2809                            mtime: Some(proto::Timestamp {
2810                                seconds: db_entry.mtime_seconds as u64,
2811                                nanos: db_entry.mtime_nanos as u32,
2812                            }),
2813                            is_symlink: db_entry.is_symlink,
2814                            is_ignored: db_entry.is_ignored,
2815                        });
2816                    }
2817                }
2818            }
2819
2820            // Populate repository entries.
2821            {
2822                let mut db_repository_entries = worktree_repository::Entity::find()
2823                    .filter(
2824                        Condition::all()
2825                            .add(worktree_repository::Column::ProjectId.eq(project_id))
2826                            .add(worktree_repository::Column::IsDeleted.eq(false)),
2827                    )
2828                    .stream(&*tx)
2829                    .await?;
2830                while let Some(db_repository_entry) = db_repository_entries.next().await {
2831                    let db_repository_entry = db_repository_entry?;
2832                    if let Some(worktree) =
2833                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2834                    {
2835                        worktree.repository_entries.insert(
2836                            db_repository_entry.work_directory_id as u64,
2837                            proto::RepositoryEntry {
2838                                work_directory_id: db_repository_entry.work_directory_id as u64,
2839                                branch: db_repository_entry.branch,
2840                                removed_repo_paths: Default::default(),
2841                                updated_statuses: Default::default(),
2842                            },
2843                        );
2844                    }
2845                }
2846            }
2847
2848            {
2849                let mut db_status_entries = worktree_repository_statuses::Entity::find()
2850                    .filter(
2851                        Condition::all()
2852                            .add(worktree_repository_statuses::Column::ProjectId.eq(project_id))
2853                            .add(worktree_repository_statuses::Column::IsDeleted.eq(false)),
2854                    )
2855                    .stream(&*tx)
2856                    .await?;
2857
2858                while let Some(db_status_entry) = db_status_entries.next().await {
2859                    let db_status_entry = db_status_entry?;
2860                    if let Some(worktree) = worktrees.get_mut(&(db_status_entry.worktree_id as u64))
2861                    {
2862                        if let Some(repository_entry) = worktree
2863                            .repository_entries
2864                            .get_mut(&(db_status_entry.work_directory_id as u64))
2865                        {
2866                            repository_entry.updated_statuses.push(proto::StatusEntry {
2867                                repo_path: db_status_entry.repo_path,
2868                                status: db_status_entry.status as i32,
2869                            });
2870                        }
2871                    }
2872                }
2873            }
2874
2875            // Populate worktree diagnostic summaries.
2876            {
2877                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2878                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2879                    .stream(&*tx)
2880                    .await?;
2881                while let Some(db_summary) = db_summaries.next().await {
2882                    let db_summary = db_summary?;
2883                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2884                        worktree
2885                            .diagnostic_summaries
2886                            .push(proto::DiagnosticSummary {
2887                                path: db_summary.path,
2888                                language_server_id: db_summary.language_server_id as u64,
2889                                error_count: db_summary.error_count as u32,
2890                                warning_count: db_summary.warning_count as u32,
2891                            });
2892                    }
2893                }
2894            }
2895
2896            // Populate worktree settings files
2897            {
2898                let mut db_settings_files = worktree_settings_file::Entity::find()
2899                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2900                    .stream(&*tx)
2901                    .await?;
2902                while let Some(db_settings_file) = db_settings_files.next().await {
2903                    let db_settings_file = db_settings_file?;
2904                    if let Some(worktree) =
2905                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2906                    {
2907                        worktree.settings_files.push(WorktreeSettingsFile {
2908                            path: db_settings_file.path,
2909                            content: db_settings_file.content,
2910                        });
2911                    }
2912                }
2913            }
2914
2915            // Populate language servers.
2916            let language_servers = project
2917                .find_related(language_server::Entity)
2918                .all(&*tx)
2919                .await?;
2920
2921            let project = Project {
2922                collaborators: collaborators
2923                    .into_iter()
2924                    .map(|collaborator| ProjectCollaborator {
2925                        connection_id: collaborator.connection(),
2926                        user_id: collaborator.user_id,
2927                        replica_id: collaborator.replica_id,
2928                        is_host: collaborator.is_host,
2929                    })
2930                    .collect(),
2931                worktrees,
2932                language_servers: language_servers
2933                    .into_iter()
2934                    .map(|language_server| proto::LanguageServer {
2935                        id: language_server.id as u64,
2936                        name: language_server.name,
2937                    })
2938                    .collect(),
2939            };
2940            Ok((project, replica_id as ReplicaId))
2941        })
2942        .await
2943    }
2944
2945    pub async fn leave_project(
2946        &self,
2947        project_id: ProjectId,
2948        connection: ConnectionId,
2949    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2950        let room_id = self.room_id_for_project(project_id).await?;
2951        self.room_transaction(room_id, |tx| async move {
2952            let result = project_collaborator::Entity::delete_many()
2953                .filter(
2954                    Condition::all()
2955                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2956                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2957                        .add(
2958                            project_collaborator::Column::ConnectionServerId
2959                                .eq(connection.owner_id as i32),
2960                        ),
2961                )
2962                .exec(&*tx)
2963                .await?;
2964            if result.rows_affected == 0 {
2965                Err(anyhow!("not a collaborator on this project"))?;
2966            }
2967
2968            let project = project::Entity::find_by_id(project_id)
2969                .one(&*tx)
2970                .await?
2971                .ok_or_else(|| anyhow!("no such project"))?;
2972            let collaborators = project
2973                .find_related(project_collaborator::Entity)
2974                .all(&*tx)
2975                .await?;
2976            let connection_ids = collaborators
2977                .into_iter()
2978                .map(|collaborator| collaborator.connection())
2979                .collect();
2980
2981            follower::Entity::delete_many()
2982                .filter(
2983                    Condition::any()
2984                        .add(
2985                            Condition::all()
2986                                .add(follower::Column::ProjectId.eq(project_id))
2987                                .add(
2988                                    follower::Column::LeaderConnectionServerId
2989                                        .eq(connection.owner_id),
2990                                )
2991                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2992                        )
2993                        .add(
2994                            Condition::all()
2995                                .add(follower::Column::ProjectId.eq(project_id))
2996                                .add(
2997                                    follower::Column::FollowerConnectionServerId
2998                                        .eq(connection.owner_id),
2999                                )
3000                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
3001                        ),
3002                )
3003                .exec(&*tx)
3004                .await?;
3005
3006            let room = self.get_room(project.room_id, &tx).await?;
3007            let left_project = LeftProject {
3008                id: project_id,
3009                host_user_id: project.host_user_id,
3010                host_connection_id: project.host_connection()?,
3011                connection_ids,
3012            };
3013            Ok((room, left_project))
3014        })
3015        .await
3016    }
3017
3018    pub async fn project_collaborators(
3019        &self,
3020        project_id: ProjectId,
3021        connection_id: ConnectionId,
3022    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
3023        let room_id = self.room_id_for_project(project_id).await?;
3024        self.room_transaction(room_id, |tx| async move {
3025            let collaborators = project_collaborator::Entity::find()
3026                .filter(project_collaborator::Column::ProjectId.eq(project_id))
3027                .all(&*tx)
3028                .await?
3029                .into_iter()
3030                .map(|collaborator| ProjectCollaborator {
3031                    connection_id: collaborator.connection(),
3032                    user_id: collaborator.user_id,
3033                    replica_id: collaborator.replica_id,
3034                    is_host: collaborator.is_host,
3035                })
3036                .collect::<Vec<_>>();
3037
3038            if collaborators
3039                .iter()
3040                .any(|collaborator| collaborator.connection_id == connection_id)
3041            {
3042                Ok(collaborators)
3043            } else {
3044                Err(anyhow!("no such project"))?
3045            }
3046        })
3047        .await
3048    }
3049
3050    pub async fn project_connection_ids(
3051        &self,
3052        project_id: ProjectId,
3053        connection_id: ConnectionId,
3054    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3055        let room_id = self.room_id_for_project(project_id).await?;
3056        self.room_transaction(room_id, |tx| async move {
3057            let mut collaborators = project_collaborator::Entity::find()
3058                .filter(project_collaborator::Column::ProjectId.eq(project_id))
3059                .stream(&*tx)
3060                .await?;
3061
3062            let mut connection_ids = HashSet::default();
3063            while let Some(collaborator) = collaborators.next().await {
3064                let collaborator = collaborator?;
3065                connection_ids.insert(collaborator.connection());
3066            }
3067
3068            if connection_ids.contains(&connection_id) {
3069                Ok(connection_ids)
3070            } else {
3071                Err(anyhow!("no such project"))?
3072            }
3073        })
3074        .await
3075    }
3076
3077    async fn project_guest_connection_ids(
3078        &self,
3079        project_id: ProjectId,
3080        tx: &DatabaseTransaction,
3081    ) -> Result<Vec<ConnectionId>> {
3082        let mut collaborators = project_collaborator::Entity::find()
3083            .filter(
3084                project_collaborator::Column::ProjectId
3085                    .eq(project_id)
3086                    .and(project_collaborator::Column::IsHost.eq(false)),
3087            )
3088            .stream(tx)
3089            .await?;
3090
3091        let mut guest_connection_ids = Vec::new();
3092        while let Some(collaborator) = collaborators.next().await {
3093            let collaborator = collaborator?;
3094            guest_connection_ids.push(collaborator.connection());
3095        }
3096        Ok(guest_connection_ids)
3097    }
3098
3099    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3100        self.transaction(|tx| async move {
3101            let project = project::Entity::find_by_id(project_id)
3102                .one(&*tx)
3103                .await?
3104                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3105            Ok(project.room_id)
3106        })
3107        .await
3108    }
3109
3110    // access tokens
3111
3112    pub async fn create_access_token(
3113        &self,
3114        user_id: UserId,
3115        access_token_hash: &str,
3116        max_access_token_count: usize,
3117    ) -> Result<AccessTokenId> {
3118        self.transaction(|tx| async {
3119            let tx = tx;
3120
3121            let token = access_token::ActiveModel {
3122                user_id: ActiveValue::set(user_id),
3123                hash: ActiveValue::set(access_token_hash.into()),
3124                ..Default::default()
3125            }
3126            .insert(&*tx)
3127            .await?;
3128
3129            access_token::Entity::delete_many()
3130                .filter(
3131                    access_token::Column::Id.in_subquery(
3132                        Query::select()
3133                            .column(access_token::Column::Id)
3134                            .from(access_token::Entity)
3135                            .and_where(access_token::Column::UserId.eq(user_id))
3136                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3137                            .limit(10000)
3138                            .offset(max_access_token_count as u64)
3139                            .to_owned(),
3140                    ),
3141                )
3142                .exec(&*tx)
3143                .await?;
3144            Ok(token.id)
3145        })
3146        .await
3147    }
3148
3149    pub async fn get_access_token(
3150        &self,
3151        access_token_id: AccessTokenId,
3152    ) -> Result<access_token::Model> {
3153        self.transaction(|tx| async move {
3154            Ok(access_token::Entity::find_by_id(access_token_id)
3155                .one(&*tx)
3156                .await?
3157                .ok_or_else(|| anyhow!("no such access token"))?)
3158        })
3159        .await
3160    }
3161
3162    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3163    where
3164        F: Send + Fn(TransactionHandle) -> Fut,
3165        Fut: Send + Future<Output = Result<T>>,
3166    {
3167        let body = async {
3168            let mut i = 0;
3169            loop {
3170                let (tx, result) = self.with_transaction(&f).await?;
3171                match result {
3172                    Ok(result) => match tx.commit().await.map_err(Into::into) {
3173                        Ok(()) => return Ok(result),
3174                        Err(error) => {
3175                            if !self.retry_on_serialization_error(&error, i).await {
3176                                return Err(error);
3177                            }
3178                        }
3179                    },
3180                    Err(error) => {
3181                        tx.rollback().await?;
3182                        if !self.retry_on_serialization_error(&error, i).await {
3183                            return Err(error);
3184                        }
3185                    }
3186                }
3187                i += 1;
3188            }
3189        };
3190
3191        self.run(body).await
3192    }
3193
/// Runs `f` in a retried transaction whose result optionally names a room.
///
/// - When `f` yields `Some((room_id, data))`, the mutex for `room_id` is
///   acquired (created on first use) *after* `f` has run and *before* the
///   commit; on a successful commit the data is returned inside a `RoomGuard`
///   that owns the lock.
/// - When `f` yields `None`, the transaction is committed without taking any
///   room lock and `None` is returned.
/// - When `f` fails, the transaction is rolled back.
///
/// Commit failures and failures of `f` are passed to
/// `retry_on_serialization_error`; a `true` answer starts another attempt,
/// otherwise the error is returned. In the `Some` case the room guard is still
/// in scope while that retry decision (including its delay) is awaited.
3194    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
3195    where
3196        F: Send + Fn(TransactionHandle) -> Fut,
3197        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
3198    {
3199        let body = async {
3200            let mut i = 0;
3201            loop {
3202                let (tx, result) = self.with_transaction(&f).await?;
3203                match result {
3204                    Ok(Some((room_id, data))) => {
3205                        let lock = self.rooms.entry(room_id).or_default().clone();
3206                        let _guard = lock.lock_owned().await;
3207                        match tx.commit().await.map_err(Into::into) {
3208                            Ok(()) => {
3209                                return Ok(Some(RoomGuard {
3210                                    data,
3211                                    _guard,
3212                                    _not_send: PhantomData,
3213                                }));
3214                            }
3215                            Err(error) => {
3216                                if !self.retry_on_serialization_error(&error, i).await {
3217                                    return Err(error);
3218                                }
3219                            }
3220                        }
3221                    }
3222                    Ok(None) => match tx.commit().await.map_err(Into::into) {
3223                        Ok(()) => return Ok(None),
3224                        Err(error) => {
3225                            if !self.retry_on_serialization_error(&error, i).await {
3226                                return Err(error);
3227                            }
3228                        }
3229                    },
3230                    Err(error) => {
3231                        tx.rollback().await?;
3232                        if !self.retry_on_serialization_error(&error, i).await {
3233                            return Err(error);
3234                        }
3235                    }
3236                }
3237                i += 1;
3238            }
3239        };
3240
3241        self.run(body).await
3242    }
3243
3244    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3245    where
3246        F: Send + Fn(TransactionHandle) -> Fut,
3247        Fut: Send + Future<Output = Result<T>>,
3248    {
3249        let body = async {
3250            let mut i = 0;
3251            loop {
3252                let lock = self.rooms.entry(room_id).or_default().clone();
3253                let _guard = lock.lock_owned().await;
3254                let (tx, result) = self.with_transaction(&f).await?;
3255                match result {
3256                    Ok(data) => match tx.commit().await.map_err(Into::into) {
3257                        Ok(()) => {
3258                            return Ok(RoomGuard {
3259                                data,
3260                                _guard,
3261                                _not_send: PhantomData,
3262                            });
3263                        }
3264                        Err(error) => {
3265                            if !self.retry_on_serialization_error(&error, i).await {
3266                                return Err(error);
3267                            }
3268                        }
3269                    },
3270                    Err(error) => {
3271                        tx.rollback().await?;
3272                        if !self.retry_on_serialization_error(&error, i).await {
3273                            return Err(error);
3274                        }
3275                    }
3276                }
3277                i += 1;
3278            }
3279        };
3280
3281        self.run(body).await
3282    }
3283
3284    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3285    where
3286        F: Send + Fn(TransactionHandle) -> Fut,
3287        Fut: Send + Future<Output = Result<T>>,
3288    {
3289        let tx = self
3290            .pool
3291            .begin_with_config(Some(IsolationLevel::Serializable), None)
3292            .await?;
3293
3294        let mut tx = Arc::new(Some(tx));
3295        let result = f(TransactionHandle(tx.clone())).await;
3296        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3297            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3298        };
3299
3300        Ok((tx, result))
3301    }
3302
3303    async fn run<F, T>(&self, future: F) -> Result<T>
3304    where
3305        F: Future<Output = Result<T>>,
3306    {
3307        #[cfg(test)]
3308        {
3309            if let Executor::Deterministic(executor) = &self.executor {
3310                executor.simulate_random_delay().await;
3311            }
3312
3313            self.runtime.as_ref().unwrap().block_on(future)
3314        }
3315
3316        #[cfg(not(test))]
3317        {
3318            future.await
3319        }
3320    }
3321
3322    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3323        // If the error is due to a failure to serialize concurrent transactions, then retry
3324        // this transaction after a delay. With each subsequent retry, double the delay duration.
3325        // Also vary the delay randomly in order to ensure different database connections retry
3326        // at different times.
3327        if is_serialization_error(error) {
3328            let base_delay = 4_u64 << prev_attempt_count.min(16);
3329            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3330            log::info!(
3331                "retrying transaction after serialization error. delay: {} ms.",
3332                randomized_delay
3333            );
3334            self.executor
3335                .sleep(Duration::from_millis(randomized_delay as u64))
3336                .await;
3337            true
3338        } else {
3339            false
3340        }
3341    }
3342}
3343
3344fn is_serialization_error(error: &Error) -> bool {
3345    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3346    match error {
3347        Error::Database(
3348            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3349            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3350        ) if error
3351            .as_database_error()
3352            .and_then(|error| error.code())
3353            .as_deref()
3354            == Some(SERIALIZATION_FAILURE_CODE) =>
3355        {
3356            true
3357        }
3358        _ => false,
3359    }
3360}
3361
3362struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3363
3364impl Deref for TransactionHandle {
3365    type Target = DatabaseTransaction;
3366
3367    fn deref(&self) -> &Self::Target {
3368        self.0.as_ref().as_ref().unwrap()
3369    }
3370}
3371
3372pub struct RoomGuard<T> {
3373    data: T,
3374    _guard: OwnedMutexGuard<()>,
3375    _not_send: PhantomData<Rc<()>>,
3376}
3377
3378impl<T> Deref for RoomGuard<T> {
3379    type Target = T;
3380
3381    fn deref(&self) -> &T {
3382        &self.data
3383    }
3384}
3385
3386impl<T> DerefMut for RoomGuard<T> {
3387    fn deref_mut(&mut self) -> &mut T {
3388        &mut self.data
3389    }
3390}
3391
/// Serializable set of values describing a user to be created: a GitHub login,
/// a GitHub user id, and an invite count.
///
/// NOTE(review): the code consuming this struct is outside this view; the
/// field meanings above are read off the field names only.
3392#[derive(Debug, Serialize, Deserialize)]
3393pub struct NewUserParams {
3394    pub github_login: String,
3395    pub github_user_id: i32,
3396    pub invite_count: i32,
3397}
3398
/// Values reported back after a user has been created: the new user's id and
/// metrics id, plus an optional inviting user and an optional signup device id.
///
/// NOTE(review): the code producing this struct is outside this view; the
/// description is derived from the field names and types only.
3399#[derive(Debug)]
3400pub struct NewUserResult {
3401    pub user_id: UserId,
3402    pub metrics_id: String,
3403    pub inviting_user_id: Option<UserId>,
3404    pub signup_device_id: Option<String>,
3405}
3406
/// Produces a random invite code by invoking `nanoid!` with a length of 16.
3407fn random_invite_code() -> String {
3408    nanoid::nanoid!(16)
3409}
3410
/// Produces a random email confirmation code by invoking `nanoid!` with a
/// length of 64.
3411fn random_email_confirmation_code() -> String {
3412    nanoid::nanoid!(64)
3413}
3414
/// Declares a public newtype around `i32` named `$name`, for use as a typed
/// database id.
///
/// The generated type is `Copy`, ordered, hashable, and (de)serializes
/// transparently as its inner integer. It also gets:
///
/// - `MAX`, `from_proto` and `to_proto` helpers (the proto conversions are
///   plain `as` casts between `i32` and `u64`),
/// - `Display`, forwarding to the inner integer,
/// - the sea-query / sea-orm conversions needed to use the type as a column
///   value: `From<$name> for sea_query::Value`, `TryGetable`, `ValueType`,
///   `TryFromU64` and `Nullable`.
///
/// The expansion refers to `Value`, `DbErr`, `Serialize` and `Deserialize` by
/// their short names, so those must be in scope where the macro is invoked.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                // Narrows any integer payload to `i32`, rejecting values that do not fit.
                fn narrow<N>(int: N) -> Result<i32, sea_query::ValueTypeErr>
                where
                    N: std::convert::TryInto<i32>,
                {
                    int.try_into().map_err(|_| sea_query::ValueTypeErr)
                }

                // Every non-null integer variant is accepted; nulls and all
                // other variants are a type error.
                let raw = match v {
                    Value::TinyInt(Some(int)) => narrow(int),
                    Value::SmallInt(Some(int)) => narrow(int),
                    Value::Int(Some(int)) => narrow(int),
                    Value::BigInt(Some(int)) => narrow(int),
                    Value::TinyUnsigned(Some(int)) => narrow(int),
                    Value::SmallUnsigned(Some(int)) => narrow(int),
                    Value::Unsigned(Some(int)) => narrow(int),
                    Value::BigUnsigned(Some(int)) => narrow(int),
                    _ => Err(sea_query::ValueTypeErr),
                }?;
                Ok(Self(raw))
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `ConvertFromU64` expects the name of the target type and
                // builds the "cannot be converted from u64" message itself.
                let raw: i32 = n
                    .try_into()
                    .map_err(|_| DbErr::ConvertFromU64(stringify!($name)))?;
                Ok(Self(raw))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
3533
// One `i32`-backed id newtype per kind of database id used in this module,
// each generated by `id_type!`.
3534id_type!(AccessTokenId);
3535id_type!(ContactId);
3536id_type!(FollowerId);
3537id_type!(RoomId);
3538id_type!(RoomParticipantId);
3539id_type!(ProjectId);
3540id_type!(ProjectCollaboratorId);
3541id_type!(ReplicaId);
3542id_type!(ServerId);
3543id_type!(SignupId);
3544id_type!(UserId);
3545
/// A room together with two lists of projects: rejoined ones and reshared ones.
///
/// NOTE(review): presumably the result of a connection rejoining a room; the
/// code that builds this value is outside this view — confirm there.
3546pub struct RejoinedRoom {
3547    pub room: proto::Room,
3548    pub rejoined_projects: Vec<RejoinedProject>,
3549    pub reshared_projects: Vec<ResharedProject>,
3550}
3551
/// A project entry of `RejoinedRoom::reshared_projects`: the project id, an
/// old connection id, the project's collaborators and its worktree metadata.
///
/// NOTE(review): presumably `old_connection_id` is the host's connection from
/// before the reconnect — confirm against the code that fills it in.
3552pub struct ResharedProject {
3553    pub id: ProjectId,
3554    pub old_connection_id: ConnectionId,
3555    pub collaborators: Vec<ProjectCollaborator>,
3556    pub worktrees: Vec<proto::WorktreeMetadata>,
3557}
3558
/// A project entry of `RejoinedRoom::rejoined_projects`: the project id, an
/// old connection id, the project's collaborators, per-worktree changes
/// (`RejoinedWorktree`) and its language servers.
///
/// NOTE(review): presumably `old_connection_id` is the rejoining connection's
/// previous id — confirm against the code that fills it in.
3559pub struct RejoinedProject {
3560    pub id: ProjectId,
3561    pub old_connection_id: ConnectionId,
3562    pub collaborators: Vec<ProjectCollaborator>,
3563    pub worktrees: Vec<RejoinedWorktree>,
3564    pub language_servers: Vec<proto::LanguageServer>,
3565}
3566
/// Worktree data carried by a `RejoinedProject`.
///
/// Besides the worktree's identity (`id`, `abs_path`, `root_name`, `visible`)
/// and scan counters, it holds separate lists of updated and removed entries
/// and of updated and removed repositories, plus diagnostic summaries and
/// settings files.
///
/// NOTE(review): the updated/removed lists look like a delta relative to what
/// the rejoining client already has — confirm against the code that builds it.
3567#[derive(Debug)]
3568pub struct RejoinedWorktree {
3569    pub id: u64,
3570    pub abs_path: String,
3571    pub root_name: String,
3572    pub visible: bool,
3573    pub updated_entries: Vec<proto::Entry>,
3574    pub removed_entries: Vec<u64>,
3575    pub updated_repositories: Vec<proto::RepositoryEntry>,
3576    pub removed_repositories: Vec<u64>,
3577    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3578    pub settings_files: Vec<WorktreeSettingsFile>,
3579    pub scan_id: u64,
3580    pub completed_scan_id: u64,
3581}
3582
/// A room together with a map of `LeftProject`s keyed by project id and a list
/// of user ids whose calls were canceled.
///
/// NOTE(review): presumably the result of a connection leaving a room; the
/// code that builds this value is outside this view — confirm there.
3583pub struct LeftRoom {
3584    pub room: proto::Room,
3585    pub left_projects: HashMap<ProjectId, LeftProject>,
3586    pub canceled_calls_to_user_ids: Vec<UserId>,
3587}
3588
/// A room together with the user ids of stale participants and the user ids
/// whose calls were canceled.
///
/// NOTE(review): presumably the result of refreshing a room's state; the code
/// that builds this value is outside this view — confirm there.
3589pub struct RefreshedRoom {
3590    pub room: proto::Room,
3591    pub stale_participant_user_ids: Vec<UserId>,
3592    pub canceled_calls_to_user_ids: Vec<UserId>,
3593}
3594
/// In-memory view of a project assembled from database rows.
///
/// `collaborators` is mapped from the project's collaborator rows,
/// `worktrees` is keyed by worktree id (looked up as `worktree_id as u64`),
/// and `language_servers` is mapped from the project's language server rows.
3595pub struct Project {
3596    pub collaborators: Vec<ProjectCollaborator>,
3597    pub worktrees: BTreeMap<u64, Worktree>,
3598    pub language_servers: Vec<proto::LanguageServer>,
3599}
3600
/// One collaborator of a project, as read from a `project_collaborator` row:
/// its connection, user, replica id, and whether it is the host.
3601pub struct ProjectCollaborator {
3602    pub connection_id: ConnectionId,
3603    pub user_id: UserId,
3604    pub replica_id: ReplicaId,
3605    pub is_host: bool,
3606}
3607
3608impl ProjectCollaborator {
/// Converts this collaborator into its protobuf form. The connection id
/// becomes `peer_id`; `is_host` has no counterpart and is not included.
3609    pub fn to_proto(&self) -> proto::Collaborator {
3610        proto::Collaborator {
3611            peer_id: Some(self.connection_id.into()),
3612            replica_id: self.replica_id.0 as u32,
3613            user_id: self.user_id.to_proto(),
3614        }
3615    }
3616}
3617
/// Describes a project that a connection has left.
///
/// As filled in by `leave_project`: `id` is the project, `host_user_id` and
/// `host_connection_id` come from the project row, and `connection_ids` are
/// the connections of the collaborators still on the project afterwards.
3618#[derive(Debug)]
3619pub struct LeftProject {
3620    pub id: ProjectId,
3621    pub host_user_id: UserId,
3622    pub host_connection_id: ConnectionId,
3623    pub connection_ids: Vec<ConnectionId>,
3624}
3625
/// In-memory view of one worktree of a `Project`.
///
/// `repository_entries` is keyed by work directory id (looked up as
/// `work_directory_id as u64`); `diagnostic_summaries` and `settings_files`
/// are filled from the corresponding database rows of the project.
///
/// NOTE(review): the remaining fields are populated outside this view; their
/// meaning is taken from their names — confirm where the struct is built.
3626pub struct Worktree {
3627    pub id: u64,
3628    pub abs_path: String,
3629    pub root_name: String,
3630    pub visible: bool,
3631    pub entries: Vec<proto::Entry>,
3632    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
3633    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3634    pub settings_files: Vec<WorktreeSettingsFile>,
3635    pub scan_id: u64,
3636    pub completed_scan_id: u64,
3637}
3638
/// Path and content of one settings file stored for a worktree, copied from a
/// `worktree_settings_file` row.
3639#[derive(Debug)]
3640pub struct WorktreeSettingsFile {
3641    pub path: String,
3642    pub content: String,
3643}
3644
3645#[cfg(test)]
3646pub use test::*;
3647
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use lazy_static::lazy_static;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A `Database` for tests, backed either by in-memory SQLite or by a
    /// freshly created Postgres database that is dropped again on `Drop`.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and applies the test schema
        /// from `migrations.sqlite/20221109000000_test_schema.sql`.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = String::from("sqlite::memory:");
            let runtime = Self::current_thread_runtime();

            let db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();

                let schema = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                let create_schema = sea_orm::Statement::from_string(
                    db.pool.get_database_backend(),
                    schema.into(),
                );
                db.pool.execute(create_schema).await.unwrap();
                db
            });

            Self::with_runtime(db, runtime)
        }

        /// Creates a uniquely named Postgres database on localhost and runs
        /// the migrations from the `migrations` directory against it.
        ///
        /// A process-wide lock is held for the duration of this function.
        pub fn postgres(background: Arc<Background>) -> Self {
            lazy_static! {
                static ref LOCK: Mutex<()> = Mutex::new(());
            }

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = Self::current_thread_runtime();

            let db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");

                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();

                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            Self::with_runtime(db, runtime)
        }

        /// Returns the wrapped database. Panics once it has been taken in `drop`.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }

        /// Builds the single-threaded tokio runtime (I/O and time enabled)
        /// that the test database runs its futures on.
        fn current_thread_runtime() -> tokio::runtime::Runtime {
            tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap()
        }

        /// Stores `runtime` inside `db` and wraps the result in a `TestDb`.
        fn with_runtime(mut db: Database, runtime: tokio::runtime::Runtime) -> Self {
            db.runtime = Some(runtime);
            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }
    }

    impl Drop for TestDb {
        /// For a Postgres-backed database: terminates the other backends
        /// connected to it and then drops the database, logging (not
        /// propagating) failures. Other backends need no cleanup here.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            let is_postgres = matches!(
                db.pool.get_database_backend(),
                sea_orm::DatabaseBackend::Postgres
            );
            if !is_postgres {
                return;
            }

            db.runtime.as_ref().unwrap().block_on(async {
                use util::ResultExt;
                let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                let terminate_backends = sea_orm::Statement::from_string(
                    db.pool.get_database_backend(),
                    query.into(),
                );
                db.pool.execute(terminate_backends).await.log_err();
                sqlx::Postgres::drop_database(db.options.get_url())
                    .await
                    .log_err();
            });
        }
    }
}