db.rs

   1mod access_token;
   2mod channel;
   3mod channel_member;
   4mod channel_parent;
   5mod contact;
   6mod follower;
   7mod language_server;
   8mod project;
   9mod project_collaborator;
  10mod room;
  11mod room_participant;
  12mod server;
  13mod signup;
  14#[cfg(test)]
  15mod tests;
  16mod user;
  17mod worktree;
  18mod worktree_diagnostic_summary;
  19mod worktree_entry;
  20mod worktree_repository;
  21mod worktree_repository_statuses;
  22mod worktree_settings_file;
  23
  24use crate::executor::Executor;
  25use crate::{Error, Result};
  26use anyhow::anyhow;
  27use collections::{BTreeMap, HashMap, HashSet};
  28pub use contact::Contact;
  29use dashmap::DashMap;
  30use futures::StreamExt;
  31use hyper::StatusCode;
  32use rand::prelude::StdRng;
  33use rand::{Rng, SeedableRng};
  34use rpc::{proto, ConnectionId};
  35use sea_orm::Condition;
  36pub use sea_orm::ConnectOptions;
  37use sea_orm::{
  38    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  39    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  40    Statement, TransactionTrait,
  41};
  42use sea_query::{
  43    Alias, ColumnRef, CommonTableExpression, Expr, OnConflict, Order, Query, QueryStatementWriter,
  44    SelectStatement, UnionType, WithClause,
  45};
  46use serde::{Deserialize, Serialize};
  47pub use signup::{Invite, NewSignup, WaitlistSummary};
  48use sqlx::migrate::{Migrate, Migration, MigrationSource};
  49use sqlx::Connection;
  50use std::ops::{Deref, DerefMut};
  51use std::path::Path;
  52use std::time::Duration;
  53use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  54use tokio::sync::{Mutex, OwnedMutexGuard};
  55pub use user::Model as User;
  56
/// Handle to the application's database: a SeaORM connection plus the supporting
/// state that the query methods on this type rely on.
pub struct Database {
    /// Options the connection was created with; `migrate` reads the URL from here to
    /// open its own `sqlx` connection.
    options: ConnectOptions,
    /// Connection returned by `sea_orm::Database::connect`.
    pool: DatabaseConnection,
    /// One async mutex per room id.
    /// NOTE(review): presumably used to serialize room transactions per room — confirm.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Random number generator; `new` seeds it with the fixed value 0.
    rng: Mutex<StdRng>,
    /// Executor handed to `new` by the caller.
    executor: Executor,
    /// Test-only runtime slot; `new` initializes it to `None`.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
  66
  67impl Database {
  68    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  69        Ok(Self {
  70            options: options.clone(),
  71            pool: sea_orm::Database::connect(options).await?,
  72            rooms: DashMap::with_capacity(16384),
  73            rng: Mutex::new(StdRng::seed_from_u64(0)),
  74            executor,
  75            #[cfg(test)]
  76            runtime: None,
  77        })
  78    }
  79
  80    #[cfg(test)]
  81    pub fn reset(&self) {
  82        self.rooms.clear();
  83    }
  84
  85    pub async fn migrate(
  86        &self,
  87        migrations_path: &Path,
  88        ignore_checksum_mismatch: bool,
  89    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  90        let migrations = MigrationSource::resolve(migrations_path)
  91            .await
  92            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  93
  94        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  95
  96        connection.ensure_migrations_table().await?;
  97        let applied_migrations: HashMap<_, _> = connection
  98            .list_applied_migrations()
  99            .await?
 100            .into_iter()
 101            .map(|m| (m.version, m))
 102            .collect();
 103
 104        let mut new_migrations = Vec::new();
 105        for migration in migrations {
 106            match applied_migrations.get(&migration.version) {
 107                Some(applied_migration) => {
 108                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 109                    {
 110                        Err(anyhow!(
 111                            "checksum mismatch for applied migration {}",
 112                            migration.description
 113                        ))?;
 114                    }
 115                }
 116                None => {
 117                    let elapsed = connection.apply(&migration).await?;
 118                    new_migrations.push((migration, elapsed));
 119                }
 120            }
 121        }
 122
 123        Ok(new_migrations)
 124    }
 125
 126    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 127        self.transaction(|tx| async move {
 128            let server = server::ActiveModel {
 129                environment: ActiveValue::set(environment.into()),
 130                ..Default::default()
 131            }
 132            .insert(&*tx)
 133            .await?;
 134            Ok(server.id)
 135        })
 136        .await
 137    }
 138
 139    pub async fn stale_room_ids(
 140        &self,
 141        environment: &str,
 142        new_server_id: ServerId,
 143    ) -> Result<Vec<RoomId>> {
 144        self.transaction(|tx| async move {
 145            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 146            enum QueryAs {
 147                RoomId,
 148            }
 149
 150            let stale_server_epochs = self
 151                .stale_server_ids(environment, new_server_id, &tx)
 152                .await?;
 153            Ok(room_participant::Entity::find()
 154                .select_only()
 155                .column(room_participant::Column::RoomId)
 156                .distinct()
 157                .filter(
 158                    room_participant::Column::AnsweringConnectionServerId
 159                        .is_in(stale_server_epochs),
 160                )
 161                .into_values::<_, QueryAs>()
 162                .all(&*tx)
 163                .await?)
 164        })
 165        .await
 166    }
 167
 168    pub async fn refresh_room(
 169        &self,
 170        room_id: RoomId,
 171        new_server_id: ServerId,
 172    ) -> Result<RoomGuard<RefreshedRoom>> {
 173        self.room_transaction(room_id, |tx| async move {
 174            let stale_participant_filter = Condition::all()
 175                .add(room_participant::Column::RoomId.eq(room_id))
 176                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 177                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 178
 179            let stale_participant_user_ids = room_participant::Entity::find()
 180                .filter(stale_participant_filter.clone())
 181                .all(&*tx)
 182                .await?
 183                .into_iter()
 184                .map(|participant| participant.user_id)
 185                .collect::<Vec<_>>();
 186
 187            // Delete participants who failed to reconnect and cancel their calls.
 188            let mut canceled_calls_to_user_ids = Vec::new();
 189            room_participant::Entity::delete_many()
 190                .filter(stale_participant_filter)
 191                .exec(&*tx)
 192                .await?;
 193            let called_participants = room_participant::Entity::find()
 194                .filter(
 195                    Condition::all()
 196                        .add(
 197                            room_participant::Column::CallingUserId
 198                                .is_in(stale_participant_user_ids.iter().copied()),
 199                        )
 200                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 201                )
 202                .all(&*tx)
 203                .await?;
 204            room_participant::Entity::delete_many()
 205                .filter(
 206                    room_participant::Column::Id
 207                        .is_in(called_participants.iter().map(|participant| participant.id)),
 208                )
 209                .exec(&*tx)
 210                .await?;
 211            canceled_calls_to_user_ids.extend(
 212                called_participants
 213                    .into_iter()
 214                    .map(|participant| participant.user_id),
 215            );
 216
 217            let room = self.get_room(room_id, &tx).await?;
 218            // Delete the room if it becomes empty.
 219            if room.participants.is_empty() {
 220                project::Entity::delete_many()
 221                    .filter(project::Column::RoomId.eq(room_id))
 222                    .exec(&*tx)
 223                    .await?;
 224                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 225            }
 226
 227            Ok(RefreshedRoom {
 228                room,
 229                stale_participant_user_ids,
 230                canceled_calls_to_user_ids,
 231            })
 232        })
 233        .await
 234    }
 235
 236    pub async fn delete_stale_servers(
 237        &self,
 238        environment: &str,
 239        new_server_id: ServerId,
 240    ) -> Result<()> {
 241        self.transaction(|tx| async move {
 242            server::Entity::delete_many()
 243                .filter(
 244                    Condition::all()
 245                        .add(server::Column::Environment.eq(environment))
 246                        .add(server::Column::Id.ne(new_server_id)),
 247                )
 248                .exec(&*tx)
 249                .await?;
 250            Ok(())
 251        })
 252        .await
 253    }
 254
 255    async fn stale_server_ids(
 256        &self,
 257        environment: &str,
 258        new_server_id: ServerId,
 259        tx: &DatabaseTransaction,
 260    ) -> Result<Vec<ServerId>> {
 261        let stale_servers = server::Entity::find()
 262            .filter(
 263                Condition::all()
 264                    .add(server::Column::Environment.eq(environment))
 265                    .add(server::Column::Id.ne(new_server_id)),
 266            )
 267            .all(&*tx)
 268            .await?;
 269        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 270    }
 271
 272    // users
 273
 274    pub async fn create_user(
 275        &self,
 276        email_address: &str,
 277        admin: bool,
 278        params: NewUserParams,
 279    ) -> Result<NewUserResult> {
 280        self.transaction(|tx| async {
 281            let tx = tx;
 282            let user = user::Entity::insert(user::ActiveModel {
 283                email_address: ActiveValue::set(Some(email_address.into())),
 284                github_login: ActiveValue::set(params.github_login.clone()),
 285                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 286                admin: ActiveValue::set(admin),
 287                metrics_id: ActiveValue::set(Uuid::new_v4()),
 288                ..Default::default()
 289            })
 290            .on_conflict(
 291                OnConflict::column(user::Column::GithubLogin)
 292                    .update_column(user::Column::GithubLogin)
 293                    .to_owned(),
 294            )
 295            .exec_with_returning(&*tx)
 296            .await?;
 297
 298            Ok(NewUserResult {
 299                user_id: user.id,
 300                metrics_id: user.metrics_id.to_string(),
 301                signup_device_id: None,
 302                inviting_user_id: None,
 303            })
 304        })
 305        .await
 306    }
 307
 308    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 309        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 310            .await
 311    }
 312
 313    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 314        self.transaction(|tx| async {
 315            let tx = tx;
 316            Ok(user::Entity::find()
 317                .filter(user::Column::Id.is_in(ids.iter().copied()))
 318                .all(&*tx)
 319                .await?)
 320        })
 321        .await
 322    }
 323
 324    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 325        self.transaction(|tx| async move {
 326            Ok(user::Entity::find()
 327                .filter(user::Column::GithubLogin.eq(github_login))
 328                .one(&*tx)
 329                .await?)
 330        })
 331        .await
 332    }
 333
 334    pub async fn get_or_create_user_by_github_account(
 335        &self,
 336        github_login: &str,
 337        github_user_id: Option<i32>,
 338        github_email: Option<&str>,
 339    ) -> Result<Option<User>> {
 340        self.transaction(|tx| async move {
 341            let tx = &*tx;
 342            if let Some(github_user_id) = github_user_id {
 343                if let Some(user_by_github_user_id) = user::Entity::find()
 344                    .filter(user::Column::GithubUserId.eq(github_user_id))
 345                    .one(tx)
 346                    .await?
 347                {
 348                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 349                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 350                    Ok(Some(user_by_github_user_id.update(tx).await?))
 351                } else if let Some(user_by_github_login) = user::Entity::find()
 352                    .filter(user::Column::GithubLogin.eq(github_login))
 353                    .one(tx)
 354                    .await?
 355                {
 356                    let mut user_by_github_login = user_by_github_login.into_active_model();
 357                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 358                    Ok(Some(user_by_github_login.update(tx).await?))
 359                } else {
 360                    let user = user::Entity::insert(user::ActiveModel {
 361                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 362                        github_login: ActiveValue::set(github_login.into()),
 363                        github_user_id: ActiveValue::set(Some(github_user_id)),
 364                        admin: ActiveValue::set(false),
 365                        invite_count: ActiveValue::set(0),
 366                        invite_code: ActiveValue::set(None),
 367                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 368                        ..Default::default()
 369                    })
 370                    .exec_with_returning(&*tx)
 371                    .await?;
 372                    Ok(Some(user))
 373                }
 374            } else {
 375                Ok(user::Entity::find()
 376                    .filter(user::Column::GithubLogin.eq(github_login))
 377                    .one(tx)
 378                    .await?)
 379            }
 380        })
 381        .await
 382    }
 383
 384    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 385        self.transaction(|tx| async move {
 386            Ok(user::Entity::find()
 387                .order_by_asc(user::Column::GithubLogin)
 388                .limit(limit as u64)
 389                .offset(page as u64 * limit as u64)
 390                .all(&*tx)
 391                .await?)
 392        })
 393        .await
 394    }
 395
 396    pub async fn get_users_with_no_invites(
 397        &self,
 398        invited_by_another_user: bool,
 399    ) -> Result<Vec<User>> {
 400        self.transaction(|tx| async move {
 401            Ok(user::Entity::find()
 402                .filter(
 403                    user::Column::InviteCount
 404                        .eq(0)
 405                        .and(if invited_by_another_user {
 406                            user::Column::InviterId.is_not_null()
 407                        } else {
 408                            user::Column::InviterId.is_null()
 409                        }),
 410                )
 411                .all(&*tx)
 412                .await?)
 413        })
 414        .await
 415    }
 416
 417    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 418        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 419        enum QueryAs {
 420            MetricsId,
 421        }
 422
 423        self.transaction(|tx| async move {
 424            let metrics_id: Uuid = user::Entity::find_by_id(id)
 425                .select_only()
 426                .column(user::Column::MetricsId)
 427                .into_values::<_, QueryAs>()
 428                .one(&*tx)
 429                .await?
 430                .ok_or_else(|| anyhow!("could not find user"))?;
 431            Ok(metrics_id.to_string())
 432        })
 433        .await
 434    }
 435
 436    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 437        self.transaction(|tx| async move {
 438            user::Entity::update_many()
 439                .filter(user::Column::Id.eq(id))
 440                .set(user::ActiveModel {
 441                    admin: ActiveValue::set(is_admin),
 442                    ..Default::default()
 443                })
 444                .exec(&*tx)
 445                .await?;
 446            Ok(())
 447        })
 448        .await
 449    }
 450
 451    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 452        self.transaction(|tx| async move {
 453            user::Entity::update_many()
 454                .filter(user::Column::Id.eq(id))
 455                .set(user::ActiveModel {
 456                    connected_once: ActiveValue::set(connected_once),
 457                    ..Default::default()
 458                })
 459                .exec(&*tx)
 460                .await?;
 461            Ok(())
 462        })
 463        .await
 464    }
 465
 466    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 467        self.transaction(|tx| async move {
 468            access_token::Entity::delete_many()
 469                .filter(access_token::Column::UserId.eq(id))
 470                .exec(&*tx)
 471                .await?;
 472            user::Entity::delete_by_id(id).exec(&*tx).await?;
 473            Ok(())
 474        })
 475        .await
 476    }
 477
 478    // contacts
 479
 480    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 481        #[derive(Debug, FromQueryResult)]
 482        struct ContactWithUserBusyStatuses {
 483            user_id_a: UserId,
 484            user_id_b: UserId,
 485            a_to_b: bool,
 486            accepted: bool,
 487            should_notify: bool,
 488            user_a_busy: bool,
 489            user_b_busy: bool,
 490        }
 491
 492        self.transaction(|tx| async move {
 493            let user_a_participant = Alias::new("user_a_participant");
 494            let user_b_participant = Alias::new("user_b_participant");
 495            let mut db_contacts = contact::Entity::find()
 496                .column_as(
 497                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 498                        .is_not_null(),
 499                    "user_a_busy",
 500                )
 501                .column_as(
 502                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 503                        .is_not_null(),
 504                    "user_b_busy",
 505                )
 506                .filter(
 507                    contact::Column::UserIdA
 508                        .eq(user_id)
 509                        .or(contact::Column::UserIdB.eq(user_id)),
 510                )
 511                .join_as(
 512                    JoinType::LeftJoin,
 513                    contact::Relation::UserARoomParticipant.def(),
 514                    user_a_participant,
 515                )
 516                .join_as(
 517                    JoinType::LeftJoin,
 518                    contact::Relation::UserBRoomParticipant.def(),
 519                    user_b_participant,
 520                )
 521                .into_model::<ContactWithUserBusyStatuses>()
 522                .stream(&*tx)
 523                .await?;
 524
 525            let mut contacts = Vec::new();
 526            while let Some(db_contact) = db_contacts.next().await {
 527                let db_contact = db_contact?;
 528                if db_contact.user_id_a == user_id {
 529                    if db_contact.accepted {
 530                        contacts.push(Contact::Accepted {
 531                            user_id: db_contact.user_id_b,
 532                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 533                            busy: db_contact.user_b_busy,
 534                        });
 535                    } else if db_contact.a_to_b {
 536                        contacts.push(Contact::Outgoing {
 537                            user_id: db_contact.user_id_b,
 538                        })
 539                    } else {
 540                        contacts.push(Contact::Incoming {
 541                            user_id: db_contact.user_id_b,
 542                            should_notify: db_contact.should_notify,
 543                        });
 544                    }
 545                } else if db_contact.accepted {
 546                    contacts.push(Contact::Accepted {
 547                        user_id: db_contact.user_id_a,
 548                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 549                        busy: db_contact.user_a_busy,
 550                    });
 551                } else if db_contact.a_to_b {
 552                    contacts.push(Contact::Incoming {
 553                        user_id: db_contact.user_id_a,
 554                        should_notify: db_contact.should_notify,
 555                    });
 556                } else {
 557                    contacts.push(Contact::Outgoing {
 558                        user_id: db_contact.user_id_a,
 559                    });
 560                }
 561            }
 562
 563            contacts.sort_unstable_by_key(|contact| contact.user_id());
 564
 565            Ok(contacts)
 566        })
 567        .await
 568    }
 569
 570    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 571        self.transaction(|tx| async move {
 572            let participant = room_participant::Entity::find()
 573                .filter(room_participant::Column::UserId.eq(user_id))
 574                .one(&*tx)
 575                .await?;
 576            Ok(participant.is_some())
 577        })
 578        .await
 579    }
 580
 581    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 582        self.transaction(|tx| async move {
 583            let (id_a, id_b) = if user_id_1 < user_id_2 {
 584                (user_id_1, user_id_2)
 585            } else {
 586                (user_id_2, user_id_1)
 587            };
 588
 589            Ok(contact::Entity::find()
 590                .filter(
 591                    contact::Column::UserIdA
 592                        .eq(id_a)
 593                        .and(contact::Column::UserIdB.eq(id_b))
 594                        .and(contact::Column::Accepted.eq(true)),
 595                )
 596                .one(&*tx)
 597                .await?
 598                .is_some())
 599        })
 600        .await
 601    }
 602
 603    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 604        self.transaction(|tx| async move {
 605            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 606                (sender_id, receiver_id, true)
 607            } else {
 608                (receiver_id, sender_id, false)
 609            };
 610
 611            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 612                user_id_a: ActiveValue::set(id_a),
 613                user_id_b: ActiveValue::set(id_b),
 614                a_to_b: ActiveValue::set(a_to_b),
 615                accepted: ActiveValue::set(false),
 616                should_notify: ActiveValue::set(true),
 617                ..Default::default()
 618            })
 619            .on_conflict(
 620                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 621                    .values([
 622                        (contact::Column::Accepted, true.into()),
 623                        (contact::Column::ShouldNotify, false.into()),
 624                    ])
 625                    .action_and_where(
 626                        contact::Column::Accepted.eq(false).and(
 627                            contact::Column::AToB
 628                                .eq(a_to_b)
 629                                .and(contact::Column::UserIdA.eq(id_b))
 630                                .or(contact::Column::AToB
 631                                    .ne(a_to_b)
 632                                    .and(contact::Column::UserIdA.eq(id_a))),
 633                        ),
 634                    )
 635                    .to_owned(),
 636            )
 637            .exec_without_returning(&*tx)
 638            .await?;
 639
 640            if rows_affected == 1 {
 641                Ok(())
 642            } else {
 643                Err(anyhow!("contact already requested"))?
 644            }
 645        })
 646        .await
 647    }
 648
 649    /// Returns a bool indicating whether the removed contact had originally accepted or not
 650    ///
 651    /// Deletes the contact identified by the requester and responder ids, and then returns
 652    /// whether the deleted contact had originally accepted or was a pending contact request.
 653    ///
 654    /// # Arguments
 655    ///
 656    /// * `requester_id` - The user that initiates this request
 657    /// * `responder_id` - The user that will be removed
 658    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 659        self.transaction(|tx| async move {
 660            let (id_a, id_b) = if responder_id < requester_id {
 661                (responder_id, requester_id)
 662            } else {
 663                (requester_id, responder_id)
 664            };
 665
 666            let contact = contact::Entity::find()
 667                .filter(
 668                    contact::Column::UserIdA
 669                        .eq(id_a)
 670                        .and(contact::Column::UserIdB.eq(id_b)),
 671                )
 672                .one(&*tx)
 673                .await?
 674                .ok_or_else(|| anyhow!("no such contact"))?;
 675
 676            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 677            Ok(contact.accepted)
 678        })
 679        .await
 680    }
 681
 682    pub async fn dismiss_contact_notification(
 683        &self,
 684        user_id: UserId,
 685        contact_user_id: UserId,
 686    ) -> Result<()> {
 687        self.transaction(|tx| async move {
 688            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 689                (user_id, contact_user_id, true)
 690            } else {
 691                (contact_user_id, user_id, false)
 692            };
 693
 694            let result = contact::Entity::update_many()
 695                .set(contact::ActiveModel {
 696                    should_notify: ActiveValue::set(false),
 697                    ..Default::default()
 698                })
 699                .filter(
 700                    contact::Column::UserIdA
 701                        .eq(id_a)
 702                        .and(contact::Column::UserIdB.eq(id_b))
 703                        .and(
 704                            contact::Column::AToB
 705                                .eq(a_to_b)
 706                                .and(contact::Column::Accepted.eq(true))
 707                                .or(contact::Column::AToB
 708                                    .ne(a_to_b)
 709                                    .and(contact::Column::Accepted.eq(false))),
 710                        ),
 711                )
 712                .exec(&*tx)
 713                .await?;
 714            if result.rows_affected == 0 {
 715                Err(anyhow!("no such contact request"))?
 716            } else {
 717                Ok(())
 718            }
 719        })
 720        .await
 721    }
 722
 723    pub async fn respond_to_contact_request(
 724        &self,
 725        responder_id: UserId,
 726        requester_id: UserId,
 727        accept: bool,
 728    ) -> Result<()> {
 729        self.transaction(|tx| async move {
 730            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 731                (responder_id, requester_id, false)
 732            } else {
 733                (requester_id, responder_id, true)
 734            };
 735            let rows_affected = if accept {
 736                let result = contact::Entity::update_many()
 737                    .set(contact::ActiveModel {
 738                        accepted: ActiveValue::set(true),
 739                        should_notify: ActiveValue::set(true),
 740                        ..Default::default()
 741                    })
 742                    .filter(
 743                        contact::Column::UserIdA
 744                            .eq(id_a)
 745                            .and(contact::Column::UserIdB.eq(id_b))
 746                            .and(contact::Column::AToB.eq(a_to_b)),
 747                    )
 748                    .exec(&*tx)
 749                    .await?;
 750                result.rows_affected
 751            } else {
 752                let result = contact::Entity::delete_many()
 753                    .filter(
 754                        contact::Column::UserIdA
 755                            .eq(id_a)
 756                            .and(contact::Column::UserIdB.eq(id_b))
 757                            .and(contact::Column::AToB.eq(a_to_b))
 758                            .and(contact::Column::Accepted.eq(false)),
 759                    )
 760                    .exec(&*tx)
 761                    .await?;
 762
 763                result.rows_affected
 764            };
 765
 766            if rows_affected == 1 {
 767                Ok(())
 768            } else {
 769                Err(anyhow!("no such contact request"))?
 770            }
 771        })
 772        .await
 773    }
 774
 775    pub fn fuzzy_like_string(string: &str) -> String {
 776        let mut result = String::with_capacity(string.len() * 2 + 1);
 777        for c in string.chars() {
 778            if c.is_alphanumeric() {
 779                result.push('%');
 780                result.push(c);
 781            }
 782        }
 783        result.push('%');
 784        result
 785    }
 786
 787    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 788        self.transaction(|tx| async {
 789            let tx = tx;
 790            let like_string = Self::fuzzy_like_string(name_query);
 791            let query = "
 792                SELECT users.*
 793                FROM users
 794                WHERE github_login ILIKE $1
 795                ORDER BY github_login <-> $2
 796                LIMIT $3
 797            ";
 798
 799            Ok(user::Entity::find()
 800                .from_raw_sql(Statement::from_sql_and_values(
 801                    self.pool.get_database_backend(),
 802                    query.into(),
 803                    vec![like_string.into(), name_query.into(), limit.into()],
 804                ))
 805                .all(&*tx)
 806                .await?)
 807        })
 808        .await
 809    }
 810
 811    // signups
 812
 813    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 814        self.transaction(|tx| async move {
 815            signup::Entity::insert(signup::ActiveModel {
 816                email_address: ActiveValue::set(signup.email_address.clone()),
 817                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 818                email_confirmation_sent: ActiveValue::set(false),
 819                platform_mac: ActiveValue::set(signup.platform_mac),
 820                platform_windows: ActiveValue::set(signup.platform_windows),
 821                platform_linux: ActiveValue::set(signup.platform_linux),
 822                platform_unknown: ActiveValue::set(false),
 823                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 824                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 825                device_id: ActiveValue::set(signup.device_id.clone()),
 826                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 827                ..Default::default()
 828            })
 829            .on_conflict(
 830                OnConflict::column(signup::Column::EmailAddress)
 831                    .update_columns([
 832                        signup::Column::PlatformMac,
 833                        signup::Column::PlatformWindows,
 834                        signup::Column::PlatformLinux,
 835                        signup::Column::EditorFeatures,
 836                        signup::Column::ProgrammingLanguages,
 837                        signup::Column::DeviceId,
 838                        signup::Column::AddedToMailingList,
 839                    ])
 840                    .to_owned(),
 841            )
 842            .exec(&*tx)
 843            .await?;
 844            Ok(())
 845        })
 846        .await
 847    }
 848
 849    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 850        self.transaction(|tx| async move {
 851            let signup = signup::Entity::find()
 852                .filter(signup::Column::EmailAddress.eq(email_address))
 853                .one(&*tx)
 854                .await?
 855                .ok_or_else(|| {
 856                    anyhow!("signup with email address {} doesn't exist", email_address)
 857                })?;
 858
 859            Ok(signup)
 860        })
 861        .await
 862    }
 863
 864    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 865        self.transaction(|tx| async move {
 866            let query = "
 867                SELECT
 868                    COUNT(*) as count,
 869                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 870                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 871                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 872                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 873                FROM (
 874                    SELECT *
 875                    FROM signups
 876                    WHERE
 877                        NOT email_confirmation_sent
 878                ) AS unsent
 879            ";
 880            Ok(
 881                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 882                    self.pool.get_database_backend(),
 883                    query.into(),
 884                    vec![],
 885                ))
 886                .one(&*tx)
 887                .await?
 888                .ok_or_else(|| anyhow!("invalid result"))?,
 889            )
 890        })
 891        .await
 892    }
 893
 894    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 895        let emails = invites
 896            .iter()
 897            .map(|s| s.email_address.as_str())
 898            .collect::<Vec<_>>();
 899        self.transaction(|tx| async {
 900            let tx = tx;
 901            signup::Entity::update_many()
 902                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 903                .set(signup::ActiveModel {
 904                    email_confirmation_sent: ActiveValue::set(true),
 905                    ..Default::default()
 906                })
 907                .exec(&*tx)
 908                .await?;
 909            Ok(())
 910        })
 911        .await
 912    }
 913
 914    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 915        self.transaction(|tx| async move {
 916            Ok(signup::Entity::find()
 917                .select_only()
 918                .column(signup::Column::EmailAddress)
 919                .column(signup::Column::EmailConfirmationCode)
 920                .filter(
 921                    signup::Column::EmailConfirmationSent.eq(false).and(
 922                        signup::Column::PlatformMac
 923                            .eq(true)
 924                            .or(signup::Column::PlatformUnknown.eq(true)),
 925                    ),
 926                )
 927                .order_by_asc(signup::Column::CreatedAt)
 928                .limit(count as u64)
 929                .into_model()
 930                .all(&*tx)
 931                .await?)
 932        })
 933        .await
 934    }
 935
 936    // invite codes
 937
    /// Redeems the invite code `code` for `email_address` and returns the resulting
    /// [`Invite`] (email address plus email confirmation code).
    ///
    /// Inside one transaction this:
    /// 1. rejects the request if a user with `email_address` already exists,
    /// 2. finds the user owning `code` who still has `invite_count > 0`,
    /// 3. decrements that user's `invite_count` by one,
    /// 4. upserts a signup row for `email_address` that records the inviting user.
    ///
    /// # Errors
    ///
    /// - "email address is already in use" when a user with that email exists.
    /// - `Error::Http` with `StatusCode::UNAUTHORIZED` when no user owns `code` with
    ///   invites remaining.
    pub async fn create_invite_from_code(
        &self,
        code: &str,
        email_address: &str,
        device_id: Option<&str>,
        added_to_mailing_list: bool,
    ) -> Result<Invite> {
        self.transaction(|tx| async move {
            let existing_user = user::Entity::find()
                .filter(user::Column::EmailAddress.eq(email_address))
                .one(&*tx)
                .await?;

            if existing_user.is_some() {
                Err(anyhow!("email address is already in use"))?;
            }

            // The code only counts if its owner still has invites left.
            let inviting_user_with_invites = match user::Entity::find()
                .filter(
                    user::Column::InviteCode
                        .eq(code)
                        .and(user::Column::InviteCount.gt(0)),
                )
                .one(&*tx)
                .await?
            {
                Some(inviting_user) => inviting_user,
                None => {
                    return Err(Error::Http(
                        StatusCode::UNAUTHORIZED,
                        "unable to find an invite code with invites remaining".to_string(),
                    ))?
                }
            };
            // Consume one invite; the `> 0` guard is repeated in the update's filter.
            // NOTE(review): the number of affected rows is not inspected here — confirm
            // the transaction's isolation level makes a zero-row update impossible.
            user::Entity::update_many()
                .filter(
                    user::Column::Id
                        .eq(inviting_user_with_invites.id)
                        .and(user::Column::InviteCount.gt(0)),
                )
                .col_expr(
                    user::Column::InviteCount,
                    Expr::col(user::Column::InviteCount).sub(1),
                )
                .exec(&*tx)
                .await?;

            // If a signup for this email already exists, only its inviting user is
            // overwritten; the returned row is used to build the `Invite` below.
            let signup = signup::Entity::insert(signup::ActiveModel {
                email_address: ActiveValue::set(email_address.into()),
                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
                email_confirmation_sent: ActiveValue::set(false),
                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
                platform_linux: ActiveValue::set(false),
                platform_mac: ActiveValue::set(false),
                platform_windows: ActiveValue::set(false),
                platform_unknown: ActiveValue::set(true),
                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
                ..Default::default()
            })
            .on_conflict(
                OnConflict::column(signup::Column::EmailAddress)
                    .update_column(signup::Column::InvitingUserId)
                    .to_owned(),
            )
            .exec_with_returning(&*tx)
            .await?;

            Ok(Invite {
                email_address: signup.email_address,
                email_confirmation_code: signup.email_confirmation_code,
            })
        })
        .await
    }
1013
    /// Creates (or updates) a user account for a confirmed `invite`.
    ///
    /// Returns `Ok(None)` when the matching signup is already linked to a user.
    /// Otherwise the user row is upserted by `github_login`, the signup is linked to
    /// it, an accepted contact with the inviting user (if any) is recorded, and a
    /// [`NewUserResult`] describing the new user is returned.
    ///
    /// # Errors
    ///
    /// `Error::Http` with `StatusCode::NOT_FOUND` ("no such invite") when no signup
    /// matches both the invite's email address and its confirmation code.
    pub async fn create_user_from_invite(
        &self,
        invite: &Invite,
        user: NewUserParams,
    ) -> Result<Option<NewUserResult>> {
        self.transaction(|tx| async {
            // Move the transaction handle into this (non-`move`) async block.
            let tx = tx;
            // Both the email address and the confirmation code must match.
            let signup = signup::Entity::find()
                .filter(
                    signup::Column::EmailAddress
                        .eq(invite.email_address.as_str())
                        .and(
                            signup::Column::EmailConfirmationCode
                                .eq(invite.email_confirmation_code.as_str()),
                        ),
                )
                .one(&*tx)
                .await?
                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;

            // The invite was already redeemed.
            if signup.user_id.is_some() {
                return Ok(None);
            }

            // Upsert keyed on `github_login`: an existing row gets its email address,
            // GitHub user id and admin flag overwritten; the other columns set below
            // only apply to a newly inserted row.
            let user = user::Entity::insert(user::ActiveModel {
                email_address: ActiveValue::set(Some(invite.email_address.clone())),
                github_login: ActiveValue::set(user.github_login.clone()),
                github_user_id: ActiveValue::set(Some(user.github_user_id)),
                admin: ActiveValue::set(false),
                invite_count: ActiveValue::set(user.invite_count),
                invite_code: ActiveValue::set(Some(random_invite_code())),
                metrics_id: ActiveValue::set(Uuid::new_v4()),
                ..Default::default()
            })
            .on_conflict(
                OnConflict::column(user::Column::GithubLogin)
                    .update_columns([
                        user::Column::EmailAddress,
                        user::Column::GithubUserId,
                        user::Column::Admin,
                    ])
                    .to_owned(),
            )
            .exec_with_returning(&*tx)
            .await?;

            // Link the signup to the user so the invite cannot be redeemed again.
            let mut signup = signup.into_active_model();
            signup.user_id = ActiveValue::set(Some(user.id));
            let signup = signup.update(&*tx).await?;

            if let Some(inviting_user_id) = signup.inviting_user_id {
                // The smaller id is stored as `user_id_a`; `a_to_b` is true exactly
                // when that smaller id is the inviting user.
                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
                    (inviting_user_id, user.id, true)
                } else {
                    (user.id, inviting_user_id, false)
                };

                // The contact is inserted as already accepted; a conflicting existing
                // row is left untouched.
                contact::Entity::insert(contact::ActiveModel {
                    user_id_a: ActiveValue::set(user_id_a),
                    user_id_b: ActiveValue::set(user_id_b),
                    a_to_b: ActiveValue::set(a_to_b),
                    should_notify: ActiveValue::set(true),
                    accepted: ActiveValue::set(true),
                    ..Default::default()
                })
                .on_conflict(OnConflict::new().do_nothing().to_owned())
                .exec_without_returning(&*tx)
                .await?;
            }

            Ok(Some(NewUserResult {
                user_id: user.id,
                metrics_id: user.metrics_id.to_string(),
                inviting_user_id: signup.inviting_user_id,
                signup_device_id: signup.device_id,
            }))
        })
        .await
    }
1093
1094    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1095        self.transaction(|tx| async move {
1096            if count > 0 {
1097                user::Entity::update_many()
1098                    .filter(
1099                        user::Column::Id
1100                            .eq(id)
1101                            .and(user::Column::InviteCode.is_null()),
1102                    )
1103                    .set(user::ActiveModel {
1104                        invite_code: ActiveValue::set(Some(random_invite_code())),
1105                        ..Default::default()
1106                    })
1107                    .exec(&*tx)
1108                    .await?;
1109            }
1110
1111            user::Entity::update_many()
1112                .filter(user::Column::Id.eq(id))
1113                .set(user::ActiveModel {
1114                    invite_count: ActiveValue::set(count),
1115                    ..Default::default()
1116                })
1117                .exec(&*tx)
1118                .await?;
1119            Ok(())
1120        })
1121        .await
1122    }
1123
1124    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1125        self.transaction(|tx| async move {
1126            match user::Entity::find_by_id(id).one(&*tx).await? {
1127                Some(user) if user.invite_code.is_some() => {
1128                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1129                }
1130                _ => Ok(None),
1131            }
1132        })
1133        .await
1134    }
1135
1136    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1137        self.transaction(|tx| async move {
1138            user::Entity::find()
1139                .filter(user::Column::InviteCode.eq(code))
1140                .one(&*tx)
1141                .await?
1142                .ok_or_else(|| {
1143                    Error::Http(
1144                        StatusCode::NOT_FOUND,
1145                        "that invite code does not exist".to_string(),
1146                    )
1147                })
1148        })
1149        .await
1150    }
1151
1152    // rooms
1153
1154    pub async fn incoming_call_for_user(
1155        &self,
1156        user_id: UserId,
1157    ) -> Result<Option<proto::IncomingCall>> {
1158        self.transaction(|tx| async move {
1159            let pending_participant = room_participant::Entity::find()
1160                .filter(
1161                    room_participant::Column::UserId
1162                        .eq(user_id)
1163                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1164                )
1165                .one(&*tx)
1166                .await?;
1167
1168            if let Some(pending_participant) = pending_participant {
1169                let room = self.get_room(pending_participant.room_id, &tx).await?;
1170                Ok(Self::build_incoming_call(&room, user_id))
1171            } else {
1172                Ok(None)
1173            }
1174        })
1175        .await
1176    }
1177
1178    pub async fn create_room(
1179        &self,
1180        user_id: UserId,
1181        connection: ConnectionId,
1182        live_kit_room: &str,
1183    ) -> Result<proto::Room> {
1184        self.transaction(|tx| async move {
1185            let room = room::ActiveModel {
1186                live_kit_room: ActiveValue::set(live_kit_room.into()),
1187                ..Default::default()
1188            }
1189            .insert(&*tx)
1190            .await?;
1191            room_participant::ActiveModel {
1192                room_id: ActiveValue::set(room.id),
1193                user_id: ActiveValue::set(user_id),
1194                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1195                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1196                    connection.owner_id as i32,
1197                ))),
1198                answering_connection_lost: ActiveValue::set(false),
1199                calling_user_id: ActiveValue::set(user_id),
1200                calling_connection_id: ActiveValue::set(connection.id as i32),
1201                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1202                    connection.owner_id as i32,
1203                ))),
1204                ..Default::default()
1205            }
1206            .insert(&*tx)
1207            .await?;
1208
1209            let room = self.get_room(room.id, &tx).await?;
1210            Ok(room)
1211        })
1212        .await
1213    }
1214
1215    pub async fn call(
1216        &self,
1217        room_id: RoomId,
1218        calling_user_id: UserId,
1219        calling_connection: ConnectionId,
1220        called_user_id: UserId,
1221        initial_project_id: Option<ProjectId>,
1222    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1223        self.room_transaction(room_id, |tx| async move {
1224            room_participant::ActiveModel {
1225                room_id: ActiveValue::set(room_id),
1226                user_id: ActiveValue::set(called_user_id),
1227                answering_connection_lost: ActiveValue::set(false),
1228                calling_user_id: ActiveValue::set(calling_user_id),
1229                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1230                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1231                    calling_connection.owner_id as i32,
1232                ))),
1233                initial_project_id: ActiveValue::set(initial_project_id),
1234                ..Default::default()
1235            }
1236            .insert(&*tx)
1237            .await?;
1238
1239            let room = self.get_room(room_id, &tx).await?;
1240            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1241                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1242            Ok((room, incoming_call))
1243        })
1244        .await
1245    }
1246
1247    pub async fn call_failed(
1248        &self,
1249        room_id: RoomId,
1250        called_user_id: UserId,
1251    ) -> Result<RoomGuard<proto::Room>> {
1252        self.room_transaction(room_id, |tx| async move {
1253            room_participant::Entity::delete_many()
1254                .filter(
1255                    room_participant::Column::RoomId
1256                        .eq(room_id)
1257                        .and(room_participant::Column::UserId.eq(called_user_id)),
1258                )
1259                .exec(&*tx)
1260                .await?;
1261            let room = self.get_room(room_id, &tx).await?;
1262            Ok(room)
1263        })
1264        .await
1265    }
1266
1267    pub async fn decline_call(
1268        &self,
1269        expected_room_id: Option<RoomId>,
1270        user_id: UserId,
1271    ) -> Result<Option<RoomGuard<proto::Room>>> {
1272        self.optional_room_transaction(|tx| async move {
1273            let mut filter = Condition::all()
1274                .add(room_participant::Column::UserId.eq(user_id))
1275                .add(room_participant::Column::AnsweringConnectionId.is_null());
1276            if let Some(room_id) = expected_room_id {
1277                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1278            }
1279            let participant = room_participant::Entity::find()
1280                .filter(filter)
1281                .one(&*tx)
1282                .await?;
1283
1284            let participant = if let Some(participant) = participant {
1285                participant
1286            } else if expected_room_id.is_some() {
1287                return Err(anyhow!("could not find call to decline"))?;
1288            } else {
1289                return Ok(None);
1290            };
1291
1292            let room_id = participant.room_id;
1293            room_participant::Entity::delete(participant.into_active_model())
1294                .exec(&*tx)
1295                .await?;
1296
1297            let room = self.get_room(room_id, &tx).await?;
1298            Ok(Some((room_id, room)))
1299        })
1300        .await
1301    }
1302
1303    pub async fn cancel_call(
1304        &self,
1305        room_id: RoomId,
1306        calling_connection: ConnectionId,
1307        called_user_id: UserId,
1308    ) -> Result<RoomGuard<proto::Room>> {
1309        self.room_transaction(room_id, |tx| async move {
1310            let participant = room_participant::Entity::find()
1311                .filter(
1312                    Condition::all()
1313                        .add(room_participant::Column::UserId.eq(called_user_id))
1314                        .add(room_participant::Column::RoomId.eq(room_id))
1315                        .add(
1316                            room_participant::Column::CallingConnectionId
1317                                .eq(calling_connection.id as i32),
1318                        )
1319                        .add(
1320                            room_participant::Column::CallingConnectionServerId
1321                                .eq(calling_connection.owner_id as i32),
1322                        )
1323                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1324                )
1325                .one(&*tx)
1326                .await?
1327                .ok_or_else(|| anyhow!("no call to cancel"))?;
1328
1329            room_participant::Entity::delete(participant.into_active_model())
1330                .exec(&*tx)
1331                .await?;
1332
1333            let room = self.get_room(room_id, &tx).await?;
1334            Ok(room)
1335        })
1336        .await
1337    }
1338
1339    pub async fn join_room(
1340        &self,
1341        room_id: RoomId,
1342        user_id: UserId,
1343        connection: ConnectionId,
1344    ) -> Result<RoomGuard<proto::Room>> {
1345        self.room_transaction(room_id, |tx| async move {
1346            let result = room_participant::Entity::update_many()
1347                .filter(
1348                    Condition::all()
1349                        .add(room_participant::Column::RoomId.eq(room_id))
1350                        .add(room_participant::Column::UserId.eq(user_id))
1351                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1352                )
1353                .set(room_participant::ActiveModel {
1354                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1355                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1356                        connection.owner_id as i32,
1357                    ))),
1358                    answering_connection_lost: ActiveValue::set(false),
1359                    ..Default::default()
1360                })
1361                .exec(&*tx)
1362                .await?;
1363            if result.rows_affected == 0 {
1364                Err(anyhow!("room does not exist or was already joined"))?
1365            } else {
1366                let room = self.get_room(room_id, &tx).await?;
1367                Ok(room)
1368            }
1369        })
1370        .await
1371    }
1372
1373    pub async fn rejoin_room(
1374        &self,
1375        rejoin_room: proto::RejoinRoom,
1376        user_id: UserId,
1377        connection: ConnectionId,
1378    ) -> Result<RoomGuard<RejoinedRoom>> {
1379        let room_id = RoomId::from_proto(rejoin_room.id);
1380        self.room_transaction(room_id, |tx| async {
1381            let tx = tx;
1382            let participant_update = room_participant::Entity::update_many()
1383                .filter(
1384                    Condition::all()
1385                        .add(room_participant::Column::RoomId.eq(room_id))
1386                        .add(room_participant::Column::UserId.eq(user_id))
1387                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1388                        .add(
1389                            Condition::any()
1390                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1391                                .add(
1392                                    room_participant::Column::AnsweringConnectionServerId
1393                                        .ne(connection.owner_id as i32),
1394                                ),
1395                        ),
1396                )
1397                .set(room_participant::ActiveModel {
1398                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1399                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1400                        connection.owner_id as i32,
1401                    ))),
1402                    answering_connection_lost: ActiveValue::set(false),
1403                    ..Default::default()
1404                })
1405                .exec(&*tx)
1406                .await?;
1407            if participant_update.rows_affected == 0 {
1408                return Err(anyhow!("room does not exist or was already joined"))?;
1409            }
1410
1411            let mut reshared_projects = Vec::new();
1412            for reshared_project in &rejoin_room.reshared_projects {
1413                let project_id = ProjectId::from_proto(reshared_project.project_id);
1414                let project = project::Entity::find_by_id(project_id)
1415                    .one(&*tx)
1416                    .await?
1417                    .ok_or_else(|| anyhow!("project does not exist"))?;
1418                if project.host_user_id != user_id {
1419                    return Err(anyhow!("no such project"))?;
1420                }
1421
1422                let mut collaborators = project
1423                    .find_related(project_collaborator::Entity)
1424                    .all(&*tx)
1425                    .await?;
1426                let host_ix = collaborators
1427                    .iter()
1428                    .position(|collaborator| {
1429                        collaborator.user_id == user_id && collaborator.is_host
1430                    })
1431                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1432                let host = collaborators.swap_remove(host_ix);
1433                let old_connection_id = host.connection();
1434
1435                project::Entity::update(project::ActiveModel {
1436                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1437                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1438                        connection.owner_id as i32,
1439                    ))),
1440                    ..project.into_active_model()
1441                })
1442                .exec(&*tx)
1443                .await?;
1444                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1445                    connection_id: ActiveValue::set(connection.id as i32),
1446                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1447                    ..host.into_active_model()
1448                })
1449                .exec(&*tx)
1450                .await?;
1451
1452                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1453                    .await?;
1454
1455                reshared_projects.push(ResharedProject {
1456                    id: project_id,
1457                    old_connection_id,
1458                    collaborators: collaborators
1459                        .iter()
1460                        .map(|collaborator| ProjectCollaborator {
1461                            connection_id: collaborator.connection(),
1462                            user_id: collaborator.user_id,
1463                            replica_id: collaborator.replica_id,
1464                            is_host: collaborator.is_host,
1465                        })
1466                        .collect(),
1467                    worktrees: reshared_project.worktrees.clone(),
1468                });
1469            }
1470
1471            project::Entity::delete_many()
1472                .filter(
1473                    Condition::all()
1474                        .add(project::Column::RoomId.eq(room_id))
1475                        .add(project::Column::HostUserId.eq(user_id))
1476                        .add(
1477                            project::Column::Id
1478                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1479                        ),
1480                )
1481                .exec(&*tx)
1482                .await?;
1483
1484            let mut rejoined_projects = Vec::new();
1485            for rejoined_project in &rejoin_room.rejoined_projects {
1486                let project_id = ProjectId::from_proto(rejoined_project.id);
1487                let Some(project) = project::Entity::find_by_id(project_id)
1488                    .one(&*tx)
1489                    .await? else { continue };
1490
1491                let mut worktrees = Vec::new();
1492                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1493                for db_worktree in db_worktrees {
1494                    let mut worktree = RejoinedWorktree {
1495                        id: db_worktree.id as u64,
1496                        abs_path: db_worktree.abs_path,
1497                        root_name: db_worktree.root_name,
1498                        visible: db_worktree.visible,
1499                        updated_entries: Default::default(),
1500                        removed_entries: Default::default(),
1501                        updated_repositories: Default::default(),
1502                        removed_repositories: Default::default(),
1503                        diagnostic_summaries: Default::default(),
1504                        settings_files: Default::default(),
1505                        scan_id: db_worktree.scan_id as u64,
1506                        completed_scan_id: db_worktree.completed_scan_id as u64,
1507                    };
1508
1509                    let rejoined_worktree = rejoined_project
1510                        .worktrees
1511                        .iter()
1512                        .find(|worktree| worktree.id == db_worktree.id as u64);
1513
1514                    // File entries
1515                    {
1516                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1517                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1518                        } else {
1519                            worktree_entry::Column::IsDeleted.eq(false)
1520                        };
1521
1522                        let mut db_entries = worktree_entry::Entity::find()
1523                            .filter(
1524                                Condition::all()
1525                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
1526                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1527                                    .add(entry_filter),
1528                            )
1529                            .stream(&*tx)
1530                            .await?;
1531
1532                        while let Some(db_entry) = db_entries.next().await {
1533                            let db_entry = db_entry?;
1534                            if db_entry.is_deleted {
1535                                worktree.removed_entries.push(db_entry.id as u64);
1536                            } else {
1537                                worktree.updated_entries.push(proto::Entry {
1538                                    id: db_entry.id as u64,
1539                                    is_dir: db_entry.is_dir,
1540                                    path: db_entry.path,
1541                                    inode: db_entry.inode as u64,
1542                                    mtime: Some(proto::Timestamp {
1543                                        seconds: db_entry.mtime_seconds as u64,
1544                                        nanos: db_entry.mtime_nanos as u32,
1545                                    }),
1546                                    is_symlink: db_entry.is_symlink,
1547                                    is_ignored: db_entry.is_ignored,
1548                                    is_external: db_entry.is_external,
1549                                    git_status: db_entry.git_status.map(|status| status as i32),
1550                                });
1551                            }
1552                        }
1553                    }
1554
1555                    // Repository Entries
1556                    {
1557                        let repository_entry_filter =
1558                            if let Some(rejoined_worktree) = rejoined_worktree {
1559                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1560                            } else {
1561                                worktree_repository::Column::IsDeleted.eq(false)
1562                            };
1563
1564                        let mut db_repositories = worktree_repository::Entity::find()
1565                            .filter(
1566                                Condition::all()
1567                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
1568                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1569                                    .add(repository_entry_filter),
1570                            )
1571                            .stream(&*tx)
1572                            .await?;
1573
1574                        while let Some(db_repository) = db_repositories.next().await {
1575                            let db_repository = db_repository?;
1576                            if db_repository.is_deleted {
1577                                worktree
1578                                    .removed_repositories
1579                                    .push(db_repository.work_directory_id as u64);
1580                            } else {
1581                                worktree.updated_repositories.push(proto::RepositoryEntry {
1582                                    work_directory_id: db_repository.work_directory_id as u64,
1583                                    branch: db_repository.branch,
1584                                });
1585                            }
1586                        }
1587                    }
1588
1589                    worktrees.push(worktree);
1590                }
1591
1592                let language_servers = project
1593                    .find_related(language_server::Entity)
1594                    .all(&*tx)
1595                    .await?
1596                    .into_iter()
1597                    .map(|language_server| proto::LanguageServer {
1598                        id: language_server.id as u64,
1599                        name: language_server.name,
1600                    })
1601                    .collect::<Vec<_>>();
1602
1603                {
1604                    let mut db_settings_files = worktree_settings_file::Entity::find()
1605                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1606                        .stream(&*tx)
1607                        .await?;
1608                    while let Some(db_settings_file) = db_settings_files.next().await {
1609                        let db_settings_file = db_settings_file?;
1610                        if let Some(worktree) = worktrees
1611                            .iter_mut()
1612                            .find(|w| w.id == db_settings_file.worktree_id as u64)
1613                        {
1614                            worktree.settings_files.push(WorktreeSettingsFile {
1615                                path: db_settings_file.path,
1616                                content: db_settings_file.content,
1617                            });
1618                        }
1619                    }
1620                }
1621
1622                let mut collaborators = project
1623                    .find_related(project_collaborator::Entity)
1624                    .all(&*tx)
1625                    .await?;
1626                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1627                    .iter()
1628                    .position(|collaborator| collaborator.user_id == user_id)
1629                {
1630                    collaborators.swap_remove(self_collaborator_ix)
1631                } else {
1632                    continue;
1633                };
1634                let old_connection_id = self_collaborator.connection();
1635                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1636                    connection_id: ActiveValue::set(connection.id as i32),
1637                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1638                    ..self_collaborator.into_active_model()
1639                })
1640                .exec(&*tx)
1641                .await?;
1642
1643                let collaborators = collaborators
1644                    .into_iter()
1645                    .map(|collaborator| ProjectCollaborator {
1646                        connection_id: collaborator.connection(),
1647                        user_id: collaborator.user_id,
1648                        replica_id: collaborator.replica_id,
1649                        is_host: collaborator.is_host,
1650                    })
1651                    .collect::<Vec<_>>();
1652
1653                rejoined_projects.push(RejoinedProject {
1654                    id: project_id,
1655                    old_connection_id,
1656                    collaborators,
1657                    worktrees,
1658                    language_servers,
1659                });
1660            }
1661
1662            let room = self.get_room(room_id, &tx).await?;
1663            Ok(RejoinedRoom {
1664                room,
1665                rejoined_projects,
1666                reshared_projects,
1667            })
1668        })
1669        .await
1670    }
1671
1672    pub async fn leave_room(
1673        &self,
1674        connection: ConnectionId,
1675    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1676        self.optional_room_transaction(|tx| async move {
1677            let leaving_participant = room_participant::Entity::find()
1678                .filter(
1679                    Condition::all()
1680                        .add(
1681                            room_participant::Column::AnsweringConnectionId
1682                                .eq(connection.id as i32),
1683                        )
1684                        .add(
1685                            room_participant::Column::AnsweringConnectionServerId
1686                                .eq(connection.owner_id as i32),
1687                        ),
1688                )
1689                .one(&*tx)
1690                .await?;
1691
1692            if let Some(leaving_participant) = leaving_participant {
1693                // Leave room.
1694                let room_id = leaving_participant.room_id;
1695                room_participant::Entity::delete_by_id(leaving_participant.id)
1696                    .exec(&*tx)
1697                    .await?;
1698
1699                // Cancel pending calls initiated by the leaving user.
1700                let called_participants = room_participant::Entity::find()
1701                    .filter(
1702                        Condition::all()
1703                            .add(
1704                                room_participant::Column::CallingUserId
1705                                    .eq(leaving_participant.user_id),
1706                            )
1707                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1708                    )
1709                    .all(&*tx)
1710                    .await?;
1711                room_participant::Entity::delete_many()
1712                    .filter(
1713                        room_participant::Column::Id
1714                            .is_in(called_participants.iter().map(|participant| participant.id)),
1715                    )
1716                    .exec(&*tx)
1717                    .await?;
1718                let canceled_calls_to_user_ids = called_participants
1719                    .into_iter()
1720                    .map(|participant| participant.user_id)
1721                    .collect();
1722
1723                // Detect left projects.
1724                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1725                enum QueryProjectIds {
1726                    ProjectId,
1727                }
1728                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1729                    .select_only()
1730                    .column_as(
1731                        project_collaborator::Column::ProjectId,
1732                        QueryProjectIds::ProjectId,
1733                    )
1734                    .filter(
1735                        Condition::all()
1736                            .add(
1737                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1738                            )
1739                            .add(
1740                                project_collaborator::Column::ConnectionServerId
1741                                    .eq(connection.owner_id as i32),
1742                            ),
1743                    )
1744                    .into_values::<_, QueryProjectIds>()
1745                    .all(&*tx)
1746                    .await?;
1747                let mut left_projects = HashMap::default();
1748                let mut collaborators = project_collaborator::Entity::find()
1749                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1750                    .stream(&*tx)
1751                    .await?;
1752                while let Some(collaborator) = collaborators.next().await {
1753                    let collaborator = collaborator?;
1754                    let left_project =
1755                        left_projects
1756                            .entry(collaborator.project_id)
1757                            .or_insert(LeftProject {
1758                                id: collaborator.project_id,
1759                                host_user_id: Default::default(),
1760                                connection_ids: Default::default(),
1761                                host_connection_id: Default::default(),
1762                            });
1763
1764                    let collaborator_connection_id = collaborator.connection();
1765                    if collaborator_connection_id != connection {
1766                        left_project.connection_ids.push(collaborator_connection_id);
1767                    }
1768
1769                    if collaborator.is_host {
1770                        left_project.host_user_id = collaborator.user_id;
1771                        left_project.host_connection_id = collaborator_connection_id;
1772                    }
1773                }
1774                drop(collaborators);
1775
1776                // Leave projects.
1777                project_collaborator::Entity::delete_many()
1778                    .filter(
1779                        Condition::all()
1780                            .add(
1781                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1782                            )
1783                            .add(
1784                                project_collaborator::Column::ConnectionServerId
1785                                    .eq(connection.owner_id as i32),
1786                            ),
1787                    )
1788                    .exec(&*tx)
1789                    .await?;
1790
1791                // Unshare projects.
1792                project::Entity::delete_many()
1793                    .filter(
1794                        Condition::all()
1795                            .add(project::Column::RoomId.eq(room_id))
1796                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1797                            .add(
1798                                project::Column::HostConnectionServerId
1799                                    .eq(connection.owner_id as i32),
1800                            ),
1801                    )
1802                    .exec(&*tx)
1803                    .await?;
1804
1805                let room = self.get_room(room_id, &tx).await?;
1806                if room.participants.is_empty() {
1807                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1808                }
1809
1810                let left_room = LeftRoom {
1811                    room,
1812                    left_projects,
1813                    canceled_calls_to_user_ids,
1814                };
1815
1816                if left_room.room.participants.is_empty() {
1817                    self.rooms.remove(&room_id);
1818                }
1819
1820                Ok(Some((room_id, left_room)))
1821            } else {
1822                Ok(None)
1823            }
1824        })
1825        .await
1826    }
1827
1828    pub async fn follow(
1829        &self,
1830        project_id: ProjectId,
1831        leader_connection: ConnectionId,
1832        follower_connection: ConnectionId,
1833    ) -> Result<RoomGuard<proto::Room>> {
1834        let room_id = self.room_id_for_project(project_id).await?;
1835        self.room_transaction(room_id, |tx| async move {
1836            follower::ActiveModel {
1837                room_id: ActiveValue::set(room_id),
1838                project_id: ActiveValue::set(project_id),
1839                leader_connection_server_id: ActiveValue::set(ServerId(
1840                    leader_connection.owner_id as i32,
1841                )),
1842                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1843                follower_connection_server_id: ActiveValue::set(ServerId(
1844                    follower_connection.owner_id as i32,
1845                )),
1846                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1847                ..Default::default()
1848            }
1849            .insert(&*tx)
1850            .await?;
1851
1852            let room = self.get_room(room_id, &*tx).await?;
1853            Ok(room)
1854        })
1855        .await
1856    }
1857
1858    pub async fn unfollow(
1859        &self,
1860        project_id: ProjectId,
1861        leader_connection: ConnectionId,
1862        follower_connection: ConnectionId,
1863    ) -> Result<RoomGuard<proto::Room>> {
1864        let room_id = self.room_id_for_project(project_id).await?;
1865        self.room_transaction(room_id, |tx| async move {
1866            follower::Entity::delete_many()
1867                .filter(
1868                    Condition::all()
1869                        .add(follower::Column::ProjectId.eq(project_id))
1870                        .add(
1871                            follower::Column::LeaderConnectionServerId
1872                                .eq(leader_connection.owner_id),
1873                        )
1874                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1875                        .add(
1876                            follower::Column::FollowerConnectionServerId
1877                                .eq(follower_connection.owner_id),
1878                        )
1879                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1880                )
1881                .exec(&*tx)
1882                .await?;
1883
1884            let room = self.get_room(room_id, &*tx).await?;
1885            Ok(room)
1886        })
1887        .await
1888    }
1889
1890    pub async fn update_room_participant_location(
1891        &self,
1892        room_id: RoomId,
1893        connection: ConnectionId,
1894        location: proto::ParticipantLocation,
1895    ) -> Result<RoomGuard<proto::Room>> {
1896        self.room_transaction(room_id, |tx| async {
1897            let tx = tx;
1898            let location_kind;
1899            let location_project_id;
1900            match location
1901                .variant
1902                .as_ref()
1903                .ok_or_else(|| anyhow!("invalid location"))?
1904            {
1905                proto::participant_location::Variant::SharedProject(project) => {
1906                    location_kind = 0;
1907                    location_project_id = Some(ProjectId::from_proto(project.id));
1908                }
1909                proto::participant_location::Variant::UnsharedProject(_) => {
1910                    location_kind = 1;
1911                    location_project_id = None;
1912                }
1913                proto::participant_location::Variant::External(_) => {
1914                    location_kind = 2;
1915                    location_project_id = None;
1916                }
1917            }
1918
1919            let result = room_participant::Entity::update_many()
1920                .filter(
1921                    Condition::all()
1922                        .add(room_participant::Column::RoomId.eq(room_id))
1923                        .add(
1924                            room_participant::Column::AnsweringConnectionId
1925                                .eq(connection.id as i32),
1926                        )
1927                        .add(
1928                            room_participant::Column::AnsweringConnectionServerId
1929                                .eq(connection.owner_id as i32),
1930                        ),
1931                )
1932                .set(room_participant::ActiveModel {
1933                    location_kind: ActiveValue::set(Some(location_kind)),
1934                    location_project_id: ActiveValue::set(location_project_id),
1935                    ..Default::default()
1936                })
1937                .exec(&*tx)
1938                .await?;
1939
1940            if result.rows_affected == 1 {
1941                let room = self.get_room(room_id, &tx).await?;
1942                Ok(room)
1943            } else {
1944                Err(anyhow!("could not update room participant location"))?
1945            }
1946        })
1947        .await
1948    }
1949
1950    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1951        self.transaction(|tx| async move {
1952            let participant = room_participant::Entity::find()
1953                .filter(
1954                    Condition::all()
1955                        .add(
1956                            room_participant::Column::AnsweringConnectionId
1957                                .eq(connection.id as i32),
1958                        )
1959                        .add(
1960                            room_participant::Column::AnsweringConnectionServerId
1961                                .eq(connection.owner_id as i32),
1962                        ),
1963                )
1964                .one(&*tx)
1965                .await?
1966                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1967
1968            room_participant::Entity::update(room_participant::ActiveModel {
1969                answering_connection_lost: ActiveValue::set(true),
1970                ..participant.into_active_model()
1971            })
1972            .exec(&*tx)
1973            .await?;
1974
1975            Ok(())
1976        })
1977        .await
1978    }
1979
1980    fn build_incoming_call(
1981        room: &proto::Room,
1982        called_user_id: UserId,
1983    ) -> Option<proto::IncomingCall> {
1984        let pending_participant = room
1985            .pending_participants
1986            .iter()
1987            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1988
1989        Some(proto::IncomingCall {
1990            room_id: room.id,
1991            calling_user_id: pending_participant.calling_user_id,
1992            participant_user_ids: room
1993                .participants
1994                .iter()
1995                .map(|participant| participant.user_id)
1996                .collect(),
1997            initial_project: room.participants.iter().find_map(|participant| {
1998                let initial_project_id = pending_participant.initial_project_id?;
1999                participant
2000                    .projects
2001                    .iter()
2002                    .find(|project| project.id == initial_project_id)
2003                    .cloned()
2004            }),
2005        })
2006    }
2007
2008    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2009        let db_room = room::Entity::find_by_id(room_id)
2010            .one(tx)
2011            .await?
2012            .ok_or_else(|| anyhow!("could not find room"))?;
2013
2014        let mut db_participants = db_room
2015            .find_related(room_participant::Entity)
2016            .stream(tx)
2017            .await?;
2018        let mut participants = HashMap::default();
2019        let mut pending_participants = Vec::new();
2020        while let Some(db_participant) = db_participants.next().await {
2021            let db_participant = db_participant?;
2022            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2023                .answering_connection_id
2024                .zip(db_participant.answering_connection_server_id)
2025            {
2026                let location = match (
2027                    db_participant.location_kind,
2028                    db_participant.location_project_id,
2029                ) {
2030                    (Some(0), Some(project_id)) => {
2031                        Some(proto::participant_location::Variant::SharedProject(
2032                            proto::participant_location::SharedProject {
2033                                id: project_id.to_proto(),
2034                            },
2035                        ))
2036                    }
2037                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2038                        Default::default(),
2039                    )),
2040                    _ => Some(proto::participant_location::Variant::External(
2041                        Default::default(),
2042                    )),
2043                };
2044
2045                let answering_connection = ConnectionId {
2046                    owner_id: answering_connection_server_id.0 as u32,
2047                    id: answering_connection_id as u32,
2048                };
2049                participants.insert(
2050                    answering_connection,
2051                    proto::Participant {
2052                        user_id: db_participant.user_id.to_proto(),
2053                        peer_id: Some(answering_connection.into()),
2054                        projects: Default::default(),
2055                        location: Some(proto::ParticipantLocation { variant: location }),
2056                    },
2057                );
2058            } else {
2059                pending_participants.push(proto::PendingParticipant {
2060                    user_id: db_participant.user_id.to_proto(),
2061                    calling_user_id: db_participant.calling_user_id.to_proto(),
2062                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2063                });
2064            }
2065        }
2066        drop(db_participants);
2067
2068        let mut db_projects = db_room
2069            .find_related(project::Entity)
2070            .find_with_related(worktree::Entity)
2071            .stream(tx)
2072            .await?;
2073
2074        while let Some(row) = db_projects.next().await {
2075            let (db_project, db_worktree) = row?;
2076            let host_connection = db_project.host_connection()?;
2077            if let Some(participant) = participants.get_mut(&host_connection) {
2078                let project = if let Some(project) = participant
2079                    .projects
2080                    .iter_mut()
2081                    .find(|project| project.id == db_project.id.to_proto())
2082                {
2083                    project
2084                } else {
2085                    participant.projects.push(proto::ParticipantProject {
2086                        id: db_project.id.to_proto(),
2087                        worktree_root_names: Default::default(),
2088                    });
2089                    participant.projects.last_mut().unwrap()
2090                };
2091
2092                if let Some(db_worktree) = db_worktree {
2093                    if db_worktree.visible {
2094                        project.worktree_root_names.push(db_worktree.root_name);
2095                    }
2096                }
2097            }
2098        }
2099        drop(db_projects);
2100
2101        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2102        let mut followers = Vec::new();
2103        while let Some(db_follower) = db_followers.next().await {
2104            let db_follower = db_follower?;
2105            followers.push(proto::Follower {
2106                leader_id: Some(db_follower.leader_connection().into()),
2107                follower_id: Some(db_follower.follower_connection().into()),
2108                project_id: db_follower.project_id.to_proto(),
2109            });
2110        }
2111
2112        Ok(proto::Room {
2113            id: db_room.id.to_proto(),
2114            live_kit_room: db_room.live_kit_room,
2115            participants: participants.into_values().collect(),
2116            pending_participants,
2117            followers,
2118        })
2119    }
2120
2121    // projects
2122
2123    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2124        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2125        enum QueryAs {
2126            Count,
2127        }
2128
2129        self.transaction(|tx| async move {
2130            Ok(project::Entity::find()
2131                .select_only()
2132                .column_as(project::Column::Id.count(), QueryAs::Count)
2133                .inner_join(user::Entity)
2134                .filter(user::Column::Admin.eq(false))
2135                .into_values::<_, QueryAs>()
2136                .one(&*tx)
2137                .await?
2138                .unwrap_or(0i64) as usize)
2139        })
2140        .await
2141    }
2142
2143    pub async fn share_project(
2144        &self,
2145        room_id: RoomId,
2146        connection: ConnectionId,
2147        worktrees: &[proto::WorktreeMetadata],
2148    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2149        self.room_transaction(room_id, |tx| async move {
2150            let participant = room_participant::Entity::find()
2151                .filter(
2152                    Condition::all()
2153                        .add(
2154                            room_participant::Column::AnsweringConnectionId
2155                                .eq(connection.id as i32),
2156                        )
2157                        .add(
2158                            room_participant::Column::AnsweringConnectionServerId
2159                                .eq(connection.owner_id as i32),
2160                        ),
2161                )
2162                .one(&*tx)
2163                .await?
2164                .ok_or_else(|| anyhow!("could not find participant"))?;
2165            if participant.room_id != room_id {
2166                return Err(anyhow!("shared project on unexpected room"))?;
2167            }
2168
2169            let project = project::ActiveModel {
2170                room_id: ActiveValue::set(participant.room_id),
2171                host_user_id: ActiveValue::set(participant.user_id),
2172                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2173                host_connection_server_id: ActiveValue::set(Some(ServerId(
2174                    connection.owner_id as i32,
2175                ))),
2176                ..Default::default()
2177            }
2178            .insert(&*tx)
2179            .await?;
2180
2181            if !worktrees.is_empty() {
2182                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2183                    worktree::ActiveModel {
2184                        id: ActiveValue::set(worktree.id as i64),
2185                        project_id: ActiveValue::set(project.id),
2186                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2187                        root_name: ActiveValue::set(worktree.root_name.clone()),
2188                        visible: ActiveValue::set(worktree.visible),
2189                        scan_id: ActiveValue::set(0),
2190                        completed_scan_id: ActiveValue::set(0),
2191                    }
2192                }))
2193                .exec(&*tx)
2194                .await?;
2195            }
2196
2197            project_collaborator::ActiveModel {
2198                project_id: ActiveValue::set(project.id),
2199                connection_id: ActiveValue::set(connection.id as i32),
2200                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2201                user_id: ActiveValue::set(participant.user_id),
2202                replica_id: ActiveValue::set(ReplicaId(0)),
2203                is_host: ActiveValue::set(true),
2204                ..Default::default()
2205            }
2206            .insert(&*tx)
2207            .await?;
2208
2209            let room = self.get_room(room_id, &tx).await?;
2210            Ok((project.id, room))
2211        })
2212        .await
2213    }
2214
2215    pub async fn unshare_project(
2216        &self,
2217        project_id: ProjectId,
2218        connection: ConnectionId,
2219    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2220        let room_id = self.room_id_for_project(project_id).await?;
2221        self.room_transaction(room_id, |tx| async move {
2222            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2223
2224            let project = project::Entity::find_by_id(project_id)
2225                .one(&*tx)
2226                .await?
2227                .ok_or_else(|| anyhow!("project not found"))?;
2228            if project.host_connection()? == connection {
2229                project::Entity::delete(project.into_active_model())
2230                    .exec(&*tx)
2231                    .await?;
2232                let room = self.get_room(room_id, &tx).await?;
2233                Ok((room, guest_connection_ids))
2234            } else {
2235                Err(anyhow!("cannot unshare a project hosted by another user"))?
2236            }
2237        })
2238        .await
2239    }
2240
2241    pub async fn update_project(
2242        &self,
2243        project_id: ProjectId,
2244        connection: ConnectionId,
2245        worktrees: &[proto::WorktreeMetadata],
2246    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2247        let room_id = self.room_id_for_project(project_id).await?;
2248        self.room_transaction(room_id, |tx| async move {
2249            let project = project::Entity::find_by_id(project_id)
2250                .filter(
2251                    Condition::all()
2252                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2253                        .add(
2254                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2255                        ),
2256                )
2257                .one(&*tx)
2258                .await?
2259                .ok_or_else(|| anyhow!("no such project"))?;
2260
2261            self.update_project_worktrees(project.id, worktrees, &tx)
2262                .await?;
2263
2264            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2265            let room = self.get_room(project.room_id, &tx).await?;
2266            Ok((room, guest_connection_ids))
2267        })
2268        .await
2269    }
2270
2271    async fn update_project_worktrees(
2272        &self,
2273        project_id: ProjectId,
2274        worktrees: &[proto::WorktreeMetadata],
2275        tx: &DatabaseTransaction,
2276    ) -> Result<()> {
2277        if !worktrees.is_empty() {
2278            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2279                id: ActiveValue::set(worktree.id as i64),
2280                project_id: ActiveValue::set(project_id),
2281                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2282                root_name: ActiveValue::set(worktree.root_name.clone()),
2283                visible: ActiveValue::set(worktree.visible),
2284                scan_id: ActiveValue::set(0),
2285                completed_scan_id: ActiveValue::set(0),
2286            }))
2287            .on_conflict(
2288                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2289                    .update_column(worktree::Column::RootName)
2290                    .to_owned(),
2291            )
2292            .exec(&*tx)
2293            .await?;
2294        }
2295
2296        worktree::Entity::delete_many()
2297            .filter(worktree::Column::ProjectId.eq(project_id).and(
2298                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2299            ))
2300            .exec(&*tx)
2301            .await?;
2302
2303        Ok(())
2304    }
2305
2306    pub async fn update_worktree(
2307        &self,
2308        update: &proto::UpdateWorktree,
2309        connection: ConnectionId,
2310    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2311        let project_id = ProjectId::from_proto(update.project_id);
2312        let worktree_id = update.worktree_id as i64;
2313        let room_id = self.room_id_for_project(project_id).await?;
2314        self.room_transaction(room_id, |tx| async move {
2315            // Ensure the update comes from the host.
2316            let _project = project::Entity::find_by_id(project_id)
2317                .filter(
2318                    Condition::all()
2319                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2320                        .add(
2321                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2322                        ),
2323                )
2324                .one(&*tx)
2325                .await?
2326                .ok_or_else(|| anyhow!("no such project"))?;
2327
2328            // Update metadata.
2329            worktree::Entity::update(worktree::ActiveModel {
2330                id: ActiveValue::set(worktree_id),
2331                project_id: ActiveValue::set(project_id),
2332                root_name: ActiveValue::set(update.root_name.clone()),
2333                scan_id: ActiveValue::set(update.scan_id as i64),
2334                completed_scan_id: if update.is_last_update {
2335                    ActiveValue::set(update.scan_id as i64)
2336                } else {
2337                    ActiveValue::default()
2338                },
2339                abs_path: ActiveValue::set(update.abs_path.clone()),
2340                ..Default::default()
2341            })
2342            .exec(&*tx)
2343            .await?;
2344
2345            if !update.updated_entries.is_empty() {
2346                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2347                    let mtime = entry.mtime.clone().unwrap_or_default();
2348                    worktree_entry::ActiveModel {
2349                        project_id: ActiveValue::set(project_id),
2350                        worktree_id: ActiveValue::set(worktree_id),
2351                        id: ActiveValue::set(entry.id as i64),
2352                        is_dir: ActiveValue::set(entry.is_dir),
2353                        path: ActiveValue::set(entry.path.clone()),
2354                        inode: ActiveValue::set(entry.inode as i64),
2355                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2356                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2357                        is_symlink: ActiveValue::set(entry.is_symlink),
2358                        is_ignored: ActiveValue::set(entry.is_ignored),
2359                        is_external: ActiveValue::set(entry.is_external),
2360                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2361                        is_deleted: ActiveValue::set(false),
2362                        scan_id: ActiveValue::set(update.scan_id as i64),
2363                    }
2364                }))
2365                .on_conflict(
2366                    OnConflict::columns([
2367                        worktree_entry::Column::ProjectId,
2368                        worktree_entry::Column::WorktreeId,
2369                        worktree_entry::Column::Id,
2370                    ])
2371                    .update_columns([
2372                        worktree_entry::Column::IsDir,
2373                        worktree_entry::Column::Path,
2374                        worktree_entry::Column::Inode,
2375                        worktree_entry::Column::MtimeSeconds,
2376                        worktree_entry::Column::MtimeNanos,
2377                        worktree_entry::Column::IsSymlink,
2378                        worktree_entry::Column::IsIgnored,
2379                        worktree_entry::Column::GitStatus,
2380                        worktree_entry::Column::ScanId,
2381                    ])
2382                    .to_owned(),
2383                )
2384                .exec(&*tx)
2385                .await?;
2386            }
2387
2388            if !update.removed_entries.is_empty() {
2389                worktree_entry::Entity::update_many()
2390                    .filter(
2391                        worktree_entry::Column::ProjectId
2392                            .eq(project_id)
2393                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2394                            .and(
2395                                worktree_entry::Column::Id
2396                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2397                            ),
2398                    )
2399                    .set(worktree_entry::ActiveModel {
2400                        is_deleted: ActiveValue::Set(true),
2401                        scan_id: ActiveValue::Set(update.scan_id as i64),
2402                        ..Default::default()
2403                    })
2404                    .exec(&*tx)
2405                    .await?;
2406            }
2407
2408            if !update.updated_repositories.is_empty() {
2409                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2410                    |repository| worktree_repository::ActiveModel {
2411                        project_id: ActiveValue::set(project_id),
2412                        worktree_id: ActiveValue::set(worktree_id),
2413                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2414                        scan_id: ActiveValue::set(update.scan_id as i64),
2415                        branch: ActiveValue::set(repository.branch.clone()),
2416                        is_deleted: ActiveValue::set(false),
2417                    },
2418                ))
2419                .on_conflict(
2420                    OnConflict::columns([
2421                        worktree_repository::Column::ProjectId,
2422                        worktree_repository::Column::WorktreeId,
2423                        worktree_repository::Column::WorkDirectoryId,
2424                    ])
2425                    .update_columns([
2426                        worktree_repository::Column::ScanId,
2427                        worktree_repository::Column::Branch,
2428                    ])
2429                    .to_owned(),
2430                )
2431                .exec(&*tx)
2432                .await?;
2433            }
2434
2435            if !update.removed_repositories.is_empty() {
2436                worktree_repository::Entity::update_many()
2437                    .filter(
2438                        worktree_repository::Column::ProjectId
2439                            .eq(project_id)
2440                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2441                            .and(
2442                                worktree_repository::Column::WorkDirectoryId
2443                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2444                            ),
2445                    )
2446                    .set(worktree_repository::ActiveModel {
2447                        is_deleted: ActiveValue::Set(true),
2448                        scan_id: ActiveValue::Set(update.scan_id as i64),
2449                        ..Default::default()
2450                    })
2451                    .exec(&*tx)
2452                    .await?;
2453            }
2454
2455            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2456            Ok(connection_ids)
2457        })
2458        .await
2459    }
2460
2461    pub async fn update_diagnostic_summary(
2462        &self,
2463        update: &proto::UpdateDiagnosticSummary,
2464        connection: ConnectionId,
2465    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2466        let project_id = ProjectId::from_proto(update.project_id);
2467        let worktree_id = update.worktree_id as i64;
2468        let room_id = self.room_id_for_project(project_id).await?;
2469        self.room_transaction(room_id, |tx| async move {
2470            let summary = update
2471                .summary
2472                .as_ref()
2473                .ok_or_else(|| anyhow!("invalid summary"))?;
2474
2475            // Ensure the update comes from the host.
2476            let project = project::Entity::find_by_id(project_id)
2477                .one(&*tx)
2478                .await?
2479                .ok_or_else(|| anyhow!("no such project"))?;
2480            if project.host_connection()? != connection {
2481                return Err(anyhow!("can't update a project hosted by someone else"))?;
2482            }
2483
2484            // Update summary.
2485            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2486                project_id: ActiveValue::set(project_id),
2487                worktree_id: ActiveValue::set(worktree_id),
2488                path: ActiveValue::set(summary.path.clone()),
2489                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2490                error_count: ActiveValue::set(summary.error_count as i32),
2491                warning_count: ActiveValue::set(summary.warning_count as i32),
2492                ..Default::default()
2493            })
2494            .on_conflict(
2495                OnConflict::columns([
2496                    worktree_diagnostic_summary::Column::ProjectId,
2497                    worktree_diagnostic_summary::Column::WorktreeId,
2498                    worktree_diagnostic_summary::Column::Path,
2499                ])
2500                .update_columns([
2501                    worktree_diagnostic_summary::Column::LanguageServerId,
2502                    worktree_diagnostic_summary::Column::ErrorCount,
2503                    worktree_diagnostic_summary::Column::WarningCount,
2504                ])
2505                .to_owned(),
2506            )
2507            .exec(&*tx)
2508            .await?;
2509
2510            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2511            Ok(connection_ids)
2512        })
2513        .await
2514    }
2515
2516    pub async fn start_language_server(
2517        &self,
2518        update: &proto::StartLanguageServer,
2519        connection: ConnectionId,
2520    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2521        let project_id = ProjectId::from_proto(update.project_id);
2522        let room_id = self.room_id_for_project(project_id).await?;
2523        self.room_transaction(room_id, |tx| async move {
2524            let server = update
2525                .server
2526                .as_ref()
2527                .ok_or_else(|| anyhow!("invalid language server"))?;
2528
2529            // Ensure the update comes from the host.
2530            let project = project::Entity::find_by_id(project_id)
2531                .one(&*tx)
2532                .await?
2533                .ok_or_else(|| anyhow!("no such project"))?;
2534            if project.host_connection()? != connection {
2535                return Err(anyhow!("can't update a project hosted by someone else"))?;
2536            }
2537
2538            // Add the newly-started language server.
2539            language_server::Entity::insert(language_server::ActiveModel {
2540                project_id: ActiveValue::set(project_id),
2541                id: ActiveValue::set(server.id as i64),
2542                name: ActiveValue::set(server.name.clone()),
2543                ..Default::default()
2544            })
2545            .on_conflict(
2546                OnConflict::columns([
2547                    language_server::Column::ProjectId,
2548                    language_server::Column::Id,
2549                ])
2550                .update_column(language_server::Column::Name)
2551                .to_owned(),
2552            )
2553            .exec(&*tx)
2554            .await?;
2555
2556            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2557            Ok(connection_ids)
2558        })
2559        .await
2560    }
2561
2562    pub async fn update_worktree_settings(
2563        &self,
2564        update: &proto::UpdateWorktreeSettings,
2565        connection: ConnectionId,
2566    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2567        let project_id = ProjectId::from_proto(update.project_id);
2568        let room_id = self.room_id_for_project(project_id).await?;
2569        self.room_transaction(room_id, |tx| async move {
2570            // Ensure the update comes from the host.
2571            let project = project::Entity::find_by_id(project_id)
2572                .one(&*tx)
2573                .await?
2574                .ok_or_else(|| anyhow!("no such project"))?;
2575            if project.host_connection()? != connection {
2576                return Err(anyhow!("can't update a project hosted by someone else"))?;
2577            }
2578
2579            if let Some(content) = &update.content {
2580                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2581                    project_id: ActiveValue::Set(project_id),
2582                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2583                    path: ActiveValue::Set(update.path.clone()),
2584                    content: ActiveValue::Set(content.clone()),
2585                })
2586                .on_conflict(
2587                    OnConflict::columns([
2588                        worktree_settings_file::Column::ProjectId,
2589                        worktree_settings_file::Column::WorktreeId,
2590                        worktree_settings_file::Column::Path,
2591                    ])
2592                    .update_column(worktree_settings_file::Column::Content)
2593                    .to_owned(),
2594                )
2595                .exec(&*tx)
2596                .await?;
2597            } else {
2598                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2599                    project_id: ActiveValue::Set(project_id),
2600                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2601                    path: ActiveValue::Set(update.path.clone()),
2602                    ..Default::default()
2603                })
2604                .exec(&*tx)
2605                .await?;
2606            }
2607
2608            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2609            Ok(connection_ids)
2610        })
2611        .await
2612    }
2613
2614    pub async fn join_project(
2615        &self,
2616        project_id: ProjectId,
2617        connection: ConnectionId,
2618    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2619        let room_id = self.room_id_for_project(project_id).await?;
2620        self.room_transaction(room_id, |tx| async move {
2621            let participant = room_participant::Entity::find()
2622                .filter(
2623                    Condition::all()
2624                        .add(
2625                            room_participant::Column::AnsweringConnectionId
2626                                .eq(connection.id as i32),
2627                        )
2628                        .add(
2629                            room_participant::Column::AnsweringConnectionServerId
2630                                .eq(connection.owner_id as i32),
2631                        ),
2632                )
2633                .one(&*tx)
2634                .await?
2635                .ok_or_else(|| anyhow!("must join a room first"))?;
2636
2637            let project = project::Entity::find_by_id(project_id)
2638                .one(&*tx)
2639                .await?
2640                .ok_or_else(|| anyhow!("no such project"))?;
2641            if project.room_id != participant.room_id {
2642                return Err(anyhow!("no such project"))?;
2643            }
2644
2645            let mut collaborators = project
2646                .find_related(project_collaborator::Entity)
2647                .all(&*tx)
2648                .await?;
2649            let replica_ids = collaborators
2650                .iter()
2651                .map(|c| c.replica_id)
2652                .collect::<HashSet<_>>();
2653            let mut replica_id = ReplicaId(1);
2654            while replica_ids.contains(&replica_id) {
2655                replica_id.0 += 1;
2656            }
2657            let new_collaborator = project_collaborator::ActiveModel {
2658                project_id: ActiveValue::set(project_id),
2659                connection_id: ActiveValue::set(connection.id as i32),
2660                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2661                user_id: ActiveValue::set(participant.user_id),
2662                replica_id: ActiveValue::set(replica_id),
2663                is_host: ActiveValue::set(false),
2664                ..Default::default()
2665            }
2666            .insert(&*tx)
2667            .await?;
2668            collaborators.push(new_collaborator);
2669
2670            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2671            let mut worktrees = db_worktrees
2672                .into_iter()
2673                .map(|db_worktree| {
2674                    (
2675                        db_worktree.id as u64,
2676                        Worktree {
2677                            id: db_worktree.id as u64,
2678                            abs_path: db_worktree.abs_path,
2679                            root_name: db_worktree.root_name,
2680                            visible: db_worktree.visible,
2681                            entries: Default::default(),
2682                            repository_entries: Default::default(),
2683                            diagnostic_summaries: Default::default(),
2684                            settings_files: Default::default(),
2685                            scan_id: db_worktree.scan_id as u64,
2686                            completed_scan_id: db_worktree.completed_scan_id as u64,
2687                        },
2688                    )
2689                })
2690                .collect::<BTreeMap<_, _>>();
2691
2692            // Populate worktree entries.
2693            {
2694                let mut db_entries = worktree_entry::Entity::find()
2695                    .filter(
2696                        Condition::all()
2697                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2698                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2699                    )
2700                    .stream(&*tx)
2701                    .await?;
2702                while let Some(db_entry) = db_entries.next().await {
2703                    let db_entry = db_entry?;
2704                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2705                        worktree.entries.push(proto::Entry {
2706                            id: db_entry.id as u64,
2707                            is_dir: db_entry.is_dir,
2708                            path: db_entry.path,
2709                            inode: db_entry.inode as u64,
2710                            mtime: Some(proto::Timestamp {
2711                                seconds: db_entry.mtime_seconds as u64,
2712                                nanos: db_entry.mtime_nanos as u32,
2713                            }),
2714                            is_symlink: db_entry.is_symlink,
2715                            is_ignored: db_entry.is_ignored,
2716                            is_external: db_entry.is_external,
2717                            git_status: db_entry.git_status.map(|status| status as i32),
2718                        });
2719                    }
2720                }
2721            }
2722
2723            // Populate repository entries.
2724            {
2725                let mut db_repository_entries = worktree_repository::Entity::find()
2726                    .filter(
2727                        Condition::all()
2728                            .add(worktree_repository::Column::ProjectId.eq(project_id))
2729                            .add(worktree_repository::Column::IsDeleted.eq(false)),
2730                    )
2731                    .stream(&*tx)
2732                    .await?;
2733                while let Some(db_repository_entry) = db_repository_entries.next().await {
2734                    let db_repository_entry = db_repository_entry?;
2735                    if let Some(worktree) =
2736                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2737                    {
2738                        worktree.repository_entries.insert(
2739                            db_repository_entry.work_directory_id as u64,
2740                            proto::RepositoryEntry {
2741                                work_directory_id: db_repository_entry.work_directory_id as u64,
2742                                branch: db_repository_entry.branch,
2743                            },
2744                        );
2745                    }
2746                }
2747            }
2748
2749            // Populate worktree diagnostic summaries.
2750            {
2751                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2752                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2753                    .stream(&*tx)
2754                    .await?;
2755                while let Some(db_summary) = db_summaries.next().await {
2756                    let db_summary = db_summary?;
2757                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2758                        worktree
2759                            .diagnostic_summaries
2760                            .push(proto::DiagnosticSummary {
2761                                path: db_summary.path,
2762                                language_server_id: db_summary.language_server_id as u64,
2763                                error_count: db_summary.error_count as u32,
2764                                warning_count: db_summary.warning_count as u32,
2765                            });
2766                    }
2767                }
2768            }
2769
2770            // Populate worktree settings files
2771            {
2772                let mut db_settings_files = worktree_settings_file::Entity::find()
2773                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2774                    .stream(&*tx)
2775                    .await?;
2776                while let Some(db_settings_file) = db_settings_files.next().await {
2777                    let db_settings_file = db_settings_file?;
2778                    if let Some(worktree) =
2779                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2780                    {
2781                        worktree.settings_files.push(WorktreeSettingsFile {
2782                            path: db_settings_file.path,
2783                            content: db_settings_file.content,
2784                        });
2785                    }
2786                }
2787            }
2788
2789            // Populate language servers.
2790            let language_servers = project
2791                .find_related(language_server::Entity)
2792                .all(&*tx)
2793                .await?;
2794
2795            let project = Project {
2796                collaborators: collaborators
2797                    .into_iter()
2798                    .map(|collaborator| ProjectCollaborator {
2799                        connection_id: collaborator.connection(),
2800                        user_id: collaborator.user_id,
2801                        replica_id: collaborator.replica_id,
2802                        is_host: collaborator.is_host,
2803                    })
2804                    .collect(),
2805                worktrees,
2806                language_servers: language_servers
2807                    .into_iter()
2808                    .map(|language_server| proto::LanguageServer {
2809                        id: language_server.id as u64,
2810                        name: language_server.name,
2811                    })
2812                    .collect(),
2813            };
2814            Ok((project, replica_id as ReplicaId))
2815        })
2816        .await
2817    }
2818
2819    pub async fn leave_project(
2820        &self,
2821        project_id: ProjectId,
2822        connection: ConnectionId,
2823    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2824        let room_id = self.room_id_for_project(project_id).await?;
2825        self.room_transaction(room_id, |tx| async move {
2826            let result = project_collaborator::Entity::delete_many()
2827                .filter(
2828                    Condition::all()
2829                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2830                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2831                        .add(
2832                            project_collaborator::Column::ConnectionServerId
2833                                .eq(connection.owner_id as i32),
2834                        ),
2835                )
2836                .exec(&*tx)
2837                .await?;
2838            if result.rows_affected == 0 {
2839                Err(anyhow!("not a collaborator on this project"))?;
2840            }
2841
2842            let project = project::Entity::find_by_id(project_id)
2843                .one(&*tx)
2844                .await?
2845                .ok_or_else(|| anyhow!("no such project"))?;
2846            let collaborators = project
2847                .find_related(project_collaborator::Entity)
2848                .all(&*tx)
2849                .await?;
2850            let connection_ids = collaborators
2851                .into_iter()
2852                .map(|collaborator| collaborator.connection())
2853                .collect();
2854
2855            follower::Entity::delete_many()
2856                .filter(
2857                    Condition::any()
2858                        .add(
2859                            Condition::all()
2860                                .add(follower::Column::ProjectId.eq(project_id))
2861                                .add(
2862                                    follower::Column::LeaderConnectionServerId
2863                                        .eq(connection.owner_id),
2864                                )
2865                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2866                        )
2867                        .add(
2868                            Condition::all()
2869                                .add(follower::Column::ProjectId.eq(project_id))
2870                                .add(
2871                                    follower::Column::FollowerConnectionServerId
2872                                        .eq(connection.owner_id),
2873                                )
2874                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2875                        ),
2876                )
2877                .exec(&*tx)
2878                .await?;
2879
2880            let room = self.get_room(project.room_id, &tx).await?;
2881            let left_project = LeftProject {
2882                id: project_id,
2883                host_user_id: project.host_user_id,
2884                host_connection_id: project.host_connection()?,
2885                connection_ids,
2886            };
2887            Ok((room, left_project))
2888        })
2889        .await
2890    }
2891
2892    pub async fn project_collaborators(
2893        &self,
2894        project_id: ProjectId,
2895        connection_id: ConnectionId,
2896    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2897        let room_id = self.room_id_for_project(project_id).await?;
2898        self.room_transaction(room_id, |tx| async move {
2899            let collaborators = project_collaborator::Entity::find()
2900                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2901                .all(&*tx)
2902                .await?
2903                .into_iter()
2904                .map(|collaborator| ProjectCollaborator {
2905                    connection_id: collaborator.connection(),
2906                    user_id: collaborator.user_id,
2907                    replica_id: collaborator.replica_id,
2908                    is_host: collaborator.is_host,
2909                })
2910                .collect::<Vec<_>>();
2911
2912            if collaborators
2913                .iter()
2914                .any(|collaborator| collaborator.connection_id == connection_id)
2915            {
2916                Ok(collaborators)
2917            } else {
2918                Err(anyhow!("no such project"))?
2919            }
2920        })
2921        .await
2922    }
2923
2924    pub async fn project_connection_ids(
2925        &self,
2926        project_id: ProjectId,
2927        connection_id: ConnectionId,
2928    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2929        let room_id = self.room_id_for_project(project_id).await?;
2930        self.room_transaction(room_id, |tx| async move {
2931            let mut collaborators = project_collaborator::Entity::find()
2932                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2933                .stream(&*tx)
2934                .await?;
2935
2936            let mut connection_ids = HashSet::default();
2937            while let Some(collaborator) = collaborators.next().await {
2938                let collaborator = collaborator?;
2939                connection_ids.insert(collaborator.connection());
2940            }
2941
2942            if connection_ids.contains(&connection_id) {
2943                Ok(connection_ids)
2944            } else {
2945                Err(anyhow!("no such project"))?
2946            }
2947        })
2948        .await
2949    }
2950
2951    async fn project_guest_connection_ids(
2952        &self,
2953        project_id: ProjectId,
2954        tx: &DatabaseTransaction,
2955    ) -> Result<Vec<ConnectionId>> {
2956        let mut collaborators = project_collaborator::Entity::find()
2957            .filter(
2958                project_collaborator::Column::ProjectId
2959                    .eq(project_id)
2960                    .and(project_collaborator::Column::IsHost.eq(false)),
2961            )
2962            .stream(tx)
2963            .await?;
2964
2965        let mut guest_connection_ids = Vec::new();
2966        while let Some(collaborator) = collaborators.next().await {
2967            let collaborator = collaborator?;
2968            guest_connection_ids.push(collaborator.connection());
2969        }
2970        Ok(guest_connection_ids)
2971    }
2972
2973    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2974        self.transaction(|tx| async move {
2975            let project = project::Entity::find_by_id(project_id)
2976                .one(&*tx)
2977                .await?
2978                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2979            Ok(project.room_id)
2980        })
2981        .await
2982    }
2983
2984    // access tokens
2985
2986    pub async fn create_access_token(
2987        &self,
2988        user_id: UserId,
2989        access_token_hash: &str,
2990        max_access_token_count: usize,
2991    ) -> Result<AccessTokenId> {
2992        self.transaction(|tx| async {
2993            let tx = tx;
2994
2995            let token = access_token::ActiveModel {
2996                user_id: ActiveValue::set(user_id),
2997                hash: ActiveValue::set(access_token_hash.into()),
2998                ..Default::default()
2999            }
3000            .insert(&*tx)
3001            .await?;
3002
3003            access_token::Entity::delete_many()
3004                .filter(
3005                    access_token::Column::Id.in_subquery(
3006                        Query::select()
3007                            .column(access_token::Column::Id)
3008                            .from(access_token::Entity)
3009                            .and_where(access_token::Column::UserId.eq(user_id))
3010                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3011                            .limit(10000)
3012                            .offset(max_access_token_count as u64)
3013                            .to_owned(),
3014                    ),
3015                )
3016                .exec(&*tx)
3017                .await?;
3018            Ok(token.id)
3019        })
3020        .await
3021    }
3022
3023    pub async fn get_access_token(
3024        &self,
3025        access_token_id: AccessTokenId,
3026    ) -> Result<access_token::Model> {
3027        self.transaction(|tx| async move {
3028            Ok(access_token::Entity::find_by_id(access_token_id)
3029                .one(&*tx)
3030                .await?
3031                .ok_or_else(|| anyhow!("no such access token"))?)
3032        })
3033        .await
3034    }
3035
3036    // channels
3037
3038    pub async fn create_root_channel(&self, name: &str) -> Result<ChannelId> {
3039        self.create_channel(name, None).await
3040    }
3041
3042    pub async fn create_channel(&self, name: &str, parent: Option<ChannelId>) -> Result<ChannelId> {
3043        self.transaction(move |tx| async move {
3044            let tx = tx;
3045
3046            let channel = channel::ActiveModel {
3047                name: ActiveValue::Set(name.to_string()),
3048                ..Default::default()
3049            };
3050
3051            let channel = channel.insert(&*tx).await?;
3052
3053            if let Some(parent) = parent {
3054                channel_parent::ActiveModel {
3055                    child_id: ActiveValue::Set(channel.id),
3056                    parent_id: ActiveValue::Set(parent),
3057                }
3058                .insert(&*tx)
3059                .await?;
3060            }
3061
3062            Ok(channel.id)
3063        })
3064        .await
3065    }
3066
3067    // Property: Members are only
3068    pub async fn add_channel_member(&self, channel_id: ChannelId, user_id: UserId) -> Result<()> {
3069        self.transaction(move |tx| async move {
3070            let tx = tx;
3071
3072            let channel_membership = channel_member::ActiveModel {
3073                channel_id: ActiveValue::Set(channel_id),
3074                user_id: ActiveValue::Set(user_id),
3075                ..Default::default()
3076            };
3077
3078            channel_membership.insert(&*tx).await?;
3079
3080            Ok(())
3081        })
3082        .await
3083    }
3084
3085    pub async fn get_channels(&self, user_id: UserId) -> Result<Vec<Channel>> {
3086        self.transaction(|tx| async move {
3087            let tx = tx;
3088
3089            // This is the SQL statement we want to generate:
3090            let sql = r#"
3091            WITH RECURSIVE channel_tree(child_id, parent_id, depth) AS (
3092                    SELECT channel_id as child_id, NULL as parent_id, 0
3093                    FROM channel_members
3094                    WHERE user_id = ?
3095                UNION ALL
3096                    SELECT channel_parents.child_id, channel_parents.parent_id, channel_tree.depth + 1
3097                    FROM channel_parents, channel_tree
3098                    WHERE channel_parents.parent_id = channel_tree.child_id
3099            )
3100            SELECT channel_tree.child_id as id, channels.name, channel_tree.parent_id
3101            FROM channel_tree
3102            JOIN channels ON channels.id = channel_tree.child_id
3103            ORDER BY channel_tree.depth;
3104            "#;
3105
3106            // let root_channel_ids_query = SelectStatement::new()
3107            //     .column(channel_member::Column::ChannelId)
3108            //     .expr(Expr::value("NULL"))
3109            //     .from(channel_member::Entity.table_ref())
3110            //     .and_where(
3111            //         Expr::col(channel_member::Column::UserId)
3112            //             .eq(Expr::cust_with_values("?", vec![user_id])),
3113            //     );
3114
3115            // let build_tree_query = SelectStatement::new()
3116            //     .column(channel_parent::Column::ChildId)
3117            //     .column(channel_parent::Column::ParentId)
3118            //     .expr(Expr::col(Alias::new("channel_tree.depth")).add(1i32))
3119            //     .from(Alias::new("channel_tree"))
3120            //     .and_where(
3121            //         Expr::col(channel_parent::Column::ParentId)
3122            //             .equals(Alias::new("channel_tree"), Alias::new("child_id")),
3123            //     )
3124            //     .to_owned();
3125
3126            // let common_table_expression = CommonTableExpression::new()
3127            //     .query(
3128            //         root_channel_ids_query
3129            //             .union(UnionType::Distinct, build_tree_query)
3130            //             .to_owned(),
3131            //     )
3132            //     .column(Alias::new("child_id"))
3133            //     .column(Alias::new("parent_id"))
3134            //     .column(Alias::new("depth"))
3135            //     .table_name(Alias::new("channel_tree"))
3136            //     .to_owned();
3137
3138            // let select = SelectStatement::new()
3139            //     .expr_as(
3140            //         Expr::col(Alias::new("channel_tree.child_id")),
3141            //         Alias::new("id"),
3142            //     )
3143            //     .column(channel::Column::Name)
3144            //     .column(Alias::new("channel_tree.parent_id"))
3145            //     .from(Alias::new("channel_tree"))
3146            //     .inner_join(
3147            //         channel::Entity.table_ref(),
3148            //         Expr::eq(
3149            //             channel::Column::Id.into_expr(),
3150            //             Expr::tbl(Alias::new("channel_tree"), Alias::new("child_id")),
3151            //         ),
3152            //     )
3153            //     .order_by(Alias::new("channel_tree.child_id"), Order::Asc)
3154            //     .to_owned();
3155
3156            // let with_clause = WithClause::new()
3157            //     .recursive(true)
3158            //     .cte(common_table_expression)
3159            //     .to_owned();
3160
3161            // let query = select.with(with_clause);
3162
3163            // let query = SelectStatement::new()
3164            //     .column(ColumnRef::Asterisk)
3165            //     .from_subquery(query, Alias::new("channel_tree")
3166            //     .to_owned();
3167
3168            // let stmt = self.pool.get_database_backend().build(&query);
3169
3170            let stmt = Statement::from_sql_and_values(
3171                self.pool.get_database_backend(),
3172                sql,
3173                vec![user_id.into()],
3174            );
3175
3176            Ok(channel_parent::Entity::find()
3177                .from_raw_sql(stmt)
3178                .into_model::<Channel>()
3179                .all(&*tx)
3180                .await?)
3181        })
3182        .await
3183    }
3184
3185    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3186    where
3187        F: Send + Fn(TransactionHandle) -> Fut,
3188        Fut: Send + Future<Output = Result<T>>,
3189    {
3190        let body = async {
3191            let mut i = 0;
3192            loop {
3193                let (tx, result) = self.with_transaction(&f).await?;
3194                match result {
3195                    Ok(result) => match tx.commit().await.map_err(Into::into) {
3196                        Ok(()) => return Ok(result),
3197                        Err(error) => {
3198                            if !self.retry_on_serialization_error(&error, i).await {
3199                                return Err(error);
3200                            }
3201                        }
3202                    },
3203                    Err(error) => {
3204                        tx.rollback().await?;
3205                        if !self.retry_on_serialization_error(&error, i).await {
3206                            return Err(error);
3207                        }
3208                    }
3209                }
3210                i += 1;
3211            }
3212        };
3213
3214        self.run(body).await
3215    }
3216
    /// Runs `f` inside a database transaction where the affected room, if any,
    /// is only known once `f` has produced its result.
    ///
    /// * `Ok(Some((room_id, data)))` — the lock for `room_id` is acquired, the
    ///   transaction is committed, and `data` is returned wrapped in a
    ///   [`RoomGuard`] that keeps the lock held.
    /// * `Ok(None)` — the transaction is committed and `None` is returned.
    /// * `Err(_)` — the transaction is rolled back.
    ///
    /// Serialization errors from `f` or from the commit cause the whole attempt
    /// to be retried after a delay; any other error is returned to the caller.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; scales the retry delay.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // Take the per-room lock before committing. The guard
                        // lives until the end of this arm, so on a failed commit
                        // it stays held during the retry delay as well.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    // No room involved: commit without taking any room lock.
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3266
3267    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3268    where
3269        F: Send + Fn(TransactionHandle) -> Fut,
3270        Fut: Send + Future<Output = Result<T>>,
3271    {
3272        let body = async {
3273            let mut i = 0;
3274            loop {
3275                let lock = self.rooms.entry(room_id).or_default().clone();
3276                let _guard = lock.lock_owned().await;
3277                let (tx, result) = self.with_transaction(&f).await?;
3278                match result {
3279                    Ok(data) => match tx.commit().await.map_err(Into::into) {
3280                        Ok(()) => {
3281                            return Ok(RoomGuard {
3282                                data,
3283                                _guard,
3284                                _not_send: PhantomData,
3285                            });
3286                        }
3287                        Err(error) => {
3288                            if !self.retry_on_serialization_error(&error, i).await {
3289                                return Err(error);
3290                            }
3291                        }
3292                    },
3293                    Err(error) => {
3294                        tx.rollback().await?;
3295                        if !self.retry_on_serialization_error(&error, i).await {
3296                            return Err(error);
3297                        }
3298                    }
3299                }
3300                i += 1;
3301            }
3302        };
3303
3304        self.run(body).await
3305    }
3306
3307    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3308    where
3309        F: Send + Fn(TransactionHandle) -> Fut,
3310        Fut: Send + Future<Output = Result<T>>,
3311    {
3312        let tx = self
3313            .pool
3314            .begin_with_config(Some(IsolationLevel::Serializable), None)
3315            .await?;
3316
3317        let mut tx = Arc::new(Some(tx));
3318        let result = f(TransactionHandle(tx.clone())).await;
3319        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3320            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3321        };
3322
3323        Ok((tx, result))
3324    }
3325
    /// Drives `future` to completion and returns its result.
    ///
    /// In test builds the future is executed with `block_on` on the runtime
    /// stored in `self.runtime` (panicking if none has been set), after a
    /// simulated random delay when the executor is deterministic. In non-test
    /// builds the future is simply awaited.
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        #[cfg(test)]
        {
            // Only the deterministic executor injects a delay here.
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
3344
3345    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3346        // If the error is due to a failure to serialize concurrent transactions, then retry
3347        // this transaction after a delay. With each subsequent retry, double the delay duration.
3348        // Also vary the delay randomly in order to ensure different database connections retry
3349        // at different times.
3350        if is_serialization_error(error) {
3351            let base_delay = 4_u64 << prev_attempt_count.min(16);
3352            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3353            log::info!(
3354                "retrying transaction after serialization error. delay: {} ms.",
3355                randomized_delay
3356            );
3357            self.executor
3358                .sleep(Duration::from_millis(randomized_delay as u64))
3359                .await;
3360            true
3361        } else {
3362            false
3363        }
3364    }
3365}
3366
3367fn is_serialization_error(error: &Error) -> bool {
3368    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3369    match error {
3370        Error::Database(
3371            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3372            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3373        ) if error
3374            .as_database_error()
3375            .and_then(|error| error.code())
3376            .as_deref()
3377            == Some(SERIALIZATION_FAILURE_CODE) =>
3378        {
3379            true
3380        }
3381        _ => false,
3382    }
3383}
3384
3385struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3386
3387impl Deref for TransactionHandle {
3388    type Target = DatabaseTransaction;
3389
3390    fn deref(&self) -> &Self::Target {
3391        self.0.as_ref().as_ref().unwrap()
3392    }
3393}
3394
/// A value produced by a room transaction, bundled with the room's mutex guard
/// so the room stays locked for as long as the value is held.
///
/// Dereferences (mutably and immutably) to the wrapped value.
pub struct RoomGuard<T> {
    // The transaction's result.
    data: T,
    // Keeps the per-room lock held until this guard is dropped.
    _guard: OwnedMutexGuard<()>,
    // `Rc` is not `Send`, so this marker makes `RoomGuard` non-`Send` too.
    _not_send: PhantomData<Rc<()>>,
}

impl<T> Deref for RoomGuard<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.data
    }
}

impl<T> DerefMut for RoomGuard<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.data
    }
}
3414
/// Serializable bundle of GitHub identity and invite count.
// NOTE(review): presumably the input for creating a user; the consumer is
// outside this section — confirm.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    pub github_login: String,
    pub github_user_id: i32,
    pub invite_count: i32,
}
3421
/// Identifiers describing a user record, with optional inviter and signup
/// device information.
// NOTE(review): presumably the outcome of creating a user; the producer is
// outside this section — confirm.
#[derive(Debug)]
pub struct NewUserResult {
    pub user_id: UserId,
    pub metrics_id: String,
    pub inviting_user_id: Option<UserId>,
    pub signup_device_id: Option<String>,
}
3429
/// A channel row as decoded from the raw query in `Database::get_channels`,
/// whose result columns are `id`, `name` and `parent_id`.
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    // `None` when the query reported a NULL parent for this row.
    pub parent_id: Option<ChannelId>,
}
3436
/// Generates a random 16-character invite code using `nanoid`.
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
3440
/// Generates a random 64-character email confirmation code using `nanoid`.
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
3444
// Defines `$name` as a public newtype around `i32` for use as a database id.
//
// The generated type serializes transparently as its inner integer, converts
// to and from `u64` via `to_proto`/`from_proto`, displays as the bare integer,
// and implements the `sea_query`/`sea_orm` traits required to bind it as a
// query value and to read it back from query results.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            // NOTE(review): the `as` cast silently truncates values that do not
            // fit in an `i32`; callers are assumed to pass ids that fit.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            // A negative inner value would be sign-extended by this cast.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer value that fits in an `i32`; every
            // other value (including nulls) is rejected.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            // The conversion goes from `u64` to `$name`, and the message says
            // so (it previously named the direction backwards).
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(concat!(
                        "error converting u64 to ",
                        stringify!($name)
                    ))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
3563
// One `i32`-backed id newtype per kind of record, generated by `id_type!`.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
3577
/// A room's protobuf state together with the projects that were rejoined and
/// reshared.
// NOTE(review): presumably the result of a connection rejoining a room; the
// producer is outside this section — confirm.
pub struct RejoinedRoom {
    pub room: proto::Room,
    pub rejoined_projects: Vec<RejoinedProject>,
    pub reshared_projects: Vec<ResharedProject>,
}
3583
/// A project entry of [`RejoinedRoom::reshared_projects`]: its id, the
/// connection id it was previously associated with, its collaborators and its
/// worktree metadata.
// NOTE(review): field meanings are inferred from names; the producer is
// outside this section — confirm.
pub struct ResharedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
3590
/// A project entry of [`RejoinedRoom::rejoined_projects`]: its id, the
/// connection id it was previously associated with, its collaborators, its
/// worktrees and its language servers.
// NOTE(review): field meanings are inferred from names; the producer is
// outside this section — confirm.
pub struct RejoinedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<RejoinedWorktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
3598
/// A worktree entry of [`RejoinedProject::worktrees`], expressed as updated
/// and removed entries/repositories plus diagnostics, settings files and scan
/// ids.
// NOTE(review): presumably a delta relative to what the rejoining client
// already has; the producer is outside this section — confirm.
#[derive(Debug)]
pub struct RejoinedWorktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub updated_entries: Vec<proto::Entry>,
    pub removed_entries: Vec<u64>,
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    pub removed_repositories: Vec<u64>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
3614
/// A room's protobuf state together with the projects that were left (keyed by
/// project id) and the users whose calls were canceled.
// NOTE(review): presumably the result of a connection leaving a room; the
// producer is outside this section — confirm.
pub struct LeftRoom {
    pub room: proto::Room,
    pub left_projects: HashMap<ProjectId, LeftProject>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3620
/// A room's protobuf state together with the ids of stale participants and of
/// users whose calls were canceled.
// NOTE(review): presumably the result of refreshing a room; the producer is
// outside this section — confirm.
pub struct RefreshedRoom {
    pub room: proto::Room,
    pub stale_participant_user_ids: Vec<UserId>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3626
/// A project's collaborators, its worktrees ordered by their `u64` key, and its
/// language servers.
// NOTE(review): the map key is presumably the worktree id — confirm against
// the code that builds this value.
pub struct Project {
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: BTreeMap<u64, Worktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
3632
/// One collaborator on a project, as read from a `project_collaborator` row.
pub struct ProjectCollaborator {
    // The collaborator's connection, taken from the row's `connection()`.
    pub connection_id: ConnectionId,
    pub user_id: UserId,
    pub replica_id: ReplicaId,
    // Copied from the row's `is_host` column.
    pub is_host: bool,
}
3639
3640impl ProjectCollaborator {
3641    pub fn to_proto(&self) -> proto::Collaborator {
3642        proto::Collaborator {
3643            peer_id: Some(self.connection_id.into()),
3644            replica_id: self.replica_id.0 as u32,
3645            user_id: self.user_id.to_proto(),
3646        }
3647    }
3648}
3649
/// Describes a project after a connection has left it: the project's id, its
/// host's user id and connection, and a list of connection ids.
// NOTE(review): `connection_ids` is presumably the set of connections to
// notify about the departure — confirm against the code that fills it in.
#[derive(Debug)]
pub struct LeftProject {
    pub id: ProjectId,
    pub host_user_id: UserId,
    pub host_connection_id: ConnectionId,
    pub connection_ids: Vec<ConnectionId>,
}
3657
/// A worktree of a [`Project`]: its identity and paths, its entries, its
/// repository entries ordered by their `u64` key, diagnostics, settings files
/// and scan ids.
// NOTE(review): the key of `repository_entries` is presumably the id of the
// repository's work-directory entry — confirm against the code that builds it.
pub struct Worktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
3670
/// A settings file belonging to a worktree, as a path and its text content.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    pub path: String,
    pub content: String,
}
3676
3677#[cfg(test)]
3678pub use test::*;
3679
3680#[cfg(test)]
3681mod test {
3682    use super::*;
3683    use gpui::executor::Background;
3684    use parking_lot::Mutex;
3685    use sea_orm::ConnectionTrait;
3686    use sqlx::migrate::MigrateDatabase;
3687    use std::sync::Arc;
3688
3689    pub struct TestDb {
3690        pub db: Option<Arc<Database>>,
3691        pub connection: Option<sqlx::AnyConnection>,
3692    }
3693
3694    impl TestDb {
3695        pub fn sqlite(background: Arc<Background>) -> Self {
3696            let url = format!("sqlite::memory:");
3697            let runtime = tokio::runtime::Builder::new_current_thread()
3698                .enable_io()
3699                .enable_time()
3700                .build()
3701                .unwrap();
3702
3703            let mut db = runtime.block_on(async {
3704                let mut options = ConnectOptions::new(url);
3705                options.max_connections(5);
3706                let db = Database::new(options, Executor::Deterministic(background))
3707                    .await
3708                    .unwrap();
3709                let sql = include_str!(concat!(
3710                    env!("CARGO_MANIFEST_DIR"),
3711                    "/migrations.sqlite/20221109000000_test_schema.sql"
3712                ));
3713                db.pool
3714                    .execute(sea_orm::Statement::from_string(
3715                        db.pool.get_database_backend(),
3716                        sql.into(),
3717                    ))
3718                    .await
3719                    .unwrap();
3720                db
3721            });
3722
3723            db.runtime = Some(runtime);
3724
3725            Self {
3726                db: Some(Arc::new(db)),
3727                connection: None,
3728            }
3729        }
3730
3731        pub fn postgres(background: Arc<Background>) -> Self {
3732            static LOCK: Mutex<()> = Mutex::new(());
3733
3734            let _guard = LOCK.lock();
3735            let mut rng = StdRng::from_entropy();
3736            let url = format!(
3737                "postgres://postgres@localhost/zed-test-{}",
3738                rng.gen::<u128>()
3739            );
3740            let runtime = tokio::runtime::Builder::new_current_thread()
3741                .enable_io()
3742                .enable_time()
3743                .build()
3744                .unwrap();
3745
3746            let mut db = runtime.block_on(async {
3747                sqlx::Postgres::create_database(&url)
3748                    .await
3749                    .expect("failed to create test db");
3750                let mut options = ConnectOptions::new(url);
3751                options
3752                    .max_connections(5)
3753                    .idle_timeout(Duration::from_secs(0));
3754                let db = Database::new(options, Executor::Deterministic(background))
3755                    .await
3756                    .unwrap();
3757                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3758                db.migrate(Path::new(migrations_path), false).await.unwrap();
3759                db
3760            });
3761
3762            db.runtime = Some(runtime);
3763
3764            Self {
3765                db: Some(Arc::new(db)),
3766                connection: None,
3767            }
3768        }
3769
3770        pub fn db(&self) -> &Arc<Database> {
3771            self.db.as_ref().unwrap()
3772        }
3773    }
3774
3775    impl Drop for TestDb {
3776        fn drop(&mut self) {
3777            let db = self.db.take().unwrap();
3778            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3779                db.runtime.as_ref().unwrap().block_on(async {
3780                    use util::ResultExt;
3781                    let query = "
3782                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3783                        FROM pg_stat_activity
3784                        WHERE
3785                            pg_stat_activity.datname = current_database() AND
3786                            pid <> pg_backend_pid();
3787                    ";
3788                    db.pool
3789                        .execute(sea_orm::Statement::from_string(
3790                            db.pool.get_database_backend(),
3791                            query.into(),
3792                        ))
3793                        .await
3794                        .log_err();
3795                    sqlx::Postgres::drop_database(db.options.get_url())
3796                        .await
3797                        .log_err();
3798                })
3799            }
3800        }
3801    }
3802}