db.rs

   1mod access_token;
   2mod channel;
   3mod channel_member;
   4mod channel_parent;
   5mod contact;
   6mod follower;
   7mod language_server;
   8mod project;
   9mod project_collaborator;
  10mod room;
  11mod room_participant;
  12mod server;
  13mod signup;
  14#[cfg(test)]
  15mod tests;
  16mod user;
  17mod worktree;
  18mod worktree_diagnostic_summary;
  19mod worktree_entry;
  20mod worktree_repository;
  21mod worktree_repository_statuses;
  22mod worktree_settings_file;
  23
  24use crate::executor::Executor;
  25use crate::{Error, Result};
  26use anyhow::anyhow;
  27use collections::{BTreeMap, HashMap, HashSet};
  28pub use contact::Contact;
  29use dashmap::DashMap;
  30use futures::StreamExt;
  31use hyper::StatusCode;
  32use rand::prelude::StdRng;
  33use rand::{Rng, SeedableRng};
  34use rpc::{proto, ConnectionId};
  35use sea_orm::Condition;
  36pub use sea_orm::ConnectOptions;
  37use sea_orm::{
  38    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  39    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  40    Statement, TransactionTrait,
  41};
  42use sea_query::{Alias, Expr, OnConflict, Query};
  43use serde::{Deserialize, Serialize};
  44pub use signup::{Invite, NewSignup, WaitlistSummary};
  45use sqlx::migrate::{Migrate, Migration, MigrationSource};
  46use sqlx::Connection;
  47use std::fmt::Write as _;
  48use std::ops::{Deref, DerefMut};
  49use std::path::Path;
  50use std::time::Duration;
  51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  52use tokio::sync::{Mutex, OwnedMutexGuard};
  53pub use user::Model as User;
  54
  55pub struct Database {
  56    options: ConnectOptions,
  57    pool: DatabaseConnection,
  58    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  59    rng: Mutex<StdRng>,
  60    executor: Executor,
  61    #[cfg(test)]
  62    runtime: Option<tokio::runtime::Runtime>,
  63}
  64
  65impl Database {
  66    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  67        Ok(Self {
  68            options: options.clone(),
  69            pool: sea_orm::Database::connect(options).await?,
  70            rooms: DashMap::with_capacity(16384),
  71            rng: Mutex::new(StdRng::seed_from_u64(0)),
  72            executor,
  73            #[cfg(test)]
  74            runtime: None,
  75        })
  76    }
  77
  78    #[cfg(test)]
  79    pub fn reset(&self) {
  80        self.rooms.clear();
  81    }
  82
  83    pub async fn migrate(
  84        &self,
  85        migrations_path: &Path,
  86        ignore_checksum_mismatch: bool,
  87    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  88        let migrations = MigrationSource::resolve(migrations_path)
  89            .await
  90            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  91
  92        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  93
  94        connection.ensure_migrations_table().await?;
  95        let applied_migrations: HashMap<_, _> = connection
  96            .list_applied_migrations()
  97            .await?
  98            .into_iter()
  99            .map(|m| (m.version, m))
 100            .collect();
 101
 102        let mut new_migrations = Vec::new();
 103        for migration in migrations {
 104            match applied_migrations.get(&migration.version) {
 105                Some(applied_migration) => {
 106                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 107                    {
 108                        Err(anyhow!(
 109                            "checksum mismatch for applied migration {}",
 110                            migration.description
 111                        ))?;
 112                    }
 113                }
 114                None => {
 115                    let elapsed = connection.apply(&migration).await?;
 116                    new_migrations.push((migration, elapsed));
 117                }
 118            }
 119        }
 120
 121        Ok(new_migrations)
 122    }
 123
 124    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 125        self.transaction(|tx| async move {
 126            let server = server::ActiveModel {
 127                environment: ActiveValue::set(environment.into()),
 128                ..Default::default()
 129            }
 130            .insert(&*tx)
 131            .await?;
 132            Ok(server.id)
 133        })
 134        .await
 135    }
 136
 137    pub async fn stale_room_ids(
 138        &self,
 139        environment: &str,
 140        new_server_id: ServerId,
 141    ) -> Result<Vec<RoomId>> {
 142        self.transaction(|tx| async move {
 143            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 144            enum QueryAs {
 145                RoomId,
 146            }
 147
 148            let stale_server_epochs = self
 149                .stale_server_ids(environment, new_server_id, &tx)
 150                .await?;
 151            Ok(room_participant::Entity::find()
 152                .select_only()
 153                .column(room_participant::Column::RoomId)
 154                .distinct()
 155                .filter(
 156                    room_participant::Column::AnsweringConnectionServerId
 157                        .is_in(stale_server_epochs),
 158                )
 159                .into_values::<_, QueryAs>()
 160                .all(&*tx)
 161                .await?)
 162        })
 163        .await
 164    }
 165
 166    pub async fn refresh_room(
 167        &self,
 168        room_id: RoomId,
 169        new_server_id: ServerId,
 170    ) -> Result<RoomGuard<RefreshedRoom>> {
 171        self.room_transaction(room_id, |tx| async move {
 172            let stale_participant_filter = Condition::all()
 173                .add(room_participant::Column::RoomId.eq(room_id))
 174                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 175                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 176
 177            let stale_participant_user_ids = room_participant::Entity::find()
 178                .filter(stale_participant_filter.clone())
 179                .all(&*tx)
 180                .await?
 181                .into_iter()
 182                .map(|participant| participant.user_id)
 183                .collect::<Vec<_>>();
 184
 185            // Delete participants who failed to reconnect and cancel their calls.
 186            let mut canceled_calls_to_user_ids = Vec::new();
 187            room_participant::Entity::delete_many()
 188                .filter(stale_participant_filter)
 189                .exec(&*tx)
 190                .await?;
 191            let called_participants = room_participant::Entity::find()
 192                .filter(
 193                    Condition::all()
 194                        .add(
 195                            room_participant::Column::CallingUserId
 196                                .is_in(stale_participant_user_ids.iter().copied()),
 197                        )
 198                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 199                )
 200                .all(&*tx)
 201                .await?;
 202            room_participant::Entity::delete_many()
 203                .filter(
 204                    room_participant::Column::Id
 205                        .is_in(called_participants.iter().map(|participant| participant.id)),
 206                )
 207                .exec(&*tx)
 208                .await?;
 209            canceled_calls_to_user_ids.extend(
 210                called_participants
 211                    .into_iter()
 212                    .map(|participant| participant.user_id),
 213            );
 214
 215            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 216            let channel_members = if let Some(channel_id) = channel_id {
 217                self.get_channel_members_internal(channel_id, &tx).await?
 218            } else {
 219                Vec::new()
 220            };
 221
 222            // Delete the room if it becomes empty.
 223            if room.participants.is_empty() {
 224                project::Entity::delete_many()
 225                    .filter(project::Column::RoomId.eq(room_id))
 226                    .exec(&*tx)
 227                    .await?;
 228                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 229            }
 230
 231            Ok(RefreshedRoom {
 232                room,
 233                channel_id,
 234                channel_members,
 235                stale_participant_user_ids,
 236                canceled_calls_to_user_ids,
 237            })
 238        })
 239        .await
 240    }
 241
 242    pub async fn delete_stale_servers(
 243        &self,
 244        environment: &str,
 245        new_server_id: ServerId,
 246    ) -> Result<()> {
 247        self.transaction(|tx| async move {
 248            server::Entity::delete_many()
 249                .filter(
 250                    Condition::all()
 251                        .add(server::Column::Environment.eq(environment))
 252                        .add(server::Column::Id.ne(new_server_id)),
 253                )
 254                .exec(&*tx)
 255                .await?;
 256            Ok(())
 257        })
 258        .await
 259    }
 260
 261    async fn stale_server_ids(
 262        &self,
 263        environment: &str,
 264        new_server_id: ServerId,
 265        tx: &DatabaseTransaction,
 266    ) -> Result<Vec<ServerId>> {
 267        let stale_servers = server::Entity::find()
 268            .filter(
 269                Condition::all()
 270                    .add(server::Column::Environment.eq(environment))
 271                    .add(server::Column::Id.ne(new_server_id)),
 272            )
 273            .all(&*tx)
 274            .await?;
 275        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 276    }
 277
 278    // users
 279
 280    pub async fn create_user(
 281        &self,
 282        email_address: &str,
 283        admin: bool,
 284        params: NewUserParams,
 285    ) -> Result<NewUserResult> {
 286        self.transaction(|tx| async {
 287            let tx = tx;
 288            let user = user::Entity::insert(user::ActiveModel {
 289                email_address: ActiveValue::set(Some(email_address.into())),
 290                github_login: ActiveValue::set(params.github_login.clone()),
 291                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 292                admin: ActiveValue::set(admin),
 293                metrics_id: ActiveValue::set(Uuid::new_v4()),
 294                ..Default::default()
 295            })
 296            .on_conflict(
 297                OnConflict::column(user::Column::GithubLogin)
 298                    .update_column(user::Column::GithubLogin)
 299                    .to_owned(),
 300            )
 301            .exec_with_returning(&*tx)
 302            .await?;
 303
 304            Ok(NewUserResult {
 305                user_id: user.id,
 306                metrics_id: user.metrics_id.to_string(),
 307                signup_device_id: None,
 308                inviting_user_id: None,
 309            })
 310        })
 311        .await
 312    }
 313
 314    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 315        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 316            .await
 317    }
 318
 319    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 320        self.transaction(|tx| async {
 321            let tx = tx;
 322            Ok(user::Entity::find()
 323                .filter(user::Column::Id.is_in(ids.iter().copied()))
 324                .all(&*tx)
 325                .await?)
 326        })
 327        .await
 328    }
 329
 330    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 331        self.transaction(|tx| async move {
 332            Ok(user::Entity::find()
 333                .filter(user::Column::GithubLogin.eq(github_login))
 334                .one(&*tx)
 335                .await?)
 336        })
 337        .await
 338    }
 339
 340    pub async fn get_or_create_user_by_github_account(
 341        &self,
 342        github_login: &str,
 343        github_user_id: Option<i32>,
 344        github_email: Option<&str>,
 345    ) -> Result<Option<User>> {
 346        self.transaction(|tx| async move {
 347            let tx = &*tx;
 348            if let Some(github_user_id) = github_user_id {
 349                if let Some(user_by_github_user_id) = user::Entity::find()
 350                    .filter(user::Column::GithubUserId.eq(github_user_id))
 351                    .one(tx)
 352                    .await?
 353                {
 354                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 355                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 356                    Ok(Some(user_by_github_user_id.update(tx).await?))
 357                } else if let Some(user_by_github_login) = user::Entity::find()
 358                    .filter(user::Column::GithubLogin.eq(github_login))
 359                    .one(tx)
 360                    .await?
 361                {
 362                    let mut user_by_github_login = user_by_github_login.into_active_model();
 363                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 364                    Ok(Some(user_by_github_login.update(tx).await?))
 365                } else {
 366                    let user = user::Entity::insert(user::ActiveModel {
 367                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 368                        github_login: ActiveValue::set(github_login.into()),
 369                        github_user_id: ActiveValue::set(Some(github_user_id)),
 370                        admin: ActiveValue::set(false),
 371                        invite_count: ActiveValue::set(0),
 372                        invite_code: ActiveValue::set(None),
 373                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 374                        ..Default::default()
 375                    })
 376                    .exec_with_returning(&*tx)
 377                    .await?;
 378                    Ok(Some(user))
 379                }
 380            } else {
 381                Ok(user::Entity::find()
 382                    .filter(user::Column::GithubLogin.eq(github_login))
 383                    .one(tx)
 384                    .await?)
 385            }
 386        })
 387        .await
 388    }
 389
 390    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 391        self.transaction(|tx| async move {
 392            Ok(user::Entity::find()
 393                .order_by_asc(user::Column::GithubLogin)
 394                .limit(limit as u64)
 395                .offset(page as u64 * limit as u64)
 396                .all(&*tx)
 397                .await?)
 398        })
 399        .await
 400    }
 401
 402    pub async fn get_users_with_no_invites(
 403        &self,
 404        invited_by_another_user: bool,
 405    ) -> Result<Vec<User>> {
 406        self.transaction(|tx| async move {
 407            Ok(user::Entity::find()
 408                .filter(
 409                    user::Column::InviteCount
 410                        .eq(0)
 411                        .and(if invited_by_another_user {
 412                            user::Column::InviterId.is_not_null()
 413                        } else {
 414                            user::Column::InviterId.is_null()
 415                        }),
 416                )
 417                .all(&*tx)
 418                .await?)
 419        })
 420        .await
 421    }
 422
 423    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 424        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 425        enum QueryAs {
 426            MetricsId,
 427        }
 428
 429        self.transaction(|tx| async move {
 430            let metrics_id: Uuid = user::Entity::find_by_id(id)
 431                .select_only()
 432                .column(user::Column::MetricsId)
 433                .into_values::<_, QueryAs>()
 434                .one(&*tx)
 435                .await?
 436                .ok_or_else(|| anyhow!("could not find user"))?;
 437            Ok(metrics_id.to_string())
 438        })
 439        .await
 440    }
 441
 442    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 443        self.transaction(|tx| async move {
 444            user::Entity::update_many()
 445                .filter(user::Column::Id.eq(id))
 446                .set(user::ActiveModel {
 447                    admin: ActiveValue::set(is_admin),
 448                    ..Default::default()
 449                })
 450                .exec(&*tx)
 451                .await?;
 452            Ok(())
 453        })
 454        .await
 455    }
 456
 457    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 458        self.transaction(|tx| async move {
 459            user::Entity::update_many()
 460                .filter(user::Column::Id.eq(id))
 461                .set(user::ActiveModel {
 462                    connected_once: ActiveValue::set(connected_once),
 463                    ..Default::default()
 464                })
 465                .exec(&*tx)
 466                .await?;
 467            Ok(())
 468        })
 469        .await
 470    }
 471
 472    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 473        self.transaction(|tx| async move {
 474            access_token::Entity::delete_many()
 475                .filter(access_token::Column::UserId.eq(id))
 476                .exec(&*tx)
 477                .await?;
 478            user::Entity::delete_by_id(id).exec(&*tx).await?;
 479            Ok(())
 480        })
 481        .await
 482    }
 483
 484    // contacts
 485
 486    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 487        #[derive(Debug, FromQueryResult)]
 488        struct ContactWithUserBusyStatuses {
 489            user_id_a: UserId,
 490            user_id_b: UserId,
 491            a_to_b: bool,
 492            accepted: bool,
 493            should_notify: bool,
 494            user_a_busy: bool,
 495            user_b_busy: bool,
 496        }
 497
 498        self.transaction(|tx| async move {
 499            let user_a_participant = Alias::new("user_a_participant");
 500            let user_b_participant = Alias::new("user_b_participant");
 501            let mut db_contacts = contact::Entity::find()
 502                .column_as(
 503                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 504                        .is_not_null(),
 505                    "user_a_busy",
 506                )
 507                .column_as(
 508                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 509                        .is_not_null(),
 510                    "user_b_busy",
 511                )
 512                .filter(
 513                    contact::Column::UserIdA
 514                        .eq(user_id)
 515                        .or(contact::Column::UserIdB.eq(user_id)),
 516                )
 517                .join_as(
 518                    JoinType::LeftJoin,
 519                    contact::Relation::UserARoomParticipant.def(),
 520                    user_a_participant,
 521                )
 522                .join_as(
 523                    JoinType::LeftJoin,
 524                    contact::Relation::UserBRoomParticipant.def(),
 525                    user_b_participant,
 526                )
 527                .into_model::<ContactWithUserBusyStatuses>()
 528                .stream(&*tx)
 529                .await?;
 530
 531            let mut contacts = Vec::new();
 532            while let Some(db_contact) = db_contacts.next().await {
 533                let db_contact = db_contact?;
 534                if db_contact.user_id_a == user_id {
 535                    if db_contact.accepted {
 536                        contacts.push(Contact::Accepted {
 537                            user_id: db_contact.user_id_b,
 538                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 539                            busy: db_contact.user_b_busy,
 540                        });
 541                    } else if db_contact.a_to_b {
 542                        contacts.push(Contact::Outgoing {
 543                            user_id: db_contact.user_id_b,
 544                        })
 545                    } else {
 546                        contacts.push(Contact::Incoming {
 547                            user_id: db_contact.user_id_b,
 548                            should_notify: db_contact.should_notify,
 549                        });
 550                    }
 551                } else if db_contact.accepted {
 552                    contacts.push(Contact::Accepted {
 553                        user_id: db_contact.user_id_a,
 554                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 555                        busy: db_contact.user_a_busy,
 556                    });
 557                } else if db_contact.a_to_b {
 558                    contacts.push(Contact::Incoming {
 559                        user_id: db_contact.user_id_a,
 560                        should_notify: db_contact.should_notify,
 561                    });
 562                } else {
 563                    contacts.push(Contact::Outgoing {
 564                        user_id: db_contact.user_id_a,
 565                    });
 566                }
 567            }
 568
 569            contacts.sort_unstable_by_key(|contact| contact.user_id());
 570
 571            Ok(contacts)
 572        })
 573        .await
 574    }
 575
 576    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 577        self.transaction(|tx| async move {
 578            let participant = room_participant::Entity::find()
 579                .filter(room_participant::Column::UserId.eq(user_id))
 580                .one(&*tx)
 581                .await?;
 582            Ok(participant.is_some())
 583        })
 584        .await
 585    }
 586
 587    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 588        self.transaction(|tx| async move {
 589            let (id_a, id_b) = if user_id_1 < user_id_2 {
 590                (user_id_1, user_id_2)
 591            } else {
 592                (user_id_2, user_id_1)
 593            };
 594
 595            Ok(contact::Entity::find()
 596                .filter(
 597                    contact::Column::UserIdA
 598                        .eq(id_a)
 599                        .and(contact::Column::UserIdB.eq(id_b))
 600                        .and(contact::Column::Accepted.eq(true)),
 601                )
 602                .one(&*tx)
 603                .await?
 604                .is_some())
 605        })
 606        .await
 607    }
 608
 609    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 610        self.transaction(|tx| async move {
 611            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 612                (sender_id, receiver_id, true)
 613            } else {
 614                (receiver_id, sender_id, false)
 615            };
 616
 617            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 618                user_id_a: ActiveValue::set(id_a),
 619                user_id_b: ActiveValue::set(id_b),
 620                a_to_b: ActiveValue::set(a_to_b),
 621                accepted: ActiveValue::set(false),
 622                should_notify: ActiveValue::set(true),
 623                ..Default::default()
 624            })
 625            .on_conflict(
 626                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 627                    .values([
 628                        (contact::Column::Accepted, true.into()),
 629                        (contact::Column::ShouldNotify, false.into()),
 630                    ])
 631                    .action_and_where(
 632                        contact::Column::Accepted.eq(false).and(
 633                            contact::Column::AToB
 634                                .eq(a_to_b)
 635                                .and(contact::Column::UserIdA.eq(id_b))
 636                                .or(contact::Column::AToB
 637                                    .ne(a_to_b)
 638                                    .and(contact::Column::UserIdA.eq(id_a))),
 639                        ),
 640                    )
 641                    .to_owned(),
 642            )
 643            .exec_without_returning(&*tx)
 644            .await?;
 645
 646            if rows_affected == 1 {
 647                Ok(())
 648            } else {
 649                Err(anyhow!("contact already requested"))?
 650            }
 651        })
 652        .await
 653    }
 654
 655    /// Returns a bool indicating whether the removed contact had originally accepted or not
 656    ///
 657    /// Deletes the contact identified by the requester and responder ids, and then returns
 658    /// whether the deleted contact had originally accepted or was a pending contact request.
 659    ///
 660    /// # Arguments
 661    ///
 662    /// * `requester_id` - The user that initiates this request
 663    /// * `responder_id` - The user that will be removed
 664    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 665        self.transaction(|tx| async move {
 666            let (id_a, id_b) = if responder_id < requester_id {
 667                (responder_id, requester_id)
 668            } else {
 669                (requester_id, responder_id)
 670            };
 671
 672            let contact = contact::Entity::find()
 673                .filter(
 674                    contact::Column::UserIdA
 675                        .eq(id_a)
 676                        .and(contact::Column::UserIdB.eq(id_b)),
 677                )
 678                .one(&*tx)
 679                .await?
 680                .ok_or_else(|| anyhow!("no such contact"))?;
 681
 682            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 683            Ok(contact.accepted)
 684        })
 685        .await
 686    }
 687
 688    pub async fn dismiss_contact_notification(
 689        &self,
 690        user_id: UserId,
 691        contact_user_id: UserId,
 692    ) -> Result<()> {
 693        self.transaction(|tx| async move {
 694            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 695                (user_id, contact_user_id, true)
 696            } else {
 697                (contact_user_id, user_id, false)
 698            };
 699
 700            let result = contact::Entity::update_many()
 701                .set(contact::ActiveModel {
 702                    should_notify: ActiveValue::set(false),
 703                    ..Default::default()
 704                })
 705                .filter(
 706                    contact::Column::UserIdA
 707                        .eq(id_a)
 708                        .and(contact::Column::UserIdB.eq(id_b))
 709                        .and(
 710                            contact::Column::AToB
 711                                .eq(a_to_b)
 712                                .and(contact::Column::Accepted.eq(true))
 713                                .or(contact::Column::AToB
 714                                    .ne(a_to_b)
 715                                    .and(contact::Column::Accepted.eq(false))),
 716                        ),
 717                )
 718                .exec(&*tx)
 719                .await?;
 720            if result.rows_affected == 0 {
 721                Err(anyhow!("no such contact request"))?
 722            } else {
 723                Ok(())
 724            }
 725        })
 726        .await
 727    }
 728
 729    pub async fn respond_to_contact_request(
 730        &self,
 731        responder_id: UserId,
 732        requester_id: UserId,
 733        accept: bool,
 734    ) -> Result<()> {
 735        self.transaction(|tx| async move {
 736            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 737                (responder_id, requester_id, false)
 738            } else {
 739                (requester_id, responder_id, true)
 740            };
 741            let rows_affected = if accept {
 742                let result = contact::Entity::update_many()
 743                    .set(contact::ActiveModel {
 744                        accepted: ActiveValue::set(true),
 745                        should_notify: ActiveValue::set(true),
 746                        ..Default::default()
 747                    })
 748                    .filter(
 749                        contact::Column::UserIdA
 750                            .eq(id_a)
 751                            .and(contact::Column::UserIdB.eq(id_b))
 752                            .and(contact::Column::AToB.eq(a_to_b)),
 753                    )
 754                    .exec(&*tx)
 755                    .await?;
 756                result.rows_affected
 757            } else {
 758                let result = contact::Entity::delete_many()
 759                    .filter(
 760                        contact::Column::UserIdA
 761                            .eq(id_a)
 762                            .and(contact::Column::UserIdB.eq(id_b))
 763                            .and(contact::Column::AToB.eq(a_to_b))
 764                            .and(contact::Column::Accepted.eq(false)),
 765                    )
 766                    .exec(&*tx)
 767                    .await?;
 768
 769                result.rows_affected
 770            };
 771
 772            if rows_affected == 1 {
 773                Ok(())
 774            } else {
 775                Err(anyhow!("no such contact request"))?
 776            }
 777        })
 778        .await
 779    }
 780
 781    pub fn fuzzy_like_string(string: &str) -> String {
 782        let mut result = String::with_capacity(string.len() * 2 + 1);
 783        for c in string.chars() {
 784            if c.is_alphanumeric() {
 785                result.push('%');
 786                result.push(c);
 787            }
 788        }
 789        result.push('%');
 790        result
 791    }
 792
 793    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 794        self.transaction(|tx| async {
 795            let tx = tx;
 796            let like_string = Self::fuzzy_like_string(name_query);
 797            let query = "
 798                SELECT users.*
 799                FROM users
 800                WHERE github_login ILIKE $1
 801                ORDER BY github_login <-> $2
 802                LIMIT $3
 803            ";
 804
 805            Ok(user::Entity::find()
 806                .from_raw_sql(Statement::from_sql_and_values(
 807                    self.pool.get_database_backend(),
 808                    query.into(),
 809                    vec![like_string.into(), name_query.into(), limit.into()],
 810                ))
 811                .all(&*tx)
 812                .await?)
 813        })
 814        .await
 815    }
 816
 817    // signups
 818
 819    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 820        self.transaction(|tx| async move {
 821            signup::Entity::insert(signup::ActiveModel {
 822                email_address: ActiveValue::set(signup.email_address.clone()),
 823                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 824                email_confirmation_sent: ActiveValue::set(false),
 825                platform_mac: ActiveValue::set(signup.platform_mac),
 826                platform_windows: ActiveValue::set(signup.platform_windows),
 827                platform_linux: ActiveValue::set(signup.platform_linux),
 828                platform_unknown: ActiveValue::set(false),
 829                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 830                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 831                device_id: ActiveValue::set(signup.device_id.clone()),
 832                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 833                ..Default::default()
 834            })
 835            .on_conflict(
 836                OnConflict::column(signup::Column::EmailAddress)
 837                    .update_columns([
 838                        signup::Column::PlatformMac,
 839                        signup::Column::PlatformWindows,
 840                        signup::Column::PlatformLinux,
 841                        signup::Column::EditorFeatures,
 842                        signup::Column::ProgrammingLanguages,
 843                        signup::Column::DeviceId,
 844                        signup::Column::AddedToMailingList,
 845                    ])
 846                    .to_owned(),
 847            )
 848            .exec(&*tx)
 849            .await?;
 850            Ok(())
 851        })
 852        .await
 853    }
 854
 855    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 856        self.transaction(|tx| async move {
 857            let signup = signup::Entity::find()
 858                .filter(signup::Column::EmailAddress.eq(email_address))
 859                .one(&*tx)
 860                .await?
 861                .ok_or_else(|| {
 862                    anyhow!("signup with email address {} doesn't exist", email_address)
 863                })?;
 864
 865            Ok(signup)
 866        })
 867        .await
 868    }
 869
 870    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 871        self.transaction(|tx| async move {
 872            let query = "
 873                SELECT
 874                    COUNT(*) as count,
 875                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 876                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 877                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 878                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 879                FROM (
 880                    SELECT *
 881                    FROM signups
 882                    WHERE
 883                        NOT email_confirmation_sent
 884                ) AS unsent
 885            ";
 886            Ok(
 887                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 888                    self.pool.get_database_backend(),
 889                    query.into(),
 890                    vec![],
 891                ))
 892                .one(&*tx)
 893                .await?
 894                .ok_or_else(|| anyhow!("invalid result"))?,
 895            )
 896        })
 897        .await
 898    }
 899
 900    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 901        let emails = invites
 902            .iter()
 903            .map(|s| s.email_address.as_str())
 904            .collect::<Vec<_>>();
 905        self.transaction(|tx| async {
 906            let tx = tx;
 907            signup::Entity::update_many()
 908                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 909                .set(signup::ActiveModel {
 910                    email_confirmation_sent: ActiveValue::set(true),
 911                    ..Default::default()
 912                })
 913                .exec(&*tx)
 914                .await?;
 915            Ok(())
 916        })
 917        .await
 918    }
 919
 920    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 921        self.transaction(|tx| async move {
 922            Ok(signup::Entity::find()
 923                .select_only()
 924                .column(signup::Column::EmailAddress)
 925                .column(signup::Column::EmailConfirmationCode)
 926                .filter(
 927                    signup::Column::EmailConfirmationSent.eq(false).and(
 928                        signup::Column::PlatformMac
 929                            .eq(true)
 930                            .or(signup::Column::PlatformUnknown.eq(true)),
 931                    ),
 932                )
 933                .order_by_asc(signup::Column::CreatedAt)
 934                .limit(count as u64)
 935                .into_model()
 936                .all(&*tx)
 937                .await?)
 938        })
 939        .await
 940    }
 941
 942    // invite codes
 943
 944    pub async fn create_invite_from_code(
 945        &self,
 946        code: &str,
 947        email_address: &str,
 948        device_id: Option<&str>,
 949        added_to_mailing_list: bool,
 950    ) -> Result<Invite> {
 951        self.transaction(|tx| async move {
 952            let existing_user = user::Entity::find()
 953                .filter(user::Column::EmailAddress.eq(email_address))
 954                .one(&*tx)
 955                .await?;
 956
 957            if existing_user.is_some() {
 958                Err(anyhow!("email address is already in use"))?;
 959            }
 960
 961            let inviting_user_with_invites = match user::Entity::find()
 962                .filter(
 963                    user::Column::InviteCode
 964                        .eq(code)
 965                        .and(user::Column::InviteCount.gt(0)),
 966                )
 967                .one(&*tx)
 968                .await?
 969            {
 970                Some(inviting_user) => inviting_user,
 971                None => {
 972                    return Err(Error::Http(
 973                        StatusCode::UNAUTHORIZED,
 974                        "unable to find an invite code with invites remaining".to_string(),
 975                    ))?
 976                }
 977            };
 978            user::Entity::update_many()
 979                .filter(
 980                    user::Column::Id
 981                        .eq(inviting_user_with_invites.id)
 982                        .and(user::Column::InviteCount.gt(0)),
 983                )
 984                .col_expr(
 985                    user::Column::InviteCount,
 986                    Expr::col(user::Column::InviteCount).sub(1),
 987                )
 988                .exec(&*tx)
 989                .await?;
 990
 991            let signup = signup::Entity::insert(signup::ActiveModel {
 992                email_address: ActiveValue::set(email_address.into()),
 993                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 994                email_confirmation_sent: ActiveValue::set(false),
 995                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 996                platform_linux: ActiveValue::set(false),
 997                platform_mac: ActiveValue::set(false),
 998                platform_windows: ActiveValue::set(false),
 999                platform_unknown: ActiveValue::set(true),
1000                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
1001                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
1002                ..Default::default()
1003            })
1004            .on_conflict(
1005                OnConflict::column(signup::Column::EmailAddress)
1006                    .update_column(signup::Column::InvitingUserId)
1007                    .to_owned(),
1008            )
1009            .exec_with_returning(&*tx)
1010            .await?;
1011
1012            Ok(Invite {
1013                email_address: signup.email_address,
1014                email_confirmation_code: signup.email_confirmation_code,
1015            })
1016        })
1017        .await
1018    }
1019
1020    pub async fn create_user_from_invite(
1021        &self,
1022        invite: &Invite,
1023        user: NewUserParams,
1024    ) -> Result<Option<NewUserResult>> {
1025        self.transaction(|tx| async {
1026            let tx = tx;
1027            let signup = signup::Entity::find()
1028                .filter(
1029                    signup::Column::EmailAddress
1030                        .eq(invite.email_address.as_str())
1031                        .and(
1032                            signup::Column::EmailConfirmationCode
1033                                .eq(invite.email_confirmation_code.as_str()),
1034                        ),
1035                )
1036                .one(&*tx)
1037                .await?
1038                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1039
1040            if signup.user_id.is_some() {
1041                return Ok(None);
1042            }
1043
1044            let user = user::Entity::insert(user::ActiveModel {
1045                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1046                github_login: ActiveValue::set(user.github_login.clone()),
1047                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1048                admin: ActiveValue::set(false),
1049                invite_count: ActiveValue::set(user.invite_count),
1050                invite_code: ActiveValue::set(Some(random_invite_code())),
1051                metrics_id: ActiveValue::set(Uuid::new_v4()),
1052                ..Default::default()
1053            })
1054            .on_conflict(
1055                OnConflict::column(user::Column::GithubLogin)
1056                    .update_columns([
1057                        user::Column::EmailAddress,
1058                        user::Column::GithubUserId,
1059                        user::Column::Admin,
1060                    ])
1061                    .to_owned(),
1062            )
1063            .exec_with_returning(&*tx)
1064            .await?;
1065
1066            let mut signup = signup.into_active_model();
1067            signup.user_id = ActiveValue::set(Some(user.id));
1068            let signup = signup.update(&*tx).await?;
1069
1070            if let Some(inviting_user_id) = signup.inviting_user_id {
1071                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1072                    (inviting_user_id, user.id, true)
1073                } else {
1074                    (user.id, inviting_user_id, false)
1075                };
1076
1077                contact::Entity::insert(contact::ActiveModel {
1078                    user_id_a: ActiveValue::set(user_id_a),
1079                    user_id_b: ActiveValue::set(user_id_b),
1080                    a_to_b: ActiveValue::set(a_to_b),
1081                    should_notify: ActiveValue::set(true),
1082                    accepted: ActiveValue::set(true),
1083                    ..Default::default()
1084                })
1085                .on_conflict(OnConflict::new().do_nothing().to_owned())
1086                .exec_without_returning(&*tx)
1087                .await?;
1088            }
1089
1090            Ok(Some(NewUserResult {
1091                user_id: user.id,
1092                metrics_id: user.metrics_id.to_string(),
1093                inviting_user_id: signup.inviting_user_id,
1094                signup_device_id: signup.device_id,
1095            }))
1096        })
1097        .await
1098    }
1099
1100    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1101        self.transaction(|tx| async move {
1102            if count > 0 {
1103                user::Entity::update_many()
1104                    .filter(
1105                        user::Column::Id
1106                            .eq(id)
1107                            .and(user::Column::InviteCode.is_null()),
1108                    )
1109                    .set(user::ActiveModel {
1110                        invite_code: ActiveValue::set(Some(random_invite_code())),
1111                        ..Default::default()
1112                    })
1113                    .exec(&*tx)
1114                    .await?;
1115            }
1116
1117            user::Entity::update_many()
1118                .filter(user::Column::Id.eq(id))
1119                .set(user::ActiveModel {
1120                    invite_count: ActiveValue::set(count),
1121                    ..Default::default()
1122                })
1123                .exec(&*tx)
1124                .await?;
1125            Ok(())
1126        })
1127        .await
1128    }
1129
1130    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1131        self.transaction(|tx| async move {
1132            match user::Entity::find_by_id(id).one(&*tx).await? {
1133                Some(user) if user.invite_code.is_some() => {
1134                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1135                }
1136                _ => Ok(None),
1137            }
1138        })
1139        .await
1140    }
1141
1142    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1143        self.transaction(|tx| async move {
1144            user::Entity::find()
1145                .filter(user::Column::InviteCode.eq(code))
1146                .one(&*tx)
1147                .await?
1148                .ok_or_else(|| {
1149                    Error::Http(
1150                        StatusCode::NOT_FOUND,
1151                        "that invite code does not exist".to_string(),
1152                    )
1153                })
1154        })
1155        .await
1156    }
1157
1158    // rooms
1159
1160    pub async fn incoming_call_for_user(
1161        &self,
1162        user_id: UserId,
1163    ) -> Result<Option<proto::IncomingCall>> {
1164        self.transaction(|tx| async move {
1165            let pending_participant = room_participant::Entity::find()
1166                .filter(
1167                    room_participant::Column::UserId
1168                        .eq(user_id)
1169                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1170                )
1171                .one(&*tx)
1172                .await?;
1173
1174            if let Some(pending_participant) = pending_participant {
1175                let room = self.get_room(pending_participant.room_id, &tx).await?;
1176                Ok(Self::build_incoming_call(&room, user_id))
1177            } else {
1178                Ok(None)
1179            }
1180        })
1181        .await
1182    }
1183
1184    pub async fn create_room(
1185        &self,
1186        user_id: UserId,
1187        connection: ConnectionId,
1188        live_kit_room: &str,
1189    ) -> Result<proto::Room> {
1190        self.transaction(|tx| async move {
1191            let room = room::ActiveModel {
1192                live_kit_room: ActiveValue::set(live_kit_room.into()),
1193                ..Default::default()
1194            }
1195            .insert(&*tx)
1196            .await?;
1197            room_participant::ActiveModel {
1198                room_id: ActiveValue::set(room.id),
1199                user_id: ActiveValue::set(user_id),
1200                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1201                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1202                    connection.owner_id as i32,
1203                ))),
1204                answering_connection_lost: ActiveValue::set(false),
1205                calling_user_id: ActiveValue::set(user_id),
1206                calling_connection_id: ActiveValue::set(connection.id as i32),
1207                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1208                    connection.owner_id as i32,
1209                ))),
1210                ..Default::default()
1211            }
1212            .insert(&*tx)
1213            .await?;
1214
1215            let room = self.get_room(room.id, &tx).await?;
1216            Ok(room)
1217        })
1218        .await
1219    }
1220
1221    pub async fn call(
1222        &self,
1223        room_id: RoomId,
1224        calling_user_id: UserId,
1225        calling_connection: ConnectionId,
1226        called_user_id: UserId,
1227        initial_project_id: Option<ProjectId>,
1228    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1229        self.room_transaction(room_id, |tx| async move {
1230            room_participant::ActiveModel {
1231                room_id: ActiveValue::set(room_id),
1232                user_id: ActiveValue::set(called_user_id),
1233                answering_connection_lost: ActiveValue::set(false),
1234                calling_user_id: ActiveValue::set(calling_user_id),
1235                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1236                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1237                    calling_connection.owner_id as i32,
1238                ))),
1239                initial_project_id: ActiveValue::set(initial_project_id),
1240                ..Default::default()
1241            }
1242            .insert(&*tx)
1243            .await?;
1244
1245            let room = self.get_room(room_id, &tx).await?;
1246            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1247                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1248            Ok((room, incoming_call))
1249        })
1250        .await
1251    }
1252
1253    pub async fn call_failed(
1254        &self,
1255        room_id: RoomId,
1256        called_user_id: UserId,
1257    ) -> Result<RoomGuard<proto::Room>> {
1258        self.room_transaction(room_id, |tx| async move {
1259            room_participant::Entity::delete_many()
1260                .filter(
1261                    room_participant::Column::RoomId
1262                        .eq(room_id)
1263                        .and(room_participant::Column::UserId.eq(called_user_id)),
1264                )
1265                .exec(&*tx)
1266                .await?;
1267            let room = self.get_room(room_id, &tx).await?;
1268            Ok(room)
1269        })
1270        .await
1271    }
1272
1273    pub async fn decline_call(
1274        &self,
1275        expected_room_id: Option<RoomId>,
1276        user_id: UserId,
1277    ) -> Result<Option<RoomGuard<proto::Room>>> {
1278        self.optional_room_transaction(|tx| async move {
1279            let mut filter = Condition::all()
1280                .add(room_participant::Column::UserId.eq(user_id))
1281                .add(room_participant::Column::AnsweringConnectionId.is_null());
1282            if let Some(room_id) = expected_room_id {
1283                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1284            }
1285            let participant = room_participant::Entity::find()
1286                .filter(filter)
1287                .one(&*tx)
1288                .await?;
1289
1290            let participant = if let Some(participant) = participant {
1291                participant
1292            } else if expected_room_id.is_some() {
1293                return Err(anyhow!("could not find call to decline"))?;
1294            } else {
1295                return Ok(None);
1296            };
1297
1298            let room_id = participant.room_id;
1299            room_participant::Entity::delete(participant.into_active_model())
1300                .exec(&*tx)
1301                .await?;
1302
1303            let room = self.get_room(room_id, &tx).await?;
1304            Ok(Some((room_id, room)))
1305        })
1306        .await
1307    }
1308
1309    pub async fn cancel_call(
1310        &self,
1311        room_id: RoomId,
1312        calling_connection: ConnectionId,
1313        called_user_id: UserId,
1314    ) -> Result<RoomGuard<proto::Room>> {
1315        self.room_transaction(room_id, |tx| async move {
1316            let participant = room_participant::Entity::find()
1317                .filter(
1318                    Condition::all()
1319                        .add(room_participant::Column::UserId.eq(called_user_id))
1320                        .add(room_participant::Column::RoomId.eq(room_id))
1321                        .add(
1322                            room_participant::Column::CallingConnectionId
1323                                .eq(calling_connection.id as i32),
1324                        )
1325                        .add(
1326                            room_participant::Column::CallingConnectionServerId
1327                                .eq(calling_connection.owner_id as i32),
1328                        )
1329                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1330                )
1331                .one(&*tx)
1332                .await?
1333                .ok_or_else(|| anyhow!("no call to cancel"))?;
1334
1335            room_participant::Entity::delete(participant.into_active_model())
1336                .exec(&*tx)
1337                .await?;
1338
1339            let room = self.get_room(room_id, &tx).await?;
1340            Ok(room)
1341        })
1342        .await
1343    }
1344
1345    pub async fn is_current_room_different_channel(
1346        &self,
1347        user_id: UserId,
1348        channel_id: ChannelId,
1349    ) -> Result<bool> {
1350        self.transaction(|tx| async move {
1351            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1352            enum QueryAs {
1353                ChannelId,
1354            }
1355
1356            let channel_id_model: Option<ChannelId> = room_participant::Entity::find()
1357                .select_only()
1358                .column_as(room::Column::ChannelId, QueryAs::ChannelId)
1359                .inner_join(room::Entity)
1360                .filter(room_participant::Column::UserId.eq(user_id))
1361                .into_values::<_, QueryAs>()
1362                .one(&*tx)
1363                .await?;
1364
1365            let result = channel_id_model
1366                .map(|channel_id_model| channel_id_model != channel_id)
1367                .unwrap_or(false);
1368
1369            Ok(result)
1370        })
1371        .await
1372    }
1373
1374    pub async fn join_room(
1375        &self,
1376        room_id: RoomId,
1377        user_id: UserId,
1378        channel_id: Option<ChannelId>,
1379        connection: ConnectionId,
1380    ) -> Result<RoomGuard<JoinRoom>> {
1381        self.room_transaction(room_id, |tx| async move {
1382            if let Some(channel_id) = channel_id {
1383                channel_member::Entity::find()
1384                    .filter(
1385                        channel_member::Column::ChannelId
1386                            .eq(channel_id)
1387                            .and(channel_member::Column::UserId.eq(user_id))
1388                            .and(channel_member::Column::Accepted.eq(true)),
1389                    )
1390                    .one(&*tx)
1391                    .await?
1392                    .ok_or_else(|| anyhow!("no such channel membership"))?;
1393
1394                room_participant::ActiveModel {
1395                    room_id: ActiveValue::set(room_id),
1396                    user_id: ActiveValue::set(user_id),
1397                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1398                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1399                        connection.owner_id as i32,
1400                    ))),
1401                    answering_connection_lost: ActiveValue::set(false),
1402                    // Redundant for the channel join use case, used for channel and call invitations
1403                    calling_user_id: ActiveValue::set(user_id),
1404                    calling_connection_id: ActiveValue::set(connection.id as i32),
1405                    calling_connection_server_id: ActiveValue::set(Some(ServerId(
1406                        connection.owner_id as i32,
1407                    ))),
1408                    ..Default::default()
1409                }
1410                .insert(&*tx)
1411                .await?;
1412            } else {
1413                let result = room_participant::Entity::update_many()
1414                    .filter(
1415                        Condition::all()
1416                            .add(room_participant::Column::RoomId.eq(room_id))
1417                            .add(room_participant::Column::UserId.eq(user_id))
1418                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1419                    )
1420                    .set(room_participant::ActiveModel {
1421                        answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1422                        answering_connection_server_id: ActiveValue::set(Some(ServerId(
1423                            connection.owner_id as i32,
1424                        ))),
1425                        answering_connection_lost: ActiveValue::set(false),
1426                        ..Default::default()
1427                    })
1428                    .exec(&*tx)
1429                    .await?;
1430                if result.rows_affected == 0 {
1431                    Err(anyhow!("room does not exist or was already joined"))?;
1432                }
1433            }
1434
1435            let room = self.get_room(room_id, &tx).await?;
1436            let channel_members = if let Some(channel_id) = channel_id {
1437                self.get_channel_members_internal(channel_id, &tx).await?
1438            } else {
1439                Vec::new()
1440            };
1441            Ok(JoinRoom {
1442                room,
1443                channel_id,
1444                channel_members,
1445            })
1446        })
1447        .await
1448    }
1449
1450    pub async fn rejoin_room(
1451        &self,
1452        rejoin_room: proto::RejoinRoom,
1453        user_id: UserId,
1454        connection: ConnectionId,
1455    ) -> Result<RoomGuard<RejoinedRoom>> {
1456        let room_id = RoomId::from_proto(rejoin_room.id);
1457        self.room_transaction(room_id, |tx| async {
1458            let tx = tx;
1459            let participant_update = room_participant::Entity::update_many()
1460                .filter(
1461                    Condition::all()
1462                        .add(room_participant::Column::RoomId.eq(room_id))
1463                        .add(room_participant::Column::UserId.eq(user_id))
1464                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1465                        .add(
1466                            Condition::any()
1467                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1468                                .add(
1469                                    room_participant::Column::AnsweringConnectionServerId
1470                                        .ne(connection.owner_id as i32),
1471                                ),
1472                        ),
1473                )
1474                .set(room_participant::ActiveModel {
1475                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1476                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1477                        connection.owner_id as i32,
1478                    ))),
1479                    answering_connection_lost: ActiveValue::set(false),
1480                    ..Default::default()
1481                })
1482                .exec(&*tx)
1483                .await?;
1484            if participant_update.rows_affected == 0 {
1485                return Err(anyhow!("room does not exist or was already joined"))?;
1486            }
1487
1488            let mut reshared_projects = Vec::new();
1489            for reshared_project in &rejoin_room.reshared_projects {
1490                let project_id = ProjectId::from_proto(reshared_project.project_id);
1491                let project = project::Entity::find_by_id(project_id)
1492                    .one(&*tx)
1493                    .await?
1494                    .ok_or_else(|| anyhow!("project does not exist"))?;
1495                if project.host_user_id != user_id {
1496                    return Err(anyhow!("no such project"))?;
1497                }
1498
1499                let mut collaborators = project
1500                    .find_related(project_collaborator::Entity)
1501                    .all(&*tx)
1502                    .await?;
1503                let host_ix = collaborators
1504                    .iter()
1505                    .position(|collaborator| {
1506                        collaborator.user_id == user_id && collaborator.is_host
1507                    })
1508                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1509                let host = collaborators.swap_remove(host_ix);
1510                let old_connection_id = host.connection();
1511
1512                project::Entity::update(project::ActiveModel {
1513                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1514                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1515                        connection.owner_id as i32,
1516                    ))),
1517                    ..project.into_active_model()
1518                })
1519                .exec(&*tx)
1520                .await?;
1521                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1522                    connection_id: ActiveValue::set(connection.id as i32),
1523                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1524                    ..host.into_active_model()
1525                })
1526                .exec(&*tx)
1527                .await?;
1528
1529                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1530                    .await?;
1531
1532                reshared_projects.push(ResharedProject {
1533                    id: project_id,
1534                    old_connection_id,
1535                    collaborators: collaborators
1536                        .iter()
1537                        .map(|collaborator| ProjectCollaborator {
1538                            connection_id: collaborator.connection(),
1539                            user_id: collaborator.user_id,
1540                            replica_id: collaborator.replica_id,
1541                            is_host: collaborator.is_host,
1542                        })
1543                        .collect(),
1544                    worktrees: reshared_project.worktrees.clone(),
1545                });
1546            }
1547
1548            project::Entity::delete_many()
1549                .filter(
1550                    Condition::all()
1551                        .add(project::Column::RoomId.eq(room_id))
1552                        .add(project::Column::HostUserId.eq(user_id))
1553                        .add(
1554                            project::Column::Id
1555                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1556                        ),
1557                )
1558                .exec(&*tx)
1559                .await?;
1560
1561            let mut rejoined_projects = Vec::new();
1562            for rejoined_project in &rejoin_room.rejoined_projects {
1563                let project_id = ProjectId::from_proto(rejoined_project.id);
1564                let Some(project) = project::Entity::find_by_id(project_id)
1565                    .one(&*tx)
1566                    .await? else { continue };
1567
1568                let mut worktrees = Vec::new();
1569                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1570                for db_worktree in db_worktrees {
1571                    let mut worktree = RejoinedWorktree {
1572                        id: db_worktree.id as u64,
1573                        abs_path: db_worktree.abs_path,
1574                        root_name: db_worktree.root_name,
1575                        visible: db_worktree.visible,
1576                        updated_entries: Default::default(),
1577                        removed_entries: Default::default(),
1578                        updated_repositories: Default::default(),
1579                        removed_repositories: Default::default(),
1580                        diagnostic_summaries: Default::default(),
1581                        settings_files: Default::default(),
1582                        scan_id: db_worktree.scan_id as u64,
1583                        completed_scan_id: db_worktree.completed_scan_id as u64,
1584                    };
1585
1586                    let rejoined_worktree = rejoined_project
1587                        .worktrees
1588                        .iter()
1589                        .find(|worktree| worktree.id == db_worktree.id as u64);
1590
1591                    // File entries
1592                    {
1593                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1594                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1595                        } else {
1596                            worktree_entry::Column::IsDeleted.eq(false)
1597                        };
1598
1599                        let mut db_entries = worktree_entry::Entity::find()
1600                            .filter(
1601                                Condition::all()
1602                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
1603                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1604                                    .add(entry_filter),
1605                            )
1606                            .stream(&*tx)
1607                            .await?;
1608
1609                        while let Some(db_entry) = db_entries.next().await {
1610                            let db_entry = db_entry?;
1611                            if db_entry.is_deleted {
1612                                worktree.removed_entries.push(db_entry.id as u64);
1613                            } else {
1614                                worktree.updated_entries.push(proto::Entry {
1615                                    id: db_entry.id as u64,
1616                                    is_dir: db_entry.is_dir,
1617                                    path: db_entry.path,
1618                                    inode: db_entry.inode as u64,
1619                                    mtime: Some(proto::Timestamp {
1620                                        seconds: db_entry.mtime_seconds as u64,
1621                                        nanos: db_entry.mtime_nanos as u32,
1622                                    }),
1623                                    is_symlink: db_entry.is_symlink,
1624                                    is_ignored: db_entry.is_ignored,
1625                                    is_external: db_entry.is_external,
1626                                    git_status: db_entry.git_status.map(|status| status as i32),
1627                                });
1628                            }
1629                        }
1630                    }
1631
1632                    // Repository Entries
1633                    {
1634                        let repository_entry_filter =
1635                            if let Some(rejoined_worktree) = rejoined_worktree {
1636                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1637                            } else {
1638                                worktree_repository::Column::IsDeleted.eq(false)
1639                            };
1640
1641                        let mut db_repositories = worktree_repository::Entity::find()
1642                            .filter(
1643                                Condition::all()
1644                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
1645                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1646                                    .add(repository_entry_filter),
1647                            )
1648                            .stream(&*tx)
1649                            .await?;
1650
1651                        while let Some(db_repository) = db_repositories.next().await {
1652                            let db_repository = db_repository?;
1653                            if db_repository.is_deleted {
1654                                worktree
1655                                    .removed_repositories
1656                                    .push(db_repository.work_directory_id as u64);
1657                            } else {
1658                                worktree.updated_repositories.push(proto::RepositoryEntry {
1659                                    work_directory_id: db_repository.work_directory_id as u64,
1660                                    branch: db_repository.branch,
1661                                });
1662                            }
1663                        }
1664                    }
1665
1666                    worktrees.push(worktree);
1667                }
1668
1669                let language_servers = project
1670                    .find_related(language_server::Entity)
1671                    .all(&*tx)
1672                    .await?
1673                    .into_iter()
1674                    .map(|language_server| proto::LanguageServer {
1675                        id: language_server.id as u64,
1676                        name: language_server.name,
1677                    })
1678                    .collect::<Vec<_>>();
1679
1680                {
1681                    let mut db_settings_files = worktree_settings_file::Entity::find()
1682                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1683                        .stream(&*tx)
1684                        .await?;
1685                    while let Some(db_settings_file) = db_settings_files.next().await {
1686                        let db_settings_file = db_settings_file?;
1687                        if let Some(worktree) = worktrees
1688                            .iter_mut()
1689                            .find(|w| w.id == db_settings_file.worktree_id as u64)
1690                        {
1691                            worktree.settings_files.push(WorktreeSettingsFile {
1692                                path: db_settings_file.path,
1693                                content: db_settings_file.content,
1694                            });
1695                        }
1696                    }
1697                }
1698
1699                let mut collaborators = project
1700                    .find_related(project_collaborator::Entity)
1701                    .all(&*tx)
1702                    .await?;
1703                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1704                    .iter()
1705                    .position(|collaborator| collaborator.user_id == user_id)
1706                {
1707                    collaborators.swap_remove(self_collaborator_ix)
1708                } else {
1709                    continue;
1710                };
1711                let old_connection_id = self_collaborator.connection();
1712                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1713                    connection_id: ActiveValue::set(connection.id as i32),
1714                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1715                    ..self_collaborator.into_active_model()
1716                })
1717                .exec(&*tx)
1718                .await?;
1719
1720                let collaborators = collaborators
1721                    .into_iter()
1722                    .map(|collaborator| ProjectCollaborator {
1723                        connection_id: collaborator.connection(),
1724                        user_id: collaborator.user_id,
1725                        replica_id: collaborator.replica_id,
1726                        is_host: collaborator.is_host,
1727                    })
1728                    .collect::<Vec<_>>();
1729
1730                rejoined_projects.push(RejoinedProject {
1731                    id: project_id,
1732                    old_connection_id,
1733                    collaborators,
1734                    worktrees,
1735                    language_servers,
1736                });
1737            }
1738
1739            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1740
1741            let channel_members = if let Some(channel_id) = channel_id {
1742                self.get_channel_members_internal(channel_id, &tx).await?
1743            } else {
1744                Vec::new()
1745            };
1746
1747            Ok(RejoinedRoom {
1748                room,
1749                channel_id,
1750                channel_members,
1751                rejoined_projects,
1752                reshared_projects,
1753            })
1754        })
1755        .await
1756    }
1757
1758    pub async fn leave_room(
1759        &self,
1760        connection: ConnectionId,
1761    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1762        self.optional_room_transaction(|tx| async move {
1763            let leaving_participant = room_participant::Entity::find()
1764                .filter(
1765                    Condition::all()
1766                        .add(
1767                            room_participant::Column::AnsweringConnectionId
1768                                .eq(connection.id as i32),
1769                        )
1770                        .add(
1771                            room_participant::Column::AnsweringConnectionServerId
1772                                .eq(connection.owner_id as i32),
1773                        ),
1774                )
1775                .one(&*tx)
1776                .await?;
1777
1778            if let Some(leaving_participant) = leaving_participant {
1779                // Leave room.
1780                let room_id = leaving_participant.room_id;
1781                room_participant::Entity::delete_by_id(leaving_participant.id)
1782                    .exec(&*tx)
1783                    .await?;
1784
1785                // Cancel pending calls initiated by the leaving user.
1786                let called_participants = room_participant::Entity::find()
1787                    .filter(
1788                        Condition::all()
1789                            .add(
1790                                room_participant::Column::CallingUserId
1791                                    .eq(leaving_participant.user_id),
1792                            )
1793                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1794                    )
1795                    .all(&*tx)
1796                    .await?;
1797                room_participant::Entity::delete_many()
1798                    .filter(
1799                        room_participant::Column::Id
1800                            .is_in(called_participants.iter().map(|participant| participant.id)),
1801                    )
1802                    .exec(&*tx)
1803                    .await?;
1804                let canceled_calls_to_user_ids = called_participants
1805                    .into_iter()
1806                    .map(|participant| participant.user_id)
1807                    .collect();
1808
1809                // Detect left projects.
1810                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1811                enum QueryProjectIds {
1812                    ProjectId,
1813                }
1814                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1815                    .select_only()
1816                    .column_as(
1817                        project_collaborator::Column::ProjectId,
1818                        QueryProjectIds::ProjectId,
1819                    )
1820                    .filter(
1821                        Condition::all()
1822                            .add(
1823                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1824                            )
1825                            .add(
1826                                project_collaborator::Column::ConnectionServerId
1827                                    .eq(connection.owner_id as i32),
1828                            ),
1829                    )
1830                    .into_values::<_, QueryProjectIds>()
1831                    .all(&*tx)
1832                    .await?;
1833                let mut left_projects = HashMap::default();
1834                let mut collaborators = project_collaborator::Entity::find()
1835                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1836                    .stream(&*tx)
1837                    .await?;
1838                while let Some(collaborator) = collaborators.next().await {
1839                    let collaborator = collaborator?;
1840                    let left_project =
1841                        left_projects
1842                            .entry(collaborator.project_id)
1843                            .or_insert(LeftProject {
1844                                id: collaborator.project_id,
1845                                host_user_id: Default::default(),
1846                                connection_ids: Default::default(),
1847                                host_connection_id: Default::default(),
1848                            });
1849
1850                    let collaborator_connection_id = collaborator.connection();
1851                    if collaborator_connection_id != connection {
1852                        left_project.connection_ids.push(collaborator_connection_id);
1853                    }
1854
1855                    if collaborator.is_host {
1856                        left_project.host_user_id = collaborator.user_id;
1857                        left_project.host_connection_id = collaborator_connection_id;
1858                    }
1859                }
1860                drop(collaborators);
1861
1862                // Leave projects.
1863                project_collaborator::Entity::delete_many()
1864                    .filter(
1865                        Condition::all()
1866                            .add(
1867                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1868                            )
1869                            .add(
1870                                project_collaborator::Column::ConnectionServerId
1871                                    .eq(connection.owner_id as i32),
1872                            ),
1873                    )
1874                    .exec(&*tx)
1875                    .await?;
1876
1877                // Unshare projects.
1878                project::Entity::delete_many()
1879                    .filter(
1880                        Condition::all()
1881                            .add(project::Column::RoomId.eq(room_id))
1882                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1883                            .add(
1884                                project::Column::HostConnectionServerId
1885                                    .eq(connection.owner_id as i32),
1886                            ),
1887                    )
1888                    .exec(&*tx)
1889                    .await?;
1890
1891                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1892                let deleted = if room.participants.is_empty() {
1893                    let result = room::Entity::delete_by_id(room_id)
1894                        .filter(room::Column::ChannelId.is_null())
1895                        .exec(&*tx)
1896                        .await?;
1897                    result.rows_affected > 0
1898                } else {
1899                    false
1900                };
1901
1902                let channel_members = if let Some(channel_id) = channel_id {
1903                    self.get_channel_members_internal(channel_id, &tx).await?
1904                } else {
1905                    Vec::new()
1906                };
1907                let left_room = LeftRoom {
1908                    room,
1909                    channel_id,
1910                    channel_members,
1911                    left_projects,
1912                    canceled_calls_to_user_ids,
1913                    deleted,
1914                };
1915
1916                if left_room.room.participants.is_empty() {
1917                    self.rooms.remove(&room_id);
1918                }
1919
1920                Ok(Some((room_id, left_room)))
1921            } else {
1922                Ok(None)
1923            }
1924        })
1925        .await
1926    }
1927
1928    pub async fn follow(
1929        &self,
1930        project_id: ProjectId,
1931        leader_connection: ConnectionId,
1932        follower_connection: ConnectionId,
1933    ) -> Result<RoomGuard<proto::Room>> {
1934        let room_id = self.room_id_for_project(project_id).await?;
1935        self.room_transaction(room_id, |tx| async move {
1936            follower::ActiveModel {
1937                room_id: ActiveValue::set(room_id),
1938                project_id: ActiveValue::set(project_id),
1939                leader_connection_server_id: ActiveValue::set(ServerId(
1940                    leader_connection.owner_id as i32,
1941                )),
1942                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1943                follower_connection_server_id: ActiveValue::set(ServerId(
1944                    follower_connection.owner_id as i32,
1945                )),
1946                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1947                ..Default::default()
1948            }
1949            .insert(&*tx)
1950            .await?;
1951
1952            let room = self.get_room(room_id, &*tx).await?;
1953            Ok(room)
1954        })
1955        .await
1956    }
1957
1958    pub async fn unfollow(
1959        &self,
1960        project_id: ProjectId,
1961        leader_connection: ConnectionId,
1962        follower_connection: ConnectionId,
1963    ) -> Result<RoomGuard<proto::Room>> {
1964        let room_id = self.room_id_for_project(project_id).await?;
1965        self.room_transaction(room_id, |tx| async move {
1966            follower::Entity::delete_many()
1967                .filter(
1968                    Condition::all()
1969                        .add(follower::Column::ProjectId.eq(project_id))
1970                        .add(
1971                            follower::Column::LeaderConnectionServerId
1972                                .eq(leader_connection.owner_id),
1973                        )
1974                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1975                        .add(
1976                            follower::Column::FollowerConnectionServerId
1977                                .eq(follower_connection.owner_id),
1978                        )
1979                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1980                )
1981                .exec(&*tx)
1982                .await?;
1983
1984            let room = self.get_room(room_id, &*tx).await?;
1985            Ok(room)
1986        })
1987        .await
1988    }
1989
1990    pub async fn update_room_participant_location(
1991        &self,
1992        room_id: RoomId,
1993        connection: ConnectionId,
1994        location: proto::ParticipantLocation,
1995    ) -> Result<RoomGuard<proto::Room>> {
1996        self.room_transaction(room_id, |tx| async {
1997            let tx = tx;
1998            let location_kind;
1999            let location_project_id;
2000            match location
2001                .variant
2002                .as_ref()
2003                .ok_or_else(|| anyhow!("invalid location"))?
2004            {
2005                proto::participant_location::Variant::SharedProject(project) => {
2006                    location_kind = 0;
2007                    location_project_id = Some(ProjectId::from_proto(project.id));
2008                }
2009                proto::participant_location::Variant::UnsharedProject(_) => {
2010                    location_kind = 1;
2011                    location_project_id = None;
2012                }
2013                proto::participant_location::Variant::External(_) => {
2014                    location_kind = 2;
2015                    location_project_id = None;
2016                }
2017            }
2018
2019            let result = room_participant::Entity::update_many()
2020                .filter(
2021                    Condition::all()
2022                        .add(room_participant::Column::RoomId.eq(room_id))
2023                        .add(
2024                            room_participant::Column::AnsweringConnectionId
2025                                .eq(connection.id as i32),
2026                        )
2027                        .add(
2028                            room_participant::Column::AnsweringConnectionServerId
2029                                .eq(connection.owner_id as i32),
2030                        ),
2031                )
2032                .set(room_participant::ActiveModel {
2033                    location_kind: ActiveValue::set(Some(location_kind)),
2034                    location_project_id: ActiveValue::set(location_project_id),
2035                    ..Default::default()
2036                })
2037                .exec(&*tx)
2038                .await?;
2039
2040            if result.rows_affected == 1 {
2041                let room = self.get_room(room_id, &tx).await?;
2042                Ok(room)
2043            } else {
2044                Err(anyhow!("could not update room participant location"))?
2045            }
2046        })
2047        .await
2048    }
2049
2050    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2051        self.transaction(|tx| async move {
2052            let participant = room_participant::Entity::find()
2053                .filter(
2054                    Condition::all()
2055                        .add(
2056                            room_participant::Column::AnsweringConnectionId
2057                                .eq(connection.id as i32),
2058                        )
2059                        .add(
2060                            room_participant::Column::AnsweringConnectionServerId
2061                                .eq(connection.owner_id as i32),
2062                        ),
2063                )
2064                .one(&*tx)
2065                .await?
2066                .ok_or_else(|| anyhow!("not a participant in any room"))?;
2067
2068            room_participant::Entity::update(room_participant::ActiveModel {
2069                answering_connection_lost: ActiveValue::set(true),
2070                ..participant.into_active_model()
2071            })
2072            .exec(&*tx)
2073            .await?;
2074
2075            Ok(())
2076        })
2077        .await
2078    }
2079
2080    fn build_incoming_call(
2081        room: &proto::Room,
2082        called_user_id: UserId,
2083    ) -> Option<proto::IncomingCall> {
2084        let pending_participant = room
2085            .pending_participants
2086            .iter()
2087            .find(|participant| participant.user_id == called_user_id.to_proto())?;
2088
2089        Some(proto::IncomingCall {
2090            room_id: room.id,
2091            calling_user_id: pending_participant.calling_user_id,
2092            participant_user_ids: room
2093                .participants
2094                .iter()
2095                .map(|participant| participant.user_id)
2096                .collect(),
2097            initial_project: room.participants.iter().find_map(|participant| {
2098                let initial_project_id = pending_participant.initial_project_id?;
2099                participant
2100                    .projects
2101                    .iter()
2102                    .find(|project| project.id == initial_project_id)
2103                    .cloned()
2104            }),
2105        })
2106    }
2107    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2108        let (_, room) = self.get_channel_room(room_id, tx).await?;
2109        Ok(room)
2110    }
2111
2112    async fn get_channel_room(
2113        &self,
2114        room_id: RoomId,
2115        tx: &DatabaseTransaction,
2116    ) -> Result<(Option<ChannelId>, proto::Room)> {
2117        let db_room = room::Entity::find_by_id(room_id)
2118            .one(tx)
2119            .await?
2120            .ok_or_else(|| anyhow!("could not find room"))?;
2121
2122        let mut db_participants = db_room
2123            .find_related(room_participant::Entity)
2124            .stream(tx)
2125            .await?;
2126        let mut participants = HashMap::default();
2127        let mut pending_participants = Vec::new();
2128        while let Some(db_participant) = db_participants.next().await {
2129            let db_participant = db_participant?;
2130            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2131                .answering_connection_id
2132                .zip(db_participant.answering_connection_server_id)
2133            {
2134                let location = match (
2135                    db_participant.location_kind,
2136                    db_participant.location_project_id,
2137                ) {
2138                    (Some(0), Some(project_id)) => {
2139                        Some(proto::participant_location::Variant::SharedProject(
2140                            proto::participant_location::SharedProject {
2141                                id: project_id.to_proto(),
2142                            },
2143                        ))
2144                    }
2145                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2146                        Default::default(),
2147                    )),
2148                    _ => Some(proto::participant_location::Variant::External(
2149                        Default::default(),
2150                    )),
2151                };
2152
2153                let answering_connection = ConnectionId {
2154                    owner_id: answering_connection_server_id.0 as u32,
2155                    id: answering_connection_id as u32,
2156                };
2157                participants.insert(
2158                    answering_connection,
2159                    proto::Participant {
2160                        user_id: db_participant.user_id.to_proto(),
2161                        peer_id: Some(answering_connection.into()),
2162                        projects: Default::default(),
2163                        location: Some(proto::ParticipantLocation { variant: location }),
2164                    },
2165                );
2166            } else {
2167                pending_participants.push(proto::PendingParticipant {
2168                    user_id: db_participant.user_id.to_proto(),
2169                    calling_user_id: db_participant.calling_user_id.to_proto(),
2170                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2171                });
2172            }
2173        }
2174        drop(db_participants);
2175
2176        let mut db_projects = db_room
2177            .find_related(project::Entity)
2178            .find_with_related(worktree::Entity)
2179            .stream(tx)
2180            .await?;
2181
2182        while let Some(row) = db_projects.next().await {
2183            let (db_project, db_worktree) = row?;
2184            let host_connection = db_project.host_connection()?;
2185            if let Some(participant) = participants.get_mut(&host_connection) {
2186                let project = if let Some(project) = participant
2187                    .projects
2188                    .iter_mut()
2189                    .find(|project| project.id == db_project.id.to_proto())
2190                {
2191                    project
2192                } else {
2193                    participant.projects.push(proto::ParticipantProject {
2194                        id: db_project.id.to_proto(),
2195                        worktree_root_names: Default::default(),
2196                    });
2197                    participant.projects.last_mut().unwrap()
2198                };
2199
2200                if let Some(db_worktree) = db_worktree {
2201                    if db_worktree.visible {
2202                        project.worktree_root_names.push(db_worktree.root_name);
2203                    }
2204                }
2205            }
2206        }
2207        drop(db_projects);
2208
2209        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2210        let mut followers = Vec::new();
2211        while let Some(db_follower) = db_followers.next().await {
2212            let db_follower = db_follower?;
2213            followers.push(proto::Follower {
2214                leader_id: Some(db_follower.leader_connection().into()),
2215                follower_id: Some(db_follower.follower_connection().into()),
2216                project_id: db_follower.project_id.to_proto(),
2217            });
2218        }
2219
2220        Ok((
2221            db_room.channel_id,
2222            proto::Room {
2223                id: db_room.id.to_proto(),
2224                live_kit_room: db_room.live_kit_room,
2225                participants: participants.into_values().collect(),
2226                pending_participants,
2227                followers,
2228            },
2229        ))
2230    }
2231
2232    // projects
2233
2234    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2235        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2236        enum QueryAs {
2237            Count,
2238        }
2239
2240        self.transaction(|tx| async move {
2241            Ok(project::Entity::find()
2242                .select_only()
2243                .column_as(project::Column::Id.count(), QueryAs::Count)
2244                .inner_join(user::Entity)
2245                .filter(user::Column::Admin.eq(false))
2246                .into_values::<_, QueryAs>()
2247                .one(&*tx)
2248                .await?
2249                .unwrap_or(0i64) as usize)
2250        })
2251        .await
2252    }
2253
2254    pub async fn share_project(
2255        &self,
2256        room_id: RoomId,
2257        connection: ConnectionId,
2258        worktrees: &[proto::WorktreeMetadata],
2259    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2260        self.room_transaction(room_id, |tx| async move {
2261            let participant = room_participant::Entity::find()
2262                .filter(
2263                    Condition::all()
2264                        .add(
2265                            room_participant::Column::AnsweringConnectionId
2266                                .eq(connection.id as i32),
2267                        )
2268                        .add(
2269                            room_participant::Column::AnsweringConnectionServerId
2270                                .eq(connection.owner_id as i32),
2271                        ),
2272                )
2273                .one(&*tx)
2274                .await?
2275                .ok_or_else(|| anyhow!("could not find participant"))?;
2276            if participant.room_id != room_id {
2277                return Err(anyhow!("shared project on unexpected room"))?;
2278            }
2279
2280            let project = project::ActiveModel {
2281                room_id: ActiveValue::set(participant.room_id),
2282                host_user_id: ActiveValue::set(participant.user_id),
2283                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2284                host_connection_server_id: ActiveValue::set(Some(ServerId(
2285                    connection.owner_id as i32,
2286                ))),
2287                ..Default::default()
2288            }
2289            .insert(&*tx)
2290            .await?;
2291
2292            if !worktrees.is_empty() {
2293                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2294                    worktree::ActiveModel {
2295                        id: ActiveValue::set(worktree.id as i64),
2296                        project_id: ActiveValue::set(project.id),
2297                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2298                        root_name: ActiveValue::set(worktree.root_name.clone()),
2299                        visible: ActiveValue::set(worktree.visible),
2300                        scan_id: ActiveValue::set(0),
2301                        completed_scan_id: ActiveValue::set(0),
2302                    }
2303                }))
2304                .exec(&*tx)
2305                .await?;
2306            }
2307
2308            project_collaborator::ActiveModel {
2309                project_id: ActiveValue::set(project.id),
2310                connection_id: ActiveValue::set(connection.id as i32),
2311                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2312                user_id: ActiveValue::set(participant.user_id),
2313                replica_id: ActiveValue::set(ReplicaId(0)),
2314                is_host: ActiveValue::set(true),
2315                ..Default::default()
2316            }
2317            .insert(&*tx)
2318            .await?;
2319
2320            let room = self.get_room(room_id, &tx).await?;
2321            Ok((project.id, room))
2322        })
2323        .await
2324    }
2325
2326    pub async fn unshare_project(
2327        &self,
2328        project_id: ProjectId,
2329        connection: ConnectionId,
2330    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2331        let room_id = self.room_id_for_project(project_id).await?;
2332        self.room_transaction(room_id, |tx| async move {
2333            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2334
2335            let project = project::Entity::find_by_id(project_id)
2336                .one(&*tx)
2337                .await?
2338                .ok_or_else(|| anyhow!("project not found"))?;
2339            if project.host_connection()? == connection {
2340                project::Entity::delete(project.into_active_model())
2341                    .exec(&*tx)
2342                    .await?;
2343                let room = self.get_room(room_id, &tx).await?;
2344                Ok((room, guest_connection_ids))
2345            } else {
2346                Err(anyhow!("cannot unshare a project hosted by another user"))?
2347            }
2348        })
2349        .await
2350    }
2351
2352    pub async fn update_project(
2353        &self,
2354        project_id: ProjectId,
2355        connection: ConnectionId,
2356        worktrees: &[proto::WorktreeMetadata],
2357    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2358        let room_id = self.room_id_for_project(project_id).await?;
2359        self.room_transaction(room_id, |tx| async move {
2360            let project = project::Entity::find_by_id(project_id)
2361                .filter(
2362                    Condition::all()
2363                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2364                        .add(
2365                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2366                        ),
2367                )
2368                .one(&*tx)
2369                .await?
2370                .ok_or_else(|| anyhow!("no such project"))?;
2371
2372            self.update_project_worktrees(project.id, worktrees, &tx)
2373                .await?;
2374
2375            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2376            let room = self.get_room(project.room_id, &tx).await?;
2377            Ok((room, guest_connection_ids))
2378        })
2379        .await
2380    }
2381
2382    async fn update_project_worktrees(
2383        &self,
2384        project_id: ProjectId,
2385        worktrees: &[proto::WorktreeMetadata],
2386        tx: &DatabaseTransaction,
2387    ) -> Result<()> {
2388        if !worktrees.is_empty() {
2389            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2390                id: ActiveValue::set(worktree.id as i64),
2391                project_id: ActiveValue::set(project_id),
2392                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2393                root_name: ActiveValue::set(worktree.root_name.clone()),
2394                visible: ActiveValue::set(worktree.visible),
2395                scan_id: ActiveValue::set(0),
2396                completed_scan_id: ActiveValue::set(0),
2397            }))
2398            .on_conflict(
2399                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2400                    .update_column(worktree::Column::RootName)
2401                    .to_owned(),
2402            )
2403            .exec(&*tx)
2404            .await?;
2405        }
2406
2407        worktree::Entity::delete_many()
2408            .filter(worktree::Column::ProjectId.eq(project_id).and(
2409                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2410            ))
2411            .exec(&*tx)
2412            .await?;
2413
2414        Ok(())
2415    }
2416
2417    pub async fn update_worktree(
2418        &self,
2419        update: &proto::UpdateWorktree,
2420        connection: ConnectionId,
2421    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2422        let project_id = ProjectId::from_proto(update.project_id);
2423        let worktree_id = update.worktree_id as i64;
2424        let room_id = self.room_id_for_project(project_id).await?;
2425        self.room_transaction(room_id, |tx| async move {
2426            // Ensure the update comes from the host.
2427            let _project = project::Entity::find_by_id(project_id)
2428                .filter(
2429                    Condition::all()
2430                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2431                        .add(
2432                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2433                        ),
2434                )
2435                .one(&*tx)
2436                .await?
2437                .ok_or_else(|| anyhow!("no such project"))?;
2438
2439            // Update metadata.
2440            worktree::Entity::update(worktree::ActiveModel {
2441                id: ActiveValue::set(worktree_id),
2442                project_id: ActiveValue::set(project_id),
2443                root_name: ActiveValue::set(update.root_name.clone()),
2444                scan_id: ActiveValue::set(update.scan_id as i64),
2445                completed_scan_id: if update.is_last_update {
2446                    ActiveValue::set(update.scan_id as i64)
2447                } else {
2448                    ActiveValue::default()
2449                },
2450                abs_path: ActiveValue::set(update.abs_path.clone()),
2451                ..Default::default()
2452            })
2453            .exec(&*tx)
2454            .await?;
2455
2456            if !update.updated_entries.is_empty() {
2457                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2458                    let mtime = entry.mtime.clone().unwrap_or_default();
2459                    worktree_entry::ActiveModel {
2460                        project_id: ActiveValue::set(project_id),
2461                        worktree_id: ActiveValue::set(worktree_id),
2462                        id: ActiveValue::set(entry.id as i64),
2463                        is_dir: ActiveValue::set(entry.is_dir),
2464                        path: ActiveValue::set(entry.path.clone()),
2465                        inode: ActiveValue::set(entry.inode as i64),
2466                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2467                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2468                        is_symlink: ActiveValue::set(entry.is_symlink),
2469                        is_ignored: ActiveValue::set(entry.is_ignored),
2470                        is_external: ActiveValue::set(entry.is_external),
2471                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2472                        is_deleted: ActiveValue::set(false),
2473                        scan_id: ActiveValue::set(update.scan_id as i64),
2474                    }
2475                }))
2476                .on_conflict(
2477                    OnConflict::columns([
2478                        worktree_entry::Column::ProjectId,
2479                        worktree_entry::Column::WorktreeId,
2480                        worktree_entry::Column::Id,
2481                    ])
2482                    .update_columns([
2483                        worktree_entry::Column::IsDir,
2484                        worktree_entry::Column::Path,
2485                        worktree_entry::Column::Inode,
2486                        worktree_entry::Column::MtimeSeconds,
2487                        worktree_entry::Column::MtimeNanos,
2488                        worktree_entry::Column::IsSymlink,
2489                        worktree_entry::Column::IsIgnored,
2490                        worktree_entry::Column::GitStatus,
2491                        worktree_entry::Column::ScanId,
2492                    ])
2493                    .to_owned(),
2494                )
2495                .exec(&*tx)
2496                .await?;
2497            }
2498
2499            if !update.removed_entries.is_empty() {
2500                worktree_entry::Entity::update_many()
2501                    .filter(
2502                        worktree_entry::Column::ProjectId
2503                            .eq(project_id)
2504                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2505                            .and(
2506                                worktree_entry::Column::Id
2507                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2508                            ),
2509                    )
2510                    .set(worktree_entry::ActiveModel {
2511                        is_deleted: ActiveValue::Set(true),
2512                        scan_id: ActiveValue::Set(update.scan_id as i64),
2513                        ..Default::default()
2514                    })
2515                    .exec(&*tx)
2516                    .await?;
2517            }
2518
2519            if !update.updated_repositories.is_empty() {
2520                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2521                    |repository| worktree_repository::ActiveModel {
2522                        project_id: ActiveValue::set(project_id),
2523                        worktree_id: ActiveValue::set(worktree_id),
2524                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2525                        scan_id: ActiveValue::set(update.scan_id as i64),
2526                        branch: ActiveValue::set(repository.branch.clone()),
2527                        is_deleted: ActiveValue::set(false),
2528                    },
2529                ))
2530                .on_conflict(
2531                    OnConflict::columns([
2532                        worktree_repository::Column::ProjectId,
2533                        worktree_repository::Column::WorktreeId,
2534                        worktree_repository::Column::WorkDirectoryId,
2535                    ])
2536                    .update_columns([
2537                        worktree_repository::Column::ScanId,
2538                        worktree_repository::Column::Branch,
2539                    ])
2540                    .to_owned(),
2541                )
2542                .exec(&*tx)
2543                .await?;
2544            }
2545
2546            if !update.removed_repositories.is_empty() {
2547                worktree_repository::Entity::update_many()
2548                    .filter(
2549                        worktree_repository::Column::ProjectId
2550                            .eq(project_id)
2551                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2552                            .and(
2553                                worktree_repository::Column::WorkDirectoryId
2554                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2555                            ),
2556                    )
2557                    .set(worktree_repository::ActiveModel {
2558                        is_deleted: ActiveValue::Set(true),
2559                        scan_id: ActiveValue::Set(update.scan_id as i64),
2560                        ..Default::default()
2561                    })
2562                    .exec(&*tx)
2563                    .await?;
2564            }
2565
2566            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2567            Ok(connection_ids)
2568        })
2569        .await
2570    }
2571
2572    pub async fn update_diagnostic_summary(
2573        &self,
2574        update: &proto::UpdateDiagnosticSummary,
2575        connection: ConnectionId,
2576    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2577        let project_id = ProjectId::from_proto(update.project_id);
2578        let worktree_id = update.worktree_id as i64;
2579        let room_id = self.room_id_for_project(project_id).await?;
2580        self.room_transaction(room_id, |tx| async move {
2581            let summary = update
2582                .summary
2583                .as_ref()
2584                .ok_or_else(|| anyhow!("invalid summary"))?;
2585
2586            // Ensure the update comes from the host.
2587            let project = project::Entity::find_by_id(project_id)
2588                .one(&*tx)
2589                .await?
2590                .ok_or_else(|| anyhow!("no such project"))?;
2591            if project.host_connection()? != connection {
2592                return Err(anyhow!("can't update a project hosted by someone else"))?;
2593            }
2594
2595            // Update summary.
2596            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2597                project_id: ActiveValue::set(project_id),
2598                worktree_id: ActiveValue::set(worktree_id),
2599                path: ActiveValue::set(summary.path.clone()),
2600                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2601                error_count: ActiveValue::set(summary.error_count as i32),
2602                warning_count: ActiveValue::set(summary.warning_count as i32),
2603                ..Default::default()
2604            })
2605            .on_conflict(
2606                OnConflict::columns([
2607                    worktree_diagnostic_summary::Column::ProjectId,
2608                    worktree_diagnostic_summary::Column::WorktreeId,
2609                    worktree_diagnostic_summary::Column::Path,
2610                ])
2611                .update_columns([
2612                    worktree_diagnostic_summary::Column::LanguageServerId,
2613                    worktree_diagnostic_summary::Column::ErrorCount,
2614                    worktree_diagnostic_summary::Column::WarningCount,
2615                ])
2616                .to_owned(),
2617            )
2618            .exec(&*tx)
2619            .await?;
2620
2621            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2622            Ok(connection_ids)
2623        })
2624        .await
2625    }
2626
2627    pub async fn start_language_server(
2628        &self,
2629        update: &proto::StartLanguageServer,
2630        connection: ConnectionId,
2631    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2632        let project_id = ProjectId::from_proto(update.project_id);
2633        let room_id = self.room_id_for_project(project_id).await?;
2634        self.room_transaction(room_id, |tx| async move {
2635            let server = update
2636                .server
2637                .as_ref()
2638                .ok_or_else(|| anyhow!("invalid language server"))?;
2639
2640            // Ensure the update comes from the host.
2641            let project = project::Entity::find_by_id(project_id)
2642                .one(&*tx)
2643                .await?
2644                .ok_or_else(|| anyhow!("no such project"))?;
2645            if project.host_connection()? != connection {
2646                return Err(anyhow!("can't update a project hosted by someone else"))?;
2647            }
2648
2649            // Add the newly-started language server.
2650            language_server::Entity::insert(language_server::ActiveModel {
2651                project_id: ActiveValue::set(project_id),
2652                id: ActiveValue::set(server.id as i64),
2653                name: ActiveValue::set(server.name.clone()),
2654                ..Default::default()
2655            })
2656            .on_conflict(
2657                OnConflict::columns([
2658                    language_server::Column::ProjectId,
2659                    language_server::Column::Id,
2660                ])
2661                .update_column(language_server::Column::Name)
2662                .to_owned(),
2663            )
2664            .exec(&*tx)
2665            .await?;
2666
2667            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2668            Ok(connection_ids)
2669        })
2670        .await
2671    }
2672
2673    pub async fn update_worktree_settings(
2674        &self,
2675        update: &proto::UpdateWorktreeSettings,
2676        connection: ConnectionId,
2677    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2678        let project_id = ProjectId::from_proto(update.project_id);
2679        let room_id = self.room_id_for_project(project_id).await?;
2680        self.room_transaction(room_id, |tx| async move {
2681            // Ensure the update comes from the host.
2682            let project = project::Entity::find_by_id(project_id)
2683                .one(&*tx)
2684                .await?
2685                .ok_or_else(|| anyhow!("no such project"))?;
2686            if project.host_connection()? != connection {
2687                return Err(anyhow!("can't update a project hosted by someone else"))?;
2688            }
2689
2690            if let Some(content) = &update.content {
2691                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2692                    project_id: ActiveValue::Set(project_id),
2693                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2694                    path: ActiveValue::Set(update.path.clone()),
2695                    content: ActiveValue::Set(content.clone()),
2696                })
2697                .on_conflict(
2698                    OnConflict::columns([
2699                        worktree_settings_file::Column::ProjectId,
2700                        worktree_settings_file::Column::WorktreeId,
2701                        worktree_settings_file::Column::Path,
2702                    ])
2703                    .update_column(worktree_settings_file::Column::Content)
2704                    .to_owned(),
2705                )
2706                .exec(&*tx)
2707                .await?;
2708            } else {
2709                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2710                    project_id: ActiveValue::Set(project_id),
2711                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2712                    path: ActiveValue::Set(update.path.clone()),
2713                    ..Default::default()
2714                })
2715                .exec(&*tx)
2716                .await?;
2717            }
2718
2719            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2720            Ok(connection_ids)
2721        })
2722        .await
2723    }
2724
2725    pub async fn join_project(
2726        &self,
2727        project_id: ProjectId,
2728        connection: ConnectionId,
2729    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2730        let room_id = self.room_id_for_project(project_id).await?;
2731        self.room_transaction(room_id, |tx| async move {
2732            let participant = room_participant::Entity::find()
2733                .filter(
2734                    Condition::all()
2735                        .add(
2736                            room_participant::Column::AnsweringConnectionId
2737                                .eq(connection.id as i32),
2738                        )
2739                        .add(
2740                            room_participant::Column::AnsweringConnectionServerId
2741                                .eq(connection.owner_id as i32),
2742                        ),
2743                )
2744                .one(&*tx)
2745                .await?
2746                .ok_or_else(|| anyhow!("must join a room first"))?;
2747
2748            let project = project::Entity::find_by_id(project_id)
2749                .one(&*tx)
2750                .await?
2751                .ok_or_else(|| anyhow!("no such project"))?;
2752            if project.room_id != participant.room_id {
2753                return Err(anyhow!("no such project"))?;
2754            }
2755
2756            let mut collaborators = project
2757                .find_related(project_collaborator::Entity)
2758                .all(&*tx)
2759                .await?;
2760            let replica_ids = collaborators
2761                .iter()
2762                .map(|c| c.replica_id)
2763                .collect::<HashSet<_>>();
2764            let mut replica_id = ReplicaId(1);
2765            while replica_ids.contains(&replica_id) {
2766                replica_id.0 += 1;
2767            }
2768            let new_collaborator = project_collaborator::ActiveModel {
2769                project_id: ActiveValue::set(project_id),
2770                connection_id: ActiveValue::set(connection.id as i32),
2771                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2772                user_id: ActiveValue::set(participant.user_id),
2773                replica_id: ActiveValue::set(replica_id),
2774                is_host: ActiveValue::set(false),
2775                ..Default::default()
2776            }
2777            .insert(&*tx)
2778            .await?;
2779            collaborators.push(new_collaborator);
2780
2781            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2782            let mut worktrees = db_worktrees
2783                .into_iter()
2784                .map(|db_worktree| {
2785                    (
2786                        db_worktree.id as u64,
2787                        Worktree {
2788                            id: db_worktree.id as u64,
2789                            abs_path: db_worktree.abs_path,
2790                            root_name: db_worktree.root_name,
2791                            visible: db_worktree.visible,
2792                            entries: Default::default(),
2793                            repository_entries: Default::default(),
2794                            diagnostic_summaries: Default::default(),
2795                            settings_files: Default::default(),
2796                            scan_id: db_worktree.scan_id as u64,
2797                            completed_scan_id: db_worktree.completed_scan_id as u64,
2798                        },
2799                    )
2800                })
2801                .collect::<BTreeMap<_, _>>();
2802
2803            // Populate worktree entries.
2804            {
2805                let mut db_entries = worktree_entry::Entity::find()
2806                    .filter(
2807                        Condition::all()
2808                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2809                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2810                    )
2811                    .stream(&*tx)
2812                    .await?;
2813                while let Some(db_entry) = db_entries.next().await {
2814                    let db_entry = db_entry?;
2815                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2816                        worktree.entries.push(proto::Entry {
2817                            id: db_entry.id as u64,
2818                            is_dir: db_entry.is_dir,
2819                            path: db_entry.path,
2820                            inode: db_entry.inode as u64,
2821                            mtime: Some(proto::Timestamp {
2822                                seconds: db_entry.mtime_seconds as u64,
2823                                nanos: db_entry.mtime_nanos as u32,
2824                            }),
2825                            is_symlink: db_entry.is_symlink,
2826                            is_ignored: db_entry.is_ignored,
2827                            is_external: db_entry.is_external,
2828                            git_status: db_entry.git_status.map(|status| status as i32),
2829                        });
2830                    }
2831                }
2832            }
2833
2834            // Populate repository entries.
2835            {
2836                let mut db_repository_entries = worktree_repository::Entity::find()
2837                    .filter(
2838                        Condition::all()
2839                            .add(worktree_repository::Column::ProjectId.eq(project_id))
2840                            .add(worktree_repository::Column::IsDeleted.eq(false)),
2841                    )
2842                    .stream(&*tx)
2843                    .await?;
2844                while let Some(db_repository_entry) = db_repository_entries.next().await {
2845                    let db_repository_entry = db_repository_entry?;
2846                    if let Some(worktree) =
2847                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2848                    {
2849                        worktree.repository_entries.insert(
2850                            db_repository_entry.work_directory_id as u64,
2851                            proto::RepositoryEntry {
2852                                work_directory_id: db_repository_entry.work_directory_id as u64,
2853                                branch: db_repository_entry.branch,
2854                            },
2855                        );
2856                    }
2857                }
2858            }
2859
2860            // Populate worktree diagnostic summaries.
2861            {
2862                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2863                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2864                    .stream(&*tx)
2865                    .await?;
2866                while let Some(db_summary) = db_summaries.next().await {
2867                    let db_summary = db_summary?;
2868                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2869                        worktree
2870                            .diagnostic_summaries
2871                            .push(proto::DiagnosticSummary {
2872                                path: db_summary.path,
2873                                language_server_id: db_summary.language_server_id as u64,
2874                                error_count: db_summary.error_count as u32,
2875                                warning_count: db_summary.warning_count as u32,
2876                            });
2877                    }
2878                }
2879            }
2880
2881            // Populate worktree settings files
2882            {
2883                let mut db_settings_files = worktree_settings_file::Entity::find()
2884                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2885                    .stream(&*tx)
2886                    .await?;
2887                while let Some(db_settings_file) = db_settings_files.next().await {
2888                    let db_settings_file = db_settings_file?;
2889                    if let Some(worktree) =
2890                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2891                    {
2892                        worktree.settings_files.push(WorktreeSettingsFile {
2893                            path: db_settings_file.path,
2894                            content: db_settings_file.content,
2895                        });
2896                    }
2897                }
2898            }
2899
2900            // Populate language servers.
2901            let language_servers = project
2902                .find_related(language_server::Entity)
2903                .all(&*tx)
2904                .await?;
2905
2906            let project = Project {
2907                collaborators: collaborators
2908                    .into_iter()
2909                    .map(|collaborator| ProjectCollaborator {
2910                        connection_id: collaborator.connection(),
2911                        user_id: collaborator.user_id,
2912                        replica_id: collaborator.replica_id,
2913                        is_host: collaborator.is_host,
2914                    })
2915                    .collect(),
2916                worktrees,
2917                language_servers: language_servers
2918                    .into_iter()
2919                    .map(|language_server| proto::LanguageServer {
2920                        id: language_server.id as u64,
2921                        name: language_server.name,
2922                    })
2923                    .collect(),
2924            };
2925            Ok((project, replica_id as ReplicaId))
2926        })
2927        .await
2928    }
2929
2930    pub async fn leave_project(
2931        &self,
2932        project_id: ProjectId,
2933        connection: ConnectionId,
2934    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2935        let room_id = self.room_id_for_project(project_id).await?;
2936        self.room_transaction(room_id, |tx| async move {
2937            let result = project_collaborator::Entity::delete_many()
2938                .filter(
2939                    Condition::all()
2940                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2941                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2942                        .add(
2943                            project_collaborator::Column::ConnectionServerId
2944                                .eq(connection.owner_id as i32),
2945                        ),
2946                )
2947                .exec(&*tx)
2948                .await?;
2949            if result.rows_affected == 0 {
2950                Err(anyhow!("not a collaborator on this project"))?;
2951            }
2952
2953            let project = project::Entity::find_by_id(project_id)
2954                .one(&*tx)
2955                .await?
2956                .ok_or_else(|| anyhow!("no such project"))?;
2957            let collaborators = project
2958                .find_related(project_collaborator::Entity)
2959                .all(&*tx)
2960                .await?;
2961            let connection_ids = collaborators
2962                .into_iter()
2963                .map(|collaborator| collaborator.connection())
2964                .collect();
2965
2966            follower::Entity::delete_many()
2967                .filter(
2968                    Condition::any()
2969                        .add(
2970                            Condition::all()
2971                                .add(follower::Column::ProjectId.eq(project_id))
2972                                .add(
2973                                    follower::Column::LeaderConnectionServerId
2974                                        .eq(connection.owner_id),
2975                                )
2976                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2977                        )
2978                        .add(
2979                            Condition::all()
2980                                .add(follower::Column::ProjectId.eq(project_id))
2981                                .add(
2982                                    follower::Column::FollowerConnectionServerId
2983                                        .eq(connection.owner_id),
2984                                )
2985                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2986                        ),
2987                )
2988                .exec(&*tx)
2989                .await?;
2990
2991            let room = self.get_room(project.room_id, &tx).await?;
2992            let left_project = LeftProject {
2993                id: project_id,
2994                host_user_id: project.host_user_id,
2995                host_connection_id: project.host_connection()?,
2996                connection_ids,
2997            };
2998            Ok((room, left_project))
2999        })
3000        .await
3001    }
3002
3003    pub async fn project_collaborators(
3004        &self,
3005        project_id: ProjectId,
3006        connection_id: ConnectionId,
3007    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
3008        let room_id = self.room_id_for_project(project_id).await?;
3009        self.room_transaction(room_id, |tx| async move {
3010            let collaborators = project_collaborator::Entity::find()
3011                .filter(project_collaborator::Column::ProjectId.eq(project_id))
3012                .all(&*tx)
3013                .await?
3014                .into_iter()
3015                .map(|collaborator| ProjectCollaborator {
3016                    connection_id: collaborator.connection(),
3017                    user_id: collaborator.user_id,
3018                    replica_id: collaborator.replica_id,
3019                    is_host: collaborator.is_host,
3020                })
3021                .collect::<Vec<_>>();
3022
3023            if collaborators
3024                .iter()
3025                .any(|collaborator| collaborator.connection_id == connection_id)
3026            {
3027                Ok(collaborators)
3028            } else {
3029                Err(anyhow!("no such project"))?
3030            }
3031        })
3032        .await
3033    }
3034
3035    pub async fn project_connection_ids(
3036        &self,
3037        project_id: ProjectId,
3038        connection_id: ConnectionId,
3039    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3040        let room_id = self.room_id_for_project(project_id).await?;
3041        self.room_transaction(room_id, |tx| async move {
3042            let mut collaborators = project_collaborator::Entity::find()
3043                .filter(project_collaborator::Column::ProjectId.eq(project_id))
3044                .stream(&*tx)
3045                .await?;
3046
3047            let mut connection_ids = HashSet::default();
3048            while let Some(collaborator) = collaborators.next().await {
3049                let collaborator = collaborator?;
3050                connection_ids.insert(collaborator.connection());
3051            }
3052
3053            if connection_ids.contains(&connection_id) {
3054                Ok(connection_ids)
3055            } else {
3056                Err(anyhow!("no such project"))?
3057            }
3058        })
3059        .await
3060    }
3061
3062    async fn project_guest_connection_ids(
3063        &self,
3064        project_id: ProjectId,
3065        tx: &DatabaseTransaction,
3066    ) -> Result<Vec<ConnectionId>> {
3067        let mut collaborators = project_collaborator::Entity::find()
3068            .filter(
3069                project_collaborator::Column::ProjectId
3070                    .eq(project_id)
3071                    .and(project_collaborator::Column::IsHost.eq(false)),
3072            )
3073            .stream(tx)
3074            .await?;
3075
3076        let mut guest_connection_ids = Vec::new();
3077        while let Some(collaborator) = collaborators.next().await {
3078            let collaborator = collaborator?;
3079            guest_connection_ids.push(collaborator.connection());
3080        }
3081        Ok(guest_connection_ids)
3082    }
3083
3084    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3085        self.transaction(|tx| async move {
3086            let project = project::Entity::find_by_id(project_id)
3087                .one(&*tx)
3088                .await?
3089                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3090            Ok(project.room_id)
3091        })
3092        .await
3093    }
3094
3095    // access tokens
3096
3097    pub async fn create_access_token(
3098        &self,
3099        user_id: UserId,
3100        access_token_hash: &str,
3101        max_access_token_count: usize,
3102    ) -> Result<AccessTokenId> {
3103        self.transaction(|tx| async {
3104            let tx = tx;
3105
3106            let token = access_token::ActiveModel {
3107                user_id: ActiveValue::set(user_id),
3108                hash: ActiveValue::set(access_token_hash.into()),
3109                ..Default::default()
3110            }
3111            .insert(&*tx)
3112            .await?;
3113
3114            access_token::Entity::delete_many()
3115                .filter(
3116                    access_token::Column::Id.in_subquery(
3117                        Query::select()
3118                            .column(access_token::Column::Id)
3119                            .from(access_token::Entity)
3120                            .and_where(access_token::Column::UserId.eq(user_id))
3121                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3122                            .limit(10000)
3123                            .offset(max_access_token_count as u64)
3124                            .to_owned(),
3125                    ),
3126                )
3127                .exec(&*tx)
3128                .await?;
3129            Ok(token.id)
3130        })
3131        .await
3132    }
3133
3134    pub async fn get_access_token(
3135        &self,
3136        access_token_id: AccessTokenId,
3137    ) -> Result<access_token::Model> {
3138        self.transaction(|tx| async move {
3139            Ok(access_token::Entity::find_by_id(access_token_id)
3140                .one(&*tx)
3141                .await?
3142                .ok_or_else(|| anyhow!("no such access token"))?)
3143        })
3144        .await
3145    }
3146
3147    // channels
3148
3149    pub async fn create_root_channel(
3150        &self,
3151        name: &str,
3152        live_kit_room: &str,
3153        creator_id: UserId,
3154    ) -> Result<ChannelId> {
3155        self.create_channel(name, None, live_kit_room, creator_id)
3156            .await
3157    }
3158
3159    pub async fn create_channel(
3160        &self,
3161        name: &str,
3162        parent: Option<ChannelId>,
3163        live_kit_room: &str,
3164        creator_id: UserId,
3165    ) -> Result<ChannelId> {
3166        self.transaction(move |tx| async move {
3167            let tx = tx;
3168
3169            if let Some(parent) = parent {
3170                let channels = self.get_channel_ancestors(parent, &*tx).await?;
3171                channel_member::Entity::find()
3172                    .filter(channel_member::Column::ChannelId.is_in(channels.iter().copied()))
3173                    .filter(
3174                        channel_member::Column::UserId
3175                            .eq(creator_id)
3176                            .and(channel_member::Column::Accepted.eq(true)),
3177                    )
3178                    .one(&*tx)
3179                    .await?
3180                    .ok_or_else(|| {
3181                        anyhow!("User does not have the permissions to create this channel")
3182                    })?;
3183            }
3184
3185            let channel = channel::ActiveModel {
3186                name: ActiveValue::Set(name.to_string()),
3187                ..Default::default()
3188            };
3189
3190            let channel = channel.insert(&*tx).await?;
3191
3192            if let Some(parent) = parent {
3193                channel_parent::ActiveModel {
3194                    child_id: ActiveValue::Set(channel.id),
3195                    parent_id: ActiveValue::Set(parent),
3196                }
3197                .insert(&*tx)
3198                .await?;
3199            }
3200
3201            channel_member::ActiveModel {
3202                channel_id: ActiveValue::Set(channel.id),
3203                user_id: ActiveValue::Set(creator_id),
3204                accepted: ActiveValue::Set(true),
3205                admin: ActiveValue::Set(true),
3206                ..Default::default()
3207            }
3208            .insert(&*tx)
3209            .await?;
3210
3211            room::ActiveModel {
3212                channel_id: ActiveValue::Set(Some(channel.id)),
3213                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3214                ..Default::default()
3215            }
3216            .insert(&*tx)
3217            .await?;
3218
3219            Ok(channel.id)
3220        })
3221        .await
3222    }
3223
3224    pub async fn remove_channel(
3225        &self,
3226        channel_id: ChannelId,
3227        user_id: UserId,
3228    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3229        self.transaction(move |tx| async move {
3230            let tx = tx;
3231
3232            // Check if user is an admin
3233            channel_member::Entity::find()
3234                .filter(
3235                    channel_member::Column::ChannelId
3236                        .eq(channel_id)
3237                        .and(channel_member::Column::UserId.eq(user_id))
3238                        .and(channel_member::Column::Admin.eq(true)),
3239                )
3240                .one(&*tx)
3241                .await?
3242                .ok_or_else(|| anyhow!("user is not allowed to remove this channel"))?;
3243
3244            let mut descendants = self.get_channel_descendants([channel_id], &*tx).await?;
3245
3246            // Keep channels which have another active
3247            let mut channels_to_keep = channel_parent::Entity::find()
3248                .filter(
3249                    channel_parent::Column::ChildId
3250                        .is_in(descendants.keys().copied().filter(|&id| id != channel_id))
3251                        .and(
3252                            channel_parent::Column::ParentId.is_not_in(descendants.keys().copied()),
3253                        ),
3254                )
3255                .stream(&*tx)
3256                .await?;
3257
3258            while let Some(row) = channels_to_keep.next().await {
3259                let row = row?;
3260                descendants.remove(&row.child_id);
3261            }
3262
3263            drop(channels_to_keep);
3264
3265            let channels_to_remove = descendants.keys().copied().collect::<Vec<_>>();
3266
3267            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3268                .filter(channel_member::Column::ChannelId.is_in(channels_to_remove.iter().copied()))
3269                .select_only()
3270                .column(channel_member::Column::UserId)
3271                .distinct()
3272                .into_values::<_, QueryUserIds>()
3273                .all(&*tx)
3274                .await?;
3275
3276            // Channel members and parents should delete via cascade
3277            channel::Entity::delete_many()
3278                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
3279                .exec(&*tx)
3280                .await?;
3281
3282            Ok((channels_to_remove, members_to_notify))
3283        })
3284        .await
3285    }
3286
3287    pub async fn invite_channel_member(
3288        &self,
3289        channel_id: ChannelId,
3290        invitee_id: UserId,
3291        inviter_id: UserId,
3292        is_admin: bool,
3293    ) -> Result<()> {
3294        self.transaction(move |tx| async move {
3295            let tx = tx;
3296
3297            // Check if inviter is a member
3298            channel_member::Entity::find()
3299                .filter(
3300                    channel_member::Column::ChannelId
3301                        .eq(channel_id)
3302                        .and(channel_member::Column::UserId.eq(inviter_id))
3303                        .and(channel_member::Column::Admin.eq(true)),
3304                )
3305                .one(&*tx)
3306                .await?
3307                .ok_or_else(|| {
3308                    anyhow!("Inviter does not have permissions to invite the invitee")
3309                })?;
3310
3311            let channel_membership = channel_member::ActiveModel {
3312                channel_id: ActiveValue::Set(channel_id),
3313                user_id: ActiveValue::Set(invitee_id),
3314                accepted: ActiveValue::Set(false),
3315                admin: ActiveValue::Set(is_admin),
3316                ..Default::default()
3317            };
3318
3319            channel_membership.insert(&*tx).await?;
3320
3321            Ok(())
3322        })
3323        .await
3324    }
3325
3326    pub async fn respond_to_channel_invite(
3327        &self,
3328        channel_id: ChannelId,
3329        user_id: UserId,
3330        accept: bool,
3331    ) -> Result<()> {
3332        self.transaction(move |tx| async move {
3333            let tx = tx;
3334
3335            let rows_affected = if accept {
3336                channel_member::Entity::update_many()
3337                    .set(channel_member::ActiveModel {
3338                        accepted: ActiveValue::Set(accept),
3339                        ..Default::default()
3340                    })
3341                    .filter(
3342                        channel_member::Column::ChannelId
3343                            .eq(channel_id)
3344                            .and(channel_member::Column::UserId.eq(user_id))
3345                            .and(channel_member::Column::Accepted.eq(false)),
3346                    )
3347                    .exec(&*tx)
3348                    .await?
3349                    .rows_affected
3350            } else {
3351                channel_member::ActiveModel {
3352                    channel_id: ActiveValue::Unchanged(channel_id),
3353                    user_id: ActiveValue::Unchanged(user_id),
3354                    ..Default::default()
3355                }
3356                .delete(&*tx)
3357                .await?
3358                .rows_affected
3359            };
3360
3361            if rows_affected == 0 {
3362                Err(anyhow!("no such invitation"))?;
3363            }
3364
3365            Ok(())
3366        })
3367        .await
3368    }
3369
3370    pub async fn get_channel_invites(&self, user_id: UserId) -> Result<Vec<Channel>> {
3371        self.transaction(|tx| async move {
3372            let tx = tx;
3373
3374            let channel_invites = channel_member::Entity::find()
3375                .filter(
3376                    channel_member::Column::UserId
3377                        .eq(user_id)
3378                        .and(channel_member::Column::Accepted.eq(false)),
3379                )
3380                .all(&*tx)
3381                .await?;
3382
3383            let channels = channel::Entity::find()
3384                .filter(
3385                    channel::Column::Id.is_in(
3386                        channel_invites
3387                            .into_iter()
3388                            .map(|channel_member| channel_member.channel_id),
3389                    ),
3390                )
3391                .all(&*tx)
3392                .await?;
3393
3394            let channels = channels
3395                .into_iter()
3396                .map(|channel| Channel {
3397                    id: channel.id,
3398                    name: channel.name,
3399                    parent_id: None,
3400                })
3401                .collect();
3402
3403            Ok(channels)
3404        })
3405        .await
3406    }
3407
3408    pub async fn get_channels(
3409        &self,
3410        user_id: UserId,
3411    ) -> Result<(Vec<Channel>, HashMap<ChannelId, Vec<UserId>>)> {
3412        self.transaction(|tx| async move {
3413            let tx = tx;
3414
3415            let starting_channel_ids: Vec<ChannelId> = channel_member::Entity::find()
3416                .filter(
3417                    channel_member::Column::UserId
3418                        .eq(user_id)
3419                        .and(channel_member::Column::Accepted.eq(true)),
3420                )
3421                .select_only()
3422                .column(channel_member::Column::ChannelId)
3423                .into_values::<_, QueryChannelIds>()
3424                .all(&*tx)
3425                .await?;
3426
3427            let parents_by_child_id = self
3428                .get_channel_descendants(starting_channel_ids, &*tx)
3429                .await?;
3430
3431            let mut channels = Vec::with_capacity(parents_by_child_id.len());
3432            let mut rows = channel::Entity::find()
3433                .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3434                .stream(&*tx)
3435                .await?;
3436
3437            while let Some(row) = rows.next().await {
3438                let row = row?;
3439                channels.push(Channel {
3440                    id: row.id,
3441                    name: row.name,
3442                    parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3443                });
3444            }
3445
3446            drop(rows);
3447
3448            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3449            enum QueryUserIdsAndChannelIds {
3450                ChannelId,
3451                UserId,
3452            }
3453
3454            let mut participants = room_participant::Entity::find()
3455                .inner_join(room::Entity)
3456                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3457                .select_only()
3458                .column(room::Column::ChannelId)
3459                .column(room_participant::Column::UserId)
3460                .into_values::<_, QueryUserIdsAndChannelIds>()
3461                .stream(&*tx)
3462                .await?;
3463
3464            let mut participant_map: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3465            while let Some(row) = participants.next().await {
3466                let row: (ChannelId, UserId) = row?;
3467                participant_map.entry(row.0).or_default().push(row.1)
3468            }
3469
3470            drop(participants);
3471
3472            Ok((channels, participant_map))
3473        })
3474        .await
3475    }
3476
3477    pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3478        self.transaction(|tx| async move {
3479            let tx = tx;
3480            let user_ids = self.get_channel_members_internal(id, &*tx).await?;
3481            Ok(user_ids)
3482        })
3483        .await
3484    }
3485
3486    pub async fn get_channel_members_internal(
3487        &self,
3488        id: ChannelId,
3489        tx: &DatabaseTransaction,
3490    ) -> Result<Vec<UserId>> {
3491        let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3492        let user_ids = channel_member::Entity::find()
3493            .distinct()
3494            .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3495            .select_only()
3496            .column(channel_member::Column::UserId)
3497            .into_values::<_, QueryUserIds>()
3498            .all(&*tx)
3499            .await?;
3500        Ok(user_ids)
3501    }
3502
3503    async fn get_channel_ancestors(
3504        &self,
3505        id: ChannelId,
3506        tx: &DatabaseTransaction,
3507    ) -> Result<Vec<ChannelId>> {
3508        let sql = format!(
3509            r#"
3510            WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3511                    SELECT CAST(NULL as INTEGER) as child_id, root_ids.column1 as parent_id
3512                    FROM (VALUES ({})) as root_ids
3513                UNION
3514                    SELECT channel_parents.child_id, channel_parents.parent_id
3515                    FROM channel_parents, channel_tree
3516                    WHERE channel_parents.child_id = channel_tree.parent_id
3517            )
3518            SELECT DISTINCT channel_tree.parent_id
3519            FROM channel_tree
3520            "#,
3521            id
3522        );
3523
3524        #[derive(FromQueryResult, Debug, PartialEq)]
3525        pub struct ChannelParent {
3526            pub parent_id: ChannelId,
3527        }
3528
3529        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3530
3531        let mut channel_ids_stream = channel_parent::Entity::find()
3532            .from_raw_sql(stmt)
3533            .into_model::<ChannelParent>()
3534            .stream(&*tx)
3535            .await?;
3536
3537        let mut channel_ids = vec![];
3538        while let Some(channel_id) = channel_ids_stream.next().await {
3539            channel_ids.push(channel_id?.parent_id);
3540        }
3541
3542        Ok(channel_ids)
3543    }
3544
3545    async fn get_channel_descendants(
3546        &self,
3547        channel_ids: impl IntoIterator<Item = ChannelId>,
3548        tx: &DatabaseTransaction,
3549    ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3550        let mut values = String::new();
3551        for id in channel_ids {
3552            if !values.is_empty() {
3553                values.push_str(", ");
3554            }
3555            write!(&mut values, "({})", id).unwrap();
3556        }
3557
3558        if values.is_empty() {
3559            return Ok(HashMap::default());
3560        }
3561
3562        let sql = format!(
3563            r#"
3564            WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3565                    SELECT root_ids.column1 as child_id, CAST(NULL as INTEGER) as parent_id
3566                    FROM (VALUES {}) as root_ids
3567                UNION
3568                    SELECT channel_parents.child_id, channel_parents.parent_id
3569                    FROM channel_parents, channel_tree
3570                    WHERE channel_parents.parent_id = channel_tree.child_id
3571            )
3572            SELECT channel_tree.child_id, channel_tree.parent_id
3573            FROM channel_tree
3574            ORDER BY child_id, parent_id IS NOT NULL
3575            "#,
3576            values
3577        );
3578
3579        #[derive(FromQueryResult, Debug, PartialEq)]
3580        pub struct ChannelParent {
3581            pub child_id: ChannelId,
3582            pub parent_id: Option<ChannelId>,
3583        }
3584
3585        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3586
3587        let mut parents_by_child_id = HashMap::default();
3588        let mut parents = channel_parent::Entity::find()
3589            .from_raw_sql(stmt)
3590            .into_model::<ChannelParent>()
3591            .stream(tx)
3592            .await?;
3593
3594        while let Some(parent) = parents.next().await {
3595            let parent = parent?;
3596            parents_by_child_id.insert(parent.child_id, parent.parent_id);
3597        }
3598
3599        Ok(parents_by_child_id)
3600    }
3601
3602    pub async fn get_channel(&self, channel_id: ChannelId) -> Result<Option<Channel>> {
3603        self.transaction(|tx| async move {
3604            let tx = tx;
3605            let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3606
3607            Ok(channel.map(|channel| Channel {
3608                id: channel.id,
3609                name: channel.name,
3610                parent_id: None,
3611            }))
3612        })
3613        .await
3614    }
3615
3616    pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3617        self.transaction(|tx| async move {
3618            let tx = tx;
3619            let room = channel::Model {
3620                id: channel_id,
3621                ..Default::default()
3622            }
3623            .find_related(room::Entity)
3624            .one(&*tx)
3625            .await?
3626            .ok_or_else(|| anyhow!("invalid channel"))?;
3627            Ok(room.id)
3628        })
3629        .await
3630    }
3631
    /// Runs `f` inside a database transaction and commits it, retrying for as long as
    /// `retry_on_serialization_error` asks for another attempt.
    ///
    /// `f` is invoked once per attempt, which is why it is bound by `Fn`. When `f`
    /// fails, the transaction is rolled back before the retry decision is made; when
    /// the commit fails, the commit error goes through the same retry decision. An
    /// error that is not retried is returned to the caller. The whole loop is driven
    /// through `self.run`.
    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; handed to the retry helper.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(result) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(result),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // A failed rollback is reported instead of the closure's error.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3663
    /// Like `transaction`, for closures that may or may not end up touching a room.
    ///
    /// When `f` yields `Some((room_id, data))`, the mutex for `room_id` in `self.rooms`
    /// is locked before the commit, and on success the guard is returned inside the
    /// [`RoomGuard`] together with `data`. When `f` yields `None`, the transaction is
    /// committed without taking any room lock and `None` is returned.
    ///
    /// Failures of `f` (after a rollback) and of the commit are retried for as long as
    /// `retry_on_serialization_error` allows; `f` is invoked once per attempt.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; handed to the retry helper.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // The room is only known once `f` has run, so the lock is taken
                        // here, after the closure and before the commit.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // A failed rollback is reported instead of the closure's error.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3713
    /// Runs `f` inside a database transaction while holding the mutex for `room_id`.
    ///
    /// On every attempt the room's mutex (created on demand in `self.rooms`) is locked
    /// before the transaction is started. On success the guard is handed to the caller
    /// inside the returned [`RoomGuard`], next to the closure's result.
    ///
    /// Failures of `f` (after a rollback) and of the commit are retried for as long as
    /// `retry_on_serialization_error` allows; `f` is invoked once per attempt.
    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; handed to the retry helper.
            let mut i = 0;
            loop {
                // Taken inside the loop: a retried attempt acquires the lock again.
                let lock = self.rooms.entry(room_id).or_default().clone();
                let _guard = lock.lock_owned().await;
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(data) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => {
                            return Ok(RoomGuard {
                                data,
                                _guard,
                                _not_send: PhantomData,
                            });
                        }
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // A failed rollback is reported instead of the closure's error.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3753
3754    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3755    where
3756        F: Send + Fn(TransactionHandle) -> Fut,
3757        Fut: Send + Future<Output = Result<T>>,
3758    {
3759        let tx = self
3760            .pool
3761            .begin_with_config(Some(IsolationLevel::Serializable), None)
3762            .await?;
3763
3764        let mut tx = Arc::new(Some(tx));
3765        let result = f(TransactionHandle(tx.clone())).await;
3766        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3767            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3768        };
3769
3770        Ok((tx, result))
3771    }
3772
    /// Drives `future` to completion and returns its output.
    ///
    /// In test builds, if the executor is `Executor::Deterministic`, its
    /// `simulate_random_delay` is awaited first; the future is then run with `block_on`
    /// on the runtime stored in `self.runtime`. In non-test builds the future is simply
    /// awaited.
    ///
    /// # Panics
    ///
    /// In test builds, panics if `self.runtime` is `None`.
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        // Exactly one of the two blocks below survives `cfg` evaluation and becomes the
        // tail expression of this function.
        #[cfg(test)]
        {
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
3791
3792    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3793        // If the error is due to a failure to serialize concurrent transactions, then retry
3794        // this transaction after a delay. With each subsequent retry, double the delay duration.
3795        // Also vary the delay randomly in order to ensure different database connections retry
3796        // at different times.
3797        if is_serialization_error(error) {
3798            let base_delay = 4_u64 << prev_attempt_count.min(16);
3799            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3800            log::info!(
3801                "retrying transaction after serialization error. delay: {} ms.",
3802                randomized_delay
3803            );
3804            self.executor
3805                .sleep(Duration::from_millis(randomized_delay as u64))
3806                .await;
3807            true
3808        } else {
3809            false
3810        }
3811    }
3812}
3813
3814fn is_serialization_error(error: &Error) -> bool {
3815    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3816    match error {
3817        Error::Database(
3818            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3819            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3820        ) if error
3821            .as_database_error()
3822            .and_then(|error| error.code())
3823            .as_deref()
3824            == Some(SERIALIZATION_FAILURE_CODE) =>
3825        {
3826            true
3827        }
3828        _ => false,
3829    }
3830}
3831
3832struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3833
3834impl Deref for TransactionHandle {
3835    type Target = DatabaseTransaction;
3836
3837    fn deref(&self) -> &Self::Target {
3838        self.0.as_ref().as_ref().unwrap()
3839    }
3840}
3841
/// Pairs a value of type `T` with an owned mutex guard.
///
/// `room_transaction` builds this from the data returned by a committed transaction
/// and the guard for the room's mutex, so the mutex stays locked for as long as the
/// `RoomGuard` is alive.
pub struct RoomGuard<T> {
    // The value exposed through `Deref` / `DerefMut`.
    data: T,
    // Kept only so the mutex remains locked; never read.
    _guard: OwnedMutexGuard<()>,
    // `Rc` is `!Send`, so this marker makes `RoomGuard` `!Send` as well.
    _not_send: PhantomData<Rc<()>>,
}
3847
/// Gives read access to the guarded value.
impl<T> Deref for RoomGuard<T> {
    type Target = T;

    /// Borrows the wrapped `data`.
    fn deref(&self) -> &T {
        &self.data
    }
}
3855
/// Gives write access to the guarded value.
impl<T> DerefMut for RoomGuard<T> {
    /// Mutably borrows the wrapped `data`.
    fn deref_mut(&mut self) -> &mut T {
        &mut self.data
    }
}
3861
/// Serializable set of fields describing a new user.
///
/// NOTE(review): the code that consumes these parameters is outside this section;
/// the field notes below are inferred from names — confirm.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    /// GitHub login name (presumably the user's GitHub handle).
    pub github_login: String,
    /// Numeric GitHub user id.
    pub github_user_id: i32,
    /// Invite count (presumably the number of invites granted to the user).
    pub invite_count: i32,
}
3868
/// Values describing a newly created user.
///
/// NOTE(review): the producing code is outside this section; the field notes below
/// are inferred from names — confirm.
#[derive(Debug)]
pub struct NewUserResult {
    /// Id of the user.
    pub user_id: UserId,
    /// Metrics identifier associated with the user.
    pub metrics_id: String,
    /// Id of the inviting user, if there is one.
    pub inviting_user_id: Option<UserId>,
    /// Device id recorded at signup, if there is one.
    pub signup_device_id: Option<String>,
}
3876
/// A channel row that can be built directly from a query result (it derives sea-orm's
/// `FromQueryResult`).
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct Channel {
    /// Id of the channel.
    pub id: ChannelId,
    /// Name of the channel.
    pub name: String,
    /// Id of the parent channel, or `None` when the row carries no parent.
    pub parent_id: Option<ChannelId>,
}
3883
/// Returns the string produced by `nanoid::nanoid!(16)` (per the nanoid crate, a
/// random identifier of length 16).
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
3887
/// Returns the string produced by `nanoid::nanoid!(64)` (per the nanoid crate, a
/// random identifier of length 64).
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
3891
/// Defines a strongly-typed database id named `$name`: a `Copy` newtype around `i32`.
///
/// Besides the derived traits, the generated type gets:
/// - `MAX`, `from_proto` and `to_proto` helpers for converting to and from the `u64`
///   ids used on the wire,
/// - a `Display` impl that prints the inner integer,
/// - the sea-orm / sea-query impls (`Value` conversion, `TryGetable`, `ValueType`,
///   `TryFromU64`, `Nullable`) that let it be used as an integer column.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds an id from its `u64` wire representation.
            ///
            /// The value is narrowed with `as`, so anything that does not fit in an
            /// `i32` is silently truncated.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts the id to its `u64` wire representation.
            ///
            /// The value is widened with `as`, so a negative id is sign-extended into
            /// a very large `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer value that fits in an `i32`; everything
            // else (null, out of range, non-integer) is a type error.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    // Already an `i32`; no conversion needed.
                    Value::Int(Some(int)) => Ok(Self(int)),
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            // Fails when `n` does not fit in an `i32`.
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(concat!(
                        "error converting u64 to ",
                        stringify!($name)
                    ))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
4010
// One `i32`-backed id newtype per kind of record, generated by `id_type!`, so that ids
// of different kinds cannot be mixed up at compile time.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
4024
/// A room's protobuf state together with its channel information.
///
/// NOTE(review): presumably the result of joining a room; the producing code is
/// outside this section — confirm.
#[derive(Clone)]
pub struct JoinRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Id of the associated channel, if any.
    pub channel_id: Option<ChannelId>,
    /// User ids of the channel's members.
    pub channel_members: Vec<UserId>,
}
4031
/// A room's protobuf state plus the projects that were rejoined or reshared.
///
/// NOTE(review): presumably the result of rejoining a room after a reconnect; the
/// producing code is outside this section — confirm.
pub struct RejoinedRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Projects listed as rejoined.
    pub rejoined_projects: Vec<RejoinedProject>,
    /// Projects listed as reshared.
    pub reshared_projects: Vec<ResharedProject>,
    /// Id of the associated channel, if any.
    pub channel_id: Option<ChannelId>,
    /// User ids of the channel's members.
    pub channel_members: Vec<UserId>,
}
4039
/// Description of a reshared project: its id, a previous connection id, its
/// collaborators and its worktree metadata.
///
/// NOTE(review): the producing code is outside this section — confirm exact semantics.
pub struct ResharedProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Connection id recorded before the reshare (inferred from the name).
    pub old_connection_id: ConnectionId,
    /// Collaborators on the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Protobuf metadata for the project's worktrees.
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
4046
/// Description of a rejoined project: its id, a previous connection id, its
/// collaborators, per-worktree state and language servers.
///
/// NOTE(review): the producing code is outside this section — confirm exact semantics.
pub struct RejoinedProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Connection id recorded before the rejoin (inferred from the name).
    pub old_connection_id: ConnectionId,
    /// Collaborators on the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Per-worktree state for the rejoined project.
    pub worktrees: Vec<RejoinedWorktree>,
    /// Protobuf descriptions of the project's language servers.
    pub language_servers: Vec<proto::LanguageServer>,
}
4054
/// State of a single worktree within a [`RejoinedProject`].
///
/// NOTE(review): the "updated"/"removed" lists presumably describe changes since the
/// client's last known state; the producing code is outside this section — confirm.
#[derive(Debug)]
pub struct RejoinedWorktree {
    /// Id of the worktree.
    pub id: u64,
    /// Absolute path of the worktree.
    pub abs_path: String,
    /// Name of the worktree's root.
    pub root_name: String,
    /// Visibility flag of the worktree.
    pub visible: bool,
    /// Entries listed as updated.
    pub updated_entries: Vec<proto::Entry>,
    /// Ids of entries listed as removed.
    pub removed_entries: Vec<u64>,
    /// Repository entries listed as updated.
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    /// Ids of repositories listed as removed.
    pub removed_repositories: Vec<u64>,
    /// Diagnostic summaries for the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Settings files found in the worktree.
    pub settings_files: Vec<WorktreeSettingsFile>,
    /// Scan id of the worktree.
    pub scan_id: u64,
    /// Completed scan id of the worktree.
    pub completed_scan_id: u64,
}
4070
/// A room's protobuf state plus the side effects recorded when it was left.
///
/// NOTE(review): presumably the result of a participant leaving a room; the producing
/// code is outside this section — confirm.
pub struct LeftRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Id of the associated channel, if any.
    pub channel_id: Option<ChannelId>,
    /// User ids of the channel's members.
    pub channel_members: Vec<UserId>,
    /// Projects that were left, keyed by project id.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    /// User ids whose pending calls were canceled (inferred from the name).
    pub canceled_calls_to_user_ids: Vec<UserId>,
    /// Deletion flag (presumably whether the room itself was deleted).
    pub deleted: bool,
}
4079
/// A room's protobuf state plus the participants and calls affected by a refresh.
///
/// NOTE(review): the producing code is outside this section — confirm exact semantics.
pub struct RefreshedRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Id of the associated channel, if any.
    pub channel_id: Option<ChannelId>,
    /// User ids of the channel's members.
    pub channel_members: Vec<UserId>,
    /// User ids of participants considered stale (inferred from the name).
    pub stale_participant_user_ids: Vec<UserId>,
    /// User ids whose pending calls were canceled (inferred from the name).
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
4087
/// A project's collaborators, worktrees and language servers.
pub struct Project {
    /// Collaborators on the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Worktrees in an ordered map (keys are presumably worktree ids — confirm).
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Protobuf descriptions of the project's language servers.
    pub language_servers: Vec<proto::LanguageServer>,
}
4093
/// One collaborator on a project, identified by connection, user and replica id.
pub struct ProjectCollaborator {
    /// Connection id of the collaborator.
    pub connection_id: ConnectionId,
    /// User id of the collaborator.
    pub user_id: UserId,
    /// Replica id of the collaborator.
    pub replica_id: ReplicaId,
    /// Host flag (presumably whether this collaborator hosts the project).
    pub is_host: bool,
}
4100
4101impl ProjectCollaborator {
4102    pub fn to_proto(&self) -> proto::Collaborator {
4103        proto::Collaborator {
4104            peer_id: Some(self.connection_id.into()),
4105            replica_id: self.replica_id.0 as u32,
4106            user_id: self.user_id.to_proto(),
4107        }
4108    }
4109}
4110
/// Description of a project that was left: its id, its host and a list of connections.
///
/// NOTE(review): the producing code is outside this section — confirm exact semantics.
#[derive(Debug)]
pub struct LeftProject {
    /// Id of the project.
    pub id: ProjectId,
    /// User id of the project's host.
    pub host_user_id: UserId,
    /// Connection id of the project's host.
    pub host_connection_id: ConnectionId,
    /// Connection ids associated with the project (presumably those to notify).
    pub connection_ids: Vec<ConnectionId>,
}
4118
/// Full state of a single worktree: identity, entries, repositories, diagnostics,
/// settings files and scan ids.
pub struct Worktree {
    /// Id of the worktree.
    pub id: u64,
    /// Absolute path of the worktree.
    pub abs_path: String,
    /// Name of the worktree's root.
    pub root_name: String,
    /// Visibility flag of the worktree.
    pub visible: bool,
    /// Protobuf entries of the worktree.
    pub entries: Vec<proto::Entry>,
    /// Repository entries in an ordered map (keys are presumably entry ids — confirm).
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    /// Diagnostic summaries for the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Settings files found in the worktree.
    pub settings_files: Vec<WorktreeSettingsFile>,
    /// Scan id of the worktree.
    pub scan_id: u64,
    /// Completed scan id of the worktree.
    pub completed_scan_id: u64,
}
4131
/// A settings file belonging to a worktree, given as a path and its content.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    /// Path of the settings file.
    pub path: String,
    /// Content of the settings file.
    pub content: String,
}
4137
/// Column selector with a single `ChannelId` column (derives sea-orm's `DeriveColumn`).
///
/// NOTE(review): presumably used to select only channel ids from a query; the call
/// sites are outside this section — confirm.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryChannelIds {
    ChannelId,
}
4142
/// Column selector with a single `UserId` column (derives sea-orm's `DeriveColumn`).
///
/// NOTE(review): presumably used to select only user ids from a query; the call sites
/// are outside this section — confirm.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}
4147
4148#[cfg(test)]
4149pub use test::*;
4150
4151#[cfg(test)]
4152mod test {
4153    use super::*;
4154    use gpui::executor::Background;
4155    use parking_lot::Mutex;
4156    use sea_orm::ConnectionTrait;
4157    use sqlx::migrate::MigrateDatabase;
4158    use std::sync::Arc;
4159
    /// Test fixture that owns a [`Database`] and cleans it up on drop.
    pub struct TestDb {
        /// The database under test. `Some` after construction; taken in `Drop`.
        pub db: Option<Arc<Database>>,
        /// Optional raw sqlx connection; both constructors in this module set it to
        /// `None`.
        pub connection: Option<sqlx::AnyConnection>,
    }
4164
4165    impl TestDb {
4166        pub fn sqlite(background: Arc<Background>) -> Self {
4167            let url = format!("sqlite::memory:");
4168            let runtime = tokio::runtime::Builder::new_current_thread()
4169                .enable_io()
4170                .enable_time()
4171                .build()
4172                .unwrap();
4173
4174            let mut db = runtime.block_on(async {
4175                let mut options = ConnectOptions::new(url);
4176                options.max_connections(5);
4177                let db = Database::new(options, Executor::Deterministic(background))
4178                    .await
4179                    .unwrap();
4180                let sql = include_str!(concat!(
4181                    env!("CARGO_MANIFEST_DIR"),
4182                    "/migrations.sqlite/20221109000000_test_schema.sql"
4183                ));
4184                db.pool
4185                    .execute(sea_orm::Statement::from_string(
4186                        db.pool.get_database_backend(),
4187                        sql.into(),
4188                    ))
4189                    .await
4190                    .unwrap();
4191                db
4192            });
4193
4194            db.runtime = Some(runtime);
4195
4196            Self {
4197                db: Some(Arc::new(db)),
4198                connection: None,
4199            }
4200        }
4201
4202        pub fn postgres(background: Arc<Background>) -> Self {
4203            static LOCK: Mutex<()> = Mutex::new(());
4204
4205            let _guard = LOCK.lock();
4206            let mut rng = StdRng::from_entropy();
4207            let url = format!(
4208                "postgres://postgres@localhost/zed-test-{}",
4209                rng.gen::<u128>()
4210            );
4211            let runtime = tokio::runtime::Builder::new_current_thread()
4212                .enable_io()
4213                .enable_time()
4214                .build()
4215                .unwrap();
4216
4217            let mut db = runtime.block_on(async {
4218                sqlx::Postgres::create_database(&url)
4219                    .await
4220                    .expect("failed to create test db");
4221                let mut options = ConnectOptions::new(url);
4222                options
4223                    .max_connections(5)
4224                    .idle_timeout(Duration::from_secs(0));
4225                let db = Database::new(options, Executor::Deterministic(background))
4226                    .await
4227                    .unwrap();
4228                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
4229                db.migrate(Path::new(migrations_path), false).await.unwrap();
4230                db
4231            });
4232
4233            db.runtime = Some(runtime);
4234
4235            Self {
4236                db: Some(Arc::new(db)),
4237                connection: None,
4238            }
4239        }
4240
4241        pub fn db(&self) -> &Arc<Database> {
4242            self.db.as_ref().unwrap()
4243        }
4244    }
4245
4246    impl Drop for TestDb {
4247        fn drop(&mut self) {
4248            let db = self.db.take().unwrap();
4249            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
4250                db.runtime.as_ref().unwrap().block_on(async {
4251                    use util::ResultExt;
4252                    let query = "
4253                        SELECT pg_terminate_backend(pg_stat_activity.pid)
4254                        FROM pg_stat_activity
4255                        WHERE
4256                            pg_stat_activity.datname = current_database() AND
4257                            pid <> pg_backend_pid();
4258                    ";
4259                    db.pool
4260                        .execute(sea_orm::Statement::from_string(
4261                            db.pool.get_database_backend(),
4262                            query.into(),
4263                        ))
4264                        .await
4265                        .log_err();
4266                    sqlx::Postgres::drop_database(db.options.get_url())
4267                        .await
4268                        .log_err();
4269                })
4270            }
4271        }
4272    }
4273}