db.rs

   1mod access_token;
   2mod channel;
   3mod channel_member;
   4// mod channel_parent;
   5mod contact;
   6mod follower;
   7mod language_server;
   8mod project;
   9mod project_collaborator;
  10mod room;
  11mod room_participant;
  12mod server;
  13mod signup;
  14#[cfg(test)]
  15mod tests;
  16mod user;
  17mod worktree;
  18mod worktree_diagnostic_summary;
  19mod worktree_entry;
  20mod worktree_repository;
  21mod worktree_repository_statuses;
  22mod worktree_settings_file;
  23
  24use crate::executor::Executor;
  25use crate::{Error, Result};
  26use anyhow::anyhow;
  27use collections::{BTreeMap, HashMap, HashSet};
  28pub use contact::Contact;
  29use dashmap::DashMap;
  30use futures::StreamExt;
  31use hyper::StatusCode;
  32use rand::prelude::StdRng;
  33use rand::{Rng, SeedableRng};
  34use rpc::{proto, ConnectionId};
  35use sea_orm::Condition;
  36pub use sea_orm::ConnectOptions;
  37use sea_orm::{
  38    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  39    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  40    Statement, TransactionTrait,
  41};
  42use sea_query::{Alias, Expr, OnConflict, Query, SelectStatement};
  43use serde::{Deserialize, Serialize};
  44pub use signup::{Invite, NewSignup, WaitlistSummary};
  45use sqlx::migrate::{Migrate, Migration, MigrationSource};
  46use sqlx::Connection;
  47use std::ops::{Deref, DerefMut};
  48use std::path::Path;
  49use std::time::Duration;
  50use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  51use tokio::sync::{Mutex, OwnedMutexGuard};
  52pub use user::Model as User;
  53
  /// Shared handle to the application's database.
  ///
  /// Built by `Database::new`, which opens the `pool` connection from `options`.
  54pub struct Database {
        /// Options the pool was opened with; `migrate` reads the URL from here to open
        /// its own separate connection.
  55    options: ConnectOptions,
        /// The sea-orm connection created in `new`.
  56    pool: DatabaseConnection,
        /// One async mutex per room id; cleared by `reset` in tests.
        // NOTE(review): presumably used by `room_transaction` to serialize access per
        // room — that code is outside this view, confirm.
  57    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
        /// Random number generator, seeded with the fixed value 0 in `new`.
  58    rng: Mutex<StdRng>,
        /// Executor supplied by the caller of `new`.
  59    executor: Executor,
        /// Test-only tokio runtime slot; `new` initializes it to `None`.
  60    #[cfg(test)]
  61    runtime: Option<tokio::runtime::Runtime>,
  62}
  63
  64impl Database {
  65    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  66        Ok(Self {
  67            options: options.clone(),
  68            pool: sea_orm::Database::connect(options).await?,
  69            rooms: DashMap::with_capacity(16384),
  70            rng: Mutex::new(StdRng::seed_from_u64(0)),
  71            executor,
  72            #[cfg(test)]
  73            runtime: None,
  74        })
  75    }
  76
  77    #[cfg(test)]
  78    pub fn reset(&self) {
  79        self.rooms.clear();
  80    }
  81
  82    pub async fn migrate(
  83        &self,
  84        migrations_path: &Path,
  85        ignore_checksum_mismatch: bool,
  86    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  87        let migrations = MigrationSource::resolve(migrations_path)
  88            .await
  89            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  90
  91        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  92
  93        connection.ensure_migrations_table().await?;
  94        let applied_migrations: HashMap<_, _> = connection
  95            .list_applied_migrations()
  96            .await?
  97            .into_iter()
  98            .map(|m| (m.version, m))
  99            .collect();
 100
 101        let mut new_migrations = Vec::new();
 102        for migration in migrations {
 103            match applied_migrations.get(&migration.version) {
 104                Some(applied_migration) => {
 105                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 106                    {
 107                        Err(anyhow!(
 108                            "checksum mismatch for applied migration {}",
 109                            migration.description
 110                        ))?;
 111                    }
 112                }
 113                None => {
 114                    let elapsed = connection.apply(&migration).await?;
 115                    new_migrations.push((migration, elapsed));
 116                }
 117            }
 118        }
 119
 120        Ok(new_migrations)
 121    }
 122
 123    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 124        self.transaction(|tx| async move {
 125            let server = server::ActiveModel {
 126                environment: ActiveValue::set(environment.into()),
 127                ..Default::default()
 128            }
 129            .insert(&*tx)
 130            .await?;
 131            Ok(server.id)
 132        })
 133        .await
 134    }
 135
 136    pub async fn stale_room_ids(
 137        &self,
 138        environment: &str,
 139        new_server_id: ServerId,
 140    ) -> Result<Vec<RoomId>> {
 141        self.transaction(|tx| async move {
 142            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 143            enum QueryAs {
 144                RoomId,
 145            }
 146
 147            let stale_server_epochs = self
 148                .stale_server_ids(environment, new_server_id, &tx)
 149                .await?;
 150            Ok(room_participant::Entity::find()
 151                .select_only()
 152                .column(room_participant::Column::RoomId)
 153                .distinct()
 154                .filter(
 155                    room_participant::Column::AnsweringConnectionServerId
 156                        .is_in(stale_server_epochs),
 157                )
 158                .into_values::<_, QueryAs>()
 159                .all(&*tx)
 160                .await?)
 161        })
 162        .await
 163    }
 164
 165    pub async fn refresh_room(
 166        &self,
 167        room_id: RoomId,
 168        new_server_id: ServerId,
 169    ) -> Result<RoomGuard<RefreshedRoom>> {
 170        self.room_transaction(room_id, |tx| async move {
 171            let stale_participant_filter = Condition::all()
 172                .add(room_participant::Column::RoomId.eq(room_id))
 173                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 174                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 175
 176            let stale_participant_user_ids = room_participant::Entity::find()
 177                .filter(stale_participant_filter.clone())
 178                .all(&*tx)
 179                .await?
 180                .into_iter()
 181                .map(|participant| participant.user_id)
 182                .collect::<Vec<_>>();
 183
 184            // Delete participants who failed to reconnect and cancel their calls.
 185            let mut canceled_calls_to_user_ids = Vec::new();
 186            room_participant::Entity::delete_many()
 187                .filter(stale_participant_filter)
 188                .exec(&*tx)
 189                .await?;
 190            let called_participants = room_participant::Entity::find()
 191                .filter(
 192                    Condition::all()
 193                        .add(
 194                            room_participant::Column::CallingUserId
 195                                .is_in(stale_participant_user_ids.iter().copied()),
 196                        )
 197                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 198                )
 199                .all(&*tx)
 200                .await?;
 201            room_participant::Entity::delete_many()
 202                .filter(
 203                    room_participant::Column::Id
 204                        .is_in(called_participants.iter().map(|participant| participant.id)),
 205                )
 206                .exec(&*tx)
 207                .await?;
 208            canceled_calls_to_user_ids.extend(
 209                called_participants
 210                    .into_iter()
 211                    .map(|participant| participant.user_id),
 212            );
 213
 214            let room = self.get_room(room_id, &tx).await?;
 215            // Delete the room if it becomes empty.
 216            if room.participants.is_empty() {
 217                project::Entity::delete_many()
 218                    .filter(project::Column::RoomId.eq(room_id))
 219                    .exec(&*tx)
 220                    .await?;
 221                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 222            }
 223
 224            Ok(RefreshedRoom {
 225                room,
 226                stale_participant_user_ids,
 227                canceled_calls_to_user_ids,
 228            })
 229        })
 230        .await
 231    }
 232
 233    pub async fn delete_stale_servers(
 234        &self,
 235        environment: &str,
 236        new_server_id: ServerId,
 237    ) -> Result<()> {
 238        self.transaction(|tx| async move {
 239            server::Entity::delete_many()
 240                .filter(
 241                    Condition::all()
 242                        .add(server::Column::Environment.eq(environment))
 243                        .add(server::Column::Id.ne(new_server_id)),
 244                )
 245                .exec(&*tx)
 246                .await?;
 247            Ok(())
 248        })
 249        .await
 250    }
 251
 252    async fn stale_server_ids(
 253        &self,
 254        environment: &str,
 255        new_server_id: ServerId,
 256        tx: &DatabaseTransaction,
 257    ) -> Result<Vec<ServerId>> {
 258        let stale_servers = server::Entity::find()
 259            .filter(
 260                Condition::all()
 261                    .add(server::Column::Environment.eq(environment))
 262                    .add(server::Column::Id.ne(new_server_id)),
 263            )
 264            .all(&*tx)
 265            .await?;
 266        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 267    }
 268
 269    // users
 270
 271    pub async fn create_user(
 272        &self,
 273        email_address: &str,
 274        admin: bool,
 275        params: NewUserParams,
 276    ) -> Result<NewUserResult> {
 277        self.transaction(|tx| async {
 278            let tx = tx;
 279            let user = user::Entity::insert(user::ActiveModel {
 280                email_address: ActiveValue::set(Some(email_address.into())),
 281                github_login: ActiveValue::set(params.github_login.clone()),
 282                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 283                admin: ActiveValue::set(admin),
 284                metrics_id: ActiveValue::set(Uuid::new_v4()),
 285                ..Default::default()
 286            })
 287            .on_conflict(
 288                OnConflict::column(user::Column::GithubLogin)
 289                    .update_column(user::Column::GithubLogin)
 290                    .to_owned(),
 291            )
 292            .exec_with_returning(&*tx)
 293            .await?;
 294
 295            Ok(NewUserResult {
 296                user_id: user.id,
 297                metrics_id: user.metrics_id.to_string(),
 298                signup_device_id: None,
 299                inviting_user_id: None,
 300            })
 301        })
 302        .await
 303    }
 304
 305    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 306        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 307            .await
 308    }
 309
 310    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 311        self.transaction(|tx| async {
 312            let tx = tx;
 313            Ok(user::Entity::find()
 314                .filter(user::Column::Id.is_in(ids.iter().copied()))
 315                .all(&*tx)
 316                .await?)
 317        })
 318        .await
 319    }
 320
 321    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 322        self.transaction(|tx| async move {
 323            Ok(user::Entity::find()
 324                .filter(user::Column::GithubLogin.eq(github_login))
 325                .one(&*tx)
 326                .await?)
 327        })
 328        .await
 329    }
 330
 331    pub async fn get_or_create_user_by_github_account(
 332        &self,
 333        github_login: &str,
 334        github_user_id: Option<i32>,
 335        github_email: Option<&str>,
 336    ) -> Result<Option<User>> {
 337        self.transaction(|tx| async move {
 338            let tx = &*tx;
 339            if let Some(github_user_id) = github_user_id {
 340                if let Some(user_by_github_user_id) = user::Entity::find()
 341                    .filter(user::Column::GithubUserId.eq(github_user_id))
 342                    .one(tx)
 343                    .await?
 344                {
 345                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 346                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 347                    Ok(Some(user_by_github_user_id.update(tx).await?))
 348                } else if let Some(user_by_github_login) = user::Entity::find()
 349                    .filter(user::Column::GithubLogin.eq(github_login))
 350                    .one(tx)
 351                    .await?
 352                {
 353                    let mut user_by_github_login = user_by_github_login.into_active_model();
 354                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 355                    Ok(Some(user_by_github_login.update(tx).await?))
 356                } else {
 357                    let user = user::Entity::insert(user::ActiveModel {
 358                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 359                        github_login: ActiveValue::set(github_login.into()),
 360                        github_user_id: ActiveValue::set(Some(github_user_id)),
 361                        admin: ActiveValue::set(false),
 362                        invite_count: ActiveValue::set(0),
 363                        invite_code: ActiveValue::set(None),
 364                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 365                        ..Default::default()
 366                    })
 367                    .exec_with_returning(&*tx)
 368                    .await?;
 369                    Ok(Some(user))
 370                }
 371            } else {
 372                Ok(user::Entity::find()
 373                    .filter(user::Column::GithubLogin.eq(github_login))
 374                    .one(tx)
 375                    .await?)
 376            }
 377        })
 378        .await
 379    }
 380
 381    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 382        self.transaction(|tx| async move {
 383            Ok(user::Entity::find()
 384                .order_by_asc(user::Column::GithubLogin)
 385                .limit(limit as u64)
 386                .offset(page as u64 * limit as u64)
 387                .all(&*tx)
 388                .await?)
 389        })
 390        .await
 391    }
 392
 393    pub async fn get_users_with_no_invites(
 394        &self,
 395        invited_by_another_user: bool,
 396    ) -> Result<Vec<User>> {
 397        self.transaction(|tx| async move {
 398            Ok(user::Entity::find()
 399                .filter(
 400                    user::Column::InviteCount
 401                        .eq(0)
 402                        .and(if invited_by_another_user {
 403                            user::Column::InviterId.is_not_null()
 404                        } else {
 405                            user::Column::InviterId.is_null()
 406                        }),
 407                )
 408                .all(&*tx)
 409                .await?)
 410        })
 411        .await
 412    }
 413
 414    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 415        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 416        enum QueryAs {
 417            MetricsId,
 418        }
 419
 420        self.transaction(|tx| async move {
 421            let metrics_id: Uuid = user::Entity::find_by_id(id)
 422                .select_only()
 423                .column(user::Column::MetricsId)
 424                .into_values::<_, QueryAs>()
 425                .one(&*tx)
 426                .await?
 427                .ok_or_else(|| anyhow!("could not find user"))?;
 428            Ok(metrics_id.to_string())
 429        })
 430        .await
 431    }
 432
 433    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 434        self.transaction(|tx| async move {
 435            user::Entity::update_many()
 436                .filter(user::Column::Id.eq(id))
 437                .set(user::ActiveModel {
 438                    admin: ActiveValue::set(is_admin),
 439                    ..Default::default()
 440                })
 441                .exec(&*tx)
 442                .await?;
 443            Ok(())
 444        })
 445        .await
 446    }
 447
 448    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 449        self.transaction(|tx| async move {
 450            user::Entity::update_many()
 451                .filter(user::Column::Id.eq(id))
 452                .set(user::ActiveModel {
 453                    connected_once: ActiveValue::set(connected_once),
 454                    ..Default::default()
 455                })
 456                .exec(&*tx)
 457                .await?;
 458            Ok(())
 459        })
 460        .await
 461    }
 462
 463    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 464        self.transaction(|tx| async move {
 465            access_token::Entity::delete_many()
 466                .filter(access_token::Column::UserId.eq(id))
 467                .exec(&*tx)
 468                .await?;
 469            user::Entity::delete_by_id(id).exec(&*tx).await?;
 470            Ok(())
 471        })
 472        .await
 473    }
 474
 475    // contacts
 476
 477    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 478        #[derive(Debug, FromQueryResult)]
 479        struct ContactWithUserBusyStatuses {
 480            user_id_a: UserId,
 481            user_id_b: UserId,
 482            a_to_b: bool,
 483            accepted: bool,
 484            should_notify: bool,
 485            user_a_busy: bool,
 486            user_b_busy: bool,
 487        }
 488
 489        self.transaction(|tx| async move {
 490            let user_a_participant = Alias::new("user_a_participant");
 491            let user_b_participant = Alias::new("user_b_participant");
 492            let mut db_contacts = contact::Entity::find()
 493                .column_as(
 494                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 495                        .is_not_null(),
 496                    "user_a_busy",
 497                )
 498                .column_as(
 499                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 500                        .is_not_null(),
 501                    "user_b_busy",
 502                )
 503                .filter(
 504                    contact::Column::UserIdA
 505                        .eq(user_id)
 506                        .or(contact::Column::UserIdB.eq(user_id)),
 507                )
 508                .join_as(
 509                    JoinType::LeftJoin,
 510                    contact::Relation::UserARoomParticipant.def(),
 511                    user_a_participant,
 512                )
 513                .join_as(
 514                    JoinType::LeftJoin,
 515                    contact::Relation::UserBRoomParticipant.def(),
 516                    user_b_participant,
 517                )
 518                .into_model::<ContactWithUserBusyStatuses>()
 519                .stream(&*tx)
 520                .await?;
 521
 522            let mut contacts = Vec::new();
 523            while let Some(db_contact) = db_contacts.next().await {
 524                let db_contact = db_contact?;
 525                if db_contact.user_id_a == user_id {
 526                    if db_contact.accepted {
 527                        contacts.push(Contact::Accepted {
 528                            user_id: db_contact.user_id_b,
 529                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 530                            busy: db_contact.user_b_busy,
 531                        });
 532                    } else if db_contact.a_to_b {
 533                        contacts.push(Contact::Outgoing {
 534                            user_id: db_contact.user_id_b,
 535                        })
 536                    } else {
 537                        contacts.push(Contact::Incoming {
 538                            user_id: db_contact.user_id_b,
 539                            should_notify: db_contact.should_notify,
 540                        });
 541                    }
 542                } else if db_contact.accepted {
 543                    contacts.push(Contact::Accepted {
 544                        user_id: db_contact.user_id_a,
 545                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 546                        busy: db_contact.user_a_busy,
 547                    });
 548                } else if db_contact.a_to_b {
 549                    contacts.push(Contact::Incoming {
 550                        user_id: db_contact.user_id_a,
 551                        should_notify: db_contact.should_notify,
 552                    });
 553                } else {
 554                    contacts.push(Contact::Outgoing {
 555                        user_id: db_contact.user_id_a,
 556                    });
 557                }
 558            }
 559
 560            contacts.sort_unstable_by_key(|contact| contact.user_id());
 561
 562            Ok(contacts)
 563        })
 564        .await
 565    }
 566
 567    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 568        self.transaction(|tx| async move {
 569            let participant = room_participant::Entity::find()
 570                .filter(room_participant::Column::UserId.eq(user_id))
 571                .one(&*tx)
 572                .await?;
 573            Ok(participant.is_some())
 574        })
 575        .await
 576    }
 577
 578    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 579        self.transaction(|tx| async move {
 580            let (id_a, id_b) = if user_id_1 < user_id_2 {
 581                (user_id_1, user_id_2)
 582            } else {
 583                (user_id_2, user_id_1)
 584            };
 585
 586            Ok(contact::Entity::find()
 587                .filter(
 588                    contact::Column::UserIdA
 589                        .eq(id_a)
 590                        .and(contact::Column::UserIdB.eq(id_b))
 591                        .and(contact::Column::Accepted.eq(true)),
 592                )
 593                .one(&*tx)
 594                .await?
 595                .is_some())
 596        })
 597        .await
 598    }
 599
 600    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 601        self.transaction(|tx| async move {
 602            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 603                (sender_id, receiver_id, true)
 604            } else {
 605                (receiver_id, sender_id, false)
 606            };
 607
 608            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 609                user_id_a: ActiveValue::set(id_a),
 610                user_id_b: ActiveValue::set(id_b),
 611                a_to_b: ActiveValue::set(a_to_b),
 612                accepted: ActiveValue::set(false),
 613                should_notify: ActiveValue::set(true),
 614                ..Default::default()
 615            })
 616            .on_conflict(
 617                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 618                    .values([
 619                        (contact::Column::Accepted, true.into()),
 620                        (contact::Column::ShouldNotify, false.into()),
 621                    ])
 622                    .action_and_where(
 623                        contact::Column::Accepted.eq(false).and(
 624                            contact::Column::AToB
 625                                .eq(a_to_b)
 626                                .and(contact::Column::UserIdA.eq(id_b))
 627                                .or(contact::Column::AToB
 628                                    .ne(a_to_b)
 629                                    .and(contact::Column::UserIdA.eq(id_a))),
 630                        ),
 631                    )
 632                    .to_owned(),
 633            )
 634            .exec_without_returning(&*tx)
 635            .await?;
 636
 637            if rows_affected == 1 {
 638                Ok(())
 639            } else {
 640                Err(anyhow!("contact already requested"))?
 641            }
 642        })
 643        .await
 644    }
 645
 646    /// Returns a bool indicating whether the removed contact had originally accepted or not
 647    ///
 648    /// Deletes the contact identified by the requester and responder ids, and then returns
 649    /// whether the deleted contact had originally accepted or was a pending contact request.
 650    ///
 651    /// # Arguments
 652    ///
 653    /// * `requester_id` - The user that initiates this request
 654    /// * `responder_id` - The user that will be removed
 655    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 656        self.transaction(|tx| async move {
 657            let (id_a, id_b) = if responder_id < requester_id {
 658                (responder_id, requester_id)
 659            } else {
 660                (requester_id, responder_id)
 661            };
 662
 663            let contact = contact::Entity::find()
 664                .filter(
 665                    contact::Column::UserIdA
 666                        .eq(id_a)
 667                        .and(contact::Column::UserIdB.eq(id_b)),
 668                )
 669                .one(&*tx)
 670                .await?
 671                .ok_or_else(|| anyhow!("no such contact"))?;
 672
 673            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 674            Ok(contact.accepted)
 675        })
 676        .await
 677    }
 678
 679    pub async fn dismiss_contact_notification(
 680        &self,
 681        user_id: UserId,
 682        contact_user_id: UserId,
 683    ) -> Result<()> {
 684        self.transaction(|tx| async move {
 685            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 686                (user_id, contact_user_id, true)
 687            } else {
 688                (contact_user_id, user_id, false)
 689            };
 690
 691            let result = contact::Entity::update_many()
 692                .set(contact::ActiveModel {
 693                    should_notify: ActiveValue::set(false),
 694                    ..Default::default()
 695                })
 696                .filter(
 697                    contact::Column::UserIdA
 698                        .eq(id_a)
 699                        .and(contact::Column::UserIdB.eq(id_b))
 700                        .and(
 701                            contact::Column::AToB
 702                                .eq(a_to_b)
 703                                .and(contact::Column::Accepted.eq(true))
 704                                .or(contact::Column::AToB
 705                                    .ne(a_to_b)
 706                                    .and(contact::Column::Accepted.eq(false))),
 707                        ),
 708                )
 709                .exec(&*tx)
 710                .await?;
 711            if result.rows_affected == 0 {
 712                Err(anyhow!("no such contact request"))?
 713            } else {
 714                Ok(())
 715            }
 716        })
 717        .await
 718    }
 719
 720    pub async fn respond_to_contact_request(
 721        &self,
 722        responder_id: UserId,
 723        requester_id: UserId,
 724        accept: bool,
 725    ) -> Result<()> {
 726        self.transaction(|tx| async move {
 727            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 728                (responder_id, requester_id, false)
 729            } else {
 730                (requester_id, responder_id, true)
 731            };
 732            let rows_affected = if accept {
 733                let result = contact::Entity::update_many()
 734                    .set(contact::ActiveModel {
 735                        accepted: ActiveValue::set(true),
 736                        should_notify: ActiveValue::set(true),
 737                        ..Default::default()
 738                    })
 739                    .filter(
 740                        contact::Column::UserIdA
 741                            .eq(id_a)
 742                            .and(contact::Column::UserIdB.eq(id_b))
 743                            .and(contact::Column::AToB.eq(a_to_b)),
 744                    )
 745                    .exec(&*tx)
 746                    .await?;
 747                result.rows_affected
 748            } else {
 749                let result = contact::Entity::delete_many()
 750                    .filter(
 751                        contact::Column::UserIdA
 752                            .eq(id_a)
 753                            .and(contact::Column::UserIdB.eq(id_b))
 754                            .and(contact::Column::AToB.eq(a_to_b))
 755                            .and(contact::Column::Accepted.eq(false)),
 756                    )
 757                    .exec(&*tx)
 758                    .await?;
 759
 760                result.rows_affected
 761            };
 762
 763            if rows_affected == 1 {
 764                Ok(())
 765            } else {
 766                Err(anyhow!("no such contact request"))?
 767            }
 768        })
 769        .await
 770    }
 771
 772    pub fn fuzzy_like_string(string: &str) -> String {
 773        let mut result = String::with_capacity(string.len() * 2 + 1);
 774        for c in string.chars() {
 775            if c.is_alphanumeric() {
 776                result.push('%');
 777                result.push(c);
 778            }
 779        }
 780        result.push('%');
 781        result
 782    }
 783
 784    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 785        self.transaction(|tx| async {
 786            let tx = tx;
 787            let like_string = Self::fuzzy_like_string(name_query);
 788            let query = "
 789                SELECT users.*
 790                FROM users
 791                WHERE github_login ILIKE $1
 792                ORDER BY github_login <-> $2
 793                LIMIT $3
 794            ";
 795
 796            Ok(user::Entity::find()
 797                .from_raw_sql(Statement::from_sql_and_values(
 798                    self.pool.get_database_backend(),
 799                    query.into(),
 800                    vec![like_string.into(), name_query.into(), limit.into()],
 801                ))
 802                .all(&*tx)
 803                .await?)
 804        })
 805        .await
 806    }
 807
 808    // signups
 809
 810    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 811        self.transaction(|tx| async move {
 812            signup::Entity::insert(signup::ActiveModel {
 813                email_address: ActiveValue::set(signup.email_address.clone()),
 814                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 815                email_confirmation_sent: ActiveValue::set(false),
 816                platform_mac: ActiveValue::set(signup.platform_mac),
 817                platform_windows: ActiveValue::set(signup.platform_windows),
 818                platform_linux: ActiveValue::set(signup.platform_linux),
 819                platform_unknown: ActiveValue::set(false),
 820                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 821                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 822                device_id: ActiveValue::set(signup.device_id.clone()),
 823                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 824                ..Default::default()
 825            })
 826            .on_conflict(
 827                OnConflict::column(signup::Column::EmailAddress)
 828                    .update_columns([
 829                        signup::Column::PlatformMac,
 830                        signup::Column::PlatformWindows,
 831                        signup::Column::PlatformLinux,
 832                        signup::Column::EditorFeatures,
 833                        signup::Column::ProgrammingLanguages,
 834                        signup::Column::DeviceId,
 835                        signup::Column::AddedToMailingList,
 836                    ])
 837                    .to_owned(),
 838            )
 839            .exec(&*tx)
 840            .await?;
 841            Ok(())
 842        })
 843        .await
 844    }
 845
 846    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 847        self.transaction(|tx| async move {
 848            let signup = signup::Entity::find()
 849                .filter(signup::Column::EmailAddress.eq(email_address))
 850                .one(&*tx)
 851                .await?
 852                .ok_or_else(|| {
 853                    anyhow!("signup with email address {} doesn't exist", email_address)
 854                })?;
 855
 856            Ok(signup)
 857        })
 858        .await
 859    }
 860
 861    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 862        self.transaction(|tx| async move {
 863            let query = "
 864                SELECT
 865                    COUNT(*) as count,
 866                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 867                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 868                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 869                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 870                FROM (
 871                    SELECT *
 872                    FROM signups
 873                    WHERE
 874                        NOT email_confirmation_sent
 875                ) AS unsent
 876            ";
 877            Ok(
 878                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 879                    self.pool.get_database_backend(),
 880                    query.into(),
 881                    vec![],
 882                ))
 883                .one(&*tx)
 884                .await?
 885                .ok_or_else(|| anyhow!("invalid result"))?,
 886            )
 887        })
 888        .await
 889    }
 890
 891    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 892        let emails = invites
 893            .iter()
 894            .map(|s| s.email_address.as_str())
 895            .collect::<Vec<_>>();
 896        self.transaction(|tx| async {
 897            let tx = tx;
 898            signup::Entity::update_many()
 899                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 900                .set(signup::ActiveModel {
 901                    email_confirmation_sent: ActiveValue::set(true),
 902                    ..Default::default()
 903                })
 904                .exec(&*tx)
 905                .await?;
 906            Ok(())
 907        })
 908        .await
 909    }
 910
 911    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 912        self.transaction(|tx| async move {
 913            Ok(signup::Entity::find()
 914                .select_only()
 915                .column(signup::Column::EmailAddress)
 916                .column(signup::Column::EmailConfirmationCode)
 917                .filter(
 918                    signup::Column::EmailConfirmationSent.eq(false).and(
 919                        signup::Column::PlatformMac
 920                            .eq(true)
 921                            .or(signup::Column::PlatformUnknown.eq(true)),
 922                    ),
 923                )
 924                .order_by_asc(signup::Column::CreatedAt)
 925                .limit(count as u64)
 926                .into_model()
 927                .all(&*tx)
 928                .await?)
 929        })
 930        .await
 931    }
 932
 933    // invite codes
 934
 935    pub async fn create_invite_from_code(
 936        &self,
 937        code: &str,
 938        email_address: &str,
 939        device_id: Option<&str>,
 940        added_to_mailing_list: bool,
 941    ) -> Result<Invite> {
 942        self.transaction(|tx| async move {
 943            let existing_user = user::Entity::find()
 944                .filter(user::Column::EmailAddress.eq(email_address))
 945                .one(&*tx)
 946                .await?;
 947
 948            if existing_user.is_some() {
 949                Err(anyhow!("email address is already in use"))?;
 950            }
 951
 952            let inviting_user_with_invites = match user::Entity::find()
 953                .filter(
 954                    user::Column::InviteCode
 955                        .eq(code)
 956                        .and(user::Column::InviteCount.gt(0)),
 957                )
 958                .one(&*tx)
 959                .await?
 960            {
 961                Some(inviting_user) => inviting_user,
 962                None => {
 963                    return Err(Error::Http(
 964                        StatusCode::UNAUTHORIZED,
 965                        "unable to find an invite code with invites remaining".to_string(),
 966                    ))?
 967                }
 968            };
 969            user::Entity::update_many()
 970                .filter(
 971                    user::Column::Id
 972                        .eq(inviting_user_with_invites.id)
 973                        .and(user::Column::InviteCount.gt(0)),
 974                )
 975                .col_expr(
 976                    user::Column::InviteCount,
 977                    Expr::col(user::Column::InviteCount).sub(1),
 978                )
 979                .exec(&*tx)
 980                .await?;
 981
 982            let signup = signup::Entity::insert(signup::ActiveModel {
 983                email_address: ActiveValue::set(email_address.into()),
 984                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 985                email_confirmation_sent: ActiveValue::set(false),
 986                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 987                platform_linux: ActiveValue::set(false),
 988                platform_mac: ActiveValue::set(false),
 989                platform_windows: ActiveValue::set(false),
 990                platform_unknown: ActiveValue::set(true),
 991                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 992                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 993                ..Default::default()
 994            })
 995            .on_conflict(
 996                OnConflict::column(signup::Column::EmailAddress)
 997                    .update_column(signup::Column::InvitingUserId)
 998                    .to_owned(),
 999            )
1000            .exec_with_returning(&*tx)
1001            .await?;
1002
1003            Ok(Invite {
1004                email_address: signup.email_address,
1005                email_confirmation_code: signup.email_confirmation_code,
1006            })
1007        })
1008        .await
1009    }
1010
1011    pub async fn create_user_from_invite(
1012        &self,
1013        invite: &Invite,
1014        user: NewUserParams,
1015    ) -> Result<Option<NewUserResult>> {
1016        self.transaction(|tx| async {
1017            let tx = tx;
1018            let signup = signup::Entity::find()
1019                .filter(
1020                    signup::Column::EmailAddress
1021                        .eq(invite.email_address.as_str())
1022                        .and(
1023                            signup::Column::EmailConfirmationCode
1024                                .eq(invite.email_confirmation_code.as_str()),
1025                        ),
1026                )
1027                .one(&*tx)
1028                .await?
1029                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1030
1031            if signup.user_id.is_some() {
1032                return Ok(None);
1033            }
1034
1035            let user = user::Entity::insert(user::ActiveModel {
1036                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1037                github_login: ActiveValue::set(user.github_login.clone()),
1038                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1039                admin: ActiveValue::set(false),
1040                invite_count: ActiveValue::set(user.invite_count),
1041                invite_code: ActiveValue::set(Some(random_invite_code())),
1042                metrics_id: ActiveValue::set(Uuid::new_v4()),
1043                ..Default::default()
1044            })
1045            .on_conflict(
1046                OnConflict::column(user::Column::GithubLogin)
1047                    .update_columns([
1048                        user::Column::EmailAddress,
1049                        user::Column::GithubUserId,
1050                        user::Column::Admin,
1051                    ])
1052                    .to_owned(),
1053            )
1054            .exec_with_returning(&*tx)
1055            .await?;
1056
1057            let mut signup = signup.into_active_model();
1058            signup.user_id = ActiveValue::set(Some(user.id));
1059            let signup = signup.update(&*tx).await?;
1060
1061            if let Some(inviting_user_id) = signup.inviting_user_id {
1062                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1063                    (inviting_user_id, user.id, true)
1064                } else {
1065                    (user.id, inviting_user_id, false)
1066                };
1067
1068                contact::Entity::insert(contact::ActiveModel {
1069                    user_id_a: ActiveValue::set(user_id_a),
1070                    user_id_b: ActiveValue::set(user_id_b),
1071                    a_to_b: ActiveValue::set(a_to_b),
1072                    should_notify: ActiveValue::set(true),
1073                    accepted: ActiveValue::set(true),
1074                    ..Default::default()
1075                })
1076                .on_conflict(OnConflict::new().do_nothing().to_owned())
1077                .exec_without_returning(&*tx)
1078                .await?;
1079            }
1080
1081            Ok(Some(NewUserResult {
1082                user_id: user.id,
1083                metrics_id: user.metrics_id.to_string(),
1084                inviting_user_id: signup.inviting_user_id,
1085                signup_device_id: signup.device_id,
1086            }))
1087        })
1088        .await
1089    }
1090
1091    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1092        self.transaction(|tx| async move {
1093            if count > 0 {
1094                user::Entity::update_many()
1095                    .filter(
1096                        user::Column::Id
1097                            .eq(id)
1098                            .and(user::Column::InviteCode.is_null()),
1099                    )
1100                    .set(user::ActiveModel {
1101                        invite_code: ActiveValue::set(Some(random_invite_code())),
1102                        ..Default::default()
1103                    })
1104                    .exec(&*tx)
1105                    .await?;
1106            }
1107
1108            user::Entity::update_many()
1109                .filter(user::Column::Id.eq(id))
1110                .set(user::ActiveModel {
1111                    invite_count: ActiveValue::set(count),
1112                    ..Default::default()
1113                })
1114                .exec(&*tx)
1115                .await?;
1116            Ok(())
1117        })
1118        .await
1119    }
1120
1121    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1122        self.transaction(|tx| async move {
1123            match user::Entity::find_by_id(id).one(&*tx).await? {
1124                Some(user) if user.invite_code.is_some() => {
1125                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1126                }
1127                _ => Ok(None),
1128            }
1129        })
1130        .await
1131    }
1132
1133    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1134        self.transaction(|tx| async move {
1135            user::Entity::find()
1136                .filter(user::Column::InviteCode.eq(code))
1137                .one(&*tx)
1138                .await?
1139                .ok_or_else(|| {
1140                    Error::Http(
1141                        StatusCode::NOT_FOUND,
1142                        "that invite code does not exist".to_string(),
1143                    )
1144                })
1145        })
1146        .await
1147    }
1148
1149    // rooms
1150
1151    pub async fn incoming_call_for_user(
1152        &self,
1153        user_id: UserId,
1154    ) -> Result<Option<proto::IncomingCall>> {
1155        self.transaction(|tx| async move {
1156            let pending_participant = room_participant::Entity::find()
1157                .filter(
1158                    room_participant::Column::UserId
1159                        .eq(user_id)
1160                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1161                )
1162                .one(&*tx)
1163                .await?;
1164
1165            if let Some(pending_participant) = pending_participant {
1166                let room = self.get_room(pending_participant.room_id, &tx).await?;
1167                Ok(Self::build_incoming_call(&room, user_id))
1168            } else {
1169                Ok(None)
1170            }
1171        })
1172        .await
1173    }
1174
1175    pub async fn create_room(
1176        &self,
1177        user_id: UserId,
1178        connection: ConnectionId,
1179        live_kit_room: &str,
1180    ) -> Result<proto::Room> {
1181        self.transaction(|tx| async move {
1182            let room = room::ActiveModel {
1183                live_kit_room: ActiveValue::set(live_kit_room.into()),
1184                ..Default::default()
1185            }
1186            .insert(&*tx)
1187            .await?;
1188            room_participant::ActiveModel {
1189                room_id: ActiveValue::set(room.id),
1190                user_id: ActiveValue::set(user_id),
1191                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1192                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1193                    connection.owner_id as i32,
1194                ))),
1195                answering_connection_lost: ActiveValue::set(false),
1196                calling_user_id: ActiveValue::set(user_id),
1197                calling_connection_id: ActiveValue::set(connection.id as i32),
1198                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1199                    connection.owner_id as i32,
1200                ))),
1201                ..Default::default()
1202            }
1203            .insert(&*tx)
1204            .await?;
1205
1206            let room = self.get_room(room.id, &tx).await?;
1207            Ok(room)
1208        })
1209        .await
1210    }
1211
1212    pub async fn call(
1213        &self,
1214        room_id: RoomId,
1215        calling_user_id: UserId,
1216        calling_connection: ConnectionId,
1217        called_user_id: UserId,
1218        initial_project_id: Option<ProjectId>,
1219    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1220        self.room_transaction(room_id, |tx| async move {
1221            room_participant::ActiveModel {
1222                room_id: ActiveValue::set(room_id),
1223                user_id: ActiveValue::set(called_user_id),
1224                answering_connection_lost: ActiveValue::set(false),
1225                calling_user_id: ActiveValue::set(calling_user_id),
1226                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1227                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1228                    calling_connection.owner_id as i32,
1229                ))),
1230                initial_project_id: ActiveValue::set(initial_project_id),
1231                ..Default::default()
1232            }
1233            .insert(&*tx)
1234            .await?;
1235
1236            let room = self.get_room(room_id, &tx).await?;
1237            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1238                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1239            Ok((room, incoming_call))
1240        })
1241        .await
1242    }
1243
1244    pub async fn call_failed(
1245        &self,
1246        room_id: RoomId,
1247        called_user_id: UserId,
1248    ) -> Result<RoomGuard<proto::Room>> {
1249        self.room_transaction(room_id, |tx| async move {
1250            room_participant::Entity::delete_many()
1251                .filter(
1252                    room_participant::Column::RoomId
1253                        .eq(room_id)
1254                        .and(room_participant::Column::UserId.eq(called_user_id)),
1255                )
1256                .exec(&*tx)
1257                .await?;
1258            let room = self.get_room(room_id, &tx).await?;
1259            Ok(room)
1260        })
1261        .await
1262    }
1263
1264    pub async fn decline_call(
1265        &self,
1266        expected_room_id: Option<RoomId>,
1267        user_id: UserId,
1268    ) -> Result<Option<RoomGuard<proto::Room>>> {
1269        self.optional_room_transaction(|tx| async move {
1270            let mut filter = Condition::all()
1271                .add(room_participant::Column::UserId.eq(user_id))
1272                .add(room_participant::Column::AnsweringConnectionId.is_null());
1273            if let Some(room_id) = expected_room_id {
1274                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1275            }
1276            let participant = room_participant::Entity::find()
1277                .filter(filter)
1278                .one(&*tx)
1279                .await?;
1280
1281            let participant = if let Some(participant) = participant {
1282                participant
1283            } else if expected_room_id.is_some() {
1284                return Err(anyhow!("could not find call to decline"))?;
1285            } else {
1286                return Ok(None);
1287            };
1288
1289            let room_id = participant.room_id;
1290            room_participant::Entity::delete(participant.into_active_model())
1291                .exec(&*tx)
1292                .await?;
1293
1294            let room = self.get_room(room_id, &tx).await?;
1295            Ok(Some((room_id, room)))
1296        })
1297        .await
1298    }
1299
1300    pub async fn cancel_call(
1301        &self,
1302        room_id: RoomId,
1303        calling_connection: ConnectionId,
1304        called_user_id: UserId,
1305    ) -> Result<RoomGuard<proto::Room>> {
1306        self.room_transaction(room_id, |tx| async move {
1307            let participant = room_participant::Entity::find()
1308                .filter(
1309                    Condition::all()
1310                        .add(room_participant::Column::UserId.eq(called_user_id))
1311                        .add(room_participant::Column::RoomId.eq(room_id))
1312                        .add(
1313                            room_participant::Column::CallingConnectionId
1314                                .eq(calling_connection.id as i32),
1315                        )
1316                        .add(
1317                            room_participant::Column::CallingConnectionServerId
1318                                .eq(calling_connection.owner_id as i32),
1319                        )
1320                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1321                )
1322                .one(&*tx)
1323                .await?
1324                .ok_or_else(|| anyhow!("no call to cancel"))?;
1325
1326            room_participant::Entity::delete(participant.into_active_model())
1327                .exec(&*tx)
1328                .await?;
1329
1330            let room = self.get_room(room_id, &tx).await?;
1331            Ok(room)
1332        })
1333        .await
1334    }
1335
1336    pub async fn join_room(
1337        &self,
1338        room_id: RoomId,
1339        user_id: UserId,
1340        connection: ConnectionId,
1341    ) -> Result<RoomGuard<proto::Room>> {
1342        self.room_transaction(room_id, |tx| async move {
1343            let result = room_participant::Entity::update_many()
1344                .filter(
1345                    Condition::all()
1346                        .add(room_participant::Column::RoomId.eq(room_id))
1347                        .add(room_participant::Column::UserId.eq(user_id))
1348                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1349                )
1350                .set(room_participant::ActiveModel {
1351                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1352                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1353                        connection.owner_id as i32,
1354                    ))),
1355                    answering_connection_lost: ActiveValue::set(false),
1356                    ..Default::default()
1357                })
1358                .exec(&*tx)
1359                .await?;
1360            if result.rows_affected == 0 {
1361                Err(anyhow!("room does not exist or was already joined"))?
1362            } else {
1363                let room = self.get_room(room_id, &tx).await?;
1364                Ok(room)
1365            }
1366        })
1367        .await
1368    }
1369
1370    pub async fn rejoin_room(
1371        &self,
1372        rejoin_room: proto::RejoinRoom,
1373        user_id: UserId,
1374        connection: ConnectionId,
1375    ) -> Result<RoomGuard<RejoinedRoom>> {
1376        let room_id = RoomId::from_proto(rejoin_room.id);
1377        self.room_transaction(room_id, |tx| async {
1378            let tx = tx;
1379            let participant_update = room_participant::Entity::update_many()
1380                .filter(
1381                    Condition::all()
1382                        .add(room_participant::Column::RoomId.eq(room_id))
1383                        .add(room_participant::Column::UserId.eq(user_id))
1384                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1385                        .add(
1386                            Condition::any()
1387                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1388                                .add(
1389                                    room_participant::Column::AnsweringConnectionServerId
1390                                        .ne(connection.owner_id as i32),
1391                                ),
1392                        ),
1393                )
1394                .set(room_participant::ActiveModel {
1395                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1396                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1397                        connection.owner_id as i32,
1398                    ))),
1399                    answering_connection_lost: ActiveValue::set(false),
1400                    ..Default::default()
1401                })
1402                .exec(&*tx)
1403                .await?;
1404            if participant_update.rows_affected == 0 {
1405                return Err(anyhow!("room does not exist or was already joined"))?;
1406            }
1407
1408            let mut reshared_projects = Vec::new();
1409            for reshared_project in &rejoin_room.reshared_projects {
1410                let project_id = ProjectId::from_proto(reshared_project.project_id);
1411                let project = project::Entity::find_by_id(project_id)
1412                    .one(&*tx)
1413                    .await?
1414                    .ok_or_else(|| anyhow!("project does not exist"))?;
1415                if project.host_user_id != user_id {
1416                    return Err(anyhow!("no such project"))?;
1417                }
1418
1419                let mut collaborators = project
1420                    .find_related(project_collaborator::Entity)
1421                    .all(&*tx)
1422                    .await?;
1423                let host_ix = collaborators
1424                    .iter()
1425                    .position(|collaborator| {
1426                        collaborator.user_id == user_id && collaborator.is_host
1427                    })
1428                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1429                let host = collaborators.swap_remove(host_ix);
1430                let old_connection_id = host.connection();
1431
1432                project::Entity::update(project::ActiveModel {
1433                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1434                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1435                        connection.owner_id as i32,
1436                    ))),
1437                    ..project.into_active_model()
1438                })
1439                .exec(&*tx)
1440                .await?;
1441                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1442                    connection_id: ActiveValue::set(connection.id as i32),
1443                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1444                    ..host.into_active_model()
1445                })
1446                .exec(&*tx)
1447                .await?;
1448
1449                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1450                    .await?;
1451
1452                reshared_projects.push(ResharedProject {
1453                    id: project_id,
1454                    old_connection_id,
1455                    collaborators: collaborators
1456                        .iter()
1457                        .map(|collaborator| ProjectCollaborator {
1458                            connection_id: collaborator.connection(),
1459                            user_id: collaborator.user_id,
1460                            replica_id: collaborator.replica_id,
1461                            is_host: collaborator.is_host,
1462                        })
1463                        .collect(),
1464                    worktrees: reshared_project.worktrees.clone(),
1465                });
1466            }
1467
1468            project::Entity::delete_many()
1469                .filter(
1470                    Condition::all()
1471                        .add(project::Column::RoomId.eq(room_id))
1472                        .add(project::Column::HostUserId.eq(user_id))
1473                        .add(
1474                            project::Column::Id
1475                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1476                        ),
1477                )
1478                .exec(&*tx)
1479                .await?;
1480
1481            let mut rejoined_projects = Vec::new();
1482            for rejoined_project in &rejoin_room.rejoined_projects {
1483                let project_id = ProjectId::from_proto(rejoined_project.id);
1484                let Some(project) = project::Entity::find_by_id(project_id)
1485                    .one(&*tx)
1486                    .await? else { continue };
1487
1488                let mut worktrees = Vec::new();
1489                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1490                for db_worktree in db_worktrees {
1491                    let mut worktree = RejoinedWorktree {
1492                        id: db_worktree.id as u64,
1493                        abs_path: db_worktree.abs_path,
1494                        root_name: db_worktree.root_name,
1495                        visible: db_worktree.visible,
1496                        updated_entries: Default::default(),
1497                        removed_entries: Default::default(),
1498                        updated_repositories: Default::default(),
1499                        removed_repositories: Default::default(),
1500                        diagnostic_summaries: Default::default(),
1501                        settings_files: Default::default(),
1502                        scan_id: db_worktree.scan_id as u64,
1503                        completed_scan_id: db_worktree.completed_scan_id as u64,
1504                    };
1505
1506                    let rejoined_worktree = rejoined_project
1507                        .worktrees
1508                        .iter()
1509                        .find(|worktree| worktree.id == db_worktree.id as u64);
1510
1511                    // File entries
1512                    {
1513                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1514                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1515                        } else {
1516                            worktree_entry::Column::IsDeleted.eq(false)
1517                        };
1518
1519                        let mut db_entries = worktree_entry::Entity::find()
1520                            .filter(
1521                                Condition::all()
1522                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
1523                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1524                                    .add(entry_filter),
1525                            )
1526                            .stream(&*tx)
1527                            .await?;
1528
1529                        while let Some(db_entry) = db_entries.next().await {
1530                            let db_entry = db_entry?;
1531                            if db_entry.is_deleted {
1532                                worktree.removed_entries.push(db_entry.id as u64);
1533                            } else {
1534                                worktree.updated_entries.push(proto::Entry {
1535                                    id: db_entry.id as u64,
1536                                    is_dir: db_entry.is_dir,
1537                                    path: db_entry.path,
1538                                    inode: db_entry.inode as u64,
1539                                    mtime: Some(proto::Timestamp {
1540                                        seconds: db_entry.mtime_seconds as u64,
1541                                        nanos: db_entry.mtime_nanos as u32,
1542                                    }),
1543                                    is_symlink: db_entry.is_symlink,
1544                                    is_ignored: db_entry.is_ignored,
1545                                    is_external: db_entry.is_external,
1546                                    git_status: db_entry.git_status.map(|status| status as i32),
1547                                });
1548                            }
1549                        }
1550                    }
1551
1552                    // Repository Entries
1553                    {
1554                        let repository_entry_filter =
1555                            if let Some(rejoined_worktree) = rejoined_worktree {
1556                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1557                            } else {
1558                                worktree_repository::Column::IsDeleted.eq(false)
1559                            };
1560
1561                        let mut db_repositories = worktree_repository::Entity::find()
1562                            .filter(
1563                                Condition::all()
1564                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
1565                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1566                                    .add(repository_entry_filter),
1567                            )
1568                            .stream(&*tx)
1569                            .await?;
1570
1571                        while let Some(db_repository) = db_repositories.next().await {
1572                            let db_repository = db_repository?;
1573                            if db_repository.is_deleted {
1574                                worktree
1575                                    .removed_repositories
1576                                    .push(db_repository.work_directory_id as u64);
1577                            } else {
1578                                worktree.updated_repositories.push(proto::RepositoryEntry {
1579                                    work_directory_id: db_repository.work_directory_id as u64,
1580                                    branch: db_repository.branch,
1581                                });
1582                            }
1583                        }
1584                    }
1585
1586                    worktrees.push(worktree);
1587                }
1588
1589                let language_servers = project
1590                    .find_related(language_server::Entity)
1591                    .all(&*tx)
1592                    .await?
1593                    .into_iter()
1594                    .map(|language_server| proto::LanguageServer {
1595                        id: language_server.id as u64,
1596                        name: language_server.name,
1597                    })
1598                    .collect::<Vec<_>>();
1599
1600                {
1601                    let mut db_settings_files = worktree_settings_file::Entity::find()
1602                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1603                        .stream(&*tx)
1604                        .await?;
1605                    while let Some(db_settings_file) = db_settings_files.next().await {
1606                        let db_settings_file = db_settings_file?;
1607                        if let Some(worktree) = worktrees
1608                            .iter_mut()
1609                            .find(|w| w.id == db_settings_file.worktree_id as u64)
1610                        {
1611                            worktree.settings_files.push(WorktreeSettingsFile {
1612                                path: db_settings_file.path,
1613                                content: db_settings_file.content,
1614                            });
1615                        }
1616                    }
1617                }
1618
1619                let mut collaborators = project
1620                    .find_related(project_collaborator::Entity)
1621                    .all(&*tx)
1622                    .await?;
1623                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1624                    .iter()
1625                    .position(|collaborator| collaborator.user_id == user_id)
1626                {
1627                    collaborators.swap_remove(self_collaborator_ix)
1628                } else {
1629                    continue;
1630                };
1631                let old_connection_id = self_collaborator.connection();
1632                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1633                    connection_id: ActiveValue::set(connection.id as i32),
1634                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1635                    ..self_collaborator.into_active_model()
1636                })
1637                .exec(&*tx)
1638                .await?;
1639
1640                let collaborators = collaborators
1641                    .into_iter()
1642                    .map(|collaborator| ProjectCollaborator {
1643                        connection_id: collaborator.connection(),
1644                        user_id: collaborator.user_id,
1645                        replica_id: collaborator.replica_id,
1646                        is_host: collaborator.is_host,
1647                    })
1648                    .collect::<Vec<_>>();
1649
1650                rejoined_projects.push(RejoinedProject {
1651                    id: project_id,
1652                    old_connection_id,
1653                    collaborators,
1654                    worktrees,
1655                    language_servers,
1656                });
1657            }
1658
1659            let room = self.get_room(room_id, &tx).await?;
1660            Ok(RejoinedRoom {
1661                room,
1662                rejoined_projects,
1663                reshared_projects,
1664            })
1665        })
1666        .await
1667    }
1668
1669    pub async fn leave_room(
1670        &self,
1671        connection: ConnectionId,
1672    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1673        self.optional_room_transaction(|tx| async move {
1674            let leaving_participant = room_participant::Entity::find()
1675                .filter(
1676                    Condition::all()
1677                        .add(
1678                            room_participant::Column::AnsweringConnectionId
1679                                .eq(connection.id as i32),
1680                        )
1681                        .add(
1682                            room_participant::Column::AnsweringConnectionServerId
1683                                .eq(connection.owner_id as i32),
1684                        ),
1685                )
1686                .one(&*tx)
1687                .await?;
1688
1689            if let Some(leaving_participant) = leaving_participant {
1690                // Leave room.
1691                let room_id = leaving_participant.room_id;
1692                room_participant::Entity::delete_by_id(leaving_participant.id)
1693                    .exec(&*tx)
1694                    .await?;
1695
1696                // Cancel pending calls initiated by the leaving user.
1697                let called_participants = room_participant::Entity::find()
1698                    .filter(
1699                        Condition::all()
1700                            .add(
1701                                room_participant::Column::CallingUserId
1702                                    .eq(leaving_participant.user_id),
1703                            )
1704                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1705                    )
1706                    .all(&*tx)
1707                    .await?;
1708                room_participant::Entity::delete_many()
1709                    .filter(
1710                        room_participant::Column::Id
1711                            .is_in(called_participants.iter().map(|participant| participant.id)),
1712                    )
1713                    .exec(&*tx)
1714                    .await?;
1715                let canceled_calls_to_user_ids = called_participants
1716                    .into_iter()
1717                    .map(|participant| participant.user_id)
1718                    .collect();
1719
1720                // Detect left projects.
1721                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1722                enum QueryProjectIds {
1723                    ProjectId,
1724                }
1725                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1726                    .select_only()
1727                    .column_as(
1728                        project_collaborator::Column::ProjectId,
1729                        QueryProjectIds::ProjectId,
1730                    )
1731                    .filter(
1732                        Condition::all()
1733                            .add(
1734                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1735                            )
1736                            .add(
1737                                project_collaborator::Column::ConnectionServerId
1738                                    .eq(connection.owner_id as i32),
1739                            ),
1740                    )
1741                    .into_values::<_, QueryProjectIds>()
1742                    .all(&*tx)
1743                    .await?;
1744                let mut left_projects = HashMap::default();
1745                let mut collaborators = project_collaborator::Entity::find()
1746                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1747                    .stream(&*tx)
1748                    .await?;
1749                while let Some(collaborator) = collaborators.next().await {
1750                    let collaborator = collaborator?;
1751                    let left_project =
1752                        left_projects
1753                            .entry(collaborator.project_id)
1754                            .or_insert(LeftProject {
1755                                id: collaborator.project_id,
1756                                host_user_id: Default::default(),
1757                                connection_ids: Default::default(),
1758                                host_connection_id: Default::default(),
1759                            });
1760
1761                    let collaborator_connection_id = collaborator.connection();
1762                    if collaborator_connection_id != connection {
1763                        left_project.connection_ids.push(collaborator_connection_id);
1764                    }
1765
1766                    if collaborator.is_host {
1767                        left_project.host_user_id = collaborator.user_id;
1768                        left_project.host_connection_id = collaborator_connection_id;
1769                    }
1770                }
1771                drop(collaborators);
1772
1773                // Leave projects.
1774                project_collaborator::Entity::delete_many()
1775                    .filter(
1776                        Condition::all()
1777                            .add(
1778                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1779                            )
1780                            .add(
1781                                project_collaborator::Column::ConnectionServerId
1782                                    .eq(connection.owner_id as i32),
1783                            ),
1784                    )
1785                    .exec(&*tx)
1786                    .await?;
1787
1788                // Unshare projects.
1789                project::Entity::delete_many()
1790                    .filter(
1791                        Condition::all()
1792                            .add(project::Column::RoomId.eq(room_id))
1793                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1794                            .add(
1795                                project::Column::HostConnectionServerId
1796                                    .eq(connection.owner_id as i32),
1797                            ),
1798                    )
1799                    .exec(&*tx)
1800                    .await?;
1801
1802                let room = self.get_room(room_id, &tx).await?;
1803                if room.participants.is_empty() {
1804                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1805                }
1806
1807                let left_room = LeftRoom {
1808                    room,
1809                    left_projects,
1810                    canceled_calls_to_user_ids,
1811                };
1812
1813                if left_room.room.participants.is_empty() {
1814                    self.rooms.remove(&room_id);
1815                }
1816
1817                Ok(Some((room_id, left_room)))
1818            } else {
1819                Ok(None)
1820            }
1821        })
1822        .await
1823    }
1824
1825    pub async fn follow(
1826        &self,
1827        project_id: ProjectId,
1828        leader_connection: ConnectionId,
1829        follower_connection: ConnectionId,
1830    ) -> Result<RoomGuard<proto::Room>> {
1831        let room_id = self.room_id_for_project(project_id).await?;
1832        self.room_transaction(room_id, |tx| async move {
1833            follower::ActiveModel {
1834                room_id: ActiveValue::set(room_id),
1835                project_id: ActiveValue::set(project_id),
1836                leader_connection_server_id: ActiveValue::set(ServerId(
1837                    leader_connection.owner_id as i32,
1838                )),
1839                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1840                follower_connection_server_id: ActiveValue::set(ServerId(
1841                    follower_connection.owner_id as i32,
1842                )),
1843                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1844                ..Default::default()
1845            }
1846            .insert(&*tx)
1847            .await?;
1848
1849            let room = self.get_room(room_id, &*tx).await?;
1850            Ok(room)
1851        })
1852        .await
1853    }
1854
1855    pub async fn unfollow(
1856        &self,
1857        project_id: ProjectId,
1858        leader_connection: ConnectionId,
1859        follower_connection: ConnectionId,
1860    ) -> Result<RoomGuard<proto::Room>> {
1861        let room_id = self.room_id_for_project(project_id).await?;
1862        self.room_transaction(room_id, |tx| async move {
1863            follower::Entity::delete_many()
1864                .filter(
1865                    Condition::all()
1866                        .add(follower::Column::ProjectId.eq(project_id))
1867                        .add(
1868                            follower::Column::LeaderConnectionServerId
1869                                .eq(leader_connection.owner_id),
1870                        )
1871                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1872                        .add(
1873                            follower::Column::FollowerConnectionServerId
1874                                .eq(follower_connection.owner_id),
1875                        )
1876                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1877                )
1878                .exec(&*tx)
1879                .await?;
1880
1881            let room = self.get_room(room_id, &*tx).await?;
1882            Ok(room)
1883        })
1884        .await
1885    }
1886
1887    pub async fn update_room_participant_location(
1888        &self,
1889        room_id: RoomId,
1890        connection: ConnectionId,
1891        location: proto::ParticipantLocation,
1892    ) -> Result<RoomGuard<proto::Room>> {
1893        self.room_transaction(room_id, |tx| async {
1894            let tx = tx;
1895            let location_kind;
1896            let location_project_id;
1897            match location
1898                .variant
1899                .as_ref()
1900                .ok_or_else(|| anyhow!("invalid location"))?
1901            {
1902                proto::participant_location::Variant::SharedProject(project) => {
1903                    location_kind = 0;
1904                    location_project_id = Some(ProjectId::from_proto(project.id));
1905                }
1906                proto::participant_location::Variant::UnsharedProject(_) => {
1907                    location_kind = 1;
1908                    location_project_id = None;
1909                }
1910                proto::participant_location::Variant::External(_) => {
1911                    location_kind = 2;
1912                    location_project_id = None;
1913                }
1914            }
1915
1916            let result = room_participant::Entity::update_many()
1917                .filter(
1918                    Condition::all()
1919                        .add(room_participant::Column::RoomId.eq(room_id))
1920                        .add(
1921                            room_participant::Column::AnsweringConnectionId
1922                                .eq(connection.id as i32),
1923                        )
1924                        .add(
1925                            room_participant::Column::AnsweringConnectionServerId
1926                                .eq(connection.owner_id as i32),
1927                        ),
1928                )
1929                .set(room_participant::ActiveModel {
1930                    location_kind: ActiveValue::set(Some(location_kind)),
1931                    location_project_id: ActiveValue::set(location_project_id),
1932                    ..Default::default()
1933                })
1934                .exec(&*tx)
1935                .await?;
1936
1937            if result.rows_affected == 1 {
1938                let room = self.get_room(room_id, &tx).await?;
1939                Ok(room)
1940            } else {
1941                Err(anyhow!("could not update room participant location"))?
1942            }
1943        })
1944        .await
1945    }
1946
1947    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1948        self.transaction(|tx| async move {
1949            let participant = room_participant::Entity::find()
1950                .filter(
1951                    Condition::all()
1952                        .add(
1953                            room_participant::Column::AnsweringConnectionId
1954                                .eq(connection.id as i32),
1955                        )
1956                        .add(
1957                            room_participant::Column::AnsweringConnectionServerId
1958                                .eq(connection.owner_id as i32),
1959                        ),
1960                )
1961                .one(&*tx)
1962                .await?
1963                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1964
1965            room_participant::Entity::update(room_participant::ActiveModel {
1966                answering_connection_lost: ActiveValue::set(true),
1967                ..participant.into_active_model()
1968            })
1969            .exec(&*tx)
1970            .await?;
1971
1972            Ok(())
1973        })
1974        .await
1975    }
1976
1977    fn build_incoming_call(
1978        room: &proto::Room,
1979        called_user_id: UserId,
1980    ) -> Option<proto::IncomingCall> {
1981        let pending_participant = room
1982            .pending_participants
1983            .iter()
1984            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1985
1986        Some(proto::IncomingCall {
1987            room_id: room.id,
1988            calling_user_id: pending_participant.calling_user_id,
1989            participant_user_ids: room
1990                .participants
1991                .iter()
1992                .map(|participant| participant.user_id)
1993                .collect(),
1994            initial_project: room.participants.iter().find_map(|participant| {
1995                let initial_project_id = pending_participant.initial_project_id?;
1996                participant
1997                    .projects
1998                    .iter()
1999                    .find(|project| project.id == initial_project_id)
2000                    .cloned()
2001            }),
2002        })
2003    }
2004
2005    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2006        let db_room = room::Entity::find_by_id(room_id)
2007            .one(tx)
2008            .await?
2009            .ok_or_else(|| anyhow!("could not find room"))?;
2010
2011        let mut db_participants = db_room
2012            .find_related(room_participant::Entity)
2013            .stream(tx)
2014            .await?;
2015        let mut participants = HashMap::default();
2016        let mut pending_participants = Vec::new();
2017        while let Some(db_participant) = db_participants.next().await {
2018            let db_participant = db_participant?;
2019            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2020                .answering_connection_id
2021                .zip(db_participant.answering_connection_server_id)
2022            {
2023                let location = match (
2024                    db_participant.location_kind,
2025                    db_participant.location_project_id,
2026                ) {
2027                    (Some(0), Some(project_id)) => {
2028                        Some(proto::participant_location::Variant::SharedProject(
2029                            proto::participant_location::SharedProject {
2030                                id: project_id.to_proto(),
2031                            },
2032                        ))
2033                    }
2034                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2035                        Default::default(),
2036                    )),
2037                    _ => Some(proto::participant_location::Variant::External(
2038                        Default::default(),
2039                    )),
2040                };
2041
2042                let answering_connection = ConnectionId {
2043                    owner_id: answering_connection_server_id.0 as u32,
2044                    id: answering_connection_id as u32,
2045                };
2046                participants.insert(
2047                    answering_connection,
2048                    proto::Participant {
2049                        user_id: db_participant.user_id.to_proto(),
2050                        peer_id: Some(answering_connection.into()),
2051                        projects: Default::default(),
2052                        location: Some(proto::ParticipantLocation { variant: location }),
2053                    },
2054                );
2055            } else {
2056                pending_participants.push(proto::PendingParticipant {
2057                    user_id: db_participant.user_id.to_proto(),
2058                    calling_user_id: db_participant.calling_user_id.to_proto(),
2059                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2060                });
2061            }
2062        }
2063        drop(db_participants);
2064
2065        let mut db_projects = db_room
2066            .find_related(project::Entity)
2067            .find_with_related(worktree::Entity)
2068            .stream(tx)
2069            .await?;
2070
2071        while let Some(row) = db_projects.next().await {
2072            let (db_project, db_worktree) = row?;
2073            let host_connection = db_project.host_connection()?;
2074            if let Some(participant) = participants.get_mut(&host_connection) {
2075                let project = if let Some(project) = participant
2076                    .projects
2077                    .iter_mut()
2078                    .find(|project| project.id == db_project.id.to_proto())
2079                {
2080                    project
2081                } else {
2082                    participant.projects.push(proto::ParticipantProject {
2083                        id: db_project.id.to_proto(),
2084                        worktree_root_names: Default::default(),
2085                    });
2086                    participant.projects.last_mut().unwrap()
2087                };
2088
2089                if let Some(db_worktree) = db_worktree {
2090                    if db_worktree.visible {
2091                        project.worktree_root_names.push(db_worktree.root_name);
2092                    }
2093                }
2094            }
2095        }
2096        drop(db_projects);
2097
2098        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2099        let mut followers = Vec::new();
2100        while let Some(db_follower) = db_followers.next().await {
2101            let db_follower = db_follower?;
2102            followers.push(proto::Follower {
2103                leader_id: Some(db_follower.leader_connection().into()),
2104                follower_id: Some(db_follower.follower_connection().into()),
2105                project_id: db_follower.project_id.to_proto(),
2106            });
2107        }
2108
2109        Ok(proto::Room {
2110            id: db_room.id.to_proto(),
2111            live_kit_room: db_room.live_kit_room,
2112            participants: participants.into_values().collect(),
2113            pending_participants,
2114            followers,
2115        })
2116    }
2117
2118    // projects
2119
2120    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2121        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2122        enum QueryAs {
2123            Count,
2124        }
2125
2126        self.transaction(|tx| async move {
2127            Ok(project::Entity::find()
2128                .select_only()
2129                .column_as(project::Column::Id.count(), QueryAs::Count)
2130                .inner_join(user::Entity)
2131                .filter(user::Column::Admin.eq(false))
2132                .into_values::<_, QueryAs>()
2133                .one(&*tx)
2134                .await?
2135                .unwrap_or(0i64) as usize)
2136        })
2137        .await
2138    }
2139
2140    pub async fn share_project(
2141        &self,
2142        room_id: RoomId,
2143        connection: ConnectionId,
2144        worktrees: &[proto::WorktreeMetadata],
2145    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2146        self.room_transaction(room_id, |tx| async move {
2147            let participant = room_participant::Entity::find()
2148                .filter(
2149                    Condition::all()
2150                        .add(
2151                            room_participant::Column::AnsweringConnectionId
2152                                .eq(connection.id as i32),
2153                        )
2154                        .add(
2155                            room_participant::Column::AnsweringConnectionServerId
2156                                .eq(connection.owner_id as i32),
2157                        ),
2158                )
2159                .one(&*tx)
2160                .await?
2161                .ok_or_else(|| anyhow!("could not find participant"))?;
2162            if participant.room_id != room_id {
2163                return Err(anyhow!("shared project on unexpected room"))?;
2164            }
2165
2166            let project = project::ActiveModel {
2167                room_id: ActiveValue::set(participant.room_id),
2168                host_user_id: ActiveValue::set(participant.user_id),
2169                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2170                host_connection_server_id: ActiveValue::set(Some(ServerId(
2171                    connection.owner_id as i32,
2172                ))),
2173                ..Default::default()
2174            }
2175            .insert(&*tx)
2176            .await?;
2177
2178            if !worktrees.is_empty() {
2179                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2180                    worktree::ActiveModel {
2181                        id: ActiveValue::set(worktree.id as i64),
2182                        project_id: ActiveValue::set(project.id),
2183                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2184                        root_name: ActiveValue::set(worktree.root_name.clone()),
2185                        visible: ActiveValue::set(worktree.visible),
2186                        scan_id: ActiveValue::set(0),
2187                        completed_scan_id: ActiveValue::set(0),
2188                    }
2189                }))
2190                .exec(&*tx)
2191                .await?;
2192            }
2193
2194            project_collaborator::ActiveModel {
2195                project_id: ActiveValue::set(project.id),
2196                connection_id: ActiveValue::set(connection.id as i32),
2197                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2198                user_id: ActiveValue::set(participant.user_id),
2199                replica_id: ActiveValue::set(ReplicaId(0)),
2200                is_host: ActiveValue::set(true),
2201                ..Default::default()
2202            }
2203            .insert(&*tx)
2204            .await?;
2205
2206            let room = self.get_room(room_id, &tx).await?;
2207            Ok((project.id, room))
2208        })
2209        .await
2210    }
2211
2212    pub async fn unshare_project(
2213        &self,
2214        project_id: ProjectId,
2215        connection: ConnectionId,
2216    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2217        let room_id = self.room_id_for_project(project_id).await?;
2218        self.room_transaction(room_id, |tx| async move {
2219            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2220
2221            let project = project::Entity::find_by_id(project_id)
2222                .one(&*tx)
2223                .await?
2224                .ok_or_else(|| anyhow!("project not found"))?;
2225            if project.host_connection()? == connection {
2226                project::Entity::delete(project.into_active_model())
2227                    .exec(&*tx)
2228                    .await?;
2229                let room = self.get_room(room_id, &tx).await?;
2230                Ok((room, guest_connection_ids))
2231            } else {
2232                Err(anyhow!("cannot unshare a project hosted by another user"))?
2233            }
2234        })
2235        .await
2236    }
2237
2238    pub async fn update_project(
2239        &self,
2240        project_id: ProjectId,
2241        connection: ConnectionId,
2242        worktrees: &[proto::WorktreeMetadata],
2243    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2244        let room_id = self.room_id_for_project(project_id).await?;
2245        self.room_transaction(room_id, |tx| async move {
2246            let project = project::Entity::find_by_id(project_id)
2247                .filter(
2248                    Condition::all()
2249                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2250                        .add(
2251                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2252                        ),
2253                )
2254                .one(&*tx)
2255                .await?
2256                .ok_or_else(|| anyhow!("no such project"))?;
2257
2258            self.update_project_worktrees(project.id, worktrees, &tx)
2259                .await?;
2260
2261            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2262            let room = self.get_room(project.room_id, &tx).await?;
2263            Ok((room, guest_connection_ids))
2264        })
2265        .await
2266    }
2267
2268    async fn update_project_worktrees(
2269        &self,
2270        project_id: ProjectId,
2271        worktrees: &[proto::WorktreeMetadata],
2272        tx: &DatabaseTransaction,
2273    ) -> Result<()> {
2274        if !worktrees.is_empty() {
2275            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2276                id: ActiveValue::set(worktree.id as i64),
2277                project_id: ActiveValue::set(project_id),
2278                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2279                root_name: ActiveValue::set(worktree.root_name.clone()),
2280                visible: ActiveValue::set(worktree.visible),
2281                scan_id: ActiveValue::set(0),
2282                completed_scan_id: ActiveValue::set(0),
2283            }))
2284            .on_conflict(
2285                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2286                    .update_column(worktree::Column::RootName)
2287                    .to_owned(),
2288            )
2289            .exec(&*tx)
2290            .await?;
2291        }
2292
2293        worktree::Entity::delete_many()
2294            .filter(worktree::Column::ProjectId.eq(project_id).and(
2295                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2296            ))
2297            .exec(&*tx)
2298            .await?;
2299
2300        Ok(())
2301    }
2302
2303    pub async fn update_worktree(
2304        &self,
2305        update: &proto::UpdateWorktree,
2306        connection: ConnectionId,
2307    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2308        let project_id = ProjectId::from_proto(update.project_id);
2309        let worktree_id = update.worktree_id as i64;
2310        let room_id = self.room_id_for_project(project_id).await?;
2311        self.room_transaction(room_id, |tx| async move {
2312            // Ensure the update comes from the host.
2313            let _project = project::Entity::find_by_id(project_id)
2314                .filter(
2315                    Condition::all()
2316                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2317                        .add(
2318                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2319                        ),
2320                )
2321                .one(&*tx)
2322                .await?
2323                .ok_or_else(|| anyhow!("no such project"))?;
2324
2325            // Update metadata.
2326            worktree::Entity::update(worktree::ActiveModel {
2327                id: ActiveValue::set(worktree_id),
2328                project_id: ActiveValue::set(project_id),
2329                root_name: ActiveValue::set(update.root_name.clone()),
2330                scan_id: ActiveValue::set(update.scan_id as i64),
2331                completed_scan_id: if update.is_last_update {
2332                    ActiveValue::set(update.scan_id as i64)
2333                } else {
2334                    ActiveValue::default()
2335                },
2336                abs_path: ActiveValue::set(update.abs_path.clone()),
2337                ..Default::default()
2338            })
2339            .exec(&*tx)
2340            .await?;
2341
2342            if !update.updated_entries.is_empty() {
2343                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2344                    let mtime = entry.mtime.clone().unwrap_or_default();
2345                    worktree_entry::ActiveModel {
2346                        project_id: ActiveValue::set(project_id),
2347                        worktree_id: ActiveValue::set(worktree_id),
2348                        id: ActiveValue::set(entry.id as i64),
2349                        is_dir: ActiveValue::set(entry.is_dir),
2350                        path: ActiveValue::set(entry.path.clone()),
2351                        inode: ActiveValue::set(entry.inode as i64),
2352                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2353                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2354                        is_symlink: ActiveValue::set(entry.is_symlink),
2355                        is_ignored: ActiveValue::set(entry.is_ignored),
2356                        is_external: ActiveValue::set(entry.is_external),
2357                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2358                        is_deleted: ActiveValue::set(false),
2359                        scan_id: ActiveValue::set(update.scan_id as i64),
2360                    }
2361                }))
2362                .on_conflict(
2363                    OnConflict::columns([
2364                        worktree_entry::Column::ProjectId,
2365                        worktree_entry::Column::WorktreeId,
2366                        worktree_entry::Column::Id,
2367                    ])
2368                    .update_columns([
2369                        worktree_entry::Column::IsDir,
2370                        worktree_entry::Column::Path,
2371                        worktree_entry::Column::Inode,
2372                        worktree_entry::Column::MtimeSeconds,
2373                        worktree_entry::Column::MtimeNanos,
2374                        worktree_entry::Column::IsSymlink,
2375                        worktree_entry::Column::IsIgnored,
2376                        worktree_entry::Column::GitStatus,
2377                        worktree_entry::Column::ScanId,
2378                    ])
2379                    .to_owned(),
2380                )
2381                .exec(&*tx)
2382                .await?;
2383            }
2384
2385            if !update.removed_entries.is_empty() {
2386                worktree_entry::Entity::update_many()
2387                    .filter(
2388                        worktree_entry::Column::ProjectId
2389                            .eq(project_id)
2390                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2391                            .and(
2392                                worktree_entry::Column::Id
2393                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2394                            ),
2395                    )
2396                    .set(worktree_entry::ActiveModel {
2397                        is_deleted: ActiveValue::Set(true),
2398                        scan_id: ActiveValue::Set(update.scan_id as i64),
2399                        ..Default::default()
2400                    })
2401                    .exec(&*tx)
2402                    .await?;
2403            }
2404
2405            if !update.updated_repositories.is_empty() {
2406                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2407                    |repository| worktree_repository::ActiveModel {
2408                        project_id: ActiveValue::set(project_id),
2409                        worktree_id: ActiveValue::set(worktree_id),
2410                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2411                        scan_id: ActiveValue::set(update.scan_id as i64),
2412                        branch: ActiveValue::set(repository.branch.clone()),
2413                        is_deleted: ActiveValue::set(false),
2414                    },
2415                ))
2416                .on_conflict(
2417                    OnConflict::columns([
2418                        worktree_repository::Column::ProjectId,
2419                        worktree_repository::Column::WorktreeId,
2420                        worktree_repository::Column::WorkDirectoryId,
2421                    ])
2422                    .update_columns([
2423                        worktree_repository::Column::ScanId,
2424                        worktree_repository::Column::Branch,
2425                    ])
2426                    .to_owned(),
2427                )
2428                .exec(&*tx)
2429                .await?;
2430            }
2431
2432            if !update.removed_repositories.is_empty() {
2433                worktree_repository::Entity::update_many()
2434                    .filter(
2435                        worktree_repository::Column::ProjectId
2436                            .eq(project_id)
2437                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2438                            .and(
2439                                worktree_repository::Column::WorkDirectoryId
2440                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2441                            ),
2442                    )
2443                    .set(worktree_repository::ActiveModel {
2444                        is_deleted: ActiveValue::Set(true),
2445                        scan_id: ActiveValue::Set(update.scan_id as i64),
2446                        ..Default::default()
2447                    })
2448                    .exec(&*tx)
2449                    .await?;
2450            }
2451
2452            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2453            Ok(connection_ids)
2454        })
2455        .await
2456    }
2457
2458    pub async fn update_diagnostic_summary(
2459        &self,
2460        update: &proto::UpdateDiagnosticSummary,
2461        connection: ConnectionId,
2462    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2463        let project_id = ProjectId::from_proto(update.project_id);
2464        let worktree_id = update.worktree_id as i64;
2465        let room_id = self.room_id_for_project(project_id).await?;
2466        self.room_transaction(room_id, |tx| async move {
2467            let summary = update
2468                .summary
2469                .as_ref()
2470                .ok_or_else(|| anyhow!("invalid summary"))?;
2471
2472            // Ensure the update comes from the host.
2473            let project = project::Entity::find_by_id(project_id)
2474                .one(&*tx)
2475                .await?
2476                .ok_or_else(|| anyhow!("no such project"))?;
2477            if project.host_connection()? != connection {
2478                return Err(anyhow!("can't update a project hosted by someone else"))?;
2479            }
2480
2481            // Update summary.
2482            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2483                project_id: ActiveValue::set(project_id),
2484                worktree_id: ActiveValue::set(worktree_id),
2485                path: ActiveValue::set(summary.path.clone()),
2486                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2487                error_count: ActiveValue::set(summary.error_count as i32),
2488                warning_count: ActiveValue::set(summary.warning_count as i32),
2489                ..Default::default()
2490            })
2491            .on_conflict(
2492                OnConflict::columns([
2493                    worktree_diagnostic_summary::Column::ProjectId,
2494                    worktree_diagnostic_summary::Column::WorktreeId,
2495                    worktree_diagnostic_summary::Column::Path,
2496                ])
2497                .update_columns([
2498                    worktree_diagnostic_summary::Column::LanguageServerId,
2499                    worktree_diagnostic_summary::Column::ErrorCount,
2500                    worktree_diagnostic_summary::Column::WarningCount,
2501                ])
2502                .to_owned(),
2503            )
2504            .exec(&*tx)
2505            .await?;
2506
2507            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2508            Ok(connection_ids)
2509        })
2510        .await
2511    }
2512
2513    pub async fn start_language_server(
2514        &self,
2515        update: &proto::StartLanguageServer,
2516        connection: ConnectionId,
2517    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2518        let project_id = ProjectId::from_proto(update.project_id);
2519        let room_id = self.room_id_for_project(project_id).await?;
2520        self.room_transaction(room_id, |tx| async move {
2521            let server = update
2522                .server
2523                .as_ref()
2524                .ok_or_else(|| anyhow!("invalid language server"))?;
2525
2526            // Ensure the update comes from the host.
2527            let project = project::Entity::find_by_id(project_id)
2528                .one(&*tx)
2529                .await?
2530                .ok_or_else(|| anyhow!("no such project"))?;
2531            if project.host_connection()? != connection {
2532                return Err(anyhow!("can't update a project hosted by someone else"))?;
2533            }
2534
2535            // Add the newly-started language server.
2536            language_server::Entity::insert(language_server::ActiveModel {
2537                project_id: ActiveValue::set(project_id),
2538                id: ActiveValue::set(server.id as i64),
2539                name: ActiveValue::set(server.name.clone()),
2540                ..Default::default()
2541            })
2542            .on_conflict(
2543                OnConflict::columns([
2544                    language_server::Column::ProjectId,
2545                    language_server::Column::Id,
2546                ])
2547                .update_column(language_server::Column::Name)
2548                .to_owned(),
2549            )
2550            .exec(&*tx)
2551            .await?;
2552
2553            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2554            Ok(connection_ids)
2555        })
2556        .await
2557    }
2558
2559    pub async fn update_worktree_settings(
2560        &self,
2561        update: &proto::UpdateWorktreeSettings,
2562        connection: ConnectionId,
2563    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2564        let project_id = ProjectId::from_proto(update.project_id);
2565        let room_id = self.room_id_for_project(project_id).await?;
2566        self.room_transaction(room_id, |tx| async move {
2567            // Ensure the update comes from the host.
2568            let project = project::Entity::find_by_id(project_id)
2569                .one(&*tx)
2570                .await?
2571                .ok_or_else(|| anyhow!("no such project"))?;
2572            if project.host_connection()? != connection {
2573                return Err(anyhow!("can't update a project hosted by someone else"))?;
2574            }
2575
2576            if let Some(content) = &update.content {
2577                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2578                    project_id: ActiveValue::Set(project_id),
2579                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2580                    path: ActiveValue::Set(update.path.clone()),
2581                    content: ActiveValue::Set(content.clone()),
2582                })
2583                .on_conflict(
2584                    OnConflict::columns([
2585                        worktree_settings_file::Column::ProjectId,
2586                        worktree_settings_file::Column::WorktreeId,
2587                        worktree_settings_file::Column::Path,
2588                    ])
2589                    .update_column(worktree_settings_file::Column::Content)
2590                    .to_owned(),
2591                )
2592                .exec(&*tx)
2593                .await?;
2594            } else {
2595                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2596                    project_id: ActiveValue::Set(project_id),
2597                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2598                    path: ActiveValue::Set(update.path.clone()),
2599                    ..Default::default()
2600                })
2601                .exec(&*tx)
2602                .await?;
2603            }
2604
2605            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2606            Ok(connection_ids)
2607        })
2608        .await
2609    }
2610
2611    pub async fn join_project(
2612        &self,
2613        project_id: ProjectId,
2614        connection: ConnectionId,
2615    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2616        let room_id = self.room_id_for_project(project_id).await?;
2617        self.room_transaction(room_id, |tx| async move {
2618            let participant = room_participant::Entity::find()
2619                .filter(
2620                    Condition::all()
2621                        .add(
2622                            room_participant::Column::AnsweringConnectionId
2623                                .eq(connection.id as i32),
2624                        )
2625                        .add(
2626                            room_participant::Column::AnsweringConnectionServerId
2627                                .eq(connection.owner_id as i32),
2628                        ),
2629                )
2630                .one(&*tx)
2631                .await?
2632                .ok_or_else(|| anyhow!("must join a room first"))?;
2633
2634            let project = project::Entity::find_by_id(project_id)
2635                .one(&*tx)
2636                .await?
2637                .ok_or_else(|| anyhow!("no such project"))?;
2638            if project.room_id != participant.room_id {
2639                return Err(anyhow!("no such project"))?;
2640            }
2641
2642            let mut collaborators = project
2643                .find_related(project_collaborator::Entity)
2644                .all(&*tx)
2645                .await?;
2646            let replica_ids = collaborators
2647                .iter()
2648                .map(|c| c.replica_id)
2649                .collect::<HashSet<_>>();
2650            let mut replica_id = ReplicaId(1);
2651            while replica_ids.contains(&replica_id) {
2652                replica_id.0 += 1;
2653            }
2654            let new_collaborator = project_collaborator::ActiveModel {
2655                project_id: ActiveValue::set(project_id),
2656                connection_id: ActiveValue::set(connection.id as i32),
2657                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2658                user_id: ActiveValue::set(participant.user_id),
2659                replica_id: ActiveValue::set(replica_id),
2660                is_host: ActiveValue::set(false),
2661                ..Default::default()
2662            }
2663            .insert(&*tx)
2664            .await?;
2665            collaborators.push(new_collaborator);
2666
2667            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2668            let mut worktrees = db_worktrees
2669                .into_iter()
2670                .map(|db_worktree| {
2671                    (
2672                        db_worktree.id as u64,
2673                        Worktree {
2674                            id: db_worktree.id as u64,
2675                            abs_path: db_worktree.abs_path,
2676                            root_name: db_worktree.root_name,
2677                            visible: db_worktree.visible,
2678                            entries: Default::default(),
2679                            repository_entries: Default::default(),
2680                            diagnostic_summaries: Default::default(),
2681                            settings_files: Default::default(),
2682                            scan_id: db_worktree.scan_id as u64,
2683                            completed_scan_id: db_worktree.completed_scan_id as u64,
2684                        },
2685                    )
2686                })
2687                .collect::<BTreeMap<_, _>>();
2688
2689            // Populate worktree entries.
2690            {
2691                let mut db_entries = worktree_entry::Entity::find()
2692                    .filter(
2693                        Condition::all()
2694                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2695                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2696                    )
2697                    .stream(&*tx)
2698                    .await?;
2699                while let Some(db_entry) = db_entries.next().await {
2700                    let db_entry = db_entry?;
2701                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2702                        worktree.entries.push(proto::Entry {
2703                            id: db_entry.id as u64,
2704                            is_dir: db_entry.is_dir,
2705                            path: db_entry.path,
2706                            inode: db_entry.inode as u64,
2707                            mtime: Some(proto::Timestamp {
2708                                seconds: db_entry.mtime_seconds as u64,
2709                                nanos: db_entry.mtime_nanos as u32,
2710                            }),
2711                            is_symlink: db_entry.is_symlink,
2712                            is_ignored: db_entry.is_ignored,
2713                            is_external: db_entry.is_external,
2714                            git_status: db_entry.git_status.map(|status| status as i32),
2715                        });
2716                    }
2717                }
2718            }
2719
2720            // Populate repository entries.
2721            {
2722                let mut db_repository_entries = worktree_repository::Entity::find()
2723                    .filter(
2724                        Condition::all()
2725                            .add(worktree_repository::Column::ProjectId.eq(project_id))
2726                            .add(worktree_repository::Column::IsDeleted.eq(false)),
2727                    )
2728                    .stream(&*tx)
2729                    .await?;
2730                while let Some(db_repository_entry) = db_repository_entries.next().await {
2731                    let db_repository_entry = db_repository_entry?;
2732                    if let Some(worktree) =
2733                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2734                    {
2735                        worktree.repository_entries.insert(
2736                            db_repository_entry.work_directory_id as u64,
2737                            proto::RepositoryEntry {
2738                                work_directory_id: db_repository_entry.work_directory_id as u64,
2739                                branch: db_repository_entry.branch,
2740                            },
2741                        );
2742                    }
2743                }
2744            }
2745
2746            // Populate worktree diagnostic summaries.
2747            {
2748                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2749                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2750                    .stream(&*tx)
2751                    .await?;
2752                while let Some(db_summary) = db_summaries.next().await {
2753                    let db_summary = db_summary?;
2754                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2755                        worktree
2756                            .diagnostic_summaries
2757                            .push(proto::DiagnosticSummary {
2758                                path: db_summary.path,
2759                                language_server_id: db_summary.language_server_id as u64,
2760                                error_count: db_summary.error_count as u32,
2761                                warning_count: db_summary.warning_count as u32,
2762                            });
2763                    }
2764                }
2765            }
2766
2767            // Populate worktree settings files
2768            {
2769                let mut db_settings_files = worktree_settings_file::Entity::find()
2770                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2771                    .stream(&*tx)
2772                    .await?;
2773                while let Some(db_settings_file) = db_settings_files.next().await {
2774                    let db_settings_file = db_settings_file?;
2775                    if let Some(worktree) =
2776                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2777                    {
2778                        worktree.settings_files.push(WorktreeSettingsFile {
2779                            path: db_settings_file.path,
2780                            content: db_settings_file.content,
2781                        });
2782                    }
2783                }
2784            }
2785
2786            // Populate language servers.
2787            let language_servers = project
2788                .find_related(language_server::Entity)
2789                .all(&*tx)
2790                .await?;
2791
2792            let project = Project {
2793                collaborators: collaborators
2794                    .into_iter()
2795                    .map(|collaborator| ProjectCollaborator {
2796                        connection_id: collaborator.connection(),
2797                        user_id: collaborator.user_id,
2798                        replica_id: collaborator.replica_id,
2799                        is_host: collaborator.is_host,
2800                    })
2801                    .collect(),
2802                worktrees,
2803                language_servers: language_servers
2804                    .into_iter()
2805                    .map(|language_server| proto::LanguageServer {
2806                        id: language_server.id as u64,
2807                        name: language_server.name,
2808                    })
2809                    .collect(),
2810            };
2811            Ok((project, replica_id as ReplicaId))
2812        })
2813        .await
2814    }
2815
2816    pub async fn leave_project(
2817        &self,
2818        project_id: ProjectId,
2819        connection: ConnectionId,
2820    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2821        let room_id = self.room_id_for_project(project_id).await?;
2822        self.room_transaction(room_id, |tx| async move {
2823            let result = project_collaborator::Entity::delete_many()
2824                .filter(
2825                    Condition::all()
2826                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2827                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2828                        .add(
2829                            project_collaborator::Column::ConnectionServerId
2830                                .eq(connection.owner_id as i32),
2831                        ),
2832                )
2833                .exec(&*tx)
2834                .await?;
2835            if result.rows_affected == 0 {
2836                Err(anyhow!("not a collaborator on this project"))?;
2837            }
2838
2839            let project = project::Entity::find_by_id(project_id)
2840                .one(&*tx)
2841                .await?
2842                .ok_or_else(|| anyhow!("no such project"))?;
2843            let collaborators = project
2844                .find_related(project_collaborator::Entity)
2845                .all(&*tx)
2846                .await?;
2847            let connection_ids = collaborators
2848                .into_iter()
2849                .map(|collaborator| collaborator.connection())
2850                .collect();
2851
2852            follower::Entity::delete_many()
2853                .filter(
2854                    Condition::any()
2855                        .add(
2856                            Condition::all()
2857                                .add(follower::Column::ProjectId.eq(project_id))
2858                                .add(
2859                                    follower::Column::LeaderConnectionServerId
2860                                        .eq(connection.owner_id),
2861                                )
2862                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2863                        )
2864                        .add(
2865                            Condition::all()
2866                                .add(follower::Column::ProjectId.eq(project_id))
2867                                .add(
2868                                    follower::Column::FollowerConnectionServerId
2869                                        .eq(connection.owner_id),
2870                                )
2871                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2872                        ),
2873                )
2874                .exec(&*tx)
2875                .await?;
2876
2877            let room = self.get_room(project.room_id, &tx).await?;
2878            let left_project = LeftProject {
2879                id: project_id,
2880                host_user_id: project.host_user_id,
2881                host_connection_id: project.host_connection()?,
2882                connection_ids,
2883            };
2884            Ok((room, left_project))
2885        })
2886        .await
2887    }
2888
2889    pub async fn project_collaborators(
2890        &self,
2891        project_id: ProjectId,
2892        connection_id: ConnectionId,
2893    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2894        let room_id = self.room_id_for_project(project_id).await?;
2895        self.room_transaction(room_id, |tx| async move {
2896            let collaborators = project_collaborator::Entity::find()
2897                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2898                .all(&*tx)
2899                .await?
2900                .into_iter()
2901                .map(|collaborator| ProjectCollaborator {
2902                    connection_id: collaborator.connection(),
2903                    user_id: collaborator.user_id,
2904                    replica_id: collaborator.replica_id,
2905                    is_host: collaborator.is_host,
2906                })
2907                .collect::<Vec<_>>();
2908
2909            if collaborators
2910                .iter()
2911                .any(|collaborator| collaborator.connection_id == connection_id)
2912            {
2913                Ok(collaborators)
2914            } else {
2915                Err(anyhow!("no such project"))?
2916            }
2917        })
2918        .await
2919    }
2920
2921    pub async fn project_connection_ids(
2922        &self,
2923        project_id: ProjectId,
2924        connection_id: ConnectionId,
2925    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2926        let room_id = self.room_id_for_project(project_id).await?;
2927        self.room_transaction(room_id, |tx| async move {
2928            let mut collaborators = project_collaborator::Entity::find()
2929                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2930                .stream(&*tx)
2931                .await?;
2932
2933            let mut connection_ids = HashSet::default();
2934            while let Some(collaborator) = collaborators.next().await {
2935                let collaborator = collaborator?;
2936                connection_ids.insert(collaborator.connection());
2937            }
2938
2939            if connection_ids.contains(&connection_id) {
2940                Ok(connection_ids)
2941            } else {
2942                Err(anyhow!("no such project"))?
2943            }
2944        })
2945        .await
2946    }
2947
2948    async fn project_guest_connection_ids(
2949        &self,
2950        project_id: ProjectId,
2951        tx: &DatabaseTransaction,
2952    ) -> Result<Vec<ConnectionId>> {
2953        let mut collaborators = project_collaborator::Entity::find()
2954            .filter(
2955                project_collaborator::Column::ProjectId
2956                    .eq(project_id)
2957                    .and(project_collaborator::Column::IsHost.eq(false)),
2958            )
2959            .stream(tx)
2960            .await?;
2961
2962        let mut guest_connection_ids = Vec::new();
2963        while let Some(collaborator) = collaborators.next().await {
2964            let collaborator = collaborator?;
2965            guest_connection_ids.push(collaborator.connection());
2966        }
2967        Ok(guest_connection_ids)
2968    }
2969
2970    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2971        self.transaction(|tx| async move {
2972            let project = project::Entity::find_by_id(project_id)
2973                .one(&*tx)
2974                .await?
2975                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2976            Ok(project.room_id)
2977        })
2978        .await
2979    }
2980
2981    // access tokens
2982
2983    pub async fn create_access_token(
2984        &self,
2985        user_id: UserId,
2986        access_token_hash: &str,
2987        max_access_token_count: usize,
2988    ) -> Result<AccessTokenId> {
2989        self.transaction(|tx| async {
2990            let tx = tx;
2991
2992            let token = access_token::ActiveModel {
2993                user_id: ActiveValue::set(user_id),
2994                hash: ActiveValue::set(access_token_hash.into()),
2995                ..Default::default()
2996            }
2997            .insert(&*tx)
2998            .await?;
2999
3000            access_token::Entity::delete_many()
3001                .filter(
3002                    access_token::Column::Id.in_subquery(
3003                        Query::select()
3004                            .column(access_token::Column::Id)
3005                            .from(access_token::Entity)
3006                            .and_where(access_token::Column::UserId.eq(user_id))
3007                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3008                            .limit(10000)
3009                            .offset(max_access_token_count as u64)
3010                            .to_owned(),
3011                    ),
3012                )
3013                .exec(&*tx)
3014                .await?;
3015            Ok(token.id)
3016        })
3017        .await
3018    }
3019
3020    pub async fn get_access_token(
3021        &self,
3022        access_token_id: AccessTokenId,
3023    ) -> Result<access_token::Model> {
3024        self.transaction(|tx| async move {
3025            Ok(access_token::Entity::find_by_id(access_token_id)
3026                .one(&*tx)
3027                .await?
3028                .ok_or_else(|| anyhow!("no such access token"))?)
3029        })
3030        .await
3031    }
3032
3033    // channels
3034
3035    pub async fn create_channel(&self, name: &str) -> Result<ChannelId> {
3036        self.transaction(move |tx| async move {
3037            let tx = tx;
3038
3039            let channel = channel::ActiveModel {
3040                name: ActiveValue::Set(name.to_string()),
3041                ..Default::default()
3042            };
3043
3044            let channel = channel.insert(&*tx).await?;
3045
3046            Ok(channel.id)
3047        }).await
3048    }
3049
3050    pub async fn add_channel_member(&self, channel_id: ChannelId, user_id: UserId) -> Result<()> {
3051        self.transaction(move |tx| async move {
3052            let tx = tx;
3053
3054            let channel_membership = channel_member::ActiveModel {
3055                channel_id: ActiveValue::Set(channel_id),
3056                user_id: ActiveValue::Set(user_id),
3057                ..Default::default()
3058            };
3059
3060            channel_membership.insert(&*tx).await?;
3061
3062            Ok(())
3063        }).await
3064    }
3065
3066    pub async fn get_channels(&self, user_id: UserId) -> Vec<ChannelId> {
3067        self.transaction(|tx| async move {
3068            let tx = tx;
3069
3070        })
3071        //     let user = user::Model {
3072        //         id: user_id,
3073        //         ..Default::default()
3074        //     };
3075        //     let mut channel_ids = user
3076        //         .find_related(channel_member::Entity)
3077        //         .select_only()
3078        //         .column(channel_member::Column::ChannelId)
3079        //         .all(&*tx)
3080        //         .await;
3081
3082        //     // let descendants = Alias::new("descendants");
3083        //     // let cte_referencing = SelectStatement::new()
3084        //     //     .column(channel_parent::Column::ChildId)
3085        //     //     .from(channel::Entity)
3086        //     //     .and_where(
3087        //     //         Expr::col(channel_parent::Column::ParentId)
3088        //     //             .in_subquery(SelectStatement::new().from(descendants).take())
3089        //     //     );
3090
3091        //     // /*
3092        //     // WITH RECURSIVE descendant_ids(id) AS (
3093        //     //     $1
3094        //     //     UNION ALL
3095        //     //     SELECT child_id as id FROM channel_parents WHERE parent_id IN descendants
3096        //     // )
3097        //     // SELECT * from channels where id in descendant_ids
3098        //     // */
3099
3100
3101        //     // // WITH RECURSIVE descendants(id) AS (
3102        //     // //    // SQL QUERY FOR SELECTING Initial IDs
3103        //     // //   UNION
3104        //     // //    SELECT id FROM ancestors WHERE p.parent = id
3105        //     // // )
3106        //     // // SELECT * FROM descendants;
3107
3108
3109
3110        //     // // let descendant_channel_ids =
3111
3112
3113
3114        //     // // let query = sea_query::Query::with().recursive(true);
3115
3116
3117        //     // for id_path in id_paths {
3118        //     //     //
3119        //     // }
3120
3121
3122        //     // // zed/public/plugins
3123        //     // // zed/public/plugins/js
3124        //     // // zed/zed-livekit
3125        //     // // livekit/zed-livekit
3126        //     // // zed - 101
3127        //     // // livekit - 500
3128        //     // // zed-livekit - 510
3129        //     // // public - 150
3130        //     // // plugins - 200
3131        //     // // js - 300
3132        //     // //
3133        //     // // Channel, Parent - edges
3134        //     // // 510 - 500
3135        //     // // 510 - 101
3136        //     // //
3137        //     // // Given the channel 'Zed' (101)
3138        //     // // Select * from EDGES where parent = 101 => 510
3139        //     // //
3140
3141
3142        //     // "SELECT * from channels where id_path like '$1?'"
3143
3144        //     // // https://www.postgresql.org/docs/current/queries-with.html
3145        //     // // https://www.sqlite.org/lang_with.html
3146
3147        //     // "SELECT channel_id from channel_ancestors where ancestor_id IN $()"
3148
3149        //     // // | channel_id | ancestor_ids |
3150        //     // // 150              150
3151        //     // // 150              101
3152        //     // // 200              101
3153        //     // // 300              101
3154        //     // // 200              150
3155        //     // // 300              150
3156        //     // // 300              200
3157        //     // //
3158        //     // // // | channel_id | ancestor_ids |
3159        //     // // 150              101
3160        //     // // 200              101
3161        //     // // 300              101
3162        //     // // 200              150
3163        //     // // 300              [150, 200]
3164
3165        //     // channel::Entity::find()
3166        //     //     .filter(channel::Column::IdPath.like(id_paths.unwrap()))
3167
3168        //     // dbg!(&id_paths.unwrap()[0].id_path);
3169
3170        //     // // let mut channel_members_by_channel_id = HashMap::new();
3171        //     // // for channel_member in channel_members {
3172        //     // //     channel_members_by_channel_id
3173        //     // //         .entry(channel_member.channel_id)
3174        //     // //         .or_insert_with(Vec::new)
3175        //     // //         .push(channel_member);
3176        //     // // }
3177
3178        //     // // let mut channel_messages = channel_message::Entity::find()
3179        //     // //     .filter(channel_message::Column::ChannelId.in_selection(channel_ids))
3180        //     // //     .all(&*tx)
3181        //     // //     .await?;
3182
3183        //     // // let mut channel_messages_by_channel_id = HashMap::new();
3184        //     // // for channel_message in channel_messages {
3185        //     // //     channel_messages_by_channel_id
3186        //     // //         .entry(channel_message.channel_id)
3187        //     // //         .or_insert_with(Vec::new)
3188        //     // //         .push(channel_message);
3189        //     // // }
3190
3191        //     // todo!();
3192        //     // // Ok(channels)
3193        //     Err(Error("not implemented"))
3194        // })
3195        // .await
3196    }
3197
3198    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3199    where
3200        F: Send + Fn(TransactionHandle) -> Fut,
3201        Fut: Send + Future<Output = Result<T>>,
3202    {
3203        let body = async {
3204            let mut i = 0;
3205            loop {
3206                let (tx, result) = self.with_transaction(&f).await?;
3207                match result {
3208                    Ok(result) => match tx.commit().await.map_err(Into::into) {
3209                        Ok(()) => return Ok(result),
3210                        Err(error) => {
3211                            if !self.retry_on_serialization_error(&error, i).await {
3212                                return Err(error);
3213                            }
3214                        }
3215                    },
3216                    Err(error) => {
3217                        tx.rollback().await?;
3218                        if !self.retry_on_serialization_error(&error, i).await {
3219                            return Err(error);
3220                        }
3221                    }
3222                }
3223                i += 1;
3224            }
3225        };
3226
3227        self.run(body).await
3228    }
3229
    /// Runs `f` inside a database transaction where `f` itself decides which room, if any,
    /// the result belongs to.
    ///
    /// * `Ok(Some((room_id, data)))`: the lock for `room_id` is acquired *before* the
    ///   commit, and on success `data` is returned in a [`RoomGuard`] holding that lock.
    /// * `Ok(None)`: the transaction is committed and `None` is returned.
    /// * `Err(_)`: the transaction is rolled back.
    ///
    /// A failure from `f` or from a commit is passed to `retry_on_serialization_error`;
    /// when that returns `true` the closure is re-run in a new transaction, so `f` may be
    /// invoked more than once.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; feeds the retry back-off.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // Take the room's lock first so the commit happens while it is held.
                        // `_guard` is scoped to this arm, so it stays held through a
                        // retry delay and is released before the next attempt.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    // No room involved: commit without taking any room lock.
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3279
3280    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3281    where
3282        F: Send + Fn(TransactionHandle) -> Fut,
3283        Fut: Send + Future<Output = Result<T>>,
3284    {
3285        let body = async {
3286            let mut i = 0;
3287            loop {
3288                let lock = self.rooms.entry(room_id).or_default().clone();
3289                let _guard = lock.lock_owned().await;
3290                let (tx, result) = self.with_transaction(&f).await?;
3291                match result {
3292                    Ok(data) => match tx.commit().await.map_err(Into::into) {
3293                        Ok(()) => {
3294                            return Ok(RoomGuard {
3295                                data,
3296                                _guard,
3297                                _not_send: PhantomData,
3298                            });
3299                        }
3300                        Err(error) => {
3301                            if !self.retry_on_serialization_error(&error, i).await {
3302                                return Err(error);
3303                            }
3304                        }
3305                    },
3306                    Err(error) => {
3307                        tx.rollback().await?;
3308                        if !self.retry_on_serialization_error(&error, i).await {
3309                            return Err(error);
3310                        }
3311                    }
3312                }
3313                i += 1;
3314            }
3315        };
3316
3317        self.run(body).await
3318    }
3319
3320    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3321    where
3322        F: Send + Fn(TransactionHandle) -> Fut,
3323        Fut: Send + Future<Output = Result<T>>,
3324    {
3325        let tx = self
3326            .pool
3327            .begin_with_config(Some(IsolationLevel::Serializable), None)
3328            .await?;
3329
3330        let mut tx = Arc::new(Some(tx));
3331        let result = f(TransactionHandle(tx.clone())).await;
3332        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3333            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3334        };
3335
3336        Ok((tx, result))
3337    }
3338
3339    async fn run<F, T>(&self, future: F) -> Result<T>
3340    where
3341        F: Future<Output = Result<T>>,
3342    {
3343        #[cfg(test)]
3344        {
3345            if let Executor::Deterministic(executor) = &self.executor {
3346                executor.simulate_random_delay().await;
3347            }
3348
3349            self.runtime.as_ref().unwrap().block_on(future)
3350        }
3351
3352        #[cfg(not(test))]
3353        {
3354            future.await
3355        }
3356    }
3357
3358    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3359        // If the error is due to a failure to serialize concurrent transactions, then retry
3360        // this transaction after a delay. With each subsequent retry, double the delay duration.
3361        // Also vary the delay randomly in order to ensure different database connections retry
3362        // at different times.
3363        if is_serialization_error(error) {
3364            let base_delay = 4_u64 << prev_attempt_count.min(16);
3365            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3366            log::info!(
3367                "retrying transaction after serialization error. delay: {} ms.",
3368                randomized_delay
3369            );
3370            self.executor
3371                .sleep(Duration::from_millis(randomized_delay as u64))
3372                .await;
3373            true
3374        } else {
3375            false
3376        }
3377    }
3378}
3379
3380fn is_serialization_error(error: &Error) -> bool {
3381    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3382    match error {
3383        Error::Database(
3384            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3385            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3386        ) if error
3387            .as_database_error()
3388            .and_then(|error| error.code())
3389            .as_deref()
3390            == Some(SERIALIZATION_FAILURE_CODE) =>
3391        {
3392            true
3393        }
3394        _ => false,
3395    }
3396}
3397
3398struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3399
3400impl Deref for TransactionHandle {
3401    type Target = DatabaseTransaction;
3402
3403    fn deref(&self) -> &Self::Target {
3404        self.0.as_ref().as_ref().unwrap()
3405    }
3406}
3407
/// A value bundled with an owned mutex guard; the guard is released when this wrapper is
/// dropped. The room transaction helpers construct it with the guard of a room's lock.
///
/// Dereferences (mutably and immutably) to the wrapped value.
pub struct RoomGuard<T> {
    /// The guarded payload, exposed through `Deref`/`DerefMut`.
    data: T,
    /// Keeps the mutex locked for as long as this value is alive.
    _guard: OwnedMutexGuard<()>,
    /// `Rc` is not `Send`, so this marker makes `RoomGuard` itself not `Send`.
    _not_send: PhantomData<Rc<()>>,
}

impl<T> Deref for RoomGuard<T> {
    type Target = T;

    /// Borrows the guarded payload.
    fn deref(&self) -> &T {
        &self.data
    }
}

impl<T> DerefMut for RoomGuard<T> {
    /// Mutably borrows the guarded payload.
    fn deref_mut(&mut self) -> &mut T {
        &mut self.data
    }
}
3427
/// Serializable set of GitHub-related fields describing a new user.
// NOTE(review): presumably the input of the user-creation query defined elsewhere in this
// file — confirm against its callers.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    pub github_login: String,
    pub github_user_id: i32,
    pub invite_count: i32,
}
3434
/// Ids and metadata reported for a newly created user.
// NOTE(review): constructed outside this section; the optional fields are presumably only
// set when the user came from an invite / signup record — confirm.
#[derive(Debug)]
pub struct NewUserResult {
    pub user_id: UserId,
    pub metrics_id: String,
    pub inviting_user_id: Option<UserId>,
    pub signup_device_id: Option<String>,
}
3442
/// Generates a random 16-character identifier with `nanoid`.
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
3446
/// Generates a random 64-character identifier with `nanoid`.
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
3450
/// Declares `$name` as a public newtype around an `i32` database id.
///
/// The generated type is `Copy`, ordered, hashable, and (de)serialized transparently as
/// its inner integer. Besides `Display`, it implements the `sea_query`/`sea_orm`
/// conversion traits (`From<$name> for Value`, `TryGetable`, `ValueType`, `TryFromU64`,
/// `Nullable`) so it can be used directly as an entity column.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Converts a protobuf `u64` id into this type.
            ///
            /// The `as` cast keeps only the low 32 bits, so values above `i32::MAX` do
            /// not round-trip.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts this id into its protobuf `u64` form.
            ///
            /// The `as` cast sign-extends, so a negative id becomes a very large `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            /// Accepts any non-null integer value that fits in an `i32`; everything
            /// else (including nulls and out-of-range integers) is a type error.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            /// Converts a `u64` into this id, failing when it does not fit in an `i32`.
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `DbErr::ConvertFromU64` expects the name of the target type; sea-orm
                // formats it as "Type '<name>' cannot be converted from u64".
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(stringify!($name))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
3569
// Strongly typed `i32` id newtypes generated by `id_type!`, one per kind of id.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
3583
/// A room's protobuf state together with lists of rejoined and reshared projects.
// NOTE(review): constructed outside this section — presumably the result of a room-rejoin
// query; confirm.
pub struct RejoinedRoom {
    pub room: proto::Room,
    pub rejoined_projects: Vec<RejoinedProject>,
    pub reshared_projects: Vec<ResharedProject>,
}
3589
/// A project entry of [`RejoinedRoom::reshared_projects`]: its id, a previous connection
/// id, its collaborators and the metadata of its worktrees.
// NOTE(review): `old_connection_id` is presumably the host's connection before it
// reconnected — confirm where this is constructed.
pub struct ResharedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
3596
/// A project entry of [`RejoinedRoom::rejoined_projects`]: its id, a previous connection
/// id, its collaborators, per-worktree data and its language servers.
// NOTE(review): `old_connection_id` is presumably the rejoining guest's connection before
// it reconnected — confirm where this is constructed.
pub struct RejoinedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<RejoinedWorktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
3604
/// Per-worktree data carried by a [`RejoinedProject`], expressed as updated and removed
/// entries and repositories plus diagnostics, settings files and scan ids.
// NOTE(review): the updated/removed lists presumably describe the changes a rejoining
// client missed while disconnected — confirm where this is constructed.
#[derive(Debug)]
pub struct RejoinedWorktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub updated_entries: Vec<proto::Entry>,
    pub removed_entries: Vec<u64>,
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    pub removed_repositories: Vec<u64>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
3620
/// A room's protobuf state, the projects that were left (keyed by project id) and the
/// ids of users whose calls were canceled.
// NOTE(review): constructed outside this section — presumably the result of a
// leave-room query; confirm.
pub struct LeftRoom {
    pub room: proto::Room,
    pub left_projects: HashMap<ProjectId, LeftProject>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3626
/// A room's protobuf state plus the ids of stale participants and of users whose calls
/// were canceled.
// NOTE(review): constructed outside this section — presumably the result of a
// room-refresh query; confirm.
pub struct RefreshedRoom {
    pub room: proto::Room,
    pub stale_participant_user_ids: Vec<UserId>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3632
/// A project's collaborators, its worktrees (in a map ordered by a `u64` key) and its
/// language servers.
// NOTE(review): the map key is presumably the worktree id — confirm where this is
// constructed.
pub struct Project {
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: BTreeMap<u64, Worktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
3638
/// One collaborator on a project, as assembled from a `project_collaborator` row by
/// `Database::project_collaborators`.
pub struct ProjectCollaborator {
    /// The collaborator's connection, taken from the row's `connection()`.
    pub connection_id: ConnectionId,
    pub user_id: UserId,
    pub replica_id: ReplicaId,
    /// Copied from the row's `is_host` column.
    pub is_host: bool,
}
3645
3646impl ProjectCollaborator {
3647    pub fn to_proto(&self) -> proto::Collaborator {
3648        proto::Collaborator {
3649            peer_id: Some(self.connection_id.into()),
3650            replica_id: self.replica_id.0 as u32,
3651            user_id: self.user_id.to_proto(),
3652        }
3653    }
3654}
3655
/// Describes a project that has been left: the project's id, its host's user and
/// connection ids, and a list of connection ids.
// NOTE(review): `connection_ids` is presumably the set of connections to notify about the
// departure — confirm against the code that builds it.
#[derive(Debug)]
pub struct LeftProject {
    pub id: ProjectId,
    pub host_user_id: UserId,
    pub host_connection_id: ConnectionId,
    pub connection_ids: Vec<ConnectionId>,
}
3663
/// A worktree's paths and visibility together with its entries, repository entries
/// (in a map ordered by a `u64` key), diagnostic summaries, settings files and scan ids.
// NOTE(review): the repository map key is presumably the repository's work-directory
// entry id — confirm where this is constructed.
pub struct Worktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
3676
/// A settings file belonging to a worktree: its path and its full text content.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    pub path: String,
    pub content: String,
}
3682
3683#[cfg(test)]
3684pub use test::*;
3685
3686#[cfg(test)]
3687mod test {
3688    use super::*;
3689    use gpui::executor::Background;
3690    use parking_lot::Mutex;
3691    use sea_orm::ConnectionTrait;
3692    use sqlx::migrate::MigrateDatabase;
3693    use std::sync::Arc;
3694
    /// A database for tests, created by [`TestDb::sqlite`] or [`TestDb::postgres`] and
    /// cleaned up when dropped.
    pub struct TestDb {
        /// The database under test; `Some` until `Drop` takes it.
        pub db: Option<Arc<Database>>,
        /// Both constructors in this module leave this as `None`.
        pub connection: Option<sqlx::AnyConnection>,
    }
3699
3700    impl TestDb {
3701        pub fn sqlite(background: Arc<Background>) -> Self {
3702            let url = format!("sqlite::memory:");
3703            let runtime = tokio::runtime::Builder::new_current_thread()
3704                .enable_io()
3705                .enable_time()
3706                .build()
3707                .unwrap();
3708
3709            let mut db = runtime.block_on(async {
3710                let mut options = ConnectOptions::new(url);
3711                options.max_connections(5);
3712                let db = Database::new(options, Executor::Deterministic(background))
3713                    .await
3714                    .unwrap();
3715                let sql = include_str!(concat!(
3716                    env!("CARGO_MANIFEST_DIR"),
3717                    "/migrations.sqlite/20221109000000_test_schema.sql"
3718                ));
3719                db.pool
3720                    .execute(sea_orm::Statement::from_string(
3721                        db.pool.get_database_backend(),
3722                        sql.into(),
3723                    ))
3724                    .await
3725                    .unwrap();
3726                db
3727            });
3728
3729            db.runtime = Some(runtime);
3730
3731            Self {
3732                db: Some(Arc::new(db)),
3733                connection: None,
3734            }
3735        }
3736
3737        pub fn postgres(background: Arc<Background>) -> Self {
3738            static LOCK: Mutex<()> = Mutex::new(());
3739
3740            let _guard = LOCK.lock();
3741            let mut rng = StdRng::from_entropy();
3742            let url = format!(
3743                "postgres://postgres@localhost/zed-test-{}",
3744                rng.gen::<u128>()
3745            );
3746            let runtime = tokio::runtime::Builder::new_current_thread()
3747                .enable_io()
3748                .enable_time()
3749                .build()
3750                .unwrap();
3751
3752            let mut db = runtime.block_on(async {
3753                sqlx::Postgres::create_database(&url)
3754                    .await
3755                    .expect("failed to create test db");
3756                let mut options = ConnectOptions::new(url);
3757                options
3758                    .max_connections(5)
3759                    .idle_timeout(Duration::from_secs(0));
3760                let db = Database::new(options, Executor::Deterministic(background))
3761                    .await
3762                    .unwrap();
3763                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3764                db.migrate(Path::new(migrations_path), false).await.unwrap();
3765                db
3766            });
3767
3768            db.runtime = Some(runtime);
3769
3770            Self {
3771                db: Some(Arc::new(db)),
3772                connection: None,
3773            }
3774        }
3775
        /// Returns the shared handle to the database under test.
        ///
        /// # Panics
        ///
        /// Panics if the database has already been taken out of `self.db`.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
3779    }
3780
3781    impl Drop for TestDb {
3782        fn drop(&mut self) {
3783            let db = self.db.take().unwrap();
3784            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3785                db.runtime.as_ref().unwrap().block_on(async {
3786                    use util::ResultExt;
3787                    let query = "
3788                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3789                        FROM pg_stat_activity
3790                        WHERE
3791                            pg_stat_activity.datname = current_database() AND
3792                            pid <> pg_backend_pid();
3793                    ";
3794                    db.pool
3795                        .execute(sea_orm::Statement::from_string(
3796                            db.pool.get_database_backend(),
3797                            query.into(),
3798                        ))
3799                        .await
3800                        .log_err();
3801                    sqlx::Postgres::drop_database(db.options.get_url())
3802                        .await
3803                        .log_err();
3804                })
3805            }
3806        }
3807    }
3808}