db.rs

   1mod access_token;
   2mod channel;
   3mod channel_member;
   4mod channel_parent;
   5mod contact;
   6mod follower;
   7mod language_server;
   8mod project;
   9mod project_collaborator;
  10mod room;
  11mod room_participant;
  12mod server;
  13mod signup;
  14#[cfg(test)]
  15mod tests;
  16mod user;
  17mod worktree;
  18mod worktree_diagnostic_summary;
  19mod worktree_entry;
  20mod worktree_repository;
  21mod worktree_repository_statuses;
  22mod worktree_settings_file;
  23
  24use crate::executor::Executor;
  25use crate::{Error, Result};
  26use anyhow::anyhow;
  27use collections::{BTreeMap, HashMap, HashSet};
  28pub use contact::Contact;
  29use dashmap::DashMap;
  30use futures::StreamExt;
  31use hyper::StatusCode;
  32use rand::prelude::StdRng;
  33use rand::{Rng, SeedableRng};
  34use rpc::{proto, ConnectionId};
  35use sea_orm::Condition;
  36pub use sea_orm::ConnectOptions;
  37use sea_orm::{
  38    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  39    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  40    Statement, TransactionTrait,
  41};
  42use sea_query::{Alias, Expr, OnConflict, Query};
  43use serde::{Deserialize, Serialize};
  44pub use signup::{Invite, NewSignup, WaitlistSummary};
  45use sqlx::migrate::{Migrate, Migration, MigrationSource};
  46use sqlx::Connection;
  47use std::fmt::Write as _;
  48use std::ops::{Deref, DerefMut};
  49use std::path::Path;
  50use std::time::Duration;
  51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  52use tokio::sync::{Mutex, OwnedMutexGuard};
  53pub use user::Model as User;
  54
  55pub struct Database {
  56    options: ConnectOptions,
  57    pool: DatabaseConnection,
  58    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  59    rng: Mutex<StdRng>,
  60    executor: Executor,
  61    #[cfg(test)]
  62    runtime: Option<tokio::runtime::Runtime>,
  63}
  64
  65impl Database {
  66    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  67        Ok(Self {
  68            options: options.clone(),
  69            pool: sea_orm::Database::connect(options).await?,
  70            rooms: DashMap::with_capacity(16384),
  71            rng: Mutex::new(StdRng::seed_from_u64(0)),
  72            executor,
  73            #[cfg(test)]
  74            runtime: None,
  75        })
  76    }
  77
  78    #[cfg(test)]
  79    pub fn reset(&self) {
  80        self.rooms.clear();
  81    }
  82
  83    pub async fn migrate(
  84        &self,
  85        migrations_path: &Path,
  86        ignore_checksum_mismatch: bool,
  87    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  88        let migrations = MigrationSource::resolve(migrations_path)
  89            .await
  90            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  91
  92        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  93
  94        connection.ensure_migrations_table().await?;
  95        let applied_migrations: HashMap<_, _> = connection
  96            .list_applied_migrations()
  97            .await?
  98            .into_iter()
  99            .map(|m| (m.version, m))
 100            .collect();
 101
 102        let mut new_migrations = Vec::new();
 103        for migration in migrations {
 104            match applied_migrations.get(&migration.version) {
 105                Some(applied_migration) => {
 106                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 107                    {
 108                        Err(anyhow!(
 109                            "checksum mismatch for applied migration {}",
 110                            migration.description
 111                        ))?;
 112                    }
 113                }
 114                None => {
 115                    let elapsed = connection.apply(&migration).await?;
 116                    new_migrations.push((migration, elapsed));
 117                }
 118            }
 119        }
 120
 121        Ok(new_migrations)
 122    }
 123
 124    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 125        self.transaction(|tx| async move {
 126            let server = server::ActiveModel {
 127                environment: ActiveValue::set(environment.into()),
 128                ..Default::default()
 129            }
 130            .insert(&*tx)
 131            .await?;
 132            Ok(server.id)
 133        })
 134        .await
 135    }
 136
 137    pub async fn stale_room_ids(
 138        &self,
 139        environment: &str,
 140        new_server_id: ServerId,
 141    ) -> Result<Vec<RoomId>> {
 142        self.transaction(|tx| async move {
 143            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 144            enum QueryAs {
 145                RoomId,
 146            }
 147
 148            let stale_server_epochs = self
 149                .stale_server_ids(environment, new_server_id, &tx)
 150                .await?;
 151            Ok(room_participant::Entity::find()
 152                .select_only()
 153                .column(room_participant::Column::RoomId)
 154                .distinct()
 155                .filter(
 156                    room_participant::Column::AnsweringConnectionServerId
 157                        .is_in(stale_server_epochs),
 158                )
 159                .into_values::<_, QueryAs>()
 160                .all(&*tx)
 161                .await?)
 162        })
 163        .await
 164    }
 165
 166    pub async fn refresh_room(
 167        &self,
 168        room_id: RoomId,
 169        new_server_id: ServerId,
 170    ) -> Result<RoomGuard<RefreshedRoom>> {
 171        self.room_transaction(room_id, |tx| async move {
 172            let stale_participant_filter = Condition::all()
 173                .add(room_participant::Column::RoomId.eq(room_id))
 174                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 175                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 176
 177            let stale_participant_user_ids = room_participant::Entity::find()
 178                .filter(stale_participant_filter.clone())
 179                .all(&*tx)
 180                .await?
 181                .into_iter()
 182                .map(|participant| participant.user_id)
 183                .collect::<Vec<_>>();
 184
 185            // Delete participants who failed to reconnect and cancel their calls.
 186            let mut canceled_calls_to_user_ids = Vec::new();
 187            room_participant::Entity::delete_many()
 188                .filter(stale_participant_filter)
 189                .exec(&*tx)
 190                .await?;
 191            let called_participants = room_participant::Entity::find()
 192                .filter(
 193                    Condition::all()
 194                        .add(
 195                            room_participant::Column::CallingUserId
 196                                .is_in(stale_participant_user_ids.iter().copied()),
 197                        )
 198                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 199                )
 200                .all(&*tx)
 201                .await?;
 202            room_participant::Entity::delete_many()
 203                .filter(
 204                    room_participant::Column::Id
 205                        .is_in(called_participants.iter().map(|participant| participant.id)),
 206                )
 207                .exec(&*tx)
 208                .await?;
 209            canceled_calls_to_user_ids.extend(
 210                called_participants
 211                    .into_iter()
 212                    .map(|participant| participant.user_id),
 213            );
 214
 215            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 216            let channel_members = if let Some(channel_id) = channel_id {
 217                self.get_channel_members_internal(channel_id, &tx).await?
 218            } else {
 219                Vec::new()
 220            };
 221
 222            // Delete the room if it becomes empty.
 223            if room.participants.is_empty() {
 224                project::Entity::delete_many()
 225                    .filter(project::Column::RoomId.eq(room_id))
 226                    .exec(&*tx)
 227                    .await?;
 228                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 229            }
 230
 231            Ok(RefreshedRoom {
 232                room,
 233                channel_id,
 234                channel_members,
 235                stale_participant_user_ids,
 236                canceled_calls_to_user_ids,
 237            })
 238        })
 239        .await
 240    }
 241
 242    pub async fn delete_stale_servers(
 243        &self,
 244        environment: &str,
 245        new_server_id: ServerId,
 246    ) -> Result<()> {
 247        self.transaction(|tx| async move {
 248            server::Entity::delete_many()
 249                .filter(
 250                    Condition::all()
 251                        .add(server::Column::Environment.eq(environment))
 252                        .add(server::Column::Id.ne(new_server_id)),
 253                )
 254                .exec(&*tx)
 255                .await?;
 256            Ok(())
 257        })
 258        .await
 259    }
 260
 261    async fn stale_server_ids(
 262        &self,
 263        environment: &str,
 264        new_server_id: ServerId,
 265        tx: &DatabaseTransaction,
 266    ) -> Result<Vec<ServerId>> {
 267        let stale_servers = server::Entity::find()
 268            .filter(
 269                Condition::all()
 270                    .add(server::Column::Environment.eq(environment))
 271                    .add(server::Column::Id.ne(new_server_id)),
 272            )
 273            .all(&*tx)
 274            .await?;
 275        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 276    }
 277
 278    // users
 279
 280    pub async fn create_user(
 281        &self,
 282        email_address: &str,
 283        admin: bool,
 284        params: NewUserParams,
 285    ) -> Result<NewUserResult> {
 286        self.transaction(|tx| async {
 287            let tx = tx;
 288            let user = user::Entity::insert(user::ActiveModel {
 289                email_address: ActiveValue::set(Some(email_address.into())),
 290                github_login: ActiveValue::set(params.github_login.clone()),
 291                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 292                admin: ActiveValue::set(admin),
 293                metrics_id: ActiveValue::set(Uuid::new_v4()),
 294                ..Default::default()
 295            })
 296            .on_conflict(
 297                OnConflict::column(user::Column::GithubLogin)
 298                    .update_column(user::Column::GithubLogin)
 299                    .to_owned(),
 300            )
 301            .exec_with_returning(&*tx)
 302            .await?;
 303
 304            Ok(NewUserResult {
 305                user_id: user.id,
 306                metrics_id: user.metrics_id.to_string(),
 307                signup_device_id: None,
 308                inviting_user_id: None,
 309            })
 310        })
 311        .await
 312    }
 313
 314    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 315        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 316            .await
 317    }
 318
 319    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 320        self.transaction(|tx| async {
 321            let tx = tx;
 322            Ok(user::Entity::find()
 323                .filter(user::Column::Id.is_in(ids.iter().copied()))
 324                .all(&*tx)
 325                .await?)
 326        })
 327        .await
 328    }
 329
 330    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 331        self.transaction(|tx| async move {
 332            Ok(user::Entity::find()
 333                .filter(user::Column::GithubLogin.eq(github_login))
 334                .one(&*tx)
 335                .await?)
 336        })
 337        .await
 338    }
 339
 340    pub async fn get_or_create_user_by_github_account(
 341        &self,
 342        github_login: &str,
 343        github_user_id: Option<i32>,
 344        github_email: Option<&str>,
 345    ) -> Result<Option<User>> {
 346        self.transaction(|tx| async move {
 347            let tx = &*tx;
 348            if let Some(github_user_id) = github_user_id {
 349                if let Some(user_by_github_user_id) = user::Entity::find()
 350                    .filter(user::Column::GithubUserId.eq(github_user_id))
 351                    .one(tx)
 352                    .await?
 353                {
 354                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 355                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 356                    Ok(Some(user_by_github_user_id.update(tx).await?))
 357                } else if let Some(user_by_github_login) = user::Entity::find()
 358                    .filter(user::Column::GithubLogin.eq(github_login))
 359                    .one(tx)
 360                    .await?
 361                {
 362                    let mut user_by_github_login = user_by_github_login.into_active_model();
 363                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 364                    Ok(Some(user_by_github_login.update(tx).await?))
 365                } else {
 366                    let user = user::Entity::insert(user::ActiveModel {
 367                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 368                        github_login: ActiveValue::set(github_login.into()),
 369                        github_user_id: ActiveValue::set(Some(github_user_id)),
 370                        admin: ActiveValue::set(false),
 371                        invite_count: ActiveValue::set(0),
 372                        invite_code: ActiveValue::set(None),
 373                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 374                        ..Default::default()
 375                    })
 376                    .exec_with_returning(&*tx)
 377                    .await?;
 378                    Ok(Some(user))
 379                }
 380            } else {
 381                Ok(user::Entity::find()
 382                    .filter(user::Column::GithubLogin.eq(github_login))
 383                    .one(tx)
 384                    .await?)
 385            }
 386        })
 387        .await
 388    }
 389
 390    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 391        self.transaction(|tx| async move {
 392            Ok(user::Entity::find()
 393                .order_by_asc(user::Column::GithubLogin)
 394                .limit(limit as u64)
 395                .offset(page as u64 * limit as u64)
 396                .all(&*tx)
 397                .await?)
 398        })
 399        .await
 400    }
 401
 402    pub async fn get_users_with_no_invites(
 403        &self,
 404        invited_by_another_user: bool,
 405    ) -> Result<Vec<User>> {
 406        self.transaction(|tx| async move {
 407            Ok(user::Entity::find()
 408                .filter(
 409                    user::Column::InviteCount
 410                        .eq(0)
 411                        .and(if invited_by_another_user {
 412                            user::Column::InviterId.is_not_null()
 413                        } else {
 414                            user::Column::InviterId.is_null()
 415                        }),
 416                )
 417                .all(&*tx)
 418                .await?)
 419        })
 420        .await
 421    }
 422
 423    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 424        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 425        enum QueryAs {
 426            MetricsId,
 427        }
 428
 429        self.transaction(|tx| async move {
 430            let metrics_id: Uuid = user::Entity::find_by_id(id)
 431                .select_only()
 432                .column(user::Column::MetricsId)
 433                .into_values::<_, QueryAs>()
 434                .one(&*tx)
 435                .await?
 436                .ok_or_else(|| anyhow!("could not find user"))?;
 437            Ok(metrics_id.to_string())
 438        })
 439        .await
 440    }
 441
 442    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 443        self.transaction(|tx| async move {
 444            user::Entity::update_many()
 445                .filter(user::Column::Id.eq(id))
 446                .set(user::ActiveModel {
 447                    admin: ActiveValue::set(is_admin),
 448                    ..Default::default()
 449                })
 450                .exec(&*tx)
 451                .await?;
 452            Ok(())
 453        })
 454        .await
 455    }
 456
 457    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 458        self.transaction(|tx| async move {
 459            user::Entity::update_many()
 460                .filter(user::Column::Id.eq(id))
 461                .set(user::ActiveModel {
 462                    connected_once: ActiveValue::set(connected_once),
 463                    ..Default::default()
 464                })
 465                .exec(&*tx)
 466                .await?;
 467            Ok(())
 468        })
 469        .await
 470    }
 471
 472    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 473        self.transaction(|tx| async move {
 474            access_token::Entity::delete_many()
 475                .filter(access_token::Column::UserId.eq(id))
 476                .exec(&*tx)
 477                .await?;
 478            user::Entity::delete_by_id(id).exec(&*tx).await?;
 479            Ok(())
 480        })
 481        .await
 482    }
 483
 484    // contacts
 485
 486    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 487        #[derive(Debug, FromQueryResult)]
 488        struct ContactWithUserBusyStatuses {
 489            user_id_a: UserId,
 490            user_id_b: UserId,
 491            a_to_b: bool,
 492            accepted: bool,
 493            should_notify: bool,
 494            user_a_busy: bool,
 495            user_b_busy: bool,
 496        }
 497
 498        self.transaction(|tx| async move {
 499            let user_a_participant = Alias::new("user_a_participant");
 500            let user_b_participant = Alias::new("user_b_participant");
 501            let mut db_contacts = contact::Entity::find()
 502                .column_as(
 503                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 504                        .is_not_null(),
 505                    "user_a_busy",
 506                )
 507                .column_as(
 508                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 509                        .is_not_null(),
 510                    "user_b_busy",
 511                )
 512                .filter(
 513                    contact::Column::UserIdA
 514                        .eq(user_id)
 515                        .or(contact::Column::UserIdB.eq(user_id)),
 516                )
 517                .join_as(
 518                    JoinType::LeftJoin,
 519                    contact::Relation::UserARoomParticipant.def(),
 520                    user_a_participant,
 521                )
 522                .join_as(
 523                    JoinType::LeftJoin,
 524                    contact::Relation::UserBRoomParticipant.def(),
 525                    user_b_participant,
 526                )
 527                .into_model::<ContactWithUserBusyStatuses>()
 528                .stream(&*tx)
 529                .await?;
 530
 531            let mut contacts = Vec::new();
 532            while let Some(db_contact) = db_contacts.next().await {
 533                let db_contact = db_contact?;
 534                if db_contact.user_id_a == user_id {
 535                    if db_contact.accepted {
 536                        contacts.push(Contact::Accepted {
 537                            user_id: db_contact.user_id_b,
 538                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 539                            busy: db_contact.user_b_busy,
 540                        });
 541                    } else if db_contact.a_to_b {
 542                        contacts.push(Contact::Outgoing {
 543                            user_id: db_contact.user_id_b,
 544                        })
 545                    } else {
 546                        contacts.push(Contact::Incoming {
 547                            user_id: db_contact.user_id_b,
 548                            should_notify: db_contact.should_notify,
 549                        });
 550                    }
 551                } else if db_contact.accepted {
 552                    contacts.push(Contact::Accepted {
 553                        user_id: db_contact.user_id_a,
 554                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 555                        busy: db_contact.user_a_busy,
 556                    });
 557                } else if db_contact.a_to_b {
 558                    contacts.push(Contact::Incoming {
 559                        user_id: db_contact.user_id_a,
 560                        should_notify: db_contact.should_notify,
 561                    });
 562                } else {
 563                    contacts.push(Contact::Outgoing {
 564                        user_id: db_contact.user_id_a,
 565                    });
 566                }
 567            }
 568
 569            contacts.sort_unstable_by_key(|contact| contact.user_id());
 570
 571            Ok(contacts)
 572        })
 573        .await
 574    }
 575
 576    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 577        self.transaction(|tx| async move {
 578            let participant = room_participant::Entity::find()
 579                .filter(room_participant::Column::UserId.eq(user_id))
 580                .one(&*tx)
 581                .await?;
 582            Ok(participant.is_some())
 583        })
 584        .await
 585    }
 586
 587    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 588        self.transaction(|tx| async move {
 589            let (id_a, id_b) = if user_id_1 < user_id_2 {
 590                (user_id_1, user_id_2)
 591            } else {
 592                (user_id_2, user_id_1)
 593            };
 594
 595            Ok(contact::Entity::find()
 596                .filter(
 597                    contact::Column::UserIdA
 598                        .eq(id_a)
 599                        .and(contact::Column::UserIdB.eq(id_b))
 600                        .and(contact::Column::Accepted.eq(true)),
 601                )
 602                .one(&*tx)
 603                .await?
 604                .is_some())
 605        })
 606        .await
 607    }
 608
 609    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 610        self.transaction(|tx| async move {
 611            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 612                (sender_id, receiver_id, true)
 613            } else {
 614                (receiver_id, sender_id, false)
 615            };
 616
 617            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 618                user_id_a: ActiveValue::set(id_a),
 619                user_id_b: ActiveValue::set(id_b),
 620                a_to_b: ActiveValue::set(a_to_b),
 621                accepted: ActiveValue::set(false),
 622                should_notify: ActiveValue::set(true),
 623                ..Default::default()
 624            })
 625            .on_conflict(
 626                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 627                    .values([
 628                        (contact::Column::Accepted, true.into()),
 629                        (contact::Column::ShouldNotify, false.into()),
 630                    ])
 631                    .action_and_where(
 632                        contact::Column::Accepted.eq(false).and(
 633                            contact::Column::AToB
 634                                .eq(a_to_b)
 635                                .and(contact::Column::UserIdA.eq(id_b))
 636                                .or(contact::Column::AToB
 637                                    .ne(a_to_b)
 638                                    .and(contact::Column::UserIdA.eq(id_a))),
 639                        ),
 640                    )
 641                    .to_owned(),
 642            )
 643            .exec_without_returning(&*tx)
 644            .await?;
 645
 646            if rows_affected == 1 {
 647                Ok(())
 648            } else {
 649                Err(anyhow!("contact already requested"))?
 650            }
 651        })
 652        .await
 653    }
 654
 655    /// Returns a bool indicating whether the removed contact had originally accepted or not
 656    ///
 657    /// Deletes the contact identified by the requester and responder ids, and then returns
 658    /// whether the deleted contact had originally accepted or was a pending contact request.
 659    ///
 660    /// # Arguments
 661    ///
 662    /// * `requester_id` - The user that initiates this request
 663    /// * `responder_id` - The user that will be removed
 664    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 665        self.transaction(|tx| async move {
 666            let (id_a, id_b) = if responder_id < requester_id {
 667                (responder_id, requester_id)
 668            } else {
 669                (requester_id, responder_id)
 670            };
 671
 672            let contact = contact::Entity::find()
 673                .filter(
 674                    contact::Column::UserIdA
 675                        .eq(id_a)
 676                        .and(contact::Column::UserIdB.eq(id_b)),
 677                )
 678                .one(&*tx)
 679                .await?
 680                .ok_or_else(|| anyhow!("no such contact"))?;
 681
 682            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 683            Ok(contact.accepted)
 684        })
 685        .await
 686    }
 687
 688    pub async fn dismiss_contact_notification(
 689        &self,
 690        user_id: UserId,
 691        contact_user_id: UserId,
 692    ) -> Result<()> {
 693        self.transaction(|tx| async move {
 694            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 695                (user_id, contact_user_id, true)
 696            } else {
 697                (contact_user_id, user_id, false)
 698            };
 699
 700            let result = contact::Entity::update_many()
 701                .set(contact::ActiveModel {
 702                    should_notify: ActiveValue::set(false),
 703                    ..Default::default()
 704                })
 705                .filter(
 706                    contact::Column::UserIdA
 707                        .eq(id_a)
 708                        .and(contact::Column::UserIdB.eq(id_b))
 709                        .and(
 710                            contact::Column::AToB
 711                                .eq(a_to_b)
 712                                .and(contact::Column::Accepted.eq(true))
 713                                .or(contact::Column::AToB
 714                                    .ne(a_to_b)
 715                                    .and(contact::Column::Accepted.eq(false))),
 716                        ),
 717                )
 718                .exec(&*tx)
 719                .await?;
 720            if result.rows_affected == 0 {
 721                Err(anyhow!("no such contact request"))?
 722            } else {
 723                Ok(())
 724            }
 725        })
 726        .await
 727    }
 728
 729    pub async fn respond_to_contact_request(
 730        &self,
 731        responder_id: UserId,
 732        requester_id: UserId,
 733        accept: bool,
 734    ) -> Result<()> {
 735        self.transaction(|tx| async move {
 736            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 737                (responder_id, requester_id, false)
 738            } else {
 739                (requester_id, responder_id, true)
 740            };
 741            let rows_affected = if accept {
 742                let result = contact::Entity::update_many()
 743                    .set(contact::ActiveModel {
 744                        accepted: ActiveValue::set(true),
 745                        should_notify: ActiveValue::set(true),
 746                        ..Default::default()
 747                    })
 748                    .filter(
 749                        contact::Column::UserIdA
 750                            .eq(id_a)
 751                            .and(contact::Column::UserIdB.eq(id_b))
 752                            .and(contact::Column::AToB.eq(a_to_b)),
 753                    )
 754                    .exec(&*tx)
 755                    .await?;
 756                result.rows_affected
 757            } else {
 758                let result = contact::Entity::delete_many()
 759                    .filter(
 760                        contact::Column::UserIdA
 761                            .eq(id_a)
 762                            .and(contact::Column::UserIdB.eq(id_b))
 763                            .and(contact::Column::AToB.eq(a_to_b))
 764                            .and(contact::Column::Accepted.eq(false)),
 765                    )
 766                    .exec(&*tx)
 767                    .await?;
 768
 769                result.rows_affected
 770            };
 771
 772            if rows_affected == 1 {
 773                Ok(())
 774            } else {
 775                Err(anyhow!("no such contact request"))?
 776            }
 777        })
 778        .await
 779    }
 780
 781    pub fn fuzzy_like_string(string: &str) -> String {
 782        let mut result = String::with_capacity(string.len() * 2 + 1);
 783        for c in string.chars() {
 784            if c.is_alphanumeric() {
 785                result.push('%');
 786                result.push(c);
 787            }
 788        }
 789        result.push('%');
 790        result
 791    }
 792
 793    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 794        self.transaction(|tx| async {
 795            let tx = tx;
 796            let like_string = Self::fuzzy_like_string(name_query);
 797            let query = "
 798                SELECT users.*
 799                FROM users
 800                WHERE github_login ILIKE $1
 801                ORDER BY github_login <-> $2
 802                LIMIT $3
 803            ";
 804
 805            Ok(user::Entity::find()
 806                .from_raw_sql(Statement::from_sql_and_values(
 807                    self.pool.get_database_backend(),
 808                    query.into(),
 809                    vec![like_string.into(), name_query.into(), limit.into()],
 810                ))
 811                .all(&*tx)
 812                .await?)
 813        })
 814        .await
 815    }
 816
 817    // signups
 818
 819    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 820        self.transaction(|tx| async move {
 821            signup::Entity::insert(signup::ActiveModel {
 822                email_address: ActiveValue::set(signup.email_address.clone()),
 823                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 824                email_confirmation_sent: ActiveValue::set(false),
 825                platform_mac: ActiveValue::set(signup.platform_mac),
 826                platform_windows: ActiveValue::set(signup.platform_windows),
 827                platform_linux: ActiveValue::set(signup.platform_linux),
 828                platform_unknown: ActiveValue::set(false),
 829                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 830                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 831                device_id: ActiveValue::set(signup.device_id.clone()),
 832                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 833                ..Default::default()
 834            })
 835            .on_conflict(
 836                OnConflict::column(signup::Column::EmailAddress)
 837                    .update_columns([
 838                        signup::Column::PlatformMac,
 839                        signup::Column::PlatformWindows,
 840                        signup::Column::PlatformLinux,
 841                        signup::Column::EditorFeatures,
 842                        signup::Column::ProgrammingLanguages,
 843                        signup::Column::DeviceId,
 844                        signup::Column::AddedToMailingList,
 845                    ])
 846                    .to_owned(),
 847            )
 848            .exec(&*tx)
 849            .await?;
 850            Ok(())
 851        })
 852        .await
 853    }
 854
 855    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 856        self.transaction(|tx| async move {
 857            let signup = signup::Entity::find()
 858                .filter(signup::Column::EmailAddress.eq(email_address))
 859                .one(&*tx)
 860                .await?
 861                .ok_or_else(|| {
 862                    anyhow!("signup with email address {} doesn't exist", email_address)
 863                })?;
 864
 865            Ok(signup)
 866        })
 867        .await
 868    }
 869
 870    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 871        self.transaction(|tx| async move {
 872            let query = "
 873                SELECT
 874                    COUNT(*) as count,
 875                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 876                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 877                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 878                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 879                FROM (
 880                    SELECT *
 881                    FROM signups
 882                    WHERE
 883                        NOT email_confirmation_sent
 884                ) AS unsent
 885            ";
 886            Ok(
 887                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 888                    self.pool.get_database_backend(),
 889                    query.into(),
 890                    vec![],
 891                ))
 892                .one(&*tx)
 893                .await?
 894                .ok_or_else(|| anyhow!("invalid result"))?,
 895            )
 896        })
 897        .await
 898    }
 899
 900    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 901        let emails = invites
 902            .iter()
 903            .map(|s| s.email_address.as_str())
 904            .collect::<Vec<_>>();
 905        self.transaction(|tx| async {
 906            let tx = tx;
 907            signup::Entity::update_many()
 908                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 909                .set(signup::ActiveModel {
 910                    email_confirmation_sent: ActiveValue::set(true),
 911                    ..Default::default()
 912                })
 913                .exec(&*tx)
 914                .await?;
 915            Ok(())
 916        })
 917        .await
 918    }
 919
 920    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 921        self.transaction(|tx| async move {
 922            Ok(signup::Entity::find()
 923                .select_only()
 924                .column(signup::Column::EmailAddress)
 925                .column(signup::Column::EmailConfirmationCode)
 926                .filter(
 927                    signup::Column::EmailConfirmationSent.eq(false).and(
 928                        signup::Column::PlatformMac
 929                            .eq(true)
 930                            .or(signup::Column::PlatformUnknown.eq(true)),
 931                    ),
 932                )
 933                .order_by_asc(signup::Column::CreatedAt)
 934                .limit(count as u64)
 935                .into_model()
 936                .all(&*tx)
 937                .await?)
 938        })
 939        .await
 940    }
 941
 942    // invite codes
 943
 944    pub async fn create_invite_from_code(
 945        &self,
 946        code: &str,
 947        email_address: &str,
 948        device_id: Option<&str>,
 949        added_to_mailing_list: bool,
 950    ) -> Result<Invite> {
 951        self.transaction(|tx| async move {
 952            let existing_user = user::Entity::find()
 953                .filter(user::Column::EmailAddress.eq(email_address))
 954                .one(&*tx)
 955                .await?;
 956
 957            if existing_user.is_some() {
 958                Err(anyhow!("email address is already in use"))?;
 959            }
 960
 961            let inviting_user_with_invites = match user::Entity::find()
 962                .filter(
 963                    user::Column::InviteCode
 964                        .eq(code)
 965                        .and(user::Column::InviteCount.gt(0)),
 966                )
 967                .one(&*tx)
 968                .await?
 969            {
 970                Some(inviting_user) => inviting_user,
 971                None => {
 972                    return Err(Error::Http(
 973                        StatusCode::UNAUTHORIZED,
 974                        "unable to find an invite code with invites remaining".to_string(),
 975                    ))?
 976                }
 977            };
 978            user::Entity::update_many()
 979                .filter(
 980                    user::Column::Id
 981                        .eq(inviting_user_with_invites.id)
 982                        .and(user::Column::InviteCount.gt(0)),
 983                )
 984                .col_expr(
 985                    user::Column::InviteCount,
 986                    Expr::col(user::Column::InviteCount).sub(1),
 987                )
 988                .exec(&*tx)
 989                .await?;
 990
 991            let signup = signup::Entity::insert(signup::ActiveModel {
 992                email_address: ActiveValue::set(email_address.into()),
 993                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 994                email_confirmation_sent: ActiveValue::set(false),
 995                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 996                platform_linux: ActiveValue::set(false),
 997                platform_mac: ActiveValue::set(false),
 998                platform_windows: ActiveValue::set(false),
 999                platform_unknown: ActiveValue::set(true),
1000                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
1001                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
1002                ..Default::default()
1003            })
1004            .on_conflict(
1005                OnConflict::column(signup::Column::EmailAddress)
1006                    .update_column(signup::Column::InvitingUserId)
1007                    .to_owned(),
1008            )
1009            .exec_with_returning(&*tx)
1010            .await?;
1011
1012            Ok(Invite {
1013                email_address: signup.email_address,
1014                email_confirmation_code: signup.email_confirmation_code,
1015            })
1016        })
1017        .await
1018    }
1019
1020    pub async fn create_user_from_invite(
1021        &self,
1022        invite: &Invite,
1023        user: NewUserParams,
1024    ) -> Result<Option<NewUserResult>> {
1025        self.transaction(|tx| async {
1026            let tx = tx;
1027            let signup = signup::Entity::find()
1028                .filter(
1029                    signup::Column::EmailAddress
1030                        .eq(invite.email_address.as_str())
1031                        .and(
1032                            signup::Column::EmailConfirmationCode
1033                                .eq(invite.email_confirmation_code.as_str()),
1034                        ),
1035                )
1036                .one(&*tx)
1037                .await?
1038                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1039
1040            if signup.user_id.is_some() {
1041                return Ok(None);
1042            }
1043
1044            let user = user::Entity::insert(user::ActiveModel {
1045                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1046                github_login: ActiveValue::set(user.github_login.clone()),
1047                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1048                admin: ActiveValue::set(false),
1049                invite_count: ActiveValue::set(user.invite_count),
1050                invite_code: ActiveValue::set(Some(random_invite_code())),
1051                metrics_id: ActiveValue::set(Uuid::new_v4()),
1052                ..Default::default()
1053            })
1054            .on_conflict(
1055                OnConflict::column(user::Column::GithubLogin)
1056                    .update_columns([
1057                        user::Column::EmailAddress,
1058                        user::Column::GithubUserId,
1059                        user::Column::Admin,
1060                    ])
1061                    .to_owned(),
1062            )
1063            .exec_with_returning(&*tx)
1064            .await?;
1065
1066            let mut signup = signup.into_active_model();
1067            signup.user_id = ActiveValue::set(Some(user.id));
1068            let signup = signup.update(&*tx).await?;
1069
1070            if let Some(inviting_user_id) = signup.inviting_user_id {
1071                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1072                    (inviting_user_id, user.id, true)
1073                } else {
1074                    (user.id, inviting_user_id, false)
1075                };
1076
1077                contact::Entity::insert(contact::ActiveModel {
1078                    user_id_a: ActiveValue::set(user_id_a),
1079                    user_id_b: ActiveValue::set(user_id_b),
1080                    a_to_b: ActiveValue::set(a_to_b),
1081                    should_notify: ActiveValue::set(true),
1082                    accepted: ActiveValue::set(true),
1083                    ..Default::default()
1084                })
1085                .on_conflict(OnConflict::new().do_nothing().to_owned())
1086                .exec_without_returning(&*tx)
1087                .await?;
1088            }
1089
1090            Ok(Some(NewUserResult {
1091                user_id: user.id,
1092                metrics_id: user.metrics_id.to_string(),
1093                inviting_user_id: signup.inviting_user_id,
1094                signup_device_id: signup.device_id,
1095            }))
1096        })
1097        .await
1098    }
1099
1100    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1101        self.transaction(|tx| async move {
1102            if count > 0 {
1103                user::Entity::update_many()
1104                    .filter(
1105                        user::Column::Id
1106                            .eq(id)
1107                            .and(user::Column::InviteCode.is_null()),
1108                    )
1109                    .set(user::ActiveModel {
1110                        invite_code: ActiveValue::set(Some(random_invite_code())),
1111                        ..Default::default()
1112                    })
1113                    .exec(&*tx)
1114                    .await?;
1115            }
1116
1117            user::Entity::update_many()
1118                .filter(user::Column::Id.eq(id))
1119                .set(user::ActiveModel {
1120                    invite_count: ActiveValue::set(count),
1121                    ..Default::default()
1122                })
1123                .exec(&*tx)
1124                .await?;
1125            Ok(())
1126        })
1127        .await
1128    }
1129
1130    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1131        self.transaction(|tx| async move {
1132            match user::Entity::find_by_id(id).one(&*tx).await? {
1133                Some(user) if user.invite_code.is_some() => {
1134                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1135                }
1136                _ => Ok(None),
1137            }
1138        })
1139        .await
1140    }
1141
1142    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1143        self.transaction(|tx| async move {
1144            user::Entity::find()
1145                .filter(user::Column::InviteCode.eq(code))
1146                .one(&*tx)
1147                .await?
1148                .ok_or_else(|| {
1149                    Error::Http(
1150                        StatusCode::NOT_FOUND,
1151                        "that invite code does not exist".to_string(),
1152                    )
1153                })
1154        })
1155        .await
1156    }
1157
1158    // rooms
1159
1160    pub async fn incoming_call_for_user(
1161        &self,
1162        user_id: UserId,
1163    ) -> Result<Option<proto::IncomingCall>> {
1164        self.transaction(|tx| async move {
1165            let pending_participant = room_participant::Entity::find()
1166                .filter(
1167                    room_participant::Column::UserId
1168                        .eq(user_id)
1169                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1170                )
1171                .one(&*tx)
1172                .await?;
1173
1174            if let Some(pending_participant) = pending_participant {
1175                let room = self.get_room(pending_participant.room_id, &tx).await?;
1176                Ok(Self::build_incoming_call(&room, user_id))
1177            } else {
1178                Ok(None)
1179            }
1180        })
1181        .await
1182    }
1183
1184    pub async fn create_room(
1185        &self,
1186        user_id: UserId,
1187        connection: ConnectionId,
1188        live_kit_room: &str,
1189    ) -> Result<proto::Room> {
1190        self.transaction(|tx| async move {
1191            let room = room::ActiveModel {
1192                live_kit_room: ActiveValue::set(live_kit_room.into()),
1193                ..Default::default()
1194            }
1195            .insert(&*tx)
1196            .await?;
1197            room_participant::ActiveModel {
1198                room_id: ActiveValue::set(room.id),
1199                user_id: ActiveValue::set(user_id),
1200                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1201                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1202                    connection.owner_id as i32,
1203                ))),
1204                answering_connection_lost: ActiveValue::set(false),
1205                calling_user_id: ActiveValue::set(user_id),
1206                calling_connection_id: ActiveValue::set(connection.id as i32),
1207                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1208                    connection.owner_id as i32,
1209                ))),
1210                ..Default::default()
1211            }
1212            .insert(&*tx)
1213            .await?;
1214
1215            let room = self.get_room(room.id, &tx).await?;
1216            Ok(room)
1217        })
1218        .await
1219    }
1220
1221    pub async fn call(
1222        &self,
1223        room_id: RoomId,
1224        calling_user_id: UserId,
1225        calling_connection: ConnectionId,
1226        called_user_id: UserId,
1227        initial_project_id: Option<ProjectId>,
1228    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1229        self.room_transaction(room_id, |tx| async move {
1230            room_participant::ActiveModel {
1231                room_id: ActiveValue::set(room_id),
1232                user_id: ActiveValue::set(called_user_id),
1233                answering_connection_lost: ActiveValue::set(false),
1234                calling_user_id: ActiveValue::set(calling_user_id),
1235                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1236                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1237                    calling_connection.owner_id as i32,
1238                ))),
1239                initial_project_id: ActiveValue::set(initial_project_id),
1240                ..Default::default()
1241            }
1242            .insert(&*tx)
1243            .await?;
1244
1245            let room = self.get_room(room_id, &tx).await?;
1246            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1247                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1248            Ok((room, incoming_call))
1249        })
1250        .await
1251    }
1252
1253    pub async fn call_failed(
1254        &self,
1255        room_id: RoomId,
1256        called_user_id: UserId,
1257    ) -> Result<RoomGuard<proto::Room>> {
1258        self.room_transaction(room_id, |tx| async move {
1259            room_participant::Entity::delete_many()
1260                .filter(
1261                    room_participant::Column::RoomId
1262                        .eq(room_id)
1263                        .and(room_participant::Column::UserId.eq(called_user_id)),
1264                )
1265                .exec(&*tx)
1266                .await?;
1267            let room = self.get_room(room_id, &tx).await?;
1268            Ok(room)
1269        })
1270        .await
1271    }
1272
1273    pub async fn decline_call(
1274        &self,
1275        expected_room_id: Option<RoomId>,
1276        user_id: UserId,
1277    ) -> Result<Option<RoomGuard<proto::Room>>> {
1278        self.optional_room_transaction(|tx| async move {
1279            let mut filter = Condition::all()
1280                .add(room_participant::Column::UserId.eq(user_id))
1281                .add(room_participant::Column::AnsweringConnectionId.is_null());
1282            if let Some(room_id) = expected_room_id {
1283                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1284            }
1285            let participant = room_participant::Entity::find()
1286                .filter(filter)
1287                .one(&*tx)
1288                .await?;
1289
1290            let participant = if let Some(participant) = participant {
1291                participant
1292            } else if expected_room_id.is_some() {
1293                return Err(anyhow!("could not find call to decline"))?;
1294            } else {
1295                return Ok(None);
1296            };
1297
1298            let room_id = participant.room_id;
1299            room_participant::Entity::delete(participant.into_active_model())
1300                .exec(&*tx)
1301                .await?;
1302
1303            let room = self.get_room(room_id, &tx).await?;
1304            Ok(Some((room_id, room)))
1305        })
1306        .await
1307    }
1308
1309    pub async fn cancel_call(
1310        &self,
1311        room_id: RoomId,
1312        calling_connection: ConnectionId,
1313        called_user_id: UserId,
1314    ) -> Result<RoomGuard<proto::Room>> {
1315        self.room_transaction(room_id, |tx| async move {
1316            let participant = room_participant::Entity::find()
1317                .filter(
1318                    Condition::all()
1319                        .add(room_participant::Column::UserId.eq(called_user_id))
1320                        .add(room_participant::Column::RoomId.eq(room_id))
1321                        .add(
1322                            room_participant::Column::CallingConnectionId
1323                                .eq(calling_connection.id as i32),
1324                        )
1325                        .add(
1326                            room_participant::Column::CallingConnectionServerId
1327                                .eq(calling_connection.owner_id as i32),
1328                        )
1329                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1330                )
1331                .one(&*tx)
1332                .await?
1333                .ok_or_else(|| anyhow!("no call to cancel"))?;
1334
1335            room_participant::Entity::delete(participant.into_active_model())
1336                .exec(&*tx)
1337                .await?;
1338
1339            let room = self.get_room(room_id, &tx).await?;
1340            Ok(room)
1341        })
1342        .await
1343    }
1344
1345    pub async fn join_room(
1346        &self,
1347        room_id: RoomId,
1348        user_id: UserId,
1349        channel_id: Option<ChannelId>,
1350        connection: ConnectionId,
1351    ) -> Result<RoomGuard<JoinRoom>> {
1352        self.room_transaction(room_id, |tx| async move {
1353            if let Some(channel_id) = channel_id {
1354                channel_member::Entity::find()
1355                    .filter(
1356                        channel_member::Column::ChannelId
1357                            .eq(channel_id)
1358                            .and(channel_member::Column::UserId.eq(user_id))
1359                            .and(channel_member::Column::Accepted.eq(true)),
1360                    )
1361                    .one(&*tx)
1362                    .await?
1363                    .ok_or_else(|| anyhow!("no such channel membership"))?;
1364
1365                room_participant::ActiveModel {
1366                    room_id: ActiveValue::set(room_id),
1367                    user_id: ActiveValue::set(user_id),
1368                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1369                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1370                        connection.owner_id as i32,
1371                    ))),
1372                    answering_connection_lost: ActiveValue::set(false),
1373                    // Redundant for the channel join use case, used for channel and call invitations
1374                    calling_user_id: ActiveValue::set(user_id),
1375                    calling_connection_id: ActiveValue::set(connection.id as i32),
1376                    calling_connection_server_id: ActiveValue::set(Some(ServerId(
1377                        connection.owner_id as i32,
1378                    ))),
1379                    ..Default::default()
1380                }
1381                .insert(&*tx)
1382                .await?;
1383            } else {
1384                let result = room_participant::Entity::update_many()
1385                    .filter(
1386                        Condition::all()
1387                            .add(room_participant::Column::RoomId.eq(room_id))
1388                            .add(room_participant::Column::UserId.eq(user_id))
1389                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1390                    )
1391                    .set(room_participant::ActiveModel {
1392                        answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1393                        answering_connection_server_id: ActiveValue::set(Some(ServerId(
1394                            connection.owner_id as i32,
1395                        ))),
1396                        answering_connection_lost: ActiveValue::set(false),
1397                        ..Default::default()
1398                    })
1399                    .exec(&*tx)
1400                    .await?;
1401                if result.rows_affected == 0 {
1402                    Err(anyhow!("room does not exist or was already joined"))?;
1403                }
1404            }
1405
1406            let room = self.get_room(room_id, &tx).await?;
1407            let channel_members = if let Some(channel_id) = channel_id {
1408                self.get_channel_members_internal(channel_id, &tx).await?
1409            } else {
1410                Vec::new()
1411            };
1412            Ok(JoinRoom {
1413                room,
1414                channel_id,
1415                channel_members,
1416            })
1417        })
1418        .await
1419    }
1420
1421    pub async fn rejoin_room(
1422        &self,
1423        rejoin_room: proto::RejoinRoom,
1424        user_id: UserId,
1425        connection: ConnectionId,
1426    ) -> Result<RoomGuard<RejoinedRoom>> {
1427        let room_id = RoomId::from_proto(rejoin_room.id);
1428        self.room_transaction(room_id, |tx| async {
1429            let tx = tx;
1430            let participant_update = room_participant::Entity::update_many()
1431                .filter(
1432                    Condition::all()
1433                        .add(room_participant::Column::RoomId.eq(room_id))
1434                        .add(room_participant::Column::UserId.eq(user_id))
1435                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1436                        .add(
1437                            Condition::any()
1438                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1439                                .add(
1440                                    room_participant::Column::AnsweringConnectionServerId
1441                                        .ne(connection.owner_id as i32),
1442                                ),
1443                        ),
1444                )
1445                .set(room_participant::ActiveModel {
1446                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1447                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1448                        connection.owner_id as i32,
1449                    ))),
1450                    answering_connection_lost: ActiveValue::set(false),
1451                    ..Default::default()
1452                })
1453                .exec(&*tx)
1454                .await?;
1455            if participant_update.rows_affected == 0 {
1456                return Err(anyhow!("room does not exist or was already joined"))?;
1457            }
1458
1459            let mut reshared_projects = Vec::new();
1460            for reshared_project in &rejoin_room.reshared_projects {
1461                let project_id = ProjectId::from_proto(reshared_project.project_id);
1462                let project = project::Entity::find_by_id(project_id)
1463                    .one(&*tx)
1464                    .await?
1465                    .ok_or_else(|| anyhow!("project does not exist"))?;
1466                if project.host_user_id != user_id {
1467                    return Err(anyhow!("no such project"))?;
1468                }
1469
1470                let mut collaborators = project
1471                    .find_related(project_collaborator::Entity)
1472                    .all(&*tx)
1473                    .await?;
1474                let host_ix = collaborators
1475                    .iter()
1476                    .position(|collaborator| {
1477                        collaborator.user_id == user_id && collaborator.is_host
1478                    })
1479                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1480                let host = collaborators.swap_remove(host_ix);
1481                let old_connection_id = host.connection();
1482
1483                project::Entity::update(project::ActiveModel {
1484                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1485                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1486                        connection.owner_id as i32,
1487                    ))),
1488                    ..project.into_active_model()
1489                })
1490                .exec(&*tx)
1491                .await?;
1492                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1493                    connection_id: ActiveValue::set(connection.id as i32),
1494                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1495                    ..host.into_active_model()
1496                })
1497                .exec(&*tx)
1498                .await?;
1499
1500                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1501                    .await?;
1502
1503                reshared_projects.push(ResharedProject {
1504                    id: project_id,
1505                    old_connection_id,
1506                    collaborators: collaborators
1507                        .iter()
1508                        .map(|collaborator| ProjectCollaborator {
1509                            connection_id: collaborator.connection(),
1510                            user_id: collaborator.user_id,
1511                            replica_id: collaborator.replica_id,
1512                            is_host: collaborator.is_host,
1513                        })
1514                        .collect(),
1515                    worktrees: reshared_project.worktrees.clone(),
1516                });
1517            }
1518
1519            project::Entity::delete_many()
1520                .filter(
1521                    Condition::all()
1522                        .add(project::Column::RoomId.eq(room_id))
1523                        .add(project::Column::HostUserId.eq(user_id))
1524                        .add(
1525                            project::Column::Id
1526                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1527                        ),
1528                )
1529                .exec(&*tx)
1530                .await?;
1531
1532            let mut rejoined_projects = Vec::new();
1533            for rejoined_project in &rejoin_room.rejoined_projects {
1534                let project_id = ProjectId::from_proto(rejoined_project.id);
1535                let Some(project) = project::Entity::find_by_id(project_id)
1536                    .one(&*tx)
1537                    .await? else { continue };
1538
1539                let mut worktrees = Vec::new();
1540                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1541                for db_worktree in db_worktrees {
1542                    let mut worktree = RejoinedWorktree {
1543                        id: db_worktree.id as u64,
1544                        abs_path: db_worktree.abs_path,
1545                        root_name: db_worktree.root_name,
1546                        visible: db_worktree.visible,
1547                        updated_entries: Default::default(),
1548                        removed_entries: Default::default(),
1549                        updated_repositories: Default::default(),
1550                        removed_repositories: Default::default(),
1551                        diagnostic_summaries: Default::default(),
1552                        settings_files: Default::default(),
1553                        scan_id: db_worktree.scan_id as u64,
1554                        completed_scan_id: db_worktree.completed_scan_id as u64,
1555                    };
1556
1557                    let rejoined_worktree = rejoined_project
1558                        .worktrees
1559                        .iter()
1560                        .find(|worktree| worktree.id == db_worktree.id as u64);
1561
1562                    // File entries
1563                    {
1564                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1565                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1566                        } else {
1567                            worktree_entry::Column::IsDeleted.eq(false)
1568                        };
1569
1570                        let mut db_entries = worktree_entry::Entity::find()
1571                            .filter(
1572                                Condition::all()
1573                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
1574                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1575                                    .add(entry_filter),
1576                            )
1577                            .stream(&*tx)
1578                            .await?;
1579
1580                        while let Some(db_entry) = db_entries.next().await {
1581                            let db_entry = db_entry?;
1582                            if db_entry.is_deleted {
1583                                worktree.removed_entries.push(db_entry.id as u64);
1584                            } else {
1585                                worktree.updated_entries.push(proto::Entry {
1586                                    id: db_entry.id as u64,
1587                                    is_dir: db_entry.is_dir,
1588                                    path: db_entry.path,
1589                                    inode: db_entry.inode as u64,
1590                                    mtime: Some(proto::Timestamp {
1591                                        seconds: db_entry.mtime_seconds as u64,
1592                                        nanos: db_entry.mtime_nanos as u32,
1593                                    }),
1594                                    is_symlink: db_entry.is_symlink,
1595                                    is_ignored: db_entry.is_ignored,
1596                                    is_external: db_entry.is_external,
1597                                    git_status: db_entry.git_status.map(|status| status as i32),
1598                                });
1599                            }
1600                        }
1601                    }
1602
1603                    // Repository Entries
1604                    {
1605                        let repository_entry_filter =
1606                            if let Some(rejoined_worktree) = rejoined_worktree {
1607                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1608                            } else {
1609                                worktree_repository::Column::IsDeleted.eq(false)
1610                            };
1611
1612                        let mut db_repositories = worktree_repository::Entity::find()
1613                            .filter(
1614                                Condition::all()
1615                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
1616                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1617                                    .add(repository_entry_filter),
1618                            )
1619                            .stream(&*tx)
1620                            .await?;
1621
1622                        while let Some(db_repository) = db_repositories.next().await {
1623                            let db_repository = db_repository?;
1624                            if db_repository.is_deleted {
1625                                worktree
1626                                    .removed_repositories
1627                                    .push(db_repository.work_directory_id as u64);
1628                            } else {
1629                                worktree.updated_repositories.push(proto::RepositoryEntry {
1630                                    work_directory_id: db_repository.work_directory_id as u64,
1631                                    branch: db_repository.branch,
1632                                });
1633                            }
1634                        }
1635                    }
1636
1637                    worktrees.push(worktree);
1638                }
1639
1640                let language_servers = project
1641                    .find_related(language_server::Entity)
1642                    .all(&*tx)
1643                    .await?
1644                    .into_iter()
1645                    .map(|language_server| proto::LanguageServer {
1646                        id: language_server.id as u64,
1647                        name: language_server.name,
1648                    })
1649                    .collect::<Vec<_>>();
1650
1651                {
1652                    let mut db_settings_files = worktree_settings_file::Entity::find()
1653                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1654                        .stream(&*tx)
1655                        .await?;
1656                    while let Some(db_settings_file) = db_settings_files.next().await {
1657                        let db_settings_file = db_settings_file?;
1658                        if let Some(worktree) = worktrees
1659                            .iter_mut()
1660                            .find(|w| w.id == db_settings_file.worktree_id as u64)
1661                        {
1662                            worktree.settings_files.push(WorktreeSettingsFile {
1663                                path: db_settings_file.path,
1664                                content: db_settings_file.content,
1665                            });
1666                        }
1667                    }
1668                }
1669
1670                let mut collaborators = project
1671                    .find_related(project_collaborator::Entity)
1672                    .all(&*tx)
1673                    .await?;
1674                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1675                    .iter()
1676                    .position(|collaborator| collaborator.user_id == user_id)
1677                {
1678                    collaborators.swap_remove(self_collaborator_ix)
1679                } else {
1680                    continue;
1681                };
1682                let old_connection_id = self_collaborator.connection();
1683                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1684                    connection_id: ActiveValue::set(connection.id as i32),
1685                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1686                    ..self_collaborator.into_active_model()
1687                })
1688                .exec(&*tx)
1689                .await?;
1690
1691                let collaborators = collaborators
1692                    .into_iter()
1693                    .map(|collaborator| ProjectCollaborator {
1694                        connection_id: collaborator.connection(),
1695                        user_id: collaborator.user_id,
1696                        replica_id: collaborator.replica_id,
1697                        is_host: collaborator.is_host,
1698                    })
1699                    .collect::<Vec<_>>();
1700
1701                rejoined_projects.push(RejoinedProject {
1702                    id: project_id,
1703                    old_connection_id,
1704                    collaborators,
1705                    worktrees,
1706                    language_servers,
1707                });
1708            }
1709
1710            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1711
1712            let channel_members = if let Some(channel_id) = channel_id {
1713                self.get_channel_members_internal(channel_id, &tx).await?
1714            } else {
1715                Vec::new()
1716            };
1717
1718            Ok(RejoinedRoom {
1719                room,
1720                channel_id,
1721                channel_members,
1722                rejoined_projects,
1723                reshared_projects,
1724            })
1725        })
1726        .await
1727    }
1728
1729    pub async fn leave_room(
1730        &self,
1731        connection: ConnectionId,
1732    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1733        self.optional_room_transaction(|tx| async move {
1734            let leaving_participant = room_participant::Entity::find()
1735                .filter(
1736                    Condition::all()
1737                        .add(
1738                            room_participant::Column::AnsweringConnectionId
1739                                .eq(connection.id as i32),
1740                        )
1741                        .add(
1742                            room_participant::Column::AnsweringConnectionServerId
1743                                .eq(connection.owner_id as i32),
1744                        ),
1745                )
1746                .one(&*tx)
1747                .await?;
1748
1749            if let Some(leaving_participant) = leaving_participant {
1750                // Leave room.
1751                let room_id = leaving_participant.room_id;
1752                room_participant::Entity::delete_by_id(leaving_participant.id)
1753                    .exec(&*tx)
1754                    .await?;
1755
1756                // Cancel pending calls initiated by the leaving user.
1757                let called_participants = room_participant::Entity::find()
1758                    .filter(
1759                        Condition::all()
1760                            .add(
1761                                room_participant::Column::CallingUserId
1762                                    .eq(leaving_participant.user_id),
1763                            )
1764                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1765                    )
1766                    .all(&*tx)
1767                    .await?;
1768                room_participant::Entity::delete_many()
1769                    .filter(
1770                        room_participant::Column::Id
1771                            .is_in(called_participants.iter().map(|participant| participant.id)),
1772                    )
1773                    .exec(&*tx)
1774                    .await?;
1775                let canceled_calls_to_user_ids = called_participants
1776                    .into_iter()
1777                    .map(|participant| participant.user_id)
1778                    .collect();
1779
1780                // Detect left projects.
1781                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1782                enum QueryProjectIds {
1783                    ProjectId,
1784                }
1785                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1786                    .select_only()
1787                    .column_as(
1788                        project_collaborator::Column::ProjectId,
1789                        QueryProjectIds::ProjectId,
1790                    )
1791                    .filter(
1792                        Condition::all()
1793                            .add(
1794                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1795                            )
1796                            .add(
1797                                project_collaborator::Column::ConnectionServerId
1798                                    .eq(connection.owner_id as i32),
1799                            ),
1800                    )
1801                    .into_values::<_, QueryProjectIds>()
1802                    .all(&*tx)
1803                    .await?;
1804                let mut left_projects = HashMap::default();
1805                let mut collaborators = project_collaborator::Entity::find()
1806                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1807                    .stream(&*tx)
1808                    .await?;
1809                while let Some(collaborator) = collaborators.next().await {
1810                    let collaborator = collaborator?;
1811                    let left_project =
1812                        left_projects
1813                            .entry(collaborator.project_id)
1814                            .or_insert(LeftProject {
1815                                id: collaborator.project_id,
1816                                host_user_id: Default::default(),
1817                                connection_ids: Default::default(),
1818                                host_connection_id: Default::default(),
1819                            });
1820
1821                    let collaborator_connection_id = collaborator.connection();
1822                    if collaborator_connection_id != connection {
1823                        left_project.connection_ids.push(collaborator_connection_id);
1824                    }
1825
1826                    if collaborator.is_host {
1827                        left_project.host_user_id = collaborator.user_id;
1828                        left_project.host_connection_id = collaborator_connection_id;
1829                    }
1830                }
1831                drop(collaborators);
1832
1833                // Leave projects.
1834                project_collaborator::Entity::delete_many()
1835                    .filter(
1836                        Condition::all()
1837                            .add(
1838                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1839                            )
1840                            .add(
1841                                project_collaborator::Column::ConnectionServerId
1842                                    .eq(connection.owner_id as i32),
1843                            ),
1844                    )
1845                    .exec(&*tx)
1846                    .await?;
1847
1848                // Unshare projects.
1849                project::Entity::delete_many()
1850                    .filter(
1851                        Condition::all()
1852                            .add(project::Column::RoomId.eq(room_id))
1853                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1854                            .add(
1855                                project::Column::HostConnectionServerId
1856                                    .eq(connection.owner_id as i32),
1857                            ),
1858                    )
1859                    .exec(&*tx)
1860                    .await?;
1861
1862                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1863                let deleted = if room.participants.is_empty() {
1864                    let result = room::Entity::delete_by_id(room_id)
1865                        .filter(room::Column::ChannelId.is_null())
1866                        .exec(&*tx)
1867                        .await?;
1868                    result.rows_affected > 0
1869                } else {
1870                    false
1871                };
1872
1873                let channel_members = if let Some(channel_id) = channel_id {
1874                    self.get_channel_members_internal(channel_id, &tx).await?
1875                } else {
1876                    Vec::new()
1877                };
1878                let left_room = LeftRoom {
1879                    room,
1880                    channel_id,
1881                    channel_members,
1882                    left_projects,
1883                    canceled_calls_to_user_ids,
1884                    deleted,
1885                };
1886
1887                if left_room.room.participants.is_empty() {
1888                    self.rooms.remove(&room_id);
1889                }
1890
1891                Ok(Some((room_id, left_room)))
1892            } else {
1893                Ok(None)
1894            }
1895        })
1896        .await
1897    }
1898
1899    pub async fn follow(
1900        &self,
1901        project_id: ProjectId,
1902        leader_connection: ConnectionId,
1903        follower_connection: ConnectionId,
1904    ) -> Result<RoomGuard<proto::Room>> {
1905        let room_id = self.room_id_for_project(project_id).await?;
1906        self.room_transaction(room_id, |tx| async move {
1907            follower::ActiveModel {
1908                room_id: ActiveValue::set(room_id),
1909                project_id: ActiveValue::set(project_id),
1910                leader_connection_server_id: ActiveValue::set(ServerId(
1911                    leader_connection.owner_id as i32,
1912                )),
1913                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1914                follower_connection_server_id: ActiveValue::set(ServerId(
1915                    follower_connection.owner_id as i32,
1916                )),
1917                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1918                ..Default::default()
1919            }
1920            .insert(&*tx)
1921            .await?;
1922
1923            let room = self.get_room(room_id, &*tx).await?;
1924            Ok(room)
1925        })
1926        .await
1927    }
1928
1929    pub async fn unfollow(
1930        &self,
1931        project_id: ProjectId,
1932        leader_connection: ConnectionId,
1933        follower_connection: ConnectionId,
1934    ) -> Result<RoomGuard<proto::Room>> {
1935        let room_id = self.room_id_for_project(project_id).await?;
1936        self.room_transaction(room_id, |tx| async move {
1937            follower::Entity::delete_many()
1938                .filter(
1939                    Condition::all()
1940                        .add(follower::Column::ProjectId.eq(project_id))
1941                        .add(
1942                            follower::Column::LeaderConnectionServerId
1943                                .eq(leader_connection.owner_id),
1944                        )
1945                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1946                        .add(
1947                            follower::Column::FollowerConnectionServerId
1948                                .eq(follower_connection.owner_id),
1949                        )
1950                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1951                )
1952                .exec(&*tx)
1953                .await?;
1954
1955            let room = self.get_room(room_id, &*tx).await?;
1956            Ok(room)
1957        })
1958        .await
1959    }
1960
1961    pub async fn update_room_participant_location(
1962        &self,
1963        room_id: RoomId,
1964        connection: ConnectionId,
1965        location: proto::ParticipantLocation,
1966    ) -> Result<RoomGuard<proto::Room>> {
1967        self.room_transaction(room_id, |tx| async {
1968            let tx = tx;
1969            let location_kind;
1970            let location_project_id;
1971            match location
1972                .variant
1973                .as_ref()
1974                .ok_or_else(|| anyhow!("invalid location"))?
1975            {
1976                proto::participant_location::Variant::SharedProject(project) => {
1977                    location_kind = 0;
1978                    location_project_id = Some(ProjectId::from_proto(project.id));
1979                }
1980                proto::participant_location::Variant::UnsharedProject(_) => {
1981                    location_kind = 1;
1982                    location_project_id = None;
1983                }
1984                proto::participant_location::Variant::External(_) => {
1985                    location_kind = 2;
1986                    location_project_id = None;
1987                }
1988            }
1989
1990            let result = room_participant::Entity::update_many()
1991                .filter(
1992                    Condition::all()
1993                        .add(room_participant::Column::RoomId.eq(room_id))
1994                        .add(
1995                            room_participant::Column::AnsweringConnectionId
1996                                .eq(connection.id as i32),
1997                        )
1998                        .add(
1999                            room_participant::Column::AnsweringConnectionServerId
2000                                .eq(connection.owner_id as i32),
2001                        ),
2002                )
2003                .set(room_participant::ActiveModel {
2004                    location_kind: ActiveValue::set(Some(location_kind)),
2005                    location_project_id: ActiveValue::set(location_project_id),
2006                    ..Default::default()
2007                })
2008                .exec(&*tx)
2009                .await?;
2010
2011            if result.rows_affected == 1 {
2012                let room = self.get_room(room_id, &tx).await?;
2013                Ok(room)
2014            } else {
2015                Err(anyhow!("could not update room participant location"))?
2016            }
2017        })
2018        .await
2019    }
2020
2021    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2022        self.transaction(|tx| async move {
2023            let participant = room_participant::Entity::find()
2024                .filter(
2025                    Condition::all()
2026                        .add(
2027                            room_participant::Column::AnsweringConnectionId
2028                                .eq(connection.id as i32),
2029                        )
2030                        .add(
2031                            room_participant::Column::AnsweringConnectionServerId
2032                                .eq(connection.owner_id as i32),
2033                        ),
2034                )
2035                .one(&*tx)
2036                .await?
2037                .ok_or_else(|| anyhow!("not a participant in any room"))?;
2038
2039            room_participant::Entity::update(room_participant::ActiveModel {
2040                answering_connection_lost: ActiveValue::set(true),
2041                ..participant.into_active_model()
2042            })
2043            .exec(&*tx)
2044            .await?;
2045
2046            Ok(())
2047        })
2048        .await
2049    }
2050
2051    fn build_incoming_call(
2052        room: &proto::Room,
2053        called_user_id: UserId,
2054    ) -> Option<proto::IncomingCall> {
2055        let pending_participant = room
2056            .pending_participants
2057            .iter()
2058            .find(|participant| participant.user_id == called_user_id.to_proto())?;
2059
2060        Some(proto::IncomingCall {
2061            room_id: room.id,
2062            calling_user_id: pending_participant.calling_user_id,
2063            participant_user_ids: room
2064                .participants
2065                .iter()
2066                .map(|participant| participant.user_id)
2067                .collect(),
2068            initial_project: room.participants.iter().find_map(|participant| {
2069                let initial_project_id = pending_participant.initial_project_id?;
2070                participant
2071                    .projects
2072                    .iter()
2073                    .find(|project| project.id == initial_project_id)
2074                    .cloned()
2075            }),
2076        })
2077    }
2078    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2079        let (_, room) = self.get_channel_room(room_id, tx).await?;
2080        Ok(room)
2081    }
2082
2083    async fn get_channel_room(
2084        &self,
2085        room_id: RoomId,
2086        tx: &DatabaseTransaction,
2087    ) -> Result<(Option<ChannelId>, proto::Room)> {
2088        let db_room = room::Entity::find_by_id(room_id)
2089            .one(tx)
2090            .await?
2091            .ok_or_else(|| anyhow!("could not find room"))?;
2092
2093        let mut db_participants = db_room
2094            .find_related(room_participant::Entity)
2095            .stream(tx)
2096            .await?;
2097        let mut participants = HashMap::default();
2098        let mut pending_participants = Vec::new();
2099        while let Some(db_participant) = db_participants.next().await {
2100            let db_participant = db_participant?;
2101            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2102                .answering_connection_id
2103                .zip(db_participant.answering_connection_server_id)
2104            {
2105                let location = match (
2106                    db_participant.location_kind,
2107                    db_participant.location_project_id,
2108                ) {
2109                    (Some(0), Some(project_id)) => {
2110                        Some(proto::participant_location::Variant::SharedProject(
2111                            proto::participant_location::SharedProject {
2112                                id: project_id.to_proto(),
2113                            },
2114                        ))
2115                    }
2116                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2117                        Default::default(),
2118                    )),
2119                    _ => Some(proto::participant_location::Variant::External(
2120                        Default::default(),
2121                    )),
2122                };
2123
2124                let answering_connection = ConnectionId {
2125                    owner_id: answering_connection_server_id.0 as u32,
2126                    id: answering_connection_id as u32,
2127                };
2128                participants.insert(
2129                    answering_connection,
2130                    proto::Participant {
2131                        user_id: db_participant.user_id.to_proto(),
2132                        peer_id: Some(answering_connection.into()),
2133                        projects: Default::default(),
2134                        location: Some(proto::ParticipantLocation { variant: location }),
2135                    },
2136                );
2137            } else {
2138                pending_participants.push(proto::PendingParticipant {
2139                    user_id: db_participant.user_id.to_proto(),
2140                    calling_user_id: db_participant.calling_user_id.to_proto(),
2141                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2142                });
2143            }
2144        }
2145        drop(db_participants);
2146
2147        let mut db_projects = db_room
2148            .find_related(project::Entity)
2149            .find_with_related(worktree::Entity)
2150            .stream(tx)
2151            .await?;
2152
2153        while let Some(row) = db_projects.next().await {
2154            let (db_project, db_worktree) = row?;
2155            let host_connection = db_project.host_connection()?;
2156            if let Some(participant) = participants.get_mut(&host_connection) {
2157                let project = if let Some(project) = participant
2158                    .projects
2159                    .iter_mut()
2160                    .find(|project| project.id == db_project.id.to_proto())
2161                {
2162                    project
2163                } else {
2164                    participant.projects.push(proto::ParticipantProject {
2165                        id: db_project.id.to_proto(),
2166                        worktree_root_names: Default::default(),
2167                    });
2168                    participant.projects.last_mut().unwrap()
2169                };
2170
2171                if let Some(db_worktree) = db_worktree {
2172                    if db_worktree.visible {
2173                        project.worktree_root_names.push(db_worktree.root_name);
2174                    }
2175                }
2176            }
2177        }
2178        drop(db_projects);
2179
2180        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2181        let mut followers = Vec::new();
2182        while let Some(db_follower) = db_followers.next().await {
2183            let db_follower = db_follower?;
2184            followers.push(proto::Follower {
2185                leader_id: Some(db_follower.leader_connection().into()),
2186                follower_id: Some(db_follower.follower_connection().into()),
2187                project_id: db_follower.project_id.to_proto(),
2188            });
2189        }
2190
2191        Ok((
2192            db_room.channel_id,
2193            proto::Room {
2194                id: db_room.id.to_proto(),
2195                live_kit_room: db_room.live_kit_room,
2196                participants: participants.into_values().collect(),
2197                pending_participants,
2198                followers,
2199            },
2200        ))
2201    }
2202
2203    // projects
2204
2205    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2206        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2207        enum QueryAs {
2208            Count,
2209        }
2210
2211        self.transaction(|tx| async move {
2212            Ok(project::Entity::find()
2213                .select_only()
2214                .column_as(project::Column::Id.count(), QueryAs::Count)
2215                .inner_join(user::Entity)
2216                .filter(user::Column::Admin.eq(false))
2217                .into_values::<_, QueryAs>()
2218                .one(&*tx)
2219                .await?
2220                .unwrap_or(0i64) as usize)
2221        })
2222        .await
2223    }
2224
2225    pub async fn share_project(
2226        &self,
2227        room_id: RoomId,
2228        connection: ConnectionId,
2229        worktrees: &[proto::WorktreeMetadata],
2230    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2231        self.room_transaction(room_id, |tx| async move {
2232            let participant = room_participant::Entity::find()
2233                .filter(
2234                    Condition::all()
2235                        .add(
2236                            room_participant::Column::AnsweringConnectionId
2237                                .eq(connection.id as i32),
2238                        )
2239                        .add(
2240                            room_participant::Column::AnsweringConnectionServerId
2241                                .eq(connection.owner_id as i32),
2242                        ),
2243                )
2244                .one(&*tx)
2245                .await?
2246                .ok_or_else(|| anyhow!("could not find participant"))?;
2247            if participant.room_id != room_id {
2248                return Err(anyhow!("shared project on unexpected room"))?;
2249            }
2250
2251            let project = project::ActiveModel {
2252                room_id: ActiveValue::set(participant.room_id),
2253                host_user_id: ActiveValue::set(participant.user_id),
2254                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2255                host_connection_server_id: ActiveValue::set(Some(ServerId(
2256                    connection.owner_id as i32,
2257                ))),
2258                ..Default::default()
2259            }
2260            .insert(&*tx)
2261            .await?;
2262
2263            if !worktrees.is_empty() {
2264                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2265                    worktree::ActiveModel {
2266                        id: ActiveValue::set(worktree.id as i64),
2267                        project_id: ActiveValue::set(project.id),
2268                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2269                        root_name: ActiveValue::set(worktree.root_name.clone()),
2270                        visible: ActiveValue::set(worktree.visible),
2271                        scan_id: ActiveValue::set(0),
2272                        completed_scan_id: ActiveValue::set(0),
2273                    }
2274                }))
2275                .exec(&*tx)
2276                .await?;
2277            }
2278
2279            project_collaborator::ActiveModel {
2280                project_id: ActiveValue::set(project.id),
2281                connection_id: ActiveValue::set(connection.id as i32),
2282                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2283                user_id: ActiveValue::set(participant.user_id),
2284                replica_id: ActiveValue::set(ReplicaId(0)),
2285                is_host: ActiveValue::set(true),
2286                ..Default::default()
2287            }
2288            .insert(&*tx)
2289            .await?;
2290
2291            let room = self.get_room(room_id, &tx).await?;
2292            Ok((project.id, room))
2293        })
2294        .await
2295    }
2296
2297    pub async fn unshare_project(
2298        &self,
2299        project_id: ProjectId,
2300        connection: ConnectionId,
2301    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2302        let room_id = self.room_id_for_project(project_id).await?;
2303        self.room_transaction(room_id, |tx| async move {
2304            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2305
2306            let project = project::Entity::find_by_id(project_id)
2307                .one(&*tx)
2308                .await?
2309                .ok_or_else(|| anyhow!("project not found"))?;
2310            if project.host_connection()? == connection {
2311                project::Entity::delete(project.into_active_model())
2312                    .exec(&*tx)
2313                    .await?;
2314                let room = self.get_room(room_id, &tx).await?;
2315                Ok((room, guest_connection_ids))
2316            } else {
2317                Err(anyhow!("cannot unshare a project hosted by another user"))?
2318            }
2319        })
2320        .await
2321    }
2322
2323    pub async fn update_project(
2324        &self,
2325        project_id: ProjectId,
2326        connection: ConnectionId,
2327        worktrees: &[proto::WorktreeMetadata],
2328    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2329        let room_id = self.room_id_for_project(project_id).await?;
2330        self.room_transaction(room_id, |tx| async move {
2331            let project = project::Entity::find_by_id(project_id)
2332                .filter(
2333                    Condition::all()
2334                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2335                        .add(
2336                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2337                        ),
2338                )
2339                .one(&*tx)
2340                .await?
2341                .ok_or_else(|| anyhow!("no such project"))?;
2342
2343            self.update_project_worktrees(project.id, worktrees, &tx)
2344                .await?;
2345
2346            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2347            let room = self.get_room(project.room_id, &tx).await?;
2348            Ok((room, guest_connection_ids))
2349        })
2350        .await
2351    }
2352
2353    async fn update_project_worktrees(
2354        &self,
2355        project_id: ProjectId,
2356        worktrees: &[proto::WorktreeMetadata],
2357        tx: &DatabaseTransaction,
2358    ) -> Result<()> {
2359        if !worktrees.is_empty() {
2360            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2361                id: ActiveValue::set(worktree.id as i64),
2362                project_id: ActiveValue::set(project_id),
2363                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2364                root_name: ActiveValue::set(worktree.root_name.clone()),
2365                visible: ActiveValue::set(worktree.visible),
2366                scan_id: ActiveValue::set(0),
2367                completed_scan_id: ActiveValue::set(0),
2368            }))
2369            .on_conflict(
2370                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2371                    .update_column(worktree::Column::RootName)
2372                    .to_owned(),
2373            )
2374            .exec(&*tx)
2375            .await?;
2376        }
2377
2378        worktree::Entity::delete_many()
2379            .filter(worktree::Column::ProjectId.eq(project_id).and(
2380                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2381            ))
2382            .exec(&*tx)
2383            .await?;
2384
2385        Ok(())
2386    }
2387
2388    pub async fn update_worktree(
2389        &self,
2390        update: &proto::UpdateWorktree,
2391        connection: ConnectionId,
2392    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2393        let project_id = ProjectId::from_proto(update.project_id);
2394        let worktree_id = update.worktree_id as i64;
2395        let room_id = self.room_id_for_project(project_id).await?;
2396        self.room_transaction(room_id, |tx| async move {
2397            // Ensure the update comes from the host.
2398            let _project = project::Entity::find_by_id(project_id)
2399                .filter(
2400                    Condition::all()
2401                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2402                        .add(
2403                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2404                        ),
2405                )
2406                .one(&*tx)
2407                .await?
2408                .ok_or_else(|| anyhow!("no such project"))?;
2409
2410            // Update metadata.
2411            worktree::Entity::update(worktree::ActiveModel {
2412                id: ActiveValue::set(worktree_id),
2413                project_id: ActiveValue::set(project_id),
2414                root_name: ActiveValue::set(update.root_name.clone()),
2415                scan_id: ActiveValue::set(update.scan_id as i64),
2416                completed_scan_id: if update.is_last_update {
2417                    ActiveValue::set(update.scan_id as i64)
2418                } else {
2419                    ActiveValue::default()
2420                },
2421                abs_path: ActiveValue::set(update.abs_path.clone()),
2422                ..Default::default()
2423            })
2424            .exec(&*tx)
2425            .await?;
2426
2427            if !update.updated_entries.is_empty() {
2428                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2429                    let mtime = entry.mtime.clone().unwrap_or_default();
2430                    worktree_entry::ActiveModel {
2431                        project_id: ActiveValue::set(project_id),
2432                        worktree_id: ActiveValue::set(worktree_id),
2433                        id: ActiveValue::set(entry.id as i64),
2434                        is_dir: ActiveValue::set(entry.is_dir),
2435                        path: ActiveValue::set(entry.path.clone()),
2436                        inode: ActiveValue::set(entry.inode as i64),
2437                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2438                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2439                        is_symlink: ActiveValue::set(entry.is_symlink),
2440                        is_ignored: ActiveValue::set(entry.is_ignored),
2441                        is_external: ActiveValue::set(entry.is_external),
2442                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2443                        is_deleted: ActiveValue::set(false),
2444                        scan_id: ActiveValue::set(update.scan_id as i64),
2445                    }
2446                }))
2447                .on_conflict(
2448                    OnConflict::columns([
2449                        worktree_entry::Column::ProjectId,
2450                        worktree_entry::Column::WorktreeId,
2451                        worktree_entry::Column::Id,
2452                    ])
2453                    .update_columns([
2454                        worktree_entry::Column::IsDir,
2455                        worktree_entry::Column::Path,
2456                        worktree_entry::Column::Inode,
2457                        worktree_entry::Column::MtimeSeconds,
2458                        worktree_entry::Column::MtimeNanos,
2459                        worktree_entry::Column::IsSymlink,
2460                        worktree_entry::Column::IsIgnored,
2461                        worktree_entry::Column::GitStatus,
2462                        worktree_entry::Column::ScanId,
2463                    ])
2464                    .to_owned(),
2465                )
2466                .exec(&*tx)
2467                .await?;
2468            }
2469
2470            if !update.removed_entries.is_empty() {
2471                worktree_entry::Entity::update_many()
2472                    .filter(
2473                        worktree_entry::Column::ProjectId
2474                            .eq(project_id)
2475                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2476                            .and(
2477                                worktree_entry::Column::Id
2478                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2479                            ),
2480                    )
2481                    .set(worktree_entry::ActiveModel {
2482                        is_deleted: ActiveValue::Set(true),
2483                        scan_id: ActiveValue::Set(update.scan_id as i64),
2484                        ..Default::default()
2485                    })
2486                    .exec(&*tx)
2487                    .await?;
2488            }
2489
2490            if !update.updated_repositories.is_empty() {
2491                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2492                    |repository| worktree_repository::ActiveModel {
2493                        project_id: ActiveValue::set(project_id),
2494                        worktree_id: ActiveValue::set(worktree_id),
2495                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2496                        scan_id: ActiveValue::set(update.scan_id as i64),
2497                        branch: ActiveValue::set(repository.branch.clone()),
2498                        is_deleted: ActiveValue::set(false),
2499                    },
2500                ))
2501                .on_conflict(
2502                    OnConflict::columns([
2503                        worktree_repository::Column::ProjectId,
2504                        worktree_repository::Column::WorktreeId,
2505                        worktree_repository::Column::WorkDirectoryId,
2506                    ])
2507                    .update_columns([
2508                        worktree_repository::Column::ScanId,
2509                        worktree_repository::Column::Branch,
2510                    ])
2511                    .to_owned(),
2512                )
2513                .exec(&*tx)
2514                .await?;
2515            }
2516
2517            if !update.removed_repositories.is_empty() {
2518                worktree_repository::Entity::update_many()
2519                    .filter(
2520                        worktree_repository::Column::ProjectId
2521                            .eq(project_id)
2522                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2523                            .and(
2524                                worktree_repository::Column::WorkDirectoryId
2525                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2526                            ),
2527                    )
2528                    .set(worktree_repository::ActiveModel {
2529                        is_deleted: ActiveValue::Set(true),
2530                        scan_id: ActiveValue::Set(update.scan_id as i64),
2531                        ..Default::default()
2532                    })
2533                    .exec(&*tx)
2534                    .await?;
2535            }
2536
2537            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2538            Ok(connection_ids)
2539        })
2540        .await
2541    }
2542
2543    pub async fn update_diagnostic_summary(
2544        &self,
2545        update: &proto::UpdateDiagnosticSummary,
2546        connection: ConnectionId,
2547    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2548        let project_id = ProjectId::from_proto(update.project_id);
2549        let worktree_id = update.worktree_id as i64;
2550        let room_id = self.room_id_for_project(project_id).await?;
2551        self.room_transaction(room_id, |tx| async move {
2552            let summary = update
2553                .summary
2554                .as_ref()
2555                .ok_or_else(|| anyhow!("invalid summary"))?;
2556
2557            // Ensure the update comes from the host.
2558            let project = project::Entity::find_by_id(project_id)
2559                .one(&*tx)
2560                .await?
2561                .ok_or_else(|| anyhow!("no such project"))?;
2562            if project.host_connection()? != connection {
2563                return Err(anyhow!("can't update a project hosted by someone else"))?;
2564            }
2565
2566            // Update summary.
2567            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2568                project_id: ActiveValue::set(project_id),
2569                worktree_id: ActiveValue::set(worktree_id),
2570                path: ActiveValue::set(summary.path.clone()),
2571                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2572                error_count: ActiveValue::set(summary.error_count as i32),
2573                warning_count: ActiveValue::set(summary.warning_count as i32),
2574                ..Default::default()
2575            })
2576            .on_conflict(
2577                OnConflict::columns([
2578                    worktree_diagnostic_summary::Column::ProjectId,
2579                    worktree_diagnostic_summary::Column::WorktreeId,
2580                    worktree_diagnostic_summary::Column::Path,
2581                ])
2582                .update_columns([
2583                    worktree_diagnostic_summary::Column::LanguageServerId,
2584                    worktree_diagnostic_summary::Column::ErrorCount,
2585                    worktree_diagnostic_summary::Column::WarningCount,
2586                ])
2587                .to_owned(),
2588            )
2589            .exec(&*tx)
2590            .await?;
2591
2592            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2593            Ok(connection_ids)
2594        })
2595        .await
2596    }
2597
2598    pub async fn start_language_server(
2599        &self,
2600        update: &proto::StartLanguageServer,
2601        connection: ConnectionId,
2602    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2603        let project_id = ProjectId::from_proto(update.project_id);
2604        let room_id = self.room_id_for_project(project_id).await?;
2605        self.room_transaction(room_id, |tx| async move {
2606            let server = update
2607                .server
2608                .as_ref()
2609                .ok_or_else(|| anyhow!("invalid language server"))?;
2610
2611            // Ensure the update comes from the host.
2612            let project = project::Entity::find_by_id(project_id)
2613                .one(&*tx)
2614                .await?
2615                .ok_or_else(|| anyhow!("no such project"))?;
2616            if project.host_connection()? != connection {
2617                return Err(anyhow!("can't update a project hosted by someone else"))?;
2618            }
2619
2620            // Add the newly-started language server.
2621            language_server::Entity::insert(language_server::ActiveModel {
2622                project_id: ActiveValue::set(project_id),
2623                id: ActiveValue::set(server.id as i64),
2624                name: ActiveValue::set(server.name.clone()),
2625                ..Default::default()
2626            })
2627            .on_conflict(
2628                OnConflict::columns([
2629                    language_server::Column::ProjectId,
2630                    language_server::Column::Id,
2631                ])
2632                .update_column(language_server::Column::Name)
2633                .to_owned(),
2634            )
2635            .exec(&*tx)
2636            .await?;
2637
2638            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2639            Ok(connection_ids)
2640        })
2641        .await
2642    }
2643
2644    pub async fn update_worktree_settings(
2645        &self,
2646        update: &proto::UpdateWorktreeSettings,
2647        connection: ConnectionId,
2648    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2649        let project_id = ProjectId::from_proto(update.project_id);
2650        let room_id = self.room_id_for_project(project_id).await?;
2651        self.room_transaction(room_id, |tx| async move {
2652            // Ensure the update comes from the host.
2653            let project = project::Entity::find_by_id(project_id)
2654                .one(&*tx)
2655                .await?
2656                .ok_or_else(|| anyhow!("no such project"))?;
2657            if project.host_connection()? != connection {
2658                return Err(anyhow!("can't update a project hosted by someone else"))?;
2659            }
2660
2661            if let Some(content) = &update.content {
2662                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2663                    project_id: ActiveValue::Set(project_id),
2664                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2665                    path: ActiveValue::Set(update.path.clone()),
2666                    content: ActiveValue::Set(content.clone()),
2667                })
2668                .on_conflict(
2669                    OnConflict::columns([
2670                        worktree_settings_file::Column::ProjectId,
2671                        worktree_settings_file::Column::WorktreeId,
2672                        worktree_settings_file::Column::Path,
2673                    ])
2674                    .update_column(worktree_settings_file::Column::Content)
2675                    .to_owned(),
2676                )
2677                .exec(&*tx)
2678                .await?;
2679            } else {
2680                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2681                    project_id: ActiveValue::Set(project_id),
2682                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2683                    path: ActiveValue::Set(update.path.clone()),
2684                    ..Default::default()
2685                })
2686                .exec(&*tx)
2687                .await?;
2688            }
2689
2690            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2691            Ok(connection_ids)
2692        })
2693        .await
2694    }
2695
2696    pub async fn join_project(
2697        &self,
2698        project_id: ProjectId,
2699        connection: ConnectionId,
2700    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2701        let room_id = self.room_id_for_project(project_id).await?;
2702        self.room_transaction(room_id, |tx| async move {
2703            let participant = room_participant::Entity::find()
2704                .filter(
2705                    Condition::all()
2706                        .add(
2707                            room_participant::Column::AnsweringConnectionId
2708                                .eq(connection.id as i32),
2709                        )
2710                        .add(
2711                            room_participant::Column::AnsweringConnectionServerId
2712                                .eq(connection.owner_id as i32),
2713                        ),
2714                )
2715                .one(&*tx)
2716                .await?
2717                .ok_or_else(|| anyhow!("must join a room first"))?;
2718
2719            let project = project::Entity::find_by_id(project_id)
2720                .one(&*tx)
2721                .await?
2722                .ok_or_else(|| anyhow!("no such project"))?;
2723            if project.room_id != participant.room_id {
2724                return Err(anyhow!("no such project"))?;
2725            }
2726
2727            let mut collaborators = project
2728                .find_related(project_collaborator::Entity)
2729                .all(&*tx)
2730                .await?;
2731            let replica_ids = collaborators
2732                .iter()
2733                .map(|c| c.replica_id)
2734                .collect::<HashSet<_>>();
2735            let mut replica_id = ReplicaId(1);
2736            while replica_ids.contains(&replica_id) {
2737                replica_id.0 += 1;
2738            }
2739            let new_collaborator = project_collaborator::ActiveModel {
2740                project_id: ActiveValue::set(project_id),
2741                connection_id: ActiveValue::set(connection.id as i32),
2742                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2743                user_id: ActiveValue::set(participant.user_id),
2744                replica_id: ActiveValue::set(replica_id),
2745                is_host: ActiveValue::set(false),
2746                ..Default::default()
2747            }
2748            .insert(&*tx)
2749            .await?;
2750            collaborators.push(new_collaborator);
2751
2752            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2753            let mut worktrees = db_worktrees
2754                .into_iter()
2755                .map(|db_worktree| {
2756                    (
2757                        db_worktree.id as u64,
2758                        Worktree {
2759                            id: db_worktree.id as u64,
2760                            abs_path: db_worktree.abs_path,
2761                            root_name: db_worktree.root_name,
2762                            visible: db_worktree.visible,
2763                            entries: Default::default(),
2764                            repository_entries: Default::default(),
2765                            diagnostic_summaries: Default::default(),
2766                            settings_files: Default::default(),
2767                            scan_id: db_worktree.scan_id as u64,
2768                            completed_scan_id: db_worktree.completed_scan_id as u64,
2769                        },
2770                    )
2771                })
2772                .collect::<BTreeMap<_, _>>();
2773
2774            // Populate worktree entries.
2775            {
2776                let mut db_entries = worktree_entry::Entity::find()
2777                    .filter(
2778                        Condition::all()
2779                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2780                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2781                    )
2782                    .stream(&*tx)
2783                    .await?;
2784                while let Some(db_entry) = db_entries.next().await {
2785                    let db_entry = db_entry?;
2786                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2787                        worktree.entries.push(proto::Entry {
2788                            id: db_entry.id as u64,
2789                            is_dir: db_entry.is_dir,
2790                            path: db_entry.path,
2791                            inode: db_entry.inode as u64,
2792                            mtime: Some(proto::Timestamp {
2793                                seconds: db_entry.mtime_seconds as u64,
2794                                nanos: db_entry.mtime_nanos as u32,
2795                            }),
2796                            is_symlink: db_entry.is_symlink,
2797                            is_ignored: db_entry.is_ignored,
2798                            is_external: db_entry.is_external,
2799                            git_status: db_entry.git_status.map(|status| status as i32),
2800                        });
2801                    }
2802                }
2803            }
2804
2805            // Populate repository entries.
2806            {
2807                let mut db_repository_entries = worktree_repository::Entity::find()
2808                    .filter(
2809                        Condition::all()
2810                            .add(worktree_repository::Column::ProjectId.eq(project_id))
2811                            .add(worktree_repository::Column::IsDeleted.eq(false)),
2812                    )
2813                    .stream(&*tx)
2814                    .await?;
2815                while let Some(db_repository_entry) = db_repository_entries.next().await {
2816                    let db_repository_entry = db_repository_entry?;
2817                    if let Some(worktree) =
2818                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2819                    {
2820                        worktree.repository_entries.insert(
2821                            db_repository_entry.work_directory_id as u64,
2822                            proto::RepositoryEntry {
2823                                work_directory_id: db_repository_entry.work_directory_id as u64,
2824                                branch: db_repository_entry.branch,
2825                            },
2826                        );
2827                    }
2828                }
2829            }
2830
2831            // Populate worktree diagnostic summaries.
2832            {
2833                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2834                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2835                    .stream(&*tx)
2836                    .await?;
2837                while let Some(db_summary) = db_summaries.next().await {
2838                    let db_summary = db_summary?;
2839                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2840                        worktree
2841                            .diagnostic_summaries
2842                            .push(proto::DiagnosticSummary {
2843                                path: db_summary.path,
2844                                language_server_id: db_summary.language_server_id as u64,
2845                                error_count: db_summary.error_count as u32,
2846                                warning_count: db_summary.warning_count as u32,
2847                            });
2848                    }
2849                }
2850            }
2851
2852            // Populate worktree settings files
2853            {
2854                let mut db_settings_files = worktree_settings_file::Entity::find()
2855                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2856                    .stream(&*tx)
2857                    .await?;
2858                while let Some(db_settings_file) = db_settings_files.next().await {
2859                    let db_settings_file = db_settings_file?;
2860                    if let Some(worktree) =
2861                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2862                    {
2863                        worktree.settings_files.push(WorktreeSettingsFile {
2864                            path: db_settings_file.path,
2865                            content: db_settings_file.content,
2866                        });
2867                    }
2868                }
2869            }
2870
2871            // Populate language servers.
2872            let language_servers = project
2873                .find_related(language_server::Entity)
2874                .all(&*tx)
2875                .await?;
2876
2877            let project = Project {
2878                collaborators: collaborators
2879                    .into_iter()
2880                    .map(|collaborator| ProjectCollaborator {
2881                        connection_id: collaborator.connection(),
2882                        user_id: collaborator.user_id,
2883                        replica_id: collaborator.replica_id,
2884                        is_host: collaborator.is_host,
2885                    })
2886                    .collect(),
2887                worktrees,
2888                language_servers: language_servers
2889                    .into_iter()
2890                    .map(|language_server| proto::LanguageServer {
2891                        id: language_server.id as u64,
2892                        name: language_server.name,
2893                    })
2894                    .collect(),
2895            };
2896            Ok((project, replica_id as ReplicaId))
2897        })
2898        .await
2899    }
2900
2901    pub async fn leave_project(
2902        &self,
2903        project_id: ProjectId,
2904        connection: ConnectionId,
2905    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2906        let room_id = self.room_id_for_project(project_id).await?;
2907        self.room_transaction(room_id, |tx| async move {
2908            let result = project_collaborator::Entity::delete_many()
2909                .filter(
2910                    Condition::all()
2911                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2912                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2913                        .add(
2914                            project_collaborator::Column::ConnectionServerId
2915                                .eq(connection.owner_id as i32),
2916                        ),
2917                )
2918                .exec(&*tx)
2919                .await?;
2920            if result.rows_affected == 0 {
2921                Err(anyhow!("not a collaborator on this project"))?;
2922            }
2923
2924            let project = project::Entity::find_by_id(project_id)
2925                .one(&*tx)
2926                .await?
2927                .ok_or_else(|| anyhow!("no such project"))?;
2928            let collaborators = project
2929                .find_related(project_collaborator::Entity)
2930                .all(&*tx)
2931                .await?;
2932            let connection_ids = collaborators
2933                .into_iter()
2934                .map(|collaborator| collaborator.connection())
2935                .collect();
2936
2937            follower::Entity::delete_many()
2938                .filter(
2939                    Condition::any()
2940                        .add(
2941                            Condition::all()
2942                                .add(follower::Column::ProjectId.eq(project_id))
2943                                .add(
2944                                    follower::Column::LeaderConnectionServerId
2945                                        .eq(connection.owner_id),
2946                                )
2947                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2948                        )
2949                        .add(
2950                            Condition::all()
2951                                .add(follower::Column::ProjectId.eq(project_id))
2952                                .add(
2953                                    follower::Column::FollowerConnectionServerId
2954                                        .eq(connection.owner_id),
2955                                )
2956                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2957                        ),
2958                )
2959                .exec(&*tx)
2960                .await?;
2961
2962            let room = self.get_room(project.room_id, &tx).await?;
2963            let left_project = LeftProject {
2964                id: project_id,
2965                host_user_id: project.host_user_id,
2966                host_connection_id: project.host_connection()?,
2967                connection_ids,
2968            };
2969            Ok((room, left_project))
2970        })
2971        .await
2972    }
2973
2974    pub async fn project_collaborators(
2975        &self,
2976        project_id: ProjectId,
2977        connection_id: ConnectionId,
2978    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2979        let room_id = self.room_id_for_project(project_id).await?;
2980        self.room_transaction(room_id, |tx| async move {
2981            let collaborators = project_collaborator::Entity::find()
2982                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2983                .all(&*tx)
2984                .await?
2985                .into_iter()
2986                .map(|collaborator| ProjectCollaborator {
2987                    connection_id: collaborator.connection(),
2988                    user_id: collaborator.user_id,
2989                    replica_id: collaborator.replica_id,
2990                    is_host: collaborator.is_host,
2991                })
2992                .collect::<Vec<_>>();
2993
2994            if collaborators
2995                .iter()
2996                .any(|collaborator| collaborator.connection_id == connection_id)
2997            {
2998                Ok(collaborators)
2999            } else {
3000                Err(anyhow!("no such project"))?
3001            }
3002        })
3003        .await
3004    }
3005
3006    pub async fn project_connection_ids(
3007        &self,
3008        project_id: ProjectId,
3009        connection_id: ConnectionId,
3010    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3011        let room_id = self.room_id_for_project(project_id).await?;
3012        self.room_transaction(room_id, |tx| async move {
3013            let mut collaborators = project_collaborator::Entity::find()
3014                .filter(project_collaborator::Column::ProjectId.eq(project_id))
3015                .stream(&*tx)
3016                .await?;
3017
3018            let mut connection_ids = HashSet::default();
3019            while let Some(collaborator) = collaborators.next().await {
3020                let collaborator = collaborator?;
3021                connection_ids.insert(collaborator.connection());
3022            }
3023
3024            if connection_ids.contains(&connection_id) {
3025                Ok(connection_ids)
3026            } else {
3027                Err(anyhow!("no such project"))?
3028            }
3029        })
3030        .await
3031    }
3032
3033    async fn project_guest_connection_ids(
3034        &self,
3035        project_id: ProjectId,
3036        tx: &DatabaseTransaction,
3037    ) -> Result<Vec<ConnectionId>> {
3038        let mut collaborators = project_collaborator::Entity::find()
3039            .filter(
3040                project_collaborator::Column::ProjectId
3041                    .eq(project_id)
3042                    .and(project_collaborator::Column::IsHost.eq(false)),
3043            )
3044            .stream(tx)
3045            .await?;
3046
3047        let mut guest_connection_ids = Vec::new();
3048        while let Some(collaborator) = collaborators.next().await {
3049            let collaborator = collaborator?;
3050            guest_connection_ids.push(collaborator.connection());
3051        }
3052        Ok(guest_connection_ids)
3053    }
3054
3055    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3056        self.transaction(|tx| async move {
3057            let project = project::Entity::find_by_id(project_id)
3058                .one(&*tx)
3059                .await?
3060                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3061            Ok(project.room_id)
3062        })
3063        .await
3064    }
3065
3066    // access tokens
3067
3068    pub async fn create_access_token(
3069        &self,
3070        user_id: UserId,
3071        access_token_hash: &str,
3072        max_access_token_count: usize,
3073    ) -> Result<AccessTokenId> {
3074        self.transaction(|tx| async {
3075            let tx = tx;
3076
3077            let token = access_token::ActiveModel {
3078                user_id: ActiveValue::set(user_id),
3079                hash: ActiveValue::set(access_token_hash.into()),
3080                ..Default::default()
3081            }
3082            .insert(&*tx)
3083            .await?;
3084
3085            access_token::Entity::delete_many()
3086                .filter(
3087                    access_token::Column::Id.in_subquery(
3088                        Query::select()
3089                            .column(access_token::Column::Id)
3090                            .from(access_token::Entity)
3091                            .and_where(access_token::Column::UserId.eq(user_id))
3092                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3093                            .limit(10000)
3094                            .offset(max_access_token_count as u64)
3095                            .to_owned(),
3096                    ),
3097                )
3098                .exec(&*tx)
3099                .await?;
3100            Ok(token.id)
3101        })
3102        .await
3103    }
3104
3105    pub async fn get_access_token(
3106        &self,
3107        access_token_id: AccessTokenId,
3108    ) -> Result<access_token::Model> {
3109        self.transaction(|tx| async move {
3110            Ok(access_token::Entity::find_by_id(access_token_id)
3111                .one(&*tx)
3112                .await?
3113                .ok_or_else(|| anyhow!("no such access token"))?)
3114        })
3115        .await
3116    }
3117
3118    // channels
3119
3120    pub async fn create_root_channel(
3121        &self,
3122        name: &str,
3123        live_kit_room: &str,
3124        creator_id: UserId,
3125    ) -> Result<ChannelId> {
3126        self.create_channel(name, None, live_kit_room, creator_id)
3127            .await
3128    }
3129
3130    pub async fn create_channel(
3131        &self,
3132        name: &str,
3133        parent: Option<ChannelId>,
3134        live_kit_room: &str,
3135        creator_id: UserId,
3136    ) -> Result<ChannelId> {
3137        self.transaction(move |tx| async move {
3138            let tx = tx;
3139
3140            if let Some(parent) = parent {
3141                let channels = self.get_channel_ancestors(parent, &*tx).await?;
3142                channel_member::Entity::find()
3143                    .filter(channel_member::Column::ChannelId.is_in(channels.iter().copied()))
3144                    .filter(
3145                        channel_member::Column::UserId
3146                            .eq(creator_id)
3147                            .and(channel_member::Column::Accepted.eq(true)),
3148                    )
3149                    .one(&*tx)
3150                    .await?
3151                    .ok_or_else(|| {
3152                        anyhow!("User does not have the permissions to create this channel")
3153                    })?;
3154            }
3155
3156            let channel = channel::ActiveModel {
3157                name: ActiveValue::Set(name.to_string()),
3158                ..Default::default()
3159            };
3160
3161            let channel = channel.insert(&*tx).await?;
3162
3163            if let Some(parent) = parent {
3164                channel_parent::ActiveModel {
3165                    child_id: ActiveValue::Set(channel.id),
3166                    parent_id: ActiveValue::Set(parent),
3167                }
3168                .insert(&*tx)
3169                .await?;
3170            }
3171
3172            channel_member::ActiveModel {
3173                channel_id: ActiveValue::Set(channel.id),
3174                user_id: ActiveValue::Set(creator_id),
3175                accepted: ActiveValue::Set(true),
3176                admin: ActiveValue::Set(true),
3177                ..Default::default()
3178            }
3179            .insert(&*tx)
3180            .await?;
3181
3182            room::ActiveModel {
3183                channel_id: ActiveValue::Set(Some(channel.id)),
3184                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3185                ..Default::default()
3186            }
3187            .insert(&*tx)
3188            .await?;
3189
3190            Ok(channel.id)
3191        })
3192        .await
3193    }
3194
3195    pub async fn remove_channel(
3196        &self,
3197        channel_id: ChannelId,
3198        user_id: UserId,
3199    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3200        self.transaction(move |tx| async move {
3201            let tx = tx;
3202
3203            // Check if user is an admin
3204            channel_member::Entity::find()
3205                .filter(
3206                    channel_member::Column::ChannelId
3207                        .eq(channel_id)
3208                        .and(channel_member::Column::UserId.eq(user_id))
3209                        .and(channel_member::Column::Admin.eq(true)),
3210                )
3211                .one(&*tx)
3212                .await?
3213                .ok_or_else(|| anyhow!("user is not allowed to remove this channel"))?;
3214
3215            let mut descendants = self.get_channel_descendants([channel_id], &*tx).await?;
3216
3217            // Keep channels which have another active
3218            let mut channels_to_keep = channel_parent::Entity::find()
3219                .filter(
3220                    channel_parent::Column::ChildId
3221                        .is_in(descendants.keys().copied().filter(|&id| id != channel_id))
3222                        .and(
3223                            channel_parent::Column::ParentId.is_not_in(descendants.keys().copied()),
3224                        ),
3225                )
3226                .stream(&*tx)
3227                .await?;
3228
3229            while let Some(row) = channels_to_keep.next().await {
3230                let row = row?;
3231                descendants.remove(&row.child_id);
3232            }
3233
3234            drop(channels_to_keep);
3235
3236            let channels_to_remove = descendants.keys().copied().collect::<Vec<_>>();
3237
3238            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3239                .filter(channel_member::Column::ChannelId.is_in(channels_to_remove.iter().copied()))
3240                .select_only()
3241                .column(channel_member::Column::UserId)
3242                .distinct()
3243                .into_values::<_, QueryUserIds>()
3244                .all(&*tx)
3245                .await?;
3246
3247            // Channel members and parents should delete via cascade
3248            channel::Entity::delete_many()
3249                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
3250                .exec(&*tx)
3251                .await?;
3252
3253            Ok((channels_to_remove, members_to_notify))
3254        })
3255        .await
3256    }
3257
3258    pub async fn invite_channel_member(
3259        &self,
3260        channel_id: ChannelId,
3261        invitee_id: UserId,
3262        inviter_id: UserId,
3263        is_admin: bool,
3264    ) -> Result<()> {
3265        self.transaction(move |tx| async move {
3266            let tx = tx;
3267
3268            // Check if inviter is a member
3269            channel_member::Entity::find()
3270                .filter(
3271                    channel_member::Column::ChannelId
3272                        .eq(channel_id)
3273                        .and(channel_member::Column::UserId.eq(inviter_id))
3274                        .and(channel_member::Column::Admin.eq(true)),
3275                )
3276                .one(&*tx)
3277                .await?
3278                .ok_or_else(|| {
3279                    anyhow!("Inviter does not have permissions to invite the invitee")
3280                })?;
3281
3282            let channel_membership = channel_member::ActiveModel {
3283                channel_id: ActiveValue::Set(channel_id),
3284                user_id: ActiveValue::Set(invitee_id),
3285                accepted: ActiveValue::Set(false),
3286                admin: ActiveValue::Set(is_admin),
3287                ..Default::default()
3288            };
3289
3290            channel_membership.insert(&*tx).await?;
3291
3292            Ok(())
3293        })
3294        .await
3295    }
3296
3297    pub async fn respond_to_channel_invite(
3298        &self,
3299        channel_id: ChannelId,
3300        user_id: UserId,
3301        accept: bool,
3302    ) -> Result<()> {
3303        self.transaction(move |tx| async move {
3304            let tx = tx;
3305
3306            let rows_affected = if accept {
3307                channel_member::Entity::update_many()
3308                    .set(channel_member::ActiveModel {
3309                        accepted: ActiveValue::Set(accept),
3310                        ..Default::default()
3311                    })
3312                    .filter(
3313                        channel_member::Column::ChannelId
3314                            .eq(channel_id)
3315                            .and(channel_member::Column::UserId.eq(user_id))
3316                            .and(channel_member::Column::Accepted.eq(false)),
3317                    )
3318                    .exec(&*tx)
3319                    .await?
3320                    .rows_affected
3321            } else {
3322                channel_member::ActiveModel {
3323                    channel_id: ActiveValue::Unchanged(channel_id),
3324                    user_id: ActiveValue::Unchanged(user_id),
3325                    ..Default::default()
3326                }
3327                .delete(&*tx)
3328                .await?
3329                .rows_affected
3330            };
3331
3332            if rows_affected == 0 {
3333                Err(anyhow!("no such invitation"))?;
3334            }
3335
3336            Ok(())
3337        })
3338        .await
3339    }
3340
3341    pub async fn get_channel_invites(&self, user_id: UserId) -> Result<Vec<Channel>> {
3342        self.transaction(|tx| async move {
3343            let tx = tx;
3344
3345            let channel_invites = channel_member::Entity::find()
3346                .filter(
3347                    channel_member::Column::UserId
3348                        .eq(user_id)
3349                        .and(channel_member::Column::Accepted.eq(false)),
3350                )
3351                .all(&*tx)
3352                .await?;
3353
3354            let channels = channel::Entity::find()
3355                .filter(
3356                    channel::Column::Id.is_in(
3357                        channel_invites
3358                            .into_iter()
3359                            .map(|channel_member| channel_member.channel_id),
3360                    ),
3361                )
3362                .all(&*tx)
3363                .await?;
3364
3365            let channels = channels
3366                .into_iter()
3367                .map(|channel| Channel {
3368                    id: channel.id,
3369                    name: channel.name,
3370                    parent_id: None,
3371                })
3372                .collect();
3373
3374            Ok(channels)
3375        })
3376        .await
3377    }
3378
3379    pub async fn get_channels(
3380        &self,
3381        user_id: UserId,
3382    ) -> Result<(Vec<Channel>, HashMap<ChannelId, Vec<UserId>>)> {
3383        self.transaction(|tx| async move {
3384            let tx = tx;
3385
3386            let starting_channel_ids: Vec<ChannelId> = channel_member::Entity::find()
3387                .filter(
3388                    channel_member::Column::UserId
3389                        .eq(user_id)
3390                        .and(channel_member::Column::Accepted.eq(true)),
3391                )
3392                .select_only()
3393                .column(channel_member::Column::ChannelId)
3394                .into_values::<_, QueryChannelIds>()
3395                .all(&*tx)
3396                .await?;
3397
3398            let parents_by_child_id = self
3399                .get_channel_descendants(starting_channel_ids, &*tx)
3400                .await?;
3401
3402            let mut channels = Vec::with_capacity(parents_by_child_id.len());
3403            let mut rows = channel::Entity::find()
3404                .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3405                .stream(&*tx)
3406                .await?;
3407
3408            while let Some(row) = rows.next().await {
3409                let row = row?;
3410                channels.push(Channel {
3411                    id: row.id,
3412                    name: row.name,
3413                    parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3414                });
3415            }
3416
3417            drop(rows);
3418
3419            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3420            enum QueryUserIdsAndChannelIds {
3421                ChannelId,
3422                UserId,
3423            }
3424
3425            let mut participants = room_participant::Entity::find()
3426                .inner_join(room::Entity)
3427                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3428                .select_only()
3429                .column(room::Column::ChannelId)
3430                .column(room_participant::Column::UserId)
3431                .into_values::<_, QueryUserIdsAndChannelIds>()
3432                .stream(&*tx)
3433                .await?;
3434
3435            let mut participant_map: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3436            while let Some(row) = participants.next().await {
3437                let row: (ChannelId, UserId) = row?;
3438                participant_map.entry(row.0).or_default().push(row.1)
3439            }
3440
3441            drop(participants);
3442
3443            Ok((channels, participant_map))
3444        })
3445        .await
3446    }
3447
3448    pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3449        self.transaction(|tx| async move {
3450            let tx = tx;
3451            let user_ids = self.get_channel_members_internal(id, &*tx).await?;
3452            Ok(user_ids)
3453        })
3454        .await
3455    }
3456
3457    pub async fn get_channel_members_internal(
3458        &self,
3459        id: ChannelId,
3460        tx: &DatabaseTransaction,
3461    ) -> Result<Vec<UserId>> {
3462        let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3463        let user_ids = channel_member::Entity::find()
3464            .distinct()
3465            .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3466            .select_only()
3467            .column(channel_member::Column::UserId)
3468            .into_values::<_, QueryUserIds>()
3469            .all(&*tx)
3470            .await?;
3471        Ok(user_ids)
3472    }
3473
3474    async fn get_channel_ancestors(
3475        &self,
3476        id: ChannelId,
3477        tx: &DatabaseTransaction,
3478    ) -> Result<Vec<ChannelId>> {
3479        let sql = format!(
3480            r#"
3481            WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3482                    SELECT CAST(NULL as INTEGER) as child_id, root_ids.column1 as parent_id
3483                    FROM (VALUES ({})) as root_ids
3484                UNION
3485                    SELECT channel_parents.child_id, channel_parents.parent_id
3486                    FROM channel_parents, channel_tree
3487                    WHERE channel_parents.child_id = channel_tree.parent_id
3488            )
3489            SELECT DISTINCT channel_tree.parent_id
3490            FROM channel_tree
3491            "#,
3492            id
3493        );
3494
3495        #[derive(FromQueryResult, Debug, PartialEq)]
3496        pub struct ChannelParent {
3497            pub parent_id: ChannelId,
3498        }
3499
3500        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3501
3502        let mut channel_ids_stream = channel_parent::Entity::find()
3503            .from_raw_sql(stmt)
3504            .into_model::<ChannelParent>()
3505            .stream(&*tx)
3506            .await?;
3507
3508        let mut channel_ids = vec![];
3509        while let Some(channel_id) = channel_ids_stream.next().await {
3510            channel_ids.push(channel_id?.parent_id);
3511        }
3512
3513        Ok(channel_ids)
3514    }
3515
3516    async fn get_channel_descendants(
3517        &self,
3518        channel_ids: impl IntoIterator<Item = ChannelId>,
3519        tx: &DatabaseTransaction,
3520    ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3521        let mut values = String::new();
3522        for id in channel_ids {
3523            if !values.is_empty() {
3524                values.push_str(", ");
3525            }
3526            write!(&mut values, "({})", id).unwrap();
3527        }
3528
3529        if values.is_empty() {
3530            return Ok(HashMap::default());
3531        }
3532
3533        let sql = format!(
3534            r#"
3535            WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3536                    SELECT root_ids.column1 as child_id, CAST(NULL as INTEGER) as parent_id
3537                    FROM (VALUES {}) as root_ids
3538                UNION
3539                    SELECT channel_parents.child_id, channel_parents.parent_id
3540                    FROM channel_parents, channel_tree
3541                    WHERE channel_parents.parent_id = channel_tree.child_id
3542            )
3543            SELECT channel_tree.child_id, channel_tree.parent_id
3544            FROM channel_tree
3545            ORDER BY child_id, parent_id IS NOT NULL
3546            "#,
3547            values
3548        );
3549
3550        #[derive(FromQueryResult, Debug, PartialEq)]
3551        pub struct ChannelParent {
3552            pub child_id: ChannelId,
3553            pub parent_id: Option<ChannelId>,
3554        }
3555
3556        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3557
3558        let mut parents_by_child_id = HashMap::default();
3559        let mut parents = channel_parent::Entity::find()
3560            .from_raw_sql(stmt)
3561            .into_model::<ChannelParent>()
3562            .stream(tx)
3563            .await?;
3564
3565        while let Some(parent) = parents.next().await {
3566            let parent = parent?;
3567            parents_by_child_id.insert(parent.child_id, parent.parent_id);
3568        }
3569
3570        Ok(parents_by_child_id)
3571    }
3572
3573    pub async fn get_channel(&self, channel_id: ChannelId) -> Result<Option<Channel>> {
3574        self.transaction(|tx| async move {
3575            let tx = tx;
3576            let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3577
3578            Ok(channel.map(|channel| Channel {
3579                id: channel.id,
3580                name: channel.name,
3581                parent_id: None,
3582            }))
3583        })
3584        .await
3585    }
3586
3587    pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3588        self.transaction(|tx| async move {
3589            let tx = tx;
3590            let room = channel::Model {
3591                id: channel_id,
3592                ..Default::default()
3593            }
3594            .find_related(room::Entity)
3595            .one(&*tx)
3596            .await?
3597            .ok_or_else(|| anyhow!("invalid channel"))?;
3598            Ok(room.id)
3599        })
3600        .await
3601    }
3602
3603    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3604    where
3605        F: Send + Fn(TransactionHandle) -> Fut,
3606        Fut: Send + Future<Output = Result<T>>,
3607    {
3608        let body = async {
3609            let mut i = 0;
3610            loop {
3611                let (tx, result) = self.with_transaction(&f).await?;
3612                match result {
3613                    Ok(result) => match tx.commit().await.map_err(Into::into) {
3614                        Ok(()) => return Ok(result),
3615                        Err(error) => {
3616                            if !self.retry_on_serialization_error(&error, i).await {
3617                                return Err(error);
3618                            }
3619                        }
3620                    },
3621                    Err(error) => {
3622                        tx.rollback().await?;
3623                        if !self.retry_on_serialization_error(&error, i).await {
3624                            return Err(error);
3625                        }
3626                    }
3627                }
3628                i += 1;
3629            }
3630        };
3631
3632        self.run(body).await
3633    }
3634
3635    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
3636    where
3637        F: Send + Fn(TransactionHandle) -> Fut,
3638        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
3639    {
3640        let body = async {
3641            let mut i = 0;
3642            loop {
3643                let (tx, result) = self.with_transaction(&f).await?;
3644                match result {
3645                    Ok(Some((room_id, data))) => {
3646                        let lock = self.rooms.entry(room_id).or_default().clone();
3647                        let _guard = lock.lock_owned().await;
3648                        match tx.commit().await.map_err(Into::into) {
3649                            Ok(()) => {
3650                                return Ok(Some(RoomGuard {
3651                                    data,
3652                                    _guard,
3653                                    _not_send: PhantomData,
3654                                }));
3655                            }
3656                            Err(error) => {
3657                                if !self.retry_on_serialization_error(&error, i).await {
3658                                    return Err(error);
3659                                }
3660                            }
3661                        }
3662                    }
3663                    Ok(None) => match tx.commit().await.map_err(Into::into) {
3664                        Ok(()) => return Ok(None),
3665                        Err(error) => {
3666                            if !self.retry_on_serialization_error(&error, i).await {
3667                                return Err(error);
3668                            }
3669                        }
3670                    },
3671                    Err(error) => {
3672                        tx.rollback().await?;
3673                        if !self.retry_on_serialization_error(&error, i).await {
3674                            return Err(error);
3675                        }
3676                    }
3677                }
3678                i += 1;
3679            }
3680        };
3681
3682        self.run(body).await
3683    }
3684
3685    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3686    where
3687        F: Send + Fn(TransactionHandle) -> Fut,
3688        Fut: Send + Future<Output = Result<T>>,
3689    {
3690        let body = async {
3691            let mut i = 0;
3692            loop {
3693                let lock = self.rooms.entry(room_id).or_default().clone();
3694                let _guard = lock.lock_owned().await;
3695                let (tx, result) = self.with_transaction(&f).await?;
3696                match result {
3697                    Ok(data) => match tx.commit().await.map_err(Into::into) {
3698                        Ok(()) => {
3699                            return Ok(RoomGuard {
3700                                data,
3701                                _guard,
3702                                _not_send: PhantomData,
3703                            });
3704                        }
3705                        Err(error) => {
3706                            if !self.retry_on_serialization_error(&error, i).await {
3707                                return Err(error);
3708                            }
3709                        }
3710                    },
3711                    Err(error) => {
3712                        tx.rollback().await?;
3713                        if !self.retry_on_serialization_error(&error, i).await {
3714                            return Err(error);
3715                        }
3716                    }
3717                }
3718                i += 1;
3719            }
3720        };
3721
3722        self.run(body).await
3723    }
3724
3725    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3726    where
3727        F: Send + Fn(TransactionHandle) -> Fut,
3728        Fut: Send + Future<Output = Result<T>>,
3729    {
3730        let tx = self
3731            .pool
3732            .begin_with_config(Some(IsolationLevel::Serializable), None)
3733            .await?;
3734
3735        let mut tx = Arc::new(Some(tx));
3736        let result = f(TransactionHandle(tx.clone())).await;
3737        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3738            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3739        };
3740
3741        Ok((tx, result))
3742    }
3743
3744    async fn run<F, T>(&self, future: F) -> Result<T>
3745    where
3746        F: Future<Output = Result<T>>,
3747    {
3748        #[cfg(test)]
3749        {
3750            if let Executor::Deterministic(executor) = &self.executor {
3751                executor.simulate_random_delay().await;
3752            }
3753
3754            self.runtime.as_ref().unwrap().block_on(future)
3755        }
3756
3757        #[cfg(not(test))]
3758        {
3759            future.await
3760        }
3761    }
3762
3763    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3764        // If the error is due to a failure to serialize concurrent transactions, then retry
3765        // this transaction after a delay. With each subsequent retry, double the delay duration.
3766        // Also vary the delay randomly in order to ensure different database connections retry
3767        // at different times.
3768        if is_serialization_error(error) {
3769            let base_delay = 4_u64 << prev_attempt_count.min(16);
3770            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3771            log::info!(
3772                "retrying transaction after serialization error. delay: {} ms.",
3773                randomized_delay
3774            );
3775            self.executor
3776                .sleep(Duration::from_millis(randomized_delay as u64))
3777                .await;
3778            true
3779        } else {
3780            false
3781        }
3782    }
3783}
3784
3785fn is_serialization_error(error: &Error) -> bool {
3786    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3787    match error {
3788        Error::Database(
3789            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3790            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3791        ) if error
3792            .as_database_error()
3793            .and_then(|error| error.code())
3794            .as_deref()
3795            == Some(SERIALIZATION_FAILURE_CODE) =>
3796        {
3797            true
3798        }
3799        _ => false,
3800    }
3801}
3802
3803struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3804
3805impl Deref for TransactionHandle {
3806    type Target = DatabaseTransaction;
3807
3808    fn deref(&self) -> &Self::Target {
3809        self.0.as_ref().as_ref().unwrap()
3810    }
3811}
3812
3813pub struct RoomGuard<T> {
3814    data: T,
3815    _guard: OwnedMutexGuard<()>,
3816    _not_send: PhantomData<Rc<()>>,
3817}
3818
3819impl<T> Deref for RoomGuard<T> {
3820    type Target = T;
3821
3822    fn deref(&self) -> &T {
3823        &self.data
3824    }
3825}
3826
3827impl<T> DerefMut for RoomGuard<T> {
3828    fn deref_mut(&mut self) -> &mut T {
3829        &mut self.data
3830    }
3831}
3832
3833#[derive(Debug, Serialize, Deserialize)]
3834pub struct NewUserParams {
3835    pub github_login: String,
3836    pub github_user_id: i32,
3837    pub invite_count: i32,
3838}
3839
3840#[derive(Debug)]
3841pub struct NewUserResult {
3842    pub user_id: UserId,
3843    pub metrics_id: String,
3844    pub inviting_user_id: Option<UserId>,
3845    pub signup_device_id: Option<String>,
3846}
3847
3848#[derive(FromQueryResult, Debug, PartialEq)]
3849pub struct Channel {
3850    pub id: ChannelId,
3851    pub name: String,
3852    pub parent_id: Option<ChannelId>,
3853}
3854
3855fn random_invite_code() -> String {
3856    nanoid::nanoid!(16)
3857}
3858
3859fn random_email_confirmation_code() -> String {
3860    nanoid::nanoid!(64)
3861}
3862
3863macro_rules! id_type {
3864    ($name:ident) => {
3865        #[derive(
3866            Clone,
3867            Copy,
3868            Debug,
3869            Default,
3870            PartialEq,
3871            Eq,
3872            PartialOrd,
3873            Ord,
3874            Hash,
3875            Serialize,
3876            Deserialize,
3877        )]
3878        #[serde(transparent)]
3879        pub struct $name(pub i32);
3880
3881        impl $name {
3882            #[allow(unused)]
3883            pub const MAX: Self = Self(i32::MAX);
3884
3885            #[allow(unused)]
3886            pub fn from_proto(value: u64) -> Self {
3887                Self(value as i32)
3888            }
3889
3890            #[allow(unused)]
3891            pub fn to_proto(self) -> u64 {
3892                self.0 as u64
3893            }
3894        }
3895
3896        impl std::fmt::Display for $name {
3897            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3898                self.0.fmt(f)
3899            }
3900        }
3901
3902        impl From<$name> for sea_query::Value {
3903            fn from(value: $name) -> Self {
3904                sea_query::Value::Int(Some(value.0))
3905            }
3906        }
3907
3908        impl sea_orm::TryGetable for $name {
3909            fn try_get(
3910                res: &sea_orm::QueryResult,
3911                pre: &str,
3912                col: &str,
3913            ) -> Result<Self, sea_orm::TryGetError> {
3914                Ok(Self(i32::try_get(res, pre, col)?))
3915            }
3916        }
3917
3918        impl sea_query::ValueType for $name {
3919            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3920                match v {
3921                    Value::TinyInt(Some(int)) => {
3922                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3923                    }
3924                    Value::SmallInt(Some(int)) => {
3925                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3926                    }
3927                    Value::Int(Some(int)) => {
3928                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3929                    }
3930                    Value::BigInt(Some(int)) => {
3931                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3932                    }
3933                    Value::TinyUnsigned(Some(int)) => {
3934                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3935                    }
3936                    Value::SmallUnsigned(Some(int)) => {
3937                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3938                    }
3939                    Value::Unsigned(Some(int)) => {
3940                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3941                    }
3942                    Value::BigUnsigned(Some(int)) => {
3943                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3944                    }
3945                    _ => Err(sea_query::ValueTypeErr),
3946                }
3947            }
3948
3949            fn type_name() -> String {
3950                stringify!($name).into()
3951            }
3952
3953            fn array_type() -> sea_query::ArrayType {
3954                sea_query::ArrayType::Int
3955            }
3956
3957            fn column_type() -> sea_query::ColumnType {
3958                sea_query::ColumnType::Integer(None)
3959            }
3960        }
3961
3962        impl sea_orm::TryFromU64 for $name {
3963            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3964                Ok(Self(n.try_into().map_err(|_| {
3965                    DbErr::ConvertFromU64(concat!(
3966                        "error converting ",
3967                        stringify!($name),
3968                        " to u64"
3969                    ))
3970                })?))
3971            }
3972        }
3973
3974        impl sea_query::Nullable for $name {
3975            fn null() -> Value {
3976                Value::Int(None)
3977            }
3978        }
3979    };
3980}
3981
3982id_type!(AccessTokenId);
3983id_type!(ChannelId);
3984id_type!(ChannelMemberId);
3985id_type!(ContactId);
3986id_type!(FollowerId);
3987id_type!(RoomId);
3988id_type!(RoomParticipantId);
3989id_type!(ProjectId);
3990id_type!(ProjectCollaboratorId);
3991id_type!(ReplicaId);
3992id_type!(ServerId);
3993id_type!(SignupId);
3994id_type!(UserId);
3995
3996#[derive(Clone)]
3997pub struct JoinRoom {
3998    pub room: proto::Room,
3999    pub channel_id: Option<ChannelId>,
4000    pub channel_members: Vec<UserId>,
4001}
4002
4003pub struct RejoinedRoom {
4004    pub room: proto::Room,
4005    pub rejoined_projects: Vec<RejoinedProject>,
4006    pub reshared_projects: Vec<ResharedProject>,
4007    pub channel_id: Option<ChannelId>,
4008    pub channel_members: Vec<UserId>,
4009}
4010
4011pub struct ResharedProject {
4012    pub id: ProjectId,
4013    pub old_connection_id: ConnectionId,
4014    pub collaborators: Vec<ProjectCollaborator>,
4015    pub worktrees: Vec<proto::WorktreeMetadata>,
4016}
4017
4018pub struct RejoinedProject {
4019    pub id: ProjectId,
4020    pub old_connection_id: ConnectionId,
4021    pub collaborators: Vec<ProjectCollaborator>,
4022    pub worktrees: Vec<RejoinedWorktree>,
4023    pub language_servers: Vec<proto::LanguageServer>,
4024}
4025
4026#[derive(Debug)]
4027pub struct RejoinedWorktree {
4028    pub id: u64,
4029    pub abs_path: String,
4030    pub root_name: String,
4031    pub visible: bool,
4032    pub updated_entries: Vec<proto::Entry>,
4033    pub removed_entries: Vec<u64>,
4034    pub updated_repositories: Vec<proto::RepositoryEntry>,
4035    pub removed_repositories: Vec<u64>,
4036    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
4037    pub settings_files: Vec<WorktreeSettingsFile>,
4038    pub scan_id: u64,
4039    pub completed_scan_id: u64,
4040}
4041
4042pub struct LeftRoom {
4043    pub room: proto::Room,
4044    pub channel_id: Option<ChannelId>,
4045    pub channel_members: Vec<UserId>,
4046    pub left_projects: HashMap<ProjectId, LeftProject>,
4047    pub canceled_calls_to_user_ids: Vec<UserId>,
4048    pub deleted: bool,
4049}
4050
4051pub struct RefreshedRoom {
4052    pub room: proto::Room,
4053    pub channel_id: Option<ChannelId>,
4054    pub channel_members: Vec<UserId>,
4055    pub stale_participant_user_ids: Vec<UserId>,
4056    pub canceled_calls_to_user_ids: Vec<UserId>,
4057}
4058
4059pub struct Project {
4060    pub collaborators: Vec<ProjectCollaborator>,
4061    pub worktrees: BTreeMap<u64, Worktree>,
4062    pub language_servers: Vec<proto::LanguageServer>,
4063}
4064
4065pub struct ProjectCollaborator {
4066    pub connection_id: ConnectionId,
4067    pub user_id: UserId,
4068    pub replica_id: ReplicaId,
4069    pub is_host: bool,
4070}
4071
4072impl ProjectCollaborator {
4073    pub fn to_proto(&self) -> proto::Collaborator {
4074        proto::Collaborator {
4075            peer_id: Some(self.connection_id.into()),
4076            replica_id: self.replica_id.0 as u32,
4077            user_id: self.user_id.to_proto(),
4078        }
4079    }
4080}
4081
4082#[derive(Debug)]
4083pub struct LeftProject {
4084    pub id: ProjectId,
4085    pub host_user_id: UserId,
4086    pub host_connection_id: ConnectionId,
4087    pub connection_ids: Vec<ConnectionId>,
4088}
4089
4090pub struct Worktree {
4091    pub id: u64,
4092    pub abs_path: String,
4093    pub root_name: String,
4094    pub visible: bool,
4095    pub entries: Vec<proto::Entry>,
4096    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
4097    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
4098    pub settings_files: Vec<WorktreeSettingsFile>,
4099    pub scan_id: u64,
4100    pub completed_scan_id: u64,
4101}
4102
4103#[derive(Debug)]
4104pub struct WorktreeSettingsFile {
4105    pub path: String,
4106    pub content: String,
4107}
4108
4109#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
4110enum QueryChannelIds {
4111    ChannelId,
4112}
4113
4114#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
4115enum QueryUserIds {
4116    UserId,
4117}
4118
4119#[cfg(test)]
4120pub use test::*;
4121
4122#[cfg(test)]
4123mod test {
4124    use super::*;
4125    use gpui::executor::Background;
4126    use parking_lot::Mutex;
4127    use sea_orm::ConnectionTrait;
4128    use sqlx::migrate::MigrateDatabase;
4129    use std::sync::Arc;
4130
4131    pub struct TestDb {
4132        pub db: Option<Arc<Database>>,
4133        pub connection: Option<sqlx::AnyConnection>,
4134    }
4135
4136    impl TestDb {
4137        pub fn sqlite(background: Arc<Background>) -> Self {
4138            let url = format!("sqlite::memory:");
4139            let runtime = tokio::runtime::Builder::new_current_thread()
4140                .enable_io()
4141                .enable_time()
4142                .build()
4143                .unwrap();
4144
4145            let mut db = runtime.block_on(async {
4146                let mut options = ConnectOptions::new(url);
4147                options.max_connections(5);
4148                let db = Database::new(options, Executor::Deterministic(background))
4149                    .await
4150                    .unwrap();
4151                let sql = include_str!(concat!(
4152                    env!("CARGO_MANIFEST_DIR"),
4153                    "/migrations.sqlite/20221109000000_test_schema.sql"
4154                ));
4155                db.pool
4156                    .execute(sea_orm::Statement::from_string(
4157                        db.pool.get_database_backend(),
4158                        sql.into(),
4159                    ))
4160                    .await
4161                    .unwrap();
4162                db
4163            });
4164
4165            db.runtime = Some(runtime);
4166
4167            Self {
4168                db: Some(Arc::new(db)),
4169                connection: None,
4170            }
4171        }
4172
4173        pub fn postgres(background: Arc<Background>) -> Self {
4174            static LOCK: Mutex<()> = Mutex::new(());
4175
4176            let _guard = LOCK.lock();
4177            let mut rng = StdRng::from_entropy();
4178            let url = format!(
4179                "postgres://postgres@localhost/zed-test-{}",
4180                rng.gen::<u128>()
4181            );
4182            let runtime = tokio::runtime::Builder::new_current_thread()
4183                .enable_io()
4184                .enable_time()
4185                .build()
4186                .unwrap();
4187
4188            let mut db = runtime.block_on(async {
4189                sqlx::Postgres::create_database(&url)
4190                    .await
4191                    .expect("failed to create test db");
4192                let mut options = ConnectOptions::new(url);
4193                options
4194                    .max_connections(5)
4195                    .idle_timeout(Duration::from_secs(0));
4196                let db = Database::new(options, Executor::Deterministic(background))
4197                    .await
4198                    .unwrap();
4199                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
4200                db.migrate(Path::new(migrations_path), false).await.unwrap();
4201                db
4202            });
4203
4204            db.runtime = Some(runtime);
4205
4206            Self {
4207                db: Some(Arc::new(db)),
4208                connection: None,
4209            }
4210        }
4211
4212        pub fn db(&self) -> &Arc<Database> {
4213            self.db.as_ref().unwrap()
4214        }
4215    }
4216
4217    impl Drop for TestDb {
4218        fn drop(&mut self) {
4219            let db = self.db.take().unwrap();
4220            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
4221                db.runtime.as_ref().unwrap().block_on(async {
4222                    use util::ResultExt;
4223                    let query = "
4224                        SELECT pg_terminate_backend(pg_stat_activity.pid)
4225                        FROM pg_stat_activity
4226                        WHERE
4227                            pg_stat_activity.datname = current_database() AND
4228                            pid <> pg_backend_pid();
4229                    ";
4230                    db.pool
4231                        .execute(sea_orm::Statement::from_string(
4232                            db.pool.get_database_backend(),
4233                            query.into(),
4234                        ))
4235                        .await
4236                        .log_err();
4237                    sqlx::Postgres::drop_database(db.options.get_url())
4238                        .await
4239                        .log_err();
4240                })
4241            }
4242        }
4243    }
4244}