db.rs

   1mod access_token;
   2mod contact;
   3mod follower;
   4mod language_server;
   5mod project;
   6mod project_collaborator;
   7mod room;
   8mod room_participant;
   9mod server;
  10mod signup;
  11#[cfg(test)]
  12mod tests;
  13mod user;
  14mod worktree;
  15mod worktree_diagnostic_summary;
  16mod worktree_entry;
  17
  18use crate::{Error, Result};
  19use anyhow::anyhow;
  20use collections::{BTreeMap, HashMap, HashSet};
  21pub use contact::Contact;
  22use dashmap::DashMap;
  23use futures::StreamExt;
  24use hyper::StatusCode;
  25use rpc::{proto, ConnectionId};
  26use sea_orm::Condition;
  27pub use sea_orm::ConnectOptions;
  28use sea_orm::{
  29    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  30    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  31    Statement, TransactionTrait,
  32};
  33use sea_query::{Alias, Expr, OnConflict, Query};
  34use serde::{Deserialize, Serialize};
  35pub use signup::{Invite, NewSignup, WaitlistSummary};
  36use sqlx::migrate::{Migrate, Migration, MigrationSource};
  37use sqlx::Connection;
  38use std::ops::{Deref, DerefMut};
  39use std::path::Path;
  40use std::time::Duration;
  41use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  42use tokio::sync::{Mutex, OwnedMutexGuard};
  43pub use user::Model as User;
  44
  45pub struct Database {
  46    options: ConnectOptions,
  47    pool: DatabaseConnection,
  48    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  49    #[cfg(test)]
  50    background: Option<std::sync::Arc<gpui::executor::Background>>,
  51    #[cfg(test)]
  52    runtime: Option<tokio::runtime::Runtime>,
  53}
  54
  55impl Database {
  56    pub async fn new(options: ConnectOptions) -> Result<Self> {
  57        Ok(Self {
  58            options: options.clone(),
  59            pool: sea_orm::Database::connect(options).await?,
  60            rooms: DashMap::with_capacity(16384),
  61            #[cfg(test)]
  62            background: None,
  63            #[cfg(test)]
  64            runtime: None,
  65        })
  66    }
  67
  68    #[cfg(test)]
  69    pub fn reset(&self) {
  70        self.rooms.clear();
  71    }
  72
  73    pub async fn migrate(
  74        &self,
  75        migrations_path: &Path,
  76        ignore_checksum_mismatch: bool,
  77    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  78        let migrations = MigrationSource::resolve(migrations_path)
  79            .await
  80            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  81
  82        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  83
  84        connection.ensure_migrations_table().await?;
  85        let applied_migrations: HashMap<_, _> = connection
  86            .list_applied_migrations()
  87            .await?
  88            .into_iter()
  89            .map(|m| (m.version, m))
  90            .collect();
  91
  92        let mut new_migrations = Vec::new();
  93        for migration in migrations {
  94            match applied_migrations.get(&migration.version) {
  95                Some(applied_migration) => {
  96                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  97                    {
  98                        Err(anyhow!(
  99                            "checksum mismatch for applied migration {}",
 100                            migration.description
 101                        ))?;
 102                    }
 103                }
 104                None => {
 105                    let elapsed = connection.apply(&migration).await?;
 106                    new_migrations.push((migration, elapsed));
 107                }
 108            }
 109        }
 110
 111        Ok(new_migrations)
 112    }
 113
 114    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 115        self.transaction(|tx| async move {
 116            let server = server::ActiveModel {
 117                environment: ActiveValue::set(environment.into()),
 118                ..Default::default()
 119            }
 120            .insert(&*tx)
 121            .await?;
 122            Ok(server.id)
 123        })
 124        .await
 125    }
 126
 127    pub async fn stale_room_ids(
 128        &self,
 129        environment: &str,
 130        new_server_id: ServerId,
 131    ) -> Result<Vec<RoomId>> {
 132        self.transaction(|tx| async move {
 133            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 134            enum QueryAs {
 135                RoomId,
 136            }
 137
 138            let stale_server_epochs = self
 139                .stale_server_ids(environment, new_server_id, &tx)
 140                .await?;
 141            Ok(room_participant::Entity::find()
 142                .select_only()
 143                .column(room_participant::Column::RoomId)
 144                .distinct()
 145                .filter(
 146                    room_participant::Column::AnsweringConnectionServerId
 147                        .is_in(stale_server_epochs),
 148                )
 149                .into_values::<_, QueryAs>()
 150                .all(&*tx)
 151                .await?)
 152        })
 153        .await
 154    }
 155
 156    pub async fn refresh_room(
 157        &self,
 158        room_id: RoomId,
 159        new_server_id: ServerId,
 160    ) -> Result<RoomGuard<RefreshedRoom>> {
 161        self.room_transaction(|tx| async move {
 162            let stale_participant_filter = Condition::all()
 163                .add(room_participant::Column::RoomId.eq(room_id))
 164                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 165                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 166
 167            let stale_participant_user_ids = room_participant::Entity::find()
 168                .filter(stale_participant_filter.clone())
 169                .all(&*tx)
 170                .await?
 171                .into_iter()
 172                .map(|participant| participant.user_id)
 173                .collect::<Vec<_>>();
 174
 175            // Delete participants who failed to reconnect.
 176            room_participant::Entity::delete_many()
 177                .filter(stale_participant_filter)
 178                .exec(&*tx)
 179                .await?;
 180
 181            let room = self.get_room(room_id, &tx).await?;
 182            let mut canceled_calls_to_user_ids = Vec::new();
 183            // Delete the room if it becomes empty and cancel pending calls.
 184            if room.participants.is_empty() {
 185                canceled_calls_to_user_ids.extend(
 186                    room.pending_participants
 187                        .iter()
 188                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 189                );
 190                room_participant::Entity::delete_many()
 191                    .filter(room_participant::Column::RoomId.eq(room_id))
 192                    .exec(&*tx)
 193                    .await?;
 194                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 195            }
 196
 197            Ok((
 198                room_id,
 199                RefreshedRoom {
 200                    room,
 201                    stale_participant_user_ids,
 202                    canceled_calls_to_user_ids,
 203                },
 204            ))
 205        })
 206        .await
 207    }
 208
 209    pub async fn delete_stale_servers(
 210        &self,
 211        environment: &str,
 212        new_server_id: ServerId,
 213    ) -> Result<()> {
 214        self.transaction(|tx| async move {
 215            server::Entity::delete_many()
 216                .filter(
 217                    Condition::all()
 218                        .add(server::Column::Environment.eq(environment))
 219                        .add(server::Column::Id.ne(new_server_id)),
 220                )
 221                .exec(&*tx)
 222                .await?;
 223            Ok(())
 224        })
 225        .await
 226    }
 227
 228    async fn stale_server_ids(
 229        &self,
 230        environment: &str,
 231        new_server_id: ServerId,
 232        tx: &DatabaseTransaction,
 233    ) -> Result<Vec<ServerId>> {
 234        let stale_servers = server::Entity::find()
 235            .filter(
 236                Condition::all()
 237                    .add(server::Column::Environment.eq(environment))
 238                    .add(server::Column::Id.ne(new_server_id)),
 239            )
 240            .all(&*tx)
 241            .await?;
 242        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 243    }
 244
 245    // users
 246
 247    pub async fn create_user(
 248        &self,
 249        email_address: &str,
 250        admin: bool,
 251        params: NewUserParams,
 252    ) -> Result<NewUserResult> {
 253        self.transaction(|tx| async {
 254            let tx = tx;
 255            let user = user::Entity::insert(user::ActiveModel {
 256                email_address: ActiveValue::set(Some(email_address.into())),
 257                github_login: ActiveValue::set(params.github_login.clone()),
 258                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 259                admin: ActiveValue::set(admin),
 260                metrics_id: ActiveValue::set(Uuid::new_v4()),
 261                ..Default::default()
 262            })
 263            .on_conflict(
 264                OnConflict::column(user::Column::GithubLogin)
 265                    .update_column(user::Column::GithubLogin)
 266                    .to_owned(),
 267            )
 268            .exec_with_returning(&*tx)
 269            .await?;
 270
 271            Ok(NewUserResult {
 272                user_id: user.id,
 273                metrics_id: user.metrics_id.to_string(),
 274                signup_device_id: None,
 275                inviting_user_id: None,
 276            })
 277        })
 278        .await
 279    }
 280
 281    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 282        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 283            .await
 284    }
 285
 286    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 287        self.transaction(|tx| async {
 288            let tx = tx;
 289            Ok(user::Entity::find()
 290                .filter(user::Column::Id.is_in(ids.iter().copied()))
 291                .all(&*tx)
 292                .await?)
 293        })
 294        .await
 295    }
 296
 297    pub async fn get_user_by_github_account(
 298        &self,
 299        github_login: &str,
 300        github_user_id: Option<i32>,
 301    ) -> Result<Option<User>> {
 302        self.transaction(|tx| async move {
 303            let tx = &*tx;
 304            if let Some(github_user_id) = github_user_id {
 305                if let Some(user_by_github_user_id) = user::Entity::find()
 306                    .filter(user::Column::GithubUserId.eq(github_user_id))
 307                    .one(tx)
 308                    .await?
 309                {
 310                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 311                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 312                    Ok(Some(user_by_github_user_id.update(tx).await?))
 313                } else if let Some(user_by_github_login) = user::Entity::find()
 314                    .filter(user::Column::GithubLogin.eq(github_login))
 315                    .one(tx)
 316                    .await?
 317                {
 318                    let mut user_by_github_login = user_by_github_login.into_active_model();
 319                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 320                    Ok(Some(user_by_github_login.update(tx).await?))
 321                } else {
 322                    Ok(None)
 323                }
 324            } else {
 325                Ok(user::Entity::find()
 326                    .filter(user::Column::GithubLogin.eq(github_login))
 327                    .one(tx)
 328                    .await?)
 329            }
 330        })
 331        .await
 332    }
 333
 334    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 335        self.transaction(|tx| async move {
 336            Ok(user::Entity::find()
 337                .order_by_asc(user::Column::GithubLogin)
 338                .limit(limit as u64)
 339                .offset(page as u64 * limit as u64)
 340                .all(&*tx)
 341                .await?)
 342        })
 343        .await
 344    }
 345
 346    pub async fn get_users_with_no_invites(
 347        &self,
 348        invited_by_another_user: bool,
 349    ) -> Result<Vec<User>> {
 350        self.transaction(|tx| async move {
 351            Ok(user::Entity::find()
 352                .filter(
 353                    user::Column::InviteCount
 354                        .eq(0)
 355                        .and(if invited_by_another_user {
 356                            user::Column::InviterId.is_not_null()
 357                        } else {
 358                            user::Column::InviterId.is_null()
 359                        }),
 360                )
 361                .all(&*tx)
 362                .await?)
 363        })
 364        .await
 365    }
 366
 367    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 368        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 369        enum QueryAs {
 370            MetricsId,
 371        }
 372
 373        self.transaction(|tx| async move {
 374            let metrics_id: Uuid = user::Entity::find_by_id(id)
 375                .select_only()
 376                .column(user::Column::MetricsId)
 377                .into_values::<_, QueryAs>()
 378                .one(&*tx)
 379                .await?
 380                .ok_or_else(|| anyhow!("could not find user"))?;
 381            Ok(metrics_id.to_string())
 382        })
 383        .await
 384    }
 385
 386    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 387        self.transaction(|tx| async move {
 388            user::Entity::update_many()
 389                .filter(user::Column::Id.eq(id))
 390                .set(user::ActiveModel {
 391                    admin: ActiveValue::set(is_admin),
 392                    ..Default::default()
 393                })
 394                .exec(&*tx)
 395                .await?;
 396            Ok(())
 397        })
 398        .await
 399    }
 400
 401    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 402        self.transaction(|tx| async move {
 403            user::Entity::update_many()
 404                .filter(user::Column::Id.eq(id))
 405                .set(user::ActiveModel {
 406                    connected_once: ActiveValue::set(connected_once),
 407                    ..Default::default()
 408                })
 409                .exec(&*tx)
 410                .await?;
 411            Ok(())
 412        })
 413        .await
 414    }
 415
 416    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 417        self.transaction(|tx| async move {
 418            access_token::Entity::delete_many()
 419                .filter(access_token::Column::UserId.eq(id))
 420                .exec(&*tx)
 421                .await?;
 422            user::Entity::delete_by_id(id).exec(&*tx).await?;
 423            Ok(())
 424        })
 425        .await
 426    }
 427
 428    // contacts
 429
 430    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 431        #[derive(Debug, FromQueryResult)]
 432        struct ContactWithUserBusyStatuses {
 433            user_id_a: UserId,
 434            user_id_b: UserId,
 435            a_to_b: bool,
 436            accepted: bool,
 437            should_notify: bool,
 438            user_a_busy: bool,
 439            user_b_busy: bool,
 440        }
 441
 442        self.transaction(|tx| async move {
 443            let user_a_participant = Alias::new("user_a_participant");
 444            let user_b_participant = Alias::new("user_b_participant");
 445            let mut db_contacts = contact::Entity::find()
 446                .column_as(
 447                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 448                        .is_not_null(),
 449                    "user_a_busy",
 450                )
 451                .column_as(
 452                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 453                        .is_not_null(),
 454                    "user_b_busy",
 455                )
 456                .filter(
 457                    contact::Column::UserIdA
 458                        .eq(user_id)
 459                        .or(contact::Column::UserIdB.eq(user_id)),
 460                )
 461                .join_as(
 462                    JoinType::LeftJoin,
 463                    contact::Relation::UserARoomParticipant.def(),
 464                    user_a_participant,
 465                )
 466                .join_as(
 467                    JoinType::LeftJoin,
 468                    contact::Relation::UserBRoomParticipant.def(),
 469                    user_b_participant,
 470                )
 471                .into_model::<ContactWithUserBusyStatuses>()
 472                .stream(&*tx)
 473                .await?;
 474
 475            let mut contacts = Vec::new();
 476            while let Some(db_contact) = db_contacts.next().await {
 477                let db_contact = db_contact?;
 478                if db_contact.user_id_a == user_id {
 479                    if db_contact.accepted {
 480                        contacts.push(Contact::Accepted {
 481                            user_id: db_contact.user_id_b,
 482                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 483                            busy: db_contact.user_b_busy,
 484                        });
 485                    } else if db_contact.a_to_b {
 486                        contacts.push(Contact::Outgoing {
 487                            user_id: db_contact.user_id_b,
 488                        })
 489                    } else {
 490                        contacts.push(Contact::Incoming {
 491                            user_id: db_contact.user_id_b,
 492                            should_notify: db_contact.should_notify,
 493                        });
 494                    }
 495                } else if db_contact.accepted {
 496                    contacts.push(Contact::Accepted {
 497                        user_id: db_contact.user_id_a,
 498                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 499                        busy: db_contact.user_a_busy,
 500                    });
 501                } else if db_contact.a_to_b {
 502                    contacts.push(Contact::Incoming {
 503                        user_id: db_contact.user_id_a,
 504                        should_notify: db_contact.should_notify,
 505                    });
 506                } else {
 507                    contacts.push(Contact::Outgoing {
 508                        user_id: db_contact.user_id_a,
 509                    });
 510                }
 511            }
 512
 513            contacts.sort_unstable_by_key(|contact| contact.user_id());
 514
 515            Ok(contacts)
 516        })
 517        .await
 518    }
 519
 520    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 521        self.transaction(|tx| async move {
 522            let participant = room_participant::Entity::find()
 523                .filter(room_participant::Column::UserId.eq(user_id))
 524                .one(&*tx)
 525                .await?;
 526            Ok(participant.is_some())
 527        })
 528        .await
 529    }
 530
 531    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 532        self.transaction(|tx| async move {
 533            let (id_a, id_b) = if user_id_1 < user_id_2 {
 534                (user_id_1, user_id_2)
 535            } else {
 536                (user_id_2, user_id_1)
 537            };
 538
 539            Ok(contact::Entity::find()
 540                .filter(
 541                    contact::Column::UserIdA
 542                        .eq(id_a)
 543                        .and(contact::Column::UserIdB.eq(id_b))
 544                        .and(contact::Column::Accepted.eq(true)),
 545                )
 546                .one(&*tx)
 547                .await?
 548                .is_some())
 549        })
 550        .await
 551    }
 552
 553    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 554        self.transaction(|tx| async move {
 555            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 556                (sender_id, receiver_id, true)
 557            } else {
 558                (receiver_id, sender_id, false)
 559            };
 560
 561            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 562                user_id_a: ActiveValue::set(id_a),
 563                user_id_b: ActiveValue::set(id_b),
 564                a_to_b: ActiveValue::set(a_to_b),
 565                accepted: ActiveValue::set(false),
 566                should_notify: ActiveValue::set(true),
 567                ..Default::default()
 568            })
 569            .on_conflict(
 570                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 571                    .values([
 572                        (contact::Column::Accepted, true.into()),
 573                        (contact::Column::ShouldNotify, false.into()),
 574                    ])
 575                    .action_and_where(
 576                        contact::Column::Accepted.eq(false).and(
 577                            contact::Column::AToB
 578                                .eq(a_to_b)
 579                                .and(contact::Column::UserIdA.eq(id_b))
 580                                .or(contact::Column::AToB
 581                                    .ne(a_to_b)
 582                                    .and(contact::Column::UserIdA.eq(id_a))),
 583                        ),
 584                    )
 585                    .to_owned(),
 586            )
 587            .exec_without_returning(&*tx)
 588            .await?;
 589
 590            if rows_affected == 1 {
 591                Ok(())
 592            } else {
 593                Err(anyhow!("contact already requested"))?
 594            }
 595        })
 596        .await
 597    }
 598
 599    /// Returns a bool indicating whether the removed contact had originally accepted or not
 600    ///
 601    /// Deletes the contact identified by the requester and responder ids, and then returns
 602    /// whether the deleted contact had originally accepted or was a pending contact request.
 603    ///
 604    /// # Arguments
 605    ///
 606    /// * `requester_id` - The user that initiates this request
 607    /// * `responder_id` - The user that will be removed
 608    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 609        self.transaction(|tx| async move {
 610            let (id_a, id_b) = if responder_id < requester_id {
 611                (responder_id, requester_id)
 612            } else {
 613                (requester_id, responder_id)
 614            };
 615
 616            let contact = contact::Entity::find()
 617                .filter(
 618                    contact::Column::UserIdA
 619                        .eq(id_a)
 620                        .and(contact::Column::UserIdB.eq(id_b)),
 621                )
 622                .one(&*tx)
 623                .await?
 624                .ok_or_else(|| anyhow!("no such contact"))?;
 625
 626            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 627            Ok(contact.accepted)
 628        })
 629        .await
 630    }
 631
 632    pub async fn dismiss_contact_notification(
 633        &self,
 634        user_id: UserId,
 635        contact_user_id: UserId,
 636    ) -> Result<()> {
 637        self.transaction(|tx| async move {
 638            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 639                (user_id, contact_user_id, true)
 640            } else {
 641                (contact_user_id, user_id, false)
 642            };
 643
 644            let result = contact::Entity::update_many()
 645                .set(contact::ActiveModel {
 646                    should_notify: ActiveValue::set(false),
 647                    ..Default::default()
 648                })
 649                .filter(
 650                    contact::Column::UserIdA
 651                        .eq(id_a)
 652                        .and(contact::Column::UserIdB.eq(id_b))
 653                        .and(
 654                            contact::Column::AToB
 655                                .eq(a_to_b)
 656                                .and(contact::Column::Accepted.eq(true))
 657                                .or(contact::Column::AToB
 658                                    .ne(a_to_b)
 659                                    .and(contact::Column::Accepted.eq(false))),
 660                        ),
 661                )
 662                .exec(&*tx)
 663                .await?;
 664            if result.rows_affected == 0 {
 665                Err(anyhow!("no such contact request"))?
 666            } else {
 667                Ok(())
 668            }
 669        })
 670        .await
 671    }
 672
 673    pub async fn respond_to_contact_request(
 674        &self,
 675        responder_id: UserId,
 676        requester_id: UserId,
 677        accept: bool,
 678    ) -> Result<()> {
 679        self.transaction(|tx| async move {
 680            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 681                (responder_id, requester_id, false)
 682            } else {
 683                (requester_id, responder_id, true)
 684            };
 685            let rows_affected = if accept {
 686                let result = contact::Entity::update_many()
 687                    .set(contact::ActiveModel {
 688                        accepted: ActiveValue::set(true),
 689                        should_notify: ActiveValue::set(true),
 690                        ..Default::default()
 691                    })
 692                    .filter(
 693                        contact::Column::UserIdA
 694                            .eq(id_a)
 695                            .and(contact::Column::UserIdB.eq(id_b))
 696                            .and(contact::Column::AToB.eq(a_to_b)),
 697                    )
 698                    .exec(&*tx)
 699                    .await?;
 700                result.rows_affected
 701            } else {
 702                let result = contact::Entity::delete_many()
 703                    .filter(
 704                        contact::Column::UserIdA
 705                            .eq(id_a)
 706                            .and(contact::Column::UserIdB.eq(id_b))
 707                            .and(contact::Column::AToB.eq(a_to_b))
 708                            .and(contact::Column::Accepted.eq(false)),
 709                    )
 710                    .exec(&*tx)
 711                    .await?;
 712
 713                result.rows_affected
 714            };
 715
 716            if rows_affected == 1 {
 717                Ok(())
 718            } else {
 719                Err(anyhow!("no such contact request"))?
 720            }
 721        })
 722        .await
 723    }
 724
 725    pub fn fuzzy_like_string(string: &str) -> String {
 726        let mut result = String::with_capacity(string.len() * 2 + 1);
 727        for c in string.chars() {
 728            if c.is_alphanumeric() {
 729                result.push('%');
 730                result.push(c);
 731            }
 732        }
 733        result.push('%');
 734        result
 735    }
 736
 737    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 738        self.transaction(|tx| async {
 739            let tx = tx;
 740            let like_string = Self::fuzzy_like_string(name_query);
 741            let query = "
 742                SELECT users.*
 743                FROM users
 744                WHERE github_login ILIKE $1
 745                ORDER BY github_login <-> $2
 746                LIMIT $3
 747            ";
 748
 749            Ok(user::Entity::find()
 750                .from_raw_sql(Statement::from_sql_and_values(
 751                    self.pool.get_database_backend(),
 752                    query.into(),
 753                    vec![like_string.into(), name_query.into(), limit.into()],
 754                ))
 755                .all(&*tx)
 756                .await?)
 757        })
 758        .await
 759    }
 760
 761    // signups
 762
 763    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 764        self.transaction(|tx| async move {
 765            signup::Entity::insert(signup::ActiveModel {
 766                email_address: ActiveValue::set(signup.email_address.clone()),
 767                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 768                email_confirmation_sent: ActiveValue::set(false),
 769                platform_mac: ActiveValue::set(signup.platform_mac),
 770                platform_windows: ActiveValue::set(signup.platform_windows),
 771                platform_linux: ActiveValue::set(signup.platform_linux),
 772                platform_unknown: ActiveValue::set(false),
 773                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 774                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 775                device_id: ActiveValue::set(signup.device_id.clone()),
 776                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 777                ..Default::default()
 778            })
 779            .on_conflict(
 780                OnConflict::column(signup::Column::EmailAddress)
 781                    .update_columns([
 782                        signup::Column::PlatformMac,
 783                        signup::Column::PlatformWindows,
 784                        signup::Column::PlatformLinux,
 785                        signup::Column::EditorFeatures,
 786                        signup::Column::ProgrammingLanguages,
 787                        signup::Column::DeviceId,
 788                        signup::Column::AddedToMailingList,
 789                    ])
 790                    .to_owned(),
 791            )
 792            .exec(&*tx)
 793            .await?;
 794            Ok(())
 795        })
 796        .await
 797    }
 798
 799    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 800        self.transaction(|tx| async move {
 801            let signup = signup::Entity::find()
 802                .filter(signup::Column::EmailAddress.eq(email_address))
 803                .one(&*tx)
 804                .await?
 805                .ok_or_else(|| {
 806                    anyhow!("signup with email address {} doesn't exist", email_address)
 807                })?;
 808
 809            Ok(signup)
 810        })
 811        .await
 812    }
 813
 814    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 815        self.transaction(|tx| async move {
 816            let query = "
 817                SELECT
 818                    COUNT(*) as count,
 819                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 820                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 821                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 822                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 823                FROM (
 824                    SELECT *
 825                    FROM signups
 826                    WHERE
 827                        NOT email_confirmation_sent
 828                ) AS unsent
 829            ";
 830            Ok(
 831                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 832                    self.pool.get_database_backend(),
 833                    query.into(),
 834                    vec![],
 835                ))
 836                .one(&*tx)
 837                .await?
 838                .ok_or_else(|| anyhow!("invalid result"))?,
 839            )
 840        })
 841        .await
 842    }
 843
 844    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 845        let emails = invites
 846            .iter()
 847            .map(|s| s.email_address.as_str())
 848            .collect::<Vec<_>>();
 849        self.transaction(|tx| async {
 850            let tx = tx;
 851            signup::Entity::update_many()
 852                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 853                .set(signup::ActiveModel {
 854                    email_confirmation_sent: ActiveValue::set(true),
 855                    ..Default::default()
 856                })
 857                .exec(&*tx)
 858                .await?;
 859            Ok(())
 860        })
 861        .await
 862    }
 863
 864    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 865        self.transaction(|tx| async move {
 866            Ok(signup::Entity::find()
 867                .select_only()
 868                .column(signup::Column::EmailAddress)
 869                .column(signup::Column::EmailConfirmationCode)
 870                .filter(
 871                    signup::Column::EmailConfirmationSent.eq(false).and(
 872                        signup::Column::PlatformMac
 873                            .eq(true)
 874                            .or(signup::Column::PlatformUnknown.eq(true)),
 875                    ),
 876                )
 877                .order_by_asc(signup::Column::CreatedAt)
 878                .limit(count as u64)
 879                .into_model()
 880                .all(&*tx)
 881                .await?)
 882        })
 883        .await
 884    }
 885
 886    // invite codes
 887
 888    pub async fn create_invite_from_code(
 889        &self,
 890        code: &str,
 891        email_address: &str,
 892        device_id: Option<&str>,
 893        added_to_mailing_list: bool,
 894    ) -> Result<Invite> {
 895        self.transaction(|tx| async move {
 896            let existing_user = user::Entity::find()
 897                .filter(user::Column::EmailAddress.eq(email_address))
 898                .one(&*tx)
 899                .await?;
 900
 901            if existing_user.is_some() {
 902                Err(anyhow!("email address is already in use"))?;
 903            }
 904
 905            let inviting_user_with_invites = match user::Entity::find()
 906                .filter(
 907                    user::Column::InviteCode
 908                        .eq(code)
 909                        .and(user::Column::InviteCount.gt(0)),
 910                )
 911                .one(&*tx)
 912                .await?
 913            {
 914                Some(inviting_user) => inviting_user,
 915                None => {
 916                    return Err(Error::Http(
 917                        StatusCode::UNAUTHORIZED,
 918                        "unable to find an invite code with invites remaining".to_string(),
 919                    ))?
 920                }
 921            };
 922            user::Entity::update_many()
 923                .filter(
 924                    user::Column::Id
 925                        .eq(inviting_user_with_invites.id)
 926                        .and(user::Column::InviteCount.gt(0)),
 927                )
 928                .col_expr(
 929                    user::Column::InviteCount,
 930                    Expr::col(user::Column::InviteCount).sub(1),
 931                )
 932                .exec(&*tx)
 933                .await?;
 934
 935            let signup = signup::Entity::insert(signup::ActiveModel {
 936                email_address: ActiveValue::set(email_address.into()),
 937                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 938                email_confirmation_sent: ActiveValue::set(false),
 939                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 940                platform_linux: ActiveValue::set(false),
 941                platform_mac: ActiveValue::set(false),
 942                platform_windows: ActiveValue::set(false),
 943                platform_unknown: ActiveValue::set(true),
 944                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 945                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 946                ..Default::default()
 947            })
 948            .on_conflict(
 949                OnConflict::column(signup::Column::EmailAddress)
 950                    .update_column(signup::Column::InvitingUserId)
 951                    .to_owned(),
 952            )
 953            .exec_with_returning(&*tx)
 954            .await?;
 955
 956            Ok(Invite {
 957                email_address: signup.email_address,
 958                email_confirmation_code: signup.email_confirmation_code,
 959            })
 960        })
 961        .await
 962    }
 963
 964    pub async fn create_user_from_invite(
 965        &self,
 966        invite: &Invite,
 967        user: NewUserParams,
 968    ) -> Result<Option<NewUserResult>> {
 969        self.transaction(|tx| async {
 970            let tx = tx;
 971            let signup = signup::Entity::find()
 972                .filter(
 973                    signup::Column::EmailAddress
 974                        .eq(invite.email_address.as_str())
 975                        .and(
 976                            signup::Column::EmailConfirmationCode
 977                                .eq(invite.email_confirmation_code.as_str()),
 978                        ),
 979                )
 980                .one(&*tx)
 981                .await?
 982                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 983
 984            if signup.user_id.is_some() {
 985                return Ok(None);
 986            }
 987
 988            let user = user::Entity::insert(user::ActiveModel {
 989                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 990                github_login: ActiveValue::set(user.github_login.clone()),
 991                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 992                admin: ActiveValue::set(false),
 993                invite_count: ActiveValue::set(user.invite_count),
 994                invite_code: ActiveValue::set(Some(random_invite_code())),
 995                metrics_id: ActiveValue::set(Uuid::new_v4()),
 996                ..Default::default()
 997            })
 998            .on_conflict(
 999                OnConflict::column(user::Column::GithubLogin)
1000                    .update_columns([
1001                        user::Column::EmailAddress,
1002                        user::Column::GithubUserId,
1003                        user::Column::Admin,
1004                    ])
1005                    .to_owned(),
1006            )
1007            .exec_with_returning(&*tx)
1008            .await?;
1009
1010            let mut signup = signup.into_active_model();
1011            signup.user_id = ActiveValue::set(Some(user.id));
1012            let signup = signup.update(&*tx).await?;
1013
1014            if let Some(inviting_user_id) = signup.inviting_user_id {
1015                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1016                    (inviting_user_id, user.id, true)
1017                } else {
1018                    (user.id, inviting_user_id, false)
1019                };
1020
1021                contact::Entity::insert(contact::ActiveModel {
1022                    user_id_a: ActiveValue::set(user_id_a),
1023                    user_id_b: ActiveValue::set(user_id_b),
1024                    a_to_b: ActiveValue::set(a_to_b),
1025                    should_notify: ActiveValue::set(true),
1026                    accepted: ActiveValue::set(true),
1027                    ..Default::default()
1028                })
1029                .on_conflict(OnConflict::new().do_nothing().to_owned())
1030                .exec_without_returning(&*tx)
1031                .await?;
1032            }
1033
1034            Ok(Some(NewUserResult {
1035                user_id: user.id,
1036                metrics_id: user.metrics_id.to_string(),
1037                inviting_user_id: signup.inviting_user_id,
1038                signup_device_id: signup.device_id,
1039            }))
1040        })
1041        .await
1042    }
1043
1044    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1045        self.transaction(|tx| async move {
1046            if count > 0 {
1047                user::Entity::update_many()
1048                    .filter(
1049                        user::Column::Id
1050                            .eq(id)
1051                            .and(user::Column::InviteCode.is_null()),
1052                    )
1053                    .set(user::ActiveModel {
1054                        invite_code: ActiveValue::set(Some(random_invite_code())),
1055                        ..Default::default()
1056                    })
1057                    .exec(&*tx)
1058                    .await?;
1059            }
1060
1061            user::Entity::update_many()
1062                .filter(user::Column::Id.eq(id))
1063                .set(user::ActiveModel {
1064                    invite_count: ActiveValue::set(count),
1065                    ..Default::default()
1066                })
1067                .exec(&*tx)
1068                .await?;
1069            Ok(())
1070        })
1071        .await
1072    }
1073
1074    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1075        self.transaction(|tx| async move {
1076            match user::Entity::find_by_id(id).one(&*tx).await? {
1077                Some(user) if user.invite_code.is_some() => {
1078                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1079                }
1080                _ => Ok(None),
1081            }
1082        })
1083        .await
1084    }
1085
1086    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1087        self.transaction(|tx| async move {
1088            user::Entity::find()
1089                .filter(user::Column::InviteCode.eq(code))
1090                .one(&*tx)
1091                .await?
1092                .ok_or_else(|| {
1093                    Error::Http(
1094                        StatusCode::NOT_FOUND,
1095                        "that invite code does not exist".to_string(),
1096                    )
1097                })
1098        })
1099        .await
1100    }
1101
1102    // rooms
1103
1104    pub async fn incoming_call_for_user(
1105        &self,
1106        user_id: UserId,
1107    ) -> Result<Option<proto::IncomingCall>> {
1108        self.transaction(|tx| async move {
1109            let pending_participant = room_participant::Entity::find()
1110                .filter(
1111                    room_participant::Column::UserId
1112                        .eq(user_id)
1113                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1114                )
1115                .one(&*tx)
1116                .await?;
1117
1118            if let Some(pending_participant) = pending_participant {
1119                let room = self.get_room(pending_participant.room_id, &tx).await?;
1120                Ok(Self::build_incoming_call(&room, user_id))
1121            } else {
1122                Ok(None)
1123            }
1124        })
1125        .await
1126    }
1127
1128    pub async fn create_room(
1129        &self,
1130        user_id: UserId,
1131        connection: ConnectionId,
1132        live_kit_room: &str,
1133    ) -> Result<RoomGuard<proto::Room>> {
1134        self.room_transaction(|tx| async move {
1135            let room = room::ActiveModel {
1136                live_kit_room: ActiveValue::set(live_kit_room.into()),
1137                ..Default::default()
1138            }
1139            .insert(&*tx)
1140            .await?;
1141            let room_id = room.id;
1142
1143            room_participant::ActiveModel {
1144                room_id: ActiveValue::set(room_id),
1145                user_id: ActiveValue::set(user_id),
1146                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1147                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1148                    connection.owner_id as i32,
1149                ))),
1150                answering_connection_lost: ActiveValue::set(false),
1151                calling_user_id: ActiveValue::set(user_id),
1152                calling_connection_id: ActiveValue::set(connection.id as i32),
1153                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1154                    connection.owner_id as i32,
1155                ))),
1156                ..Default::default()
1157            }
1158            .insert(&*tx)
1159            .await?;
1160
1161            let room = self.get_room(room_id, &tx).await?;
1162            Ok((room_id, room))
1163        })
1164        .await
1165    }
1166
1167    pub async fn call(
1168        &self,
1169        room_id: RoomId,
1170        calling_user_id: UserId,
1171        calling_connection: ConnectionId,
1172        called_user_id: UserId,
1173        initial_project_id: Option<ProjectId>,
1174    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1175        self.room_transaction(|tx| async move {
1176            room_participant::ActiveModel {
1177                room_id: ActiveValue::set(room_id),
1178                user_id: ActiveValue::set(called_user_id),
1179                answering_connection_lost: ActiveValue::set(false),
1180                calling_user_id: ActiveValue::set(calling_user_id),
1181                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1182                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1183                    calling_connection.owner_id as i32,
1184                ))),
1185                initial_project_id: ActiveValue::set(initial_project_id),
1186                ..Default::default()
1187            }
1188            .insert(&*tx)
1189            .await?;
1190
1191            let room = self.get_room(room_id, &tx).await?;
1192            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1193                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1194            Ok((room_id, (room, incoming_call)))
1195        })
1196        .await
1197    }
1198
1199    pub async fn call_failed(
1200        &self,
1201        room_id: RoomId,
1202        called_user_id: UserId,
1203    ) -> Result<RoomGuard<proto::Room>> {
1204        self.room_transaction(|tx| async move {
1205            room_participant::Entity::delete_many()
1206                .filter(
1207                    room_participant::Column::RoomId
1208                        .eq(room_id)
1209                        .and(room_participant::Column::UserId.eq(called_user_id)),
1210                )
1211                .exec(&*tx)
1212                .await?;
1213            let room = self.get_room(room_id, &tx).await?;
1214            Ok((room_id, room))
1215        })
1216        .await
1217    }
1218
1219    pub async fn decline_call(
1220        &self,
1221        expected_room_id: Option<RoomId>,
1222        user_id: UserId,
1223    ) -> Result<Option<RoomGuard<proto::Room>>> {
1224        self.optional_room_transaction(|tx| async move {
1225            let mut filter = Condition::all()
1226                .add(room_participant::Column::UserId.eq(user_id))
1227                .add(room_participant::Column::AnsweringConnectionId.is_null());
1228            if let Some(room_id) = expected_room_id {
1229                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1230            }
1231            let participant = room_participant::Entity::find()
1232                .filter(filter)
1233                .one(&*tx)
1234                .await?;
1235
1236            let participant = if let Some(participant) = participant {
1237                participant
1238            } else if expected_room_id.is_some() {
1239                return Err(anyhow!("could not find call to decline"))?;
1240            } else {
1241                return Ok(None);
1242            };
1243
1244            let room_id = participant.room_id;
1245            room_participant::Entity::delete(participant.into_active_model())
1246                .exec(&*tx)
1247                .await?;
1248
1249            let room = self.get_room(room_id, &tx).await?;
1250            Ok(Some((room_id, room)))
1251        })
1252        .await
1253    }
1254
1255    pub async fn cancel_call(
1256        &self,
1257        room_id: RoomId,
1258        calling_connection: ConnectionId,
1259        called_user_id: UserId,
1260    ) -> Result<RoomGuard<proto::Room>> {
1261        self.room_transaction(|tx| async move {
1262            let participant = room_participant::Entity::find()
1263                .filter(
1264                    Condition::all()
1265                        .add(room_participant::Column::UserId.eq(called_user_id))
1266                        .add(room_participant::Column::RoomId.eq(room_id))
1267                        .add(
1268                            room_participant::Column::CallingConnectionId
1269                                .eq(calling_connection.id as i32),
1270                        )
1271                        .add(
1272                            room_participant::Column::CallingConnectionServerId
1273                                .eq(calling_connection.owner_id as i32),
1274                        )
1275                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1276                )
1277                .one(&*tx)
1278                .await?
1279                .ok_or_else(|| anyhow!("no call to cancel"))?;
1280            let room_id = participant.room_id;
1281
1282            room_participant::Entity::delete(participant.into_active_model())
1283                .exec(&*tx)
1284                .await?;
1285
1286            let room = self.get_room(room_id, &tx).await?;
1287            Ok((room_id, room))
1288        })
1289        .await
1290    }
1291
1292    pub async fn join_room(
1293        &self,
1294        room_id: RoomId,
1295        user_id: UserId,
1296        connection: ConnectionId,
1297    ) -> Result<RoomGuard<proto::Room>> {
1298        self.room_transaction(|tx| async move {
1299            let result = room_participant::Entity::update_many()
1300                .filter(
1301                    Condition::all()
1302                        .add(room_participant::Column::RoomId.eq(room_id))
1303                        .add(room_participant::Column::UserId.eq(user_id))
1304                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1305                )
1306                .set(room_participant::ActiveModel {
1307                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1308                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1309                        connection.owner_id as i32,
1310                    ))),
1311                    answering_connection_lost: ActiveValue::set(false),
1312                    ..Default::default()
1313                })
1314                .exec(&*tx)
1315                .await?;
1316            if result.rows_affected == 0 {
1317                Err(anyhow!("room does not exist or was already joined"))?
1318            } else {
1319                let room = self.get_room(room_id, &tx).await?;
1320                Ok((room_id, room))
1321            }
1322        })
1323        .await
1324    }
1325
1326    pub async fn rejoin_room(
1327        &self,
1328        rejoin_room: proto::RejoinRoom,
1329        user_id: UserId,
1330        connection: ConnectionId,
1331    ) -> Result<RoomGuard<RejoinedRoom>> {
1332        self.room_transaction(|tx| async {
1333            let tx = tx;
1334            let room_id = RoomId::from_proto(rejoin_room.id);
1335            let participant_update = room_participant::Entity::update_many()
1336                .filter(
1337                    Condition::all()
1338                        .add(room_participant::Column::RoomId.eq(room_id))
1339                        .add(room_participant::Column::UserId.eq(user_id))
1340                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1341                        .add(
1342                            Condition::any()
1343                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1344                                .add(
1345                                    room_participant::Column::AnsweringConnectionServerId
1346                                        .ne(connection.owner_id as i32),
1347                                ),
1348                        ),
1349                )
1350                .set(room_participant::ActiveModel {
1351                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1352                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1353                        connection.owner_id as i32,
1354                    ))),
1355                    answering_connection_lost: ActiveValue::set(false),
1356                    ..Default::default()
1357                })
1358                .exec(&*tx)
1359                .await?;
1360            if participant_update.rows_affected == 0 {
1361                return Err(anyhow!("room does not exist or was already joined"))?;
1362            }
1363
1364            let mut reshared_projects = Vec::new();
1365            for reshared_project in &rejoin_room.reshared_projects {
1366                let project_id = ProjectId::from_proto(reshared_project.project_id);
1367                let project = project::Entity::find_by_id(project_id)
1368                    .one(&*tx)
1369                    .await?
1370                    .ok_or_else(|| anyhow!("project does not exist"))?;
1371                if project.host_user_id != user_id {
1372                    return Err(anyhow!("no such project"))?;
1373                }
1374
1375                let mut collaborators = project
1376                    .find_related(project_collaborator::Entity)
1377                    .all(&*tx)
1378                    .await?;
1379                let host_ix = collaborators
1380                    .iter()
1381                    .position(|collaborator| {
1382                        collaborator.user_id == user_id && collaborator.is_host
1383                    })
1384                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1385                let host = collaborators.swap_remove(host_ix);
1386                let old_connection_id = host.connection();
1387
1388                project::Entity::update(project::ActiveModel {
1389                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1390                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1391                        connection.owner_id as i32,
1392                    ))),
1393                    ..project.into_active_model()
1394                })
1395                .exec(&*tx)
1396                .await?;
1397                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1398                    connection_id: ActiveValue::set(connection.id as i32),
1399                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1400                    ..host.into_active_model()
1401                })
1402                .exec(&*tx)
1403                .await?;
1404
1405                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1406                    .await?;
1407
1408                reshared_projects.push(ResharedProject {
1409                    id: project_id,
1410                    old_connection_id,
1411                    collaborators: collaborators
1412                        .iter()
1413                        .map(|collaborator| ProjectCollaborator {
1414                            connection_id: collaborator.connection(),
1415                            user_id: collaborator.user_id,
1416                            replica_id: collaborator.replica_id,
1417                            is_host: collaborator.is_host,
1418                        })
1419                        .collect(),
1420                    worktrees: reshared_project.worktrees.clone(),
1421                });
1422            }
1423
1424            project::Entity::delete_many()
1425                .filter(
1426                    Condition::all()
1427                        .add(project::Column::RoomId.eq(room_id))
1428                        .add(project::Column::HostUserId.eq(user_id))
1429                        .add(
1430                            project::Column::Id
1431                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1432                        ),
1433                )
1434                .exec(&*tx)
1435                .await?;
1436
1437            let mut rejoined_projects = Vec::new();
1438            for rejoined_project in &rejoin_room.rejoined_projects {
1439                let project_id = ProjectId::from_proto(rejoined_project.id);
1440                let Some(project) = project::Entity::find_by_id(project_id)
1441                    .one(&*tx)
1442                    .await? else { continue };
1443
1444                let mut worktrees = Vec::new();
1445                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1446                for db_worktree in db_worktrees {
1447                    let mut worktree = RejoinedWorktree {
1448                        id: db_worktree.id as u64,
1449                        abs_path: db_worktree.abs_path,
1450                        root_name: db_worktree.root_name,
1451                        visible: db_worktree.visible,
1452                        updated_entries: Default::default(),
1453                        removed_entries: Default::default(),
1454                        diagnostic_summaries: Default::default(),
1455                        scan_id: db_worktree.scan_id as u64,
1456                        completed_scan_id: db_worktree.completed_scan_id as u64,
1457                    };
1458
1459                    let rejoined_worktree = rejoined_project
1460                        .worktrees
1461                        .iter()
1462                        .find(|worktree| worktree.id == db_worktree.id as u64);
1463                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1464                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1465                    } else {
1466                        worktree_entry::Column::IsDeleted.eq(false)
1467                    };
1468
1469                    let mut db_entries = worktree_entry::Entity::find()
1470                        .filter(
1471                            Condition::all()
1472                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1473                                .add(entry_filter),
1474                        )
1475                        .stream(&*tx)
1476                        .await?;
1477
1478                    while let Some(db_entry) = db_entries.next().await {
1479                        let db_entry = db_entry?;
1480                        if db_entry.is_deleted {
1481                            worktree.removed_entries.push(db_entry.id as u64);
1482                        } else {
1483                            worktree.updated_entries.push(proto::Entry {
1484                                id: db_entry.id as u64,
1485                                is_dir: db_entry.is_dir,
1486                                path: db_entry.path,
1487                                inode: db_entry.inode as u64,
1488                                mtime: Some(proto::Timestamp {
1489                                    seconds: db_entry.mtime_seconds as u64,
1490                                    nanos: db_entry.mtime_nanos as u32,
1491                                }),
1492                                is_symlink: db_entry.is_symlink,
1493                                is_ignored: db_entry.is_ignored,
1494                            });
1495                        }
1496                    }
1497
1498                    worktrees.push(worktree);
1499                }
1500
1501                let language_servers = project
1502                    .find_related(language_server::Entity)
1503                    .all(&*tx)
1504                    .await?
1505                    .into_iter()
1506                    .map(|language_server| proto::LanguageServer {
1507                        id: language_server.id as u64,
1508                        name: language_server.name,
1509                    })
1510                    .collect::<Vec<_>>();
1511
1512                let mut collaborators = project
1513                    .find_related(project_collaborator::Entity)
1514                    .all(&*tx)
1515                    .await?;
1516                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1517                    .iter()
1518                    .position(|collaborator| collaborator.user_id == user_id)
1519                {
1520                    collaborators.swap_remove(self_collaborator_ix)
1521                } else {
1522                    continue;
1523                };
1524                let old_connection_id = self_collaborator.connection();
1525                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1526                    connection_id: ActiveValue::set(connection.id as i32),
1527                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1528                    ..self_collaborator.into_active_model()
1529                })
1530                .exec(&*tx)
1531                .await?;
1532
1533                let collaborators = collaborators
1534                    .into_iter()
1535                    .map(|collaborator| ProjectCollaborator {
1536                        connection_id: collaborator.connection(),
1537                        user_id: collaborator.user_id,
1538                        replica_id: collaborator.replica_id,
1539                        is_host: collaborator.is_host,
1540                    })
1541                    .collect::<Vec<_>>();
1542
1543                rejoined_projects.push(RejoinedProject {
1544                    id: project_id,
1545                    old_connection_id,
1546                    collaborators,
1547                    worktrees,
1548                    language_servers,
1549                });
1550            }
1551
1552            let room = self.get_room(room_id, &tx).await?;
1553            Ok((
1554                room_id,
1555                RejoinedRoom {
1556                    room,
1557                    rejoined_projects,
1558                    reshared_projects,
1559                },
1560            ))
1561        })
1562        .await
1563    }
1564
1565    pub async fn leave_room(
1566        &self,
1567        connection: ConnectionId,
1568    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1569        self.optional_room_transaction(|tx| async move {
1570            let leaving_participant = room_participant::Entity::find()
1571                .filter(
1572                    Condition::all()
1573                        .add(
1574                            room_participant::Column::AnsweringConnectionId
1575                                .eq(connection.id as i32),
1576                        )
1577                        .add(
1578                            room_participant::Column::AnsweringConnectionServerId
1579                                .eq(connection.owner_id as i32),
1580                        ),
1581                )
1582                .one(&*tx)
1583                .await?;
1584
1585            if let Some(leaving_participant) = leaving_participant {
1586                // Leave room.
1587                let room_id = leaving_participant.room_id;
1588                room_participant::Entity::delete_by_id(leaving_participant.id)
1589                    .exec(&*tx)
1590                    .await?;
1591
1592                // Cancel pending calls initiated by the leaving user.
1593                let called_participants = room_participant::Entity::find()
1594                    .filter(
1595                        Condition::all()
1596                            .add(
1597                                room_participant::Column::CallingUserId
1598                                    .eq(leaving_participant.user_id),
1599                            )
1600                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1601                    )
1602                    .all(&*tx)
1603                    .await?;
1604                room_participant::Entity::delete_many()
1605                    .filter(
1606                        room_participant::Column::Id
1607                            .is_in(called_participants.iter().map(|participant| participant.id)),
1608                    )
1609                    .exec(&*tx)
1610                    .await?;
1611                let canceled_calls_to_user_ids = called_participants
1612                    .into_iter()
1613                    .map(|participant| participant.user_id)
1614                    .collect();
1615
1616                // Detect left projects.
1617                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1618                enum QueryProjectIds {
1619                    ProjectId,
1620                }
1621                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1622                    .select_only()
1623                    .column_as(
1624                        project_collaborator::Column::ProjectId,
1625                        QueryProjectIds::ProjectId,
1626                    )
1627                    .filter(
1628                        Condition::all()
1629                            .add(
1630                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1631                            )
1632                            .add(
1633                                project_collaborator::Column::ConnectionServerId
1634                                    .eq(connection.owner_id as i32),
1635                            ),
1636                    )
1637                    .into_values::<_, QueryProjectIds>()
1638                    .all(&*tx)
1639                    .await?;
1640                let mut left_projects = HashMap::default();
1641                let mut collaborators = project_collaborator::Entity::find()
1642                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1643                    .stream(&*tx)
1644                    .await?;
1645                while let Some(collaborator) = collaborators.next().await {
1646                    let collaborator = collaborator?;
1647                    let left_project =
1648                        left_projects
1649                            .entry(collaborator.project_id)
1650                            .or_insert(LeftProject {
1651                                id: collaborator.project_id,
1652                                host_user_id: Default::default(),
1653                                connection_ids: Default::default(),
1654                                host_connection_id: Default::default(),
1655                            });
1656
1657                    let collaborator_connection_id = collaborator.connection();
1658                    if collaborator_connection_id != connection {
1659                        left_project.connection_ids.push(collaborator_connection_id);
1660                    }
1661
1662                    if collaborator.is_host {
1663                        left_project.host_user_id = collaborator.user_id;
1664                        left_project.host_connection_id = collaborator_connection_id;
1665                    }
1666                }
1667                drop(collaborators);
1668
1669                // Leave projects.
1670                project_collaborator::Entity::delete_many()
1671                    .filter(
1672                        Condition::all()
1673                            .add(
1674                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1675                            )
1676                            .add(
1677                                project_collaborator::Column::ConnectionServerId
1678                                    .eq(connection.owner_id as i32),
1679                            ),
1680                    )
1681                    .exec(&*tx)
1682                    .await?;
1683
1684                // Unshare projects.
1685                project::Entity::delete_many()
1686                    .filter(
1687                        Condition::all()
1688                            .add(project::Column::RoomId.eq(room_id))
1689                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1690                            .add(
1691                                project::Column::HostConnectionServerId
1692                                    .eq(connection.owner_id as i32),
1693                            ),
1694                    )
1695                    .exec(&*tx)
1696                    .await?;
1697
1698                let room = self.get_room(room_id, &tx).await?;
1699                if room.participants.is_empty() {
1700                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1701                }
1702
1703                let left_room = LeftRoom {
1704                    room,
1705                    left_projects,
1706                    canceled_calls_to_user_ids,
1707                };
1708
1709                if left_room.room.participants.is_empty() {
1710                    self.rooms.remove(&room_id);
1711                }
1712
1713                Ok(Some((room_id, left_room)))
1714            } else {
1715                Ok(None)
1716            }
1717        })
1718        .await
1719    }
1720
1721    pub async fn follow(
1722        &self,
1723        project_id: ProjectId,
1724        leader_connection: ConnectionId,
1725        follower_connection: ConnectionId,
1726    ) -> Result<RoomGuard<proto::Room>> {
1727        self.room_transaction(|tx| async move {
1728            let room_id = self.room_id_for_project(project_id, &*tx).await?;
1729            follower::ActiveModel {
1730                room_id: ActiveValue::set(room_id),
1731                project_id: ActiveValue::set(project_id),
1732                leader_connection_server_id: ActiveValue::set(ServerId(
1733                    leader_connection.owner_id as i32,
1734                )),
1735                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1736                follower_connection_server_id: ActiveValue::set(ServerId(
1737                    follower_connection.owner_id as i32,
1738                )),
1739                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1740                ..Default::default()
1741            }
1742            .insert(&*tx)
1743            .await?;
1744
1745            Ok((room_id, self.get_room(room_id, &*tx).await?))
1746        })
1747        .await
1748    }
1749
1750    pub async fn unfollow(
1751        &self,
1752        project_id: ProjectId,
1753        leader_connection: ConnectionId,
1754        follower_connection: ConnectionId,
1755    ) -> Result<RoomGuard<proto::Room>> {
1756        self.room_transaction(|tx| async move {
1757            let room_id = self.room_id_for_project(project_id, &*tx).await?;
1758            follower::Entity::delete_many()
1759                .filter(
1760                    Condition::all()
1761                        .add(follower::Column::ProjectId.eq(project_id))
1762                        .add(
1763                            Condition::any()
1764                                .add(
1765                                    follower::Column::LeaderConnectionServerId
1766                                        .eq(leader_connection.owner_id)
1767                                        .and(
1768                                            follower::Column::LeaderConnectionId
1769                                                .eq(leader_connection.id),
1770                                        ),
1771                                )
1772                                .add(
1773                                    follower::Column::FollowerConnectionServerId
1774                                        .eq(follower_connection.owner_id)
1775                                        .and(
1776                                            follower::Column::FollowerConnectionId
1777                                                .eq(follower_connection.id),
1778                                        ),
1779                                ),
1780                        ),
1781                )
1782                .exec(&*tx)
1783                .await?;
1784
1785            Ok((room_id, self.get_room(room_id, &*tx).await?))
1786        })
1787        .await
1788    }
1789
1790    async fn room_id_for_project(
1791        &self,
1792        project_id: ProjectId,
1793        tx: &DatabaseTransaction,
1794    ) -> Result<RoomId> {
1795        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1796        enum QueryAs {
1797            RoomId,
1798        }
1799
1800        Ok(project::Entity::find_by_id(project_id)
1801            .select_only()
1802            .column(project::Column::RoomId)
1803            .into_values::<_, QueryAs>()
1804            .one(&*tx)
1805            .await?
1806            .ok_or_else(|| anyhow!("no such project"))?)
1807    }
1808
1809    pub async fn update_room_participant_location(
1810        &self,
1811        room_id: RoomId,
1812        connection: ConnectionId,
1813        location: proto::ParticipantLocation,
1814    ) -> Result<RoomGuard<proto::Room>> {
1815        self.room_transaction(|tx| async {
1816            let tx = tx;
1817            let location_kind;
1818            let location_project_id;
1819            match location
1820                .variant
1821                .as_ref()
1822                .ok_or_else(|| anyhow!("invalid location"))?
1823            {
1824                proto::participant_location::Variant::SharedProject(project) => {
1825                    location_kind = 0;
1826                    location_project_id = Some(ProjectId::from_proto(project.id));
1827                }
1828                proto::participant_location::Variant::UnsharedProject(_) => {
1829                    location_kind = 1;
1830                    location_project_id = None;
1831                }
1832                proto::participant_location::Variant::External(_) => {
1833                    location_kind = 2;
1834                    location_project_id = None;
1835                }
1836            }
1837
1838            let result = room_participant::Entity::update_many()
1839                .filter(
1840                    Condition::all()
1841                        .add(room_participant::Column::RoomId.eq(room_id))
1842                        .add(
1843                            room_participant::Column::AnsweringConnectionId
1844                                .eq(connection.id as i32),
1845                        )
1846                        .add(
1847                            room_participant::Column::AnsweringConnectionServerId
1848                                .eq(connection.owner_id as i32),
1849                        ),
1850                )
1851                .set(room_participant::ActiveModel {
1852                    location_kind: ActiveValue::set(Some(location_kind)),
1853                    location_project_id: ActiveValue::set(location_project_id),
1854                    ..Default::default()
1855                })
1856                .exec(&*tx)
1857                .await?;
1858
1859            if result.rows_affected == 1 {
1860                let room = self.get_room(room_id, &tx).await?;
1861                Ok((room_id, room))
1862            } else {
1863                Err(anyhow!("could not update room participant location"))?
1864            }
1865        })
1866        .await
1867    }
1868
1869    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1870        self.transaction(|tx| async move {
1871            let participant = room_participant::Entity::find()
1872                .filter(
1873                    Condition::all()
1874                        .add(
1875                            room_participant::Column::AnsweringConnectionId
1876                                .eq(connection.id as i32),
1877                        )
1878                        .add(
1879                            room_participant::Column::AnsweringConnectionServerId
1880                                .eq(connection.owner_id as i32),
1881                        ),
1882                )
1883                .one(&*tx)
1884                .await?
1885                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1886
1887            room_participant::Entity::update(room_participant::ActiveModel {
1888                answering_connection_lost: ActiveValue::set(true),
1889                ..participant.into_active_model()
1890            })
1891            .exec(&*tx)
1892            .await?;
1893
1894            Ok(())
1895        })
1896        .await
1897    }
1898
1899    fn build_incoming_call(
1900        room: &proto::Room,
1901        called_user_id: UserId,
1902    ) -> Option<proto::IncomingCall> {
1903        let pending_participant = room
1904            .pending_participants
1905            .iter()
1906            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1907
1908        Some(proto::IncomingCall {
1909            room_id: room.id,
1910            calling_user_id: pending_participant.calling_user_id,
1911            participant_user_ids: room
1912                .participants
1913                .iter()
1914                .map(|participant| participant.user_id)
1915                .collect(),
1916            initial_project: room.participants.iter().find_map(|participant| {
1917                let initial_project_id = pending_participant.initial_project_id?;
1918                participant
1919                    .projects
1920                    .iter()
1921                    .find(|project| project.id == initial_project_id)
1922                    .cloned()
1923            }),
1924        })
1925    }
1926
1927    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1928        let db_room = room::Entity::find_by_id(room_id)
1929            .one(tx)
1930            .await?
1931            .ok_or_else(|| anyhow!("could not find room"))?;
1932
1933        let mut db_participants = db_room
1934            .find_related(room_participant::Entity)
1935            .stream(tx)
1936            .await?;
1937        let mut participants = HashMap::default();
1938        let mut pending_participants = Vec::new();
1939        while let Some(db_participant) = db_participants.next().await {
1940            let db_participant = db_participant?;
1941            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1942                .answering_connection_id
1943                .zip(db_participant.answering_connection_server_id)
1944            {
1945                let location = match (
1946                    db_participant.location_kind,
1947                    db_participant.location_project_id,
1948                ) {
1949                    (Some(0), Some(project_id)) => {
1950                        Some(proto::participant_location::Variant::SharedProject(
1951                            proto::participant_location::SharedProject {
1952                                id: project_id.to_proto(),
1953                            },
1954                        ))
1955                    }
1956                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1957                        Default::default(),
1958                    )),
1959                    _ => Some(proto::participant_location::Variant::External(
1960                        Default::default(),
1961                    )),
1962                };
1963
1964                let answering_connection = ConnectionId {
1965                    owner_id: answering_connection_server_id.0 as u32,
1966                    id: answering_connection_id as u32,
1967                };
1968                participants.insert(
1969                    answering_connection,
1970                    proto::Participant {
1971                        user_id: db_participant.user_id.to_proto(),
1972                        peer_id: Some(answering_connection.into()),
1973                        projects: Default::default(),
1974                        location: Some(proto::ParticipantLocation { variant: location }),
1975                    },
1976                );
1977            } else {
1978                pending_participants.push(proto::PendingParticipant {
1979                    user_id: db_participant.user_id.to_proto(),
1980                    calling_user_id: db_participant.calling_user_id.to_proto(),
1981                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1982                });
1983            }
1984        }
1985        drop(db_participants);
1986
1987        let mut db_projects = db_room
1988            .find_related(project::Entity)
1989            .find_with_related(worktree::Entity)
1990            .stream(tx)
1991            .await?;
1992
1993        while let Some(row) = db_projects.next().await {
1994            let (db_project, db_worktree) = row?;
1995            let host_connection = db_project.host_connection()?;
1996            if let Some(participant) = participants.get_mut(&host_connection) {
1997                let project = if let Some(project) = participant
1998                    .projects
1999                    .iter_mut()
2000                    .find(|project| project.id == db_project.id.to_proto())
2001                {
2002                    project
2003                } else {
2004                    participant.projects.push(proto::ParticipantProject {
2005                        id: db_project.id.to_proto(),
2006                        worktree_root_names: Default::default(),
2007                    });
2008                    participant.projects.last_mut().unwrap()
2009                };
2010
2011                if let Some(db_worktree) = db_worktree {
2012                    if db_worktree.visible {
2013                        project.worktree_root_names.push(db_worktree.root_name);
2014                    }
2015                }
2016            }
2017        }
2018        drop(db_projects);
2019
2020        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2021        let mut followers = Vec::new();
2022        while let Some(db_follower) = db_followers.next().await {
2023            let db_follower = db_follower?;
2024            followers.push(proto::Follower {
2025                leader_id: Some(db_follower.leader_connection().into()),
2026                follower_id: Some(db_follower.follower_connection().into()),
2027            });
2028        }
2029
2030        Ok(proto::Room {
2031            id: db_room.id.to_proto(),
2032            live_kit_room: db_room.live_kit_room,
2033            participants: participants.into_values().collect(),
2034            pending_participants,
2035            followers,
2036        })
2037    }
2038
2039    // projects
2040
2041    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2042        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2043        enum QueryAs {
2044            Count,
2045        }
2046
2047        self.transaction(|tx| async move {
2048            Ok(project::Entity::find()
2049                .select_only()
2050                .column_as(project::Column::Id.count(), QueryAs::Count)
2051                .inner_join(user::Entity)
2052                .filter(user::Column::Admin.eq(false))
2053                .into_values::<_, QueryAs>()
2054                .one(&*tx)
2055                .await?
2056                .unwrap_or(0i64) as usize)
2057        })
2058        .await
2059    }
2060
2061    pub async fn share_project(
2062        &self,
2063        room_id: RoomId,
2064        connection: ConnectionId,
2065        worktrees: &[proto::WorktreeMetadata],
2066    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2067        self.room_transaction(|tx| async move {
2068            let participant = room_participant::Entity::find()
2069                .filter(
2070                    Condition::all()
2071                        .add(
2072                            room_participant::Column::AnsweringConnectionId
2073                                .eq(connection.id as i32),
2074                        )
2075                        .add(
2076                            room_participant::Column::AnsweringConnectionServerId
2077                                .eq(connection.owner_id as i32),
2078                        ),
2079                )
2080                .one(&*tx)
2081                .await?
2082                .ok_or_else(|| anyhow!("could not find participant"))?;
2083            if participant.room_id != room_id {
2084                return Err(anyhow!("shared project on unexpected room"))?;
2085            }
2086
2087            let project = project::ActiveModel {
2088                room_id: ActiveValue::set(participant.room_id),
2089                host_user_id: ActiveValue::set(participant.user_id),
2090                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2091                host_connection_server_id: ActiveValue::set(Some(ServerId(
2092                    connection.owner_id as i32,
2093                ))),
2094                ..Default::default()
2095            }
2096            .insert(&*tx)
2097            .await?;
2098
2099            if !worktrees.is_empty() {
2100                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2101                    worktree::ActiveModel {
2102                        id: ActiveValue::set(worktree.id as i64),
2103                        project_id: ActiveValue::set(project.id),
2104                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2105                        root_name: ActiveValue::set(worktree.root_name.clone()),
2106                        visible: ActiveValue::set(worktree.visible),
2107                        scan_id: ActiveValue::set(0),
2108                        completed_scan_id: ActiveValue::set(0),
2109                    }
2110                }))
2111                .exec(&*tx)
2112                .await?;
2113            }
2114
2115            project_collaborator::ActiveModel {
2116                project_id: ActiveValue::set(project.id),
2117                connection_id: ActiveValue::set(connection.id as i32),
2118                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2119                user_id: ActiveValue::set(participant.user_id),
2120                replica_id: ActiveValue::set(ReplicaId(0)),
2121                is_host: ActiveValue::set(true),
2122                ..Default::default()
2123            }
2124            .insert(&*tx)
2125            .await?;
2126
2127            let room = self.get_room(room_id, &tx).await?;
2128            Ok((room_id, (project.id, room)))
2129        })
2130        .await
2131    }
2132
2133    pub async fn unshare_project(
2134        &self,
2135        project_id: ProjectId,
2136        connection: ConnectionId,
2137    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2138        self.room_transaction(|tx| async move {
2139            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2140
2141            let project = project::Entity::find_by_id(project_id)
2142                .one(&*tx)
2143                .await?
2144                .ok_or_else(|| anyhow!("project not found"))?;
2145            if project.host_connection()? == connection {
2146                let room_id = project.room_id;
2147                project::Entity::delete(project.into_active_model())
2148                    .exec(&*tx)
2149                    .await?;
2150                let room = self.get_room(room_id, &tx).await?;
2151                Ok((room_id, (room, guest_connection_ids)))
2152            } else {
2153                Err(anyhow!("cannot unshare a project hosted by another user"))?
2154            }
2155        })
2156        .await
2157    }
2158
2159    pub async fn update_project(
2160        &self,
2161        project_id: ProjectId,
2162        connection: ConnectionId,
2163        worktrees: &[proto::WorktreeMetadata],
2164    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2165        self.room_transaction(|tx| async move {
2166            let project = project::Entity::find_by_id(project_id)
2167                .filter(
2168                    Condition::all()
2169                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2170                        .add(
2171                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2172                        ),
2173                )
2174                .one(&*tx)
2175                .await?
2176                .ok_or_else(|| anyhow!("no such project"))?;
2177
2178            self.update_project_worktrees(project.id, worktrees, &tx)
2179                .await?;
2180
2181            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2182            let room = self.get_room(project.room_id, &tx).await?;
2183            Ok((project.room_id, (room, guest_connection_ids)))
2184        })
2185        .await
2186    }
2187
2188    async fn update_project_worktrees(
2189        &self,
2190        project_id: ProjectId,
2191        worktrees: &[proto::WorktreeMetadata],
2192        tx: &DatabaseTransaction,
2193    ) -> Result<()> {
2194        if !worktrees.is_empty() {
2195            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2196                id: ActiveValue::set(worktree.id as i64),
2197                project_id: ActiveValue::set(project_id),
2198                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2199                root_name: ActiveValue::set(worktree.root_name.clone()),
2200                visible: ActiveValue::set(worktree.visible),
2201                scan_id: ActiveValue::set(0),
2202                completed_scan_id: ActiveValue::set(0),
2203            }))
2204            .on_conflict(
2205                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2206                    .update_column(worktree::Column::RootName)
2207                    .to_owned(),
2208            )
2209            .exec(&*tx)
2210            .await?;
2211        }
2212
2213        worktree::Entity::delete_many()
2214            .filter(worktree::Column::ProjectId.eq(project_id).and(
2215                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2216            ))
2217            .exec(&*tx)
2218            .await?;
2219
2220        Ok(())
2221    }
2222
2223    pub async fn update_worktree(
2224        &self,
2225        update: &proto::UpdateWorktree,
2226        connection: ConnectionId,
2227    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2228        self.room_transaction(|tx| async move {
2229            let project_id = ProjectId::from_proto(update.project_id);
2230            let worktree_id = update.worktree_id as i64;
2231
2232            // Ensure the update comes from the host.
2233            let project = project::Entity::find_by_id(project_id)
2234                .filter(
2235                    Condition::all()
2236                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2237                        .add(
2238                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2239                        ),
2240                )
2241                .one(&*tx)
2242                .await?
2243                .ok_or_else(|| anyhow!("no such project"))?;
2244            let room_id = project.room_id;
2245
2246            // Update metadata.
2247            worktree::Entity::update(worktree::ActiveModel {
2248                id: ActiveValue::set(worktree_id),
2249                project_id: ActiveValue::set(project_id),
2250                root_name: ActiveValue::set(update.root_name.clone()),
2251                scan_id: ActiveValue::set(update.scan_id as i64),
2252                completed_scan_id: if update.is_last_update {
2253                    ActiveValue::set(update.scan_id as i64)
2254                } else {
2255                    ActiveValue::default()
2256                },
2257                abs_path: ActiveValue::set(update.abs_path.clone()),
2258                ..Default::default()
2259            })
2260            .exec(&*tx)
2261            .await?;
2262
2263            if !update.updated_entries.is_empty() {
2264                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2265                    let mtime = entry.mtime.clone().unwrap_or_default();
2266                    worktree_entry::ActiveModel {
2267                        project_id: ActiveValue::set(project_id),
2268                        worktree_id: ActiveValue::set(worktree_id),
2269                        id: ActiveValue::set(entry.id as i64),
2270                        is_dir: ActiveValue::set(entry.is_dir),
2271                        path: ActiveValue::set(entry.path.clone()),
2272                        inode: ActiveValue::set(entry.inode as i64),
2273                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2274                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2275                        is_symlink: ActiveValue::set(entry.is_symlink),
2276                        is_ignored: ActiveValue::set(entry.is_ignored),
2277                        is_deleted: ActiveValue::set(false),
2278                        scan_id: ActiveValue::set(update.scan_id as i64),
2279                    }
2280                }))
2281                .on_conflict(
2282                    OnConflict::columns([
2283                        worktree_entry::Column::ProjectId,
2284                        worktree_entry::Column::WorktreeId,
2285                        worktree_entry::Column::Id,
2286                    ])
2287                    .update_columns([
2288                        worktree_entry::Column::IsDir,
2289                        worktree_entry::Column::Path,
2290                        worktree_entry::Column::Inode,
2291                        worktree_entry::Column::MtimeSeconds,
2292                        worktree_entry::Column::MtimeNanos,
2293                        worktree_entry::Column::IsSymlink,
2294                        worktree_entry::Column::IsIgnored,
2295                        worktree_entry::Column::ScanId,
2296                    ])
2297                    .to_owned(),
2298                )
2299                .exec(&*tx)
2300                .await?;
2301            }
2302
2303            if !update.removed_entries.is_empty() {
2304                worktree_entry::Entity::update_many()
2305                    .filter(
2306                        worktree_entry::Column::ProjectId
2307                            .eq(project_id)
2308                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2309                            .and(
2310                                worktree_entry::Column::Id
2311                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2312                            ),
2313                    )
2314                    .set(worktree_entry::ActiveModel {
2315                        is_deleted: ActiveValue::Set(true),
2316                        scan_id: ActiveValue::Set(update.scan_id as i64),
2317                        ..Default::default()
2318                    })
2319                    .exec(&*tx)
2320                    .await?;
2321            }
2322
2323            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2324            Ok((room_id, connection_ids))
2325        })
2326        .await
2327    }
2328
2329    pub async fn update_diagnostic_summary(
2330        &self,
2331        update: &proto::UpdateDiagnosticSummary,
2332        connection: ConnectionId,
2333    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2334        self.room_transaction(|tx| async move {
2335            let project_id = ProjectId::from_proto(update.project_id);
2336            let worktree_id = update.worktree_id as i64;
2337            let summary = update
2338                .summary
2339                .as_ref()
2340                .ok_or_else(|| anyhow!("invalid summary"))?;
2341
2342            // Ensure the update comes from the host.
2343            let project = project::Entity::find_by_id(project_id)
2344                .one(&*tx)
2345                .await?
2346                .ok_or_else(|| anyhow!("no such project"))?;
2347            if project.host_connection()? != connection {
2348                return Err(anyhow!("can't update a project hosted by someone else"))?;
2349            }
2350
2351            // Update summary.
2352            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2353                project_id: ActiveValue::set(project_id),
2354                worktree_id: ActiveValue::set(worktree_id),
2355                path: ActiveValue::set(summary.path.clone()),
2356                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2357                error_count: ActiveValue::set(summary.error_count as i32),
2358                warning_count: ActiveValue::set(summary.warning_count as i32),
2359                ..Default::default()
2360            })
2361            .on_conflict(
2362                OnConflict::columns([
2363                    worktree_diagnostic_summary::Column::ProjectId,
2364                    worktree_diagnostic_summary::Column::WorktreeId,
2365                    worktree_diagnostic_summary::Column::Path,
2366                ])
2367                .update_columns([
2368                    worktree_diagnostic_summary::Column::LanguageServerId,
2369                    worktree_diagnostic_summary::Column::ErrorCount,
2370                    worktree_diagnostic_summary::Column::WarningCount,
2371                ])
2372                .to_owned(),
2373            )
2374            .exec(&*tx)
2375            .await?;
2376
2377            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2378            Ok((project.room_id, connection_ids))
2379        })
2380        .await
2381    }
2382
2383    pub async fn start_language_server(
2384        &self,
2385        update: &proto::StartLanguageServer,
2386        connection: ConnectionId,
2387    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2388        self.room_transaction(|tx| async move {
2389            let project_id = ProjectId::from_proto(update.project_id);
2390            let server = update
2391                .server
2392                .as_ref()
2393                .ok_or_else(|| anyhow!("invalid language server"))?;
2394
2395            // Ensure the update comes from the host.
2396            let project = project::Entity::find_by_id(project_id)
2397                .one(&*tx)
2398                .await?
2399                .ok_or_else(|| anyhow!("no such project"))?;
2400            if project.host_connection()? != connection {
2401                return Err(anyhow!("can't update a project hosted by someone else"))?;
2402            }
2403
2404            // Add the newly-started language server.
2405            language_server::Entity::insert(language_server::ActiveModel {
2406                project_id: ActiveValue::set(project_id),
2407                id: ActiveValue::set(server.id as i64),
2408                name: ActiveValue::set(server.name.clone()),
2409                ..Default::default()
2410            })
2411            .on_conflict(
2412                OnConflict::columns([
2413                    language_server::Column::ProjectId,
2414                    language_server::Column::Id,
2415                ])
2416                .update_column(language_server::Column::Name)
2417                .to_owned(),
2418            )
2419            .exec(&*tx)
2420            .await?;
2421
2422            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2423            Ok((project.room_id, connection_ids))
2424        })
2425        .await
2426    }
2427
2428    pub async fn join_project(
2429        &self,
2430        project_id: ProjectId,
2431        connection: ConnectionId,
2432    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2433        self.room_transaction(|tx| async move {
2434            let participant = room_participant::Entity::find()
2435                .filter(
2436                    Condition::all()
2437                        .add(
2438                            room_participant::Column::AnsweringConnectionId
2439                                .eq(connection.id as i32),
2440                        )
2441                        .add(
2442                            room_participant::Column::AnsweringConnectionServerId
2443                                .eq(connection.owner_id as i32),
2444                        ),
2445                )
2446                .one(&*tx)
2447                .await?
2448                .ok_or_else(|| anyhow!("must join a room first"))?;
2449
2450            let project = project::Entity::find_by_id(project_id)
2451                .one(&*tx)
2452                .await?
2453                .ok_or_else(|| anyhow!("no such project"))?;
2454            if project.room_id != participant.room_id {
2455                return Err(anyhow!("no such project"))?;
2456            }
2457
2458            let mut collaborators = project
2459                .find_related(project_collaborator::Entity)
2460                .all(&*tx)
2461                .await?;
2462            let replica_ids = collaborators
2463                .iter()
2464                .map(|c| c.replica_id)
2465                .collect::<HashSet<_>>();
2466            let mut replica_id = ReplicaId(1);
2467            while replica_ids.contains(&replica_id) {
2468                replica_id.0 += 1;
2469            }
2470            let new_collaborator = project_collaborator::ActiveModel {
2471                project_id: ActiveValue::set(project_id),
2472                connection_id: ActiveValue::set(connection.id as i32),
2473                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2474                user_id: ActiveValue::set(participant.user_id),
2475                replica_id: ActiveValue::set(replica_id),
2476                is_host: ActiveValue::set(false),
2477                ..Default::default()
2478            }
2479            .insert(&*tx)
2480            .await?;
2481            collaborators.push(new_collaborator);
2482
2483            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2484            let mut worktrees = db_worktrees
2485                .into_iter()
2486                .map(|db_worktree| {
2487                    (
2488                        db_worktree.id as u64,
2489                        Worktree {
2490                            id: db_worktree.id as u64,
2491                            abs_path: db_worktree.abs_path,
2492                            root_name: db_worktree.root_name,
2493                            visible: db_worktree.visible,
2494                            entries: Default::default(),
2495                            diagnostic_summaries: Default::default(),
2496                            scan_id: db_worktree.scan_id as u64,
2497                            completed_scan_id: db_worktree.completed_scan_id as u64,
2498                        },
2499                    )
2500                })
2501                .collect::<BTreeMap<_, _>>();
2502
2503            // Populate worktree entries.
2504            {
2505                let mut db_entries = worktree_entry::Entity::find()
2506                    .filter(
2507                        Condition::all()
2508                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2509                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2510                    )
2511                    .stream(&*tx)
2512                    .await?;
2513                while let Some(db_entry) = db_entries.next().await {
2514                    let db_entry = db_entry?;
2515                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2516                        worktree.entries.push(proto::Entry {
2517                            id: db_entry.id as u64,
2518                            is_dir: db_entry.is_dir,
2519                            path: db_entry.path,
2520                            inode: db_entry.inode as u64,
2521                            mtime: Some(proto::Timestamp {
2522                                seconds: db_entry.mtime_seconds as u64,
2523                                nanos: db_entry.mtime_nanos as u32,
2524                            }),
2525                            is_symlink: db_entry.is_symlink,
2526                            is_ignored: db_entry.is_ignored,
2527                        });
2528                    }
2529                }
2530            }
2531
2532            // Populate worktree diagnostic summaries.
2533            {
2534                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2535                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2536                    .stream(&*tx)
2537                    .await?;
2538                while let Some(db_summary) = db_summaries.next().await {
2539                    let db_summary = db_summary?;
2540                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2541                        worktree
2542                            .diagnostic_summaries
2543                            .push(proto::DiagnosticSummary {
2544                                path: db_summary.path,
2545                                language_server_id: db_summary.language_server_id as u64,
2546                                error_count: db_summary.error_count as u32,
2547                                warning_count: db_summary.warning_count as u32,
2548                            });
2549                    }
2550                }
2551            }
2552
2553            // Populate language servers.
2554            let language_servers = project
2555                .find_related(language_server::Entity)
2556                .all(&*tx)
2557                .await?;
2558
2559            let room_id = project.room_id;
2560            let project = Project {
2561                collaborators: collaborators
2562                    .into_iter()
2563                    .map(|collaborator| ProjectCollaborator {
2564                        connection_id: collaborator.connection(),
2565                        user_id: collaborator.user_id,
2566                        replica_id: collaborator.replica_id,
2567                        is_host: collaborator.is_host,
2568                    })
2569                    .collect(),
2570                worktrees,
2571                language_servers: language_servers
2572                    .into_iter()
2573                    .map(|language_server| proto::LanguageServer {
2574                        id: language_server.id as u64,
2575                        name: language_server.name,
2576                    })
2577                    .collect(),
2578            };
2579            Ok((room_id, (project, replica_id as ReplicaId)))
2580        })
2581        .await
2582    }
2583
2584    pub async fn leave_project(
2585        &self,
2586        project_id: ProjectId,
2587        connection: ConnectionId,
2588    ) -> Result<RoomGuard<LeftProject>> {
2589        self.room_transaction(|tx| async move {
2590            let result = project_collaborator::Entity::delete_many()
2591                .filter(
2592                    Condition::all()
2593                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2594                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2595                        .add(
2596                            project_collaborator::Column::ConnectionServerId
2597                                .eq(connection.owner_id as i32),
2598                        ),
2599                )
2600                .exec(&*tx)
2601                .await?;
2602            if result.rows_affected == 0 {
2603                Err(anyhow!("not a collaborator on this project"))?;
2604            }
2605
2606            let project = project::Entity::find_by_id(project_id)
2607                .one(&*tx)
2608                .await?
2609                .ok_or_else(|| anyhow!("no such project"))?;
2610            let collaborators = project
2611                .find_related(project_collaborator::Entity)
2612                .all(&*tx)
2613                .await?;
2614            let connection_ids = collaborators
2615                .into_iter()
2616                .map(|collaborator| collaborator.connection())
2617                .collect();
2618
2619            let left_project = LeftProject {
2620                id: project_id,
2621                host_user_id: project.host_user_id,
2622                host_connection_id: project.host_connection()?,
2623                connection_ids,
2624            };
2625            Ok((project.room_id, left_project))
2626        })
2627        .await
2628    }
2629
2630    pub async fn project_collaborators(
2631        &self,
2632        project_id: ProjectId,
2633        connection_id: ConnectionId,
2634    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2635        self.room_transaction(|tx| async move {
2636            let project = project::Entity::find_by_id(project_id)
2637                .one(&*tx)
2638                .await?
2639                .ok_or_else(|| anyhow!("no such project"))?;
2640            let collaborators = project_collaborator::Entity::find()
2641                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2642                .all(&*tx)
2643                .await?
2644                .into_iter()
2645                .map(|collaborator| ProjectCollaborator {
2646                    connection_id: collaborator.connection(),
2647                    user_id: collaborator.user_id,
2648                    replica_id: collaborator.replica_id,
2649                    is_host: collaborator.is_host,
2650                })
2651                .collect::<Vec<_>>();
2652
2653            if collaborators
2654                .iter()
2655                .any(|collaborator| collaborator.connection_id == connection_id)
2656            {
2657                Ok((project.room_id, collaborators))
2658            } else {
2659                Err(anyhow!("no such project"))?
2660            }
2661        })
2662        .await
2663    }
2664
2665    pub async fn project_connection_ids(
2666        &self,
2667        project_id: ProjectId,
2668        connection_id: ConnectionId,
2669    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2670        self.room_transaction(|tx| async move {
2671            let project = project::Entity::find_by_id(project_id)
2672                .one(&*tx)
2673                .await?
2674                .ok_or_else(|| anyhow!("no such project"))?;
2675            let mut collaborators = project_collaborator::Entity::find()
2676                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2677                .stream(&*tx)
2678                .await?;
2679
2680            let mut connection_ids = HashSet::default();
2681            while let Some(collaborator) = collaborators.next().await {
2682                let collaborator = collaborator?;
2683                connection_ids.insert(collaborator.connection());
2684            }
2685
2686            if connection_ids.contains(&connection_id) {
2687                Ok((project.room_id, connection_ids))
2688            } else {
2689                Err(anyhow!("no such project"))?
2690            }
2691        })
2692        .await
2693    }
2694
2695    async fn project_guest_connection_ids(
2696        &self,
2697        project_id: ProjectId,
2698        tx: &DatabaseTransaction,
2699    ) -> Result<Vec<ConnectionId>> {
2700        let mut collaborators = project_collaborator::Entity::find()
2701            .filter(
2702                project_collaborator::Column::ProjectId
2703                    .eq(project_id)
2704                    .and(project_collaborator::Column::IsHost.eq(false)),
2705            )
2706            .stream(tx)
2707            .await?;
2708
2709        let mut guest_connection_ids = Vec::new();
2710        while let Some(collaborator) = collaborators.next().await {
2711            let collaborator = collaborator?;
2712            guest_connection_ids.push(collaborator.connection());
2713        }
2714        Ok(guest_connection_ids)
2715    }
2716
2717    // access tokens
2718
2719    pub async fn create_access_token_hash(
2720        &self,
2721        user_id: UserId,
2722        access_token_hash: &str,
2723        max_access_token_count: usize,
2724    ) -> Result<()> {
2725        self.transaction(|tx| async {
2726            let tx = tx;
2727
2728            access_token::ActiveModel {
2729                user_id: ActiveValue::set(user_id),
2730                hash: ActiveValue::set(access_token_hash.into()),
2731                ..Default::default()
2732            }
2733            .insert(&*tx)
2734            .await?;
2735
2736            access_token::Entity::delete_many()
2737                .filter(
2738                    access_token::Column::Id.in_subquery(
2739                        Query::select()
2740                            .column(access_token::Column::Id)
2741                            .from(access_token::Entity)
2742                            .and_where(access_token::Column::UserId.eq(user_id))
2743                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2744                            .limit(10000)
2745                            .offset(max_access_token_count as u64)
2746                            .to_owned(),
2747                    ),
2748                )
2749                .exec(&*tx)
2750                .await?;
2751            Ok(())
2752        })
2753        .await
2754    }
2755
2756    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2757        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2758        enum QueryAs {
2759            Hash,
2760        }
2761
2762        self.transaction(|tx| async move {
2763            Ok(access_token::Entity::find()
2764                .select_only()
2765                .column(access_token::Column::Hash)
2766                .filter(access_token::Column::UserId.eq(user_id))
2767                .order_by_desc(access_token::Column::Id)
2768                .into_values::<_, QueryAs>()
2769                .all(&*tx)
2770                .await?)
2771        })
2772        .await
2773    }
2774
2775    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2776    where
2777        F: Send + Fn(TransactionHandle) -> Fut,
2778        Fut: Send + Future<Output = Result<T>>,
2779    {
2780        let body = async {
2781            loop {
2782                let (tx, result) = self.with_transaction(&f).await?;
2783                match result {
2784                    Ok(result) => {
2785                        match tx.commit().await.map_err(Into::into) {
2786                            Ok(()) => return Ok(result),
2787                            Err(error) => {
2788                                if is_serialization_error(&error) {
2789                                    // Retry (don't break the loop)
2790                                } else {
2791                                    return Err(error);
2792                                }
2793                            }
2794                        }
2795                    }
2796                    Err(error) => {
2797                        tx.rollback().await?;
2798                        if is_serialization_error(&error) {
2799                            // Retry (don't break the loop)
2800                        } else {
2801                            return Err(error);
2802                        }
2803                    }
2804                }
2805            }
2806        };
2807
2808        self.run(body).await
2809    }
2810
2811    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2812    where
2813        F: Send + Fn(TransactionHandle) -> Fut,
2814        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2815    {
2816        let body = async {
2817            loop {
2818                let (tx, result) = self.with_transaction(&f).await?;
2819                match result {
2820                    Ok(Some((room_id, data))) => {
2821                        let lock = self.rooms.entry(room_id).or_default().clone();
2822                        let _guard = lock.lock_owned().await;
2823                        match tx.commit().await.map_err(Into::into) {
2824                            Ok(()) => {
2825                                return Ok(Some(RoomGuard {
2826                                    data,
2827                                    _guard,
2828                                    _not_send: PhantomData,
2829                                }));
2830                            }
2831                            Err(error) => {
2832                                if is_serialization_error(&error) {
2833                                    // Retry (don't break the loop)
2834                                } else {
2835                                    return Err(error);
2836                                }
2837                            }
2838                        }
2839                    }
2840                    Ok(None) => {
2841                        match tx.commit().await.map_err(Into::into) {
2842                            Ok(()) => return Ok(None),
2843                            Err(error) => {
2844                                if is_serialization_error(&error) {
2845                                    // Retry (don't break the loop)
2846                                } else {
2847                                    return Err(error);
2848                                }
2849                            }
2850                        }
2851                    }
2852                    Err(error) => {
2853                        tx.rollback().await?;
2854                        if is_serialization_error(&error) {
2855                            // Retry (don't break the loop)
2856                        } else {
2857                            return Err(error);
2858                        }
2859                    }
2860                }
2861            }
2862        };
2863
2864        self.run(body).await
2865    }
2866
2867    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2868    where
2869        F: Send + Fn(TransactionHandle) -> Fut,
2870        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2871    {
2872        let data = self
2873            .optional_room_transaction(move |tx| {
2874                let future = f(tx);
2875                async {
2876                    let data = future.await?;
2877                    Ok(Some(data))
2878                }
2879            })
2880            .await?;
2881        Ok(data.unwrap())
2882    }
2883
2884    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2885    where
2886        F: Send + Fn(TransactionHandle) -> Fut,
2887        Fut: Send + Future<Output = Result<T>>,
2888    {
2889        let tx = self
2890            .pool
2891            .begin_with_config(Some(IsolationLevel::Serializable), None)
2892            .await?;
2893
2894        let mut tx = Arc::new(Some(tx));
2895        let result = f(TransactionHandle(tx.clone())).await;
2896        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2897            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2898        };
2899
2900        Ok((tx, result))
2901    }
2902
2903    async fn run<F, T>(&self, future: F) -> T
2904    where
2905        F: Future<Output = T>,
2906    {
2907        #[cfg(test)]
2908        {
2909            if let Some(background) = self.background.as_ref() {
2910                background.simulate_random_delay().await;
2911            }
2912
2913            self.runtime.as_ref().unwrap().block_on(future)
2914        }
2915
2916        #[cfg(not(test))]
2917        {
2918            future.await
2919        }
2920    }
2921}
2922
2923fn is_serialization_error(error: &Error) -> bool {
2924    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2925    match error {
2926        Error::Database(
2927            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2928            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2929        ) if error
2930            .as_database_error()
2931            .and_then(|error| error.code())
2932            .as_deref()
2933            == Some(SERIALIZATION_FAILURE_CODE) =>
2934        {
2935            true
2936        }
2937        _ => false,
2938    }
2939}
2940
2941struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2942
2943impl Deref for TransactionHandle {
2944    type Target = DatabaseTransaction;
2945
2946    fn deref(&self) -> &Self::Target {
2947        self.0.as_ref().as_ref().unwrap()
2948    }
2949}
2950
2951pub struct RoomGuard<T> {
2952    data: T,
2953    _guard: OwnedMutexGuard<()>,
2954    _not_send: PhantomData<Rc<()>>,
2955}
2956
2957impl<T> Deref for RoomGuard<T> {
2958    type Target = T;
2959
2960    fn deref(&self) -> &T {
2961        &self.data
2962    }
2963}
2964
2965impl<T> DerefMut for RoomGuard<T> {
2966    fn deref_mut(&mut self) -> &mut T {
2967        &mut self.data
2968    }
2969}
2970
2971#[derive(Debug, Serialize, Deserialize)]
2972pub struct NewUserParams {
2973    pub github_login: String,
2974    pub github_user_id: i32,
2975    pub invite_count: i32,
2976}
2977
2978#[derive(Debug)]
2979pub struct NewUserResult {
2980    pub user_id: UserId,
2981    pub metrics_id: String,
2982    pub inviting_user_id: Option<UserId>,
2983    pub signup_device_id: Option<String>,
2984}
2985
2986fn random_invite_code() -> String {
2987    nanoid::nanoid!(16)
2988}
2989
2990fn random_email_confirmation_code() -> String {
2991    nanoid::nanoid!(64)
2992}
2993
2994macro_rules! id_type {
2995    ($name:ident) => {
2996        #[derive(
2997            Clone,
2998            Copy,
2999            Debug,
3000            Default,
3001            PartialEq,
3002            Eq,
3003            PartialOrd,
3004            Ord,
3005            Hash,
3006            Serialize,
3007            Deserialize,
3008        )]
3009        #[serde(transparent)]
3010        pub struct $name(pub i32);
3011
3012        impl $name {
3013            #[allow(unused)]
3014            pub const MAX: Self = Self(i32::MAX);
3015
3016            #[allow(unused)]
3017            pub fn from_proto(value: u64) -> Self {
3018                Self(value as i32)
3019            }
3020
3021            #[allow(unused)]
3022            pub fn to_proto(self) -> u64 {
3023                self.0 as u64
3024            }
3025        }
3026
3027        impl std::fmt::Display for $name {
3028            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3029                self.0.fmt(f)
3030            }
3031        }
3032
3033        impl From<$name> for sea_query::Value {
3034            fn from(value: $name) -> Self {
3035                sea_query::Value::Int(Some(value.0))
3036            }
3037        }
3038
3039        impl sea_orm::TryGetable for $name {
3040            fn try_get(
3041                res: &sea_orm::QueryResult,
3042                pre: &str,
3043                col: &str,
3044            ) -> Result<Self, sea_orm::TryGetError> {
3045                Ok(Self(i32::try_get(res, pre, col)?))
3046            }
3047        }
3048
3049        impl sea_query::ValueType for $name {
3050            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3051                match v {
3052                    Value::TinyInt(Some(int)) => {
3053                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3054                    }
3055                    Value::SmallInt(Some(int)) => {
3056                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3057                    }
3058                    Value::Int(Some(int)) => {
3059                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3060                    }
3061                    Value::BigInt(Some(int)) => {
3062                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3063                    }
3064                    Value::TinyUnsigned(Some(int)) => {
3065                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3066                    }
3067                    Value::SmallUnsigned(Some(int)) => {
3068                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3069                    }
3070                    Value::Unsigned(Some(int)) => {
3071                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3072                    }
3073                    Value::BigUnsigned(Some(int)) => {
3074                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3075                    }
3076                    _ => Err(sea_query::ValueTypeErr),
3077                }
3078            }
3079
3080            fn type_name() -> String {
3081                stringify!($name).into()
3082            }
3083
3084            fn array_type() -> sea_query::ArrayType {
3085                sea_query::ArrayType::Int
3086            }
3087
3088            fn column_type() -> sea_query::ColumnType {
3089                sea_query::ColumnType::Integer(None)
3090            }
3091        }
3092
3093        impl sea_orm::TryFromU64 for $name {
3094            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3095                Ok(Self(n.try_into().map_err(|_| {
3096                    DbErr::ConvertFromU64(concat!(
3097                        "error converting ",
3098                        stringify!($name),
3099                        " to u64"
3100                    ))
3101                })?))
3102            }
3103        }
3104
3105        impl sea_query::Nullable for $name {
3106            fn null() -> Value {
3107                Value::Int(None)
3108            }
3109        }
3110    };
3111}
3112
3113id_type!(AccessTokenId);
3114id_type!(ContactId);
3115id_type!(FollowerId);
3116id_type!(RoomId);
3117id_type!(RoomParticipantId);
3118id_type!(ProjectId);
3119id_type!(ProjectCollaboratorId);
3120id_type!(ReplicaId);
3121id_type!(ServerId);
3122id_type!(SignupId);
3123id_type!(UserId);
3124
3125pub struct RejoinedRoom {
3126    pub room: proto::Room,
3127    pub rejoined_projects: Vec<RejoinedProject>,
3128    pub reshared_projects: Vec<ResharedProject>,
3129}
3130
3131pub struct ResharedProject {
3132    pub id: ProjectId,
3133    pub old_connection_id: ConnectionId,
3134    pub collaborators: Vec<ProjectCollaborator>,
3135    pub worktrees: Vec<proto::WorktreeMetadata>,
3136}
3137
3138pub struct RejoinedProject {
3139    pub id: ProjectId,
3140    pub old_connection_id: ConnectionId,
3141    pub collaborators: Vec<ProjectCollaborator>,
3142    pub worktrees: Vec<RejoinedWorktree>,
3143    pub language_servers: Vec<proto::LanguageServer>,
3144}
3145
3146#[derive(Debug)]
3147pub struct RejoinedWorktree {
3148    pub id: u64,
3149    pub abs_path: String,
3150    pub root_name: String,
3151    pub visible: bool,
3152    pub updated_entries: Vec<proto::Entry>,
3153    pub removed_entries: Vec<u64>,
3154    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3155    pub scan_id: u64,
3156    pub completed_scan_id: u64,
3157}
3158
3159pub struct LeftRoom {
3160    pub room: proto::Room,
3161    pub left_projects: HashMap<ProjectId, LeftProject>,
3162    pub canceled_calls_to_user_ids: Vec<UserId>,
3163}
3164
3165pub struct RefreshedRoom {
3166    pub room: proto::Room,
3167    pub stale_participant_user_ids: Vec<UserId>,
3168    pub canceled_calls_to_user_ids: Vec<UserId>,
3169}
3170
3171pub struct Project {
3172    pub collaborators: Vec<ProjectCollaborator>,
3173    pub worktrees: BTreeMap<u64, Worktree>,
3174    pub language_servers: Vec<proto::LanguageServer>,
3175}
3176
3177pub struct ProjectCollaborator {
3178    pub connection_id: ConnectionId,
3179    pub user_id: UserId,
3180    pub replica_id: ReplicaId,
3181    pub is_host: bool,
3182}
3183
3184impl ProjectCollaborator {
3185    pub fn to_proto(&self) -> proto::Collaborator {
3186        proto::Collaborator {
3187            peer_id: Some(self.connection_id.into()),
3188            replica_id: self.replica_id.0 as u32,
3189            user_id: self.user_id.to_proto(),
3190        }
3191    }
3192}
3193
3194#[derive(Debug)]
3195pub struct LeftProject {
3196    pub id: ProjectId,
3197    pub host_user_id: UserId,
3198    pub host_connection_id: ConnectionId,
3199    pub connection_ids: Vec<ConnectionId>,
3200}
3201
3202pub struct Worktree {
3203    pub id: u64,
3204    pub abs_path: String,
3205    pub root_name: String,
3206    pub visible: bool,
3207    pub entries: Vec<proto::Entry>,
3208    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3209    pub scan_id: u64,
3210    pub completed_scan_id: u64,
3211}
3212
3213#[cfg(test)]
3214pub use test::*;
3215
3216#[cfg(test)]
3217mod test {
3218    use super::*;
3219    use gpui::executor::Background;
3220    use lazy_static::lazy_static;
3221    use parking_lot::Mutex;
3222    use rand::prelude::*;
3223    use sea_orm::ConnectionTrait;
3224    use sqlx::migrate::MigrateDatabase;
3225    use std::sync::Arc;
3226
3227    pub struct TestDb {
3228        pub db: Option<Arc<Database>>,
3229        pub connection: Option<sqlx::AnyConnection>,
3230    }
3231
3232    impl TestDb {
3233        pub fn sqlite(background: Arc<Background>) -> Self {
3234            let url = format!("sqlite::memory:");
3235            let runtime = tokio::runtime::Builder::new_current_thread()
3236                .enable_io()
3237                .enable_time()
3238                .build()
3239                .unwrap();
3240
3241            let mut db = runtime.block_on(async {
3242                let mut options = ConnectOptions::new(url);
3243                options.max_connections(5);
3244                let db = Database::new(options).await.unwrap();
3245                let sql = include_str!(concat!(
3246                    env!("CARGO_MANIFEST_DIR"),
3247                    "/migrations.sqlite/20221109000000_test_schema.sql"
3248                ));
3249                db.pool
3250                    .execute(sea_orm::Statement::from_string(
3251                        db.pool.get_database_backend(),
3252                        sql.into(),
3253                    ))
3254                    .await
3255                    .unwrap();
3256                db
3257            });
3258
3259            db.background = Some(background);
3260            db.runtime = Some(runtime);
3261
3262            Self {
3263                db: Some(Arc::new(db)),
3264                connection: None,
3265            }
3266        }
3267
3268        pub fn postgres(background: Arc<Background>) -> Self {
3269            lazy_static! {
3270                static ref LOCK: Mutex<()> = Mutex::new(());
3271            }
3272
3273            let _guard = LOCK.lock();
3274            let mut rng = StdRng::from_entropy();
3275            let url = format!(
3276                "postgres://postgres@localhost/zed-test-{}",
3277                rng.gen::<u128>()
3278            );
3279            let runtime = tokio::runtime::Builder::new_current_thread()
3280                .enable_io()
3281                .enable_time()
3282                .build()
3283                .unwrap();
3284
3285            let mut db = runtime.block_on(async {
3286                sqlx::Postgres::create_database(&url)
3287                    .await
3288                    .expect("failed to create test db");
3289                let mut options = ConnectOptions::new(url);
3290                options
3291                    .max_connections(5)
3292                    .idle_timeout(Duration::from_secs(0));
3293                let db = Database::new(options).await.unwrap();
3294                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3295                db.migrate(Path::new(migrations_path), false).await.unwrap();
3296                db
3297            });
3298
3299            db.background = Some(background);
3300            db.runtime = Some(runtime);
3301
3302            Self {
3303                db: Some(Arc::new(db)),
3304                connection: None,
3305            }
3306        }
3307
3308        pub fn db(&self) -> &Arc<Database> {
3309            self.db.as_ref().unwrap()
3310        }
3311    }
3312
3313    impl Drop for TestDb {
3314        fn drop(&mut self) {
3315            let db = self.db.take().unwrap();
3316            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3317                db.runtime.as_ref().unwrap().block_on(async {
3318                    use util::ResultExt;
3319                    let query = "
3320                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3321                        FROM pg_stat_activity
3322                        WHERE
3323                            pg_stat_activity.datname = current_database() AND
3324                            pid <> pg_backend_pid();
3325                    ";
3326                    db.pool
3327                        .execute(sea_orm::Statement::from_string(
3328                            db.pool.get_database_backend(),
3329                            query.into(),
3330                        ))
3331                        .await
3332                        .log_err();
3333                    sqlx::Postgres::drop_database(db.options.get_url())
3334                        .await
3335                        .log_err();
3336                })
3337            }
3338        }
3339    }
3340}