db.rs

   1mod access_token;
   2mod channel;
   3mod channel_member;
   4mod channel_parent;
   5mod contact;
   6mod follower;
   7mod language_server;
   8mod project;
   9mod project_collaborator;
  10mod room;
  11mod room_participant;
  12mod server;
  13mod signup;
  14#[cfg(test)]
  15mod tests;
  16mod user;
  17mod worktree;
  18mod worktree_diagnostic_summary;
  19mod worktree_entry;
  20mod worktree_repository;
  21mod worktree_repository_statuses;
  22mod worktree_settings_file;
  23
  24use crate::executor::Executor;
  25use crate::{Error, Result};
  26use anyhow::anyhow;
  27use collections::{BTreeMap, HashMap, HashSet};
  28pub use contact::Contact;
  29use dashmap::DashMap;
  30use futures::StreamExt;
  31use hyper::StatusCode;
  32use rand::prelude::StdRng;
  33use rand::{Rng, SeedableRng};
  34use rpc::{proto, ConnectionId};
  35use sea_orm::Condition;
  36pub use sea_orm::ConnectOptions;
  37use sea_orm::{
  38    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  39    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  40    Statement, TransactionTrait,
  41};
  42use sea_query::{Alias, Expr, OnConflict, Query};
  43use serde::{Deserialize, Serialize};
  44pub use signup::{Invite, NewSignup, WaitlistSummary};
  45use sqlx::migrate::{Migrate, Migration, MigrationSource};
  46use sqlx::Connection;
  47use std::fmt::Write as _;
  48use std::ops::{Deref, DerefMut};
  49use std::path::Path;
  50use std::time::Duration;
  51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  52use tokio::sync::{Mutex, OwnedMutexGuard};
  53pub use user::Model as User;
  54
  55pub struct Database {
  56    options: ConnectOptions,
  57    pool: DatabaseConnection,
  58    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  59    rng: Mutex<StdRng>,
  60    executor: Executor,
  61    #[cfg(test)]
  62    runtime: Option<tokio::runtime::Runtime>,
  63}
  64
  65impl Database {
  66    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  67        Ok(Self {
  68            options: options.clone(),
  69            pool: sea_orm::Database::connect(options).await?,
  70            rooms: DashMap::with_capacity(16384),
  71            rng: Mutex::new(StdRng::seed_from_u64(0)),
  72            executor,
  73            #[cfg(test)]
  74            runtime: None,
  75        })
  76    }
  77
  78    #[cfg(test)]
  79    pub fn reset(&self) {
  80        self.rooms.clear();
  81    }
  82
  83    pub async fn migrate(
  84        &self,
  85        migrations_path: &Path,
  86        ignore_checksum_mismatch: bool,
  87    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  88        let migrations = MigrationSource::resolve(migrations_path)
  89            .await
  90            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  91
  92        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  93
  94        connection.ensure_migrations_table().await?;
  95        let applied_migrations: HashMap<_, _> = connection
  96            .list_applied_migrations()
  97            .await?
  98            .into_iter()
  99            .map(|m| (m.version, m))
 100            .collect();
 101
 102        let mut new_migrations = Vec::new();
 103        for migration in migrations {
 104            match applied_migrations.get(&migration.version) {
 105                Some(applied_migration) => {
 106                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 107                    {
 108                        Err(anyhow!(
 109                            "checksum mismatch for applied migration {}",
 110                            migration.description
 111                        ))?;
 112                    }
 113                }
 114                None => {
 115                    let elapsed = connection.apply(&migration).await?;
 116                    new_migrations.push((migration, elapsed));
 117                }
 118            }
 119        }
 120
 121        Ok(new_migrations)
 122    }
 123
 124    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 125        self.transaction(|tx| async move {
 126            let server = server::ActiveModel {
 127                environment: ActiveValue::set(environment.into()),
 128                ..Default::default()
 129            }
 130            .insert(&*tx)
 131            .await?;
 132            Ok(server.id)
 133        })
 134        .await
 135    }
 136
 137    pub async fn stale_room_ids(
 138        &self,
 139        environment: &str,
 140        new_server_id: ServerId,
 141    ) -> Result<Vec<RoomId>> {
 142        self.transaction(|tx| async move {
 143            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 144            enum QueryAs {
 145                RoomId,
 146            }
 147
 148            let stale_server_epochs = self
 149                .stale_server_ids(environment, new_server_id, &tx)
 150                .await?;
 151            Ok(room_participant::Entity::find()
 152                .select_only()
 153                .column(room_participant::Column::RoomId)
 154                .distinct()
 155                .filter(
 156                    room_participant::Column::AnsweringConnectionServerId
 157                        .is_in(stale_server_epochs),
 158                )
 159                .into_values::<_, QueryAs>()
 160                .all(&*tx)
 161                .await?)
 162        })
 163        .await
 164    }
 165
 166    pub async fn refresh_room(
 167        &self,
 168        room_id: RoomId,
 169        new_server_id: ServerId,
 170    ) -> Result<RoomGuard<RefreshedRoom>> {
 171        self.room_transaction(room_id, |tx| async move {
 172            let stale_participant_filter = Condition::all()
 173                .add(room_participant::Column::RoomId.eq(room_id))
 174                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 175                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 176
 177            let stale_participant_user_ids = room_participant::Entity::find()
 178                .filter(stale_participant_filter.clone())
 179                .all(&*tx)
 180                .await?
 181                .into_iter()
 182                .map(|participant| participant.user_id)
 183                .collect::<Vec<_>>();
 184
 185            // Delete participants who failed to reconnect and cancel their calls.
 186            let mut canceled_calls_to_user_ids = Vec::new();
 187            room_participant::Entity::delete_many()
 188                .filter(stale_participant_filter)
 189                .exec(&*tx)
 190                .await?;
 191            let called_participants = room_participant::Entity::find()
 192                .filter(
 193                    Condition::all()
 194                        .add(
 195                            room_participant::Column::CallingUserId
 196                                .is_in(stale_participant_user_ids.iter().copied()),
 197                        )
 198                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 199                )
 200                .all(&*tx)
 201                .await?;
 202            room_participant::Entity::delete_many()
 203                .filter(
 204                    room_participant::Column::Id
 205                        .is_in(called_participants.iter().map(|participant| participant.id)),
 206                )
 207                .exec(&*tx)
 208                .await?;
 209            canceled_calls_to_user_ids.extend(
 210                called_participants
 211                    .into_iter()
 212                    .map(|participant| participant.user_id),
 213            );
 214
 215            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 216            let channel_members = if let Some(channel_id) = channel_id {
 217                self.get_channel_members_internal(channel_id, &tx).await?
 218            } else {
 219                Vec::new()
 220            };
 221
 222            // Delete the room if it becomes empty.
 223            if room.participants.is_empty() {
 224                project::Entity::delete_many()
 225                    .filter(project::Column::RoomId.eq(room_id))
 226                    .exec(&*tx)
 227                    .await?;
 228                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 229            }
 230
 231            Ok(RefreshedRoom {
 232                room,
 233                channel_id,
 234                channel_members,
 235                stale_participant_user_ids,
 236                canceled_calls_to_user_ids,
 237            })
 238        })
 239        .await
 240    }
 241
 242    pub async fn delete_stale_servers(
 243        &self,
 244        environment: &str,
 245        new_server_id: ServerId,
 246    ) -> Result<()> {
 247        self.transaction(|tx| async move {
 248            server::Entity::delete_many()
 249                .filter(
 250                    Condition::all()
 251                        .add(server::Column::Environment.eq(environment))
 252                        .add(server::Column::Id.ne(new_server_id)),
 253                )
 254                .exec(&*tx)
 255                .await?;
 256            Ok(())
 257        })
 258        .await
 259    }
 260
 261    async fn stale_server_ids(
 262        &self,
 263        environment: &str,
 264        new_server_id: ServerId,
 265        tx: &DatabaseTransaction,
 266    ) -> Result<Vec<ServerId>> {
 267        let stale_servers = server::Entity::find()
 268            .filter(
 269                Condition::all()
 270                    .add(server::Column::Environment.eq(environment))
 271                    .add(server::Column::Id.ne(new_server_id)),
 272            )
 273            .all(&*tx)
 274            .await?;
 275        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 276    }
 277
 278    // users
 279
 280    pub async fn create_user(
 281        &self,
 282        email_address: &str,
 283        admin: bool,
 284        params: NewUserParams,
 285    ) -> Result<NewUserResult> {
 286        self.transaction(|tx| async {
 287            let tx = tx;
 288            let user = user::Entity::insert(user::ActiveModel {
 289                email_address: ActiveValue::set(Some(email_address.into())),
 290                github_login: ActiveValue::set(params.github_login.clone()),
 291                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 292                admin: ActiveValue::set(admin),
 293                metrics_id: ActiveValue::set(Uuid::new_v4()),
 294                ..Default::default()
 295            })
 296            .on_conflict(
 297                OnConflict::column(user::Column::GithubLogin)
 298                    .update_column(user::Column::GithubLogin)
 299                    .to_owned(),
 300            )
 301            .exec_with_returning(&*tx)
 302            .await?;
 303
 304            Ok(NewUserResult {
 305                user_id: user.id,
 306                metrics_id: user.metrics_id.to_string(),
 307                signup_device_id: None,
 308                inviting_user_id: None,
 309            })
 310        })
 311        .await
 312    }
 313
 314    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 315        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 316            .await
 317    }
 318
 319    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 320        self.transaction(|tx| async {
 321            let tx = tx;
 322            Ok(user::Entity::find()
 323                .filter(user::Column::Id.is_in(ids.iter().copied()))
 324                .all(&*tx)
 325                .await?)
 326        })
 327        .await
 328    }
 329
 330    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 331        self.transaction(|tx| async move {
 332            Ok(user::Entity::find()
 333                .filter(user::Column::GithubLogin.eq(github_login))
 334                .one(&*tx)
 335                .await?)
 336        })
 337        .await
 338    }
 339
 340    pub async fn get_or_create_user_by_github_account(
 341        &self,
 342        github_login: &str,
 343        github_user_id: Option<i32>,
 344        github_email: Option<&str>,
 345    ) -> Result<Option<User>> {
 346        self.transaction(|tx| async move {
 347            let tx = &*tx;
 348            if let Some(github_user_id) = github_user_id {
 349                if let Some(user_by_github_user_id) = user::Entity::find()
 350                    .filter(user::Column::GithubUserId.eq(github_user_id))
 351                    .one(tx)
 352                    .await?
 353                {
 354                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 355                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 356                    Ok(Some(user_by_github_user_id.update(tx).await?))
 357                } else if let Some(user_by_github_login) = user::Entity::find()
 358                    .filter(user::Column::GithubLogin.eq(github_login))
 359                    .one(tx)
 360                    .await?
 361                {
 362                    let mut user_by_github_login = user_by_github_login.into_active_model();
 363                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 364                    Ok(Some(user_by_github_login.update(tx).await?))
 365                } else {
 366                    let user = user::Entity::insert(user::ActiveModel {
 367                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 368                        github_login: ActiveValue::set(github_login.into()),
 369                        github_user_id: ActiveValue::set(Some(github_user_id)),
 370                        admin: ActiveValue::set(false),
 371                        invite_count: ActiveValue::set(0),
 372                        invite_code: ActiveValue::set(None),
 373                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 374                        ..Default::default()
 375                    })
 376                    .exec_with_returning(&*tx)
 377                    .await?;
 378                    Ok(Some(user))
 379                }
 380            } else {
 381                Ok(user::Entity::find()
 382                    .filter(user::Column::GithubLogin.eq(github_login))
 383                    .one(tx)
 384                    .await?)
 385            }
 386        })
 387        .await
 388    }
 389
 390    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 391        self.transaction(|tx| async move {
 392            Ok(user::Entity::find()
 393                .order_by_asc(user::Column::GithubLogin)
 394                .limit(limit as u64)
 395                .offset(page as u64 * limit as u64)
 396                .all(&*tx)
 397                .await?)
 398        })
 399        .await
 400    }
 401
 402    pub async fn get_users_with_no_invites(
 403        &self,
 404        invited_by_another_user: bool,
 405    ) -> Result<Vec<User>> {
 406        self.transaction(|tx| async move {
 407            Ok(user::Entity::find()
 408                .filter(
 409                    user::Column::InviteCount
 410                        .eq(0)
 411                        .and(if invited_by_another_user {
 412                            user::Column::InviterId.is_not_null()
 413                        } else {
 414                            user::Column::InviterId.is_null()
 415                        }),
 416                )
 417                .all(&*tx)
 418                .await?)
 419        })
 420        .await
 421    }
 422
 423    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 424        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 425        enum QueryAs {
 426            MetricsId,
 427        }
 428
 429        self.transaction(|tx| async move {
 430            let metrics_id: Uuid = user::Entity::find_by_id(id)
 431                .select_only()
 432                .column(user::Column::MetricsId)
 433                .into_values::<_, QueryAs>()
 434                .one(&*tx)
 435                .await?
 436                .ok_or_else(|| anyhow!("could not find user"))?;
 437            Ok(metrics_id.to_string())
 438        })
 439        .await
 440    }
 441
 442    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 443        self.transaction(|tx| async move {
 444            user::Entity::update_many()
 445                .filter(user::Column::Id.eq(id))
 446                .set(user::ActiveModel {
 447                    admin: ActiveValue::set(is_admin),
 448                    ..Default::default()
 449                })
 450                .exec(&*tx)
 451                .await?;
 452            Ok(())
 453        })
 454        .await
 455    }
 456
 457    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 458        self.transaction(|tx| async move {
 459            user::Entity::update_many()
 460                .filter(user::Column::Id.eq(id))
 461                .set(user::ActiveModel {
 462                    connected_once: ActiveValue::set(connected_once),
 463                    ..Default::default()
 464                })
 465                .exec(&*tx)
 466                .await?;
 467            Ok(())
 468        })
 469        .await
 470    }
 471
 472    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 473        self.transaction(|tx| async move {
 474            access_token::Entity::delete_many()
 475                .filter(access_token::Column::UserId.eq(id))
 476                .exec(&*tx)
 477                .await?;
 478            user::Entity::delete_by_id(id).exec(&*tx).await?;
 479            Ok(())
 480        })
 481        .await
 482    }
 483
 484    // contacts
 485
 486    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 487        #[derive(Debug, FromQueryResult)]
 488        struct ContactWithUserBusyStatuses {
 489            user_id_a: UserId,
 490            user_id_b: UserId,
 491            a_to_b: bool,
 492            accepted: bool,
 493            should_notify: bool,
 494            user_a_busy: bool,
 495            user_b_busy: bool,
 496        }
 497
 498        self.transaction(|tx| async move {
 499            let user_a_participant = Alias::new("user_a_participant");
 500            let user_b_participant = Alias::new("user_b_participant");
 501            let mut db_contacts = contact::Entity::find()
 502                .column_as(
 503                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 504                        .is_not_null(),
 505                    "user_a_busy",
 506                )
 507                .column_as(
 508                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 509                        .is_not_null(),
 510                    "user_b_busy",
 511                )
 512                .filter(
 513                    contact::Column::UserIdA
 514                        .eq(user_id)
 515                        .or(contact::Column::UserIdB.eq(user_id)),
 516                )
 517                .join_as(
 518                    JoinType::LeftJoin,
 519                    contact::Relation::UserARoomParticipant.def(),
 520                    user_a_participant,
 521                )
 522                .join_as(
 523                    JoinType::LeftJoin,
 524                    contact::Relation::UserBRoomParticipant.def(),
 525                    user_b_participant,
 526                )
 527                .into_model::<ContactWithUserBusyStatuses>()
 528                .stream(&*tx)
 529                .await?;
 530
 531            let mut contacts = Vec::new();
 532            while let Some(db_contact) = db_contacts.next().await {
 533                let db_contact = db_contact?;
 534                if db_contact.user_id_a == user_id {
 535                    if db_contact.accepted {
 536                        contacts.push(Contact::Accepted {
 537                            user_id: db_contact.user_id_b,
 538                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 539                            busy: db_contact.user_b_busy,
 540                        });
 541                    } else if db_contact.a_to_b {
 542                        contacts.push(Contact::Outgoing {
 543                            user_id: db_contact.user_id_b,
 544                        })
 545                    } else {
 546                        contacts.push(Contact::Incoming {
 547                            user_id: db_contact.user_id_b,
 548                            should_notify: db_contact.should_notify,
 549                        });
 550                    }
 551                } else if db_contact.accepted {
 552                    contacts.push(Contact::Accepted {
 553                        user_id: db_contact.user_id_a,
 554                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 555                        busy: db_contact.user_a_busy,
 556                    });
 557                } else if db_contact.a_to_b {
 558                    contacts.push(Contact::Incoming {
 559                        user_id: db_contact.user_id_a,
 560                        should_notify: db_contact.should_notify,
 561                    });
 562                } else {
 563                    contacts.push(Contact::Outgoing {
 564                        user_id: db_contact.user_id_a,
 565                    });
 566                }
 567            }
 568
 569            contacts.sort_unstable_by_key(|contact| contact.user_id());
 570
 571            Ok(contacts)
 572        })
 573        .await
 574    }
 575
 576    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 577        self.transaction(|tx| async move {
 578            let participant = room_participant::Entity::find()
 579                .filter(room_participant::Column::UserId.eq(user_id))
 580                .one(&*tx)
 581                .await?;
 582            Ok(participant.is_some())
 583        })
 584        .await
 585    }
 586
 587    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 588        self.transaction(|tx| async move {
 589            let (id_a, id_b) = if user_id_1 < user_id_2 {
 590                (user_id_1, user_id_2)
 591            } else {
 592                (user_id_2, user_id_1)
 593            };
 594
 595            Ok(contact::Entity::find()
 596                .filter(
 597                    contact::Column::UserIdA
 598                        .eq(id_a)
 599                        .and(contact::Column::UserIdB.eq(id_b))
 600                        .and(contact::Column::Accepted.eq(true)),
 601                )
 602                .one(&*tx)
 603                .await?
 604                .is_some())
 605        })
 606        .await
 607    }
 608
 609    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 610        self.transaction(|tx| async move {
 611            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 612                (sender_id, receiver_id, true)
 613            } else {
 614                (receiver_id, sender_id, false)
 615            };
 616
 617            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 618                user_id_a: ActiveValue::set(id_a),
 619                user_id_b: ActiveValue::set(id_b),
 620                a_to_b: ActiveValue::set(a_to_b),
 621                accepted: ActiveValue::set(false),
 622                should_notify: ActiveValue::set(true),
 623                ..Default::default()
 624            })
 625            .on_conflict(
 626                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 627                    .values([
 628                        (contact::Column::Accepted, true.into()),
 629                        (contact::Column::ShouldNotify, false.into()),
 630                    ])
 631                    .action_and_where(
 632                        contact::Column::Accepted.eq(false).and(
 633                            contact::Column::AToB
 634                                .eq(a_to_b)
 635                                .and(contact::Column::UserIdA.eq(id_b))
 636                                .or(contact::Column::AToB
 637                                    .ne(a_to_b)
 638                                    .and(contact::Column::UserIdA.eq(id_a))),
 639                        ),
 640                    )
 641                    .to_owned(),
 642            )
 643            .exec_without_returning(&*tx)
 644            .await?;
 645
 646            if rows_affected == 1 {
 647                Ok(())
 648            } else {
 649                Err(anyhow!("contact already requested"))?
 650            }
 651        })
 652        .await
 653    }
 654
 655    /// Returns a bool indicating whether the removed contact had originally accepted or not
 656    ///
 657    /// Deletes the contact identified by the requester and responder ids, and then returns
 658    /// whether the deleted contact had originally accepted or was a pending contact request.
 659    ///
 660    /// # Arguments
 661    ///
 662    /// * `requester_id` - The user that initiates this request
 663    /// * `responder_id` - The user that will be removed
 664    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 665        self.transaction(|tx| async move {
 666            let (id_a, id_b) = if responder_id < requester_id {
 667                (responder_id, requester_id)
 668            } else {
 669                (requester_id, responder_id)
 670            };
 671
 672            let contact = contact::Entity::find()
 673                .filter(
 674                    contact::Column::UserIdA
 675                        .eq(id_a)
 676                        .and(contact::Column::UserIdB.eq(id_b)),
 677                )
 678                .one(&*tx)
 679                .await?
 680                .ok_or_else(|| anyhow!("no such contact"))?;
 681
 682            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 683            Ok(contact.accepted)
 684        })
 685        .await
 686    }
 687
 688    pub async fn dismiss_contact_notification(
 689        &self,
 690        user_id: UserId,
 691        contact_user_id: UserId,
 692    ) -> Result<()> {
 693        self.transaction(|tx| async move {
 694            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 695                (user_id, contact_user_id, true)
 696            } else {
 697                (contact_user_id, user_id, false)
 698            };
 699
 700            let result = contact::Entity::update_many()
 701                .set(contact::ActiveModel {
 702                    should_notify: ActiveValue::set(false),
 703                    ..Default::default()
 704                })
 705                .filter(
 706                    contact::Column::UserIdA
 707                        .eq(id_a)
 708                        .and(contact::Column::UserIdB.eq(id_b))
 709                        .and(
 710                            contact::Column::AToB
 711                                .eq(a_to_b)
 712                                .and(contact::Column::Accepted.eq(true))
 713                                .or(contact::Column::AToB
 714                                    .ne(a_to_b)
 715                                    .and(contact::Column::Accepted.eq(false))),
 716                        ),
 717                )
 718                .exec(&*tx)
 719                .await?;
 720            if result.rows_affected == 0 {
 721                Err(anyhow!("no such contact request"))?
 722            } else {
 723                Ok(())
 724            }
 725        })
 726        .await
 727    }
 728
 729    pub async fn respond_to_contact_request(
 730        &self,
 731        responder_id: UserId,
 732        requester_id: UserId,
 733        accept: bool,
 734    ) -> Result<()> {
 735        self.transaction(|tx| async move {
 736            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 737                (responder_id, requester_id, false)
 738            } else {
 739                (requester_id, responder_id, true)
 740            };
 741            let rows_affected = if accept {
 742                let result = contact::Entity::update_many()
 743                    .set(contact::ActiveModel {
 744                        accepted: ActiveValue::set(true),
 745                        should_notify: ActiveValue::set(true),
 746                        ..Default::default()
 747                    })
 748                    .filter(
 749                        contact::Column::UserIdA
 750                            .eq(id_a)
 751                            .and(contact::Column::UserIdB.eq(id_b))
 752                            .and(contact::Column::AToB.eq(a_to_b)),
 753                    )
 754                    .exec(&*tx)
 755                    .await?;
 756                result.rows_affected
 757            } else {
 758                let result = contact::Entity::delete_many()
 759                    .filter(
 760                        contact::Column::UserIdA
 761                            .eq(id_a)
 762                            .and(contact::Column::UserIdB.eq(id_b))
 763                            .and(contact::Column::AToB.eq(a_to_b))
 764                            .and(contact::Column::Accepted.eq(false)),
 765                    )
 766                    .exec(&*tx)
 767                    .await?;
 768
 769                result.rows_affected
 770            };
 771
 772            if rows_affected == 1 {
 773                Ok(())
 774            } else {
 775                Err(anyhow!("no such contact request"))?
 776            }
 777        })
 778        .await
 779    }
 780
 781    pub fn fuzzy_like_string(string: &str) -> String {
 782        let mut result = String::with_capacity(string.len() * 2 + 1);
 783        for c in string.chars() {
 784            if c.is_alphanumeric() {
 785                result.push('%');
 786                result.push(c);
 787            }
 788        }
 789        result.push('%');
 790        result
 791    }
 792
 793    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 794        self.transaction(|tx| async {
 795            let tx = tx;
 796            let like_string = Self::fuzzy_like_string(name_query);
 797            let query = "
 798                SELECT users.*
 799                FROM users
 800                WHERE github_login ILIKE $1
 801                ORDER BY github_login <-> $2
 802                LIMIT $3
 803            ";
 804
 805            Ok(user::Entity::find()
 806                .from_raw_sql(Statement::from_sql_and_values(
 807                    self.pool.get_database_backend(),
 808                    query.into(),
 809                    vec![like_string.into(), name_query.into(), limit.into()],
 810                ))
 811                .all(&*tx)
 812                .await?)
 813        })
 814        .await
 815    }
 816
 817    // signups
 818
 819    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 820        self.transaction(|tx| async move {
 821            signup::Entity::insert(signup::ActiveModel {
 822                email_address: ActiveValue::set(signup.email_address.clone()),
 823                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 824                email_confirmation_sent: ActiveValue::set(false),
 825                platform_mac: ActiveValue::set(signup.platform_mac),
 826                platform_windows: ActiveValue::set(signup.platform_windows),
 827                platform_linux: ActiveValue::set(signup.platform_linux),
 828                platform_unknown: ActiveValue::set(false),
 829                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 830                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 831                device_id: ActiveValue::set(signup.device_id.clone()),
 832                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 833                ..Default::default()
 834            })
 835            .on_conflict(
 836                OnConflict::column(signup::Column::EmailAddress)
 837                    .update_columns([
 838                        signup::Column::PlatformMac,
 839                        signup::Column::PlatformWindows,
 840                        signup::Column::PlatformLinux,
 841                        signup::Column::EditorFeatures,
 842                        signup::Column::ProgrammingLanguages,
 843                        signup::Column::DeviceId,
 844                        signup::Column::AddedToMailingList,
 845                    ])
 846                    .to_owned(),
 847            )
 848            .exec(&*tx)
 849            .await?;
 850            Ok(())
 851        })
 852        .await
 853    }
 854
 855    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 856        self.transaction(|tx| async move {
 857            let signup = signup::Entity::find()
 858                .filter(signup::Column::EmailAddress.eq(email_address))
 859                .one(&*tx)
 860                .await?
 861                .ok_or_else(|| {
 862                    anyhow!("signup with email address {} doesn't exist", email_address)
 863                })?;
 864
 865            Ok(signup)
 866        })
 867        .await
 868    }
 869
 870    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 871        self.transaction(|tx| async move {
 872            let query = "
 873                SELECT
 874                    COUNT(*) as count,
 875                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 876                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 877                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 878                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 879                FROM (
 880                    SELECT *
 881                    FROM signups
 882                    WHERE
 883                        NOT email_confirmation_sent
 884                ) AS unsent
 885            ";
 886            Ok(
 887                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 888                    self.pool.get_database_backend(),
 889                    query.into(),
 890                    vec![],
 891                ))
 892                .one(&*tx)
 893                .await?
 894                .ok_or_else(|| anyhow!("invalid result"))?,
 895            )
 896        })
 897        .await
 898    }
 899
 900    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 901        let emails = invites
 902            .iter()
 903            .map(|s| s.email_address.as_str())
 904            .collect::<Vec<_>>();
 905        self.transaction(|tx| async {
 906            let tx = tx;
 907            signup::Entity::update_many()
 908                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 909                .set(signup::ActiveModel {
 910                    email_confirmation_sent: ActiveValue::set(true),
 911                    ..Default::default()
 912                })
 913                .exec(&*tx)
 914                .await?;
 915            Ok(())
 916        })
 917        .await
 918    }
 919
 920    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 921        self.transaction(|tx| async move {
 922            Ok(signup::Entity::find()
 923                .select_only()
 924                .column(signup::Column::EmailAddress)
 925                .column(signup::Column::EmailConfirmationCode)
 926                .filter(
 927                    signup::Column::EmailConfirmationSent.eq(false).and(
 928                        signup::Column::PlatformMac
 929                            .eq(true)
 930                            .or(signup::Column::PlatformUnknown.eq(true)),
 931                    ),
 932                )
 933                .order_by_asc(signup::Column::CreatedAt)
 934                .limit(count as u64)
 935                .into_model()
 936                .all(&*tx)
 937                .await?)
 938        })
 939        .await
 940    }
 941
 942    // invite codes
 943
 944    pub async fn create_invite_from_code(
 945        &self,
 946        code: &str,
 947        email_address: &str,
 948        device_id: Option<&str>,
 949        added_to_mailing_list: bool,
 950    ) -> Result<Invite> {
 951        self.transaction(|tx| async move {
 952            let existing_user = user::Entity::find()
 953                .filter(user::Column::EmailAddress.eq(email_address))
 954                .one(&*tx)
 955                .await?;
 956
 957            if existing_user.is_some() {
 958                Err(anyhow!("email address is already in use"))?;
 959            }
 960
 961            let inviting_user_with_invites = match user::Entity::find()
 962                .filter(
 963                    user::Column::InviteCode
 964                        .eq(code)
 965                        .and(user::Column::InviteCount.gt(0)),
 966                )
 967                .one(&*tx)
 968                .await?
 969            {
 970                Some(inviting_user) => inviting_user,
 971                None => {
 972                    return Err(Error::Http(
 973                        StatusCode::UNAUTHORIZED,
 974                        "unable to find an invite code with invites remaining".to_string(),
 975                    ))?
 976                }
 977            };
 978            user::Entity::update_many()
 979                .filter(
 980                    user::Column::Id
 981                        .eq(inviting_user_with_invites.id)
 982                        .and(user::Column::InviteCount.gt(0)),
 983                )
 984                .col_expr(
 985                    user::Column::InviteCount,
 986                    Expr::col(user::Column::InviteCount).sub(1),
 987                )
 988                .exec(&*tx)
 989                .await?;
 990
 991            let signup = signup::Entity::insert(signup::ActiveModel {
 992                email_address: ActiveValue::set(email_address.into()),
 993                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 994                email_confirmation_sent: ActiveValue::set(false),
 995                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 996                platform_linux: ActiveValue::set(false),
 997                platform_mac: ActiveValue::set(false),
 998                platform_windows: ActiveValue::set(false),
 999                platform_unknown: ActiveValue::set(true),
1000                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
1001                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
1002                ..Default::default()
1003            })
1004            .on_conflict(
1005                OnConflict::column(signup::Column::EmailAddress)
1006                    .update_column(signup::Column::InvitingUserId)
1007                    .to_owned(),
1008            )
1009            .exec_with_returning(&*tx)
1010            .await?;
1011
1012            Ok(Invite {
1013                email_address: signup.email_address,
1014                email_confirmation_code: signup.email_confirmation_code,
1015            })
1016        })
1017        .await
1018    }
1019
1020    pub async fn create_user_from_invite(
1021        &self,
1022        invite: &Invite,
1023        user: NewUserParams,
1024    ) -> Result<Option<NewUserResult>> {
1025        self.transaction(|tx| async {
1026            let tx = tx;
1027            let signup = signup::Entity::find()
1028                .filter(
1029                    signup::Column::EmailAddress
1030                        .eq(invite.email_address.as_str())
1031                        .and(
1032                            signup::Column::EmailConfirmationCode
1033                                .eq(invite.email_confirmation_code.as_str()),
1034                        ),
1035                )
1036                .one(&*tx)
1037                .await?
1038                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1039
1040            if signup.user_id.is_some() {
1041                return Ok(None);
1042            }
1043
1044            let user = user::Entity::insert(user::ActiveModel {
1045                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1046                github_login: ActiveValue::set(user.github_login.clone()),
1047                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1048                admin: ActiveValue::set(false),
1049                invite_count: ActiveValue::set(user.invite_count),
1050                invite_code: ActiveValue::set(Some(random_invite_code())),
1051                metrics_id: ActiveValue::set(Uuid::new_v4()),
1052                ..Default::default()
1053            })
1054            .on_conflict(
1055                OnConflict::column(user::Column::GithubLogin)
1056                    .update_columns([
1057                        user::Column::EmailAddress,
1058                        user::Column::GithubUserId,
1059                        user::Column::Admin,
1060                    ])
1061                    .to_owned(),
1062            )
1063            .exec_with_returning(&*tx)
1064            .await?;
1065
1066            let mut signup = signup.into_active_model();
1067            signup.user_id = ActiveValue::set(Some(user.id));
1068            let signup = signup.update(&*tx).await?;
1069
1070            if let Some(inviting_user_id) = signup.inviting_user_id {
1071                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1072                    (inviting_user_id, user.id, true)
1073                } else {
1074                    (user.id, inviting_user_id, false)
1075                };
1076
1077                contact::Entity::insert(contact::ActiveModel {
1078                    user_id_a: ActiveValue::set(user_id_a),
1079                    user_id_b: ActiveValue::set(user_id_b),
1080                    a_to_b: ActiveValue::set(a_to_b),
1081                    should_notify: ActiveValue::set(true),
1082                    accepted: ActiveValue::set(true),
1083                    ..Default::default()
1084                })
1085                .on_conflict(OnConflict::new().do_nothing().to_owned())
1086                .exec_without_returning(&*tx)
1087                .await?;
1088            }
1089
1090            Ok(Some(NewUserResult {
1091                user_id: user.id,
1092                metrics_id: user.metrics_id.to_string(),
1093                inviting_user_id: signup.inviting_user_id,
1094                signup_device_id: signup.device_id,
1095            }))
1096        })
1097        .await
1098    }
1099
1100    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1101        self.transaction(|tx| async move {
1102            if count > 0 {
1103                user::Entity::update_many()
1104                    .filter(
1105                        user::Column::Id
1106                            .eq(id)
1107                            .and(user::Column::InviteCode.is_null()),
1108                    )
1109                    .set(user::ActiveModel {
1110                        invite_code: ActiveValue::set(Some(random_invite_code())),
1111                        ..Default::default()
1112                    })
1113                    .exec(&*tx)
1114                    .await?;
1115            }
1116
1117            user::Entity::update_many()
1118                .filter(user::Column::Id.eq(id))
1119                .set(user::ActiveModel {
1120                    invite_count: ActiveValue::set(count),
1121                    ..Default::default()
1122                })
1123                .exec(&*tx)
1124                .await?;
1125            Ok(())
1126        })
1127        .await
1128    }
1129
1130    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1131        self.transaction(|tx| async move {
1132            match user::Entity::find_by_id(id).one(&*tx).await? {
1133                Some(user) if user.invite_code.is_some() => {
1134                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1135                }
1136                _ => Ok(None),
1137            }
1138        })
1139        .await
1140    }
1141
1142    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1143        self.transaction(|tx| async move {
1144            user::Entity::find()
1145                .filter(user::Column::InviteCode.eq(code))
1146                .one(&*tx)
1147                .await?
1148                .ok_or_else(|| {
1149                    Error::Http(
1150                        StatusCode::NOT_FOUND,
1151                        "that invite code does not exist".to_string(),
1152                    )
1153                })
1154        })
1155        .await
1156    }
1157
1158    // rooms
1159
1160    pub async fn incoming_call_for_user(
1161        &self,
1162        user_id: UserId,
1163    ) -> Result<Option<proto::IncomingCall>> {
1164        self.transaction(|tx| async move {
1165            let pending_participant = room_participant::Entity::find()
1166                .filter(
1167                    room_participant::Column::UserId
1168                        .eq(user_id)
1169                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1170                )
1171                .one(&*tx)
1172                .await?;
1173
1174            if let Some(pending_participant) = pending_participant {
1175                let room = self.get_room(pending_participant.room_id, &tx).await?;
1176                Ok(Self::build_incoming_call(&room, user_id))
1177            } else {
1178                Ok(None)
1179            }
1180        })
1181        .await
1182    }
1183
1184    pub async fn create_room(
1185        &self,
1186        user_id: UserId,
1187        connection: ConnectionId,
1188        live_kit_room: &str,
1189    ) -> Result<proto::Room> {
1190        self.transaction(|tx| async move {
1191            let room = room::ActiveModel {
1192                live_kit_room: ActiveValue::set(live_kit_room.into()),
1193                ..Default::default()
1194            }
1195            .insert(&*tx)
1196            .await?;
1197            room_participant::ActiveModel {
1198                room_id: ActiveValue::set(room.id),
1199                user_id: ActiveValue::set(user_id),
1200                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1201                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1202                    connection.owner_id as i32,
1203                ))),
1204                answering_connection_lost: ActiveValue::set(false),
1205                calling_user_id: ActiveValue::set(user_id),
1206                calling_connection_id: ActiveValue::set(connection.id as i32),
1207                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1208                    connection.owner_id as i32,
1209                ))),
1210                ..Default::default()
1211            }
1212            .insert(&*tx)
1213            .await?;
1214
1215            let room = self.get_room(room.id, &tx).await?;
1216            Ok(room)
1217        })
1218        .await
1219    }
1220
1221    pub async fn call(
1222        &self,
1223        room_id: RoomId,
1224        calling_user_id: UserId,
1225        calling_connection: ConnectionId,
1226        called_user_id: UserId,
1227        initial_project_id: Option<ProjectId>,
1228    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1229        self.room_transaction(room_id, |tx| async move {
1230            room_participant::ActiveModel {
1231                room_id: ActiveValue::set(room_id),
1232                user_id: ActiveValue::set(called_user_id),
1233                answering_connection_lost: ActiveValue::set(false),
1234                calling_user_id: ActiveValue::set(calling_user_id),
1235                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1236                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1237                    calling_connection.owner_id as i32,
1238                ))),
1239                initial_project_id: ActiveValue::set(initial_project_id),
1240                ..Default::default()
1241            }
1242            .insert(&*tx)
1243            .await?;
1244
1245            let room = self.get_room(room_id, &tx).await?;
1246            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1247                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1248            Ok((room, incoming_call))
1249        })
1250        .await
1251    }
1252
1253    pub async fn call_failed(
1254        &self,
1255        room_id: RoomId,
1256        called_user_id: UserId,
1257    ) -> Result<RoomGuard<proto::Room>> {
1258        self.room_transaction(room_id, |tx| async move {
1259            room_participant::Entity::delete_many()
1260                .filter(
1261                    room_participant::Column::RoomId
1262                        .eq(room_id)
1263                        .and(room_participant::Column::UserId.eq(called_user_id)),
1264                )
1265                .exec(&*tx)
1266                .await?;
1267            let room = self.get_room(room_id, &tx).await?;
1268            Ok(room)
1269        })
1270        .await
1271    }
1272
1273    pub async fn decline_call(
1274        &self,
1275        expected_room_id: Option<RoomId>,
1276        user_id: UserId,
1277    ) -> Result<Option<RoomGuard<proto::Room>>> {
1278        self.optional_room_transaction(|tx| async move {
1279            let mut filter = Condition::all()
1280                .add(room_participant::Column::UserId.eq(user_id))
1281                .add(room_participant::Column::AnsweringConnectionId.is_null());
1282            if let Some(room_id) = expected_room_id {
1283                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1284            }
1285            let participant = room_participant::Entity::find()
1286                .filter(filter)
1287                .one(&*tx)
1288                .await?;
1289
1290            let participant = if let Some(participant) = participant {
1291                participant
1292            } else if expected_room_id.is_some() {
1293                return Err(anyhow!("could not find call to decline"))?;
1294            } else {
1295                return Ok(None);
1296            };
1297
1298            let room_id = participant.room_id;
1299            room_participant::Entity::delete(participant.into_active_model())
1300                .exec(&*tx)
1301                .await?;
1302
1303            let room = self.get_room(room_id, &tx).await?;
1304            Ok(Some((room_id, room)))
1305        })
1306        .await
1307    }
1308
1309    pub async fn cancel_call(
1310        &self,
1311        room_id: RoomId,
1312        calling_connection: ConnectionId,
1313        called_user_id: UserId,
1314    ) -> Result<RoomGuard<proto::Room>> {
1315        self.room_transaction(room_id, |tx| async move {
1316            let participant = room_participant::Entity::find()
1317                .filter(
1318                    Condition::all()
1319                        .add(room_participant::Column::UserId.eq(called_user_id))
1320                        .add(room_participant::Column::RoomId.eq(room_id))
1321                        .add(
1322                            room_participant::Column::CallingConnectionId
1323                                .eq(calling_connection.id as i32),
1324                        )
1325                        .add(
1326                            room_participant::Column::CallingConnectionServerId
1327                                .eq(calling_connection.owner_id as i32),
1328                        )
1329                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1330                )
1331                .one(&*tx)
1332                .await?
1333                .ok_or_else(|| anyhow!("no call to cancel"))?;
1334
1335            room_participant::Entity::delete(participant.into_active_model())
1336                .exec(&*tx)
1337                .await?;
1338
1339            let room = self.get_room(room_id, &tx).await?;
1340            Ok(room)
1341        })
1342        .await
1343    }
1344
1345    pub async fn join_room(
1346        &self,
1347        room_id: RoomId,
1348        user_id: UserId,
1349        channel_id: Option<ChannelId>,
1350        connection: ConnectionId,
1351    ) -> Result<RoomGuard<JoinRoom>> {
1352        self.room_transaction(room_id, |tx| async move {
1353            if let Some(channel_id) = channel_id {
1354                channel_member::Entity::find()
1355                    .filter(
1356                        channel_member::Column::ChannelId
1357                            .eq(channel_id)
1358                            .and(channel_member::Column::UserId.eq(user_id))
1359                            .and(channel_member::Column::Accepted.eq(true)),
1360                    )
1361                    .one(&*tx)
1362                    .await?
1363                    .ok_or_else(|| anyhow!("no such channel membership"))?;
1364
1365                room_participant::ActiveModel {
1366                    room_id: ActiveValue::set(room_id),
1367                    user_id: ActiveValue::set(user_id),
1368                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1369                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1370                        connection.owner_id as i32,
1371                    ))),
1372                    answering_connection_lost: ActiveValue::set(false),
1373                    // Redundant for the channel join use case, used for channel and call invitations
1374                    calling_user_id: ActiveValue::set(user_id),
1375                    calling_connection_id: ActiveValue::set(connection.id as i32),
1376                    calling_connection_server_id: ActiveValue::set(Some(ServerId(
1377                        connection.owner_id as i32,
1378                    ))),
1379                    ..Default::default()
1380                }
1381                .insert(&*tx)
1382                .await?;
1383            } else {
1384                let result = room_participant::Entity::update_many()
1385                    .filter(
1386                        Condition::all()
1387                            .add(room_participant::Column::RoomId.eq(room_id))
1388                            .add(room_participant::Column::UserId.eq(user_id))
1389                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1390                    )
1391                    .set(room_participant::ActiveModel {
1392                        answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1393                        answering_connection_server_id: ActiveValue::set(Some(ServerId(
1394                            connection.owner_id as i32,
1395                        ))),
1396                        answering_connection_lost: ActiveValue::set(false),
1397                        ..Default::default()
1398                    })
1399                    .exec(&*tx)
1400                    .await?;
1401                if result.rows_affected == 0 {
1402                    Err(anyhow!("room does not exist or was already joined"))?;
1403                }
1404            }
1405
1406            let room = self.get_room(room_id, &tx).await?;
1407            let channel_members = if let Some(channel_id) = channel_id {
1408                self.get_channel_members_internal(channel_id, &tx).await?
1409            } else {
1410                Vec::new()
1411            };
1412            Ok(JoinRoom {
1413                room,
1414                channel_id,
1415                channel_members,
1416            })
1417        })
1418        .await
1419    }
1420
1421    pub async fn rejoin_room(
1422        &self,
1423        rejoin_room: proto::RejoinRoom,
1424        user_id: UserId,
1425        connection: ConnectionId,
1426    ) -> Result<RoomGuard<RejoinedRoom>> {
1427        let room_id = RoomId::from_proto(rejoin_room.id);
1428        self.room_transaction(room_id, |tx| async {
1429            let tx = tx;
1430            let participant_update = room_participant::Entity::update_many()
1431                .filter(
1432                    Condition::all()
1433                        .add(room_participant::Column::RoomId.eq(room_id))
1434                        .add(room_participant::Column::UserId.eq(user_id))
1435                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1436                        .add(
1437                            Condition::any()
1438                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1439                                .add(
1440                                    room_participant::Column::AnsweringConnectionServerId
1441                                        .ne(connection.owner_id as i32),
1442                                ),
1443                        ),
1444                )
1445                .set(room_participant::ActiveModel {
1446                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1447                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1448                        connection.owner_id as i32,
1449                    ))),
1450                    answering_connection_lost: ActiveValue::set(false),
1451                    ..Default::default()
1452                })
1453                .exec(&*tx)
1454                .await?;
1455            if participant_update.rows_affected == 0 {
1456                return Err(anyhow!("room does not exist or was already joined"))?;
1457            }
1458
1459            let mut reshared_projects = Vec::new();
1460            for reshared_project in &rejoin_room.reshared_projects {
1461                let project_id = ProjectId::from_proto(reshared_project.project_id);
1462                let project = project::Entity::find_by_id(project_id)
1463                    .one(&*tx)
1464                    .await?
1465                    .ok_or_else(|| anyhow!("project does not exist"))?;
1466                if project.host_user_id != user_id {
1467                    return Err(anyhow!("no such project"))?;
1468                }
1469
1470                let mut collaborators = project
1471                    .find_related(project_collaborator::Entity)
1472                    .all(&*tx)
1473                    .await?;
1474                let host_ix = collaborators
1475                    .iter()
1476                    .position(|collaborator| {
1477                        collaborator.user_id == user_id && collaborator.is_host
1478                    })
1479                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1480                let host = collaborators.swap_remove(host_ix);
1481                let old_connection_id = host.connection();
1482
1483                project::Entity::update(project::ActiveModel {
1484                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1485                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1486                        connection.owner_id as i32,
1487                    ))),
1488                    ..project.into_active_model()
1489                })
1490                .exec(&*tx)
1491                .await?;
1492                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1493                    connection_id: ActiveValue::set(connection.id as i32),
1494                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1495                    ..host.into_active_model()
1496                })
1497                .exec(&*tx)
1498                .await?;
1499
1500                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1501                    .await?;
1502
1503                reshared_projects.push(ResharedProject {
1504                    id: project_id,
1505                    old_connection_id,
1506                    collaborators: collaborators
1507                        .iter()
1508                        .map(|collaborator| ProjectCollaborator {
1509                            connection_id: collaborator.connection(),
1510                            user_id: collaborator.user_id,
1511                            replica_id: collaborator.replica_id,
1512                            is_host: collaborator.is_host,
1513                        })
1514                        .collect(),
1515                    worktrees: reshared_project.worktrees.clone(),
1516                });
1517            }
1518
1519            project::Entity::delete_many()
1520                .filter(
1521                    Condition::all()
1522                        .add(project::Column::RoomId.eq(room_id))
1523                        .add(project::Column::HostUserId.eq(user_id))
1524                        .add(
1525                            project::Column::Id
1526                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1527                        ),
1528                )
1529                .exec(&*tx)
1530                .await?;
1531
1532            let mut rejoined_projects = Vec::new();
1533            for rejoined_project in &rejoin_room.rejoined_projects {
1534                let project_id = ProjectId::from_proto(rejoined_project.id);
1535                let Some(project) = project::Entity::find_by_id(project_id)
1536                    .one(&*tx)
1537                    .await? else { continue };
1538
1539                let mut worktrees = Vec::new();
1540                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1541                for db_worktree in db_worktrees {
1542                    let mut worktree = RejoinedWorktree {
1543                        id: db_worktree.id as u64,
1544                        abs_path: db_worktree.abs_path,
1545                        root_name: db_worktree.root_name,
1546                        visible: db_worktree.visible,
1547                        updated_entries: Default::default(),
1548                        removed_entries: Default::default(),
1549                        updated_repositories: Default::default(),
1550                        removed_repositories: Default::default(),
1551                        diagnostic_summaries: Default::default(),
1552                        settings_files: Default::default(),
1553                        scan_id: db_worktree.scan_id as u64,
1554                        completed_scan_id: db_worktree.completed_scan_id as u64,
1555                    };
1556
1557                    let rejoined_worktree = rejoined_project
1558                        .worktrees
1559                        .iter()
1560                        .find(|worktree| worktree.id == db_worktree.id as u64);
1561
1562                    // File entries
1563                    {
1564                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1565                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1566                        } else {
1567                            worktree_entry::Column::IsDeleted.eq(false)
1568                        };
1569
1570                        let mut db_entries = worktree_entry::Entity::find()
1571                            .filter(
1572                                Condition::all()
1573                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
1574                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1575                                    .add(entry_filter),
1576                            )
1577                            .stream(&*tx)
1578                            .await?;
1579
1580                        while let Some(db_entry) = db_entries.next().await {
1581                            let db_entry = db_entry?;
1582                            if db_entry.is_deleted {
1583                                worktree.removed_entries.push(db_entry.id as u64);
1584                            } else {
1585                                worktree.updated_entries.push(proto::Entry {
1586                                    id: db_entry.id as u64,
1587                                    is_dir: db_entry.is_dir,
1588                                    path: db_entry.path,
1589                                    inode: db_entry.inode as u64,
1590                                    mtime: Some(proto::Timestamp {
1591                                        seconds: db_entry.mtime_seconds as u64,
1592                                        nanos: db_entry.mtime_nanos as u32,
1593                                    }),
1594                                    is_symlink: db_entry.is_symlink,
1595                                    is_ignored: db_entry.is_ignored,
1596                                    is_external: db_entry.is_external,
1597                                    git_status: db_entry.git_status.map(|status| status as i32),
1598                                });
1599                            }
1600                        }
1601                    }
1602
1603                    // Repository Entries
1604                    {
1605                        let repository_entry_filter =
1606                            if let Some(rejoined_worktree) = rejoined_worktree {
1607                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1608                            } else {
1609                                worktree_repository::Column::IsDeleted.eq(false)
1610                            };
1611
1612                        let mut db_repositories = worktree_repository::Entity::find()
1613                            .filter(
1614                                Condition::all()
1615                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
1616                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1617                                    .add(repository_entry_filter),
1618                            )
1619                            .stream(&*tx)
1620                            .await?;
1621
1622                        while let Some(db_repository) = db_repositories.next().await {
1623                            let db_repository = db_repository?;
1624                            if db_repository.is_deleted {
1625                                worktree
1626                                    .removed_repositories
1627                                    .push(db_repository.work_directory_id as u64);
1628                            } else {
1629                                worktree.updated_repositories.push(proto::RepositoryEntry {
1630                                    work_directory_id: db_repository.work_directory_id as u64,
1631                                    branch: db_repository.branch,
1632                                });
1633                            }
1634                        }
1635                    }
1636
1637                    worktrees.push(worktree);
1638                }
1639
1640                let language_servers = project
1641                    .find_related(language_server::Entity)
1642                    .all(&*tx)
1643                    .await?
1644                    .into_iter()
1645                    .map(|language_server| proto::LanguageServer {
1646                        id: language_server.id as u64,
1647                        name: language_server.name,
1648                    })
1649                    .collect::<Vec<_>>();
1650
1651                {
1652                    let mut db_settings_files = worktree_settings_file::Entity::find()
1653                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1654                        .stream(&*tx)
1655                        .await?;
1656                    while let Some(db_settings_file) = db_settings_files.next().await {
1657                        let db_settings_file = db_settings_file?;
1658                        if let Some(worktree) = worktrees
1659                            .iter_mut()
1660                            .find(|w| w.id == db_settings_file.worktree_id as u64)
1661                        {
1662                            worktree.settings_files.push(WorktreeSettingsFile {
1663                                path: db_settings_file.path,
1664                                content: db_settings_file.content,
1665                            });
1666                        }
1667                    }
1668                }
1669
1670                let mut collaborators = project
1671                    .find_related(project_collaborator::Entity)
1672                    .all(&*tx)
1673                    .await?;
1674                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1675                    .iter()
1676                    .position(|collaborator| collaborator.user_id == user_id)
1677                {
1678                    collaborators.swap_remove(self_collaborator_ix)
1679                } else {
1680                    continue;
1681                };
1682                let old_connection_id = self_collaborator.connection();
1683                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1684                    connection_id: ActiveValue::set(connection.id as i32),
1685                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1686                    ..self_collaborator.into_active_model()
1687                })
1688                .exec(&*tx)
1689                .await?;
1690
1691                let collaborators = collaborators
1692                    .into_iter()
1693                    .map(|collaborator| ProjectCollaborator {
1694                        connection_id: collaborator.connection(),
1695                        user_id: collaborator.user_id,
1696                        replica_id: collaborator.replica_id,
1697                        is_host: collaborator.is_host,
1698                    })
1699                    .collect::<Vec<_>>();
1700
1701                rejoined_projects.push(RejoinedProject {
1702                    id: project_id,
1703                    old_connection_id,
1704                    collaborators,
1705                    worktrees,
1706                    language_servers,
1707                });
1708            }
1709
1710            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1711
1712            let channel_members = if let Some(channel_id) = channel_id {
1713                self.get_channel_members_internal(channel_id, &tx).await?
1714            } else {
1715                Vec::new()
1716            };
1717
1718            Ok(RejoinedRoom {
1719                room,
1720                channel_id,
1721                channel_members,
1722                rejoined_projects,
1723                reshared_projects,
1724            })
1725        })
1726        .await
1727    }
1728
1729    pub async fn leave_room(
1730        &self,
1731        connection: ConnectionId,
1732    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1733        self.optional_room_transaction(|tx| async move {
1734            let leaving_participant = room_participant::Entity::find()
1735                .filter(
1736                    Condition::all()
1737                        .add(
1738                            room_participant::Column::AnsweringConnectionId
1739                                .eq(connection.id as i32),
1740                        )
1741                        .add(
1742                            room_participant::Column::AnsweringConnectionServerId
1743                                .eq(connection.owner_id as i32),
1744                        ),
1745                )
1746                .one(&*tx)
1747                .await?;
1748
1749            if let Some(leaving_participant) = leaving_participant {
1750                // Leave room.
1751                let room_id = leaving_participant.room_id;
1752                room_participant::Entity::delete_by_id(leaving_participant.id)
1753                    .exec(&*tx)
1754                    .await?;
1755
1756                // Cancel pending calls initiated by the leaving user.
1757                let called_participants = room_participant::Entity::find()
1758                    .filter(
1759                        Condition::all()
1760                            .add(
1761                                room_participant::Column::CallingUserId
1762                                    .eq(leaving_participant.user_id),
1763                            )
1764                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1765                    )
1766                    .all(&*tx)
1767                    .await?;
1768                room_participant::Entity::delete_many()
1769                    .filter(
1770                        room_participant::Column::Id
1771                            .is_in(called_participants.iter().map(|participant| participant.id)),
1772                    )
1773                    .exec(&*tx)
1774                    .await?;
1775                let canceled_calls_to_user_ids = called_participants
1776                    .into_iter()
1777                    .map(|participant| participant.user_id)
1778                    .collect();
1779
1780                // Detect left projects.
1781                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1782                enum QueryProjectIds {
1783                    ProjectId,
1784                }
1785                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1786                    .select_only()
1787                    .column_as(
1788                        project_collaborator::Column::ProjectId,
1789                        QueryProjectIds::ProjectId,
1790                    )
1791                    .filter(
1792                        Condition::all()
1793                            .add(
1794                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1795                            )
1796                            .add(
1797                                project_collaborator::Column::ConnectionServerId
1798                                    .eq(connection.owner_id as i32),
1799                            ),
1800                    )
1801                    .into_values::<_, QueryProjectIds>()
1802                    .all(&*tx)
1803                    .await?;
1804                let mut left_projects = HashMap::default();
1805                let mut collaborators = project_collaborator::Entity::find()
1806                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1807                    .stream(&*tx)
1808                    .await?;
1809                while let Some(collaborator) = collaborators.next().await {
1810                    let collaborator = collaborator?;
1811                    let left_project =
1812                        left_projects
1813                            .entry(collaborator.project_id)
1814                            .or_insert(LeftProject {
1815                                id: collaborator.project_id,
1816                                host_user_id: Default::default(),
1817                                connection_ids: Default::default(),
1818                                host_connection_id: Default::default(),
1819                            });
1820
1821                    let collaborator_connection_id = collaborator.connection();
1822                    if collaborator_connection_id != connection {
1823                        left_project.connection_ids.push(collaborator_connection_id);
1824                    }
1825
1826                    if collaborator.is_host {
1827                        left_project.host_user_id = collaborator.user_id;
1828                        left_project.host_connection_id = collaborator_connection_id;
1829                    }
1830                }
1831                drop(collaborators);
1832
1833                // Leave projects.
1834                project_collaborator::Entity::delete_many()
1835                    .filter(
1836                        Condition::all()
1837                            .add(
1838                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1839                            )
1840                            .add(
1841                                project_collaborator::Column::ConnectionServerId
1842                                    .eq(connection.owner_id as i32),
1843                            ),
1844                    )
1845                    .exec(&*tx)
1846                    .await?;
1847
1848                // Unshare projects.
1849                project::Entity::delete_many()
1850                    .filter(
1851                        Condition::all()
1852                            .add(project::Column::RoomId.eq(room_id))
1853                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1854                            .add(
1855                                project::Column::HostConnectionServerId
1856                                    .eq(connection.owner_id as i32),
1857                            ),
1858                    )
1859                    .exec(&*tx)
1860                    .await?;
1861
1862                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1863                let deleted = if room.participants.is_empty() {
1864                    let result = room::Entity::delete_by_id(room_id)
1865                        .filter(room::Column::ChannelId.is_null())
1866                        .exec(&*tx)
1867                        .await?;
1868                    result.rows_affected > 0
1869                } else {
1870                    false
1871                };
1872
1873                let channel_members = if let Some(channel_id) = channel_id {
1874                    self.get_channel_members_internal(channel_id, &tx).await?
1875                } else {
1876                    Vec::new()
1877                };
1878                let left_room = LeftRoom {
1879                    room,
1880                    channel_id,
1881                    channel_members,
1882                    left_projects,
1883                    canceled_calls_to_user_ids,
1884                    deleted,
1885                };
1886
1887                if left_room.room.participants.is_empty() {
1888                    self.rooms.remove(&room_id);
1889                }
1890
1891                Ok(Some((room_id, left_room)))
1892            } else {
1893                Ok(None)
1894            }
1895        })
1896        .await
1897    }
1898
1899    pub async fn follow(
1900        &self,
1901        project_id: ProjectId,
1902        leader_connection: ConnectionId,
1903        follower_connection: ConnectionId,
1904    ) -> Result<RoomGuard<proto::Room>> {
1905        let room_id = self.room_id_for_project(project_id).await?;
1906        self.room_transaction(room_id, |tx| async move {
1907            follower::ActiveModel {
1908                room_id: ActiveValue::set(room_id),
1909                project_id: ActiveValue::set(project_id),
1910                leader_connection_server_id: ActiveValue::set(ServerId(
1911                    leader_connection.owner_id as i32,
1912                )),
1913                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1914                follower_connection_server_id: ActiveValue::set(ServerId(
1915                    follower_connection.owner_id as i32,
1916                )),
1917                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1918                ..Default::default()
1919            }
1920            .insert(&*tx)
1921            .await?;
1922
1923            let room = self.get_room(room_id, &*tx).await?;
1924            Ok(room)
1925        })
1926        .await
1927    }
1928
1929    pub async fn unfollow(
1930        &self,
1931        project_id: ProjectId,
1932        leader_connection: ConnectionId,
1933        follower_connection: ConnectionId,
1934    ) -> Result<RoomGuard<proto::Room>> {
1935        let room_id = self.room_id_for_project(project_id).await?;
1936        self.room_transaction(room_id, |tx| async move {
1937            follower::Entity::delete_many()
1938                .filter(
1939                    Condition::all()
1940                        .add(follower::Column::ProjectId.eq(project_id))
1941                        .add(
1942                            follower::Column::LeaderConnectionServerId
1943                                .eq(leader_connection.owner_id),
1944                        )
1945                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1946                        .add(
1947                            follower::Column::FollowerConnectionServerId
1948                                .eq(follower_connection.owner_id),
1949                        )
1950                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1951                )
1952                .exec(&*tx)
1953                .await?;
1954
1955            let room = self.get_room(room_id, &*tx).await?;
1956            Ok(room)
1957        })
1958        .await
1959    }
1960
1961    pub async fn update_room_participant_location(
1962        &self,
1963        room_id: RoomId,
1964        connection: ConnectionId,
1965        location: proto::ParticipantLocation,
1966    ) -> Result<RoomGuard<proto::Room>> {
1967        self.room_transaction(room_id, |tx| async {
1968            let tx = tx;
1969            let location_kind;
1970            let location_project_id;
1971            match location
1972                .variant
1973                .as_ref()
1974                .ok_or_else(|| anyhow!("invalid location"))?
1975            {
1976                proto::participant_location::Variant::SharedProject(project) => {
1977                    location_kind = 0;
1978                    location_project_id = Some(ProjectId::from_proto(project.id));
1979                }
1980                proto::participant_location::Variant::UnsharedProject(_) => {
1981                    location_kind = 1;
1982                    location_project_id = None;
1983                }
1984                proto::participant_location::Variant::External(_) => {
1985                    location_kind = 2;
1986                    location_project_id = None;
1987                }
1988            }
1989
1990            let result = room_participant::Entity::update_many()
1991                .filter(
1992                    Condition::all()
1993                        .add(room_participant::Column::RoomId.eq(room_id))
1994                        .add(
1995                            room_participant::Column::AnsweringConnectionId
1996                                .eq(connection.id as i32),
1997                        )
1998                        .add(
1999                            room_participant::Column::AnsweringConnectionServerId
2000                                .eq(connection.owner_id as i32),
2001                        ),
2002                )
2003                .set(room_participant::ActiveModel {
2004                    location_kind: ActiveValue::set(Some(location_kind)),
2005                    location_project_id: ActiveValue::set(location_project_id),
2006                    ..Default::default()
2007                })
2008                .exec(&*tx)
2009                .await?;
2010
2011            if result.rows_affected == 1 {
2012                let room = self.get_room(room_id, &tx).await?;
2013                Ok(room)
2014            } else {
2015                Err(anyhow!("could not update room participant location"))?
2016            }
2017        })
2018        .await
2019    }
2020
2021    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2022        self.transaction(|tx| async move {
2023            let participant = room_participant::Entity::find()
2024                .filter(
2025                    Condition::all()
2026                        .add(
2027                            room_participant::Column::AnsweringConnectionId
2028                                .eq(connection.id as i32),
2029                        )
2030                        .add(
2031                            room_participant::Column::AnsweringConnectionServerId
2032                                .eq(connection.owner_id as i32),
2033                        ),
2034                )
2035                .one(&*tx)
2036                .await?
2037                .ok_or_else(|| anyhow!("not a participant in any room"))?;
2038
2039            room_participant::Entity::update(room_participant::ActiveModel {
2040                answering_connection_lost: ActiveValue::set(true),
2041                ..participant.into_active_model()
2042            })
2043            .exec(&*tx)
2044            .await?;
2045
2046            Ok(())
2047        })
2048        .await
2049    }
2050
2051    fn build_incoming_call(
2052        room: &proto::Room,
2053        called_user_id: UserId,
2054    ) -> Option<proto::IncomingCall> {
2055        let pending_participant = room
2056            .pending_participants
2057            .iter()
2058            .find(|participant| participant.user_id == called_user_id.to_proto())?;
2059
2060        Some(proto::IncomingCall {
2061            room_id: room.id,
2062            calling_user_id: pending_participant.calling_user_id,
2063            participant_user_ids: room
2064                .participants
2065                .iter()
2066                .map(|participant| participant.user_id)
2067                .collect(),
2068            initial_project: room.participants.iter().find_map(|participant| {
2069                let initial_project_id = pending_participant.initial_project_id?;
2070                participant
2071                    .projects
2072                    .iter()
2073                    .find(|project| project.id == initial_project_id)
2074                    .cloned()
2075            }),
2076        })
2077    }
2078    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2079        let (_, room) = self.get_channel_room(room_id, tx).await?;
2080        Ok(room)
2081    }
2082
2083    async fn get_channel_room(
2084        &self,
2085        room_id: RoomId,
2086        tx: &DatabaseTransaction,
2087    ) -> Result<(Option<ChannelId>, proto::Room)> {
2088        let db_room = room::Entity::find_by_id(room_id)
2089            .one(tx)
2090            .await?
2091            .ok_or_else(|| anyhow!("could not find room"))?;
2092
2093        let mut db_participants = db_room
2094            .find_related(room_participant::Entity)
2095            .stream(tx)
2096            .await?;
2097        let mut participants = HashMap::default();
2098        let mut pending_participants = Vec::new();
2099        while let Some(db_participant) = db_participants.next().await {
2100            let db_participant = db_participant?;
2101            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2102                .answering_connection_id
2103                .zip(db_participant.answering_connection_server_id)
2104            {
2105                let location = match (
2106                    db_participant.location_kind,
2107                    db_participant.location_project_id,
2108                ) {
2109                    (Some(0), Some(project_id)) => {
2110                        Some(proto::participant_location::Variant::SharedProject(
2111                            proto::participant_location::SharedProject {
2112                                id: project_id.to_proto(),
2113                            },
2114                        ))
2115                    }
2116                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2117                        Default::default(),
2118                    )),
2119                    _ => Some(proto::participant_location::Variant::External(
2120                        Default::default(),
2121                    )),
2122                };
2123
2124                let answering_connection = ConnectionId {
2125                    owner_id: answering_connection_server_id.0 as u32,
2126                    id: answering_connection_id as u32,
2127                };
2128                participants.insert(
2129                    answering_connection,
2130                    proto::Participant {
2131                        user_id: db_participant.user_id.to_proto(),
2132                        peer_id: Some(answering_connection.into()),
2133                        projects: Default::default(),
2134                        location: Some(proto::ParticipantLocation { variant: location }),
2135                    },
2136                );
2137            } else {
2138                pending_participants.push(proto::PendingParticipant {
2139                    user_id: db_participant.user_id.to_proto(),
2140                    calling_user_id: db_participant.calling_user_id.to_proto(),
2141                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2142                });
2143            }
2144        }
2145        drop(db_participants);
2146
2147        let mut db_projects = db_room
2148            .find_related(project::Entity)
2149            .find_with_related(worktree::Entity)
2150            .stream(tx)
2151            .await?;
2152
2153        while let Some(row) = db_projects.next().await {
2154            let (db_project, db_worktree) = row?;
2155            let host_connection = db_project.host_connection()?;
2156            if let Some(participant) = participants.get_mut(&host_connection) {
2157                let project = if let Some(project) = participant
2158                    .projects
2159                    .iter_mut()
2160                    .find(|project| project.id == db_project.id.to_proto())
2161                {
2162                    project
2163                } else {
2164                    participant.projects.push(proto::ParticipantProject {
2165                        id: db_project.id.to_proto(),
2166                        worktree_root_names: Default::default(),
2167                    });
2168                    participant.projects.last_mut().unwrap()
2169                };
2170
2171                if let Some(db_worktree) = db_worktree {
2172                    if db_worktree.visible {
2173                        project.worktree_root_names.push(db_worktree.root_name);
2174                    }
2175                }
2176            }
2177        }
2178        drop(db_projects);
2179
2180        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2181        let mut followers = Vec::new();
2182        while let Some(db_follower) = db_followers.next().await {
2183            let db_follower = db_follower?;
2184            followers.push(proto::Follower {
2185                leader_id: Some(db_follower.leader_connection().into()),
2186                follower_id: Some(db_follower.follower_connection().into()),
2187                project_id: db_follower.project_id.to_proto(),
2188            });
2189        }
2190
2191        Ok((
2192            db_room.channel_id,
2193            proto::Room {
2194                id: db_room.id.to_proto(),
2195                live_kit_room: db_room.live_kit_room,
2196                participants: participants.into_values().collect(),
2197                pending_participants,
2198                followers,
2199            },
2200        ))
2201    }
2202
2203    async fn get_channel_members_for_room(
2204        &self,
2205        room_id: RoomId,
2206        tx: &DatabaseTransaction,
2207    ) -> Result<Vec<UserId>> {
2208        let db_room = room::Model {
2209            id: room_id,
2210            ..Default::default()
2211        };
2212
2213        let channel_users =
2214            if let Some(channel) = db_room.find_related(channel::Entity).one(tx).await? {
2215                self.get_channel_members_internal(channel.id, tx).await?
2216            } else {
2217                Vec::new()
2218            };
2219
2220        Ok(channel_users)
2221    }
2222
2223    // projects
2224
2225    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2226        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2227        enum QueryAs {
2228            Count,
2229        }
2230
2231        self.transaction(|tx| async move {
2232            Ok(project::Entity::find()
2233                .select_only()
2234                .column_as(project::Column::Id.count(), QueryAs::Count)
2235                .inner_join(user::Entity)
2236                .filter(user::Column::Admin.eq(false))
2237                .into_values::<_, QueryAs>()
2238                .one(&*tx)
2239                .await?
2240                .unwrap_or(0i64) as usize)
2241        })
2242        .await
2243    }
2244
2245    pub async fn share_project(
2246        &self,
2247        room_id: RoomId,
2248        connection: ConnectionId,
2249        worktrees: &[proto::WorktreeMetadata],
2250    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2251        self.room_transaction(room_id, |tx| async move {
2252            let participant = room_participant::Entity::find()
2253                .filter(
2254                    Condition::all()
2255                        .add(
2256                            room_participant::Column::AnsweringConnectionId
2257                                .eq(connection.id as i32),
2258                        )
2259                        .add(
2260                            room_participant::Column::AnsweringConnectionServerId
2261                                .eq(connection.owner_id as i32),
2262                        ),
2263                )
2264                .one(&*tx)
2265                .await?
2266                .ok_or_else(|| anyhow!("could not find participant"))?;
2267            if participant.room_id != room_id {
2268                return Err(anyhow!("shared project on unexpected room"))?;
2269            }
2270
2271            let project = project::ActiveModel {
2272                room_id: ActiveValue::set(participant.room_id),
2273                host_user_id: ActiveValue::set(participant.user_id),
2274                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2275                host_connection_server_id: ActiveValue::set(Some(ServerId(
2276                    connection.owner_id as i32,
2277                ))),
2278                ..Default::default()
2279            }
2280            .insert(&*tx)
2281            .await?;
2282
2283            if !worktrees.is_empty() {
2284                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2285                    worktree::ActiveModel {
2286                        id: ActiveValue::set(worktree.id as i64),
2287                        project_id: ActiveValue::set(project.id),
2288                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2289                        root_name: ActiveValue::set(worktree.root_name.clone()),
2290                        visible: ActiveValue::set(worktree.visible),
2291                        scan_id: ActiveValue::set(0),
2292                        completed_scan_id: ActiveValue::set(0),
2293                    }
2294                }))
2295                .exec(&*tx)
2296                .await?;
2297            }
2298
2299            project_collaborator::ActiveModel {
2300                project_id: ActiveValue::set(project.id),
2301                connection_id: ActiveValue::set(connection.id as i32),
2302                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2303                user_id: ActiveValue::set(participant.user_id),
2304                replica_id: ActiveValue::set(ReplicaId(0)),
2305                is_host: ActiveValue::set(true),
2306                ..Default::default()
2307            }
2308            .insert(&*tx)
2309            .await?;
2310
2311            let room = self.get_room(room_id, &tx).await?;
2312            Ok((project.id, room))
2313        })
2314        .await
2315    }
2316
2317    pub async fn unshare_project(
2318        &self,
2319        project_id: ProjectId,
2320        connection: ConnectionId,
2321    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2322        let room_id = self.room_id_for_project(project_id).await?;
2323        self.room_transaction(room_id, |tx| async move {
2324            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2325
2326            let project = project::Entity::find_by_id(project_id)
2327                .one(&*tx)
2328                .await?
2329                .ok_or_else(|| anyhow!("project not found"))?;
2330            if project.host_connection()? == connection {
2331                project::Entity::delete(project.into_active_model())
2332                    .exec(&*tx)
2333                    .await?;
2334                let room = self.get_room(room_id, &tx).await?;
2335                Ok((room, guest_connection_ids))
2336            } else {
2337                Err(anyhow!("cannot unshare a project hosted by another user"))?
2338            }
2339        })
2340        .await
2341    }
2342
2343    pub async fn update_project(
2344        &self,
2345        project_id: ProjectId,
2346        connection: ConnectionId,
2347        worktrees: &[proto::WorktreeMetadata],
2348    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2349        let room_id = self.room_id_for_project(project_id).await?;
2350        self.room_transaction(room_id, |tx| async move {
2351            let project = project::Entity::find_by_id(project_id)
2352                .filter(
2353                    Condition::all()
2354                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2355                        .add(
2356                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2357                        ),
2358                )
2359                .one(&*tx)
2360                .await?
2361                .ok_or_else(|| anyhow!("no such project"))?;
2362
2363            self.update_project_worktrees(project.id, worktrees, &tx)
2364                .await?;
2365
2366            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2367            let room = self.get_room(project.room_id, &tx).await?;
2368            Ok((room, guest_connection_ids))
2369        })
2370        .await
2371    }
2372
2373    async fn update_project_worktrees(
2374        &self,
2375        project_id: ProjectId,
2376        worktrees: &[proto::WorktreeMetadata],
2377        tx: &DatabaseTransaction,
2378    ) -> Result<()> {
2379        if !worktrees.is_empty() {
2380            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2381                id: ActiveValue::set(worktree.id as i64),
2382                project_id: ActiveValue::set(project_id),
2383                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2384                root_name: ActiveValue::set(worktree.root_name.clone()),
2385                visible: ActiveValue::set(worktree.visible),
2386                scan_id: ActiveValue::set(0),
2387                completed_scan_id: ActiveValue::set(0),
2388            }))
2389            .on_conflict(
2390                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2391                    .update_column(worktree::Column::RootName)
2392                    .to_owned(),
2393            )
2394            .exec(&*tx)
2395            .await?;
2396        }
2397
2398        worktree::Entity::delete_many()
2399            .filter(worktree::Column::ProjectId.eq(project_id).and(
2400                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2401            ))
2402            .exec(&*tx)
2403            .await?;
2404
2405        Ok(())
2406    }
2407
2408    pub async fn update_worktree(
2409        &self,
2410        update: &proto::UpdateWorktree,
2411        connection: ConnectionId,
2412    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2413        let project_id = ProjectId::from_proto(update.project_id);
2414        let worktree_id = update.worktree_id as i64;
2415        let room_id = self.room_id_for_project(project_id).await?;
2416        self.room_transaction(room_id, |tx| async move {
2417            // Ensure the update comes from the host.
2418            let _project = project::Entity::find_by_id(project_id)
2419                .filter(
2420                    Condition::all()
2421                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2422                        .add(
2423                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2424                        ),
2425                )
2426                .one(&*tx)
2427                .await?
2428                .ok_or_else(|| anyhow!("no such project"))?;
2429
2430            // Update metadata.
2431            worktree::Entity::update(worktree::ActiveModel {
2432                id: ActiveValue::set(worktree_id),
2433                project_id: ActiveValue::set(project_id),
2434                root_name: ActiveValue::set(update.root_name.clone()),
2435                scan_id: ActiveValue::set(update.scan_id as i64),
2436                completed_scan_id: if update.is_last_update {
2437                    ActiveValue::set(update.scan_id as i64)
2438                } else {
2439                    ActiveValue::default()
2440                },
2441                abs_path: ActiveValue::set(update.abs_path.clone()),
2442                ..Default::default()
2443            })
2444            .exec(&*tx)
2445            .await?;
2446
2447            if !update.updated_entries.is_empty() {
2448                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2449                    let mtime = entry.mtime.clone().unwrap_or_default();
2450                    worktree_entry::ActiveModel {
2451                        project_id: ActiveValue::set(project_id),
2452                        worktree_id: ActiveValue::set(worktree_id),
2453                        id: ActiveValue::set(entry.id as i64),
2454                        is_dir: ActiveValue::set(entry.is_dir),
2455                        path: ActiveValue::set(entry.path.clone()),
2456                        inode: ActiveValue::set(entry.inode as i64),
2457                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2458                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2459                        is_symlink: ActiveValue::set(entry.is_symlink),
2460                        is_ignored: ActiveValue::set(entry.is_ignored),
2461                        is_external: ActiveValue::set(entry.is_external),
2462                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2463                        is_deleted: ActiveValue::set(false),
2464                        scan_id: ActiveValue::set(update.scan_id as i64),
2465                    }
2466                }))
2467                .on_conflict(
2468                    OnConflict::columns([
2469                        worktree_entry::Column::ProjectId,
2470                        worktree_entry::Column::WorktreeId,
2471                        worktree_entry::Column::Id,
2472                    ])
2473                    .update_columns([
2474                        worktree_entry::Column::IsDir,
2475                        worktree_entry::Column::Path,
2476                        worktree_entry::Column::Inode,
2477                        worktree_entry::Column::MtimeSeconds,
2478                        worktree_entry::Column::MtimeNanos,
2479                        worktree_entry::Column::IsSymlink,
2480                        worktree_entry::Column::IsIgnored,
2481                        worktree_entry::Column::GitStatus,
2482                        worktree_entry::Column::ScanId,
2483                    ])
2484                    .to_owned(),
2485                )
2486                .exec(&*tx)
2487                .await?;
2488            }
2489
2490            if !update.removed_entries.is_empty() {
2491                worktree_entry::Entity::update_many()
2492                    .filter(
2493                        worktree_entry::Column::ProjectId
2494                            .eq(project_id)
2495                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2496                            .and(
2497                                worktree_entry::Column::Id
2498                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2499                            ),
2500                    )
2501                    .set(worktree_entry::ActiveModel {
2502                        is_deleted: ActiveValue::Set(true),
2503                        scan_id: ActiveValue::Set(update.scan_id as i64),
2504                        ..Default::default()
2505                    })
2506                    .exec(&*tx)
2507                    .await?;
2508            }
2509
2510            if !update.updated_repositories.is_empty() {
2511                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2512                    |repository| worktree_repository::ActiveModel {
2513                        project_id: ActiveValue::set(project_id),
2514                        worktree_id: ActiveValue::set(worktree_id),
2515                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2516                        scan_id: ActiveValue::set(update.scan_id as i64),
2517                        branch: ActiveValue::set(repository.branch.clone()),
2518                        is_deleted: ActiveValue::set(false),
2519                    },
2520                ))
2521                .on_conflict(
2522                    OnConflict::columns([
2523                        worktree_repository::Column::ProjectId,
2524                        worktree_repository::Column::WorktreeId,
2525                        worktree_repository::Column::WorkDirectoryId,
2526                    ])
2527                    .update_columns([
2528                        worktree_repository::Column::ScanId,
2529                        worktree_repository::Column::Branch,
2530                    ])
2531                    .to_owned(),
2532                )
2533                .exec(&*tx)
2534                .await?;
2535            }
2536
2537            if !update.removed_repositories.is_empty() {
2538                worktree_repository::Entity::update_many()
2539                    .filter(
2540                        worktree_repository::Column::ProjectId
2541                            .eq(project_id)
2542                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2543                            .and(
2544                                worktree_repository::Column::WorkDirectoryId
2545                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2546                            ),
2547                    )
2548                    .set(worktree_repository::ActiveModel {
2549                        is_deleted: ActiveValue::Set(true),
2550                        scan_id: ActiveValue::Set(update.scan_id as i64),
2551                        ..Default::default()
2552                    })
2553                    .exec(&*tx)
2554                    .await?;
2555            }
2556
2557            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2558            Ok(connection_ids)
2559        })
2560        .await
2561    }
2562
2563    pub async fn update_diagnostic_summary(
2564        &self,
2565        update: &proto::UpdateDiagnosticSummary,
2566        connection: ConnectionId,
2567    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2568        let project_id = ProjectId::from_proto(update.project_id);
2569        let worktree_id = update.worktree_id as i64;
2570        let room_id = self.room_id_for_project(project_id).await?;
2571        self.room_transaction(room_id, |tx| async move {
2572            let summary = update
2573                .summary
2574                .as_ref()
2575                .ok_or_else(|| anyhow!("invalid summary"))?;
2576
2577            // Ensure the update comes from the host.
2578            let project = project::Entity::find_by_id(project_id)
2579                .one(&*tx)
2580                .await?
2581                .ok_or_else(|| anyhow!("no such project"))?;
2582            if project.host_connection()? != connection {
2583                return Err(anyhow!("can't update a project hosted by someone else"))?;
2584            }
2585
2586            // Update summary.
2587            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2588                project_id: ActiveValue::set(project_id),
2589                worktree_id: ActiveValue::set(worktree_id),
2590                path: ActiveValue::set(summary.path.clone()),
2591                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2592                error_count: ActiveValue::set(summary.error_count as i32),
2593                warning_count: ActiveValue::set(summary.warning_count as i32),
2594                ..Default::default()
2595            })
2596            .on_conflict(
2597                OnConflict::columns([
2598                    worktree_diagnostic_summary::Column::ProjectId,
2599                    worktree_diagnostic_summary::Column::WorktreeId,
2600                    worktree_diagnostic_summary::Column::Path,
2601                ])
2602                .update_columns([
2603                    worktree_diagnostic_summary::Column::LanguageServerId,
2604                    worktree_diagnostic_summary::Column::ErrorCount,
2605                    worktree_diagnostic_summary::Column::WarningCount,
2606                ])
2607                .to_owned(),
2608            )
2609            .exec(&*tx)
2610            .await?;
2611
2612            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2613            Ok(connection_ids)
2614        })
2615        .await
2616    }
2617
2618    pub async fn start_language_server(
2619        &self,
2620        update: &proto::StartLanguageServer,
2621        connection: ConnectionId,
2622    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2623        let project_id = ProjectId::from_proto(update.project_id);
2624        let room_id = self.room_id_for_project(project_id).await?;
2625        self.room_transaction(room_id, |tx| async move {
2626            let server = update
2627                .server
2628                .as_ref()
2629                .ok_or_else(|| anyhow!("invalid language server"))?;
2630
2631            // Ensure the update comes from the host.
2632            let project = project::Entity::find_by_id(project_id)
2633                .one(&*tx)
2634                .await?
2635                .ok_or_else(|| anyhow!("no such project"))?;
2636            if project.host_connection()? != connection {
2637                return Err(anyhow!("can't update a project hosted by someone else"))?;
2638            }
2639
2640            // Add the newly-started language server.
2641            language_server::Entity::insert(language_server::ActiveModel {
2642                project_id: ActiveValue::set(project_id),
2643                id: ActiveValue::set(server.id as i64),
2644                name: ActiveValue::set(server.name.clone()),
2645                ..Default::default()
2646            })
2647            .on_conflict(
2648                OnConflict::columns([
2649                    language_server::Column::ProjectId,
2650                    language_server::Column::Id,
2651                ])
2652                .update_column(language_server::Column::Name)
2653                .to_owned(),
2654            )
2655            .exec(&*tx)
2656            .await?;
2657
2658            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2659            Ok(connection_ids)
2660        })
2661        .await
2662    }
2663
2664    pub async fn update_worktree_settings(
2665        &self,
2666        update: &proto::UpdateWorktreeSettings,
2667        connection: ConnectionId,
2668    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2669        let project_id = ProjectId::from_proto(update.project_id);
2670        let room_id = self.room_id_for_project(project_id).await?;
2671        self.room_transaction(room_id, |tx| async move {
2672            // Ensure the update comes from the host.
2673            let project = project::Entity::find_by_id(project_id)
2674                .one(&*tx)
2675                .await?
2676                .ok_or_else(|| anyhow!("no such project"))?;
2677            if project.host_connection()? != connection {
2678                return Err(anyhow!("can't update a project hosted by someone else"))?;
2679            }
2680
2681            if let Some(content) = &update.content {
2682                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2683                    project_id: ActiveValue::Set(project_id),
2684                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2685                    path: ActiveValue::Set(update.path.clone()),
2686                    content: ActiveValue::Set(content.clone()),
2687                })
2688                .on_conflict(
2689                    OnConflict::columns([
2690                        worktree_settings_file::Column::ProjectId,
2691                        worktree_settings_file::Column::WorktreeId,
2692                        worktree_settings_file::Column::Path,
2693                    ])
2694                    .update_column(worktree_settings_file::Column::Content)
2695                    .to_owned(),
2696                )
2697                .exec(&*tx)
2698                .await?;
2699            } else {
2700                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2701                    project_id: ActiveValue::Set(project_id),
2702                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2703                    path: ActiveValue::Set(update.path.clone()),
2704                    ..Default::default()
2705                })
2706                .exec(&*tx)
2707                .await?;
2708            }
2709
2710            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2711            Ok(connection_ids)
2712        })
2713        .await
2714    }
2715
2716    pub async fn join_project(
2717        &self,
2718        project_id: ProjectId,
2719        connection: ConnectionId,
2720    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2721        let room_id = self.room_id_for_project(project_id).await?;
2722        self.room_transaction(room_id, |tx| async move {
2723            let participant = room_participant::Entity::find()
2724                .filter(
2725                    Condition::all()
2726                        .add(
2727                            room_participant::Column::AnsweringConnectionId
2728                                .eq(connection.id as i32),
2729                        )
2730                        .add(
2731                            room_participant::Column::AnsweringConnectionServerId
2732                                .eq(connection.owner_id as i32),
2733                        ),
2734                )
2735                .one(&*tx)
2736                .await?
2737                .ok_or_else(|| anyhow!("must join a room first"))?;
2738
2739            let project = project::Entity::find_by_id(project_id)
2740                .one(&*tx)
2741                .await?
2742                .ok_or_else(|| anyhow!("no such project"))?;
2743            if project.room_id != participant.room_id {
2744                return Err(anyhow!("no such project"))?;
2745            }
2746
2747            let mut collaborators = project
2748                .find_related(project_collaborator::Entity)
2749                .all(&*tx)
2750                .await?;
2751            let replica_ids = collaborators
2752                .iter()
2753                .map(|c| c.replica_id)
2754                .collect::<HashSet<_>>();
2755            let mut replica_id = ReplicaId(1);
2756            while replica_ids.contains(&replica_id) {
2757                replica_id.0 += 1;
2758            }
2759            let new_collaborator = project_collaborator::ActiveModel {
2760                project_id: ActiveValue::set(project_id),
2761                connection_id: ActiveValue::set(connection.id as i32),
2762                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2763                user_id: ActiveValue::set(participant.user_id),
2764                replica_id: ActiveValue::set(replica_id),
2765                is_host: ActiveValue::set(false),
2766                ..Default::default()
2767            }
2768            .insert(&*tx)
2769            .await?;
2770            collaborators.push(new_collaborator);
2771
2772            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2773            let mut worktrees = db_worktrees
2774                .into_iter()
2775                .map(|db_worktree| {
2776                    (
2777                        db_worktree.id as u64,
2778                        Worktree {
2779                            id: db_worktree.id as u64,
2780                            abs_path: db_worktree.abs_path,
2781                            root_name: db_worktree.root_name,
2782                            visible: db_worktree.visible,
2783                            entries: Default::default(),
2784                            repository_entries: Default::default(),
2785                            diagnostic_summaries: Default::default(),
2786                            settings_files: Default::default(),
2787                            scan_id: db_worktree.scan_id as u64,
2788                            completed_scan_id: db_worktree.completed_scan_id as u64,
2789                        },
2790                    )
2791                })
2792                .collect::<BTreeMap<_, _>>();
2793
2794            // Populate worktree entries.
2795            {
2796                let mut db_entries = worktree_entry::Entity::find()
2797                    .filter(
2798                        Condition::all()
2799                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2800                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2801                    )
2802                    .stream(&*tx)
2803                    .await?;
2804                while let Some(db_entry) = db_entries.next().await {
2805                    let db_entry = db_entry?;
2806                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2807                        worktree.entries.push(proto::Entry {
2808                            id: db_entry.id as u64,
2809                            is_dir: db_entry.is_dir,
2810                            path: db_entry.path,
2811                            inode: db_entry.inode as u64,
2812                            mtime: Some(proto::Timestamp {
2813                                seconds: db_entry.mtime_seconds as u64,
2814                                nanos: db_entry.mtime_nanos as u32,
2815                            }),
2816                            is_symlink: db_entry.is_symlink,
2817                            is_ignored: db_entry.is_ignored,
2818                            is_external: db_entry.is_external,
2819                            git_status: db_entry.git_status.map(|status| status as i32),
2820                        });
2821                    }
2822                }
2823            }
2824
2825            // Populate repository entries.
2826            {
2827                let mut db_repository_entries = worktree_repository::Entity::find()
2828                    .filter(
2829                        Condition::all()
2830                            .add(worktree_repository::Column::ProjectId.eq(project_id))
2831                            .add(worktree_repository::Column::IsDeleted.eq(false)),
2832                    )
2833                    .stream(&*tx)
2834                    .await?;
2835                while let Some(db_repository_entry) = db_repository_entries.next().await {
2836                    let db_repository_entry = db_repository_entry?;
2837                    if let Some(worktree) =
2838                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2839                    {
2840                        worktree.repository_entries.insert(
2841                            db_repository_entry.work_directory_id as u64,
2842                            proto::RepositoryEntry {
2843                                work_directory_id: db_repository_entry.work_directory_id as u64,
2844                                branch: db_repository_entry.branch,
2845                            },
2846                        );
2847                    }
2848                }
2849            }
2850
2851            // Populate worktree diagnostic summaries.
2852            {
2853                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2854                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2855                    .stream(&*tx)
2856                    .await?;
2857                while let Some(db_summary) = db_summaries.next().await {
2858                    let db_summary = db_summary?;
2859                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2860                        worktree
2861                            .diagnostic_summaries
2862                            .push(proto::DiagnosticSummary {
2863                                path: db_summary.path,
2864                                language_server_id: db_summary.language_server_id as u64,
2865                                error_count: db_summary.error_count as u32,
2866                                warning_count: db_summary.warning_count as u32,
2867                            });
2868                    }
2869                }
2870            }
2871
2872            // Populate worktree settings files
2873            {
2874                let mut db_settings_files = worktree_settings_file::Entity::find()
2875                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2876                    .stream(&*tx)
2877                    .await?;
2878                while let Some(db_settings_file) = db_settings_files.next().await {
2879                    let db_settings_file = db_settings_file?;
2880                    if let Some(worktree) =
2881                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2882                    {
2883                        worktree.settings_files.push(WorktreeSettingsFile {
2884                            path: db_settings_file.path,
2885                            content: db_settings_file.content,
2886                        });
2887                    }
2888                }
2889            }
2890
2891            // Populate language servers.
2892            let language_servers = project
2893                .find_related(language_server::Entity)
2894                .all(&*tx)
2895                .await?;
2896
2897            let project = Project {
2898                collaborators: collaborators
2899                    .into_iter()
2900                    .map(|collaborator| ProjectCollaborator {
2901                        connection_id: collaborator.connection(),
2902                        user_id: collaborator.user_id,
2903                        replica_id: collaborator.replica_id,
2904                        is_host: collaborator.is_host,
2905                    })
2906                    .collect(),
2907                worktrees,
2908                language_servers: language_servers
2909                    .into_iter()
2910                    .map(|language_server| proto::LanguageServer {
2911                        id: language_server.id as u64,
2912                        name: language_server.name,
2913                    })
2914                    .collect(),
2915            };
2916            Ok((project, replica_id as ReplicaId))
2917        })
2918        .await
2919    }
2920
2921    pub async fn leave_project(
2922        &self,
2923        project_id: ProjectId,
2924        connection: ConnectionId,
2925    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2926        let room_id = self.room_id_for_project(project_id).await?;
2927        self.room_transaction(room_id, |tx| async move {
2928            let result = project_collaborator::Entity::delete_many()
2929                .filter(
2930                    Condition::all()
2931                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2932                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2933                        .add(
2934                            project_collaborator::Column::ConnectionServerId
2935                                .eq(connection.owner_id as i32),
2936                        ),
2937                )
2938                .exec(&*tx)
2939                .await?;
2940            if result.rows_affected == 0 {
2941                Err(anyhow!("not a collaborator on this project"))?;
2942            }
2943
2944            let project = project::Entity::find_by_id(project_id)
2945                .one(&*tx)
2946                .await?
2947                .ok_or_else(|| anyhow!("no such project"))?;
2948            let collaborators = project
2949                .find_related(project_collaborator::Entity)
2950                .all(&*tx)
2951                .await?;
2952            let connection_ids = collaborators
2953                .into_iter()
2954                .map(|collaborator| collaborator.connection())
2955                .collect();
2956
2957            follower::Entity::delete_many()
2958                .filter(
2959                    Condition::any()
2960                        .add(
2961                            Condition::all()
2962                                .add(follower::Column::ProjectId.eq(project_id))
2963                                .add(
2964                                    follower::Column::LeaderConnectionServerId
2965                                        .eq(connection.owner_id),
2966                                )
2967                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2968                        )
2969                        .add(
2970                            Condition::all()
2971                                .add(follower::Column::ProjectId.eq(project_id))
2972                                .add(
2973                                    follower::Column::FollowerConnectionServerId
2974                                        .eq(connection.owner_id),
2975                                )
2976                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2977                        ),
2978                )
2979                .exec(&*tx)
2980                .await?;
2981
2982            let room = self.get_room(project.room_id, &tx).await?;
2983            let left_project = LeftProject {
2984                id: project_id,
2985                host_user_id: project.host_user_id,
2986                host_connection_id: project.host_connection()?,
2987                connection_ids,
2988            };
2989            Ok((room, left_project))
2990        })
2991        .await
2992    }
2993
2994    pub async fn project_collaborators(
2995        &self,
2996        project_id: ProjectId,
2997        connection_id: ConnectionId,
2998    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2999        let room_id = self.room_id_for_project(project_id).await?;
3000        self.room_transaction(room_id, |tx| async move {
3001            let collaborators = project_collaborator::Entity::find()
3002                .filter(project_collaborator::Column::ProjectId.eq(project_id))
3003                .all(&*tx)
3004                .await?
3005                .into_iter()
3006                .map(|collaborator| ProjectCollaborator {
3007                    connection_id: collaborator.connection(),
3008                    user_id: collaborator.user_id,
3009                    replica_id: collaborator.replica_id,
3010                    is_host: collaborator.is_host,
3011                })
3012                .collect::<Vec<_>>();
3013
3014            if collaborators
3015                .iter()
3016                .any(|collaborator| collaborator.connection_id == connection_id)
3017            {
3018                Ok(collaborators)
3019            } else {
3020                Err(anyhow!("no such project"))?
3021            }
3022        })
3023        .await
3024    }
3025
3026    pub async fn project_connection_ids(
3027        &self,
3028        project_id: ProjectId,
3029        connection_id: ConnectionId,
3030    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3031        let room_id = self.room_id_for_project(project_id).await?;
3032        self.room_transaction(room_id, |tx| async move {
3033            let mut collaborators = project_collaborator::Entity::find()
3034                .filter(project_collaborator::Column::ProjectId.eq(project_id))
3035                .stream(&*tx)
3036                .await?;
3037
3038            let mut connection_ids = HashSet::default();
3039            while let Some(collaborator) = collaborators.next().await {
3040                let collaborator = collaborator?;
3041                connection_ids.insert(collaborator.connection());
3042            }
3043
3044            if connection_ids.contains(&connection_id) {
3045                Ok(connection_ids)
3046            } else {
3047                Err(anyhow!("no such project"))?
3048            }
3049        })
3050        .await
3051    }
3052
3053    async fn project_guest_connection_ids(
3054        &self,
3055        project_id: ProjectId,
3056        tx: &DatabaseTransaction,
3057    ) -> Result<Vec<ConnectionId>> {
3058        let mut collaborators = project_collaborator::Entity::find()
3059            .filter(
3060                project_collaborator::Column::ProjectId
3061                    .eq(project_id)
3062                    .and(project_collaborator::Column::IsHost.eq(false)),
3063            )
3064            .stream(tx)
3065            .await?;
3066
3067        let mut guest_connection_ids = Vec::new();
3068        while let Some(collaborator) = collaborators.next().await {
3069            let collaborator = collaborator?;
3070            guest_connection_ids.push(collaborator.connection());
3071        }
3072        Ok(guest_connection_ids)
3073    }
3074
3075    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3076        self.transaction(|tx| async move {
3077            let project = project::Entity::find_by_id(project_id)
3078                .one(&*tx)
3079                .await?
3080                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3081            Ok(project.room_id)
3082        })
3083        .await
3084    }
3085
3086    // access tokens
3087
3088    pub async fn create_access_token(
3089        &self,
3090        user_id: UserId,
3091        access_token_hash: &str,
3092        max_access_token_count: usize,
3093    ) -> Result<AccessTokenId> {
3094        self.transaction(|tx| async {
3095            let tx = tx;
3096
3097            let token = access_token::ActiveModel {
3098                user_id: ActiveValue::set(user_id),
3099                hash: ActiveValue::set(access_token_hash.into()),
3100                ..Default::default()
3101            }
3102            .insert(&*tx)
3103            .await?;
3104
3105            access_token::Entity::delete_many()
3106                .filter(
3107                    access_token::Column::Id.in_subquery(
3108                        Query::select()
3109                            .column(access_token::Column::Id)
3110                            .from(access_token::Entity)
3111                            .and_where(access_token::Column::UserId.eq(user_id))
3112                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3113                            .limit(10000)
3114                            .offset(max_access_token_count as u64)
3115                            .to_owned(),
3116                    ),
3117                )
3118                .exec(&*tx)
3119                .await?;
3120            Ok(token.id)
3121        })
3122        .await
3123    }
3124
3125    pub async fn get_access_token(
3126        &self,
3127        access_token_id: AccessTokenId,
3128    ) -> Result<access_token::Model> {
3129        self.transaction(|tx| async move {
3130            Ok(access_token::Entity::find_by_id(access_token_id)
3131                .one(&*tx)
3132                .await?
3133                .ok_or_else(|| anyhow!("no such access token"))?)
3134        })
3135        .await
3136    }
3137
3138    // channels
3139
3140    pub async fn create_root_channel(
3141        &self,
3142        name: &str,
3143        live_kit_room: &str,
3144        creator_id: UserId,
3145    ) -> Result<ChannelId> {
3146        self.create_channel(name, None, live_kit_room, creator_id)
3147            .await
3148    }
3149
3150    pub async fn create_channel(
3151        &self,
3152        name: &str,
3153        parent: Option<ChannelId>,
3154        live_kit_room: &str,
3155        creator_id: UserId,
3156    ) -> Result<ChannelId> {
3157        self.transaction(move |tx| async move {
3158            let tx = tx;
3159
3160            if let Some(parent) = parent {
3161                let channels = self.get_channel_ancestors(parent, &*tx).await?;
3162                channel_member::Entity::find()
3163                    .filter(channel_member::Column::ChannelId.is_in(channels.iter().copied()))
3164                    .filter(
3165                        channel_member::Column::UserId
3166                            .eq(creator_id)
3167                            .and(channel_member::Column::Accepted.eq(true)),
3168                    )
3169                    .one(&*tx)
3170                    .await?
3171                    .ok_or_else(|| {
3172                        anyhow!("User does not have the permissions to create this channel")
3173                    })?;
3174            }
3175
3176            let channel = channel::ActiveModel {
3177                name: ActiveValue::Set(name.to_string()),
3178                ..Default::default()
3179            };
3180
3181            let channel = channel.insert(&*tx).await?;
3182
3183            if let Some(parent) = parent {
3184                channel_parent::ActiveModel {
3185                    child_id: ActiveValue::Set(channel.id),
3186                    parent_id: ActiveValue::Set(parent),
3187                }
3188                .insert(&*tx)
3189                .await?;
3190            }
3191
3192            channel_member::ActiveModel {
3193                channel_id: ActiveValue::Set(channel.id),
3194                user_id: ActiveValue::Set(creator_id),
3195                accepted: ActiveValue::Set(true),
3196                admin: ActiveValue::Set(true),
3197                ..Default::default()
3198            }
3199            .insert(&*tx)
3200            .await?;
3201
3202            room::ActiveModel {
3203                channel_id: ActiveValue::Set(Some(channel.id)),
3204                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3205                ..Default::default()
3206            }
3207            .insert(&*tx)
3208            .await?;
3209
3210            Ok(channel.id)
3211        })
3212        .await
3213    }
3214
3215    pub async fn remove_channel(
3216        &self,
3217        channel_id: ChannelId,
3218        user_id: UserId,
3219    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3220        self.transaction(move |tx| async move {
3221            let tx = tx;
3222
3223            // Check if user is an admin
3224            channel_member::Entity::find()
3225                .filter(
3226                    channel_member::Column::ChannelId
3227                        .eq(channel_id)
3228                        .and(channel_member::Column::UserId.eq(user_id))
3229                        .and(channel_member::Column::Admin.eq(true)),
3230                )
3231                .one(&*tx)
3232                .await?
3233                .ok_or_else(|| anyhow!("user is not allowed to remove this channel"))?;
3234
3235            let mut descendants = self.get_channel_descendants([channel_id], &*tx).await?;
3236
3237            // Keep channels which have another active
3238            let mut channels_to_keep = channel_parent::Entity::find()
3239                .filter(
3240                    channel_parent::Column::ChildId
3241                        .is_in(descendants.keys().copied().filter(|&id| id != channel_id))
3242                        .and(
3243                            channel_parent::Column::ParentId.is_not_in(descendants.keys().copied()),
3244                        ),
3245                )
3246                .stream(&*tx)
3247                .await?;
3248
3249            while let Some(row) = channels_to_keep.next().await {
3250                let row = row?;
3251                descendants.remove(&row.child_id);
3252            }
3253
3254            drop(channels_to_keep);
3255
3256            let channels_to_remove = descendants.keys().copied().collect::<Vec<_>>();
3257
3258            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3259                .filter(channel_member::Column::ChannelId.is_in(channels_to_remove.iter().copied()))
3260                .select_only()
3261                .column(channel_member::Column::UserId)
3262                .distinct()
3263                .into_values::<_, QueryUserIds>()
3264                .all(&*tx)
3265                .await?;
3266
3267            // Channel members and parents should delete via cascade
3268            channel::Entity::delete_many()
3269                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
3270                .exec(&*tx)
3271                .await?;
3272
3273            Ok((channels_to_remove, members_to_notify))
3274        })
3275        .await
3276    }
3277
3278    pub async fn invite_channel_member(
3279        &self,
3280        channel_id: ChannelId,
3281        invitee_id: UserId,
3282        inviter_id: UserId,
3283        is_admin: bool,
3284    ) -> Result<()> {
3285        self.transaction(move |tx| async move {
3286            let tx = tx;
3287
3288            // Check if inviter is a member
3289            channel_member::Entity::find()
3290                .filter(
3291                    channel_member::Column::ChannelId
3292                        .eq(channel_id)
3293                        .and(channel_member::Column::UserId.eq(inviter_id))
3294                        .and(channel_member::Column::Admin.eq(true)),
3295                )
3296                .one(&*tx)
3297                .await?
3298                .ok_or_else(|| {
3299                    anyhow!("Inviter does not have permissions to invite the invitee")
3300                })?;
3301
3302            let channel_membership = channel_member::ActiveModel {
3303                channel_id: ActiveValue::Set(channel_id),
3304                user_id: ActiveValue::Set(invitee_id),
3305                accepted: ActiveValue::Set(false),
3306                admin: ActiveValue::Set(is_admin),
3307                ..Default::default()
3308            };
3309
3310            channel_membership.insert(&*tx).await?;
3311
3312            Ok(())
3313        })
3314        .await
3315    }
3316
3317    pub async fn respond_to_channel_invite(
3318        &self,
3319        channel_id: ChannelId,
3320        user_id: UserId,
3321        accept: bool,
3322    ) -> Result<()> {
3323        self.transaction(move |tx| async move {
3324            let tx = tx;
3325
3326            let rows_affected = if accept {
3327                channel_member::Entity::update_many()
3328                    .set(channel_member::ActiveModel {
3329                        accepted: ActiveValue::Set(accept),
3330                        ..Default::default()
3331                    })
3332                    .filter(
3333                        channel_member::Column::ChannelId
3334                            .eq(channel_id)
3335                            .and(channel_member::Column::UserId.eq(user_id))
3336                            .and(channel_member::Column::Accepted.eq(false)),
3337                    )
3338                    .exec(&*tx)
3339                    .await?
3340                    .rows_affected
3341            } else {
3342                channel_member::ActiveModel {
3343                    channel_id: ActiveValue::Unchanged(channel_id),
3344                    user_id: ActiveValue::Unchanged(user_id),
3345                    ..Default::default()
3346                }
3347                .delete(&*tx)
3348                .await?
3349                .rows_affected
3350            };
3351
3352            if rows_affected == 0 {
3353                Err(anyhow!("no such invitation"))?;
3354            }
3355
3356            Ok(())
3357        })
3358        .await
3359    }
3360
3361    pub async fn get_channel_invites(&self, user_id: UserId) -> Result<Vec<Channel>> {
3362        self.transaction(|tx| async move {
3363            let tx = tx;
3364
3365            let channel_invites = channel_member::Entity::find()
3366                .filter(
3367                    channel_member::Column::UserId
3368                        .eq(user_id)
3369                        .and(channel_member::Column::Accepted.eq(false)),
3370                )
3371                .all(&*tx)
3372                .await?;
3373
3374            let channels = channel::Entity::find()
3375                .filter(
3376                    channel::Column::Id.is_in(
3377                        channel_invites
3378                            .into_iter()
3379                            .map(|channel_member| channel_member.channel_id),
3380                    ),
3381                )
3382                .all(&*tx)
3383                .await?;
3384
3385            let channels = channels
3386                .into_iter()
3387                .map(|channel| Channel {
3388                    id: channel.id,
3389                    name: channel.name,
3390                    parent_id: None,
3391                })
3392                .collect();
3393
3394            Ok(channels)
3395        })
3396        .await
3397    }
3398
3399    pub async fn get_channels(
3400        &self,
3401        user_id: UserId,
3402    ) -> Result<(Vec<Channel>, HashMap<ChannelId, Vec<UserId>>)> {
3403        self.transaction(|tx| async move {
3404            let tx = tx;
3405
3406            let starting_channel_ids: Vec<ChannelId> = channel_member::Entity::find()
3407                .filter(
3408                    channel_member::Column::UserId
3409                        .eq(user_id)
3410                        .and(channel_member::Column::Accepted.eq(true)),
3411                )
3412                .select_only()
3413                .column(channel_member::Column::ChannelId)
3414                .into_values::<_, QueryChannelIds>()
3415                .all(&*tx)
3416                .await?;
3417
3418            let parents_by_child_id = self
3419                .get_channel_descendants(starting_channel_ids, &*tx)
3420                .await?;
3421
3422            let mut channels = Vec::with_capacity(parents_by_child_id.len());
3423            let mut rows = channel::Entity::find()
3424                .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3425                .stream(&*tx)
3426                .await?;
3427
3428            while let Some(row) = rows.next().await {
3429                let row = row?;
3430                channels.push(Channel {
3431                    id: row.id,
3432                    name: row.name,
3433                    parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3434                });
3435            }
3436
3437            drop(rows);
3438
3439            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3440            enum QueryUserIdsAndChannelIds {
3441                ChannelId,
3442                UserId,
3443            }
3444
3445            let mut participants = room_participant::Entity::find()
3446                .inner_join(room::Entity)
3447                .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3448                .select_only()
3449                .column(room::Column::ChannelId)
3450                .column(room_participant::Column::UserId)
3451                .into_values::<_, QueryUserIdsAndChannelIds>()
3452                .stream(&*tx)
3453                .await?;
3454
3455            let mut participant_map: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3456            while let Some(row) = participants.next().await {
3457                let row: (ChannelId, UserId) = row?;
3458                participant_map.entry(row.0).or_default().push(row.1)
3459            }
3460
3461            drop(participants);
3462
3463            Ok((channels, participant_map))
3464        })
3465        .await
3466    }
3467
3468    pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3469        self.transaction(|tx| async move {
3470            let tx = tx;
3471            let user_ids = self.get_channel_members_internal(id, &*tx).await?;
3472            Ok(user_ids)
3473        })
3474        .await
3475    }
3476
3477    pub async fn get_channel_members_internal(
3478        &self,
3479        id: ChannelId,
3480        tx: &DatabaseTransaction,
3481    ) -> Result<Vec<UserId>> {
3482        let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3483        let user_ids = channel_member::Entity::find()
3484            .distinct()
3485            .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3486            .select_only()
3487            .column(channel_member::Column::UserId)
3488            .into_values::<_, QueryUserIds>()
3489            .all(&*tx)
3490            .await?;
3491        Ok(user_ids)
3492    }
3493
3494    async fn get_channel_ancestors(
3495        &self,
3496        id: ChannelId,
3497        tx: &DatabaseTransaction,
3498    ) -> Result<Vec<ChannelId>> {
3499        let sql = format!(
3500            r#"
3501            WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3502                    SELECT CAST(NULL as INTEGER) as child_id, root_ids.column1 as parent_id
3503                    FROM (VALUES ({})) as root_ids
3504                UNION
3505                    SELECT channel_parents.child_id, channel_parents.parent_id
3506                    FROM channel_parents, channel_tree
3507                    WHERE channel_parents.child_id = channel_tree.parent_id
3508            )
3509            SELECT DISTINCT channel_tree.parent_id
3510            FROM channel_tree
3511            "#,
3512            id
3513        );
3514
3515        #[derive(FromQueryResult, Debug, PartialEq)]
3516        pub struct ChannelParent {
3517            pub parent_id: ChannelId,
3518        }
3519
3520        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3521
3522        let mut channel_ids_stream = channel_parent::Entity::find()
3523            .from_raw_sql(stmt)
3524            .into_model::<ChannelParent>()
3525            .stream(&*tx)
3526            .await?;
3527
3528        let mut channel_ids = vec![];
3529        while let Some(channel_id) = channel_ids_stream.next().await {
3530            channel_ids.push(channel_id?.parent_id);
3531        }
3532
3533        Ok(channel_ids)
3534    }
3535
3536    async fn get_channel_descendants(
3537        &self,
3538        channel_ids: impl IntoIterator<Item = ChannelId>,
3539        tx: &DatabaseTransaction,
3540    ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3541        let mut values = String::new();
3542        for id in channel_ids {
3543            if !values.is_empty() {
3544                values.push_str(", ");
3545            }
3546            write!(&mut values, "({})", id).unwrap();
3547        }
3548
3549        if values.is_empty() {
3550            return Ok(HashMap::default());
3551        }
3552
3553        let sql = format!(
3554            r#"
3555            WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3556                    SELECT root_ids.column1 as child_id, CAST(NULL as INTEGER) as parent_id
3557                    FROM (VALUES {}) as root_ids
3558                UNION
3559                    SELECT channel_parents.child_id, channel_parents.parent_id
3560                    FROM channel_parents, channel_tree
3561                    WHERE channel_parents.parent_id = channel_tree.child_id
3562            )
3563            SELECT channel_tree.child_id, channel_tree.parent_id
3564            FROM channel_tree
3565            ORDER BY child_id, parent_id IS NOT NULL
3566            "#,
3567            values
3568        );
3569
3570        #[derive(FromQueryResult, Debug, PartialEq)]
3571        pub struct ChannelParent {
3572            pub child_id: ChannelId,
3573            pub parent_id: Option<ChannelId>,
3574        }
3575
3576        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3577
3578        let mut parents_by_child_id = HashMap::default();
3579        let mut parents = channel_parent::Entity::find()
3580            .from_raw_sql(stmt)
3581            .into_model::<ChannelParent>()
3582            .stream(tx)
3583            .await?;
3584
3585        while let Some(parent) = parents.next().await {
3586            let parent = parent?;
3587            parents_by_child_id.insert(parent.child_id, parent.parent_id);
3588        }
3589
3590        Ok(parents_by_child_id)
3591    }
3592
3593    pub async fn get_channel(&self, channel_id: ChannelId) -> Result<Option<Channel>> {
3594        self.transaction(|tx| async move {
3595            let tx = tx;
3596            let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3597
3598            Ok(channel.map(|channel| Channel {
3599                id: channel.id,
3600                name: channel.name,
3601                parent_id: None,
3602            }))
3603        })
3604        .await
3605    }
3606
3607    pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3608        self.transaction(|tx| async move {
3609            let tx = tx;
3610            let room = channel::Model {
3611                id: channel_id,
3612                ..Default::default()
3613            }
3614            .find_related(room::Entity)
3615            .one(&*tx)
3616            .await?
3617            .ok_or_else(|| anyhow!("invalid channel"))?;
3618            Ok(room.id)
3619        })
3620        .await
3621    }
3622
3623    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3624    where
3625        F: Send + Fn(TransactionHandle) -> Fut,
3626        Fut: Send + Future<Output = Result<T>>,
3627    {
3628        let body = async {
3629            let mut i = 0;
3630            loop {
3631                let (tx, result) = self.with_transaction(&f).await?;
3632                match result {
3633                    Ok(result) => match tx.commit().await.map_err(Into::into) {
3634                        Ok(()) => return Ok(result),
3635                        Err(error) => {
3636                            if !self.retry_on_serialization_error(&error, i).await {
3637                                return Err(error);
3638                            }
3639                        }
3640                    },
3641                    Err(error) => {
3642                        tx.rollback().await?;
3643                        if !self.retry_on_serialization_error(&error, i).await {
3644                            return Err(error);
3645                        }
3646                    }
3647                }
3648                i += 1;
3649            }
3650        };
3651
3652        self.run(body).await
3653    }
3654
3655    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
3656    where
3657        F: Send + Fn(TransactionHandle) -> Fut,
3658        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
3659    {
3660        let body = async {
3661            let mut i = 0;
3662            loop {
3663                let (tx, result) = self.with_transaction(&f).await?;
3664                match result {
3665                    Ok(Some((room_id, data))) => {
3666                        let lock = self.rooms.entry(room_id).or_default().clone();
3667                        let _guard = lock.lock_owned().await;
3668                        match tx.commit().await.map_err(Into::into) {
3669                            Ok(()) => {
3670                                return Ok(Some(RoomGuard {
3671                                    data,
3672                                    _guard,
3673                                    _not_send: PhantomData,
3674                                }));
3675                            }
3676                            Err(error) => {
3677                                if !self.retry_on_serialization_error(&error, i).await {
3678                                    return Err(error);
3679                                }
3680                            }
3681                        }
3682                    }
3683                    Ok(None) => match tx.commit().await.map_err(Into::into) {
3684                        Ok(()) => return Ok(None),
3685                        Err(error) => {
3686                            if !self.retry_on_serialization_error(&error, i).await {
3687                                return Err(error);
3688                            }
3689                        }
3690                    },
3691                    Err(error) => {
3692                        tx.rollback().await?;
3693                        if !self.retry_on_serialization_error(&error, i).await {
3694                            return Err(error);
3695                        }
3696                    }
3697                }
3698                i += 1;
3699            }
3700        };
3701
3702        self.run(body).await
3703    }
3704
3705    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3706    where
3707        F: Send + Fn(TransactionHandle) -> Fut,
3708        Fut: Send + Future<Output = Result<T>>,
3709    {
3710        let body = async {
3711            let mut i = 0;
3712            loop {
3713                let lock = self.rooms.entry(room_id).or_default().clone();
3714                let _guard = lock.lock_owned().await;
3715                let (tx, result) = self.with_transaction(&f).await?;
3716                match result {
3717                    Ok(data) => match tx.commit().await.map_err(Into::into) {
3718                        Ok(()) => {
3719                            return Ok(RoomGuard {
3720                                data,
3721                                _guard,
3722                                _not_send: PhantomData,
3723                            });
3724                        }
3725                        Err(error) => {
3726                            if !self.retry_on_serialization_error(&error, i).await {
3727                                return Err(error);
3728                            }
3729                        }
3730                    },
3731                    Err(error) => {
3732                        tx.rollback().await?;
3733                        if !self.retry_on_serialization_error(&error, i).await {
3734                            return Err(error);
3735                        }
3736                    }
3737                }
3738                i += 1;
3739            }
3740        };
3741
3742        self.run(body).await
3743    }
3744
3745    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3746    where
3747        F: Send + Fn(TransactionHandle) -> Fut,
3748        Fut: Send + Future<Output = Result<T>>,
3749    {
3750        let tx = self
3751            .pool
3752            .begin_with_config(Some(IsolationLevel::Serializable), None)
3753            .await?;
3754
3755        let mut tx = Arc::new(Some(tx));
3756        let result = f(TransactionHandle(tx.clone())).await;
3757        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3758            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3759        };
3760
3761        Ok((tx, result))
3762    }
3763
3764    async fn run<F, T>(&self, future: F) -> Result<T>
3765    where
3766        F: Future<Output = Result<T>>,
3767    {
3768        #[cfg(test)]
3769        {
3770            if let Executor::Deterministic(executor) = &self.executor {
3771                executor.simulate_random_delay().await;
3772            }
3773
3774            self.runtime.as_ref().unwrap().block_on(future)
3775        }
3776
3777        #[cfg(not(test))]
3778        {
3779            future.await
3780        }
3781    }
3782
3783    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3784        // If the error is due to a failure to serialize concurrent transactions, then retry
3785        // this transaction after a delay. With each subsequent retry, double the delay duration.
3786        // Also vary the delay randomly in order to ensure different database connections retry
3787        // at different times.
3788        if is_serialization_error(error) {
3789            let base_delay = 4_u64 << prev_attempt_count.min(16);
3790            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3791            log::info!(
3792                "retrying transaction after serialization error. delay: {} ms.",
3793                randomized_delay
3794            );
3795            self.executor
3796                .sleep(Duration::from_millis(randomized_delay as u64))
3797                .await;
3798            true
3799        } else {
3800            false
3801        }
3802    }
3803}
3804
3805fn is_serialization_error(error: &Error) -> bool {
3806    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3807    match error {
3808        Error::Database(
3809            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3810            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3811        ) if error
3812            .as_database_error()
3813            .and_then(|error| error.code())
3814            .as_deref()
3815            == Some(SERIALIZATION_FAILURE_CODE) =>
3816        {
3817            true
3818        }
3819        _ => false,
3820    }
3821}
3822
3823struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3824
3825impl Deref for TransactionHandle {
3826    type Target = DatabaseTransaction;
3827
3828    fn deref(&self) -> &Self::Target {
3829        self.0.as_ref().as_ref().unwrap()
3830    }
3831}
3832
3833pub struct RoomGuard<T> {
3834    data: T,
3835    _guard: OwnedMutexGuard<()>,
3836    _not_send: PhantomData<Rc<()>>,
3837}
3838
3839impl<T> Deref for RoomGuard<T> {
3840    type Target = T;
3841
3842    fn deref(&self) -> &T {
3843        &self.data
3844    }
3845}
3846
3847impl<T> DerefMut for RoomGuard<T> {
3848    fn deref_mut(&mut self) -> &mut T {
3849        &mut self.data
3850    }
3851}
3852
3853#[derive(Debug, Serialize, Deserialize)]
3854pub struct NewUserParams {
3855    pub github_login: String,
3856    pub github_user_id: i32,
3857    pub invite_count: i32,
3858}
3859
3860#[derive(Debug)]
3861pub struct NewUserResult {
3862    pub user_id: UserId,
3863    pub metrics_id: String,
3864    pub inviting_user_id: Option<UserId>,
3865    pub signup_device_id: Option<String>,
3866}
3867
3868#[derive(FromQueryResult, Debug, PartialEq)]
3869pub struct Channel {
3870    pub id: ChannelId,
3871    pub name: String,
3872    pub parent_id: Option<ChannelId>,
3873}
3874
3875fn random_invite_code() -> String {
3876    nanoid::nanoid!(16)
3877}
3878
3879fn random_email_confirmation_code() -> String {
3880    nanoid::nanoid!(64)
3881}
3882
// Defines a strongly typed database id: a `Copy` newtype around `i32` named `$name`.
//
// The generated type serializes transparently as its inner integer, displays like the
// integer, converts to and from the `u64` ids used by the protobuf messages, and
// implements the sea-orm / sea-query traits implemented below (`TryGetable`, `ValueType`,
// `TryFromU64`, `Nullable`, and `From<$name> for sea_query::Value`).
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// Largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Converts a protobuf id into this id type.
            ///
            /// This is a plain `as` cast: values that do not fit in an `i32` are
            /// truncated rather than rejected.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts this id into its protobuf representation.
            ///
            /// This is a plain `as` cast: a negative id is sign-extended into a large
            /// `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer value that fits in an `i32`; null values,
            // out-of-range integers, and non-integer values are rejected.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `DbErr::ConvertFromU64` carries the name of the type that could not be
                // built from the `u64`, so pass the bare type name rather than a
                // free-form sentence.
                Ok(Self(
                    n.try_into()
                        .map_err(|_| DbErr::ConvertFromU64(stringify!($name)))?,
                ))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
4001
4002id_type!(AccessTokenId);
4003id_type!(ChannelId);
4004id_type!(ChannelMemberId);
4005id_type!(ContactId);
4006id_type!(FollowerId);
4007id_type!(RoomId);
4008id_type!(RoomParticipantId);
4009id_type!(ProjectId);
4010id_type!(ProjectCollaboratorId);
4011id_type!(ReplicaId);
4012id_type!(ServerId);
4013id_type!(SignupId);
4014id_type!(UserId);
4015
4016#[derive(Clone)]
4017pub struct JoinRoom {
4018    pub room: proto::Room,
4019    pub channel_id: Option<ChannelId>,
4020    pub channel_members: Vec<UserId>,
4021}
4022
4023pub struct RejoinedRoom {
4024    pub room: proto::Room,
4025    pub rejoined_projects: Vec<RejoinedProject>,
4026    pub reshared_projects: Vec<ResharedProject>,
4027    pub channel_id: Option<ChannelId>,
4028    pub channel_members: Vec<UserId>,
4029}
4030
4031pub struct ResharedProject {
4032    pub id: ProjectId,
4033    pub old_connection_id: ConnectionId,
4034    pub collaborators: Vec<ProjectCollaborator>,
4035    pub worktrees: Vec<proto::WorktreeMetadata>,
4036}
4037
4038pub struct RejoinedProject {
4039    pub id: ProjectId,
4040    pub old_connection_id: ConnectionId,
4041    pub collaborators: Vec<ProjectCollaborator>,
4042    pub worktrees: Vec<RejoinedWorktree>,
4043    pub language_servers: Vec<proto::LanguageServer>,
4044}
4045
4046#[derive(Debug)]
4047pub struct RejoinedWorktree {
4048    pub id: u64,
4049    pub abs_path: String,
4050    pub root_name: String,
4051    pub visible: bool,
4052    pub updated_entries: Vec<proto::Entry>,
4053    pub removed_entries: Vec<u64>,
4054    pub updated_repositories: Vec<proto::RepositoryEntry>,
4055    pub removed_repositories: Vec<u64>,
4056    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
4057    pub settings_files: Vec<WorktreeSettingsFile>,
4058    pub scan_id: u64,
4059    pub completed_scan_id: u64,
4060}
4061
4062pub struct LeftRoom {
4063    pub room: proto::Room,
4064    pub channel_id: Option<ChannelId>,
4065    pub channel_members: Vec<UserId>,
4066    pub left_projects: HashMap<ProjectId, LeftProject>,
4067    pub canceled_calls_to_user_ids: Vec<UserId>,
4068    pub deleted: bool,
4069}
4070
4071pub struct RefreshedRoom {
4072    pub room: proto::Room,
4073    pub channel_id: Option<ChannelId>,
4074    pub channel_members: Vec<UserId>,
4075    pub stale_participant_user_ids: Vec<UserId>,
4076    pub canceled_calls_to_user_ids: Vec<UserId>,
4077}
4078
4079pub struct Project {
4080    pub collaborators: Vec<ProjectCollaborator>,
4081    pub worktrees: BTreeMap<u64, Worktree>,
4082    pub language_servers: Vec<proto::LanguageServer>,
4083}
4084
4085pub struct ProjectCollaborator {
4086    pub connection_id: ConnectionId,
4087    pub user_id: UserId,
4088    pub replica_id: ReplicaId,
4089    pub is_host: bool,
4090}
4091
4092impl ProjectCollaborator {
4093    pub fn to_proto(&self) -> proto::Collaborator {
4094        proto::Collaborator {
4095            peer_id: Some(self.connection_id.into()),
4096            replica_id: self.replica_id.0 as u32,
4097            user_id: self.user_id.to_proto(),
4098        }
4099    }
4100}
4101
4102#[derive(Debug)]
4103pub struct LeftProject {
4104    pub id: ProjectId,
4105    pub host_user_id: UserId,
4106    pub host_connection_id: ConnectionId,
4107    pub connection_ids: Vec<ConnectionId>,
4108}
4109
4110pub struct Worktree {
4111    pub id: u64,
4112    pub abs_path: String,
4113    pub root_name: String,
4114    pub visible: bool,
4115    pub entries: Vec<proto::Entry>,
4116    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
4117    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
4118    pub settings_files: Vec<WorktreeSettingsFile>,
4119    pub scan_id: u64,
4120    pub completed_scan_id: u64,
4121}
4122
/// A settings file inside a worktree: where it lives and what it contains.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    /// Path of the settings file.
    pub path: String,
    /// Text content of the settings file.
    pub content: String,
}
4128
4129#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
4130enum QueryChannelIds {
4131    ChannelId,
4132}
4133
4134#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
4135enum QueryUserIds {
4136    UserId,
4137}
4138
4139#[cfg(test)]
4140pub use test::*;
4141
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// Test fixture that owns a [`Database`] and cleans up after it when dropped.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Builds the current-thread Tokio runtime (I/O and time drivers enabled) that the
        /// fixture uses to drive database futures.
        fn build_runtime() -> tokio::runtime::Runtime {
            tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap()
        }

        /// Stores `runtime` on `db` and wraps the database in a fixture.
        fn from_parts(mut db: Database, runtime: tokio::runtime::Runtime) -> Self {
            db.runtime = Some(runtime);
            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a fixture backed by an in-memory SQLite database initialized from the
        /// bundled test schema.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = String::from("sqlite::memory:");
            let runtime = Self::build_runtime();

            let db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();

                let schema = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                let backend = db.pool.get_database_backend();
                db.pool
                    .execute(sea_orm::Statement::from_string(backend, schema.into()))
                    .await
                    .unwrap();
                db
            });

            Self::from_parts(db, runtime)
        }

        /// Creates a fixture backed by a freshly created, randomly named Postgres database
        /// on `localhost`, with the migrations from the crate's `migrations` directory
        /// applied.
        pub fn postgres(background: Arc<Background>) -> Self {
            // Held until this function returns, so fixtures are set up one at a time.
            static LOCK: Mutex<()> = Mutex::new(());

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = Self::build_runtime();

            let db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");

                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();

                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            Self::from_parts(db, runtime)
        }

        /// Returns the database owned by this fixture.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        /// For Postgres-backed fixtures, terminates the other backends connected to the
        /// test database and then drops the database. Failures are logged, not raised.
        /// Other backends need no cleanup here.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if matches!(db.pool.get_database_backend(), sea_orm::DatabaseBackend::Postgres) {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    let terminate_backends = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    let backend = db.pool.get_database_backend();
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            backend,
                            terminate_backends.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}