db.rs

   1mod access_token;
   2mod channel;
   3mod channel_member;
   4mod channel_parent;
   5mod contact;
   6mod follower;
   7mod language_server;
   8mod project;
   9mod project_collaborator;
  10mod room;
  11mod room_participant;
  12mod server;
  13mod signup;
  14#[cfg(test)]
  15mod tests;
  16mod user;
  17mod worktree;
  18mod worktree_diagnostic_summary;
  19mod worktree_entry;
  20mod worktree_repository;
  21mod worktree_repository_statuses;
  22mod worktree_settings_file;
  23
  24use crate::executor::Executor;
  25use crate::{Error, Result};
  26use anyhow::anyhow;
  27use collections::{BTreeMap, HashMap, HashSet};
  28pub use contact::Contact;
  29use dashmap::DashMap;
  30use futures::StreamExt;
  31use hyper::StatusCode;
  32use rand::prelude::StdRng;
  33use rand::{Rng, SeedableRng};
  34use rpc::{proto, ConnectionId};
  35use sea_orm::Condition;
  36pub use sea_orm::ConnectOptions;
  37use sea_orm::{
  38    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  39    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  40    Statement, TransactionTrait,
  41};
  42use sea_query::{Alias, Expr, OnConflict, Query};
  43use serde::{Deserialize, Serialize};
  44pub use signup::{Invite, NewSignup, WaitlistSummary};
  45use sqlx::migrate::{Migrate, Migration, MigrationSource};
  46use sqlx::Connection;
  47use std::fmt::Write as _;
  48use std::ops::{Deref, DerefMut};
  49use std::path::Path;
  50use std::time::Duration;
  51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  52use tokio::sync::{Mutex, OwnedMutexGuard};
  53pub use user::Model as User;
  54
  55pub struct Database {
  56    options: ConnectOptions,
  57    pool: DatabaseConnection,
  58    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  59    rng: Mutex<StdRng>,
  60    executor: Executor,
  61    #[cfg(test)]
  62    runtime: Option<tokio::runtime::Runtime>,
  63}
  64
  65impl Database {
  66    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  67        Ok(Self {
  68            options: options.clone(),
  69            pool: sea_orm::Database::connect(options).await?,
  70            rooms: DashMap::with_capacity(16384),
  71            rng: Mutex::new(StdRng::seed_from_u64(0)),
  72            executor,
  73            #[cfg(test)]
  74            runtime: None,
  75        })
  76    }
  77
  78    #[cfg(test)]
  79    pub fn reset(&self) {
  80        self.rooms.clear();
  81    }
  82
  83    pub async fn migrate(
  84        &self,
  85        migrations_path: &Path,
  86        ignore_checksum_mismatch: bool,
  87    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  88        let migrations = MigrationSource::resolve(migrations_path)
  89            .await
  90            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  91
  92        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  93
  94        connection.ensure_migrations_table().await?;
  95        let applied_migrations: HashMap<_, _> = connection
  96            .list_applied_migrations()
  97            .await?
  98            .into_iter()
  99            .map(|m| (m.version, m))
 100            .collect();
 101
 102        let mut new_migrations = Vec::new();
 103        for migration in migrations {
 104            match applied_migrations.get(&migration.version) {
 105                Some(applied_migration) => {
 106                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 107                    {
 108                        Err(anyhow!(
 109                            "checksum mismatch for applied migration {}",
 110                            migration.description
 111                        ))?;
 112                    }
 113                }
 114                None => {
 115                    let elapsed = connection.apply(&migration).await?;
 116                    new_migrations.push((migration, elapsed));
 117                }
 118            }
 119        }
 120
 121        Ok(new_migrations)
 122    }
 123
 124    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 125        self.transaction(|tx| async move {
 126            let server = server::ActiveModel {
 127                environment: ActiveValue::set(environment.into()),
 128                ..Default::default()
 129            }
 130            .insert(&*tx)
 131            .await?;
 132            Ok(server.id)
 133        })
 134        .await
 135    }
 136
 137    pub async fn stale_room_ids(
 138        &self,
 139        environment: &str,
 140        new_server_id: ServerId,
 141    ) -> Result<Vec<RoomId>> {
 142        self.transaction(|tx| async move {
 143            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 144            enum QueryAs {
 145                RoomId,
 146            }
 147
 148            let stale_server_epochs = self
 149                .stale_server_ids(environment, new_server_id, &tx)
 150                .await?;
 151            Ok(room_participant::Entity::find()
 152                .select_only()
 153                .column(room_participant::Column::RoomId)
 154                .distinct()
 155                .filter(
 156                    room_participant::Column::AnsweringConnectionServerId
 157                        .is_in(stale_server_epochs),
 158                )
 159                .into_values::<_, QueryAs>()
 160                .all(&*tx)
 161                .await?)
 162        })
 163        .await
 164    }
 165
 166    pub async fn refresh_room(
 167        &self,
 168        room_id: RoomId,
 169        new_server_id: ServerId,
 170    ) -> Result<RoomGuard<RefreshedRoom>> {
 171        self.room_transaction(room_id, |tx| async move {
 172            let stale_participant_filter = Condition::all()
 173                .add(room_participant::Column::RoomId.eq(room_id))
 174                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 175                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 176
 177            let stale_participant_user_ids = room_participant::Entity::find()
 178                .filter(stale_participant_filter.clone())
 179                .all(&*tx)
 180                .await?
 181                .into_iter()
 182                .map(|participant| participant.user_id)
 183                .collect::<Vec<_>>();
 184
 185            // Delete participants who failed to reconnect and cancel their calls.
 186            let mut canceled_calls_to_user_ids = Vec::new();
 187            room_participant::Entity::delete_many()
 188                .filter(stale_participant_filter)
 189                .exec(&*tx)
 190                .await?;
 191            let called_participants = room_participant::Entity::find()
 192                .filter(
 193                    Condition::all()
 194                        .add(
 195                            room_participant::Column::CallingUserId
 196                                .is_in(stale_participant_user_ids.iter().copied()),
 197                        )
 198                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 199                )
 200                .all(&*tx)
 201                .await?;
 202            room_participant::Entity::delete_many()
 203                .filter(
 204                    room_participant::Column::Id
 205                        .is_in(called_participants.iter().map(|participant| participant.id)),
 206                )
 207                .exec(&*tx)
 208                .await?;
 209            canceled_calls_to_user_ids.extend(
 210                called_participants
 211                    .into_iter()
 212                    .map(|participant| participant.user_id),
 213            );
 214
 215            let room = self.get_room(room_id, &tx).await?;
 216            // Delete the room if it becomes empty.
 217            if room.participants.is_empty() {
 218                project::Entity::delete_many()
 219                    .filter(project::Column::RoomId.eq(room_id))
 220                    .exec(&*tx)
 221                    .await?;
 222                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 223            }
 224
 225            Ok(RefreshedRoom {
 226                room,
 227                stale_participant_user_ids,
 228                canceled_calls_to_user_ids,
 229            })
 230        })
 231        .await
 232    }
 233
 234    pub async fn delete_stale_servers(
 235        &self,
 236        environment: &str,
 237        new_server_id: ServerId,
 238    ) -> Result<()> {
 239        self.transaction(|tx| async move {
 240            server::Entity::delete_many()
 241                .filter(
 242                    Condition::all()
 243                        .add(server::Column::Environment.eq(environment))
 244                        .add(server::Column::Id.ne(new_server_id)),
 245                )
 246                .exec(&*tx)
 247                .await?;
 248            Ok(())
 249        })
 250        .await
 251    }
 252
 253    async fn stale_server_ids(
 254        &self,
 255        environment: &str,
 256        new_server_id: ServerId,
 257        tx: &DatabaseTransaction,
 258    ) -> Result<Vec<ServerId>> {
 259        let stale_servers = server::Entity::find()
 260            .filter(
 261                Condition::all()
 262                    .add(server::Column::Environment.eq(environment))
 263                    .add(server::Column::Id.ne(new_server_id)),
 264            )
 265            .all(&*tx)
 266            .await?;
 267        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 268    }
 269
 270    // users
 271
 272    pub async fn create_user(
 273        &self,
 274        email_address: &str,
 275        admin: bool,
 276        params: NewUserParams,
 277    ) -> Result<NewUserResult> {
 278        self.transaction(|tx| async {
 279            let tx = tx;
 280            let user = user::Entity::insert(user::ActiveModel {
 281                email_address: ActiveValue::set(Some(email_address.into())),
 282                github_login: ActiveValue::set(params.github_login.clone()),
 283                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 284                admin: ActiveValue::set(admin),
 285                metrics_id: ActiveValue::set(Uuid::new_v4()),
 286                ..Default::default()
 287            })
 288            .on_conflict(
 289                OnConflict::column(user::Column::GithubLogin)
 290                    .update_column(user::Column::GithubLogin)
 291                    .to_owned(),
 292            )
 293            .exec_with_returning(&*tx)
 294            .await?;
 295
 296            Ok(NewUserResult {
 297                user_id: user.id,
 298                metrics_id: user.metrics_id.to_string(),
 299                signup_device_id: None,
 300                inviting_user_id: None,
 301            })
 302        })
 303        .await
 304    }
 305
 306    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 307        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 308            .await
 309    }
 310
 311    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 312        self.transaction(|tx| async {
 313            let tx = tx;
 314            Ok(user::Entity::find()
 315                .filter(user::Column::Id.is_in(ids.iter().copied()))
 316                .all(&*tx)
 317                .await?)
 318        })
 319        .await
 320    }
 321
 322    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 323        self.transaction(|tx| async move {
 324            Ok(user::Entity::find()
 325                .filter(user::Column::GithubLogin.eq(github_login))
 326                .one(&*tx)
 327                .await?)
 328        })
 329        .await
 330    }
 331
 332    pub async fn get_or_create_user_by_github_account(
 333        &self,
 334        github_login: &str,
 335        github_user_id: Option<i32>,
 336        github_email: Option<&str>,
 337    ) -> Result<Option<User>> {
 338        self.transaction(|tx| async move {
 339            let tx = &*tx;
 340            if let Some(github_user_id) = github_user_id {
 341                if let Some(user_by_github_user_id) = user::Entity::find()
 342                    .filter(user::Column::GithubUserId.eq(github_user_id))
 343                    .one(tx)
 344                    .await?
 345                {
 346                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 347                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 348                    Ok(Some(user_by_github_user_id.update(tx).await?))
 349                } else if let Some(user_by_github_login) = user::Entity::find()
 350                    .filter(user::Column::GithubLogin.eq(github_login))
 351                    .one(tx)
 352                    .await?
 353                {
 354                    let mut user_by_github_login = user_by_github_login.into_active_model();
 355                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 356                    Ok(Some(user_by_github_login.update(tx).await?))
 357                } else {
 358                    let user = user::Entity::insert(user::ActiveModel {
 359                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 360                        github_login: ActiveValue::set(github_login.into()),
 361                        github_user_id: ActiveValue::set(Some(github_user_id)),
 362                        admin: ActiveValue::set(false),
 363                        invite_count: ActiveValue::set(0),
 364                        invite_code: ActiveValue::set(None),
 365                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 366                        ..Default::default()
 367                    })
 368                    .exec_with_returning(&*tx)
 369                    .await?;
 370                    Ok(Some(user))
 371                }
 372            } else {
 373                Ok(user::Entity::find()
 374                    .filter(user::Column::GithubLogin.eq(github_login))
 375                    .one(tx)
 376                    .await?)
 377            }
 378        })
 379        .await
 380    }
 381
 382    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 383        self.transaction(|tx| async move {
 384            Ok(user::Entity::find()
 385                .order_by_asc(user::Column::GithubLogin)
 386                .limit(limit as u64)
 387                .offset(page as u64 * limit as u64)
 388                .all(&*tx)
 389                .await?)
 390        })
 391        .await
 392    }
 393
 394    pub async fn get_users_with_no_invites(
 395        &self,
 396        invited_by_another_user: bool,
 397    ) -> Result<Vec<User>> {
 398        self.transaction(|tx| async move {
 399            Ok(user::Entity::find()
 400                .filter(
 401                    user::Column::InviteCount
 402                        .eq(0)
 403                        .and(if invited_by_another_user {
 404                            user::Column::InviterId.is_not_null()
 405                        } else {
 406                            user::Column::InviterId.is_null()
 407                        }),
 408                )
 409                .all(&*tx)
 410                .await?)
 411        })
 412        .await
 413    }
 414
 415    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 416        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 417        enum QueryAs {
 418            MetricsId,
 419        }
 420
 421        self.transaction(|tx| async move {
 422            let metrics_id: Uuid = user::Entity::find_by_id(id)
 423                .select_only()
 424                .column(user::Column::MetricsId)
 425                .into_values::<_, QueryAs>()
 426                .one(&*tx)
 427                .await?
 428                .ok_or_else(|| anyhow!("could not find user"))?;
 429            Ok(metrics_id.to_string())
 430        })
 431        .await
 432    }
 433
 434    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 435        self.transaction(|tx| async move {
 436            user::Entity::update_many()
 437                .filter(user::Column::Id.eq(id))
 438                .set(user::ActiveModel {
 439                    admin: ActiveValue::set(is_admin),
 440                    ..Default::default()
 441                })
 442                .exec(&*tx)
 443                .await?;
 444            Ok(())
 445        })
 446        .await
 447    }
 448
 449    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 450        self.transaction(|tx| async move {
 451            user::Entity::update_many()
 452                .filter(user::Column::Id.eq(id))
 453                .set(user::ActiveModel {
 454                    connected_once: ActiveValue::set(connected_once),
 455                    ..Default::default()
 456                })
 457                .exec(&*tx)
 458                .await?;
 459            Ok(())
 460        })
 461        .await
 462    }
 463
 464    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 465        self.transaction(|tx| async move {
 466            access_token::Entity::delete_many()
 467                .filter(access_token::Column::UserId.eq(id))
 468                .exec(&*tx)
 469                .await?;
 470            user::Entity::delete_by_id(id).exec(&*tx).await?;
 471            Ok(())
 472        })
 473        .await
 474    }
 475
 476    // contacts
 477
 478    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 479        #[derive(Debug, FromQueryResult)]
 480        struct ContactWithUserBusyStatuses {
 481            user_id_a: UserId,
 482            user_id_b: UserId,
 483            a_to_b: bool,
 484            accepted: bool,
 485            should_notify: bool,
 486            user_a_busy: bool,
 487            user_b_busy: bool,
 488        }
 489
 490        self.transaction(|tx| async move {
 491            let user_a_participant = Alias::new("user_a_participant");
 492            let user_b_participant = Alias::new("user_b_participant");
 493            let mut db_contacts = contact::Entity::find()
 494                .column_as(
 495                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 496                        .is_not_null(),
 497                    "user_a_busy",
 498                )
 499                .column_as(
 500                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 501                        .is_not_null(),
 502                    "user_b_busy",
 503                )
 504                .filter(
 505                    contact::Column::UserIdA
 506                        .eq(user_id)
 507                        .or(contact::Column::UserIdB.eq(user_id)),
 508                )
 509                .join_as(
 510                    JoinType::LeftJoin,
 511                    contact::Relation::UserARoomParticipant.def(),
 512                    user_a_participant,
 513                )
 514                .join_as(
 515                    JoinType::LeftJoin,
 516                    contact::Relation::UserBRoomParticipant.def(),
 517                    user_b_participant,
 518                )
 519                .into_model::<ContactWithUserBusyStatuses>()
 520                .stream(&*tx)
 521                .await?;
 522
 523            let mut contacts = Vec::new();
 524            while let Some(db_contact) = db_contacts.next().await {
 525                let db_contact = db_contact?;
 526                if db_contact.user_id_a == user_id {
 527                    if db_contact.accepted {
 528                        contacts.push(Contact::Accepted {
 529                            user_id: db_contact.user_id_b,
 530                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 531                            busy: db_contact.user_b_busy,
 532                        });
 533                    } else if db_contact.a_to_b {
 534                        contacts.push(Contact::Outgoing {
 535                            user_id: db_contact.user_id_b,
 536                        })
 537                    } else {
 538                        contacts.push(Contact::Incoming {
 539                            user_id: db_contact.user_id_b,
 540                            should_notify: db_contact.should_notify,
 541                        });
 542                    }
 543                } else if db_contact.accepted {
 544                    contacts.push(Contact::Accepted {
 545                        user_id: db_contact.user_id_a,
 546                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 547                        busy: db_contact.user_a_busy,
 548                    });
 549                } else if db_contact.a_to_b {
 550                    contacts.push(Contact::Incoming {
 551                        user_id: db_contact.user_id_a,
 552                        should_notify: db_contact.should_notify,
 553                    });
 554                } else {
 555                    contacts.push(Contact::Outgoing {
 556                        user_id: db_contact.user_id_a,
 557                    });
 558                }
 559            }
 560
 561            contacts.sort_unstable_by_key(|contact| contact.user_id());
 562
 563            Ok(contacts)
 564        })
 565        .await
 566    }
 567
 568    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 569        self.transaction(|tx| async move {
 570            let participant = room_participant::Entity::find()
 571                .filter(room_participant::Column::UserId.eq(user_id))
 572                .one(&*tx)
 573                .await?;
 574            Ok(participant.is_some())
 575        })
 576        .await
 577    }
 578
 579    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 580        self.transaction(|tx| async move {
 581            let (id_a, id_b) = if user_id_1 < user_id_2 {
 582                (user_id_1, user_id_2)
 583            } else {
 584                (user_id_2, user_id_1)
 585            };
 586
 587            Ok(contact::Entity::find()
 588                .filter(
 589                    contact::Column::UserIdA
 590                        .eq(id_a)
 591                        .and(contact::Column::UserIdB.eq(id_b))
 592                        .and(contact::Column::Accepted.eq(true)),
 593                )
 594                .one(&*tx)
 595                .await?
 596                .is_some())
 597        })
 598        .await
 599    }
 600
 601    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 602        self.transaction(|tx| async move {
 603            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 604                (sender_id, receiver_id, true)
 605            } else {
 606                (receiver_id, sender_id, false)
 607            };
 608
 609            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 610                user_id_a: ActiveValue::set(id_a),
 611                user_id_b: ActiveValue::set(id_b),
 612                a_to_b: ActiveValue::set(a_to_b),
 613                accepted: ActiveValue::set(false),
 614                should_notify: ActiveValue::set(true),
 615                ..Default::default()
 616            })
 617            .on_conflict(
 618                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 619                    .values([
 620                        (contact::Column::Accepted, true.into()),
 621                        (contact::Column::ShouldNotify, false.into()),
 622                    ])
 623                    .action_and_where(
 624                        contact::Column::Accepted.eq(false).and(
 625                            contact::Column::AToB
 626                                .eq(a_to_b)
 627                                .and(contact::Column::UserIdA.eq(id_b))
 628                                .or(contact::Column::AToB
 629                                    .ne(a_to_b)
 630                                    .and(contact::Column::UserIdA.eq(id_a))),
 631                        ),
 632                    )
 633                    .to_owned(),
 634            )
 635            .exec_without_returning(&*tx)
 636            .await?;
 637
 638            if rows_affected == 1 {
 639                Ok(())
 640            } else {
 641                Err(anyhow!("contact already requested"))?
 642            }
 643        })
 644        .await
 645    }
 646
 647    /// Returns a bool indicating whether the removed contact had originally accepted or not
 648    ///
 649    /// Deletes the contact identified by the requester and responder ids, and then returns
 650    /// whether the deleted contact had originally accepted or was a pending contact request.
 651    ///
 652    /// # Arguments
 653    ///
 654    /// * `requester_id` - The user that initiates this request
 655    /// * `responder_id` - The user that will be removed
 656    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 657        self.transaction(|tx| async move {
 658            let (id_a, id_b) = if responder_id < requester_id {
 659                (responder_id, requester_id)
 660            } else {
 661                (requester_id, responder_id)
 662            };
 663
 664            let contact = contact::Entity::find()
 665                .filter(
 666                    contact::Column::UserIdA
 667                        .eq(id_a)
 668                        .and(contact::Column::UserIdB.eq(id_b)),
 669                )
 670                .one(&*tx)
 671                .await?
 672                .ok_or_else(|| anyhow!("no such contact"))?;
 673
 674            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 675            Ok(contact.accepted)
 676        })
 677        .await
 678    }
 679
 680    pub async fn dismiss_contact_notification(
 681        &self,
 682        user_id: UserId,
 683        contact_user_id: UserId,
 684    ) -> Result<()> {
 685        self.transaction(|tx| async move {
 686            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 687                (user_id, contact_user_id, true)
 688            } else {
 689                (contact_user_id, user_id, false)
 690            };
 691
 692            let result = contact::Entity::update_many()
 693                .set(contact::ActiveModel {
 694                    should_notify: ActiveValue::set(false),
 695                    ..Default::default()
 696                })
 697                .filter(
 698                    contact::Column::UserIdA
 699                        .eq(id_a)
 700                        .and(contact::Column::UserIdB.eq(id_b))
 701                        .and(
 702                            contact::Column::AToB
 703                                .eq(a_to_b)
 704                                .and(contact::Column::Accepted.eq(true))
 705                                .or(contact::Column::AToB
 706                                    .ne(a_to_b)
 707                                    .and(contact::Column::Accepted.eq(false))),
 708                        ),
 709                )
 710                .exec(&*tx)
 711                .await?;
 712            if result.rows_affected == 0 {
 713                Err(anyhow!("no such contact request"))?
 714            } else {
 715                Ok(())
 716            }
 717        })
 718        .await
 719    }
 720
 721    pub async fn respond_to_contact_request(
 722        &self,
 723        responder_id: UserId,
 724        requester_id: UserId,
 725        accept: bool,
 726    ) -> Result<()> {
 727        self.transaction(|tx| async move {
 728            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 729                (responder_id, requester_id, false)
 730            } else {
 731                (requester_id, responder_id, true)
 732            };
 733            let rows_affected = if accept {
 734                let result = contact::Entity::update_many()
 735                    .set(contact::ActiveModel {
 736                        accepted: ActiveValue::set(true),
 737                        should_notify: ActiveValue::set(true),
 738                        ..Default::default()
 739                    })
 740                    .filter(
 741                        contact::Column::UserIdA
 742                            .eq(id_a)
 743                            .and(contact::Column::UserIdB.eq(id_b))
 744                            .and(contact::Column::AToB.eq(a_to_b)),
 745                    )
 746                    .exec(&*tx)
 747                    .await?;
 748                result.rows_affected
 749            } else {
 750                let result = contact::Entity::delete_many()
 751                    .filter(
 752                        contact::Column::UserIdA
 753                            .eq(id_a)
 754                            .and(contact::Column::UserIdB.eq(id_b))
 755                            .and(contact::Column::AToB.eq(a_to_b))
 756                            .and(contact::Column::Accepted.eq(false)),
 757                    )
 758                    .exec(&*tx)
 759                    .await?;
 760
 761                result.rows_affected
 762            };
 763
 764            if rows_affected == 1 {
 765                Ok(())
 766            } else {
 767                Err(anyhow!("no such contact request"))?
 768            }
 769        })
 770        .await
 771    }
 772
 773    pub fn fuzzy_like_string(string: &str) -> String {
 774        let mut result = String::with_capacity(string.len() * 2 + 1);
 775        for c in string.chars() {
 776            if c.is_alphanumeric() {
 777                result.push('%');
 778                result.push(c);
 779            }
 780        }
 781        result.push('%');
 782        result
 783    }
 784
 785    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 786        self.transaction(|tx| async {
 787            let tx = tx;
 788            let like_string = Self::fuzzy_like_string(name_query);
 789            let query = "
 790                SELECT users.*
 791                FROM users
 792                WHERE github_login ILIKE $1
 793                ORDER BY github_login <-> $2
 794                LIMIT $3
 795            ";
 796
 797            Ok(user::Entity::find()
 798                .from_raw_sql(Statement::from_sql_and_values(
 799                    self.pool.get_database_backend(),
 800                    query.into(),
 801                    vec![like_string.into(), name_query.into(), limit.into()],
 802                ))
 803                .all(&*tx)
 804                .await?)
 805        })
 806        .await
 807    }
 808
 809    // signups
 810
 811    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 812        self.transaction(|tx| async move {
 813            signup::Entity::insert(signup::ActiveModel {
 814                email_address: ActiveValue::set(signup.email_address.clone()),
 815                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 816                email_confirmation_sent: ActiveValue::set(false),
 817                platform_mac: ActiveValue::set(signup.platform_mac),
 818                platform_windows: ActiveValue::set(signup.platform_windows),
 819                platform_linux: ActiveValue::set(signup.platform_linux),
 820                platform_unknown: ActiveValue::set(false),
 821                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 822                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 823                device_id: ActiveValue::set(signup.device_id.clone()),
 824                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 825                ..Default::default()
 826            })
 827            .on_conflict(
 828                OnConflict::column(signup::Column::EmailAddress)
 829                    .update_columns([
 830                        signup::Column::PlatformMac,
 831                        signup::Column::PlatformWindows,
 832                        signup::Column::PlatformLinux,
 833                        signup::Column::EditorFeatures,
 834                        signup::Column::ProgrammingLanguages,
 835                        signup::Column::DeviceId,
 836                        signup::Column::AddedToMailingList,
 837                    ])
 838                    .to_owned(),
 839            )
 840            .exec(&*tx)
 841            .await?;
 842            Ok(())
 843        })
 844        .await
 845    }
 846
 847    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 848        self.transaction(|tx| async move {
 849            let signup = signup::Entity::find()
 850                .filter(signup::Column::EmailAddress.eq(email_address))
 851                .one(&*tx)
 852                .await?
 853                .ok_or_else(|| {
 854                    anyhow!("signup with email address {} doesn't exist", email_address)
 855                })?;
 856
 857            Ok(signup)
 858        })
 859        .await
 860    }
 861
 862    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 863        self.transaction(|tx| async move {
 864            let query = "
 865                SELECT
 866                    COUNT(*) as count,
 867                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 868                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 869                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 870                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 871                FROM (
 872                    SELECT *
 873                    FROM signups
 874                    WHERE
 875                        NOT email_confirmation_sent
 876                ) AS unsent
 877            ";
 878            Ok(
 879                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 880                    self.pool.get_database_backend(),
 881                    query.into(),
 882                    vec![],
 883                ))
 884                .one(&*tx)
 885                .await?
 886                .ok_or_else(|| anyhow!("invalid result"))?,
 887            )
 888        })
 889        .await
 890    }
 891
 892    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 893        let emails = invites
 894            .iter()
 895            .map(|s| s.email_address.as_str())
 896            .collect::<Vec<_>>();
 897        self.transaction(|tx| async {
 898            let tx = tx;
 899            signup::Entity::update_many()
 900                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 901                .set(signup::ActiveModel {
 902                    email_confirmation_sent: ActiveValue::set(true),
 903                    ..Default::default()
 904                })
 905                .exec(&*tx)
 906                .await?;
 907            Ok(())
 908        })
 909        .await
 910    }
 911
 912    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 913        self.transaction(|tx| async move {
 914            Ok(signup::Entity::find()
 915                .select_only()
 916                .column(signup::Column::EmailAddress)
 917                .column(signup::Column::EmailConfirmationCode)
 918                .filter(
 919                    signup::Column::EmailConfirmationSent.eq(false).and(
 920                        signup::Column::PlatformMac
 921                            .eq(true)
 922                            .or(signup::Column::PlatformUnknown.eq(true)),
 923                    ),
 924                )
 925                .order_by_asc(signup::Column::CreatedAt)
 926                .limit(count as u64)
 927                .into_model()
 928                .all(&*tx)
 929                .await?)
 930        })
 931        .await
 932    }
 933
 934    // invite codes
 935
 936    pub async fn create_invite_from_code(
 937        &self,
 938        code: &str,
 939        email_address: &str,
 940        device_id: Option<&str>,
 941        added_to_mailing_list: bool,
 942    ) -> Result<Invite> {
 943        self.transaction(|tx| async move {
 944            let existing_user = user::Entity::find()
 945                .filter(user::Column::EmailAddress.eq(email_address))
 946                .one(&*tx)
 947                .await?;
 948
 949            if existing_user.is_some() {
 950                Err(anyhow!("email address is already in use"))?;
 951            }
 952
 953            let inviting_user_with_invites = match user::Entity::find()
 954                .filter(
 955                    user::Column::InviteCode
 956                        .eq(code)
 957                        .and(user::Column::InviteCount.gt(0)),
 958                )
 959                .one(&*tx)
 960                .await?
 961            {
 962                Some(inviting_user) => inviting_user,
 963                None => {
 964                    return Err(Error::Http(
 965                        StatusCode::UNAUTHORIZED,
 966                        "unable to find an invite code with invites remaining".to_string(),
 967                    ))?
 968                }
 969            };
 970            user::Entity::update_many()
 971                .filter(
 972                    user::Column::Id
 973                        .eq(inviting_user_with_invites.id)
 974                        .and(user::Column::InviteCount.gt(0)),
 975                )
 976                .col_expr(
 977                    user::Column::InviteCount,
 978                    Expr::col(user::Column::InviteCount).sub(1),
 979                )
 980                .exec(&*tx)
 981                .await?;
 982
 983            let signup = signup::Entity::insert(signup::ActiveModel {
 984                email_address: ActiveValue::set(email_address.into()),
 985                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 986                email_confirmation_sent: ActiveValue::set(false),
 987                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 988                platform_linux: ActiveValue::set(false),
 989                platform_mac: ActiveValue::set(false),
 990                platform_windows: ActiveValue::set(false),
 991                platform_unknown: ActiveValue::set(true),
 992                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 993                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 994                ..Default::default()
 995            })
 996            .on_conflict(
 997                OnConflict::column(signup::Column::EmailAddress)
 998                    .update_column(signup::Column::InvitingUserId)
 999                    .to_owned(),
1000            )
1001            .exec_with_returning(&*tx)
1002            .await?;
1003
1004            Ok(Invite {
1005                email_address: signup.email_address,
1006                email_confirmation_code: signup.email_confirmation_code,
1007            })
1008        })
1009        .await
1010    }
1011
1012    pub async fn create_user_from_invite(
1013        &self,
1014        invite: &Invite,
1015        user: NewUserParams,
1016    ) -> Result<Option<NewUserResult>> {
1017        self.transaction(|tx| async {
1018            let tx = tx;
1019            let signup = signup::Entity::find()
1020                .filter(
1021                    signup::Column::EmailAddress
1022                        .eq(invite.email_address.as_str())
1023                        .and(
1024                            signup::Column::EmailConfirmationCode
1025                                .eq(invite.email_confirmation_code.as_str()),
1026                        ),
1027                )
1028                .one(&*tx)
1029                .await?
1030                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1031
1032            if signup.user_id.is_some() {
1033                return Ok(None);
1034            }
1035
1036            let user = user::Entity::insert(user::ActiveModel {
1037                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1038                github_login: ActiveValue::set(user.github_login.clone()),
1039                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1040                admin: ActiveValue::set(false),
1041                invite_count: ActiveValue::set(user.invite_count),
1042                invite_code: ActiveValue::set(Some(random_invite_code())),
1043                metrics_id: ActiveValue::set(Uuid::new_v4()),
1044                ..Default::default()
1045            })
1046            .on_conflict(
1047                OnConflict::column(user::Column::GithubLogin)
1048                    .update_columns([
1049                        user::Column::EmailAddress,
1050                        user::Column::GithubUserId,
1051                        user::Column::Admin,
1052                    ])
1053                    .to_owned(),
1054            )
1055            .exec_with_returning(&*tx)
1056            .await?;
1057
1058            let mut signup = signup.into_active_model();
1059            signup.user_id = ActiveValue::set(Some(user.id));
1060            let signup = signup.update(&*tx).await?;
1061
1062            if let Some(inviting_user_id) = signup.inviting_user_id {
1063                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1064                    (inviting_user_id, user.id, true)
1065                } else {
1066                    (user.id, inviting_user_id, false)
1067                };
1068
1069                contact::Entity::insert(contact::ActiveModel {
1070                    user_id_a: ActiveValue::set(user_id_a),
1071                    user_id_b: ActiveValue::set(user_id_b),
1072                    a_to_b: ActiveValue::set(a_to_b),
1073                    should_notify: ActiveValue::set(true),
1074                    accepted: ActiveValue::set(true),
1075                    ..Default::default()
1076                })
1077                .on_conflict(OnConflict::new().do_nothing().to_owned())
1078                .exec_without_returning(&*tx)
1079                .await?;
1080            }
1081
1082            Ok(Some(NewUserResult {
1083                user_id: user.id,
1084                metrics_id: user.metrics_id.to_string(),
1085                inviting_user_id: signup.inviting_user_id,
1086                signup_device_id: signup.device_id,
1087            }))
1088        })
1089        .await
1090    }
1091
1092    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1093        self.transaction(|tx| async move {
1094            if count > 0 {
1095                user::Entity::update_many()
1096                    .filter(
1097                        user::Column::Id
1098                            .eq(id)
1099                            .and(user::Column::InviteCode.is_null()),
1100                    )
1101                    .set(user::ActiveModel {
1102                        invite_code: ActiveValue::set(Some(random_invite_code())),
1103                        ..Default::default()
1104                    })
1105                    .exec(&*tx)
1106                    .await?;
1107            }
1108
1109            user::Entity::update_many()
1110                .filter(user::Column::Id.eq(id))
1111                .set(user::ActiveModel {
1112                    invite_count: ActiveValue::set(count),
1113                    ..Default::default()
1114                })
1115                .exec(&*tx)
1116                .await?;
1117            Ok(())
1118        })
1119        .await
1120    }
1121
1122    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1123        self.transaction(|tx| async move {
1124            match user::Entity::find_by_id(id).one(&*tx).await? {
1125                Some(user) if user.invite_code.is_some() => {
1126                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1127                }
1128                _ => Ok(None),
1129            }
1130        })
1131        .await
1132    }
1133
1134    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1135        self.transaction(|tx| async move {
1136            user::Entity::find()
1137                .filter(user::Column::InviteCode.eq(code))
1138                .one(&*tx)
1139                .await?
1140                .ok_or_else(|| {
1141                    Error::Http(
1142                        StatusCode::NOT_FOUND,
1143                        "that invite code does not exist".to_string(),
1144                    )
1145                })
1146        })
1147        .await
1148    }
1149
1150    // rooms
1151
1152    pub async fn incoming_call_for_user(
1153        &self,
1154        user_id: UserId,
1155    ) -> Result<Option<proto::IncomingCall>> {
1156        self.transaction(|tx| async move {
1157            let pending_participant = room_participant::Entity::find()
1158                .filter(
1159                    room_participant::Column::UserId
1160                        .eq(user_id)
1161                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1162                )
1163                .one(&*tx)
1164                .await?;
1165
1166            if let Some(pending_participant) = pending_participant {
1167                let room = self.get_room(pending_participant.room_id, &tx).await?;
1168                Ok(Self::build_incoming_call(&room, user_id))
1169            } else {
1170                Ok(None)
1171            }
1172        })
1173        .await
1174    }
1175
1176    pub async fn create_room(
1177        &self,
1178        user_id: UserId,
1179        connection: ConnectionId,
1180        live_kit_room: &str,
1181    ) -> Result<proto::Room> {
1182        self.transaction(|tx| async move {
1183            let room = room::ActiveModel {
1184                live_kit_room: ActiveValue::set(live_kit_room.into()),
1185                ..Default::default()
1186            }
1187            .insert(&*tx)
1188            .await?;
1189            room_participant::ActiveModel {
1190                room_id: ActiveValue::set(room.id),
1191                user_id: ActiveValue::set(user_id),
1192                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1193                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1194                    connection.owner_id as i32,
1195                ))),
1196                answering_connection_lost: ActiveValue::set(false),
1197                calling_user_id: ActiveValue::set(user_id),
1198                calling_connection_id: ActiveValue::set(connection.id as i32),
1199                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1200                    connection.owner_id as i32,
1201                ))),
1202                ..Default::default()
1203            }
1204            .insert(&*tx)
1205            .await?;
1206
1207            let room = self.get_room(room.id, &tx).await?;
1208            Ok(room)
1209        })
1210        .await
1211    }
1212
1213    pub async fn call(
1214        &self,
1215        room_id: RoomId,
1216        calling_user_id: UserId,
1217        calling_connection: ConnectionId,
1218        called_user_id: UserId,
1219        initial_project_id: Option<ProjectId>,
1220    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1221        self.room_transaction(room_id, |tx| async move {
1222            room_participant::ActiveModel {
1223                room_id: ActiveValue::set(room_id),
1224                user_id: ActiveValue::set(called_user_id),
1225                answering_connection_lost: ActiveValue::set(false),
1226                calling_user_id: ActiveValue::set(calling_user_id),
1227                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1228                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1229                    calling_connection.owner_id as i32,
1230                ))),
1231                initial_project_id: ActiveValue::set(initial_project_id),
1232                ..Default::default()
1233            }
1234            .insert(&*tx)
1235            .await?;
1236
1237            let room = self.get_room(room_id, &tx).await?;
1238            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1239                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1240            Ok((room, incoming_call))
1241        })
1242        .await
1243    }
1244
1245    pub async fn call_failed(
1246        &self,
1247        room_id: RoomId,
1248        called_user_id: UserId,
1249    ) -> Result<RoomGuard<proto::Room>> {
1250        self.room_transaction(room_id, |tx| async move {
1251            room_participant::Entity::delete_many()
1252                .filter(
1253                    room_participant::Column::RoomId
1254                        .eq(room_id)
1255                        .and(room_participant::Column::UserId.eq(called_user_id)),
1256                )
1257                .exec(&*tx)
1258                .await?;
1259            let room = self.get_room(room_id, &tx).await?;
1260            Ok(room)
1261        })
1262        .await
1263    }
1264
1265    pub async fn decline_call(
1266        &self,
1267        expected_room_id: Option<RoomId>,
1268        user_id: UserId,
1269    ) -> Result<Option<RoomGuard<proto::Room>>> {
1270        self.optional_room_transaction(|tx| async move {
1271            let mut filter = Condition::all()
1272                .add(room_participant::Column::UserId.eq(user_id))
1273                .add(room_participant::Column::AnsweringConnectionId.is_null());
1274            if let Some(room_id) = expected_room_id {
1275                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1276            }
1277            let participant = room_participant::Entity::find()
1278                .filter(filter)
1279                .one(&*tx)
1280                .await?;
1281
1282            let participant = if let Some(participant) = participant {
1283                participant
1284            } else if expected_room_id.is_some() {
1285                return Err(anyhow!("could not find call to decline"))?;
1286            } else {
1287                return Ok(None);
1288            };
1289
1290            let room_id = participant.room_id;
1291            room_participant::Entity::delete(participant.into_active_model())
1292                .exec(&*tx)
1293                .await?;
1294
1295            let room = self.get_room(room_id, &tx).await?;
1296            Ok(Some((room_id, room)))
1297        })
1298        .await
1299    }
1300
1301    pub async fn cancel_call(
1302        &self,
1303        room_id: RoomId,
1304        calling_connection: ConnectionId,
1305        called_user_id: UserId,
1306    ) -> Result<RoomGuard<proto::Room>> {
1307        self.room_transaction(room_id, |tx| async move {
1308            let participant = room_participant::Entity::find()
1309                .filter(
1310                    Condition::all()
1311                        .add(room_participant::Column::UserId.eq(called_user_id))
1312                        .add(room_participant::Column::RoomId.eq(room_id))
1313                        .add(
1314                            room_participant::Column::CallingConnectionId
1315                                .eq(calling_connection.id as i32),
1316                        )
1317                        .add(
1318                            room_participant::Column::CallingConnectionServerId
1319                                .eq(calling_connection.owner_id as i32),
1320                        )
1321                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1322                )
1323                .one(&*tx)
1324                .await?
1325                .ok_or_else(|| anyhow!("no call to cancel"))?;
1326
1327            room_participant::Entity::delete(participant.into_active_model())
1328                .exec(&*tx)
1329                .await?;
1330
1331            let room = self.get_room(room_id, &tx).await?;
1332            Ok(room)
1333        })
1334        .await
1335    }
1336
1337    pub async fn join_room(
1338        &self,
1339        room_id: RoomId,
1340        user_id: UserId,
1341        channel_id: Option<ChannelId>,
1342        connection: ConnectionId,
1343    ) -> Result<RoomGuard<proto::Room>> {
1344        self.room_transaction(room_id, |tx| async move {
1345            if let Some(channel_id) = channel_id {
1346                channel_member::Entity::find()
1347                    .filter(
1348                        channel_member::Column::ChannelId
1349                            .eq(channel_id)
1350                            .and(channel_member::Column::UserId.eq(user_id))
1351                            .and(channel_member::Column::Accepted.eq(true)),
1352                    )
1353                    .one(&*tx)
1354                    .await?
1355                    .ok_or_else(|| anyhow!("no such channel membership"))?;
1356
1357                room_participant::ActiveModel {
1358                    room_id: ActiveValue::set(room_id),
1359                    user_id: ActiveValue::set(user_id),
1360                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1361                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1362                        connection.owner_id as i32,
1363                    ))),
1364                    answering_connection_lost: ActiveValue::set(false),
1365                    // Redundant for the channel join use case, used for channel and call invitations
1366                    calling_user_id: ActiveValue::set(user_id),
1367                    calling_connection_id: ActiveValue::set(connection.id as i32),
1368                    calling_connection_server_id: ActiveValue::set(Some(ServerId(
1369                        connection.owner_id as i32,
1370                    ))),
1371                    ..Default::default()
1372                }
1373                .insert(&*tx)
1374                .await?;
1375            } else {
1376                let result = room_participant::Entity::update_many()
1377                    .filter(
1378                        Condition::all()
1379                            .add(room_participant::Column::RoomId.eq(room_id))
1380                            .add(room_participant::Column::UserId.eq(user_id))
1381                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1382                    )
1383                    .set(room_participant::ActiveModel {
1384                        answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1385                        answering_connection_server_id: ActiveValue::set(Some(ServerId(
1386                            connection.owner_id as i32,
1387                        ))),
1388                        answering_connection_lost: ActiveValue::set(false),
1389                        ..Default::default()
1390                    })
1391                    .exec(&*tx)
1392                    .await?;
1393                if result.rows_affected == 0 {
1394                    Err(anyhow!("room does not exist or was already joined"))?;
1395                }
1396            }
1397
1398            let room = self.get_room(room_id, &tx).await?;
1399            Ok(room)
1400        })
1401        .await
1402    }
1403
1404    pub async fn rejoin_room(
1405        &self,
1406        rejoin_room: proto::RejoinRoom,
1407        user_id: UserId,
1408        connection: ConnectionId,
1409    ) -> Result<RoomGuard<RejoinedRoom>> {
1410        let room_id = RoomId::from_proto(rejoin_room.id);
1411        self.room_transaction(room_id, |tx| async {
1412            let tx = tx;
1413            let participant_update = room_participant::Entity::update_many()
1414                .filter(
1415                    Condition::all()
1416                        .add(room_participant::Column::RoomId.eq(room_id))
1417                        .add(room_participant::Column::UserId.eq(user_id))
1418                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1419                        .add(
1420                            Condition::any()
1421                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1422                                .add(
1423                                    room_participant::Column::AnsweringConnectionServerId
1424                                        .ne(connection.owner_id as i32),
1425                                ),
1426                        ),
1427                )
1428                .set(room_participant::ActiveModel {
1429                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1430                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1431                        connection.owner_id as i32,
1432                    ))),
1433                    answering_connection_lost: ActiveValue::set(false),
1434                    ..Default::default()
1435                })
1436                .exec(&*tx)
1437                .await?;
1438            if participant_update.rows_affected == 0 {
1439                return Err(anyhow!("room does not exist or was already joined"))?;
1440            }
1441
1442            let mut reshared_projects = Vec::new();
1443            for reshared_project in &rejoin_room.reshared_projects {
1444                let project_id = ProjectId::from_proto(reshared_project.project_id);
1445                let project = project::Entity::find_by_id(project_id)
1446                    .one(&*tx)
1447                    .await?
1448                    .ok_or_else(|| anyhow!("project does not exist"))?;
1449                if project.host_user_id != user_id {
1450                    return Err(anyhow!("no such project"))?;
1451                }
1452
1453                let mut collaborators = project
1454                    .find_related(project_collaborator::Entity)
1455                    .all(&*tx)
1456                    .await?;
1457                let host_ix = collaborators
1458                    .iter()
1459                    .position(|collaborator| {
1460                        collaborator.user_id == user_id && collaborator.is_host
1461                    })
1462                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1463                let host = collaborators.swap_remove(host_ix);
1464                let old_connection_id = host.connection();
1465
1466                project::Entity::update(project::ActiveModel {
1467                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1468                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1469                        connection.owner_id as i32,
1470                    ))),
1471                    ..project.into_active_model()
1472                })
1473                .exec(&*tx)
1474                .await?;
1475                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1476                    connection_id: ActiveValue::set(connection.id as i32),
1477                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1478                    ..host.into_active_model()
1479                })
1480                .exec(&*tx)
1481                .await?;
1482
1483                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1484                    .await?;
1485
1486                reshared_projects.push(ResharedProject {
1487                    id: project_id,
1488                    old_connection_id,
1489                    collaborators: collaborators
1490                        .iter()
1491                        .map(|collaborator| ProjectCollaborator {
1492                            connection_id: collaborator.connection(),
1493                            user_id: collaborator.user_id,
1494                            replica_id: collaborator.replica_id,
1495                            is_host: collaborator.is_host,
1496                        })
1497                        .collect(),
1498                    worktrees: reshared_project.worktrees.clone(),
1499                });
1500            }
1501
1502            project::Entity::delete_many()
1503                .filter(
1504                    Condition::all()
1505                        .add(project::Column::RoomId.eq(room_id))
1506                        .add(project::Column::HostUserId.eq(user_id))
1507                        .add(
1508                            project::Column::Id
1509                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1510                        ),
1511                )
1512                .exec(&*tx)
1513                .await?;
1514
1515            let mut rejoined_projects = Vec::new();
1516            for rejoined_project in &rejoin_room.rejoined_projects {
1517                let project_id = ProjectId::from_proto(rejoined_project.id);
1518                let Some(project) = project::Entity::find_by_id(project_id)
1519                    .one(&*tx)
1520                    .await? else { continue };
1521
1522                let mut worktrees = Vec::new();
1523                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1524                for db_worktree in db_worktrees {
1525                    let mut worktree = RejoinedWorktree {
1526                        id: db_worktree.id as u64,
1527                        abs_path: db_worktree.abs_path,
1528                        root_name: db_worktree.root_name,
1529                        visible: db_worktree.visible,
1530                        updated_entries: Default::default(),
1531                        removed_entries: Default::default(),
1532                        updated_repositories: Default::default(),
1533                        removed_repositories: Default::default(),
1534                        diagnostic_summaries: Default::default(),
1535                        settings_files: Default::default(),
1536                        scan_id: db_worktree.scan_id as u64,
1537                        completed_scan_id: db_worktree.completed_scan_id as u64,
1538                    };
1539
1540                    let rejoined_worktree = rejoined_project
1541                        .worktrees
1542                        .iter()
1543                        .find(|worktree| worktree.id == db_worktree.id as u64);
1544
1545                    // File entries
1546                    {
1547                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1548                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1549                        } else {
1550                            worktree_entry::Column::IsDeleted.eq(false)
1551                        };
1552
1553                        let mut db_entries = worktree_entry::Entity::find()
1554                            .filter(
1555                                Condition::all()
1556                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
1557                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1558                                    .add(entry_filter),
1559                            )
1560                            .stream(&*tx)
1561                            .await?;
1562
1563                        while let Some(db_entry) = db_entries.next().await {
1564                            let db_entry = db_entry?;
1565                            if db_entry.is_deleted {
1566                                worktree.removed_entries.push(db_entry.id as u64);
1567                            } else {
1568                                worktree.updated_entries.push(proto::Entry {
1569                                    id: db_entry.id as u64,
1570                                    is_dir: db_entry.is_dir,
1571                                    path: db_entry.path,
1572                                    inode: db_entry.inode as u64,
1573                                    mtime: Some(proto::Timestamp {
1574                                        seconds: db_entry.mtime_seconds as u64,
1575                                        nanos: db_entry.mtime_nanos as u32,
1576                                    }),
1577                                    is_symlink: db_entry.is_symlink,
1578                                    is_ignored: db_entry.is_ignored,
1579                                    is_external: db_entry.is_external,
1580                                    git_status: db_entry.git_status.map(|status| status as i32),
1581                                });
1582                            }
1583                        }
1584                    }
1585
1586                    // Repository Entries
1587                    {
1588                        let repository_entry_filter =
1589                            if let Some(rejoined_worktree) = rejoined_worktree {
1590                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1591                            } else {
1592                                worktree_repository::Column::IsDeleted.eq(false)
1593                            };
1594
1595                        let mut db_repositories = worktree_repository::Entity::find()
1596                            .filter(
1597                                Condition::all()
1598                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
1599                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1600                                    .add(repository_entry_filter),
1601                            )
1602                            .stream(&*tx)
1603                            .await?;
1604
1605                        while let Some(db_repository) = db_repositories.next().await {
1606                            let db_repository = db_repository?;
1607                            if db_repository.is_deleted {
1608                                worktree
1609                                    .removed_repositories
1610                                    .push(db_repository.work_directory_id as u64);
1611                            } else {
1612                                worktree.updated_repositories.push(proto::RepositoryEntry {
1613                                    work_directory_id: db_repository.work_directory_id as u64,
1614                                    branch: db_repository.branch,
1615                                });
1616                            }
1617                        }
1618                    }
1619
1620                    worktrees.push(worktree);
1621                }
1622
1623                let language_servers = project
1624                    .find_related(language_server::Entity)
1625                    .all(&*tx)
1626                    .await?
1627                    .into_iter()
1628                    .map(|language_server| proto::LanguageServer {
1629                        id: language_server.id as u64,
1630                        name: language_server.name,
1631                    })
1632                    .collect::<Vec<_>>();
1633
1634                {
1635                    let mut db_settings_files = worktree_settings_file::Entity::find()
1636                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1637                        .stream(&*tx)
1638                        .await?;
1639                    while let Some(db_settings_file) = db_settings_files.next().await {
1640                        let db_settings_file = db_settings_file?;
1641                        if let Some(worktree) = worktrees
1642                            .iter_mut()
1643                            .find(|w| w.id == db_settings_file.worktree_id as u64)
1644                        {
1645                            worktree.settings_files.push(WorktreeSettingsFile {
1646                                path: db_settings_file.path,
1647                                content: db_settings_file.content,
1648                            });
1649                        }
1650                    }
1651                }
1652
1653                let mut collaborators = project
1654                    .find_related(project_collaborator::Entity)
1655                    .all(&*tx)
1656                    .await?;
1657                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1658                    .iter()
1659                    .position(|collaborator| collaborator.user_id == user_id)
1660                {
1661                    collaborators.swap_remove(self_collaborator_ix)
1662                } else {
1663                    continue;
1664                };
1665                let old_connection_id = self_collaborator.connection();
1666                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1667                    connection_id: ActiveValue::set(connection.id as i32),
1668                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1669                    ..self_collaborator.into_active_model()
1670                })
1671                .exec(&*tx)
1672                .await?;
1673
1674                let collaborators = collaborators
1675                    .into_iter()
1676                    .map(|collaborator| ProjectCollaborator {
1677                        connection_id: collaborator.connection(),
1678                        user_id: collaborator.user_id,
1679                        replica_id: collaborator.replica_id,
1680                        is_host: collaborator.is_host,
1681                    })
1682                    .collect::<Vec<_>>();
1683
1684                rejoined_projects.push(RejoinedProject {
1685                    id: project_id,
1686                    old_connection_id,
1687                    collaborators,
1688                    worktrees,
1689                    language_servers,
1690                });
1691            }
1692
1693            let room = self.get_room(room_id, &tx).await?;
1694            Ok(RejoinedRoom {
1695                room,
1696                rejoined_projects,
1697                reshared_projects,
1698            })
1699        })
1700        .await
1701    }
1702
1703    pub async fn leave_room(
1704        &self,
1705        connection: ConnectionId,
1706    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1707        self.optional_room_transaction(|tx| async move {
1708            let leaving_participant = room_participant::Entity::find()
1709                .filter(
1710                    Condition::all()
1711                        .add(
1712                            room_participant::Column::AnsweringConnectionId
1713                                .eq(connection.id as i32),
1714                        )
1715                        .add(
1716                            room_participant::Column::AnsweringConnectionServerId
1717                                .eq(connection.owner_id as i32),
1718                        ),
1719                )
1720                .one(&*tx)
1721                .await?;
1722
1723            if let Some(leaving_participant) = leaving_participant {
1724                // Leave room.
1725                let room_id = leaving_participant.room_id;
1726                room_participant::Entity::delete_by_id(leaving_participant.id)
1727                    .exec(&*tx)
1728                    .await?;
1729
1730                // Cancel pending calls initiated by the leaving user.
1731                let called_participants = room_participant::Entity::find()
1732                    .filter(
1733                        Condition::all()
1734                            .add(
1735                                room_participant::Column::CallingUserId
1736                                    .eq(leaving_participant.user_id),
1737                            )
1738                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1739                    )
1740                    .all(&*tx)
1741                    .await?;
1742                room_participant::Entity::delete_many()
1743                    .filter(
1744                        room_participant::Column::Id
1745                            .is_in(called_participants.iter().map(|participant| participant.id)),
1746                    )
1747                    .exec(&*tx)
1748                    .await?;
1749                let canceled_calls_to_user_ids = called_participants
1750                    .into_iter()
1751                    .map(|participant| participant.user_id)
1752                    .collect();
1753
1754                // Detect left projects.
1755                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1756                enum QueryProjectIds {
1757                    ProjectId,
1758                }
1759                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1760                    .select_only()
1761                    .column_as(
1762                        project_collaborator::Column::ProjectId,
1763                        QueryProjectIds::ProjectId,
1764                    )
1765                    .filter(
1766                        Condition::all()
1767                            .add(
1768                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1769                            )
1770                            .add(
1771                                project_collaborator::Column::ConnectionServerId
1772                                    .eq(connection.owner_id as i32),
1773                            ),
1774                    )
1775                    .into_values::<_, QueryProjectIds>()
1776                    .all(&*tx)
1777                    .await?;
1778                let mut left_projects = HashMap::default();
1779                let mut collaborators = project_collaborator::Entity::find()
1780                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1781                    .stream(&*tx)
1782                    .await?;
1783                while let Some(collaborator) = collaborators.next().await {
1784                    let collaborator = collaborator?;
1785                    let left_project =
1786                        left_projects
1787                            .entry(collaborator.project_id)
1788                            .or_insert(LeftProject {
1789                                id: collaborator.project_id,
1790                                host_user_id: Default::default(),
1791                                connection_ids: Default::default(),
1792                                host_connection_id: Default::default(),
1793                            });
1794
1795                    let collaborator_connection_id = collaborator.connection();
1796                    if collaborator_connection_id != connection {
1797                        left_project.connection_ids.push(collaborator_connection_id);
1798                    }
1799
1800                    if collaborator.is_host {
1801                        left_project.host_user_id = collaborator.user_id;
1802                        left_project.host_connection_id = collaborator_connection_id;
1803                    }
1804                }
1805                drop(collaborators);
1806
1807                // Leave projects.
1808                project_collaborator::Entity::delete_many()
1809                    .filter(
1810                        Condition::all()
1811                            .add(
1812                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1813                            )
1814                            .add(
1815                                project_collaborator::Column::ConnectionServerId
1816                                    .eq(connection.owner_id as i32),
1817                            ),
1818                    )
1819                    .exec(&*tx)
1820                    .await?;
1821
1822                // Unshare projects.
1823                project::Entity::delete_many()
1824                    .filter(
1825                        Condition::all()
1826                            .add(project::Column::RoomId.eq(room_id))
1827                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1828                            .add(
1829                                project::Column::HostConnectionServerId
1830                                    .eq(connection.owner_id as i32),
1831                            ),
1832                    )
1833                    .exec(&*tx)
1834                    .await?;
1835
1836                let room = self.get_room(room_id, &tx).await?;
1837                let deleted = if room.participants.is_empty() {
1838                    let result = room::Entity::delete_by_id(room_id)
1839                        .filter(room::Column::ChannelId.is_null())
1840                        .exec(&*tx)
1841                        .await?;
1842                    result.rows_affected > 0
1843                } else {
1844                    false
1845                };
1846
1847                let left_room = LeftRoom {
1848                    room,
1849                    left_projects,
1850                    canceled_calls_to_user_ids,
1851                    deleted,
1852                };
1853
1854                if left_room.room.participants.is_empty() {
1855                    self.rooms.remove(&room_id);
1856                }
1857
1858                Ok(Some((room_id, left_room)))
1859            } else {
1860                Ok(None)
1861            }
1862        })
1863        .await
1864    }
1865
1866    pub async fn follow(
1867        &self,
1868        project_id: ProjectId,
1869        leader_connection: ConnectionId,
1870        follower_connection: ConnectionId,
1871    ) -> Result<RoomGuard<proto::Room>> {
1872        let room_id = self.room_id_for_project(project_id).await?;
1873        self.room_transaction(room_id, |tx| async move {
1874            follower::ActiveModel {
1875                room_id: ActiveValue::set(room_id),
1876                project_id: ActiveValue::set(project_id),
1877                leader_connection_server_id: ActiveValue::set(ServerId(
1878                    leader_connection.owner_id as i32,
1879                )),
1880                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1881                follower_connection_server_id: ActiveValue::set(ServerId(
1882                    follower_connection.owner_id as i32,
1883                )),
1884                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1885                ..Default::default()
1886            }
1887            .insert(&*tx)
1888            .await?;
1889
1890            let room = self.get_room(room_id, &*tx).await?;
1891            Ok(room)
1892        })
1893        .await
1894    }
1895
1896    pub async fn unfollow(
1897        &self,
1898        project_id: ProjectId,
1899        leader_connection: ConnectionId,
1900        follower_connection: ConnectionId,
1901    ) -> Result<RoomGuard<proto::Room>> {
1902        let room_id = self.room_id_for_project(project_id).await?;
1903        self.room_transaction(room_id, |tx| async move {
1904            follower::Entity::delete_many()
1905                .filter(
1906                    Condition::all()
1907                        .add(follower::Column::ProjectId.eq(project_id))
1908                        .add(
1909                            follower::Column::LeaderConnectionServerId
1910                                .eq(leader_connection.owner_id),
1911                        )
1912                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1913                        .add(
1914                            follower::Column::FollowerConnectionServerId
1915                                .eq(follower_connection.owner_id),
1916                        )
1917                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1918                )
1919                .exec(&*tx)
1920                .await?;
1921
1922            let room = self.get_room(room_id, &*tx).await?;
1923            Ok(room)
1924        })
1925        .await
1926    }
1927
1928    pub async fn update_room_participant_location(
1929        &self,
1930        room_id: RoomId,
1931        connection: ConnectionId,
1932        location: proto::ParticipantLocation,
1933    ) -> Result<RoomGuard<proto::Room>> {
1934        self.room_transaction(room_id, |tx| async {
1935            let tx = tx;
1936            let location_kind;
1937            let location_project_id;
1938            match location
1939                .variant
1940                .as_ref()
1941                .ok_or_else(|| anyhow!("invalid location"))?
1942            {
1943                proto::participant_location::Variant::SharedProject(project) => {
1944                    location_kind = 0;
1945                    location_project_id = Some(ProjectId::from_proto(project.id));
1946                }
1947                proto::participant_location::Variant::UnsharedProject(_) => {
1948                    location_kind = 1;
1949                    location_project_id = None;
1950                }
1951                proto::participant_location::Variant::External(_) => {
1952                    location_kind = 2;
1953                    location_project_id = None;
1954                }
1955            }
1956
1957            let result = room_participant::Entity::update_many()
1958                .filter(
1959                    Condition::all()
1960                        .add(room_participant::Column::RoomId.eq(room_id))
1961                        .add(
1962                            room_participant::Column::AnsweringConnectionId
1963                                .eq(connection.id as i32),
1964                        )
1965                        .add(
1966                            room_participant::Column::AnsweringConnectionServerId
1967                                .eq(connection.owner_id as i32),
1968                        ),
1969                )
1970                .set(room_participant::ActiveModel {
1971                    location_kind: ActiveValue::set(Some(location_kind)),
1972                    location_project_id: ActiveValue::set(location_project_id),
1973                    ..Default::default()
1974                })
1975                .exec(&*tx)
1976                .await?;
1977
1978            if result.rows_affected == 1 {
1979                let room = self.get_room(room_id, &tx).await?;
1980                Ok(room)
1981            } else {
1982                Err(anyhow!("could not update room participant location"))?
1983            }
1984        })
1985        .await
1986    }
1987
1988    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1989        self.transaction(|tx| async move {
1990            let participant = room_participant::Entity::find()
1991                .filter(
1992                    Condition::all()
1993                        .add(
1994                            room_participant::Column::AnsweringConnectionId
1995                                .eq(connection.id as i32),
1996                        )
1997                        .add(
1998                            room_participant::Column::AnsweringConnectionServerId
1999                                .eq(connection.owner_id as i32),
2000                        ),
2001                )
2002                .one(&*tx)
2003                .await?
2004                .ok_or_else(|| anyhow!("not a participant in any room"))?;
2005
2006            room_participant::Entity::update(room_participant::ActiveModel {
2007                answering_connection_lost: ActiveValue::set(true),
2008                ..participant.into_active_model()
2009            })
2010            .exec(&*tx)
2011            .await?;
2012
2013            Ok(())
2014        })
2015        .await
2016    }
2017
2018    fn build_incoming_call(
2019        room: &proto::Room,
2020        called_user_id: UserId,
2021    ) -> Option<proto::IncomingCall> {
2022        let pending_participant = room
2023            .pending_participants
2024            .iter()
2025            .find(|participant| participant.user_id == called_user_id.to_proto())?;
2026
2027        Some(proto::IncomingCall {
2028            room_id: room.id,
2029            calling_user_id: pending_participant.calling_user_id,
2030            participant_user_ids: room
2031                .participants
2032                .iter()
2033                .map(|participant| participant.user_id)
2034                .collect(),
2035            initial_project: room.participants.iter().find_map(|participant| {
2036                let initial_project_id = pending_participant.initial_project_id?;
2037                participant
2038                    .projects
2039                    .iter()
2040                    .find(|project| project.id == initial_project_id)
2041                    .cloned()
2042            }),
2043        })
2044    }
2045
2046    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2047        let db_room = room::Entity::find_by_id(room_id)
2048            .one(tx)
2049            .await?
2050            .ok_or_else(|| anyhow!("could not find room"))?;
2051
2052        let mut db_participants = db_room
2053            .find_related(room_participant::Entity)
2054            .stream(tx)
2055            .await?;
2056        let mut participants = HashMap::default();
2057        let mut pending_participants = Vec::new();
2058        while let Some(db_participant) = db_participants.next().await {
2059            let db_participant = db_participant?;
2060            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2061                .answering_connection_id
2062                .zip(db_participant.answering_connection_server_id)
2063            {
2064                let location = match (
2065                    db_participant.location_kind,
2066                    db_participant.location_project_id,
2067                ) {
2068                    (Some(0), Some(project_id)) => {
2069                        Some(proto::participant_location::Variant::SharedProject(
2070                            proto::participant_location::SharedProject {
2071                                id: project_id.to_proto(),
2072                            },
2073                        ))
2074                    }
2075                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2076                        Default::default(),
2077                    )),
2078                    _ => Some(proto::participant_location::Variant::External(
2079                        Default::default(),
2080                    )),
2081                };
2082
2083                let answering_connection = ConnectionId {
2084                    owner_id: answering_connection_server_id.0 as u32,
2085                    id: answering_connection_id as u32,
2086                };
2087                participants.insert(
2088                    answering_connection,
2089                    proto::Participant {
2090                        user_id: db_participant.user_id.to_proto(),
2091                        peer_id: Some(answering_connection.into()),
2092                        projects: Default::default(),
2093                        location: Some(proto::ParticipantLocation { variant: location }),
2094                    },
2095                );
2096            } else {
2097                pending_participants.push(proto::PendingParticipant {
2098                    user_id: db_participant.user_id.to_proto(),
2099                    calling_user_id: db_participant.calling_user_id.to_proto(),
2100                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2101                });
2102            }
2103        }
2104        drop(db_participants);
2105
2106        let mut db_projects = db_room
2107            .find_related(project::Entity)
2108            .find_with_related(worktree::Entity)
2109            .stream(tx)
2110            .await?;
2111
2112        while let Some(row) = db_projects.next().await {
2113            let (db_project, db_worktree) = row?;
2114            let host_connection = db_project.host_connection()?;
2115            if let Some(participant) = participants.get_mut(&host_connection) {
2116                let project = if let Some(project) = participant
2117                    .projects
2118                    .iter_mut()
2119                    .find(|project| project.id == db_project.id.to_proto())
2120                {
2121                    project
2122                } else {
2123                    participant.projects.push(proto::ParticipantProject {
2124                        id: db_project.id.to_proto(),
2125                        worktree_root_names: Default::default(),
2126                    });
2127                    participant.projects.last_mut().unwrap()
2128                };
2129
2130                if let Some(db_worktree) = db_worktree {
2131                    if db_worktree.visible {
2132                        project.worktree_root_names.push(db_worktree.root_name);
2133                    }
2134                }
2135            }
2136        }
2137        drop(db_projects);
2138
2139        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2140        let mut followers = Vec::new();
2141        while let Some(db_follower) = db_followers.next().await {
2142            let db_follower = db_follower?;
2143            followers.push(proto::Follower {
2144                leader_id: Some(db_follower.leader_connection().into()),
2145                follower_id: Some(db_follower.follower_connection().into()),
2146                project_id: db_follower.project_id.to_proto(),
2147            });
2148        }
2149
2150        Ok(proto::Room {
2151            id: db_room.id.to_proto(),
2152            live_kit_room: db_room.live_kit_room,
2153            participants: participants.into_values().collect(),
2154            pending_participants,
2155            followers,
2156        })
2157    }
2158
2159    // projects
2160
2161    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2162        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2163        enum QueryAs {
2164            Count,
2165        }
2166
2167        self.transaction(|tx| async move {
2168            Ok(project::Entity::find()
2169                .select_only()
2170                .column_as(project::Column::Id.count(), QueryAs::Count)
2171                .inner_join(user::Entity)
2172                .filter(user::Column::Admin.eq(false))
2173                .into_values::<_, QueryAs>()
2174                .one(&*tx)
2175                .await?
2176                .unwrap_or(0i64) as usize)
2177        })
2178        .await
2179    }
2180
2181    pub async fn share_project(
2182        &self,
2183        room_id: RoomId,
2184        connection: ConnectionId,
2185        worktrees: &[proto::WorktreeMetadata],
2186    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2187        self.room_transaction(room_id, |tx| async move {
2188            let participant = room_participant::Entity::find()
2189                .filter(
2190                    Condition::all()
2191                        .add(
2192                            room_participant::Column::AnsweringConnectionId
2193                                .eq(connection.id as i32),
2194                        )
2195                        .add(
2196                            room_participant::Column::AnsweringConnectionServerId
2197                                .eq(connection.owner_id as i32),
2198                        ),
2199                )
2200                .one(&*tx)
2201                .await?
2202                .ok_or_else(|| anyhow!("could not find participant"))?;
2203            if participant.room_id != room_id {
2204                return Err(anyhow!("shared project on unexpected room"))?;
2205            }
2206
2207            let project = project::ActiveModel {
2208                room_id: ActiveValue::set(participant.room_id),
2209                host_user_id: ActiveValue::set(participant.user_id),
2210                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2211                host_connection_server_id: ActiveValue::set(Some(ServerId(
2212                    connection.owner_id as i32,
2213                ))),
2214                ..Default::default()
2215            }
2216            .insert(&*tx)
2217            .await?;
2218
2219            if !worktrees.is_empty() {
2220                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2221                    worktree::ActiveModel {
2222                        id: ActiveValue::set(worktree.id as i64),
2223                        project_id: ActiveValue::set(project.id),
2224                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2225                        root_name: ActiveValue::set(worktree.root_name.clone()),
2226                        visible: ActiveValue::set(worktree.visible),
2227                        scan_id: ActiveValue::set(0),
2228                        completed_scan_id: ActiveValue::set(0),
2229                    }
2230                }))
2231                .exec(&*tx)
2232                .await?;
2233            }
2234
2235            project_collaborator::ActiveModel {
2236                project_id: ActiveValue::set(project.id),
2237                connection_id: ActiveValue::set(connection.id as i32),
2238                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2239                user_id: ActiveValue::set(participant.user_id),
2240                replica_id: ActiveValue::set(ReplicaId(0)),
2241                is_host: ActiveValue::set(true),
2242                ..Default::default()
2243            }
2244            .insert(&*tx)
2245            .await?;
2246
2247            let room = self.get_room(room_id, &tx).await?;
2248            Ok((project.id, room))
2249        })
2250        .await
2251    }
2252
2253    pub async fn unshare_project(
2254        &self,
2255        project_id: ProjectId,
2256        connection: ConnectionId,
2257    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2258        let room_id = self.room_id_for_project(project_id).await?;
2259        self.room_transaction(room_id, |tx| async move {
2260            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2261
2262            let project = project::Entity::find_by_id(project_id)
2263                .one(&*tx)
2264                .await?
2265                .ok_or_else(|| anyhow!("project not found"))?;
2266            if project.host_connection()? == connection {
2267                project::Entity::delete(project.into_active_model())
2268                    .exec(&*tx)
2269                    .await?;
2270                let room = self.get_room(room_id, &tx).await?;
2271                Ok((room, guest_connection_ids))
2272            } else {
2273                Err(anyhow!("cannot unshare a project hosted by another user"))?
2274            }
2275        })
2276        .await
2277    }
2278
2279    pub async fn update_project(
2280        &self,
2281        project_id: ProjectId,
2282        connection: ConnectionId,
2283        worktrees: &[proto::WorktreeMetadata],
2284    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2285        let room_id = self.room_id_for_project(project_id).await?;
2286        self.room_transaction(room_id, |tx| async move {
2287            let project = project::Entity::find_by_id(project_id)
2288                .filter(
2289                    Condition::all()
2290                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2291                        .add(
2292                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2293                        ),
2294                )
2295                .one(&*tx)
2296                .await?
2297                .ok_or_else(|| anyhow!("no such project"))?;
2298
2299            self.update_project_worktrees(project.id, worktrees, &tx)
2300                .await?;
2301
2302            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2303            let room = self.get_room(project.room_id, &tx).await?;
2304            Ok((room, guest_connection_ids))
2305        })
2306        .await
2307    }
2308
2309    async fn update_project_worktrees(
2310        &self,
2311        project_id: ProjectId,
2312        worktrees: &[proto::WorktreeMetadata],
2313        tx: &DatabaseTransaction,
2314    ) -> Result<()> {
2315        if !worktrees.is_empty() {
2316            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2317                id: ActiveValue::set(worktree.id as i64),
2318                project_id: ActiveValue::set(project_id),
2319                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2320                root_name: ActiveValue::set(worktree.root_name.clone()),
2321                visible: ActiveValue::set(worktree.visible),
2322                scan_id: ActiveValue::set(0),
2323                completed_scan_id: ActiveValue::set(0),
2324            }))
2325            .on_conflict(
2326                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2327                    .update_column(worktree::Column::RootName)
2328                    .to_owned(),
2329            )
2330            .exec(&*tx)
2331            .await?;
2332        }
2333
2334        worktree::Entity::delete_many()
2335            .filter(worktree::Column::ProjectId.eq(project_id).and(
2336                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2337            ))
2338            .exec(&*tx)
2339            .await?;
2340
2341        Ok(())
2342    }
2343
2344    pub async fn update_worktree(
2345        &self,
2346        update: &proto::UpdateWorktree,
2347        connection: ConnectionId,
2348    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2349        let project_id = ProjectId::from_proto(update.project_id);
2350        let worktree_id = update.worktree_id as i64;
2351        let room_id = self.room_id_for_project(project_id).await?;
2352        self.room_transaction(room_id, |tx| async move {
2353            // Ensure the update comes from the host.
2354            let _project = project::Entity::find_by_id(project_id)
2355                .filter(
2356                    Condition::all()
2357                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2358                        .add(
2359                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2360                        ),
2361                )
2362                .one(&*tx)
2363                .await?
2364                .ok_or_else(|| anyhow!("no such project"))?;
2365
2366            // Update metadata.
2367            worktree::Entity::update(worktree::ActiveModel {
2368                id: ActiveValue::set(worktree_id),
2369                project_id: ActiveValue::set(project_id),
2370                root_name: ActiveValue::set(update.root_name.clone()),
2371                scan_id: ActiveValue::set(update.scan_id as i64),
2372                completed_scan_id: if update.is_last_update {
2373                    ActiveValue::set(update.scan_id as i64)
2374                } else {
2375                    ActiveValue::default()
2376                },
2377                abs_path: ActiveValue::set(update.abs_path.clone()),
2378                ..Default::default()
2379            })
2380            .exec(&*tx)
2381            .await?;
2382
2383            if !update.updated_entries.is_empty() {
2384                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2385                    let mtime = entry.mtime.clone().unwrap_or_default();
2386                    worktree_entry::ActiveModel {
2387                        project_id: ActiveValue::set(project_id),
2388                        worktree_id: ActiveValue::set(worktree_id),
2389                        id: ActiveValue::set(entry.id as i64),
2390                        is_dir: ActiveValue::set(entry.is_dir),
2391                        path: ActiveValue::set(entry.path.clone()),
2392                        inode: ActiveValue::set(entry.inode as i64),
2393                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2394                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2395                        is_symlink: ActiveValue::set(entry.is_symlink),
2396                        is_ignored: ActiveValue::set(entry.is_ignored),
2397                        is_external: ActiveValue::set(entry.is_external),
2398                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2399                        is_deleted: ActiveValue::set(false),
2400                        scan_id: ActiveValue::set(update.scan_id as i64),
2401                    }
2402                }))
2403                .on_conflict(
2404                    OnConflict::columns([
2405                        worktree_entry::Column::ProjectId,
2406                        worktree_entry::Column::WorktreeId,
2407                        worktree_entry::Column::Id,
2408                    ])
2409                    .update_columns([
2410                        worktree_entry::Column::IsDir,
2411                        worktree_entry::Column::Path,
2412                        worktree_entry::Column::Inode,
2413                        worktree_entry::Column::MtimeSeconds,
2414                        worktree_entry::Column::MtimeNanos,
2415                        worktree_entry::Column::IsSymlink,
2416                        worktree_entry::Column::IsIgnored,
2417                        worktree_entry::Column::GitStatus,
2418                        worktree_entry::Column::ScanId,
2419                    ])
2420                    .to_owned(),
2421                )
2422                .exec(&*tx)
2423                .await?;
2424            }
2425
2426            if !update.removed_entries.is_empty() {
2427                worktree_entry::Entity::update_many()
2428                    .filter(
2429                        worktree_entry::Column::ProjectId
2430                            .eq(project_id)
2431                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2432                            .and(
2433                                worktree_entry::Column::Id
2434                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2435                            ),
2436                    )
2437                    .set(worktree_entry::ActiveModel {
2438                        is_deleted: ActiveValue::Set(true),
2439                        scan_id: ActiveValue::Set(update.scan_id as i64),
2440                        ..Default::default()
2441                    })
2442                    .exec(&*tx)
2443                    .await?;
2444            }
2445
2446            if !update.updated_repositories.is_empty() {
2447                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2448                    |repository| worktree_repository::ActiveModel {
2449                        project_id: ActiveValue::set(project_id),
2450                        worktree_id: ActiveValue::set(worktree_id),
2451                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2452                        scan_id: ActiveValue::set(update.scan_id as i64),
2453                        branch: ActiveValue::set(repository.branch.clone()),
2454                        is_deleted: ActiveValue::set(false),
2455                    },
2456                ))
2457                .on_conflict(
2458                    OnConflict::columns([
2459                        worktree_repository::Column::ProjectId,
2460                        worktree_repository::Column::WorktreeId,
2461                        worktree_repository::Column::WorkDirectoryId,
2462                    ])
2463                    .update_columns([
2464                        worktree_repository::Column::ScanId,
2465                        worktree_repository::Column::Branch,
2466                    ])
2467                    .to_owned(),
2468                )
2469                .exec(&*tx)
2470                .await?;
2471            }
2472
2473            if !update.removed_repositories.is_empty() {
2474                worktree_repository::Entity::update_many()
2475                    .filter(
2476                        worktree_repository::Column::ProjectId
2477                            .eq(project_id)
2478                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2479                            .and(
2480                                worktree_repository::Column::WorkDirectoryId
2481                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2482                            ),
2483                    )
2484                    .set(worktree_repository::ActiveModel {
2485                        is_deleted: ActiveValue::Set(true),
2486                        scan_id: ActiveValue::Set(update.scan_id as i64),
2487                        ..Default::default()
2488                    })
2489                    .exec(&*tx)
2490                    .await?;
2491            }
2492
2493            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2494            Ok(connection_ids)
2495        })
2496        .await
2497    }
2498
2499    pub async fn update_diagnostic_summary(
2500        &self,
2501        update: &proto::UpdateDiagnosticSummary,
2502        connection: ConnectionId,
2503    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2504        let project_id = ProjectId::from_proto(update.project_id);
2505        let worktree_id = update.worktree_id as i64;
2506        let room_id = self.room_id_for_project(project_id).await?;
2507        self.room_transaction(room_id, |tx| async move {
2508            let summary = update
2509                .summary
2510                .as_ref()
2511                .ok_or_else(|| anyhow!("invalid summary"))?;
2512
2513            // Ensure the update comes from the host.
2514            let project = project::Entity::find_by_id(project_id)
2515                .one(&*tx)
2516                .await?
2517                .ok_or_else(|| anyhow!("no such project"))?;
2518            if project.host_connection()? != connection {
2519                return Err(anyhow!("can't update a project hosted by someone else"))?;
2520            }
2521
2522            // Update summary.
2523            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2524                project_id: ActiveValue::set(project_id),
2525                worktree_id: ActiveValue::set(worktree_id),
2526                path: ActiveValue::set(summary.path.clone()),
2527                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2528                error_count: ActiveValue::set(summary.error_count as i32),
2529                warning_count: ActiveValue::set(summary.warning_count as i32),
2530                ..Default::default()
2531            })
2532            .on_conflict(
2533                OnConflict::columns([
2534                    worktree_diagnostic_summary::Column::ProjectId,
2535                    worktree_diagnostic_summary::Column::WorktreeId,
2536                    worktree_diagnostic_summary::Column::Path,
2537                ])
2538                .update_columns([
2539                    worktree_diagnostic_summary::Column::LanguageServerId,
2540                    worktree_diagnostic_summary::Column::ErrorCount,
2541                    worktree_diagnostic_summary::Column::WarningCount,
2542                ])
2543                .to_owned(),
2544            )
2545            .exec(&*tx)
2546            .await?;
2547
2548            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2549            Ok(connection_ids)
2550        })
2551        .await
2552    }
2553
2554    pub async fn start_language_server(
2555        &self,
2556        update: &proto::StartLanguageServer,
2557        connection: ConnectionId,
2558    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2559        let project_id = ProjectId::from_proto(update.project_id);
2560        let room_id = self.room_id_for_project(project_id).await?;
2561        self.room_transaction(room_id, |tx| async move {
2562            let server = update
2563                .server
2564                .as_ref()
2565                .ok_or_else(|| anyhow!("invalid language server"))?;
2566
2567            // Ensure the update comes from the host.
2568            let project = project::Entity::find_by_id(project_id)
2569                .one(&*tx)
2570                .await?
2571                .ok_or_else(|| anyhow!("no such project"))?;
2572            if project.host_connection()? != connection {
2573                return Err(anyhow!("can't update a project hosted by someone else"))?;
2574            }
2575
2576            // Add the newly-started language server.
2577            language_server::Entity::insert(language_server::ActiveModel {
2578                project_id: ActiveValue::set(project_id),
2579                id: ActiveValue::set(server.id as i64),
2580                name: ActiveValue::set(server.name.clone()),
2581                ..Default::default()
2582            })
2583            .on_conflict(
2584                OnConflict::columns([
2585                    language_server::Column::ProjectId,
2586                    language_server::Column::Id,
2587                ])
2588                .update_column(language_server::Column::Name)
2589                .to_owned(),
2590            )
2591            .exec(&*tx)
2592            .await?;
2593
2594            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2595            Ok(connection_ids)
2596        })
2597        .await
2598    }
2599
2600    pub async fn update_worktree_settings(
2601        &self,
2602        update: &proto::UpdateWorktreeSettings,
2603        connection: ConnectionId,
2604    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2605        let project_id = ProjectId::from_proto(update.project_id);
2606        let room_id = self.room_id_for_project(project_id).await?;
2607        self.room_transaction(room_id, |tx| async move {
2608            // Ensure the update comes from the host.
2609            let project = project::Entity::find_by_id(project_id)
2610                .one(&*tx)
2611                .await?
2612                .ok_or_else(|| anyhow!("no such project"))?;
2613            if project.host_connection()? != connection {
2614                return Err(anyhow!("can't update a project hosted by someone else"))?;
2615            }
2616
2617            if let Some(content) = &update.content {
2618                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2619                    project_id: ActiveValue::Set(project_id),
2620                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2621                    path: ActiveValue::Set(update.path.clone()),
2622                    content: ActiveValue::Set(content.clone()),
2623                })
2624                .on_conflict(
2625                    OnConflict::columns([
2626                        worktree_settings_file::Column::ProjectId,
2627                        worktree_settings_file::Column::WorktreeId,
2628                        worktree_settings_file::Column::Path,
2629                    ])
2630                    .update_column(worktree_settings_file::Column::Content)
2631                    .to_owned(),
2632                )
2633                .exec(&*tx)
2634                .await?;
2635            } else {
2636                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2637                    project_id: ActiveValue::Set(project_id),
2638                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2639                    path: ActiveValue::Set(update.path.clone()),
2640                    ..Default::default()
2641                })
2642                .exec(&*tx)
2643                .await?;
2644            }
2645
2646            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2647            Ok(connection_ids)
2648        })
2649        .await
2650    }
2651
2652    pub async fn join_project(
2653        &self,
2654        project_id: ProjectId,
2655        connection: ConnectionId,
2656    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2657        let room_id = self.room_id_for_project(project_id).await?;
2658        self.room_transaction(room_id, |tx| async move {
2659            let participant = room_participant::Entity::find()
2660                .filter(
2661                    Condition::all()
2662                        .add(
2663                            room_participant::Column::AnsweringConnectionId
2664                                .eq(connection.id as i32),
2665                        )
2666                        .add(
2667                            room_participant::Column::AnsweringConnectionServerId
2668                                .eq(connection.owner_id as i32),
2669                        ),
2670                )
2671                .one(&*tx)
2672                .await?
2673                .ok_or_else(|| anyhow!("must join a room first"))?;
2674
2675            let project = project::Entity::find_by_id(project_id)
2676                .one(&*tx)
2677                .await?
2678                .ok_or_else(|| anyhow!("no such project"))?;
2679            if project.room_id != participant.room_id {
2680                return Err(anyhow!("no such project"))?;
2681            }
2682
2683            let mut collaborators = project
2684                .find_related(project_collaborator::Entity)
2685                .all(&*tx)
2686                .await?;
2687            let replica_ids = collaborators
2688                .iter()
2689                .map(|c| c.replica_id)
2690                .collect::<HashSet<_>>();
2691            let mut replica_id = ReplicaId(1);
2692            while replica_ids.contains(&replica_id) {
2693                replica_id.0 += 1;
2694            }
2695            let new_collaborator = project_collaborator::ActiveModel {
2696                project_id: ActiveValue::set(project_id),
2697                connection_id: ActiveValue::set(connection.id as i32),
2698                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2699                user_id: ActiveValue::set(participant.user_id),
2700                replica_id: ActiveValue::set(replica_id),
2701                is_host: ActiveValue::set(false),
2702                ..Default::default()
2703            }
2704            .insert(&*tx)
2705            .await?;
2706            collaborators.push(new_collaborator);
2707
2708            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2709            let mut worktrees = db_worktrees
2710                .into_iter()
2711                .map(|db_worktree| {
2712                    (
2713                        db_worktree.id as u64,
2714                        Worktree {
2715                            id: db_worktree.id as u64,
2716                            abs_path: db_worktree.abs_path,
2717                            root_name: db_worktree.root_name,
2718                            visible: db_worktree.visible,
2719                            entries: Default::default(),
2720                            repository_entries: Default::default(),
2721                            diagnostic_summaries: Default::default(),
2722                            settings_files: Default::default(),
2723                            scan_id: db_worktree.scan_id as u64,
2724                            completed_scan_id: db_worktree.completed_scan_id as u64,
2725                        },
2726                    )
2727                })
2728                .collect::<BTreeMap<_, _>>();
2729
2730            // Populate worktree entries.
2731            {
2732                let mut db_entries = worktree_entry::Entity::find()
2733                    .filter(
2734                        Condition::all()
2735                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2736                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2737                    )
2738                    .stream(&*tx)
2739                    .await?;
2740                while let Some(db_entry) = db_entries.next().await {
2741                    let db_entry = db_entry?;
2742                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2743                        worktree.entries.push(proto::Entry {
2744                            id: db_entry.id as u64,
2745                            is_dir: db_entry.is_dir,
2746                            path: db_entry.path,
2747                            inode: db_entry.inode as u64,
2748                            mtime: Some(proto::Timestamp {
2749                                seconds: db_entry.mtime_seconds as u64,
2750                                nanos: db_entry.mtime_nanos as u32,
2751                            }),
2752                            is_symlink: db_entry.is_symlink,
2753                            is_ignored: db_entry.is_ignored,
2754                            is_external: db_entry.is_external,
2755                            git_status: db_entry.git_status.map(|status| status as i32),
2756                        });
2757                    }
2758                }
2759            }
2760
2761            // Populate repository entries.
2762            {
2763                let mut db_repository_entries = worktree_repository::Entity::find()
2764                    .filter(
2765                        Condition::all()
2766                            .add(worktree_repository::Column::ProjectId.eq(project_id))
2767                            .add(worktree_repository::Column::IsDeleted.eq(false)),
2768                    )
2769                    .stream(&*tx)
2770                    .await?;
2771                while let Some(db_repository_entry) = db_repository_entries.next().await {
2772                    let db_repository_entry = db_repository_entry?;
2773                    if let Some(worktree) =
2774                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2775                    {
2776                        worktree.repository_entries.insert(
2777                            db_repository_entry.work_directory_id as u64,
2778                            proto::RepositoryEntry {
2779                                work_directory_id: db_repository_entry.work_directory_id as u64,
2780                                branch: db_repository_entry.branch,
2781                            },
2782                        );
2783                    }
2784                }
2785            }
2786
2787            // Populate worktree diagnostic summaries.
2788            {
2789                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2790                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2791                    .stream(&*tx)
2792                    .await?;
2793                while let Some(db_summary) = db_summaries.next().await {
2794                    let db_summary = db_summary?;
2795                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2796                        worktree
2797                            .diagnostic_summaries
2798                            .push(proto::DiagnosticSummary {
2799                                path: db_summary.path,
2800                                language_server_id: db_summary.language_server_id as u64,
2801                                error_count: db_summary.error_count as u32,
2802                                warning_count: db_summary.warning_count as u32,
2803                            });
2804                    }
2805                }
2806            }
2807
2808            // Populate worktree settings files
2809            {
2810                let mut db_settings_files = worktree_settings_file::Entity::find()
2811                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2812                    .stream(&*tx)
2813                    .await?;
2814                while let Some(db_settings_file) = db_settings_files.next().await {
2815                    let db_settings_file = db_settings_file?;
2816                    if let Some(worktree) =
2817                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2818                    {
2819                        worktree.settings_files.push(WorktreeSettingsFile {
2820                            path: db_settings_file.path,
2821                            content: db_settings_file.content,
2822                        });
2823                    }
2824                }
2825            }
2826
2827            // Populate language servers.
2828            let language_servers = project
2829                .find_related(language_server::Entity)
2830                .all(&*tx)
2831                .await?;
2832
2833            let project = Project {
2834                collaborators: collaborators
2835                    .into_iter()
2836                    .map(|collaborator| ProjectCollaborator {
2837                        connection_id: collaborator.connection(),
2838                        user_id: collaborator.user_id,
2839                        replica_id: collaborator.replica_id,
2840                        is_host: collaborator.is_host,
2841                    })
2842                    .collect(),
2843                worktrees,
2844                language_servers: language_servers
2845                    .into_iter()
2846                    .map(|language_server| proto::LanguageServer {
2847                        id: language_server.id as u64,
2848                        name: language_server.name,
2849                    })
2850                    .collect(),
2851            };
2852            Ok((project, replica_id as ReplicaId))
2853        })
2854        .await
2855    }
2856
2857    pub async fn leave_project(
2858        &self,
2859        project_id: ProjectId,
2860        connection: ConnectionId,
2861    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2862        let room_id = self.room_id_for_project(project_id).await?;
2863        self.room_transaction(room_id, |tx| async move {
2864            let result = project_collaborator::Entity::delete_many()
2865                .filter(
2866                    Condition::all()
2867                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2868                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2869                        .add(
2870                            project_collaborator::Column::ConnectionServerId
2871                                .eq(connection.owner_id as i32),
2872                        ),
2873                )
2874                .exec(&*tx)
2875                .await?;
2876            if result.rows_affected == 0 {
2877                Err(anyhow!("not a collaborator on this project"))?;
2878            }
2879
2880            let project = project::Entity::find_by_id(project_id)
2881                .one(&*tx)
2882                .await?
2883                .ok_or_else(|| anyhow!("no such project"))?;
2884            let collaborators = project
2885                .find_related(project_collaborator::Entity)
2886                .all(&*tx)
2887                .await?;
2888            let connection_ids = collaborators
2889                .into_iter()
2890                .map(|collaborator| collaborator.connection())
2891                .collect();
2892
2893            follower::Entity::delete_many()
2894                .filter(
2895                    Condition::any()
2896                        .add(
2897                            Condition::all()
2898                                .add(follower::Column::ProjectId.eq(project_id))
2899                                .add(
2900                                    follower::Column::LeaderConnectionServerId
2901                                        .eq(connection.owner_id),
2902                                )
2903                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2904                        )
2905                        .add(
2906                            Condition::all()
2907                                .add(follower::Column::ProjectId.eq(project_id))
2908                                .add(
2909                                    follower::Column::FollowerConnectionServerId
2910                                        .eq(connection.owner_id),
2911                                )
2912                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2913                        ),
2914                )
2915                .exec(&*tx)
2916                .await?;
2917
2918            let room = self.get_room(project.room_id, &tx).await?;
2919            let left_project = LeftProject {
2920                id: project_id,
2921                host_user_id: project.host_user_id,
2922                host_connection_id: project.host_connection()?,
2923                connection_ids,
2924            };
2925            Ok((room, left_project))
2926        })
2927        .await
2928    }
2929
2930    pub async fn project_collaborators(
2931        &self,
2932        project_id: ProjectId,
2933        connection_id: ConnectionId,
2934    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2935        let room_id = self.room_id_for_project(project_id).await?;
2936        self.room_transaction(room_id, |tx| async move {
2937            let collaborators = project_collaborator::Entity::find()
2938                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2939                .all(&*tx)
2940                .await?
2941                .into_iter()
2942                .map(|collaborator| ProjectCollaborator {
2943                    connection_id: collaborator.connection(),
2944                    user_id: collaborator.user_id,
2945                    replica_id: collaborator.replica_id,
2946                    is_host: collaborator.is_host,
2947                })
2948                .collect::<Vec<_>>();
2949
2950            if collaborators
2951                .iter()
2952                .any(|collaborator| collaborator.connection_id == connection_id)
2953            {
2954                Ok(collaborators)
2955            } else {
2956                Err(anyhow!("no such project"))?
2957            }
2958        })
2959        .await
2960    }
2961
2962    pub async fn project_connection_ids(
2963        &self,
2964        project_id: ProjectId,
2965        connection_id: ConnectionId,
2966    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2967        let room_id = self.room_id_for_project(project_id).await?;
2968        self.room_transaction(room_id, |tx| async move {
2969            let mut collaborators = project_collaborator::Entity::find()
2970                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2971                .stream(&*tx)
2972                .await?;
2973
2974            let mut connection_ids = HashSet::default();
2975            while let Some(collaborator) = collaborators.next().await {
2976                let collaborator = collaborator?;
2977                connection_ids.insert(collaborator.connection());
2978            }
2979
2980            if connection_ids.contains(&connection_id) {
2981                Ok(connection_ids)
2982            } else {
2983                Err(anyhow!("no such project"))?
2984            }
2985        })
2986        .await
2987    }
2988
2989    async fn project_guest_connection_ids(
2990        &self,
2991        project_id: ProjectId,
2992        tx: &DatabaseTransaction,
2993    ) -> Result<Vec<ConnectionId>> {
2994        let mut collaborators = project_collaborator::Entity::find()
2995            .filter(
2996                project_collaborator::Column::ProjectId
2997                    .eq(project_id)
2998                    .and(project_collaborator::Column::IsHost.eq(false)),
2999            )
3000            .stream(tx)
3001            .await?;
3002
3003        let mut guest_connection_ids = Vec::new();
3004        while let Some(collaborator) = collaborators.next().await {
3005            let collaborator = collaborator?;
3006            guest_connection_ids.push(collaborator.connection());
3007        }
3008        Ok(guest_connection_ids)
3009    }
3010
3011    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3012        self.transaction(|tx| async move {
3013            let project = project::Entity::find_by_id(project_id)
3014                .one(&*tx)
3015                .await?
3016                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3017            Ok(project.room_id)
3018        })
3019        .await
3020    }
3021
3022    // access tokens
3023
3024    pub async fn create_access_token(
3025        &self,
3026        user_id: UserId,
3027        access_token_hash: &str,
3028        max_access_token_count: usize,
3029    ) -> Result<AccessTokenId> {
3030        self.transaction(|tx| async {
3031            let tx = tx;
3032
3033            let token = access_token::ActiveModel {
3034                user_id: ActiveValue::set(user_id),
3035                hash: ActiveValue::set(access_token_hash.into()),
3036                ..Default::default()
3037            }
3038            .insert(&*tx)
3039            .await?;
3040
3041            access_token::Entity::delete_many()
3042                .filter(
3043                    access_token::Column::Id.in_subquery(
3044                        Query::select()
3045                            .column(access_token::Column::Id)
3046                            .from(access_token::Entity)
3047                            .and_where(access_token::Column::UserId.eq(user_id))
3048                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3049                            .limit(10000)
3050                            .offset(max_access_token_count as u64)
3051                            .to_owned(),
3052                    ),
3053                )
3054                .exec(&*tx)
3055                .await?;
3056            Ok(token.id)
3057        })
3058        .await
3059    }
3060
3061    pub async fn get_access_token(
3062        &self,
3063        access_token_id: AccessTokenId,
3064    ) -> Result<access_token::Model> {
3065        self.transaction(|tx| async move {
3066            Ok(access_token::Entity::find_by_id(access_token_id)
3067                .one(&*tx)
3068                .await?
3069                .ok_or_else(|| anyhow!("no such access token"))?)
3070        })
3071        .await
3072    }
3073
3074    // channels
3075
3076    pub async fn create_root_channel(
3077        &self,
3078        name: &str,
3079        live_kit_room: &str,
3080        creator_id: UserId,
3081    ) -> Result<ChannelId> {
3082        self.create_channel(name, None, live_kit_room, creator_id)
3083            .await
3084    }
3085
3086    pub async fn create_channel(
3087        &self,
3088        name: &str,
3089        parent: Option<ChannelId>,
3090        live_kit_room: &str,
3091        creator_id: UserId,
3092    ) -> Result<ChannelId> {
3093        self.transaction(move |tx| async move {
3094            let tx = tx;
3095
3096            if let Some(parent) = parent {
3097                let channels = self.get_channel_ancestors(parent, &*tx).await?;
3098                channel_member::Entity::find()
3099                    .filter(channel_member::Column::ChannelId.is_in(channels.iter().copied()))
3100                    .filter(
3101                        channel_member::Column::UserId
3102                            .eq(creator_id)
3103                            .and(channel_member::Column::Accepted.eq(true)),
3104                    )
3105                    .one(&*tx)
3106                    .await?
3107                    .ok_or_else(|| {
3108                        anyhow!("User does not have the permissions to create this channel")
3109                    })?;
3110            }
3111
3112            let channel = channel::ActiveModel {
3113                name: ActiveValue::Set(name.to_string()),
3114                ..Default::default()
3115            };
3116
3117            let channel = channel.insert(&*tx).await?;
3118
3119            if let Some(parent) = parent {
3120                channel_parent::ActiveModel {
3121                    child_id: ActiveValue::Set(channel.id),
3122                    parent_id: ActiveValue::Set(parent),
3123                }
3124                .insert(&*tx)
3125                .await?;
3126            }
3127
3128            channel_member::ActiveModel {
3129                channel_id: ActiveValue::Set(channel.id),
3130                user_id: ActiveValue::Set(creator_id),
3131                accepted: ActiveValue::Set(true),
3132                admin: ActiveValue::Set(true),
3133                ..Default::default()
3134            }
3135            .insert(&*tx)
3136            .await?;
3137
3138            room::ActiveModel {
3139                channel_id: ActiveValue::Set(Some(channel.id)),
3140                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3141                ..Default::default()
3142            }
3143            .insert(&*tx)
3144            .await?;
3145
3146            Ok(channel.id)
3147        })
3148        .await
3149    }
3150
3151    pub async fn remove_channel(
3152        &self,
3153        channel_id: ChannelId,
3154        user_id: UserId,
3155    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3156        self.transaction(move |tx| async move {
3157            let tx = tx;
3158
3159            // Check if user is an admin
3160            channel_member::Entity::find()
3161                .filter(
3162                    channel_member::Column::ChannelId
3163                        .eq(channel_id)
3164                        .and(channel_member::Column::UserId.eq(user_id))
3165                        .and(channel_member::Column::Admin.eq(true)),
3166                )
3167                .one(&*tx)
3168                .await?
3169                .ok_or_else(|| anyhow!("user is not allowed to remove this channel"))?;
3170
3171            let mut descendants = self.get_channel_descendants([channel_id], &*tx).await?;
3172
3173            // Keep channels which have another active
3174            let mut channels_to_keep = channel_parent::Entity::find()
3175                .filter(
3176                    channel_parent::Column::ChildId
3177                        .is_in(descendants.keys().copied().filter(|&id| id != channel_id))
3178                        .and(
3179                            channel_parent::Column::ParentId.is_not_in(descendants.keys().copied()),
3180                        ),
3181                )
3182                .stream(&*tx)
3183                .await?;
3184
3185            while let Some(row) = channels_to_keep.next().await {
3186                let row = row?;
3187                descendants.remove(&row.child_id);
3188            }
3189
3190            drop(channels_to_keep);
3191
3192            let channels_to_remove = descendants.keys().copied().collect::<Vec<_>>();
3193
3194            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3195                .filter(channel_member::Column::ChannelId.is_in(channels_to_remove.iter().copied()))
3196                .select_only()
3197                .column(channel_member::Column::UserId)
3198                .distinct()
3199                .into_values::<_, QueryUserIds>()
3200                .all(&*tx)
3201                .await?;
3202
3203            // Channel members and parents should delete via cascade
3204            channel::Entity::delete_many()
3205                .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
3206                .exec(&*tx)
3207                .await?;
3208
3209            Ok((channels_to_remove, members_to_notify))
3210        })
3211        .await
3212    }
3213
3214    pub async fn invite_channel_member(
3215        &self,
3216        channel_id: ChannelId,
3217        invitee_id: UserId,
3218        inviter_id: UserId,
3219        is_admin: bool,
3220    ) -> Result<()> {
3221        self.transaction(move |tx| async move {
3222            let tx = tx;
3223
3224            // Check if inviter is a member
3225            channel_member::Entity::find()
3226                .filter(
3227                    channel_member::Column::ChannelId
3228                        .eq(channel_id)
3229                        .and(channel_member::Column::UserId.eq(inviter_id))
3230                        .and(channel_member::Column::Admin.eq(true)),
3231                )
3232                .one(&*tx)
3233                .await?
3234                .ok_or_else(|| {
3235                    anyhow!("Inviter does not have permissions to invite the invitee")
3236                })?;
3237
3238            let channel_membership = channel_member::ActiveModel {
3239                channel_id: ActiveValue::Set(channel_id),
3240                user_id: ActiveValue::Set(invitee_id),
3241                accepted: ActiveValue::Set(false),
3242                admin: ActiveValue::Set(is_admin),
3243                ..Default::default()
3244            };
3245
3246            channel_membership.insert(&*tx).await?;
3247
3248            Ok(())
3249        })
3250        .await
3251    }
3252
3253    pub async fn respond_to_channel_invite(
3254        &self,
3255        channel_id: ChannelId,
3256        user_id: UserId,
3257        accept: bool,
3258    ) -> Result<()> {
3259        self.transaction(move |tx| async move {
3260            let tx = tx;
3261
3262            let rows_affected = if accept {
3263                channel_member::Entity::update_many()
3264                    .set(channel_member::ActiveModel {
3265                        accepted: ActiveValue::Set(accept),
3266                        ..Default::default()
3267                    })
3268                    .filter(
3269                        channel_member::Column::ChannelId
3270                            .eq(channel_id)
3271                            .and(channel_member::Column::UserId.eq(user_id))
3272                            .and(channel_member::Column::Accepted.eq(false)),
3273                    )
3274                    .exec(&*tx)
3275                    .await?
3276                    .rows_affected
3277            } else {
3278                channel_member::ActiveModel {
3279                    channel_id: ActiveValue::Unchanged(channel_id),
3280                    user_id: ActiveValue::Unchanged(user_id),
3281                    ..Default::default()
3282                }
3283                .delete(&*tx)
3284                .await?
3285                .rows_affected
3286            };
3287
3288            if rows_affected == 0 {
3289                Err(anyhow!("no such invitation"))?;
3290            }
3291
3292            Ok(())
3293        })
3294        .await
3295    }
3296
3297    pub async fn get_channel_invites(&self, user_id: UserId) -> Result<Vec<Channel>> {
3298        self.transaction(|tx| async move {
3299            let tx = tx;
3300
3301            let channel_invites = channel_member::Entity::find()
3302                .filter(
3303                    channel_member::Column::UserId
3304                        .eq(user_id)
3305                        .and(channel_member::Column::Accepted.eq(false)),
3306                )
3307                .all(&*tx)
3308                .await?;
3309
3310            let channels = channel::Entity::find()
3311                .filter(
3312                    channel::Column::Id.is_in(
3313                        channel_invites
3314                            .into_iter()
3315                            .map(|channel_member| channel_member.channel_id),
3316                    ),
3317                )
3318                .all(&*tx)
3319                .await?;
3320
3321            let channels = channels
3322                .into_iter()
3323                .map(|channel| Channel {
3324                    id: channel.id,
3325                    name: channel.name,
3326                    parent_id: None,
3327                })
3328                .collect();
3329
3330            Ok(channels)
3331        })
3332        .await
3333    }
3334
3335    pub async fn get_channels(&self, user_id: UserId) -> Result<Vec<Channel>> {
3336        self.transaction(|tx| async move {
3337            let tx = tx;
3338
3339            let starting_channel_ids: Vec<ChannelId> = channel_member::Entity::find()
3340                .filter(
3341                    channel_member::Column::UserId
3342                        .eq(user_id)
3343                        .and(channel_member::Column::Accepted.eq(true)),
3344                )
3345                .select_only()
3346                .column(channel_member::Column::ChannelId)
3347                .into_values::<_, QueryChannelIds>()
3348                .all(&*tx)
3349                .await?;
3350
3351            let parents_by_child_id = self
3352                .get_channel_descendants(starting_channel_ids, &*tx)
3353                .await?;
3354
3355            let mut channels = Vec::with_capacity(parents_by_child_id.len());
3356            let mut rows = channel::Entity::find()
3357                .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3358                .stream(&*tx)
3359                .await?;
3360
3361            while let Some(row) = rows.next().await {
3362                let row = row?;
3363                channels.push(Channel {
3364                    id: row.id,
3365                    name: row.name,
3366                    parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3367                });
3368            }
3369
3370            drop(rows);
3371
3372            Ok(channels)
3373        })
3374        .await
3375    }
3376
3377    pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3378        self.transaction(|tx| async move {
3379            let tx = tx;
3380            let ancestor_ids = self.get_channel_ancestors(id, &*tx).await?;
3381            let user_ids = channel_member::Entity::find()
3382                .distinct()
3383                .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3384                .select_only()
3385                .column(channel_member::Column::UserId)
3386                .into_values::<_, QueryUserIds>()
3387                .all(&*tx)
3388                .await?;
3389            Ok(user_ids)
3390        })
3391        .await
3392    }
3393
3394    async fn get_channel_ancestors(
3395        &self,
3396        id: ChannelId,
3397        tx: &DatabaseTransaction,
3398    ) -> Result<Vec<ChannelId>> {
3399        let sql = format!(
3400            r#"
3401            WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3402                    SELECT CAST(NULL as INTEGER) as child_id, root_ids.column1 as parent_id
3403                    FROM (VALUES ({})) as root_ids
3404                UNION
3405                    SELECT channel_parents.child_id, channel_parents.parent_id
3406                    FROM channel_parents, channel_tree
3407                    WHERE channel_parents.child_id = channel_tree.parent_id
3408            )
3409            SELECT DISTINCT channel_tree.parent_id
3410            FROM channel_tree
3411            "#,
3412            id
3413        );
3414
3415        #[derive(FromQueryResult, Debug, PartialEq)]
3416        pub struct ChannelParent {
3417            pub parent_id: ChannelId,
3418        }
3419
3420        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3421
3422        let mut channel_ids_stream = channel_parent::Entity::find()
3423            .from_raw_sql(stmt)
3424            .into_model::<ChannelParent>()
3425            .stream(&*tx)
3426            .await?;
3427
3428        let mut channel_ids = vec![];
3429        while let Some(channel_id) = channel_ids_stream.next().await {
3430            channel_ids.push(channel_id?.parent_id);
3431        }
3432
3433        Ok(channel_ids)
3434    }
3435
3436    async fn get_channel_descendants(
3437        &self,
3438        channel_ids: impl IntoIterator<Item = ChannelId>,
3439        tx: &DatabaseTransaction,
3440    ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3441        let mut values = String::new();
3442        for id in channel_ids {
3443            if !values.is_empty() {
3444                values.push_str(", ");
3445            }
3446            write!(&mut values, "({})", id).unwrap();
3447        }
3448
3449        if values.is_empty() {
3450            return Ok(HashMap::default());
3451        }
3452
3453        let sql = format!(
3454            r#"
3455            WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3456                    SELECT root_ids.column1 as child_id, CAST(NULL as INTEGER) as parent_id
3457                    FROM (VALUES {}) as root_ids
3458                UNION
3459                    SELECT channel_parents.child_id, channel_parents.parent_id
3460                    FROM channel_parents, channel_tree
3461                    WHERE channel_parents.parent_id = channel_tree.child_id
3462            )
3463            SELECT channel_tree.child_id, channel_tree.parent_id
3464            FROM channel_tree
3465            ORDER BY child_id, parent_id IS NOT NULL
3466            "#,
3467            values
3468        );
3469
3470        #[derive(FromQueryResult, Debug, PartialEq)]
3471        pub struct ChannelParent {
3472            pub child_id: ChannelId,
3473            pub parent_id: Option<ChannelId>,
3474        }
3475
3476        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3477
3478        let mut parents_by_child_id = HashMap::default();
3479        let mut parents = channel_parent::Entity::find()
3480            .from_raw_sql(stmt)
3481            .into_model::<ChannelParent>()
3482            .stream(tx)
3483            .await?;
3484
3485        while let Some(parent) = parents.next().await {
3486            let parent = parent?;
3487            parents_by_child_id.insert(parent.child_id, parent.parent_id);
3488        }
3489
3490        Ok(parents_by_child_id)
3491    }
3492
3493    pub async fn get_channel(&self, channel_id: ChannelId) -> Result<Option<Channel>> {
3494        self.transaction(|tx| async move {
3495            let tx = tx;
3496            let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3497
3498            Ok(channel.map(|channel| Channel {
3499                id: channel.id,
3500                name: channel.name,
3501                parent_id: None,
3502            }))
3503        })
3504        .await
3505    }
3506
3507    pub async fn get_channel_room(&self, channel_id: ChannelId) -> Result<RoomId> {
3508        self.transaction(|tx| async move {
3509            let tx = tx;
3510            let room = channel::Model {
3511                id: channel_id,
3512                ..Default::default()
3513            }
3514            .find_related(room::Entity)
3515            .one(&*tx)
3516            .await?
3517            .ok_or_else(|| anyhow!("invalid channel"))?;
3518            Ok(room.id)
3519        })
3520        .await
3521    }
3522
3523    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3524    where
3525        F: Send + Fn(TransactionHandle) -> Fut,
3526        Fut: Send + Future<Output = Result<T>>,
3527    {
3528        let body = async {
3529            let mut i = 0;
3530            loop {
3531                let (tx, result) = self.with_transaction(&f).await?;
3532                match result {
3533                    Ok(result) => match tx.commit().await.map_err(Into::into) {
3534                        Ok(()) => return Ok(result),
3535                        Err(error) => {
3536                            if !self.retry_on_serialization_error(&error, i).await {
3537                                return Err(error);
3538                            }
3539                        }
3540                    },
3541                    Err(error) => {
3542                        tx.rollback().await?;
3543                        if !self.retry_on_serialization_error(&error, i).await {
3544                            return Err(error);
3545                        }
3546                    }
3547                }
3548                i += 1;
3549            }
3550        };
3551
3552        self.run(body).await
3553    }
3554
3555    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
3556    where
3557        F: Send + Fn(TransactionHandle) -> Fut,
3558        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
3559    {
3560        let body = async {
3561            let mut i = 0;
3562            loop {
3563                let (tx, result) = self.with_transaction(&f).await?;
3564                match result {
3565                    Ok(Some((room_id, data))) => {
3566                        let lock = self.rooms.entry(room_id).or_default().clone();
3567                        let _guard = lock.lock_owned().await;
3568                        match tx.commit().await.map_err(Into::into) {
3569                            Ok(()) => {
3570                                return Ok(Some(RoomGuard {
3571                                    data,
3572                                    _guard,
3573                                    _not_send: PhantomData,
3574                                }));
3575                            }
3576                            Err(error) => {
3577                                if !self.retry_on_serialization_error(&error, i).await {
3578                                    return Err(error);
3579                                }
3580                            }
3581                        }
3582                    }
3583                    Ok(None) => match tx.commit().await.map_err(Into::into) {
3584                        Ok(()) => return Ok(None),
3585                        Err(error) => {
3586                            if !self.retry_on_serialization_error(&error, i).await {
3587                                return Err(error);
3588                            }
3589                        }
3590                    },
3591                    Err(error) => {
3592                        tx.rollback().await?;
3593                        if !self.retry_on_serialization_error(&error, i).await {
3594                            return Err(error);
3595                        }
3596                    }
3597                }
3598                i += 1;
3599            }
3600        };
3601
3602        self.run(body).await
3603    }
3604
3605    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3606    where
3607        F: Send + Fn(TransactionHandle) -> Fut,
3608        Fut: Send + Future<Output = Result<T>>,
3609    {
3610        let body = async {
3611            let mut i = 0;
3612            loop {
3613                let lock = self.rooms.entry(room_id).or_default().clone();
3614                let _guard = lock.lock_owned().await;
3615                let (tx, result) = self.with_transaction(&f).await?;
3616                match result {
3617                    Ok(data) => match tx.commit().await.map_err(Into::into) {
3618                        Ok(()) => {
3619                            return Ok(RoomGuard {
3620                                data,
3621                                _guard,
3622                                _not_send: PhantomData,
3623                            });
3624                        }
3625                        Err(error) => {
3626                            if !self.retry_on_serialization_error(&error, i).await {
3627                                return Err(error);
3628                            }
3629                        }
3630                    },
3631                    Err(error) => {
3632                        tx.rollback().await?;
3633                        if !self.retry_on_serialization_error(&error, i).await {
3634                            return Err(error);
3635                        }
3636                    }
3637                }
3638                i += 1;
3639            }
3640        };
3641
3642        self.run(body).await
3643    }
3644
3645    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3646    where
3647        F: Send + Fn(TransactionHandle) -> Fut,
3648        Fut: Send + Future<Output = Result<T>>,
3649    {
3650        let tx = self
3651            .pool
3652            .begin_with_config(Some(IsolationLevel::Serializable), None)
3653            .await?;
3654
3655        let mut tx = Arc::new(Some(tx));
3656        let result = f(TransactionHandle(tx.clone())).await;
3657        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3658            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3659        };
3660
3661        Ok((tx, result))
3662    }
3663
3664    async fn run<F, T>(&self, future: F) -> Result<T>
3665    where
3666        F: Future<Output = Result<T>>,
3667    {
3668        #[cfg(test)]
3669        {
3670            if let Executor::Deterministic(executor) = &self.executor {
3671                executor.simulate_random_delay().await;
3672            }
3673
3674            self.runtime.as_ref().unwrap().block_on(future)
3675        }
3676
3677        #[cfg(not(test))]
3678        {
3679            future.await
3680        }
3681    }
3682
3683    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3684        // If the error is due to a failure to serialize concurrent transactions, then retry
3685        // this transaction after a delay. With each subsequent retry, double the delay duration.
3686        // Also vary the delay randomly in order to ensure different database connections retry
3687        // at different times.
3688        if is_serialization_error(error) {
3689            let base_delay = 4_u64 << prev_attempt_count.min(16);
3690            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3691            log::info!(
3692                "retrying transaction after serialization error. delay: {} ms.",
3693                randomized_delay
3694            );
3695            self.executor
3696                .sleep(Duration::from_millis(randomized_delay as u64))
3697                .await;
3698            true
3699        } else {
3700            false
3701        }
3702    }
3703}
3704
3705fn is_serialization_error(error: &Error) -> bool {
3706    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3707    match error {
3708        Error::Database(
3709            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3710            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3711        ) if error
3712            .as_database_error()
3713            .and_then(|error| error.code())
3714            .as_deref()
3715            == Some(SERIALIZATION_FAILURE_CODE) =>
3716        {
3717            true
3718        }
3719        _ => false,
3720    }
3721}
3722
3723struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3724
3725impl Deref for TransactionHandle {
3726    type Target = DatabaseTransaction;
3727
3728    fn deref(&self) -> &Self::Target {
3729        self.0.as_ref().as_ref().unwrap()
3730    }
3731}
3732
3733pub struct RoomGuard<T> {
3734    data: T,
3735    _guard: OwnedMutexGuard<()>,
3736    _not_send: PhantomData<Rc<()>>,
3737}
3738
3739impl<T> Deref for RoomGuard<T> {
3740    type Target = T;
3741
3742    fn deref(&self) -> &T {
3743        &self.data
3744    }
3745}
3746
3747impl<T> DerefMut for RoomGuard<T> {
3748    fn deref_mut(&mut self) -> &mut T {
3749        &mut self.data
3750    }
3751}
3752
3753#[derive(Debug, Serialize, Deserialize)]
3754pub struct NewUserParams {
3755    pub github_login: String,
3756    pub github_user_id: i32,
3757    pub invite_count: i32,
3758}
3759
3760#[derive(Debug)]
3761pub struct NewUserResult {
3762    pub user_id: UserId,
3763    pub metrics_id: String,
3764    pub inviting_user_id: Option<UserId>,
3765    pub signup_device_id: Option<String>,
3766}
3767
3768#[derive(FromQueryResult, Debug, PartialEq)]
3769pub struct Channel {
3770    pub id: ChannelId,
3771    pub name: String,
3772    pub parent_id: Option<ChannelId>,
3773}
3774
3775fn random_invite_code() -> String {
3776    nanoid::nanoid!(16)
3777}
3778
3779fn random_email_confirmation_code() -> String {
3780    nanoid::nanoid!(64)
3781}
3782
3783macro_rules! id_type {
3784    ($name:ident) => {
3785        #[derive(
3786            Clone,
3787            Copy,
3788            Debug,
3789            Default,
3790            PartialEq,
3791            Eq,
3792            PartialOrd,
3793            Ord,
3794            Hash,
3795            Serialize,
3796            Deserialize,
3797        )]
3798        #[serde(transparent)]
3799        pub struct $name(pub i32);
3800
3801        impl $name {
3802            #[allow(unused)]
3803            pub const MAX: Self = Self(i32::MAX);
3804
3805            #[allow(unused)]
3806            pub fn from_proto(value: u64) -> Self {
3807                Self(value as i32)
3808            }
3809
3810            #[allow(unused)]
3811            pub fn to_proto(self) -> u64 {
3812                self.0 as u64
3813            }
3814        }
3815
3816        impl std::fmt::Display for $name {
3817            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3818                self.0.fmt(f)
3819            }
3820        }
3821
3822        impl From<$name> for sea_query::Value {
3823            fn from(value: $name) -> Self {
3824                sea_query::Value::Int(Some(value.0))
3825            }
3826        }
3827
3828        impl sea_orm::TryGetable for $name {
3829            fn try_get(
3830                res: &sea_orm::QueryResult,
3831                pre: &str,
3832                col: &str,
3833            ) -> Result<Self, sea_orm::TryGetError> {
3834                Ok(Self(i32::try_get(res, pre, col)?))
3835            }
3836        }
3837
3838        impl sea_query::ValueType for $name {
3839            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3840                match v {
3841                    Value::TinyInt(Some(int)) => {
3842                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3843                    }
3844                    Value::SmallInt(Some(int)) => {
3845                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3846                    }
3847                    Value::Int(Some(int)) => {
3848                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3849                    }
3850                    Value::BigInt(Some(int)) => {
3851                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3852                    }
3853                    Value::TinyUnsigned(Some(int)) => {
3854                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3855                    }
3856                    Value::SmallUnsigned(Some(int)) => {
3857                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3858                    }
3859                    Value::Unsigned(Some(int)) => {
3860                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3861                    }
3862                    Value::BigUnsigned(Some(int)) => {
3863                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3864                    }
3865                    _ => Err(sea_query::ValueTypeErr),
3866                }
3867            }
3868
3869            fn type_name() -> String {
3870                stringify!($name).into()
3871            }
3872
3873            fn array_type() -> sea_query::ArrayType {
3874                sea_query::ArrayType::Int
3875            }
3876
3877            fn column_type() -> sea_query::ColumnType {
3878                sea_query::ColumnType::Integer(None)
3879            }
3880        }
3881
3882        impl sea_orm::TryFromU64 for $name {
3883            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3884                Ok(Self(n.try_into().map_err(|_| {
3885                    DbErr::ConvertFromU64(concat!(
3886                        "error converting ",
3887                        stringify!($name),
3888                        " to u64"
3889                    ))
3890                })?))
3891            }
3892        }
3893
3894        impl sea_query::Nullable for $name {
3895            fn null() -> Value {
3896                Value::Int(None)
3897            }
3898        }
3899    };
3900}
3901
3902id_type!(AccessTokenId);
3903id_type!(ChannelId);
3904id_type!(ChannelMemberId);
3905id_type!(ContactId);
3906id_type!(FollowerId);
3907id_type!(RoomId);
3908id_type!(RoomParticipantId);
3909id_type!(ProjectId);
3910id_type!(ProjectCollaboratorId);
3911id_type!(ReplicaId);
3912id_type!(ServerId);
3913id_type!(SignupId);
3914id_type!(UserId);
3915
3916pub struct RejoinedRoom {
3917    pub room: proto::Room,
3918    pub rejoined_projects: Vec<RejoinedProject>,
3919    pub reshared_projects: Vec<ResharedProject>,
3920}
3921
3922pub struct ResharedProject {
3923    pub id: ProjectId,
3924    pub old_connection_id: ConnectionId,
3925    pub collaborators: Vec<ProjectCollaborator>,
3926    pub worktrees: Vec<proto::WorktreeMetadata>,
3927}
3928
3929pub struct RejoinedProject {
3930    pub id: ProjectId,
3931    pub old_connection_id: ConnectionId,
3932    pub collaborators: Vec<ProjectCollaborator>,
3933    pub worktrees: Vec<RejoinedWorktree>,
3934    pub language_servers: Vec<proto::LanguageServer>,
3935}
3936
3937#[derive(Debug)]
3938pub struct RejoinedWorktree {
3939    pub id: u64,
3940    pub abs_path: String,
3941    pub root_name: String,
3942    pub visible: bool,
3943    pub updated_entries: Vec<proto::Entry>,
3944    pub removed_entries: Vec<u64>,
3945    pub updated_repositories: Vec<proto::RepositoryEntry>,
3946    pub removed_repositories: Vec<u64>,
3947    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3948    pub settings_files: Vec<WorktreeSettingsFile>,
3949    pub scan_id: u64,
3950    pub completed_scan_id: u64,
3951}
3952
3953pub struct LeftRoom {
3954    pub room: proto::Room,
3955    pub left_projects: HashMap<ProjectId, LeftProject>,
3956    pub canceled_calls_to_user_ids: Vec<UserId>,
3957    pub deleted: bool,
3958}
3959
3960pub struct RefreshedRoom {
3961    pub room: proto::Room,
3962    pub stale_participant_user_ids: Vec<UserId>,
3963    pub canceled_calls_to_user_ids: Vec<UserId>,
3964}
3965
3966pub struct Project {
3967    pub collaborators: Vec<ProjectCollaborator>,
3968    pub worktrees: BTreeMap<u64, Worktree>,
3969    pub language_servers: Vec<proto::LanguageServer>,
3970}
3971
3972pub struct ProjectCollaborator {
3973    pub connection_id: ConnectionId,
3974    pub user_id: UserId,
3975    pub replica_id: ReplicaId,
3976    pub is_host: bool,
3977}
3978
3979impl ProjectCollaborator {
3980    pub fn to_proto(&self) -> proto::Collaborator {
3981        proto::Collaborator {
3982            peer_id: Some(self.connection_id.into()),
3983            replica_id: self.replica_id.0 as u32,
3984            user_id: self.user_id.to_proto(),
3985        }
3986    }
3987}
3988
3989#[derive(Debug)]
3990pub struct LeftProject {
3991    pub id: ProjectId,
3992    pub host_user_id: UserId,
3993    pub host_connection_id: ConnectionId,
3994    pub connection_ids: Vec<ConnectionId>,
3995}
3996
3997pub struct Worktree {
3998    pub id: u64,
3999    pub abs_path: String,
4000    pub root_name: String,
4001    pub visible: bool,
4002    pub entries: Vec<proto::Entry>,
4003    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
4004    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
4005    pub settings_files: Vec<WorktreeSettingsFile>,
4006    pub scan_id: u64,
4007    pub completed_scan_id: u64,
4008}
4009
4010#[derive(Debug)]
4011pub struct WorktreeSettingsFile {
4012    pub path: String,
4013    pub content: String,
4014}
4015
4016#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
4017enum QueryChannelIds {
4018    ChannelId,
4019}
4020
4021#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
4022enum QueryUserIds {
4023    UserId,
4024}
4025
4026#[cfg(test)]
4027pub use test::*;
4028
4029#[cfg(test)]
4030mod test {
4031    use super::*;
4032    use gpui::executor::Background;
4033    use parking_lot::Mutex;
4034    use sea_orm::ConnectionTrait;
4035    use sqlx::migrate::MigrateDatabase;
4036    use std::sync::Arc;
4037
4038    pub struct TestDb {
4039        pub db: Option<Arc<Database>>,
4040        pub connection: Option<sqlx::AnyConnection>,
4041    }
4042
4043    impl TestDb {
4044        pub fn sqlite(background: Arc<Background>) -> Self {
4045            let url = format!("sqlite::memory:");
4046            let runtime = tokio::runtime::Builder::new_current_thread()
4047                .enable_io()
4048                .enable_time()
4049                .build()
4050                .unwrap();
4051
4052            let mut db = runtime.block_on(async {
4053                let mut options = ConnectOptions::new(url);
4054                options.max_connections(5);
4055                let db = Database::new(options, Executor::Deterministic(background))
4056                    .await
4057                    .unwrap();
4058                let sql = include_str!(concat!(
4059                    env!("CARGO_MANIFEST_DIR"),
4060                    "/migrations.sqlite/20221109000000_test_schema.sql"
4061                ));
4062                db.pool
4063                    .execute(sea_orm::Statement::from_string(
4064                        db.pool.get_database_backend(),
4065                        sql.into(),
4066                    ))
4067                    .await
4068                    .unwrap();
4069                db
4070            });
4071
4072            db.runtime = Some(runtime);
4073
4074            Self {
4075                db: Some(Arc::new(db)),
4076                connection: None,
4077            }
4078        }
4079
4080        pub fn postgres(background: Arc<Background>) -> Self {
4081            static LOCK: Mutex<()> = Mutex::new(());
4082
4083            let _guard = LOCK.lock();
4084            let mut rng = StdRng::from_entropy();
4085            let url = format!(
4086                "postgres://postgres@localhost/zed-test-{}",
4087                rng.gen::<u128>()
4088            );
4089            let runtime = tokio::runtime::Builder::new_current_thread()
4090                .enable_io()
4091                .enable_time()
4092                .build()
4093                .unwrap();
4094
4095            let mut db = runtime.block_on(async {
4096                sqlx::Postgres::create_database(&url)
4097                    .await
4098                    .expect("failed to create test db");
4099                let mut options = ConnectOptions::new(url);
4100                options
4101                    .max_connections(5)
4102                    .idle_timeout(Duration::from_secs(0));
4103                let db = Database::new(options, Executor::Deterministic(background))
4104                    .await
4105                    .unwrap();
4106                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
4107                db.migrate(Path::new(migrations_path), false).await.unwrap();
4108                db
4109            });
4110
4111            db.runtime = Some(runtime);
4112
4113            Self {
4114                db: Some(Arc::new(db)),
4115                connection: None,
4116            }
4117        }
4118
4119        pub fn db(&self) -> &Arc<Database> {
4120            self.db.as_ref().unwrap()
4121        }
4122    }
4123
4124    impl Drop for TestDb {
4125        fn drop(&mut self) {
4126            let db = self.db.take().unwrap();
4127            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
4128                db.runtime.as_ref().unwrap().block_on(async {
4129                    use util::ResultExt;
4130                    let query = "
4131                        SELECT pg_terminate_backend(pg_stat_activity.pid)
4132                        FROM pg_stat_activity
4133                        WHERE
4134                            pg_stat_activity.datname = current_database() AND
4135                            pid <> pg_backend_pid();
4136                    ";
4137                    db.pool
4138                        .execute(sea_orm::Statement::from_string(
4139                            db.pool.get_database_backend(),
4140                            query.into(),
4141                        ))
4142                        .await
4143                        .log_err();
4144                    sqlx::Postgres::drop_database(db.options.get_url())
4145                        .await
4146                        .log_err();
4147                })
4148            }
4149        }
4150    }
4151}