db.rs

   1mod access_token;
   2mod contact;
   3mod follower;
   4mod language_server;
   5mod project;
   6mod project_collaborator;
   7mod room;
   8mod room_participant;
   9mod server;
  10mod signup;
  11#[cfg(test)]
  12mod tests;
  13mod user;
  14mod worktree;
  15mod worktree_diagnostic_summary;
  16mod worktree_entry;
  17
  18use crate::{Error, Result};
  19use anyhow::anyhow;
  20use collections::{BTreeMap, HashMap, HashSet};
  21pub use contact::Contact;
  22use dashmap::DashMap;
  23use futures::StreamExt;
  24use hyper::StatusCode;
  25use rpc::{proto, ConnectionId};
  26use sea_orm::Condition;
  27pub use sea_orm::ConnectOptions;
  28use sea_orm::{
  29    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  30    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  31    Statement, TransactionTrait,
  32};
  33use sea_query::{Alias, Expr, OnConflict, Query};
  34use serde::{Deserialize, Serialize};
  35pub use signup::{Invite, NewSignup, WaitlistSummary};
  36use sqlx::migrate::{Migrate, Migration, MigrationSource};
  37use sqlx::Connection;
  38use std::ops::{Deref, DerefMut};
  39use std::path::Path;
  40use std::time::Duration;
  41use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  42use tokio::sync::{Mutex, OwnedMutexGuard};
  43pub use user::Model as User;
  44
  45pub struct Database {
  46    options: ConnectOptions,
  47    pool: DatabaseConnection,
  48    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  49    #[cfg(test)]
  50    background: Option<std::sync::Arc<gpui::executor::Background>>,
  51    #[cfg(test)]
  52    runtime: Option<tokio::runtime::Runtime>,
  53}
  54
  55impl Database {
  56    pub async fn new(options: ConnectOptions) -> Result<Self> {
  57        Ok(Self {
  58            options: options.clone(),
  59            pool: sea_orm::Database::connect(options).await?,
  60            rooms: DashMap::with_capacity(16384),
  61            #[cfg(test)]
  62            background: None,
  63            #[cfg(test)]
  64            runtime: None,
  65        })
  66    }
  67
  68    #[cfg(test)]
  69    pub fn reset(&self) {
  70        self.rooms.clear();
  71    }
  72
  73    pub async fn migrate(
  74        &self,
  75        migrations_path: &Path,
  76        ignore_checksum_mismatch: bool,
  77    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  78        let migrations = MigrationSource::resolve(migrations_path)
  79            .await
  80            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  81
  82        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  83
  84        connection.ensure_migrations_table().await?;
  85        let applied_migrations: HashMap<_, _> = connection
  86            .list_applied_migrations()
  87            .await?
  88            .into_iter()
  89            .map(|m| (m.version, m))
  90            .collect();
  91
  92        let mut new_migrations = Vec::new();
  93        for migration in migrations {
  94            match applied_migrations.get(&migration.version) {
  95                Some(applied_migration) => {
  96                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  97                    {
  98                        Err(anyhow!(
  99                            "checksum mismatch for applied migration {}",
 100                            migration.description
 101                        ))?;
 102                    }
 103                }
 104                None => {
 105                    let elapsed = connection.apply(&migration).await?;
 106                    new_migrations.push((migration, elapsed));
 107                }
 108            }
 109        }
 110
 111        Ok(new_migrations)
 112    }
 113
 114    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 115        self.transaction(|tx| async move {
 116            let server = server::ActiveModel {
 117                environment: ActiveValue::set(environment.into()),
 118                ..Default::default()
 119            }
 120            .insert(&*tx)
 121            .await?;
 122            Ok(server.id)
 123        })
 124        .await
 125    }
 126
 127    pub async fn stale_room_ids(
 128        &self,
 129        environment: &str,
 130        new_server_id: ServerId,
 131    ) -> Result<Vec<RoomId>> {
 132        self.transaction(|tx| async move {
 133            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 134            enum QueryAs {
 135                RoomId,
 136            }
 137
 138            let stale_server_epochs = self
 139                .stale_server_ids(environment, new_server_id, &tx)
 140                .await?;
 141            Ok(room_participant::Entity::find()
 142                .select_only()
 143                .column(room_participant::Column::RoomId)
 144                .distinct()
 145                .filter(
 146                    room_participant::Column::AnsweringConnectionServerId
 147                        .is_in(stale_server_epochs),
 148                )
 149                .into_values::<_, QueryAs>()
 150                .all(&*tx)
 151                .await?)
 152        })
 153        .await
 154    }
 155
 156    pub async fn refresh_room(
 157        &self,
 158        room_id: RoomId,
 159        new_server_id: ServerId,
 160    ) -> Result<RoomGuard<RefreshedRoom>> {
 161        self.room_transaction(room_id, |tx| async move {
 162            let stale_participant_filter = Condition::all()
 163                .add(room_participant::Column::RoomId.eq(room_id))
 164                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 165                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 166
 167            let stale_participant_user_ids = room_participant::Entity::find()
 168                .filter(stale_participant_filter.clone())
 169                .all(&*tx)
 170                .await?
 171                .into_iter()
 172                .map(|participant| participant.user_id)
 173                .collect::<Vec<_>>();
 174
 175            // Delete participants who failed to reconnect.
 176            room_participant::Entity::delete_many()
 177                .filter(stale_participant_filter)
 178                .exec(&*tx)
 179                .await?;
 180
 181            let room = self.get_room(room_id, &tx).await?;
 182            let mut canceled_calls_to_user_ids = Vec::new();
 183            // Delete the room if it becomes empty and cancel pending calls.
 184            if room.participants.is_empty() {
 185                canceled_calls_to_user_ids.extend(
 186                    room.pending_participants
 187                        .iter()
 188                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 189                );
 190                room_participant::Entity::delete_many()
 191                    .filter(room_participant::Column::RoomId.eq(room_id))
 192                    .exec(&*tx)
 193                    .await?;
 194                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 195            }
 196
 197            Ok(RefreshedRoom {
 198                room,
 199                stale_participant_user_ids,
 200                canceled_calls_to_user_ids,
 201            })
 202        })
 203        .await
 204    }
 205
 206    pub async fn delete_stale_servers(
 207        &self,
 208        environment: &str,
 209        new_server_id: ServerId,
 210    ) -> Result<()> {
 211        self.transaction(|tx| async move {
 212            server::Entity::delete_many()
 213                .filter(
 214                    Condition::all()
 215                        .add(server::Column::Environment.eq(environment))
 216                        .add(server::Column::Id.ne(new_server_id)),
 217                )
 218                .exec(&*tx)
 219                .await?;
 220            Ok(())
 221        })
 222        .await
 223    }
 224
 225    async fn stale_server_ids(
 226        &self,
 227        environment: &str,
 228        new_server_id: ServerId,
 229        tx: &DatabaseTransaction,
 230    ) -> Result<Vec<ServerId>> {
 231        let stale_servers = server::Entity::find()
 232            .filter(
 233                Condition::all()
 234                    .add(server::Column::Environment.eq(environment))
 235                    .add(server::Column::Id.ne(new_server_id)),
 236            )
 237            .all(&*tx)
 238            .await?;
 239        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 240    }
 241
 242    // users
 243
 244    pub async fn create_user(
 245        &self,
 246        email_address: &str,
 247        admin: bool,
 248        params: NewUserParams,
 249    ) -> Result<NewUserResult> {
 250        self.transaction(|tx| async {
 251            let tx = tx;
 252            let user = user::Entity::insert(user::ActiveModel {
 253                email_address: ActiveValue::set(Some(email_address.into())),
 254                github_login: ActiveValue::set(params.github_login.clone()),
 255                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 256                admin: ActiveValue::set(admin),
 257                metrics_id: ActiveValue::set(Uuid::new_v4()),
 258                ..Default::default()
 259            })
 260            .on_conflict(
 261                OnConflict::column(user::Column::GithubLogin)
 262                    .update_column(user::Column::GithubLogin)
 263                    .to_owned(),
 264            )
 265            .exec_with_returning(&*tx)
 266            .await?;
 267
 268            Ok(NewUserResult {
 269                user_id: user.id,
 270                metrics_id: user.metrics_id.to_string(),
 271                signup_device_id: None,
 272                inviting_user_id: None,
 273            })
 274        })
 275        .await
 276    }
 277
 278    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 279        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 280            .await
 281    }
 282
 283    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 284        self.transaction(|tx| async {
 285            let tx = tx;
 286            Ok(user::Entity::find()
 287                .filter(user::Column::Id.is_in(ids.iter().copied()))
 288                .all(&*tx)
 289                .await?)
 290        })
 291        .await
 292    }
 293
 294    pub async fn get_user_by_github_account(
 295        &self,
 296        github_login: &str,
 297        github_user_id: Option<i32>,
 298    ) -> Result<Option<User>> {
 299        self.transaction(|tx| async move {
 300            let tx = &*tx;
 301            if let Some(github_user_id) = github_user_id {
 302                if let Some(user_by_github_user_id) = user::Entity::find()
 303                    .filter(user::Column::GithubUserId.eq(github_user_id))
 304                    .one(tx)
 305                    .await?
 306                {
 307                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 308                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 309                    Ok(Some(user_by_github_user_id.update(tx).await?))
 310                } else if let Some(user_by_github_login) = user::Entity::find()
 311                    .filter(user::Column::GithubLogin.eq(github_login))
 312                    .one(tx)
 313                    .await?
 314                {
 315                    let mut user_by_github_login = user_by_github_login.into_active_model();
 316                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 317                    Ok(Some(user_by_github_login.update(tx).await?))
 318                } else {
 319                    Ok(None)
 320                }
 321            } else {
 322                Ok(user::Entity::find()
 323                    .filter(user::Column::GithubLogin.eq(github_login))
 324                    .one(tx)
 325                    .await?)
 326            }
 327        })
 328        .await
 329    }
 330
 331    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 332        self.transaction(|tx| async move {
 333            Ok(user::Entity::find()
 334                .order_by_asc(user::Column::GithubLogin)
 335                .limit(limit as u64)
 336                .offset(page as u64 * limit as u64)
 337                .all(&*tx)
 338                .await?)
 339        })
 340        .await
 341    }
 342
 343    pub async fn get_users_with_no_invites(
 344        &self,
 345        invited_by_another_user: bool,
 346    ) -> Result<Vec<User>> {
 347        self.transaction(|tx| async move {
 348            Ok(user::Entity::find()
 349                .filter(
 350                    user::Column::InviteCount
 351                        .eq(0)
 352                        .and(if invited_by_another_user {
 353                            user::Column::InviterId.is_not_null()
 354                        } else {
 355                            user::Column::InviterId.is_null()
 356                        }),
 357                )
 358                .all(&*tx)
 359                .await?)
 360        })
 361        .await
 362    }
 363
 364    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 365        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 366        enum QueryAs {
 367            MetricsId,
 368        }
 369
 370        self.transaction(|tx| async move {
 371            let metrics_id: Uuid = user::Entity::find_by_id(id)
 372                .select_only()
 373                .column(user::Column::MetricsId)
 374                .into_values::<_, QueryAs>()
 375                .one(&*tx)
 376                .await?
 377                .ok_or_else(|| anyhow!("could not find user"))?;
 378            Ok(metrics_id.to_string())
 379        })
 380        .await
 381    }
 382
 383    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 384        self.transaction(|tx| async move {
 385            user::Entity::update_many()
 386                .filter(user::Column::Id.eq(id))
 387                .set(user::ActiveModel {
 388                    admin: ActiveValue::set(is_admin),
 389                    ..Default::default()
 390                })
 391                .exec(&*tx)
 392                .await?;
 393            Ok(())
 394        })
 395        .await
 396    }
 397
 398    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 399        self.transaction(|tx| async move {
 400            user::Entity::update_many()
 401                .filter(user::Column::Id.eq(id))
 402                .set(user::ActiveModel {
 403                    connected_once: ActiveValue::set(connected_once),
 404                    ..Default::default()
 405                })
 406                .exec(&*tx)
 407                .await?;
 408            Ok(())
 409        })
 410        .await
 411    }
 412
 413    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 414        self.transaction(|tx| async move {
 415            access_token::Entity::delete_many()
 416                .filter(access_token::Column::UserId.eq(id))
 417                .exec(&*tx)
 418                .await?;
 419            user::Entity::delete_by_id(id).exec(&*tx).await?;
 420            Ok(())
 421        })
 422        .await
 423    }
 424
 425    // contacts
 426
 427    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 428        #[derive(Debug, FromQueryResult)]
 429        struct ContactWithUserBusyStatuses {
 430            user_id_a: UserId,
 431            user_id_b: UserId,
 432            a_to_b: bool,
 433            accepted: bool,
 434            should_notify: bool,
 435            user_a_busy: bool,
 436            user_b_busy: bool,
 437        }
 438
 439        self.transaction(|tx| async move {
 440            let user_a_participant = Alias::new("user_a_participant");
 441            let user_b_participant = Alias::new("user_b_participant");
 442            let mut db_contacts = contact::Entity::find()
 443                .column_as(
 444                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 445                        .is_not_null(),
 446                    "user_a_busy",
 447                )
 448                .column_as(
 449                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 450                        .is_not_null(),
 451                    "user_b_busy",
 452                )
 453                .filter(
 454                    contact::Column::UserIdA
 455                        .eq(user_id)
 456                        .or(contact::Column::UserIdB.eq(user_id)),
 457                )
 458                .join_as(
 459                    JoinType::LeftJoin,
 460                    contact::Relation::UserARoomParticipant.def(),
 461                    user_a_participant,
 462                )
 463                .join_as(
 464                    JoinType::LeftJoin,
 465                    contact::Relation::UserBRoomParticipant.def(),
 466                    user_b_participant,
 467                )
 468                .into_model::<ContactWithUserBusyStatuses>()
 469                .stream(&*tx)
 470                .await?;
 471
 472            let mut contacts = Vec::new();
 473            while let Some(db_contact) = db_contacts.next().await {
 474                let db_contact = db_contact?;
 475                if db_contact.user_id_a == user_id {
 476                    if db_contact.accepted {
 477                        contacts.push(Contact::Accepted {
 478                            user_id: db_contact.user_id_b,
 479                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 480                            busy: db_contact.user_b_busy,
 481                        });
 482                    } else if db_contact.a_to_b {
 483                        contacts.push(Contact::Outgoing {
 484                            user_id: db_contact.user_id_b,
 485                        })
 486                    } else {
 487                        contacts.push(Contact::Incoming {
 488                            user_id: db_contact.user_id_b,
 489                            should_notify: db_contact.should_notify,
 490                        });
 491                    }
 492                } else if db_contact.accepted {
 493                    contacts.push(Contact::Accepted {
 494                        user_id: db_contact.user_id_a,
 495                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 496                        busy: db_contact.user_a_busy,
 497                    });
 498                } else if db_contact.a_to_b {
 499                    contacts.push(Contact::Incoming {
 500                        user_id: db_contact.user_id_a,
 501                        should_notify: db_contact.should_notify,
 502                    });
 503                } else {
 504                    contacts.push(Contact::Outgoing {
 505                        user_id: db_contact.user_id_a,
 506                    });
 507                }
 508            }
 509
 510            contacts.sort_unstable_by_key(|contact| contact.user_id());
 511
 512            Ok(contacts)
 513        })
 514        .await
 515    }
 516
 517    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 518        self.transaction(|tx| async move {
 519            let participant = room_participant::Entity::find()
 520                .filter(room_participant::Column::UserId.eq(user_id))
 521                .one(&*tx)
 522                .await?;
 523            Ok(participant.is_some())
 524        })
 525        .await
 526    }
 527
 528    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 529        self.transaction(|tx| async move {
 530            let (id_a, id_b) = if user_id_1 < user_id_2 {
 531                (user_id_1, user_id_2)
 532            } else {
 533                (user_id_2, user_id_1)
 534            };
 535
 536            Ok(contact::Entity::find()
 537                .filter(
 538                    contact::Column::UserIdA
 539                        .eq(id_a)
 540                        .and(contact::Column::UserIdB.eq(id_b))
 541                        .and(contact::Column::Accepted.eq(true)),
 542                )
 543                .one(&*tx)
 544                .await?
 545                .is_some())
 546        })
 547        .await
 548    }
 549
 550    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 551        self.transaction(|tx| async move {
 552            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 553                (sender_id, receiver_id, true)
 554            } else {
 555                (receiver_id, sender_id, false)
 556            };
 557
 558            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 559                user_id_a: ActiveValue::set(id_a),
 560                user_id_b: ActiveValue::set(id_b),
 561                a_to_b: ActiveValue::set(a_to_b),
 562                accepted: ActiveValue::set(false),
 563                should_notify: ActiveValue::set(true),
 564                ..Default::default()
 565            })
 566            .on_conflict(
 567                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 568                    .values([
 569                        (contact::Column::Accepted, true.into()),
 570                        (contact::Column::ShouldNotify, false.into()),
 571                    ])
 572                    .action_and_where(
 573                        contact::Column::Accepted.eq(false).and(
 574                            contact::Column::AToB
 575                                .eq(a_to_b)
 576                                .and(contact::Column::UserIdA.eq(id_b))
 577                                .or(contact::Column::AToB
 578                                    .ne(a_to_b)
 579                                    .and(contact::Column::UserIdA.eq(id_a))),
 580                        ),
 581                    )
 582                    .to_owned(),
 583            )
 584            .exec_without_returning(&*tx)
 585            .await?;
 586
 587            if rows_affected == 1 {
 588                Ok(())
 589            } else {
 590                Err(anyhow!("contact already requested"))?
 591            }
 592        })
 593        .await
 594    }
 595
 596    /// Returns a bool indicating whether the removed contact had originally accepted or not
 597    ///
 598    /// Deletes the contact identified by the requester and responder ids, and then returns
 599    /// whether the deleted contact had originally accepted or was a pending contact request.
 600    ///
 601    /// # Arguments
 602    ///
 603    /// * `requester_id` - The user that initiates this request
 604    /// * `responder_id` - The user that will be removed
 605    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 606        self.transaction(|tx| async move {
 607            let (id_a, id_b) = if responder_id < requester_id {
 608                (responder_id, requester_id)
 609            } else {
 610                (requester_id, responder_id)
 611            };
 612
 613            let contact = contact::Entity::find()
 614                .filter(
 615                    contact::Column::UserIdA
 616                        .eq(id_a)
 617                        .and(contact::Column::UserIdB.eq(id_b)),
 618                )
 619                .one(&*tx)
 620                .await?
 621                .ok_or_else(|| anyhow!("no such contact"))?;
 622
 623            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 624            Ok(contact.accepted)
 625        })
 626        .await
 627    }
 628
 629    pub async fn dismiss_contact_notification(
 630        &self,
 631        user_id: UserId,
 632        contact_user_id: UserId,
 633    ) -> Result<()> {
 634        self.transaction(|tx| async move {
 635            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 636                (user_id, contact_user_id, true)
 637            } else {
 638                (contact_user_id, user_id, false)
 639            };
 640
 641            let result = contact::Entity::update_many()
 642                .set(contact::ActiveModel {
 643                    should_notify: ActiveValue::set(false),
 644                    ..Default::default()
 645                })
 646                .filter(
 647                    contact::Column::UserIdA
 648                        .eq(id_a)
 649                        .and(contact::Column::UserIdB.eq(id_b))
 650                        .and(
 651                            contact::Column::AToB
 652                                .eq(a_to_b)
 653                                .and(contact::Column::Accepted.eq(true))
 654                                .or(contact::Column::AToB
 655                                    .ne(a_to_b)
 656                                    .and(contact::Column::Accepted.eq(false))),
 657                        ),
 658                )
 659                .exec(&*tx)
 660                .await?;
 661            if result.rows_affected == 0 {
 662                Err(anyhow!("no such contact request"))?
 663            } else {
 664                Ok(())
 665            }
 666        })
 667        .await
 668    }
 669
 670    pub async fn respond_to_contact_request(
 671        &self,
 672        responder_id: UserId,
 673        requester_id: UserId,
 674        accept: bool,
 675    ) -> Result<()> {
 676        self.transaction(|tx| async move {
 677            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 678                (responder_id, requester_id, false)
 679            } else {
 680                (requester_id, responder_id, true)
 681            };
 682            let rows_affected = if accept {
 683                let result = contact::Entity::update_many()
 684                    .set(contact::ActiveModel {
 685                        accepted: ActiveValue::set(true),
 686                        should_notify: ActiveValue::set(true),
 687                        ..Default::default()
 688                    })
 689                    .filter(
 690                        contact::Column::UserIdA
 691                            .eq(id_a)
 692                            .and(contact::Column::UserIdB.eq(id_b))
 693                            .and(contact::Column::AToB.eq(a_to_b)),
 694                    )
 695                    .exec(&*tx)
 696                    .await?;
 697                result.rows_affected
 698            } else {
 699                let result = contact::Entity::delete_many()
 700                    .filter(
 701                        contact::Column::UserIdA
 702                            .eq(id_a)
 703                            .and(contact::Column::UserIdB.eq(id_b))
 704                            .and(contact::Column::AToB.eq(a_to_b))
 705                            .and(contact::Column::Accepted.eq(false)),
 706                    )
 707                    .exec(&*tx)
 708                    .await?;
 709
 710                result.rows_affected
 711            };
 712
 713            if rows_affected == 1 {
 714                Ok(())
 715            } else {
 716                Err(anyhow!("no such contact request"))?
 717            }
 718        })
 719        .await
 720    }
 721
 722    pub fn fuzzy_like_string(string: &str) -> String {
 723        let mut result = String::with_capacity(string.len() * 2 + 1);
 724        for c in string.chars() {
 725            if c.is_alphanumeric() {
 726                result.push('%');
 727                result.push(c);
 728            }
 729        }
 730        result.push('%');
 731        result
 732    }
 733
 734    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 735        self.transaction(|tx| async {
 736            let tx = tx;
 737            let like_string = Self::fuzzy_like_string(name_query);
 738            let query = "
 739                SELECT users.*
 740                FROM users
 741                WHERE github_login ILIKE $1
 742                ORDER BY github_login <-> $2
 743                LIMIT $3
 744            ";
 745
 746            Ok(user::Entity::find()
 747                .from_raw_sql(Statement::from_sql_and_values(
 748                    self.pool.get_database_backend(),
 749                    query.into(),
 750                    vec![like_string.into(), name_query.into(), limit.into()],
 751                ))
 752                .all(&*tx)
 753                .await?)
 754        })
 755        .await
 756    }
 757
 758    // signups
 759
 760    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 761        self.transaction(|tx| async move {
 762            signup::Entity::insert(signup::ActiveModel {
 763                email_address: ActiveValue::set(signup.email_address.clone()),
 764                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 765                email_confirmation_sent: ActiveValue::set(false),
 766                platform_mac: ActiveValue::set(signup.platform_mac),
 767                platform_windows: ActiveValue::set(signup.platform_windows),
 768                platform_linux: ActiveValue::set(signup.platform_linux),
 769                platform_unknown: ActiveValue::set(false),
 770                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 771                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 772                device_id: ActiveValue::set(signup.device_id.clone()),
 773                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 774                ..Default::default()
 775            })
 776            .on_conflict(
 777                OnConflict::column(signup::Column::EmailAddress)
 778                    .update_columns([
 779                        signup::Column::PlatformMac,
 780                        signup::Column::PlatformWindows,
 781                        signup::Column::PlatformLinux,
 782                        signup::Column::EditorFeatures,
 783                        signup::Column::ProgrammingLanguages,
 784                        signup::Column::DeviceId,
 785                        signup::Column::AddedToMailingList,
 786                    ])
 787                    .to_owned(),
 788            )
 789            .exec(&*tx)
 790            .await?;
 791            Ok(())
 792        })
 793        .await
 794    }
 795
 796    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 797        self.transaction(|tx| async move {
 798            let signup = signup::Entity::find()
 799                .filter(signup::Column::EmailAddress.eq(email_address))
 800                .one(&*tx)
 801                .await?
 802                .ok_or_else(|| {
 803                    anyhow!("signup with email address {} doesn't exist", email_address)
 804                })?;
 805
 806            Ok(signup)
 807        })
 808        .await
 809    }
 810
 811    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 812        self.transaction(|tx| async move {
 813            let query = "
 814                SELECT
 815                    COUNT(*) as count,
 816                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 817                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 818                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 819                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 820                FROM (
 821                    SELECT *
 822                    FROM signups
 823                    WHERE
 824                        NOT email_confirmation_sent
 825                ) AS unsent
 826            ";
 827            Ok(
 828                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 829                    self.pool.get_database_backend(),
 830                    query.into(),
 831                    vec![],
 832                ))
 833                .one(&*tx)
 834                .await?
 835                .ok_or_else(|| anyhow!("invalid result"))?,
 836            )
 837        })
 838        .await
 839    }
 840
 841    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 842        let emails = invites
 843            .iter()
 844            .map(|s| s.email_address.as_str())
 845            .collect::<Vec<_>>();
 846        self.transaction(|tx| async {
 847            let tx = tx;
 848            signup::Entity::update_many()
 849                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 850                .set(signup::ActiveModel {
 851                    email_confirmation_sent: ActiveValue::set(true),
 852                    ..Default::default()
 853                })
 854                .exec(&*tx)
 855                .await?;
 856            Ok(())
 857        })
 858        .await
 859    }
 860
 861    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 862        self.transaction(|tx| async move {
 863            Ok(signup::Entity::find()
 864                .select_only()
 865                .column(signup::Column::EmailAddress)
 866                .column(signup::Column::EmailConfirmationCode)
 867                .filter(
 868                    signup::Column::EmailConfirmationSent.eq(false).and(
 869                        signup::Column::PlatformMac
 870                            .eq(true)
 871                            .or(signup::Column::PlatformUnknown.eq(true)),
 872                    ),
 873                )
 874                .order_by_asc(signup::Column::CreatedAt)
 875                .limit(count as u64)
 876                .into_model()
 877                .all(&*tx)
 878                .await?)
 879        })
 880        .await
 881    }
 882
 883    // invite codes
 884
 885    pub async fn create_invite_from_code(
 886        &self,
 887        code: &str,
 888        email_address: &str,
 889        device_id: Option<&str>,
 890        added_to_mailing_list: bool,
 891    ) -> Result<Invite> {
 892        self.transaction(|tx| async move {
 893            let existing_user = user::Entity::find()
 894                .filter(user::Column::EmailAddress.eq(email_address))
 895                .one(&*tx)
 896                .await?;
 897
 898            if existing_user.is_some() {
 899                Err(anyhow!("email address is already in use"))?;
 900            }
 901
 902            let inviting_user_with_invites = match user::Entity::find()
 903                .filter(
 904                    user::Column::InviteCode
 905                        .eq(code)
 906                        .and(user::Column::InviteCount.gt(0)),
 907                )
 908                .one(&*tx)
 909                .await?
 910            {
 911                Some(inviting_user) => inviting_user,
 912                None => {
 913                    return Err(Error::Http(
 914                        StatusCode::UNAUTHORIZED,
 915                        "unable to find an invite code with invites remaining".to_string(),
 916                    ))?
 917                }
 918            };
 919            user::Entity::update_many()
 920                .filter(
 921                    user::Column::Id
 922                        .eq(inviting_user_with_invites.id)
 923                        .and(user::Column::InviteCount.gt(0)),
 924                )
 925                .col_expr(
 926                    user::Column::InviteCount,
 927                    Expr::col(user::Column::InviteCount).sub(1),
 928                )
 929                .exec(&*tx)
 930                .await?;
 931
 932            let signup = signup::Entity::insert(signup::ActiveModel {
 933                email_address: ActiveValue::set(email_address.into()),
 934                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 935                email_confirmation_sent: ActiveValue::set(false),
 936                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 937                platform_linux: ActiveValue::set(false),
 938                platform_mac: ActiveValue::set(false),
 939                platform_windows: ActiveValue::set(false),
 940                platform_unknown: ActiveValue::set(true),
 941                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 942                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 943                ..Default::default()
 944            })
 945            .on_conflict(
 946                OnConflict::column(signup::Column::EmailAddress)
 947                    .update_column(signup::Column::InvitingUserId)
 948                    .to_owned(),
 949            )
 950            .exec_with_returning(&*tx)
 951            .await?;
 952
 953            Ok(Invite {
 954                email_address: signup.email_address,
 955                email_confirmation_code: signup.email_confirmation_code,
 956            })
 957        })
 958        .await
 959    }
 960
 961    pub async fn create_user_from_invite(
 962        &self,
 963        invite: &Invite,
 964        user: NewUserParams,
 965    ) -> Result<Option<NewUserResult>> {
 966        self.transaction(|tx| async {
 967            let tx = tx;
 968            let signup = signup::Entity::find()
 969                .filter(
 970                    signup::Column::EmailAddress
 971                        .eq(invite.email_address.as_str())
 972                        .and(
 973                            signup::Column::EmailConfirmationCode
 974                                .eq(invite.email_confirmation_code.as_str()),
 975                        ),
 976                )
 977                .one(&*tx)
 978                .await?
 979                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 980
 981            if signup.user_id.is_some() {
 982                return Ok(None);
 983            }
 984
 985            let user = user::Entity::insert(user::ActiveModel {
 986                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 987                github_login: ActiveValue::set(user.github_login.clone()),
 988                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 989                admin: ActiveValue::set(false),
 990                invite_count: ActiveValue::set(user.invite_count),
 991                invite_code: ActiveValue::set(Some(random_invite_code())),
 992                metrics_id: ActiveValue::set(Uuid::new_v4()),
 993                ..Default::default()
 994            })
 995            .on_conflict(
 996                OnConflict::column(user::Column::GithubLogin)
 997                    .update_columns([
 998                        user::Column::EmailAddress,
 999                        user::Column::GithubUserId,
1000                        user::Column::Admin,
1001                    ])
1002                    .to_owned(),
1003            )
1004            .exec_with_returning(&*tx)
1005            .await?;
1006
1007            let mut signup = signup.into_active_model();
1008            signup.user_id = ActiveValue::set(Some(user.id));
1009            let signup = signup.update(&*tx).await?;
1010
1011            if let Some(inviting_user_id) = signup.inviting_user_id {
1012                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1013                    (inviting_user_id, user.id, true)
1014                } else {
1015                    (user.id, inviting_user_id, false)
1016                };
1017
1018                contact::Entity::insert(contact::ActiveModel {
1019                    user_id_a: ActiveValue::set(user_id_a),
1020                    user_id_b: ActiveValue::set(user_id_b),
1021                    a_to_b: ActiveValue::set(a_to_b),
1022                    should_notify: ActiveValue::set(true),
1023                    accepted: ActiveValue::set(true),
1024                    ..Default::default()
1025                })
1026                .on_conflict(OnConflict::new().do_nothing().to_owned())
1027                .exec_without_returning(&*tx)
1028                .await?;
1029            }
1030
1031            Ok(Some(NewUserResult {
1032                user_id: user.id,
1033                metrics_id: user.metrics_id.to_string(),
1034                inviting_user_id: signup.inviting_user_id,
1035                signup_device_id: signup.device_id,
1036            }))
1037        })
1038        .await
1039    }
1040
1041    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1042        self.transaction(|tx| async move {
1043            if count > 0 {
1044                user::Entity::update_many()
1045                    .filter(
1046                        user::Column::Id
1047                            .eq(id)
1048                            .and(user::Column::InviteCode.is_null()),
1049                    )
1050                    .set(user::ActiveModel {
1051                        invite_code: ActiveValue::set(Some(random_invite_code())),
1052                        ..Default::default()
1053                    })
1054                    .exec(&*tx)
1055                    .await?;
1056            }
1057
1058            user::Entity::update_many()
1059                .filter(user::Column::Id.eq(id))
1060                .set(user::ActiveModel {
1061                    invite_count: ActiveValue::set(count),
1062                    ..Default::default()
1063                })
1064                .exec(&*tx)
1065                .await?;
1066            Ok(())
1067        })
1068        .await
1069    }
1070
1071    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1072        self.transaction(|tx| async move {
1073            match user::Entity::find_by_id(id).one(&*tx).await? {
1074                Some(user) if user.invite_code.is_some() => {
1075                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1076                }
1077                _ => Ok(None),
1078            }
1079        })
1080        .await
1081    }
1082
1083    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1084        self.transaction(|tx| async move {
1085            user::Entity::find()
1086                .filter(user::Column::InviteCode.eq(code))
1087                .one(&*tx)
1088                .await?
1089                .ok_or_else(|| {
1090                    Error::Http(
1091                        StatusCode::NOT_FOUND,
1092                        "that invite code does not exist".to_string(),
1093                    )
1094                })
1095        })
1096        .await
1097    }
1098
1099    // rooms
1100
1101    pub async fn incoming_call_for_user(
1102        &self,
1103        user_id: UserId,
1104    ) -> Result<Option<proto::IncomingCall>> {
1105        self.transaction(|tx| async move {
1106            let pending_participant = room_participant::Entity::find()
1107                .filter(
1108                    room_participant::Column::UserId
1109                        .eq(user_id)
1110                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1111                )
1112                .one(&*tx)
1113                .await?;
1114
1115            if let Some(pending_participant) = pending_participant {
1116                let room = self.get_room(pending_participant.room_id, &tx).await?;
1117                Ok(Self::build_incoming_call(&room, user_id))
1118            } else {
1119                Ok(None)
1120            }
1121        })
1122        .await
1123    }
1124
1125    pub async fn create_room(
1126        &self,
1127        user_id: UserId,
1128        connection: ConnectionId,
1129        live_kit_room: &str,
1130    ) -> Result<proto::Room> {
1131        self.transaction(|tx| async move {
1132            let room = room::ActiveModel {
1133                live_kit_room: ActiveValue::set(live_kit_room.into()),
1134                ..Default::default()
1135            }
1136            .insert(&*tx)
1137            .await?;
1138            room_participant::ActiveModel {
1139                room_id: ActiveValue::set(room.id),
1140                user_id: ActiveValue::set(user_id),
1141                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1142                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1143                    connection.owner_id as i32,
1144                ))),
1145                answering_connection_lost: ActiveValue::set(false),
1146                calling_user_id: ActiveValue::set(user_id),
1147                calling_connection_id: ActiveValue::set(connection.id as i32),
1148                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1149                    connection.owner_id as i32,
1150                ))),
1151                ..Default::default()
1152            }
1153            .insert(&*tx)
1154            .await?;
1155
1156            let room = self.get_room(room.id, &tx).await?;
1157            Ok(room)
1158        })
1159        .await
1160    }
1161
1162    pub async fn call(
1163        &self,
1164        room_id: RoomId,
1165        calling_user_id: UserId,
1166        calling_connection: ConnectionId,
1167        called_user_id: UserId,
1168        initial_project_id: Option<ProjectId>,
1169    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1170        self.room_transaction(room_id, |tx| async move {
1171            room_participant::ActiveModel {
1172                room_id: ActiveValue::set(room_id),
1173                user_id: ActiveValue::set(called_user_id),
1174                answering_connection_lost: ActiveValue::set(false),
1175                calling_user_id: ActiveValue::set(calling_user_id),
1176                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1177                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1178                    calling_connection.owner_id as i32,
1179                ))),
1180                initial_project_id: ActiveValue::set(initial_project_id),
1181                ..Default::default()
1182            }
1183            .insert(&*tx)
1184            .await?;
1185
1186            let room = self.get_room(room_id, &tx).await?;
1187            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1188                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1189            Ok((room, incoming_call))
1190        })
1191        .await
1192    }
1193
1194    pub async fn call_failed(
1195        &self,
1196        room_id: RoomId,
1197        called_user_id: UserId,
1198    ) -> Result<RoomGuard<proto::Room>> {
1199        self.room_transaction(room_id, |tx| async move {
1200            room_participant::Entity::delete_many()
1201                .filter(
1202                    room_participant::Column::RoomId
1203                        .eq(room_id)
1204                        .and(room_participant::Column::UserId.eq(called_user_id)),
1205                )
1206                .exec(&*tx)
1207                .await?;
1208            let room = self.get_room(room_id, &tx).await?;
1209            Ok(room)
1210        })
1211        .await
1212    }
1213
1214    pub async fn decline_call(
1215        &self,
1216        expected_room_id: Option<RoomId>,
1217        user_id: UserId,
1218    ) -> Result<Option<RoomGuard<proto::Room>>> {
1219        self.optional_room_transaction(|tx| async move {
1220            let mut filter = Condition::all()
1221                .add(room_participant::Column::UserId.eq(user_id))
1222                .add(room_participant::Column::AnsweringConnectionId.is_null());
1223            if let Some(room_id) = expected_room_id {
1224                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1225            }
1226            let participant = room_participant::Entity::find()
1227                .filter(filter)
1228                .one(&*tx)
1229                .await?;
1230
1231            let participant = if let Some(participant) = participant {
1232                participant
1233            } else if expected_room_id.is_some() {
1234                return Err(anyhow!("could not find call to decline"))?;
1235            } else {
1236                return Ok(None);
1237            };
1238
1239            let room_id = participant.room_id;
1240            room_participant::Entity::delete(participant.into_active_model())
1241                .exec(&*tx)
1242                .await?;
1243
1244            let room = self.get_room(room_id, &tx).await?;
1245            Ok(Some((room_id, room)))
1246        })
1247        .await
1248    }
1249
1250    pub async fn cancel_call(
1251        &self,
1252        room_id: RoomId,
1253        calling_connection: ConnectionId,
1254        called_user_id: UserId,
1255    ) -> Result<RoomGuard<proto::Room>> {
1256        self.room_transaction(room_id, |tx| async move {
1257            let participant = room_participant::Entity::find()
1258                .filter(
1259                    Condition::all()
1260                        .add(room_participant::Column::UserId.eq(called_user_id))
1261                        .add(room_participant::Column::RoomId.eq(room_id))
1262                        .add(
1263                            room_participant::Column::CallingConnectionId
1264                                .eq(calling_connection.id as i32),
1265                        )
1266                        .add(
1267                            room_participant::Column::CallingConnectionServerId
1268                                .eq(calling_connection.owner_id as i32),
1269                        )
1270                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1271                )
1272                .one(&*tx)
1273                .await?
1274                .ok_or_else(|| anyhow!("no call to cancel"))?;
1275
1276            room_participant::Entity::delete(participant.into_active_model())
1277                .exec(&*tx)
1278                .await?;
1279
1280            let room = self.get_room(room_id, &tx).await?;
1281            Ok(room)
1282        })
1283        .await
1284    }
1285
1286    pub async fn join_room(
1287        &self,
1288        room_id: RoomId,
1289        user_id: UserId,
1290        connection: ConnectionId,
1291    ) -> Result<RoomGuard<proto::Room>> {
1292        self.room_transaction(room_id, |tx| async move {
1293            let result = room_participant::Entity::update_many()
1294                .filter(
1295                    Condition::all()
1296                        .add(room_participant::Column::RoomId.eq(room_id))
1297                        .add(room_participant::Column::UserId.eq(user_id))
1298                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1299                )
1300                .set(room_participant::ActiveModel {
1301                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1302                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1303                        connection.owner_id as i32,
1304                    ))),
1305                    answering_connection_lost: ActiveValue::set(false),
1306                    ..Default::default()
1307                })
1308                .exec(&*tx)
1309                .await?;
1310            if result.rows_affected == 0 {
1311                Err(anyhow!("room does not exist or was already joined"))?
1312            } else {
1313                let room = self.get_room(room_id, &tx).await?;
1314                Ok(room)
1315            }
1316        })
1317        .await
1318    }
1319
1320    pub async fn rejoin_room(
1321        &self,
1322        rejoin_room: proto::RejoinRoom,
1323        user_id: UserId,
1324        connection: ConnectionId,
1325    ) -> Result<RoomGuard<RejoinedRoom>> {
1326        let room_id = RoomId::from_proto(rejoin_room.id);
1327        self.room_transaction(room_id, |tx| async {
1328            let tx = tx;
1329            let participant_update = room_participant::Entity::update_many()
1330                .filter(
1331                    Condition::all()
1332                        .add(room_participant::Column::RoomId.eq(room_id))
1333                        .add(room_participant::Column::UserId.eq(user_id))
1334                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1335                        .add(
1336                            Condition::any()
1337                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1338                                .add(
1339                                    room_participant::Column::AnsweringConnectionServerId
1340                                        .ne(connection.owner_id as i32),
1341                                ),
1342                        ),
1343                )
1344                .set(room_participant::ActiveModel {
1345                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1346                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1347                        connection.owner_id as i32,
1348                    ))),
1349                    answering_connection_lost: ActiveValue::set(false),
1350                    ..Default::default()
1351                })
1352                .exec(&*tx)
1353                .await?;
1354            if participant_update.rows_affected == 0 {
1355                return Err(anyhow!("room does not exist or was already joined"))?;
1356            }
1357
1358            let mut reshared_projects = Vec::new();
1359            for reshared_project in &rejoin_room.reshared_projects {
1360                let project_id = ProjectId::from_proto(reshared_project.project_id);
1361                let project = project::Entity::find_by_id(project_id)
1362                    .one(&*tx)
1363                    .await?
1364                    .ok_or_else(|| anyhow!("project does not exist"))?;
1365                if project.host_user_id != user_id {
1366                    return Err(anyhow!("no such project"))?;
1367                }
1368
1369                let mut collaborators = project
1370                    .find_related(project_collaborator::Entity)
1371                    .all(&*tx)
1372                    .await?;
1373                let host_ix = collaborators
1374                    .iter()
1375                    .position(|collaborator| {
1376                        collaborator.user_id == user_id && collaborator.is_host
1377                    })
1378                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1379                let host = collaborators.swap_remove(host_ix);
1380                let old_connection_id = host.connection();
1381
1382                project::Entity::update(project::ActiveModel {
1383                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1384                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1385                        connection.owner_id as i32,
1386                    ))),
1387                    ..project.into_active_model()
1388                })
1389                .exec(&*tx)
1390                .await?;
1391                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1392                    connection_id: ActiveValue::set(connection.id as i32),
1393                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1394                    ..host.into_active_model()
1395                })
1396                .exec(&*tx)
1397                .await?;
1398
1399                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1400                    .await?;
1401
1402                reshared_projects.push(ResharedProject {
1403                    id: project_id,
1404                    old_connection_id,
1405                    collaborators: collaborators
1406                        .iter()
1407                        .map(|collaborator| ProjectCollaborator {
1408                            connection_id: collaborator.connection(),
1409                            user_id: collaborator.user_id,
1410                            replica_id: collaborator.replica_id,
1411                            is_host: collaborator.is_host,
1412                        })
1413                        .collect(),
1414                    worktrees: reshared_project.worktrees.clone(),
1415                });
1416            }
1417
1418            project::Entity::delete_many()
1419                .filter(
1420                    Condition::all()
1421                        .add(project::Column::RoomId.eq(room_id))
1422                        .add(project::Column::HostUserId.eq(user_id))
1423                        .add(
1424                            project::Column::Id
1425                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1426                        ),
1427                )
1428                .exec(&*tx)
1429                .await?;
1430
1431            let mut rejoined_projects = Vec::new();
1432            for rejoined_project in &rejoin_room.rejoined_projects {
1433                let project_id = ProjectId::from_proto(rejoined_project.id);
1434                let Some(project) = project::Entity::find_by_id(project_id)
1435                    .one(&*tx)
1436                    .await? else { continue };
1437
1438                let mut worktrees = Vec::new();
1439                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1440                for db_worktree in db_worktrees {
1441                    let mut worktree = RejoinedWorktree {
1442                        id: db_worktree.id as u64,
1443                        abs_path: db_worktree.abs_path,
1444                        root_name: db_worktree.root_name,
1445                        visible: db_worktree.visible,
1446                        updated_entries: Default::default(),
1447                        removed_entries: Default::default(),
1448                        diagnostic_summaries: Default::default(),
1449                        scan_id: db_worktree.scan_id as u64,
1450                        completed_scan_id: db_worktree.completed_scan_id as u64,
1451                    };
1452
1453                    let rejoined_worktree = rejoined_project
1454                        .worktrees
1455                        .iter()
1456                        .find(|worktree| worktree.id == db_worktree.id as u64);
1457                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1458                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1459                    } else {
1460                        worktree_entry::Column::IsDeleted.eq(false)
1461                    };
1462
1463                    let mut db_entries = worktree_entry::Entity::find()
1464                        .filter(
1465                            Condition::all()
1466                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1467                                .add(entry_filter),
1468                        )
1469                        .stream(&*tx)
1470                        .await?;
1471
1472                    while let Some(db_entry) = db_entries.next().await {
1473                        let db_entry = db_entry?;
1474                        if db_entry.is_deleted {
1475                            worktree.removed_entries.push(db_entry.id as u64);
1476                        } else {
1477                            worktree.updated_entries.push(proto::Entry {
1478                                id: db_entry.id as u64,
1479                                is_dir: db_entry.is_dir,
1480                                path: db_entry.path,
1481                                inode: db_entry.inode as u64,
1482                                mtime: Some(proto::Timestamp {
1483                                    seconds: db_entry.mtime_seconds as u64,
1484                                    nanos: db_entry.mtime_nanos as u32,
1485                                }),
1486                                is_symlink: db_entry.is_symlink,
1487                                is_ignored: db_entry.is_ignored,
1488                            });
1489                        }
1490                    }
1491
1492                    worktrees.push(worktree);
1493                }
1494
1495                let language_servers = project
1496                    .find_related(language_server::Entity)
1497                    .all(&*tx)
1498                    .await?
1499                    .into_iter()
1500                    .map(|language_server| proto::LanguageServer {
1501                        id: language_server.id as u64,
1502                        name: language_server.name,
1503                    })
1504                    .collect::<Vec<_>>();
1505
1506                let mut collaborators = project
1507                    .find_related(project_collaborator::Entity)
1508                    .all(&*tx)
1509                    .await?;
1510                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1511                    .iter()
1512                    .position(|collaborator| collaborator.user_id == user_id)
1513                {
1514                    collaborators.swap_remove(self_collaborator_ix)
1515                } else {
1516                    continue;
1517                };
1518                let old_connection_id = self_collaborator.connection();
1519                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1520                    connection_id: ActiveValue::set(connection.id as i32),
1521                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1522                    ..self_collaborator.into_active_model()
1523                })
1524                .exec(&*tx)
1525                .await?;
1526
1527                let collaborators = collaborators
1528                    .into_iter()
1529                    .map(|collaborator| ProjectCollaborator {
1530                        connection_id: collaborator.connection(),
1531                        user_id: collaborator.user_id,
1532                        replica_id: collaborator.replica_id,
1533                        is_host: collaborator.is_host,
1534                    })
1535                    .collect::<Vec<_>>();
1536
1537                rejoined_projects.push(RejoinedProject {
1538                    id: project_id,
1539                    old_connection_id,
1540                    collaborators,
1541                    worktrees,
1542                    language_servers,
1543                });
1544            }
1545
1546            let room = self.get_room(room_id, &tx).await?;
1547            Ok(RejoinedRoom {
1548                room,
1549                rejoined_projects,
1550                reshared_projects,
1551            })
1552        })
1553        .await
1554    }
1555
1556    pub async fn leave_room(
1557        &self,
1558        connection: ConnectionId,
1559    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1560        self.optional_room_transaction(|tx| async move {
1561            let leaving_participant = room_participant::Entity::find()
1562                .filter(
1563                    Condition::all()
1564                        .add(
1565                            room_participant::Column::AnsweringConnectionId
1566                                .eq(connection.id as i32),
1567                        )
1568                        .add(
1569                            room_participant::Column::AnsweringConnectionServerId
1570                                .eq(connection.owner_id as i32),
1571                        ),
1572                )
1573                .one(&*tx)
1574                .await?;
1575
1576            if let Some(leaving_participant) = leaving_participant {
1577                // Leave room.
1578                let room_id = leaving_participant.room_id;
1579                room_participant::Entity::delete_by_id(leaving_participant.id)
1580                    .exec(&*tx)
1581                    .await?;
1582
1583                // Cancel pending calls initiated by the leaving user.
1584                let called_participants = room_participant::Entity::find()
1585                    .filter(
1586                        Condition::all()
1587                            .add(
1588                                room_participant::Column::CallingUserId
1589                                    .eq(leaving_participant.user_id),
1590                            )
1591                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1592                    )
1593                    .all(&*tx)
1594                    .await?;
1595                room_participant::Entity::delete_many()
1596                    .filter(
1597                        room_participant::Column::Id
1598                            .is_in(called_participants.iter().map(|participant| participant.id)),
1599                    )
1600                    .exec(&*tx)
1601                    .await?;
1602                let canceled_calls_to_user_ids = called_participants
1603                    .into_iter()
1604                    .map(|participant| participant.user_id)
1605                    .collect();
1606
1607                // Detect left projects.
1608                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1609                enum QueryProjectIds {
1610                    ProjectId,
1611                }
1612                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1613                    .select_only()
1614                    .column_as(
1615                        project_collaborator::Column::ProjectId,
1616                        QueryProjectIds::ProjectId,
1617                    )
1618                    .filter(
1619                        Condition::all()
1620                            .add(
1621                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1622                            )
1623                            .add(
1624                                project_collaborator::Column::ConnectionServerId
1625                                    .eq(connection.owner_id as i32),
1626                            ),
1627                    )
1628                    .into_values::<_, QueryProjectIds>()
1629                    .all(&*tx)
1630                    .await?;
1631                let mut left_projects = HashMap::default();
1632                let mut collaborators = project_collaborator::Entity::find()
1633                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1634                    .stream(&*tx)
1635                    .await?;
1636                while let Some(collaborator) = collaborators.next().await {
1637                    let collaborator = collaborator?;
1638                    let left_project =
1639                        left_projects
1640                            .entry(collaborator.project_id)
1641                            .or_insert(LeftProject {
1642                                id: collaborator.project_id,
1643                                host_user_id: Default::default(),
1644                                connection_ids: Default::default(),
1645                                host_connection_id: Default::default(),
1646                            });
1647
1648                    let collaborator_connection_id = collaborator.connection();
1649                    if collaborator_connection_id != connection {
1650                        left_project.connection_ids.push(collaborator_connection_id);
1651                    }
1652
1653                    if collaborator.is_host {
1654                        left_project.host_user_id = collaborator.user_id;
1655                        left_project.host_connection_id = collaborator_connection_id;
1656                    }
1657                }
1658                drop(collaborators);
1659
1660                // Leave projects.
1661                project_collaborator::Entity::delete_many()
1662                    .filter(
1663                        Condition::all()
1664                            .add(
1665                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1666                            )
1667                            .add(
1668                                project_collaborator::Column::ConnectionServerId
1669                                    .eq(connection.owner_id as i32),
1670                            ),
1671                    )
1672                    .exec(&*tx)
1673                    .await?;
1674
1675                // Unshare projects.
1676                project::Entity::delete_many()
1677                    .filter(
1678                        Condition::all()
1679                            .add(project::Column::RoomId.eq(room_id))
1680                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1681                            .add(
1682                                project::Column::HostConnectionServerId
1683                                    .eq(connection.owner_id as i32),
1684                            ),
1685                    )
1686                    .exec(&*tx)
1687                    .await?;
1688
1689                let room = self.get_room(room_id, &tx).await?;
1690                if room.participants.is_empty() {
1691                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1692                }
1693
1694                let left_room = LeftRoom {
1695                    room,
1696                    left_projects,
1697                    canceled_calls_to_user_ids,
1698                };
1699
1700                if left_room.room.participants.is_empty() {
1701                    self.rooms.remove(&room_id);
1702                }
1703
1704                Ok(Some((room_id, left_room)))
1705            } else {
1706                Ok(None)
1707            }
1708        })
1709        .await
1710    }
1711
1712    pub async fn follow(
1713        &self,
1714        project_id: ProjectId,
1715        leader_connection: ConnectionId,
1716        follower_connection: ConnectionId,
1717    ) -> Result<RoomGuard<proto::Room>> {
1718        let room_id = self.room_id_for_project(project_id).await?;
1719        self.room_transaction(room_id, |tx| async move {
1720            follower::ActiveModel {
1721                room_id: ActiveValue::set(room_id),
1722                project_id: ActiveValue::set(project_id),
1723                leader_connection_server_id: ActiveValue::set(ServerId(
1724                    leader_connection.owner_id as i32,
1725                )),
1726                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1727                follower_connection_server_id: ActiveValue::set(ServerId(
1728                    follower_connection.owner_id as i32,
1729                )),
1730                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1731                ..Default::default()
1732            }
1733            .insert(&*tx)
1734            .await?;
1735
1736            let room = self.get_room(room_id, &*tx).await?;
1737            Ok(room)
1738        })
1739        .await
1740    }
1741
1742    pub async fn unfollow(
1743        &self,
1744        project_id: ProjectId,
1745        leader_connection: ConnectionId,
1746        follower_connection: ConnectionId,
1747    ) -> Result<RoomGuard<proto::Room>> {
1748        let room_id = self.room_id_for_project(project_id).await?;
1749        self.room_transaction(room_id, |tx| async move {
1750            follower::Entity::delete_many()
1751                .filter(
1752                    Condition::all()
1753                        .add(follower::Column::ProjectId.eq(project_id))
1754                        .add(
1755                            follower::Column::LeaderConnectionServerId
1756                                .eq(leader_connection.owner_id)
1757                                .and(follower::Column::LeaderConnectionId.eq(leader_connection.id)),
1758                        )
1759                        .add(
1760                            follower::Column::FollowerConnectionServerId
1761                                .eq(follower_connection.owner_id)
1762                                .and(
1763                                    follower::Column::FollowerConnectionId
1764                                        .eq(follower_connection.id),
1765                                ),
1766                        ),
1767                )
1768                .exec(&*tx)
1769                .await?;
1770
1771            let room = self.get_room(room_id, &*tx).await?;
1772            Ok(room)
1773        })
1774        .await
1775    }
1776
1777    pub async fn update_room_participant_location(
1778        &self,
1779        room_id: RoomId,
1780        connection: ConnectionId,
1781        location: proto::ParticipantLocation,
1782    ) -> Result<RoomGuard<proto::Room>> {
1783        self.room_transaction(room_id, |tx| async {
1784            let tx = tx;
1785            let location_kind;
1786            let location_project_id;
1787            match location
1788                .variant
1789                .as_ref()
1790                .ok_or_else(|| anyhow!("invalid location"))?
1791            {
1792                proto::participant_location::Variant::SharedProject(project) => {
1793                    location_kind = 0;
1794                    location_project_id = Some(ProjectId::from_proto(project.id));
1795                }
1796                proto::participant_location::Variant::UnsharedProject(_) => {
1797                    location_kind = 1;
1798                    location_project_id = None;
1799                }
1800                proto::participant_location::Variant::External(_) => {
1801                    location_kind = 2;
1802                    location_project_id = None;
1803                }
1804            }
1805
1806            let result = room_participant::Entity::update_many()
1807                .filter(
1808                    Condition::all()
1809                        .add(room_participant::Column::RoomId.eq(room_id))
1810                        .add(
1811                            room_participant::Column::AnsweringConnectionId
1812                                .eq(connection.id as i32),
1813                        )
1814                        .add(
1815                            room_participant::Column::AnsweringConnectionServerId
1816                                .eq(connection.owner_id as i32),
1817                        ),
1818                )
1819                .set(room_participant::ActiveModel {
1820                    location_kind: ActiveValue::set(Some(location_kind)),
1821                    location_project_id: ActiveValue::set(location_project_id),
1822                    ..Default::default()
1823                })
1824                .exec(&*tx)
1825                .await?;
1826
1827            if result.rows_affected == 1 {
1828                let room = self.get_room(room_id, &tx).await?;
1829                Ok(room)
1830            } else {
1831                Err(anyhow!("could not update room participant location"))?
1832            }
1833        })
1834        .await
1835    }
1836
1837    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1838        self.transaction(|tx| async move {
1839            let participant = room_participant::Entity::find()
1840                .filter(
1841                    Condition::all()
1842                        .add(
1843                            room_participant::Column::AnsweringConnectionId
1844                                .eq(connection.id as i32),
1845                        )
1846                        .add(
1847                            room_participant::Column::AnsweringConnectionServerId
1848                                .eq(connection.owner_id as i32),
1849                        ),
1850                )
1851                .one(&*tx)
1852                .await?
1853                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1854
1855            room_participant::Entity::update(room_participant::ActiveModel {
1856                answering_connection_lost: ActiveValue::set(true),
1857                ..participant.into_active_model()
1858            })
1859            .exec(&*tx)
1860            .await?;
1861
1862            Ok(())
1863        })
1864        .await
1865    }
1866
1867    fn build_incoming_call(
1868        room: &proto::Room,
1869        called_user_id: UserId,
1870    ) -> Option<proto::IncomingCall> {
1871        let pending_participant = room
1872            .pending_participants
1873            .iter()
1874            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1875
1876        Some(proto::IncomingCall {
1877            room_id: room.id,
1878            calling_user_id: pending_participant.calling_user_id,
1879            participant_user_ids: room
1880                .participants
1881                .iter()
1882                .map(|participant| participant.user_id)
1883                .collect(),
1884            initial_project: room.participants.iter().find_map(|participant| {
1885                let initial_project_id = pending_participant.initial_project_id?;
1886                participant
1887                    .projects
1888                    .iter()
1889                    .find(|project| project.id == initial_project_id)
1890                    .cloned()
1891            }),
1892        })
1893    }
1894
1895    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1896        let db_room = room::Entity::find_by_id(room_id)
1897            .one(tx)
1898            .await?
1899            .ok_or_else(|| anyhow!("could not find room"))?;
1900
1901        let mut db_participants = db_room
1902            .find_related(room_participant::Entity)
1903            .stream(tx)
1904            .await?;
1905        let mut participants = HashMap::default();
1906        let mut pending_participants = Vec::new();
1907        while let Some(db_participant) = db_participants.next().await {
1908            let db_participant = db_participant?;
1909            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1910                .answering_connection_id
1911                .zip(db_participant.answering_connection_server_id)
1912            {
1913                let location = match (
1914                    db_participant.location_kind,
1915                    db_participant.location_project_id,
1916                ) {
1917                    (Some(0), Some(project_id)) => {
1918                        Some(proto::participant_location::Variant::SharedProject(
1919                            proto::participant_location::SharedProject {
1920                                id: project_id.to_proto(),
1921                            },
1922                        ))
1923                    }
1924                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1925                        Default::default(),
1926                    )),
1927                    _ => Some(proto::participant_location::Variant::External(
1928                        Default::default(),
1929                    )),
1930                };
1931
1932                let answering_connection = ConnectionId {
1933                    owner_id: answering_connection_server_id.0 as u32,
1934                    id: answering_connection_id as u32,
1935                };
1936                participants.insert(
1937                    answering_connection,
1938                    proto::Participant {
1939                        user_id: db_participant.user_id.to_proto(),
1940                        peer_id: Some(answering_connection.into()),
1941                        projects: Default::default(),
1942                        location: Some(proto::ParticipantLocation { variant: location }),
1943                    },
1944                );
1945            } else {
1946                pending_participants.push(proto::PendingParticipant {
1947                    user_id: db_participant.user_id.to_proto(),
1948                    calling_user_id: db_participant.calling_user_id.to_proto(),
1949                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1950                });
1951            }
1952        }
1953        drop(db_participants);
1954
1955        let mut db_projects = db_room
1956            .find_related(project::Entity)
1957            .find_with_related(worktree::Entity)
1958            .stream(tx)
1959            .await?;
1960
1961        while let Some(row) = db_projects.next().await {
1962            let (db_project, db_worktree) = row?;
1963            let host_connection = db_project.host_connection()?;
1964            if let Some(participant) = participants.get_mut(&host_connection) {
1965                let project = if let Some(project) = participant
1966                    .projects
1967                    .iter_mut()
1968                    .find(|project| project.id == db_project.id.to_proto())
1969                {
1970                    project
1971                } else {
1972                    participant.projects.push(proto::ParticipantProject {
1973                        id: db_project.id.to_proto(),
1974                        worktree_root_names: Default::default(),
1975                    });
1976                    participant.projects.last_mut().unwrap()
1977                };
1978
1979                if let Some(db_worktree) = db_worktree {
1980                    if db_worktree.visible {
1981                        project.worktree_root_names.push(db_worktree.root_name);
1982                    }
1983                }
1984            }
1985        }
1986        drop(db_projects);
1987
1988        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1989        let mut followers = Vec::new();
1990        while let Some(db_follower) = db_followers.next().await {
1991            let db_follower = db_follower?;
1992            followers.push(proto::Follower {
1993                leader_id: Some(db_follower.leader_connection().into()),
1994                follower_id: Some(db_follower.follower_connection().into()),
1995                project_id: db_follower.project_id.to_proto(),
1996            });
1997        }
1998
1999        Ok(proto::Room {
2000            id: db_room.id.to_proto(),
2001            live_kit_room: db_room.live_kit_room,
2002            participants: participants.into_values().collect(),
2003            pending_participants,
2004            followers,
2005        })
2006    }
2007
2008    // projects
2009
2010    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2011        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2012        enum QueryAs {
2013            Count,
2014        }
2015
2016        self.transaction(|tx| async move {
2017            Ok(project::Entity::find()
2018                .select_only()
2019                .column_as(project::Column::Id.count(), QueryAs::Count)
2020                .inner_join(user::Entity)
2021                .filter(user::Column::Admin.eq(false))
2022                .into_values::<_, QueryAs>()
2023                .one(&*tx)
2024                .await?
2025                .unwrap_or(0i64) as usize)
2026        })
2027        .await
2028    }
2029
2030    pub async fn share_project(
2031        &self,
2032        room_id: RoomId,
2033        connection: ConnectionId,
2034        worktrees: &[proto::WorktreeMetadata],
2035    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2036        self.room_transaction(room_id, |tx| async move {
2037            let participant = room_participant::Entity::find()
2038                .filter(
2039                    Condition::all()
2040                        .add(
2041                            room_participant::Column::AnsweringConnectionId
2042                                .eq(connection.id as i32),
2043                        )
2044                        .add(
2045                            room_participant::Column::AnsweringConnectionServerId
2046                                .eq(connection.owner_id as i32),
2047                        ),
2048                )
2049                .one(&*tx)
2050                .await?
2051                .ok_or_else(|| anyhow!("could not find participant"))?;
2052            if participant.room_id != room_id {
2053                return Err(anyhow!("shared project on unexpected room"))?;
2054            }
2055
2056            let project = project::ActiveModel {
2057                room_id: ActiveValue::set(participant.room_id),
2058                host_user_id: ActiveValue::set(participant.user_id),
2059                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2060                host_connection_server_id: ActiveValue::set(Some(ServerId(
2061                    connection.owner_id as i32,
2062                ))),
2063                ..Default::default()
2064            }
2065            .insert(&*tx)
2066            .await?;
2067
2068            if !worktrees.is_empty() {
2069                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2070                    worktree::ActiveModel {
2071                        id: ActiveValue::set(worktree.id as i64),
2072                        project_id: ActiveValue::set(project.id),
2073                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2074                        root_name: ActiveValue::set(worktree.root_name.clone()),
2075                        visible: ActiveValue::set(worktree.visible),
2076                        scan_id: ActiveValue::set(0),
2077                        completed_scan_id: ActiveValue::set(0),
2078                    }
2079                }))
2080                .exec(&*tx)
2081                .await?;
2082            }
2083
2084            project_collaborator::ActiveModel {
2085                project_id: ActiveValue::set(project.id),
2086                connection_id: ActiveValue::set(connection.id as i32),
2087                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2088                user_id: ActiveValue::set(participant.user_id),
2089                replica_id: ActiveValue::set(ReplicaId(0)),
2090                is_host: ActiveValue::set(true),
2091                ..Default::default()
2092            }
2093            .insert(&*tx)
2094            .await?;
2095
2096            let room = self.get_room(room_id, &tx).await?;
2097            Ok((project.id, room))
2098        })
2099        .await
2100    }
2101
2102    pub async fn unshare_project(
2103        &self,
2104        project_id: ProjectId,
2105        connection: ConnectionId,
2106    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2107        let room_id = self.room_id_for_project(project_id).await?;
2108        self.room_transaction(room_id, |tx| async move {
2109            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2110
2111            let project = project::Entity::find_by_id(project_id)
2112                .one(&*tx)
2113                .await?
2114                .ok_or_else(|| anyhow!("project not found"))?;
2115            if project.host_connection()? == connection {
2116                project::Entity::delete(project.into_active_model())
2117                    .exec(&*tx)
2118                    .await?;
2119                let room = self.get_room(room_id, &tx).await?;
2120                Ok((room, guest_connection_ids))
2121            } else {
2122                Err(anyhow!("cannot unshare a project hosted by another user"))?
2123            }
2124        })
2125        .await
2126    }
2127
2128    pub async fn update_project(
2129        &self,
2130        project_id: ProjectId,
2131        connection: ConnectionId,
2132        worktrees: &[proto::WorktreeMetadata],
2133    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2134        let room_id = self.room_id_for_project(project_id).await?;
2135        self.room_transaction(room_id, |tx| async move {
2136            let project = project::Entity::find_by_id(project_id)
2137                .filter(
2138                    Condition::all()
2139                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2140                        .add(
2141                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2142                        ),
2143                )
2144                .one(&*tx)
2145                .await?
2146                .ok_or_else(|| anyhow!("no such project"))?;
2147
2148            self.update_project_worktrees(project.id, worktrees, &tx)
2149                .await?;
2150
2151            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2152            let room = self.get_room(project.room_id, &tx).await?;
2153            Ok((room, guest_connection_ids))
2154        })
2155        .await
2156    }
2157
2158    async fn update_project_worktrees(
2159        &self,
2160        project_id: ProjectId,
2161        worktrees: &[proto::WorktreeMetadata],
2162        tx: &DatabaseTransaction,
2163    ) -> Result<()> {
2164        if !worktrees.is_empty() {
2165            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2166                id: ActiveValue::set(worktree.id as i64),
2167                project_id: ActiveValue::set(project_id),
2168                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2169                root_name: ActiveValue::set(worktree.root_name.clone()),
2170                visible: ActiveValue::set(worktree.visible),
2171                scan_id: ActiveValue::set(0),
2172                completed_scan_id: ActiveValue::set(0),
2173            }))
2174            .on_conflict(
2175                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2176                    .update_column(worktree::Column::RootName)
2177                    .to_owned(),
2178            )
2179            .exec(&*tx)
2180            .await?;
2181        }
2182
2183        worktree::Entity::delete_many()
2184            .filter(worktree::Column::ProjectId.eq(project_id).and(
2185                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2186            ))
2187            .exec(&*tx)
2188            .await?;
2189
2190        Ok(())
2191    }
2192
2193    pub async fn update_worktree(
2194        &self,
2195        update: &proto::UpdateWorktree,
2196        connection: ConnectionId,
2197    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2198        let project_id = ProjectId::from_proto(update.project_id);
2199        let worktree_id = update.worktree_id as i64;
2200        let room_id = self.room_id_for_project(project_id).await?;
2201        self.room_transaction(room_id, |tx| async move {
2202            // Ensure the update comes from the host.
2203            let _project = project::Entity::find_by_id(project_id)
2204                .filter(
2205                    Condition::all()
2206                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2207                        .add(
2208                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2209                        ),
2210                )
2211                .one(&*tx)
2212                .await?
2213                .ok_or_else(|| anyhow!("no such project"))?;
2214
2215            // Update metadata.
2216            worktree::Entity::update(worktree::ActiveModel {
2217                id: ActiveValue::set(worktree_id),
2218                project_id: ActiveValue::set(project_id),
2219                root_name: ActiveValue::set(update.root_name.clone()),
2220                scan_id: ActiveValue::set(update.scan_id as i64),
2221                completed_scan_id: if update.is_last_update {
2222                    ActiveValue::set(update.scan_id as i64)
2223                } else {
2224                    ActiveValue::default()
2225                },
2226                abs_path: ActiveValue::set(update.abs_path.clone()),
2227                ..Default::default()
2228            })
2229            .exec(&*tx)
2230            .await?;
2231
2232            if !update.updated_entries.is_empty() {
2233                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2234                    let mtime = entry.mtime.clone().unwrap_or_default();
2235                    worktree_entry::ActiveModel {
2236                        project_id: ActiveValue::set(project_id),
2237                        worktree_id: ActiveValue::set(worktree_id),
2238                        id: ActiveValue::set(entry.id as i64),
2239                        is_dir: ActiveValue::set(entry.is_dir),
2240                        path: ActiveValue::set(entry.path.clone()),
2241                        inode: ActiveValue::set(entry.inode as i64),
2242                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2243                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2244                        is_symlink: ActiveValue::set(entry.is_symlink),
2245                        is_ignored: ActiveValue::set(entry.is_ignored),
2246                        is_deleted: ActiveValue::set(false),
2247                        scan_id: ActiveValue::set(update.scan_id as i64),
2248                    }
2249                }))
2250                .on_conflict(
2251                    OnConflict::columns([
2252                        worktree_entry::Column::ProjectId,
2253                        worktree_entry::Column::WorktreeId,
2254                        worktree_entry::Column::Id,
2255                    ])
2256                    .update_columns([
2257                        worktree_entry::Column::IsDir,
2258                        worktree_entry::Column::Path,
2259                        worktree_entry::Column::Inode,
2260                        worktree_entry::Column::MtimeSeconds,
2261                        worktree_entry::Column::MtimeNanos,
2262                        worktree_entry::Column::IsSymlink,
2263                        worktree_entry::Column::IsIgnored,
2264                        worktree_entry::Column::ScanId,
2265                    ])
2266                    .to_owned(),
2267                )
2268                .exec(&*tx)
2269                .await?;
2270            }
2271
2272            if !update.removed_entries.is_empty() {
2273                worktree_entry::Entity::update_many()
2274                    .filter(
2275                        worktree_entry::Column::ProjectId
2276                            .eq(project_id)
2277                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2278                            .and(
2279                                worktree_entry::Column::Id
2280                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2281                            ),
2282                    )
2283                    .set(worktree_entry::ActiveModel {
2284                        is_deleted: ActiveValue::Set(true),
2285                        scan_id: ActiveValue::Set(update.scan_id as i64),
2286                        ..Default::default()
2287                    })
2288                    .exec(&*tx)
2289                    .await?;
2290            }
2291
2292            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2293            Ok(connection_ids)
2294        })
2295        .await
2296    }
2297
2298    pub async fn update_diagnostic_summary(
2299        &self,
2300        update: &proto::UpdateDiagnosticSummary,
2301        connection: ConnectionId,
2302    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2303        let project_id = ProjectId::from_proto(update.project_id);
2304        let worktree_id = update.worktree_id as i64;
2305        let room_id = self.room_id_for_project(project_id).await?;
2306        self.room_transaction(room_id, |tx| async move {
2307            let summary = update
2308                .summary
2309                .as_ref()
2310                .ok_or_else(|| anyhow!("invalid summary"))?;
2311
2312            // Ensure the update comes from the host.
2313            let project = project::Entity::find_by_id(project_id)
2314                .one(&*tx)
2315                .await?
2316                .ok_or_else(|| anyhow!("no such project"))?;
2317            if project.host_connection()? != connection {
2318                return Err(anyhow!("can't update a project hosted by someone else"))?;
2319            }
2320
2321            // Update summary.
2322            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2323                project_id: ActiveValue::set(project_id),
2324                worktree_id: ActiveValue::set(worktree_id),
2325                path: ActiveValue::set(summary.path.clone()),
2326                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2327                error_count: ActiveValue::set(summary.error_count as i32),
2328                warning_count: ActiveValue::set(summary.warning_count as i32),
2329                ..Default::default()
2330            })
2331            .on_conflict(
2332                OnConflict::columns([
2333                    worktree_diagnostic_summary::Column::ProjectId,
2334                    worktree_diagnostic_summary::Column::WorktreeId,
2335                    worktree_diagnostic_summary::Column::Path,
2336                ])
2337                .update_columns([
2338                    worktree_diagnostic_summary::Column::LanguageServerId,
2339                    worktree_diagnostic_summary::Column::ErrorCount,
2340                    worktree_diagnostic_summary::Column::WarningCount,
2341                ])
2342                .to_owned(),
2343            )
2344            .exec(&*tx)
2345            .await?;
2346
2347            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2348            Ok(connection_ids)
2349        })
2350        .await
2351    }
2352
2353    pub async fn start_language_server(
2354        &self,
2355        update: &proto::StartLanguageServer,
2356        connection: ConnectionId,
2357    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2358        let project_id = ProjectId::from_proto(update.project_id);
2359        let room_id = self.room_id_for_project(project_id).await?;
2360        self.room_transaction(room_id, |tx| async move {
2361            let server = update
2362                .server
2363                .as_ref()
2364                .ok_or_else(|| anyhow!("invalid language server"))?;
2365
2366            // Ensure the update comes from the host.
2367            let project = project::Entity::find_by_id(project_id)
2368                .one(&*tx)
2369                .await?
2370                .ok_or_else(|| anyhow!("no such project"))?;
2371            if project.host_connection()? != connection {
2372                return Err(anyhow!("can't update a project hosted by someone else"))?;
2373            }
2374
2375            // Add the newly-started language server.
2376            language_server::Entity::insert(language_server::ActiveModel {
2377                project_id: ActiveValue::set(project_id),
2378                id: ActiveValue::set(server.id as i64),
2379                name: ActiveValue::set(server.name.clone()),
2380                ..Default::default()
2381            })
2382            .on_conflict(
2383                OnConflict::columns([
2384                    language_server::Column::ProjectId,
2385                    language_server::Column::Id,
2386                ])
2387                .update_column(language_server::Column::Name)
2388                .to_owned(),
2389            )
2390            .exec(&*tx)
2391            .await?;
2392
2393            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2394            Ok(connection_ids)
2395        })
2396        .await
2397    }
2398
2399    pub async fn join_project(
2400        &self,
2401        project_id: ProjectId,
2402        connection: ConnectionId,
2403    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2404        let room_id = self.room_id_for_project(project_id).await?;
2405        self.room_transaction(room_id, |tx| async move {
2406            let participant = room_participant::Entity::find()
2407                .filter(
2408                    Condition::all()
2409                        .add(
2410                            room_participant::Column::AnsweringConnectionId
2411                                .eq(connection.id as i32),
2412                        )
2413                        .add(
2414                            room_participant::Column::AnsweringConnectionServerId
2415                                .eq(connection.owner_id as i32),
2416                        ),
2417                )
2418                .one(&*tx)
2419                .await?
2420                .ok_or_else(|| anyhow!("must join a room first"))?;
2421
2422            let project = project::Entity::find_by_id(project_id)
2423                .one(&*tx)
2424                .await?
2425                .ok_or_else(|| anyhow!("no such project"))?;
2426            if project.room_id != participant.room_id {
2427                return Err(anyhow!("no such project"))?;
2428            }
2429
2430            let mut collaborators = project
2431                .find_related(project_collaborator::Entity)
2432                .all(&*tx)
2433                .await?;
2434            let replica_ids = collaborators
2435                .iter()
2436                .map(|c| c.replica_id)
2437                .collect::<HashSet<_>>();
2438            let mut replica_id = ReplicaId(1);
2439            while replica_ids.contains(&replica_id) {
2440                replica_id.0 += 1;
2441            }
2442            let new_collaborator = project_collaborator::ActiveModel {
2443                project_id: ActiveValue::set(project_id),
2444                connection_id: ActiveValue::set(connection.id as i32),
2445                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2446                user_id: ActiveValue::set(participant.user_id),
2447                replica_id: ActiveValue::set(replica_id),
2448                is_host: ActiveValue::set(false),
2449                ..Default::default()
2450            }
2451            .insert(&*tx)
2452            .await?;
2453            collaborators.push(new_collaborator);
2454
2455            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2456            let mut worktrees = db_worktrees
2457                .into_iter()
2458                .map(|db_worktree| {
2459                    (
2460                        db_worktree.id as u64,
2461                        Worktree {
2462                            id: db_worktree.id as u64,
2463                            abs_path: db_worktree.abs_path,
2464                            root_name: db_worktree.root_name,
2465                            visible: db_worktree.visible,
2466                            entries: Default::default(),
2467                            diagnostic_summaries: Default::default(),
2468                            scan_id: db_worktree.scan_id as u64,
2469                            completed_scan_id: db_worktree.completed_scan_id as u64,
2470                        },
2471                    )
2472                })
2473                .collect::<BTreeMap<_, _>>();
2474
2475            // Populate worktree entries.
2476            {
2477                let mut db_entries = worktree_entry::Entity::find()
2478                    .filter(
2479                        Condition::all()
2480                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2481                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2482                    )
2483                    .stream(&*tx)
2484                    .await?;
2485                while let Some(db_entry) = db_entries.next().await {
2486                    let db_entry = db_entry?;
2487                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2488                        worktree.entries.push(proto::Entry {
2489                            id: db_entry.id as u64,
2490                            is_dir: db_entry.is_dir,
2491                            path: db_entry.path,
2492                            inode: db_entry.inode as u64,
2493                            mtime: Some(proto::Timestamp {
2494                                seconds: db_entry.mtime_seconds as u64,
2495                                nanos: db_entry.mtime_nanos as u32,
2496                            }),
2497                            is_symlink: db_entry.is_symlink,
2498                            is_ignored: db_entry.is_ignored,
2499                        });
2500                    }
2501                }
2502            }
2503
2504            // Populate worktree diagnostic summaries.
2505            {
2506                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2507                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2508                    .stream(&*tx)
2509                    .await?;
2510                while let Some(db_summary) = db_summaries.next().await {
2511                    let db_summary = db_summary?;
2512                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2513                        worktree
2514                            .diagnostic_summaries
2515                            .push(proto::DiagnosticSummary {
2516                                path: db_summary.path,
2517                                language_server_id: db_summary.language_server_id as u64,
2518                                error_count: db_summary.error_count as u32,
2519                                warning_count: db_summary.warning_count as u32,
2520                            });
2521                    }
2522                }
2523            }
2524
2525            // Populate language servers.
2526            let language_servers = project
2527                .find_related(language_server::Entity)
2528                .all(&*tx)
2529                .await?;
2530
2531            let project = Project {
2532                collaborators: collaborators
2533                    .into_iter()
2534                    .map(|collaborator| ProjectCollaborator {
2535                        connection_id: collaborator.connection(),
2536                        user_id: collaborator.user_id,
2537                        replica_id: collaborator.replica_id,
2538                        is_host: collaborator.is_host,
2539                    })
2540                    .collect(),
2541                worktrees,
2542                language_servers: language_servers
2543                    .into_iter()
2544                    .map(|language_server| proto::LanguageServer {
2545                        id: language_server.id as u64,
2546                        name: language_server.name,
2547                    })
2548                    .collect(),
2549            };
2550            Ok((project, replica_id as ReplicaId))
2551        })
2552        .await
2553    }
2554
2555    pub async fn leave_project(
2556        &self,
2557        project_id: ProjectId,
2558        connection: ConnectionId,
2559    ) -> Result<RoomGuard<LeftProject>> {
2560        let room_id = self.room_id_for_project(project_id).await?;
2561        self.room_transaction(room_id, |tx| async move {
2562            let result = project_collaborator::Entity::delete_many()
2563                .filter(
2564                    Condition::all()
2565                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2566                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2567                        .add(
2568                            project_collaborator::Column::ConnectionServerId
2569                                .eq(connection.owner_id as i32),
2570                        ),
2571                )
2572                .exec(&*tx)
2573                .await?;
2574            if result.rows_affected == 0 {
2575                Err(anyhow!("not a collaborator on this project"))?;
2576            }
2577
2578            let project = project::Entity::find_by_id(project_id)
2579                .one(&*tx)
2580                .await?
2581                .ok_or_else(|| anyhow!("no such project"))?;
2582            let collaborators = project
2583                .find_related(project_collaborator::Entity)
2584                .all(&*tx)
2585                .await?;
2586            let connection_ids = collaborators
2587                .into_iter()
2588                .map(|collaborator| collaborator.connection())
2589                .collect();
2590
2591            let left_project = LeftProject {
2592                id: project_id,
2593                host_user_id: project.host_user_id,
2594                host_connection_id: project.host_connection()?,
2595                connection_ids,
2596            };
2597            Ok(left_project)
2598        })
2599        .await
2600    }
2601
2602    pub async fn project_collaborators(
2603        &self,
2604        project_id: ProjectId,
2605        connection_id: ConnectionId,
2606    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2607        let room_id = self.room_id_for_project(project_id).await?;
2608        self.room_transaction(room_id, |tx| async move {
2609            let collaborators = project_collaborator::Entity::find()
2610                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2611                .all(&*tx)
2612                .await?
2613                .into_iter()
2614                .map(|collaborator| ProjectCollaborator {
2615                    connection_id: collaborator.connection(),
2616                    user_id: collaborator.user_id,
2617                    replica_id: collaborator.replica_id,
2618                    is_host: collaborator.is_host,
2619                })
2620                .collect::<Vec<_>>();
2621
2622            if collaborators
2623                .iter()
2624                .any(|collaborator| collaborator.connection_id == connection_id)
2625            {
2626                Ok(collaborators)
2627            } else {
2628                Err(anyhow!("no such project"))?
2629            }
2630        })
2631        .await
2632    }
2633
2634    pub async fn project_connection_ids(
2635        &self,
2636        project_id: ProjectId,
2637        connection_id: ConnectionId,
2638    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2639        let room_id = self.room_id_for_project(project_id).await?;
2640        self.room_transaction(room_id, |tx| async move {
2641            let mut collaborators = project_collaborator::Entity::find()
2642                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2643                .stream(&*tx)
2644                .await?;
2645
2646            let mut connection_ids = HashSet::default();
2647            while let Some(collaborator) = collaborators.next().await {
2648                let collaborator = collaborator?;
2649                connection_ids.insert(collaborator.connection());
2650            }
2651
2652            if connection_ids.contains(&connection_id) {
2653                Ok(connection_ids)
2654            } else {
2655                Err(anyhow!("no such project"))?
2656            }
2657        })
2658        .await
2659    }
2660
2661    async fn project_guest_connection_ids(
2662        &self,
2663        project_id: ProjectId,
2664        tx: &DatabaseTransaction,
2665    ) -> Result<Vec<ConnectionId>> {
2666        let mut collaborators = project_collaborator::Entity::find()
2667            .filter(
2668                project_collaborator::Column::ProjectId
2669                    .eq(project_id)
2670                    .and(project_collaborator::Column::IsHost.eq(false)),
2671            )
2672            .stream(tx)
2673            .await?;
2674
2675        let mut guest_connection_ids = Vec::new();
2676        while let Some(collaborator) = collaborators.next().await {
2677            let collaborator = collaborator?;
2678            guest_connection_ids.push(collaborator.connection());
2679        }
2680        Ok(guest_connection_ids)
2681    }
2682
2683    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2684        self.transaction(|tx| async move {
2685            let project = project::Entity::find_by_id(project_id)
2686                .one(&*tx)
2687                .await?
2688                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2689            Ok(project.room_id)
2690        })
2691        .await
2692    }
2693
2694    // access tokens
2695
2696    pub async fn create_access_token_hash(
2697        &self,
2698        user_id: UserId,
2699        access_token_hash: &str,
2700        max_access_token_count: usize,
2701    ) -> Result<()> {
2702        self.transaction(|tx| async {
2703            let tx = tx;
2704
2705            access_token::ActiveModel {
2706                user_id: ActiveValue::set(user_id),
2707                hash: ActiveValue::set(access_token_hash.into()),
2708                ..Default::default()
2709            }
2710            .insert(&*tx)
2711            .await?;
2712
2713            access_token::Entity::delete_many()
2714                .filter(
2715                    access_token::Column::Id.in_subquery(
2716                        Query::select()
2717                            .column(access_token::Column::Id)
2718                            .from(access_token::Entity)
2719                            .and_where(access_token::Column::UserId.eq(user_id))
2720                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2721                            .limit(10000)
2722                            .offset(max_access_token_count as u64)
2723                            .to_owned(),
2724                    ),
2725                )
2726                .exec(&*tx)
2727                .await?;
2728            Ok(())
2729        })
2730        .await
2731    }
2732
2733    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2734        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2735        enum QueryAs {
2736            Hash,
2737        }
2738
2739        self.transaction(|tx| async move {
2740            Ok(access_token::Entity::find()
2741                .select_only()
2742                .column(access_token::Column::Hash)
2743                .filter(access_token::Column::UserId.eq(user_id))
2744                .order_by_desc(access_token::Column::Id)
2745                .into_values::<_, QueryAs>()
2746                .all(&*tx)
2747                .await?)
2748        })
2749        .await
2750    }
2751
2752    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2753    where
2754        F: Send + Fn(TransactionHandle) -> Fut,
2755        Fut: Send + Future<Output = Result<T>>,
2756    {
2757        let body = async {
2758            loop {
2759                let (tx, result) = self.with_transaction(&f).await?;
2760                match result {
2761                    Ok(result) => {
2762                        match tx.commit().await.map_err(Into::into) {
2763                            Ok(()) => return Ok(result),
2764                            Err(error) => {
2765                                if is_serialization_error(&error) {
2766                                    // Retry (don't break the loop)
2767                                } else {
2768                                    return Err(error);
2769                                }
2770                            }
2771                        }
2772                    }
2773                    Err(error) => {
2774                        tx.rollback().await?;
2775                        if is_serialization_error(&error) {
2776                            // Retry (don't break the loop)
2777                        } else {
2778                            return Err(error);
2779                        }
2780                    }
2781                }
2782            }
2783        };
2784
2785        self.run(body).await
2786    }
2787
2788    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2789    where
2790        F: Send + Fn(TransactionHandle) -> Fut,
2791        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2792    {
2793        let body = async {
2794            loop {
2795                let (tx, result) = self.with_transaction(&f).await?;
2796                match result {
2797                    Ok(Some((room_id, data))) => {
2798                        let lock = self.rooms.entry(room_id).or_default().clone();
2799                        let _guard = lock.lock_owned().await;
2800                        match tx.commit().await.map_err(Into::into) {
2801                            Ok(()) => {
2802                                return Ok(Some(RoomGuard {
2803                                    data,
2804                                    _guard,
2805                                    _not_send: PhantomData,
2806                                }));
2807                            }
2808                            Err(error) => {
2809                                if is_serialization_error(&error) {
2810                                    // Retry (don't break the loop)
2811                                } else {
2812                                    return Err(error);
2813                                }
2814                            }
2815                        }
2816                    }
2817                    Ok(None) => {
2818                        match tx.commit().await.map_err(Into::into) {
2819                            Ok(()) => return Ok(None),
2820                            Err(error) => {
2821                                if is_serialization_error(&error) {
2822                                    // Retry (don't break the loop)
2823                                } else {
2824                                    return Err(error);
2825                                }
2826                            }
2827                        }
2828                    }
2829                    Err(error) => {
2830                        tx.rollback().await?;
2831                        if is_serialization_error(&error) {
2832                            // Retry (don't break the loop)
2833                        } else {
2834                            return Err(error);
2835                        }
2836                    }
2837                }
2838            }
2839        };
2840
2841        self.run(body).await
2842    }
2843
2844    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
2845    where
2846        F: Send + Fn(TransactionHandle) -> Fut,
2847        Fut: Send + Future<Output = Result<T>>,
2848    {
2849        let body = async {
2850            loop {
2851                let lock = self.rooms.entry(room_id).or_default().clone();
2852                let _guard = lock.lock_owned().await;
2853                let (tx, result) = self.with_transaction(&f).await?;
2854                match result {
2855                    Ok(data) => {
2856                        match tx.commit().await.map_err(Into::into) {
2857                            Ok(()) => {
2858                                return Ok(RoomGuard {
2859                                    data,
2860                                    _guard,
2861                                    _not_send: PhantomData,
2862                                });
2863                            }
2864                            Err(error) => {
2865                                if is_serialization_error(&error) {
2866                                    // Retry (don't break the loop)
2867                                } else {
2868                                    return Err(error);
2869                                }
2870                            }
2871                        }
2872                    }
2873                    Err(error) => {
2874                        tx.rollback().await?;
2875                        if is_serialization_error(&error) {
2876                            // Retry (don't break the loop)
2877                        } else {
2878                            return Err(error);
2879                        }
2880                    }
2881                }
2882            }
2883        };
2884
2885        self.run(body).await
2886    }
2887
2888    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2889    where
2890        F: Send + Fn(TransactionHandle) -> Fut,
2891        Fut: Send + Future<Output = Result<T>>,
2892    {
2893        let tx = self
2894            .pool
2895            .begin_with_config(Some(IsolationLevel::Serializable), None)
2896            .await?;
2897
2898        let mut tx = Arc::new(Some(tx));
2899        let result = f(TransactionHandle(tx.clone())).await;
2900        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2901            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2902        };
2903
2904        Ok((tx, result))
2905    }
2906
2907    async fn run<F, T>(&self, future: F) -> T
2908    where
2909        F: Future<Output = T>,
2910    {
2911        #[cfg(test)]
2912        {
2913            if let Some(background) = self.background.as_ref() {
2914                background.simulate_random_delay().await;
2915            }
2916
2917            self.runtime.as_ref().unwrap().block_on(future)
2918        }
2919
2920        #[cfg(not(test))]
2921        {
2922            future.await
2923        }
2924    }
2925}
2926
2927fn is_serialization_error(error: &Error) -> bool {
2928    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2929    match error {
2930        Error::Database(
2931            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2932            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2933        ) if error
2934            .as_database_error()
2935            .and_then(|error| error.code())
2936            .as_deref()
2937            == Some(SERIALIZATION_FAILURE_CODE) =>
2938        {
2939            true
2940        }
2941        _ => false,
2942    }
2943}
2944
2945struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2946
2947impl Deref for TransactionHandle {
2948    type Target = DatabaseTransaction;
2949
2950    fn deref(&self) -> &Self::Target {
2951        self.0.as_ref().as_ref().unwrap()
2952    }
2953}
2954
2955pub struct RoomGuard<T> {
2956    data: T,
2957    _guard: OwnedMutexGuard<()>,
2958    _not_send: PhantomData<Rc<()>>,
2959}
2960
2961impl<T> Deref for RoomGuard<T> {
2962    type Target = T;
2963
2964    fn deref(&self) -> &T {
2965        &self.data
2966    }
2967}
2968
2969impl<T> DerefMut for RoomGuard<T> {
2970    fn deref_mut(&mut self) -> &mut T {
2971        &mut self.data
2972    }
2973}
2974
2975#[derive(Debug, Serialize, Deserialize)]
2976pub struct NewUserParams {
2977    pub github_login: String,
2978    pub github_user_id: i32,
2979    pub invite_count: i32,
2980}
2981
2982#[derive(Debug)]
2983pub struct NewUserResult {
2984    pub user_id: UserId,
2985    pub metrics_id: String,
2986    pub inviting_user_id: Option<UserId>,
2987    pub signup_device_id: Option<String>,
2988}
2989
2990fn random_invite_code() -> String {
2991    nanoid::nanoid!(16)
2992}
2993
2994fn random_email_confirmation_code() -> String {
2995    nanoid::nanoid!(64)
2996}
2997
/// Declares an `i32`-backed identifier newtype called `$name`.
///
/// The generated type is a `Copy` tuple struct with transparent serde
/// (de)serialization. Alongside `Display`, it implements the `sea_query` /
/// `sea_orm` conversion traits spelled out below, all of which treat the id
/// as a plain integer value.
///
/// The protobuf helpers use `as` casts and therefore never fail:
/// `from_proto` keeps only the low 32 bits of its `u64` argument, and
/// `to_proto` sign-extends a negative id into the `u64`.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// Largest representable id (`i32::MAX`).
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds an id from a protobuf `u64`.
            ///
            /// This is a truncating cast: values that do not fit in an `i32`
            /// wrap instead of producing an error.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Returns the id as a protobuf `u64`.
            ///
            /// This is a sign-extending cast: a negative id becomes a very
            /// large `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        // Displays exactly like the wrapped integer.
        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        // Ids are handed to the query builder as `Int` values.
        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        // Reading an id from a query result reads an `i32` column and wraps it.
        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer variant whose value fits in an
            // `i32`; everything else (including nulls and out-of-range
            // values) is reported as `ValueTypeErr`.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            // Checked conversion: unlike `from_proto`, a `u64` that does not
            // fit in an `i32` is an error here.
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                Ok(Self(n.try_into().map_err(|_| {
                    // The failing conversion is u64 -> $name, so the message
                    // names that direction.
                    DbErr::ConvertFromU64(concat!(
                        "error converting u64 to ",
                        stringify!($name)
                    ))
                })?))
            }
        }

        // The null form of an id is a null `Int`.
        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
3116
// Concrete identifier newtypes: each line expands `id_type!` into one
// `i32`-backed id type with the conversions that macro defines.
id_type!(AccessTokenId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
3128
/// Bundles a room's protobuf state with the projects that were rejoined and
/// the projects that were reshared.
///
/// NOTE(review): produced outside this part of the file — presumably by the
/// room-rejoin query; confirm.
pub struct RejoinedRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Projects listed as rejoined.
    pub rejoined_projects: Vec<RejoinedProject>,
    /// Projects listed as reshared.
    pub reshared_projects: Vec<ResharedProject>,
}
3134
/// Description of one reshared project, as carried in `RejoinedRoom`.
pub struct ResharedProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Connection id previously associated with the project — presumably the
    /// sharer's connection before reconnecting; TODO confirm.
    pub old_connection_id: ConnectionId,
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Protobuf metadata for the project's worktrees.
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
3141
/// Description of one rejoined project, as carried in `RejoinedRoom`.
pub struct RejoinedProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Connection id previously associated with the project — presumably the
    /// rejoining peer's connection before reconnecting; TODO confirm.
    pub old_connection_id: ConnectionId,
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Per-worktree state for the rejoined project.
    pub worktrees: Vec<RejoinedWorktree>,
    /// Protobuf descriptions of the project's language servers.
    pub language_servers: Vec<proto::LanguageServer>,
}
3149
/// State of a single worktree inside a `RejoinedProject`.
///
/// NOTE(review): field meanings are read off the field names; the code that
/// fills them in is outside this part of the file.
#[derive(Debug)]
pub struct RejoinedWorktree {
    /// Worktree id.
    pub id: u64,
    /// Absolute path of the worktree (presumably on the host's filesystem).
    pub abs_path: String,
    /// Name of the worktree root.
    pub root_name: String,
    /// Whether the worktree is visible.
    pub visible: bool,
    /// Entries reported as updated.
    pub updated_entries: Vec<proto::Entry>,
    /// Ids reported as removed (presumably entry ids).
    pub removed_entries: Vec<u64>,
    /// Diagnostic summaries for the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Scan id of the worktree.
    pub scan_id: u64,
    /// Completed scan id of the worktree (presumably the last finished scan).
    pub completed_scan_id: u64,
}
3162
/// Bundles a room's protobuf state with the projects that were left and the
/// users whose calls were canceled.
///
/// NOTE(review): produced outside this part of the file — presumably when a
/// participant leaves a room; confirm.
pub struct LeftRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Left projects, keyed by project id.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    /// Ids of users whose calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3168
/// Bundles a room's protobuf state with the stale participants and the users
/// whose calls were canceled.
///
/// NOTE(review): produced outside this part of the file — presumably by a
/// room refresh/cleanup query; confirm.
pub struct RefreshedRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// User ids of participants considered stale.
    pub stale_participant_user_ids: Vec<UserId>,
    /// Ids of users whose calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3174
/// Snapshot of a project: its collaborators, worktrees and language servers.
pub struct Project {
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Worktrees in an ordered map keyed by `u64` (presumably the worktree id).
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Protobuf descriptions of the project's language servers.
    pub language_servers: Vec<proto::LanguageServer>,
}
3180
/// One collaborator on a project.
pub struct ProjectCollaborator {
    /// Connection of the collaborator; becomes `peer_id` in the protobuf form.
    pub connection_id: ConnectionId,
    /// User behind the collaborator.
    pub user_id: UserId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Whether this collaborator is the host. Not carried over by `to_proto`.
    pub is_host: bool,
}
3187
3188impl ProjectCollaborator {
3189    pub fn to_proto(&self) -> proto::Collaborator {
3190        proto::Collaborator {
3191            peer_id: Some(self.connection_id.into()),
3192            replica_id: self.replica_id.0 as u32,
3193            user_id: self.user_id.to_proto(),
3194        }
3195    }
3196}
3197
/// Description of one project that was left, as carried in `LeftRoom`.
#[derive(Debug)]
pub struct LeftProject {
    /// Id of the project.
    pub id: ProjectId,
    /// User id of the project's host.
    pub host_user_id: UserId,
    /// Connection id of the project's host.
    pub host_connection_id: ConnectionId,
    /// Connection ids associated with the project — presumably the
    /// connections to notify; TODO confirm against callers.
    pub connection_ids: Vec<ConnectionId>,
}
3205
/// State of a single worktree inside a `Project`.
///
/// NOTE(review): field meanings are read off the field names; the code that
/// fills them in is outside this part of the file.
pub struct Worktree {
    /// Worktree id.
    pub id: u64,
    /// Absolute path of the worktree (presumably on the host's filesystem).
    pub abs_path: String,
    /// Name of the worktree root.
    pub root_name: String,
    /// Whether the worktree is visible.
    pub visible: bool,
    /// Entries of the worktree.
    pub entries: Vec<proto::Entry>,
    /// Diagnostic summaries for the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Scan id of the worktree.
    pub scan_id: u64,
    /// Completed scan id of the worktree (presumably the last finished scan).
    pub completed_scan_id: u64,
}
3216
3217#[cfg(test)]
3218pub use test::*;
3219
/// Test-only helpers for creating throwaway databases.
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use lazy_static::lazy_static;
    use parking_lot::Mutex;
    use rand::prelude::*;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A `Database` created for a single test.
    ///
    /// Dropping a Postgres-backed `TestDb` also drops the database it created.
    pub struct TestDb {
        /// The database handle. Always `Some` after construction; taken out
        /// in `Drop`.
        pub db: Option<Arc<Database>>,
        /// Left as `None` by both constructors in this module.
        pub connection: Option<sqlx::AnyConnection>,
    }

    /// Builds the current-thread Tokio runtime (I/O and time drivers enabled)
    /// that each test database is created on and later stores.
    fn new_test_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build()
            .unwrap()
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and loads the test schema
        /// from `migrations.sqlite/20221109000000_test_schema.sql`.
        ///
        /// Panics if connecting or executing the schema fails.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = String::from("sqlite::memory:");
            let runtime = new_test_runtime();

            let mut db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options).await.unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            // The database keeps the executor and the runtime it was built on.
            db.background = Some(background);
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a fresh Postgres database named `zed-test-<random u128>`
        /// on `localhost` and runs the migrations from the crate's
        /// `migrations` directory against it.
        ///
        /// Panics if the database cannot be created, connected to or
        /// migrated.
        pub fn postgres(background: Arc<Background>) -> Self {
            lazy_static! {
                static ref LOCK: Mutex<()> = Mutex::new(());
            }

            // Held until this function returns, so calls to this constructor
            // run one at a time.
            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = new_test_runtime();

            let mut db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options).await.unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            // The database keeps the executor and the runtime it was built on.
            db.background = Some(background);
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Returns the wrapped database handle.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        /// For a Postgres-backed database, terminates every other backend
        /// connected to it and then drops the database. Failures in either
        /// step go through `log_err` rather than panicking. Nothing extra is
        /// done for other backends.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}