db.rs

   1mod access_token;
   2mod contact;
   3mod follower;
   4mod language_server;
   5mod project;
   6mod project_collaborator;
   7mod room;
   8mod room_participant;
   9mod server;
  10mod signup;
  11#[cfg(test)]
  12mod tests;
  13mod user;
  14mod worktree;
  15mod worktree_diagnostic_summary;
  16mod worktree_entry;
  17
  18use crate::{Error, Result};
  19use anyhow::anyhow;
  20use collections::{BTreeMap, HashMap, HashSet};
  21pub use contact::Contact;
  22use dashmap::DashMap;
  23use futures::StreamExt;
  24use hyper::StatusCode;
  25use rpc::{proto, ConnectionId};
  26use sea_orm::Condition;
  27pub use sea_orm::ConnectOptions;
  28use sea_orm::{
  29    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  30    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  31    Statement, TransactionTrait,
  32};
  33use sea_query::{Alias, Expr, OnConflict, Query};
  34use serde::{Deserialize, Serialize};
  35pub use signup::{Invite, NewSignup, WaitlistSummary};
  36use sqlx::migrate::{Migrate, Migration, MigrationSource};
  37use sqlx::Connection;
  38use std::ops::{Deref, DerefMut};
  39use std::path::Path;
  40use std::time::Duration;
  41use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  42use tokio::sync::{Mutex, OwnedMutexGuard};
  43pub use user::Model as User;
  44
  45pub struct Database {
  46    options: ConnectOptions,
  47    pool: DatabaseConnection,
  48    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  49    #[cfg(test)]
  50    background: Option<std::sync::Arc<gpui::executor::Background>>,
  51    #[cfg(test)]
  52    runtime: Option<tokio::runtime::Runtime>,
  53}
  54
  55impl Database {
  56    pub async fn new(options: ConnectOptions) -> Result<Self> {
  57        Ok(Self {
  58            options: options.clone(),
  59            pool: sea_orm::Database::connect(options).await?,
  60            rooms: DashMap::with_capacity(16384),
  61            #[cfg(test)]
  62            background: None,
  63            #[cfg(test)]
  64            runtime: None,
  65        })
  66    }
  67
  68    #[cfg(test)]
  69    pub fn reset(&self) {
  70        self.rooms.clear();
  71    }
  72
  73    pub async fn migrate(
  74        &self,
  75        migrations_path: &Path,
  76        ignore_checksum_mismatch: bool,
  77    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  78        let migrations = MigrationSource::resolve(migrations_path)
  79            .await
  80            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  81
  82        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  83
  84        connection.ensure_migrations_table().await?;
  85        let applied_migrations: HashMap<_, _> = connection
  86            .list_applied_migrations()
  87            .await?
  88            .into_iter()
  89            .map(|m| (m.version, m))
  90            .collect();
  91
  92        let mut new_migrations = Vec::new();
  93        for migration in migrations {
  94            match applied_migrations.get(&migration.version) {
  95                Some(applied_migration) => {
  96                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  97                    {
  98                        Err(anyhow!(
  99                            "checksum mismatch for applied migration {}",
 100                            migration.description
 101                        ))?;
 102                    }
 103                }
 104                None => {
 105                    let elapsed = connection.apply(&migration).await?;
 106                    new_migrations.push((migration, elapsed));
 107                }
 108            }
 109        }
 110
 111        Ok(new_migrations)
 112    }
 113
 114    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 115        self.transaction(|tx| async move {
 116            let server = server::ActiveModel {
 117                environment: ActiveValue::set(environment.into()),
 118                ..Default::default()
 119            }
 120            .insert(&*tx)
 121            .await?;
 122            Ok(server.id)
 123        })
 124        .await
 125    }
 126
 127    pub async fn stale_room_ids(
 128        &self,
 129        environment: &str,
 130        new_server_id: ServerId,
 131    ) -> Result<Vec<RoomId>> {
 132        self.transaction(|tx| async move {
 133            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 134            enum QueryAs {
 135                RoomId,
 136            }
 137
 138            let stale_server_epochs = self
 139                .stale_server_ids(environment, new_server_id, &tx)
 140                .await?;
 141            Ok(room_participant::Entity::find()
 142                .select_only()
 143                .column(room_participant::Column::RoomId)
 144                .distinct()
 145                .filter(
 146                    room_participant::Column::AnsweringConnectionServerId
 147                        .is_in(stale_server_epochs),
 148                )
 149                .into_values::<_, QueryAs>()
 150                .all(&*tx)
 151                .await?)
 152        })
 153        .await
 154    }
 155
 156    pub async fn refresh_room(
 157        &self,
 158        room_id: RoomId,
 159        new_server_id: ServerId,
 160    ) -> Result<RoomGuard<RefreshedRoom>> {
 161        self.room_transaction(|tx| async move {
 162            let stale_participant_filter = Condition::all()
 163                .add(room_participant::Column::RoomId.eq(room_id))
 164                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 165                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 166
 167            let stale_participant_user_ids = room_participant::Entity::find()
 168                .filter(stale_participant_filter.clone())
 169                .all(&*tx)
 170                .await?
 171                .into_iter()
 172                .map(|participant| participant.user_id)
 173                .collect::<Vec<_>>();
 174
 175            // Delete participants who failed to reconnect.
 176            room_participant::Entity::delete_many()
 177                .filter(stale_participant_filter)
 178                .exec(&*tx)
 179                .await?;
 180
 181            let room = self.get_room(room_id, &tx).await?;
 182            let mut canceled_calls_to_user_ids = Vec::new();
 183            // Delete the room if it becomes empty and cancel pending calls.
 184            if room.participants.is_empty() {
 185                canceled_calls_to_user_ids.extend(
 186                    room.pending_participants
 187                        .iter()
 188                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 189                );
 190                room_participant::Entity::delete_many()
 191                    .filter(room_participant::Column::RoomId.eq(room_id))
 192                    .exec(&*tx)
 193                    .await?;
 194                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 195            }
 196
 197            Ok((
 198                room_id,
 199                RefreshedRoom {
 200                    room,
 201                    stale_participant_user_ids,
 202                    canceled_calls_to_user_ids,
 203                },
 204            ))
 205        })
 206        .await
 207    }
 208
 209    pub async fn delete_stale_servers(
 210        &self,
 211        environment: &str,
 212        new_server_id: ServerId,
 213    ) -> Result<()> {
 214        self.transaction(|tx| async move {
 215            server::Entity::delete_many()
 216                .filter(
 217                    Condition::all()
 218                        .add(server::Column::Environment.eq(environment))
 219                        .add(server::Column::Id.ne(new_server_id)),
 220                )
 221                .exec(&*tx)
 222                .await?;
 223            Ok(())
 224        })
 225        .await
 226    }
 227
 228    async fn stale_server_ids(
 229        &self,
 230        environment: &str,
 231        new_server_id: ServerId,
 232        tx: &DatabaseTransaction,
 233    ) -> Result<Vec<ServerId>> {
 234        let stale_servers = server::Entity::find()
 235            .filter(
 236                Condition::all()
 237                    .add(server::Column::Environment.eq(environment))
 238                    .add(server::Column::Id.ne(new_server_id)),
 239            )
 240            .all(&*tx)
 241            .await?;
 242        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 243    }
 244
 245    // users
 246
 247    pub async fn create_user(
 248        &self,
 249        email_address: &str,
 250        admin: bool,
 251        params: NewUserParams,
 252    ) -> Result<NewUserResult> {
 253        self.transaction(|tx| async {
 254            let tx = tx;
 255            let user = user::Entity::insert(user::ActiveModel {
 256                email_address: ActiveValue::set(Some(email_address.into())),
 257                github_login: ActiveValue::set(params.github_login.clone()),
 258                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 259                admin: ActiveValue::set(admin),
 260                metrics_id: ActiveValue::set(Uuid::new_v4()),
 261                ..Default::default()
 262            })
 263            .on_conflict(
 264                OnConflict::column(user::Column::GithubLogin)
 265                    .update_column(user::Column::GithubLogin)
 266                    .to_owned(),
 267            )
 268            .exec_with_returning(&*tx)
 269            .await?;
 270
 271            Ok(NewUserResult {
 272                user_id: user.id,
 273                metrics_id: user.metrics_id.to_string(),
 274                signup_device_id: None,
 275                inviting_user_id: None,
 276            })
 277        })
 278        .await
 279    }
 280
 281    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 282        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 283            .await
 284    }
 285
 286    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 287        self.transaction(|tx| async {
 288            let tx = tx;
 289            Ok(user::Entity::find()
 290                .filter(user::Column::Id.is_in(ids.iter().copied()))
 291                .all(&*tx)
 292                .await?)
 293        })
 294        .await
 295    }
 296
 297    pub async fn get_user_by_github_account(
 298        &self,
 299        github_login: &str,
 300        github_user_id: Option<i32>,
 301    ) -> Result<Option<User>> {
 302        self.transaction(|tx| async move {
 303            let tx = &*tx;
 304            if let Some(github_user_id) = github_user_id {
 305                if let Some(user_by_github_user_id) = user::Entity::find()
 306                    .filter(user::Column::GithubUserId.eq(github_user_id))
 307                    .one(tx)
 308                    .await?
 309                {
 310                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 311                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 312                    Ok(Some(user_by_github_user_id.update(tx).await?))
 313                } else if let Some(user_by_github_login) = user::Entity::find()
 314                    .filter(user::Column::GithubLogin.eq(github_login))
 315                    .one(tx)
 316                    .await?
 317                {
 318                    let mut user_by_github_login = user_by_github_login.into_active_model();
 319                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 320                    Ok(Some(user_by_github_login.update(tx).await?))
 321                } else {
 322                    Ok(None)
 323                }
 324            } else {
 325                Ok(user::Entity::find()
 326                    .filter(user::Column::GithubLogin.eq(github_login))
 327                    .one(tx)
 328                    .await?)
 329            }
 330        })
 331        .await
 332    }
 333
 334    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 335        self.transaction(|tx| async move {
 336            Ok(user::Entity::find()
 337                .order_by_asc(user::Column::GithubLogin)
 338                .limit(limit as u64)
 339                .offset(page as u64 * limit as u64)
 340                .all(&*tx)
 341                .await?)
 342        })
 343        .await
 344    }
 345
 346    pub async fn get_users_with_no_invites(
 347        &self,
 348        invited_by_another_user: bool,
 349    ) -> Result<Vec<User>> {
 350        self.transaction(|tx| async move {
 351            Ok(user::Entity::find()
 352                .filter(
 353                    user::Column::InviteCount
 354                        .eq(0)
 355                        .and(if invited_by_another_user {
 356                            user::Column::InviterId.is_not_null()
 357                        } else {
 358                            user::Column::InviterId.is_null()
 359                        }),
 360                )
 361                .all(&*tx)
 362                .await?)
 363        })
 364        .await
 365    }
 366
 367    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 368        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 369        enum QueryAs {
 370            MetricsId,
 371        }
 372
 373        self.transaction(|tx| async move {
 374            let metrics_id: Uuid = user::Entity::find_by_id(id)
 375                .select_only()
 376                .column(user::Column::MetricsId)
 377                .into_values::<_, QueryAs>()
 378                .one(&*tx)
 379                .await?
 380                .ok_or_else(|| anyhow!("could not find user"))?;
 381            Ok(metrics_id.to_string())
 382        })
 383        .await
 384    }
 385
 386    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 387        self.transaction(|tx| async move {
 388            user::Entity::update_many()
 389                .filter(user::Column::Id.eq(id))
 390                .set(user::ActiveModel {
 391                    admin: ActiveValue::set(is_admin),
 392                    ..Default::default()
 393                })
 394                .exec(&*tx)
 395                .await?;
 396            Ok(())
 397        })
 398        .await
 399    }
 400
 401    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 402        self.transaction(|tx| async move {
 403            user::Entity::update_many()
 404                .filter(user::Column::Id.eq(id))
 405                .set(user::ActiveModel {
 406                    connected_once: ActiveValue::set(connected_once),
 407                    ..Default::default()
 408                })
 409                .exec(&*tx)
 410                .await?;
 411            Ok(())
 412        })
 413        .await
 414    }
 415
 416    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 417        self.transaction(|tx| async move {
 418            access_token::Entity::delete_many()
 419                .filter(access_token::Column::UserId.eq(id))
 420                .exec(&*tx)
 421                .await?;
 422            user::Entity::delete_by_id(id).exec(&*tx).await?;
 423            Ok(())
 424        })
 425        .await
 426    }
 427
 428    // contacts
 429
 430    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 431        #[derive(Debug, FromQueryResult)]
 432        struct ContactWithUserBusyStatuses {
 433            user_id_a: UserId,
 434            user_id_b: UserId,
 435            a_to_b: bool,
 436            accepted: bool,
 437            should_notify: bool,
 438            user_a_busy: bool,
 439            user_b_busy: bool,
 440        }
 441
 442        self.transaction(|tx| async move {
 443            let user_a_participant = Alias::new("user_a_participant");
 444            let user_b_participant = Alias::new("user_b_participant");
 445            let mut db_contacts = contact::Entity::find()
 446                .column_as(
 447                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 448                        .is_not_null(),
 449                    "user_a_busy",
 450                )
 451                .column_as(
 452                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 453                        .is_not_null(),
 454                    "user_b_busy",
 455                )
 456                .filter(
 457                    contact::Column::UserIdA
 458                        .eq(user_id)
 459                        .or(contact::Column::UserIdB.eq(user_id)),
 460                )
 461                .join_as(
 462                    JoinType::LeftJoin,
 463                    contact::Relation::UserARoomParticipant.def(),
 464                    user_a_participant,
 465                )
 466                .join_as(
 467                    JoinType::LeftJoin,
 468                    contact::Relation::UserBRoomParticipant.def(),
 469                    user_b_participant,
 470                )
 471                .into_model::<ContactWithUserBusyStatuses>()
 472                .stream(&*tx)
 473                .await?;
 474
 475            let mut contacts = Vec::new();
 476            while let Some(db_contact) = db_contacts.next().await {
 477                let db_contact = db_contact?;
 478                if db_contact.user_id_a == user_id {
 479                    if db_contact.accepted {
 480                        contacts.push(Contact::Accepted {
 481                            user_id: db_contact.user_id_b,
 482                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 483                            busy: db_contact.user_b_busy,
 484                        });
 485                    } else if db_contact.a_to_b {
 486                        contacts.push(Contact::Outgoing {
 487                            user_id: db_contact.user_id_b,
 488                        })
 489                    } else {
 490                        contacts.push(Contact::Incoming {
 491                            user_id: db_contact.user_id_b,
 492                            should_notify: db_contact.should_notify,
 493                        });
 494                    }
 495                } else if db_contact.accepted {
 496                    contacts.push(Contact::Accepted {
 497                        user_id: db_contact.user_id_a,
 498                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 499                        busy: db_contact.user_a_busy,
 500                    });
 501                } else if db_contact.a_to_b {
 502                    contacts.push(Contact::Incoming {
 503                        user_id: db_contact.user_id_a,
 504                        should_notify: db_contact.should_notify,
 505                    });
 506                } else {
 507                    contacts.push(Contact::Outgoing {
 508                        user_id: db_contact.user_id_a,
 509                    });
 510                }
 511            }
 512
 513            contacts.sort_unstable_by_key(|contact| contact.user_id());
 514
 515            Ok(contacts)
 516        })
 517        .await
 518    }
 519
 520    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 521        self.transaction(|tx| async move {
 522            let participant = room_participant::Entity::find()
 523                .filter(room_participant::Column::UserId.eq(user_id))
 524                .one(&*tx)
 525                .await?;
 526            Ok(participant.is_some())
 527        })
 528        .await
 529    }
 530
 531    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 532        self.transaction(|tx| async move {
 533            let (id_a, id_b) = if user_id_1 < user_id_2 {
 534                (user_id_1, user_id_2)
 535            } else {
 536                (user_id_2, user_id_1)
 537            };
 538
 539            Ok(contact::Entity::find()
 540                .filter(
 541                    contact::Column::UserIdA
 542                        .eq(id_a)
 543                        .and(contact::Column::UserIdB.eq(id_b))
 544                        .and(contact::Column::Accepted.eq(true)),
 545                )
 546                .one(&*tx)
 547                .await?
 548                .is_some())
 549        })
 550        .await
 551    }
 552
 553    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 554        self.transaction(|tx| async move {
 555            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 556                (sender_id, receiver_id, true)
 557            } else {
 558                (receiver_id, sender_id, false)
 559            };
 560
 561            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 562                user_id_a: ActiveValue::set(id_a),
 563                user_id_b: ActiveValue::set(id_b),
 564                a_to_b: ActiveValue::set(a_to_b),
 565                accepted: ActiveValue::set(false),
 566                should_notify: ActiveValue::set(true),
 567                ..Default::default()
 568            })
 569            .on_conflict(
 570                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 571                    .values([
 572                        (contact::Column::Accepted, true.into()),
 573                        (contact::Column::ShouldNotify, false.into()),
 574                    ])
 575                    .action_and_where(
 576                        contact::Column::Accepted.eq(false).and(
 577                            contact::Column::AToB
 578                                .eq(a_to_b)
 579                                .and(contact::Column::UserIdA.eq(id_b))
 580                                .or(contact::Column::AToB
 581                                    .ne(a_to_b)
 582                                    .and(contact::Column::UserIdA.eq(id_a))),
 583                        ),
 584                    )
 585                    .to_owned(),
 586            )
 587            .exec_without_returning(&*tx)
 588            .await?;
 589
 590            if rows_affected == 1 {
 591                Ok(())
 592            } else {
 593                Err(anyhow!("contact already requested"))?
 594            }
 595        })
 596        .await
 597    }
 598
 599    /// Returns a bool indicating whether the removed contact had originally accepted or not
 600    ///
 601    /// Deletes the contact identified by the requester and responder ids, and then returns
 602    /// whether the deleted contact had originally accepted or was a pending contact request.
 603    ///
 604    /// # Arguments
 605    ///
 606    /// * `requester_id` - The user that initiates this request
 607    /// * `responder_id` - The user that will be removed
 608    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 609        self.transaction(|tx| async move {
 610            let (id_a, id_b) = if responder_id < requester_id {
 611                (responder_id, requester_id)
 612            } else {
 613                (requester_id, responder_id)
 614            };
 615
 616            let contact = contact::Entity::find()
 617                .filter(
 618                    contact::Column::UserIdA
 619                        .eq(id_a)
 620                        .and(contact::Column::UserIdB.eq(id_b)),
 621                )
 622                .one(&*tx)
 623                .await?
 624                .ok_or_else(|| anyhow!("no such contact"))?;
 625
 626            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 627            Ok(contact.accepted)
 628        })
 629        .await
 630    }
 631
 632    pub async fn dismiss_contact_notification(
 633        &self,
 634        user_id: UserId,
 635        contact_user_id: UserId,
 636    ) -> Result<()> {
 637        self.transaction(|tx| async move {
 638            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 639                (user_id, contact_user_id, true)
 640            } else {
 641                (contact_user_id, user_id, false)
 642            };
 643
 644            let result = contact::Entity::update_many()
 645                .set(contact::ActiveModel {
 646                    should_notify: ActiveValue::set(false),
 647                    ..Default::default()
 648                })
 649                .filter(
 650                    contact::Column::UserIdA
 651                        .eq(id_a)
 652                        .and(contact::Column::UserIdB.eq(id_b))
 653                        .and(
 654                            contact::Column::AToB
 655                                .eq(a_to_b)
 656                                .and(contact::Column::Accepted.eq(true))
 657                                .or(contact::Column::AToB
 658                                    .ne(a_to_b)
 659                                    .and(contact::Column::Accepted.eq(false))),
 660                        ),
 661                )
 662                .exec(&*tx)
 663                .await?;
 664            if result.rows_affected == 0 {
 665                Err(anyhow!("no such contact request"))?
 666            } else {
 667                Ok(())
 668            }
 669        })
 670        .await
 671    }
 672
 673    pub async fn respond_to_contact_request(
 674        &self,
 675        responder_id: UserId,
 676        requester_id: UserId,
 677        accept: bool,
 678    ) -> Result<()> {
 679        self.transaction(|tx| async move {
 680            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 681                (responder_id, requester_id, false)
 682            } else {
 683                (requester_id, responder_id, true)
 684            };
 685            let rows_affected = if accept {
 686                let result = contact::Entity::update_many()
 687                    .set(contact::ActiveModel {
 688                        accepted: ActiveValue::set(true),
 689                        should_notify: ActiveValue::set(true),
 690                        ..Default::default()
 691                    })
 692                    .filter(
 693                        contact::Column::UserIdA
 694                            .eq(id_a)
 695                            .and(contact::Column::UserIdB.eq(id_b))
 696                            .and(contact::Column::AToB.eq(a_to_b)),
 697                    )
 698                    .exec(&*tx)
 699                    .await?;
 700                result.rows_affected
 701            } else {
 702                let result = contact::Entity::delete_many()
 703                    .filter(
 704                        contact::Column::UserIdA
 705                            .eq(id_a)
 706                            .and(contact::Column::UserIdB.eq(id_b))
 707                            .and(contact::Column::AToB.eq(a_to_b))
 708                            .and(contact::Column::Accepted.eq(false)),
 709                    )
 710                    .exec(&*tx)
 711                    .await?;
 712
 713                result.rows_affected
 714            };
 715
 716            if rows_affected == 1 {
 717                Ok(())
 718            } else {
 719                Err(anyhow!("no such contact request"))?
 720            }
 721        })
 722        .await
 723    }
 724
 725    pub fn fuzzy_like_string(string: &str) -> String {
 726        let mut result = String::with_capacity(string.len() * 2 + 1);
 727        for c in string.chars() {
 728            if c.is_alphanumeric() {
 729                result.push('%');
 730                result.push(c);
 731            }
 732        }
 733        result.push('%');
 734        result
 735    }
 736
 737    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 738        self.transaction(|tx| async {
 739            let tx = tx;
 740            let like_string = Self::fuzzy_like_string(name_query);
 741            let query = "
 742                SELECT users.*
 743                FROM users
 744                WHERE github_login ILIKE $1
 745                ORDER BY github_login <-> $2
 746                LIMIT $3
 747            ";
 748
 749            Ok(user::Entity::find()
 750                .from_raw_sql(Statement::from_sql_and_values(
 751                    self.pool.get_database_backend(),
 752                    query.into(),
 753                    vec![like_string.into(), name_query.into(), limit.into()],
 754                ))
 755                .all(&*tx)
 756                .await?)
 757        })
 758        .await
 759    }
 760
 761    // signups
 762
 763    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 764        self.transaction(|tx| async move {
 765            signup::Entity::insert(signup::ActiveModel {
 766                email_address: ActiveValue::set(signup.email_address.clone()),
 767                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 768                email_confirmation_sent: ActiveValue::set(false),
 769                platform_mac: ActiveValue::set(signup.platform_mac),
 770                platform_windows: ActiveValue::set(signup.platform_windows),
 771                platform_linux: ActiveValue::set(signup.platform_linux),
 772                platform_unknown: ActiveValue::set(false),
 773                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 774                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 775                device_id: ActiveValue::set(signup.device_id.clone()),
 776                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 777                ..Default::default()
 778            })
 779            .on_conflict(
 780                OnConflict::column(signup::Column::EmailAddress)
 781                    .update_columns([
 782                        signup::Column::PlatformMac,
 783                        signup::Column::PlatformWindows,
 784                        signup::Column::PlatformLinux,
 785                        signup::Column::EditorFeatures,
 786                        signup::Column::ProgrammingLanguages,
 787                        signup::Column::DeviceId,
 788                        signup::Column::AddedToMailingList,
 789                    ])
 790                    .to_owned(),
 791            )
 792            .exec(&*tx)
 793            .await?;
 794            Ok(())
 795        })
 796        .await
 797    }
 798
 799    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 800        self.transaction(|tx| async move {
 801            let signup = signup::Entity::find()
 802                .filter(signup::Column::EmailAddress.eq(email_address))
 803                .one(&*tx)
 804                .await?
 805                .ok_or_else(|| {
 806                    anyhow!("signup with email address {} doesn't exist", email_address)
 807                })?;
 808
 809            Ok(signup)
 810        })
 811        .await
 812    }
 813
 814    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 815        self.transaction(|tx| async move {
 816            let query = "
 817                SELECT
 818                    COUNT(*) as count,
 819                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 820                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 821                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 822                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 823                FROM (
 824                    SELECT *
 825                    FROM signups
 826                    WHERE
 827                        NOT email_confirmation_sent
 828                ) AS unsent
 829            ";
 830            Ok(
 831                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 832                    self.pool.get_database_backend(),
 833                    query.into(),
 834                    vec![],
 835                ))
 836                .one(&*tx)
 837                .await?
 838                .ok_or_else(|| anyhow!("invalid result"))?,
 839            )
 840        })
 841        .await
 842    }
 843
 844    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 845        let emails = invites
 846            .iter()
 847            .map(|s| s.email_address.as_str())
 848            .collect::<Vec<_>>();
 849        self.transaction(|tx| async {
 850            let tx = tx;
 851            signup::Entity::update_many()
 852                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 853                .set(signup::ActiveModel {
 854                    email_confirmation_sent: ActiveValue::set(true),
 855                    ..Default::default()
 856                })
 857                .exec(&*tx)
 858                .await?;
 859            Ok(())
 860        })
 861        .await
 862    }
 863
 864    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 865        self.transaction(|tx| async move {
 866            Ok(signup::Entity::find()
 867                .select_only()
 868                .column(signup::Column::EmailAddress)
 869                .column(signup::Column::EmailConfirmationCode)
 870                .filter(
 871                    signup::Column::EmailConfirmationSent.eq(false).and(
 872                        signup::Column::PlatformMac
 873                            .eq(true)
 874                            .or(signup::Column::PlatformUnknown.eq(true)),
 875                    ),
 876                )
 877                .order_by_asc(signup::Column::CreatedAt)
 878                .limit(count as u64)
 879                .into_model()
 880                .all(&*tx)
 881                .await?)
 882        })
 883        .await
 884    }
 885
 886    // invite codes
 887
 888    pub async fn create_invite_from_code(
 889        &self,
 890        code: &str,
 891        email_address: &str,
 892        device_id: Option<&str>,
 893        added_to_mailing_list: bool,
 894    ) -> Result<Invite> {
 895        self.transaction(|tx| async move {
 896            let existing_user = user::Entity::find()
 897                .filter(user::Column::EmailAddress.eq(email_address))
 898                .one(&*tx)
 899                .await?;
 900
 901            if existing_user.is_some() {
 902                Err(anyhow!("email address is already in use"))?;
 903            }
 904
 905            let inviting_user_with_invites = match user::Entity::find()
 906                .filter(
 907                    user::Column::InviteCode
 908                        .eq(code)
 909                        .and(user::Column::InviteCount.gt(0)),
 910                )
 911                .one(&*tx)
 912                .await?
 913            {
 914                Some(inviting_user) => inviting_user,
 915                None => {
 916                    return Err(Error::Http(
 917                        StatusCode::UNAUTHORIZED,
 918                        "unable to find an invite code with invites remaining".to_string(),
 919                    ))?
 920                }
 921            };
 922            user::Entity::update_many()
 923                .filter(
 924                    user::Column::Id
 925                        .eq(inviting_user_with_invites.id)
 926                        .and(user::Column::InviteCount.gt(0)),
 927                )
 928                .col_expr(
 929                    user::Column::InviteCount,
 930                    Expr::col(user::Column::InviteCount).sub(1),
 931                )
 932                .exec(&*tx)
 933                .await?;
 934
 935            let signup = signup::Entity::insert(signup::ActiveModel {
 936                email_address: ActiveValue::set(email_address.into()),
 937                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 938                email_confirmation_sent: ActiveValue::set(false),
 939                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 940                platform_linux: ActiveValue::set(false),
 941                platform_mac: ActiveValue::set(false),
 942                platform_windows: ActiveValue::set(false),
 943                platform_unknown: ActiveValue::set(true),
 944                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 945                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 946                ..Default::default()
 947            })
 948            .on_conflict(
 949                OnConflict::column(signup::Column::EmailAddress)
 950                    .update_column(signup::Column::InvitingUserId)
 951                    .to_owned(),
 952            )
 953            .exec_with_returning(&*tx)
 954            .await?;
 955
 956            Ok(Invite {
 957                email_address: signup.email_address,
 958                email_confirmation_code: signup.email_confirmation_code,
 959            })
 960        })
 961        .await
 962    }
 963
 964    pub async fn create_user_from_invite(
 965        &self,
 966        invite: &Invite,
 967        user: NewUserParams,
 968    ) -> Result<Option<NewUserResult>> {
 969        self.transaction(|tx| async {
 970            let tx = tx;
 971            let signup = signup::Entity::find()
 972                .filter(
 973                    signup::Column::EmailAddress
 974                        .eq(invite.email_address.as_str())
 975                        .and(
 976                            signup::Column::EmailConfirmationCode
 977                                .eq(invite.email_confirmation_code.as_str()),
 978                        ),
 979                )
 980                .one(&*tx)
 981                .await?
 982                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 983
 984            if signup.user_id.is_some() {
 985                return Ok(None);
 986            }
 987
 988            let user = user::Entity::insert(user::ActiveModel {
 989                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 990                github_login: ActiveValue::set(user.github_login.clone()),
 991                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 992                admin: ActiveValue::set(false),
 993                invite_count: ActiveValue::set(user.invite_count),
 994                invite_code: ActiveValue::set(Some(random_invite_code())),
 995                metrics_id: ActiveValue::set(Uuid::new_v4()),
 996                ..Default::default()
 997            })
 998            .on_conflict(
 999                OnConflict::column(user::Column::GithubLogin)
1000                    .update_columns([
1001                        user::Column::EmailAddress,
1002                        user::Column::GithubUserId,
1003                        user::Column::Admin,
1004                    ])
1005                    .to_owned(),
1006            )
1007            .exec_with_returning(&*tx)
1008            .await?;
1009
1010            let mut signup = signup.into_active_model();
1011            signup.user_id = ActiveValue::set(Some(user.id));
1012            let signup = signup.update(&*tx).await?;
1013
1014            if let Some(inviting_user_id) = signup.inviting_user_id {
1015                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1016                    (inviting_user_id, user.id, true)
1017                } else {
1018                    (user.id, inviting_user_id, false)
1019                };
1020
1021                contact::Entity::insert(contact::ActiveModel {
1022                    user_id_a: ActiveValue::set(user_id_a),
1023                    user_id_b: ActiveValue::set(user_id_b),
1024                    a_to_b: ActiveValue::set(a_to_b),
1025                    should_notify: ActiveValue::set(true),
1026                    accepted: ActiveValue::set(true),
1027                    ..Default::default()
1028                })
1029                .on_conflict(OnConflict::new().do_nothing().to_owned())
1030                .exec_without_returning(&*tx)
1031                .await?;
1032            }
1033
1034            Ok(Some(NewUserResult {
1035                user_id: user.id,
1036                metrics_id: user.metrics_id.to_string(),
1037                inviting_user_id: signup.inviting_user_id,
1038                signup_device_id: signup.device_id,
1039            }))
1040        })
1041        .await
1042    }
1043
1044    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1045        self.transaction(|tx| async move {
1046            if count > 0 {
1047                user::Entity::update_many()
1048                    .filter(
1049                        user::Column::Id
1050                            .eq(id)
1051                            .and(user::Column::InviteCode.is_null()),
1052                    )
1053                    .set(user::ActiveModel {
1054                        invite_code: ActiveValue::set(Some(random_invite_code())),
1055                        ..Default::default()
1056                    })
1057                    .exec(&*tx)
1058                    .await?;
1059            }
1060
1061            user::Entity::update_many()
1062                .filter(user::Column::Id.eq(id))
1063                .set(user::ActiveModel {
1064                    invite_count: ActiveValue::set(count),
1065                    ..Default::default()
1066                })
1067                .exec(&*tx)
1068                .await?;
1069            Ok(())
1070        })
1071        .await
1072    }
1073
1074    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1075        self.transaction(|tx| async move {
1076            match user::Entity::find_by_id(id).one(&*tx).await? {
1077                Some(user) if user.invite_code.is_some() => {
1078                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1079                }
1080                _ => Ok(None),
1081            }
1082        })
1083        .await
1084    }
1085
1086    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1087        self.transaction(|tx| async move {
1088            user::Entity::find()
1089                .filter(user::Column::InviteCode.eq(code))
1090                .one(&*tx)
1091                .await?
1092                .ok_or_else(|| {
1093                    Error::Http(
1094                        StatusCode::NOT_FOUND,
1095                        "that invite code does not exist".to_string(),
1096                    )
1097                })
1098        })
1099        .await
1100    }
1101
1102    // rooms
1103
1104    pub async fn incoming_call_for_user(
1105        &self,
1106        user_id: UserId,
1107    ) -> Result<Option<proto::IncomingCall>> {
1108        self.transaction(|tx| async move {
1109            let pending_participant = room_participant::Entity::find()
1110                .filter(
1111                    room_participant::Column::UserId
1112                        .eq(user_id)
1113                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1114                )
1115                .one(&*tx)
1116                .await?;
1117
1118            if let Some(pending_participant) = pending_participant {
1119                let room = self.get_room(pending_participant.room_id, &tx).await?;
1120                Ok(Self::build_incoming_call(&room, user_id))
1121            } else {
1122                Ok(None)
1123            }
1124        })
1125        .await
1126    }
1127
1128    pub async fn create_room(
1129        &self,
1130        user_id: UserId,
1131        connection: ConnectionId,
1132        live_kit_room: &str,
1133    ) -> Result<RoomGuard<proto::Room>> {
1134        self.room_transaction(|tx| async move {
1135            let room = room::ActiveModel {
1136                live_kit_room: ActiveValue::set(live_kit_room.into()),
1137                ..Default::default()
1138            }
1139            .insert(&*tx)
1140            .await?;
1141            let room_id = room.id;
1142
1143            room_participant::ActiveModel {
1144                room_id: ActiveValue::set(room_id),
1145                user_id: ActiveValue::set(user_id),
1146                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1147                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1148                    connection.owner_id as i32,
1149                ))),
1150                answering_connection_lost: ActiveValue::set(false),
1151                calling_user_id: ActiveValue::set(user_id),
1152                calling_connection_id: ActiveValue::set(connection.id as i32),
1153                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1154                    connection.owner_id as i32,
1155                ))),
1156                ..Default::default()
1157            }
1158            .insert(&*tx)
1159            .await?;
1160
1161            let room = self.get_room(room_id, &tx).await?;
1162            Ok((room_id, room))
1163        })
1164        .await
1165    }
1166
1167    pub async fn call(
1168        &self,
1169        room_id: RoomId,
1170        calling_user_id: UserId,
1171        calling_connection: ConnectionId,
1172        called_user_id: UserId,
1173        initial_project_id: Option<ProjectId>,
1174    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1175        self.room_transaction(|tx| async move {
1176            room_participant::ActiveModel {
1177                room_id: ActiveValue::set(room_id),
1178                user_id: ActiveValue::set(called_user_id),
1179                answering_connection_lost: ActiveValue::set(false),
1180                calling_user_id: ActiveValue::set(calling_user_id),
1181                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1182                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1183                    calling_connection.owner_id as i32,
1184                ))),
1185                initial_project_id: ActiveValue::set(initial_project_id),
1186                ..Default::default()
1187            }
1188            .insert(&*tx)
1189            .await?;
1190
1191            let room = self.get_room(room_id, &tx).await?;
1192            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1193                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1194            Ok((room_id, (room, incoming_call)))
1195        })
1196        .await
1197    }
1198
1199    pub async fn call_failed(
1200        &self,
1201        room_id: RoomId,
1202        called_user_id: UserId,
1203    ) -> Result<RoomGuard<proto::Room>> {
1204        self.room_transaction(|tx| async move {
1205            room_participant::Entity::delete_many()
1206                .filter(
1207                    room_participant::Column::RoomId
1208                        .eq(room_id)
1209                        .and(room_participant::Column::UserId.eq(called_user_id)),
1210                )
1211                .exec(&*tx)
1212                .await?;
1213            let room = self.get_room(room_id, &tx).await?;
1214            Ok((room_id, room))
1215        })
1216        .await
1217    }
1218
1219    pub async fn decline_call(
1220        &self,
1221        expected_room_id: Option<RoomId>,
1222        user_id: UserId,
1223    ) -> Result<Option<RoomGuard<proto::Room>>> {
1224        self.optional_room_transaction(|tx| async move {
1225            let mut filter = Condition::all()
1226                .add(room_participant::Column::UserId.eq(user_id))
1227                .add(room_participant::Column::AnsweringConnectionId.is_null());
1228            if let Some(room_id) = expected_room_id {
1229                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1230            }
1231            let participant = room_participant::Entity::find()
1232                .filter(filter)
1233                .one(&*tx)
1234                .await?;
1235
1236            let participant = if let Some(participant) = participant {
1237                participant
1238            } else if expected_room_id.is_some() {
1239                return Err(anyhow!("could not find call to decline"))?;
1240            } else {
1241                return Ok(None);
1242            };
1243
1244            let room_id = participant.room_id;
1245            room_participant::Entity::delete(participant.into_active_model())
1246                .exec(&*tx)
1247                .await?;
1248
1249            let room = self.get_room(room_id, &tx).await?;
1250            Ok(Some((room_id, room)))
1251        })
1252        .await
1253    }
1254
1255    pub async fn cancel_call(
1256        &self,
1257        room_id: RoomId,
1258        calling_connection: ConnectionId,
1259        called_user_id: UserId,
1260    ) -> Result<RoomGuard<proto::Room>> {
1261        self.room_transaction(|tx| async move {
1262            let participant = room_participant::Entity::find()
1263                .filter(
1264                    Condition::all()
1265                        .add(room_participant::Column::UserId.eq(called_user_id))
1266                        .add(room_participant::Column::RoomId.eq(room_id))
1267                        .add(
1268                            room_participant::Column::CallingConnectionId
1269                                .eq(calling_connection.id as i32),
1270                        )
1271                        .add(
1272                            room_participant::Column::CallingConnectionServerId
1273                                .eq(calling_connection.owner_id as i32),
1274                        )
1275                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1276                )
1277                .one(&*tx)
1278                .await?
1279                .ok_or_else(|| anyhow!("no call to cancel"))?;
1280            let room_id = participant.room_id;
1281
1282            room_participant::Entity::delete(participant.into_active_model())
1283                .exec(&*tx)
1284                .await?;
1285
1286            let room = self.get_room(room_id, &tx).await?;
1287            Ok((room_id, room))
1288        })
1289        .await
1290    }
1291
1292    pub async fn join_room(
1293        &self,
1294        room_id: RoomId,
1295        user_id: UserId,
1296        connection: ConnectionId,
1297    ) -> Result<RoomGuard<proto::Room>> {
1298        self.room_transaction(|tx| async move {
1299            let result = room_participant::Entity::update_many()
1300                .filter(
1301                    Condition::all()
1302                        .add(room_participant::Column::RoomId.eq(room_id))
1303                        .add(room_participant::Column::UserId.eq(user_id))
1304                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1305                )
1306                .set(room_participant::ActiveModel {
1307                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1308                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1309                        connection.owner_id as i32,
1310                    ))),
1311                    answering_connection_lost: ActiveValue::set(false),
1312                    ..Default::default()
1313                })
1314                .exec(&*tx)
1315                .await?;
1316            if result.rows_affected == 0 {
1317                Err(anyhow!("room does not exist or was already joined"))?
1318            } else {
1319                let room = self.get_room(room_id, &tx).await?;
1320                Ok((room_id, room))
1321            }
1322        })
1323        .await
1324    }
1325
1326    pub async fn rejoin_room(
1327        &self,
1328        rejoin_room: proto::RejoinRoom,
1329        user_id: UserId,
1330        connection: ConnectionId,
1331    ) -> Result<RoomGuard<RejoinedRoom>> {
1332        self.room_transaction(|tx| async {
1333            let tx = tx;
1334            let room_id = RoomId::from_proto(rejoin_room.id);
1335            let participant_update = room_participant::Entity::update_many()
1336                .filter(
1337                    Condition::all()
1338                        .add(room_participant::Column::RoomId.eq(room_id))
1339                        .add(room_participant::Column::UserId.eq(user_id))
1340                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1341                        .add(
1342                            Condition::any()
1343                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1344                                .add(
1345                                    room_participant::Column::AnsweringConnectionServerId
1346                                        .ne(connection.owner_id as i32),
1347                                ),
1348                        ),
1349                )
1350                .set(room_participant::ActiveModel {
1351                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1352                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1353                        connection.owner_id as i32,
1354                    ))),
1355                    answering_connection_lost: ActiveValue::set(false),
1356                    ..Default::default()
1357                })
1358                .exec(&*tx)
1359                .await?;
1360            if participant_update.rows_affected == 0 {
1361                return Err(anyhow!("room does not exist or was already joined"))?;
1362            }
1363
1364            let mut reshared_projects = Vec::new();
1365            for reshared_project in &rejoin_room.reshared_projects {
1366                let project_id = ProjectId::from_proto(reshared_project.project_id);
1367                let project = project::Entity::find_by_id(project_id)
1368                    .one(&*tx)
1369                    .await?
1370                    .ok_or_else(|| anyhow!("project does not exist"))?;
1371                if project.host_user_id != user_id {
1372                    return Err(anyhow!("no such project"))?;
1373                }
1374
1375                let mut collaborators = project
1376                    .find_related(project_collaborator::Entity)
1377                    .all(&*tx)
1378                    .await?;
1379                let host_ix = collaborators
1380                    .iter()
1381                    .position(|collaborator| {
1382                        collaborator.user_id == user_id && collaborator.is_host
1383                    })
1384                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1385                let host = collaborators.swap_remove(host_ix);
1386                let old_connection_id = host.connection();
1387
1388                project::Entity::update(project::ActiveModel {
1389                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1390                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1391                        connection.owner_id as i32,
1392                    ))),
1393                    ..project.into_active_model()
1394                })
1395                .exec(&*tx)
1396                .await?;
1397                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1398                    connection_id: ActiveValue::set(connection.id as i32),
1399                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1400                    ..host.into_active_model()
1401                })
1402                .exec(&*tx)
1403                .await?;
1404
1405                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1406                    .await?;
1407
1408                reshared_projects.push(ResharedProject {
1409                    id: project_id,
1410                    old_connection_id,
1411                    collaborators: collaborators
1412                        .iter()
1413                        .map(|collaborator| ProjectCollaborator {
1414                            connection_id: collaborator.connection(),
1415                            user_id: collaborator.user_id,
1416                            replica_id: collaborator.replica_id,
1417                            is_host: collaborator.is_host,
1418                        })
1419                        .collect(),
1420                    worktrees: reshared_project.worktrees.clone(),
1421                });
1422            }
1423
1424            project::Entity::delete_many()
1425                .filter(
1426                    Condition::all()
1427                        .add(project::Column::RoomId.eq(room_id))
1428                        .add(project::Column::HostUserId.eq(user_id))
1429                        .add(
1430                            project::Column::Id
1431                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1432                        ),
1433                )
1434                .exec(&*tx)
1435                .await?;
1436
1437            let mut rejoined_projects = Vec::new();
1438            for rejoined_project in &rejoin_room.rejoined_projects {
1439                let project_id = ProjectId::from_proto(rejoined_project.id);
1440                let Some(project) = project::Entity::find_by_id(project_id)
1441                    .one(&*tx)
1442                    .await? else { continue };
1443
1444                let mut worktrees = Vec::new();
1445                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1446                for db_worktree in db_worktrees {
1447                    let mut worktree = RejoinedWorktree {
1448                        id: db_worktree.id as u64,
1449                        abs_path: db_worktree.abs_path,
1450                        root_name: db_worktree.root_name,
1451                        visible: db_worktree.visible,
1452                        updated_entries: Default::default(),
1453                        removed_entries: Default::default(),
1454                        diagnostic_summaries: Default::default(),
1455                        scan_id: db_worktree.scan_id as u64,
1456                        completed_scan_id: db_worktree.completed_scan_id as u64,
1457                    };
1458
1459                    let rejoined_worktree = rejoined_project
1460                        .worktrees
1461                        .iter()
1462                        .find(|worktree| worktree.id == db_worktree.id as u64);
1463                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1464                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1465                    } else {
1466                        worktree_entry::Column::IsDeleted.eq(false)
1467                    };
1468
1469                    let mut db_entries = worktree_entry::Entity::find()
1470                        .filter(
1471                            Condition::all()
1472                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1473                                .add(entry_filter),
1474                        )
1475                        .stream(&*tx)
1476                        .await?;
1477
1478                    while let Some(db_entry) = db_entries.next().await {
1479                        let db_entry = db_entry?;
1480                        if db_entry.is_deleted {
1481                            worktree.removed_entries.push(db_entry.id as u64);
1482                        } else {
1483                            worktree.updated_entries.push(proto::Entry {
1484                                id: db_entry.id as u64,
1485                                is_dir: db_entry.is_dir,
1486                                path: db_entry.path,
1487                                inode: db_entry.inode as u64,
1488                                mtime: Some(proto::Timestamp {
1489                                    seconds: db_entry.mtime_seconds as u64,
1490                                    nanos: db_entry.mtime_nanos as u32,
1491                                }),
1492                                is_symlink: db_entry.is_symlink,
1493                                is_ignored: db_entry.is_ignored,
1494                            });
1495                        }
1496                    }
1497
1498                    worktrees.push(worktree);
1499                }
1500
1501                let language_servers = project
1502                    .find_related(language_server::Entity)
1503                    .all(&*tx)
1504                    .await?
1505                    .into_iter()
1506                    .map(|language_server| proto::LanguageServer {
1507                        id: language_server.id as u64,
1508                        name: language_server.name,
1509                    })
1510                    .collect::<Vec<_>>();
1511
1512                let mut collaborators = project
1513                    .find_related(project_collaborator::Entity)
1514                    .all(&*tx)
1515                    .await?;
1516                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1517                    .iter()
1518                    .position(|collaborator| collaborator.user_id == user_id)
1519                {
1520                    collaborators.swap_remove(self_collaborator_ix)
1521                } else {
1522                    continue;
1523                };
1524                let old_connection_id = self_collaborator.connection();
1525                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1526                    connection_id: ActiveValue::set(connection.id as i32),
1527                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1528                    ..self_collaborator.into_active_model()
1529                })
1530                .exec(&*tx)
1531                .await?;
1532
1533                let collaborators = collaborators
1534                    .into_iter()
1535                    .map(|collaborator| ProjectCollaborator {
1536                        connection_id: collaborator.connection(),
1537                        user_id: collaborator.user_id,
1538                        replica_id: collaborator.replica_id,
1539                        is_host: collaborator.is_host,
1540                    })
1541                    .collect::<Vec<_>>();
1542
1543                rejoined_projects.push(RejoinedProject {
1544                    id: project_id,
1545                    old_connection_id,
1546                    collaborators,
1547                    worktrees,
1548                    language_servers,
1549                });
1550            }
1551
1552            let room = self.get_room(room_id, &tx).await?;
1553            Ok((
1554                room_id,
1555                RejoinedRoom {
1556                    room,
1557                    rejoined_projects,
1558                    reshared_projects,
1559                },
1560            ))
1561        })
1562        .await
1563    }
1564
1565    pub async fn leave_room(
1566        &self,
1567        connection: ConnectionId,
1568    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1569        self.optional_room_transaction(|tx| async move {
1570            let leaving_participant = room_participant::Entity::find()
1571                .filter(
1572                    Condition::all()
1573                        .add(
1574                            room_participant::Column::AnsweringConnectionId
1575                                .eq(connection.id as i32),
1576                        )
1577                        .add(
1578                            room_participant::Column::AnsweringConnectionServerId
1579                                .eq(connection.owner_id as i32),
1580                        ),
1581                )
1582                .one(&*tx)
1583                .await?;
1584
1585            if let Some(leaving_participant) = leaving_participant {
1586                // Leave room.
1587                let room_id = leaving_participant.room_id;
1588                room_participant::Entity::delete_by_id(leaving_participant.id)
1589                    .exec(&*tx)
1590                    .await?;
1591
1592                // Cancel pending calls initiated by the leaving user.
1593                let called_participants = room_participant::Entity::find()
1594                    .filter(
1595                        Condition::all()
1596                            .add(
1597                                room_participant::Column::CallingUserId
1598                                    .eq(leaving_participant.user_id),
1599                            )
1600                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1601                    )
1602                    .all(&*tx)
1603                    .await?;
1604                room_participant::Entity::delete_many()
1605                    .filter(
1606                        room_participant::Column::Id
1607                            .is_in(called_participants.iter().map(|participant| participant.id)),
1608                    )
1609                    .exec(&*tx)
1610                    .await?;
1611                let canceled_calls_to_user_ids = called_participants
1612                    .into_iter()
1613                    .map(|participant| participant.user_id)
1614                    .collect();
1615
1616                // Detect left projects.
1617                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1618                enum QueryProjectIds {
1619                    ProjectId,
1620                }
1621                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1622                    .select_only()
1623                    .column_as(
1624                        project_collaborator::Column::ProjectId,
1625                        QueryProjectIds::ProjectId,
1626                    )
1627                    .filter(
1628                        Condition::all()
1629                            .add(
1630                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1631                            )
1632                            .add(
1633                                project_collaborator::Column::ConnectionServerId
1634                                    .eq(connection.owner_id as i32),
1635                            ),
1636                    )
1637                    .into_values::<_, QueryProjectIds>()
1638                    .all(&*tx)
1639                    .await?;
1640                let mut left_projects = HashMap::default();
1641                let mut collaborators = project_collaborator::Entity::find()
1642                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1643                    .stream(&*tx)
1644                    .await?;
1645                while let Some(collaborator) = collaborators.next().await {
1646                    let collaborator = collaborator?;
1647                    let left_project =
1648                        left_projects
1649                            .entry(collaborator.project_id)
1650                            .or_insert(LeftProject {
1651                                id: collaborator.project_id,
1652                                host_user_id: Default::default(),
1653                                connection_ids: Default::default(),
1654                                host_connection_id: Default::default(),
1655                            });
1656
1657                    let collaborator_connection_id = collaborator.connection();
1658                    if collaborator_connection_id != connection {
1659                        left_project.connection_ids.push(collaborator_connection_id);
1660                    }
1661
1662                    if collaborator.is_host {
1663                        left_project.host_user_id = collaborator.user_id;
1664                        left_project.host_connection_id = collaborator_connection_id;
1665                    }
1666                }
1667                drop(collaborators);
1668
1669                // Leave projects.
1670                project_collaborator::Entity::delete_many()
1671                    .filter(
1672                        Condition::all()
1673                            .add(
1674                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1675                            )
1676                            .add(
1677                                project_collaborator::Column::ConnectionServerId
1678                                    .eq(connection.owner_id as i32),
1679                            ),
1680                    )
1681                    .exec(&*tx)
1682                    .await?;
1683
1684                // Unshare projects.
1685                project::Entity::delete_many()
1686                    .filter(
1687                        Condition::all()
1688                            .add(project::Column::RoomId.eq(room_id))
1689                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1690                            .add(
1691                                project::Column::HostConnectionServerId
1692                                    .eq(connection.owner_id as i32),
1693                            ),
1694                    )
1695                    .exec(&*tx)
1696                    .await?;
1697
1698                let room = self.get_room(room_id, &tx).await?;
1699                if room.participants.is_empty() {
1700                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1701                }
1702
1703                let left_room = LeftRoom {
1704                    room,
1705                    left_projects,
1706                    canceled_calls_to_user_ids,
1707                };
1708
1709                if left_room.room.participants.is_empty() {
1710                    self.rooms.remove(&room_id);
1711                }
1712
1713                Ok(Some((room_id, left_room)))
1714            } else {
1715                Ok(None)
1716            }
1717        })
1718        .await
1719    }
1720
1721    pub async fn follow(
1722        &self,
1723        project_id: ProjectId,
1724        leader_connection: ConnectionId,
1725        follower_connection: ConnectionId,
1726    ) -> Result<RoomGuard<proto::Room>> {
1727        self.room_transaction(|tx| async move {
1728            let room_id = self.room_id_for_project(project_id, &*tx).await?;
1729            follower::ActiveModel {
1730                room_id: ActiveValue::set(room_id),
1731                project_id: ActiveValue::set(project_id),
1732                leader_connection_server_id: ActiveValue::set(ServerId(
1733                    leader_connection.owner_id as i32,
1734                )),
1735                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1736                follower_connection_server_id: ActiveValue::set(ServerId(
1737                    follower_connection.owner_id as i32,
1738                )),
1739                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1740                ..Default::default()
1741            }
1742            .insert(&*tx)
1743            .await?;
1744
1745            Ok((room_id, self.get_room(room_id, &*tx).await?))
1746        })
1747        .await
1748    }
1749
1750    pub async fn unfollow(
1751        &self,
1752        project_id: ProjectId,
1753        leader_connection: ConnectionId,
1754        follower_connection: ConnectionId,
1755    ) -> Result<RoomGuard<proto::Room>> {
1756        self.room_transaction(|tx| async move {
1757            let room_id = self.room_id_for_project(project_id, &*tx).await?;
1758            follower::Entity::delete_many()
1759                .filter(
1760                    Condition::all()
1761                        .add(follower::Column::ProjectId.eq(project_id))
1762                        .add(
1763                            follower::Column::LeaderConnectionServerId
1764                                .eq(leader_connection.owner_id)
1765                                .and(follower::Column::LeaderConnectionId.eq(leader_connection.id)),
1766                        )
1767                        .add(
1768                            follower::Column::FollowerConnectionServerId
1769                                .eq(follower_connection.owner_id)
1770                                .and(
1771                                    follower::Column::FollowerConnectionId
1772                                        .eq(follower_connection.id),
1773                                ),
1774                        ),
1775                )
1776                .exec(&*tx)
1777                .await?;
1778
1779            Ok((room_id, self.get_room(room_id, &*tx).await?))
1780        })
1781        .await
1782    }
1783
1784    async fn room_id_for_project(
1785        &self,
1786        project_id: ProjectId,
1787        tx: &DatabaseTransaction,
1788    ) -> Result<RoomId> {
1789        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1790        enum QueryAs {
1791            RoomId,
1792        }
1793
1794        Ok(project::Entity::find_by_id(project_id)
1795            .select_only()
1796            .column(project::Column::RoomId)
1797            .into_values::<_, QueryAs>()
1798            .one(&*tx)
1799            .await?
1800            .ok_or_else(|| anyhow!("no such project"))?)
1801    }
1802
1803    pub async fn update_room_participant_location(
1804        &self,
1805        room_id: RoomId,
1806        connection: ConnectionId,
1807        location: proto::ParticipantLocation,
1808    ) -> Result<RoomGuard<proto::Room>> {
1809        self.room_transaction(|tx| async {
1810            let tx = tx;
1811            let location_kind;
1812            let location_project_id;
1813            match location
1814                .variant
1815                .as_ref()
1816                .ok_or_else(|| anyhow!("invalid location"))?
1817            {
1818                proto::participant_location::Variant::SharedProject(project) => {
1819                    location_kind = 0;
1820                    location_project_id = Some(ProjectId::from_proto(project.id));
1821                }
1822                proto::participant_location::Variant::UnsharedProject(_) => {
1823                    location_kind = 1;
1824                    location_project_id = None;
1825                }
1826                proto::participant_location::Variant::External(_) => {
1827                    location_kind = 2;
1828                    location_project_id = None;
1829                }
1830            }
1831
1832            let result = room_participant::Entity::update_many()
1833                .filter(
1834                    Condition::all()
1835                        .add(room_participant::Column::RoomId.eq(room_id))
1836                        .add(
1837                            room_participant::Column::AnsweringConnectionId
1838                                .eq(connection.id as i32),
1839                        )
1840                        .add(
1841                            room_participant::Column::AnsweringConnectionServerId
1842                                .eq(connection.owner_id as i32),
1843                        ),
1844                )
1845                .set(room_participant::ActiveModel {
1846                    location_kind: ActiveValue::set(Some(location_kind)),
1847                    location_project_id: ActiveValue::set(location_project_id),
1848                    ..Default::default()
1849                })
1850                .exec(&*tx)
1851                .await?;
1852
1853            if result.rows_affected == 1 {
1854                let room = self.get_room(room_id, &tx).await?;
1855                Ok((room_id, room))
1856            } else {
1857                Err(anyhow!("could not update room participant location"))?
1858            }
1859        })
1860        .await
1861    }
1862
1863    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1864        self.transaction(|tx| async move {
1865            let participant = room_participant::Entity::find()
1866                .filter(
1867                    Condition::all()
1868                        .add(
1869                            room_participant::Column::AnsweringConnectionId
1870                                .eq(connection.id as i32),
1871                        )
1872                        .add(
1873                            room_participant::Column::AnsweringConnectionServerId
1874                                .eq(connection.owner_id as i32),
1875                        ),
1876                )
1877                .one(&*tx)
1878                .await?
1879                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1880
1881            room_participant::Entity::update(room_participant::ActiveModel {
1882                answering_connection_lost: ActiveValue::set(true),
1883                ..participant.into_active_model()
1884            })
1885            .exec(&*tx)
1886            .await?;
1887
1888            Ok(())
1889        })
1890        .await
1891    }
1892
1893    fn build_incoming_call(
1894        room: &proto::Room,
1895        called_user_id: UserId,
1896    ) -> Option<proto::IncomingCall> {
1897        let pending_participant = room
1898            .pending_participants
1899            .iter()
1900            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1901
1902        Some(proto::IncomingCall {
1903            room_id: room.id,
1904            calling_user_id: pending_participant.calling_user_id,
1905            participant_user_ids: room
1906                .participants
1907                .iter()
1908                .map(|participant| participant.user_id)
1909                .collect(),
1910            initial_project: room.participants.iter().find_map(|participant| {
1911                let initial_project_id = pending_participant.initial_project_id?;
1912                participant
1913                    .projects
1914                    .iter()
1915                    .find(|project| project.id == initial_project_id)
1916                    .cloned()
1917            }),
1918        })
1919    }
1920
1921    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1922        let db_room = room::Entity::find_by_id(room_id)
1923            .one(tx)
1924            .await?
1925            .ok_or_else(|| anyhow!("could not find room"))?;
1926
1927        let mut db_participants = db_room
1928            .find_related(room_participant::Entity)
1929            .stream(tx)
1930            .await?;
1931        let mut participants = HashMap::default();
1932        let mut pending_participants = Vec::new();
1933        while let Some(db_participant) = db_participants.next().await {
1934            let db_participant = db_participant?;
1935            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1936                .answering_connection_id
1937                .zip(db_participant.answering_connection_server_id)
1938            {
1939                let location = match (
1940                    db_participant.location_kind,
1941                    db_participant.location_project_id,
1942                ) {
1943                    (Some(0), Some(project_id)) => {
1944                        Some(proto::participant_location::Variant::SharedProject(
1945                            proto::participant_location::SharedProject {
1946                                id: project_id.to_proto(),
1947                            },
1948                        ))
1949                    }
1950                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1951                        Default::default(),
1952                    )),
1953                    _ => Some(proto::participant_location::Variant::External(
1954                        Default::default(),
1955                    )),
1956                };
1957
1958                let answering_connection = ConnectionId {
1959                    owner_id: answering_connection_server_id.0 as u32,
1960                    id: answering_connection_id as u32,
1961                };
1962                participants.insert(
1963                    answering_connection,
1964                    proto::Participant {
1965                        user_id: db_participant.user_id.to_proto(),
1966                        peer_id: Some(answering_connection.into()),
1967                        projects: Default::default(),
1968                        location: Some(proto::ParticipantLocation { variant: location }),
1969                    },
1970                );
1971            } else {
1972                pending_participants.push(proto::PendingParticipant {
1973                    user_id: db_participant.user_id.to_proto(),
1974                    calling_user_id: db_participant.calling_user_id.to_proto(),
1975                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1976                });
1977            }
1978        }
1979        drop(db_participants);
1980
1981        let mut db_projects = db_room
1982            .find_related(project::Entity)
1983            .find_with_related(worktree::Entity)
1984            .stream(tx)
1985            .await?;
1986
1987        while let Some(row) = db_projects.next().await {
1988            let (db_project, db_worktree) = row?;
1989            let host_connection = db_project.host_connection()?;
1990            if let Some(participant) = participants.get_mut(&host_connection) {
1991                let project = if let Some(project) = participant
1992                    .projects
1993                    .iter_mut()
1994                    .find(|project| project.id == db_project.id.to_proto())
1995                {
1996                    project
1997                } else {
1998                    participant.projects.push(proto::ParticipantProject {
1999                        id: db_project.id.to_proto(),
2000                        worktree_root_names: Default::default(),
2001                    });
2002                    participant.projects.last_mut().unwrap()
2003                };
2004
2005                if let Some(db_worktree) = db_worktree {
2006                    if db_worktree.visible {
2007                        project.worktree_root_names.push(db_worktree.root_name);
2008                    }
2009                }
2010            }
2011        }
2012        drop(db_projects);
2013
2014        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2015        let mut followers = Vec::new();
2016        while let Some(db_follower) = db_followers.next().await {
2017            let db_follower = db_follower?;
2018            followers.push(proto::Follower {
2019                leader_id: Some(db_follower.leader_connection().into()),
2020                follower_id: Some(db_follower.follower_connection().into()),
2021            });
2022        }
2023
2024        Ok(proto::Room {
2025            id: db_room.id.to_proto(),
2026            live_kit_room: db_room.live_kit_room,
2027            participants: participants.into_values().collect(),
2028            pending_participants,
2029            followers,
2030        })
2031    }
2032
2033    // projects
2034
2035    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2036        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2037        enum QueryAs {
2038            Count,
2039        }
2040
2041        self.transaction(|tx| async move {
2042            Ok(project::Entity::find()
2043                .select_only()
2044                .column_as(project::Column::Id.count(), QueryAs::Count)
2045                .inner_join(user::Entity)
2046                .filter(user::Column::Admin.eq(false))
2047                .into_values::<_, QueryAs>()
2048                .one(&*tx)
2049                .await?
2050                .unwrap_or(0i64) as usize)
2051        })
2052        .await
2053    }
2054
2055    pub async fn share_project(
2056        &self,
2057        room_id: RoomId,
2058        connection: ConnectionId,
2059        worktrees: &[proto::WorktreeMetadata],
2060    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2061        self.room_transaction(|tx| async move {
2062            let participant = room_participant::Entity::find()
2063                .filter(
2064                    Condition::all()
2065                        .add(
2066                            room_participant::Column::AnsweringConnectionId
2067                                .eq(connection.id as i32),
2068                        )
2069                        .add(
2070                            room_participant::Column::AnsweringConnectionServerId
2071                                .eq(connection.owner_id as i32),
2072                        ),
2073                )
2074                .one(&*tx)
2075                .await?
2076                .ok_or_else(|| anyhow!("could not find participant"))?;
2077            if participant.room_id != room_id {
2078                return Err(anyhow!("shared project on unexpected room"))?;
2079            }
2080
2081            let project = project::ActiveModel {
2082                room_id: ActiveValue::set(participant.room_id),
2083                host_user_id: ActiveValue::set(participant.user_id),
2084                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2085                host_connection_server_id: ActiveValue::set(Some(ServerId(
2086                    connection.owner_id as i32,
2087                ))),
2088                ..Default::default()
2089            }
2090            .insert(&*tx)
2091            .await?;
2092
2093            if !worktrees.is_empty() {
2094                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2095                    worktree::ActiveModel {
2096                        id: ActiveValue::set(worktree.id as i64),
2097                        project_id: ActiveValue::set(project.id),
2098                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2099                        root_name: ActiveValue::set(worktree.root_name.clone()),
2100                        visible: ActiveValue::set(worktree.visible),
2101                        scan_id: ActiveValue::set(0),
2102                        completed_scan_id: ActiveValue::set(0),
2103                    }
2104                }))
2105                .exec(&*tx)
2106                .await?;
2107            }
2108
2109            project_collaborator::ActiveModel {
2110                project_id: ActiveValue::set(project.id),
2111                connection_id: ActiveValue::set(connection.id as i32),
2112                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2113                user_id: ActiveValue::set(participant.user_id),
2114                replica_id: ActiveValue::set(ReplicaId(0)),
2115                is_host: ActiveValue::set(true),
2116                ..Default::default()
2117            }
2118            .insert(&*tx)
2119            .await?;
2120
2121            let room = self.get_room(room_id, &tx).await?;
2122            Ok((room_id, (project.id, room)))
2123        })
2124        .await
2125    }
2126
2127    pub async fn unshare_project(
2128        &self,
2129        project_id: ProjectId,
2130        connection: ConnectionId,
2131    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2132        self.room_transaction(|tx| async move {
2133            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2134
2135            let project = project::Entity::find_by_id(project_id)
2136                .one(&*tx)
2137                .await?
2138                .ok_or_else(|| anyhow!("project not found"))?;
2139            if project.host_connection()? == connection {
2140                let room_id = project.room_id;
2141                project::Entity::delete(project.into_active_model())
2142                    .exec(&*tx)
2143                    .await?;
2144                let room = self.get_room(room_id, &tx).await?;
2145                Ok((room_id, (room, guest_connection_ids)))
2146            } else {
2147                Err(anyhow!("cannot unshare a project hosted by another user"))?
2148            }
2149        })
2150        .await
2151    }
2152
2153    pub async fn update_project(
2154        &self,
2155        project_id: ProjectId,
2156        connection: ConnectionId,
2157        worktrees: &[proto::WorktreeMetadata],
2158    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2159        self.room_transaction(|tx| async move {
2160            let project = project::Entity::find_by_id(project_id)
2161                .filter(
2162                    Condition::all()
2163                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2164                        .add(
2165                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2166                        ),
2167                )
2168                .one(&*tx)
2169                .await?
2170                .ok_or_else(|| anyhow!("no such project"))?;
2171
2172            self.update_project_worktrees(project.id, worktrees, &tx)
2173                .await?;
2174
2175            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2176            let room = self.get_room(project.room_id, &tx).await?;
2177            Ok((project.room_id, (room, guest_connection_ids)))
2178        })
2179        .await
2180    }
2181
2182    async fn update_project_worktrees(
2183        &self,
2184        project_id: ProjectId,
2185        worktrees: &[proto::WorktreeMetadata],
2186        tx: &DatabaseTransaction,
2187    ) -> Result<()> {
2188        if !worktrees.is_empty() {
2189            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2190                id: ActiveValue::set(worktree.id as i64),
2191                project_id: ActiveValue::set(project_id),
2192                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2193                root_name: ActiveValue::set(worktree.root_name.clone()),
2194                visible: ActiveValue::set(worktree.visible),
2195                scan_id: ActiveValue::set(0),
2196                completed_scan_id: ActiveValue::set(0),
2197            }))
2198            .on_conflict(
2199                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2200                    .update_column(worktree::Column::RootName)
2201                    .to_owned(),
2202            )
2203            .exec(&*tx)
2204            .await?;
2205        }
2206
2207        worktree::Entity::delete_many()
2208            .filter(worktree::Column::ProjectId.eq(project_id).and(
2209                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2210            ))
2211            .exec(&*tx)
2212            .await?;
2213
2214        Ok(())
2215    }
2216
2217    pub async fn update_worktree(
2218        &self,
2219        update: &proto::UpdateWorktree,
2220        connection: ConnectionId,
2221    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2222        self.room_transaction(|tx| async move {
2223            let project_id = ProjectId::from_proto(update.project_id);
2224            let worktree_id = update.worktree_id as i64;
2225
2226            // Ensure the update comes from the host.
2227            let project = project::Entity::find_by_id(project_id)
2228                .filter(
2229                    Condition::all()
2230                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2231                        .add(
2232                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2233                        ),
2234                )
2235                .one(&*tx)
2236                .await?
2237                .ok_or_else(|| anyhow!("no such project"))?;
2238            let room_id = project.room_id;
2239
2240            // Update metadata.
2241            worktree::Entity::update(worktree::ActiveModel {
2242                id: ActiveValue::set(worktree_id),
2243                project_id: ActiveValue::set(project_id),
2244                root_name: ActiveValue::set(update.root_name.clone()),
2245                scan_id: ActiveValue::set(update.scan_id as i64),
2246                completed_scan_id: if update.is_last_update {
2247                    ActiveValue::set(update.scan_id as i64)
2248                } else {
2249                    ActiveValue::default()
2250                },
2251                abs_path: ActiveValue::set(update.abs_path.clone()),
2252                ..Default::default()
2253            })
2254            .exec(&*tx)
2255            .await?;
2256
2257            if !update.updated_entries.is_empty() {
2258                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2259                    let mtime = entry.mtime.clone().unwrap_or_default();
2260                    worktree_entry::ActiveModel {
2261                        project_id: ActiveValue::set(project_id),
2262                        worktree_id: ActiveValue::set(worktree_id),
2263                        id: ActiveValue::set(entry.id as i64),
2264                        is_dir: ActiveValue::set(entry.is_dir),
2265                        path: ActiveValue::set(entry.path.clone()),
2266                        inode: ActiveValue::set(entry.inode as i64),
2267                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2268                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2269                        is_symlink: ActiveValue::set(entry.is_symlink),
2270                        is_ignored: ActiveValue::set(entry.is_ignored),
2271                        is_deleted: ActiveValue::set(false),
2272                        scan_id: ActiveValue::set(update.scan_id as i64),
2273                    }
2274                }))
2275                .on_conflict(
2276                    OnConflict::columns([
2277                        worktree_entry::Column::ProjectId,
2278                        worktree_entry::Column::WorktreeId,
2279                        worktree_entry::Column::Id,
2280                    ])
2281                    .update_columns([
2282                        worktree_entry::Column::IsDir,
2283                        worktree_entry::Column::Path,
2284                        worktree_entry::Column::Inode,
2285                        worktree_entry::Column::MtimeSeconds,
2286                        worktree_entry::Column::MtimeNanos,
2287                        worktree_entry::Column::IsSymlink,
2288                        worktree_entry::Column::IsIgnored,
2289                        worktree_entry::Column::ScanId,
2290                    ])
2291                    .to_owned(),
2292                )
2293                .exec(&*tx)
2294                .await?;
2295            }
2296
2297            if !update.removed_entries.is_empty() {
2298                worktree_entry::Entity::update_many()
2299                    .filter(
2300                        worktree_entry::Column::ProjectId
2301                            .eq(project_id)
2302                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2303                            .and(
2304                                worktree_entry::Column::Id
2305                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2306                            ),
2307                    )
2308                    .set(worktree_entry::ActiveModel {
2309                        is_deleted: ActiveValue::Set(true),
2310                        scan_id: ActiveValue::Set(update.scan_id as i64),
2311                        ..Default::default()
2312                    })
2313                    .exec(&*tx)
2314                    .await?;
2315            }
2316
2317            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2318            Ok((room_id, connection_ids))
2319        })
2320        .await
2321    }
2322
2323    pub async fn update_diagnostic_summary(
2324        &self,
2325        update: &proto::UpdateDiagnosticSummary,
2326        connection: ConnectionId,
2327    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2328        self.room_transaction(|tx| async move {
2329            let project_id = ProjectId::from_proto(update.project_id);
2330            let worktree_id = update.worktree_id as i64;
2331            let summary = update
2332                .summary
2333                .as_ref()
2334                .ok_or_else(|| anyhow!("invalid summary"))?;
2335
2336            // Ensure the update comes from the host.
2337            let project = project::Entity::find_by_id(project_id)
2338                .one(&*tx)
2339                .await?
2340                .ok_or_else(|| anyhow!("no such project"))?;
2341            if project.host_connection()? != connection {
2342                return Err(anyhow!("can't update a project hosted by someone else"))?;
2343            }
2344
2345            // Update summary.
2346            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2347                project_id: ActiveValue::set(project_id),
2348                worktree_id: ActiveValue::set(worktree_id),
2349                path: ActiveValue::set(summary.path.clone()),
2350                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2351                error_count: ActiveValue::set(summary.error_count as i32),
2352                warning_count: ActiveValue::set(summary.warning_count as i32),
2353                ..Default::default()
2354            })
2355            .on_conflict(
2356                OnConflict::columns([
2357                    worktree_diagnostic_summary::Column::ProjectId,
2358                    worktree_diagnostic_summary::Column::WorktreeId,
2359                    worktree_diagnostic_summary::Column::Path,
2360                ])
2361                .update_columns([
2362                    worktree_diagnostic_summary::Column::LanguageServerId,
2363                    worktree_diagnostic_summary::Column::ErrorCount,
2364                    worktree_diagnostic_summary::Column::WarningCount,
2365                ])
2366                .to_owned(),
2367            )
2368            .exec(&*tx)
2369            .await?;
2370
2371            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2372            Ok((project.room_id, connection_ids))
2373        })
2374        .await
2375    }
2376
2377    pub async fn start_language_server(
2378        &self,
2379        update: &proto::StartLanguageServer,
2380        connection: ConnectionId,
2381    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2382        self.room_transaction(|tx| async move {
2383            let project_id = ProjectId::from_proto(update.project_id);
2384            let server = update
2385                .server
2386                .as_ref()
2387                .ok_or_else(|| anyhow!("invalid language server"))?;
2388
2389            // Ensure the update comes from the host.
2390            let project = project::Entity::find_by_id(project_id)
2391                .one(&*tx)
2392                .await?
2393                .ok_or_else(|| anyhow!("no such project"))?;
2394            if project.host_connection()? != connection {
2395                return Err(anyhow!("can't update a project hosted by someone else"))?;
2396            }
2397
2398            // Add the newly-started language server.
2399            language_server::Entity::insert(language_server::ActiveModel {
2400                project_id: ActiveValue::set(project_id),
2401                id: ActiveValue::set(server.id as i64),
2402                name: ActiveValue::set(server.name.clone()),
2403                ..Default::default()
2404            })
2405            .on_conflict(
2406                OnConflict::columns([
2407                    language_server::Column::ProjectId,
2408                    language_server::Column::Id,
2409                ])
2410                .update_column(language_server::Column::Name)
2411                .to_owned(),
2412            )
2413            .exec(&*tx)
2414            .await?;
2415
2416            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2417            Ok((project.room_id, connection_ids))
2418        })
2419        .await
2420    }
2421
2422    pub async fn join_project(
2423        &self,
2424        project_id: ProjectId,
2425        connection: ConnectionId,
2426    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2427        self.room_transaction(|tx| async move {
2428            let participant = room_participant::Entity::find()
2429                .filter(
2430                    Condition::all()
2431                        .add(
2432                            room_participant::Column::AnsweringConnectionId
2433                                .eq(connection.id as i32),
2434                        )
2435                        .add(
2436                            room_participant::Column::AnsweringConnectionServerId
2437                                .eq(connection.owner_id as i32),
2438                        ),
2439                )
2440                .one(&*tx)
2441                .await?
2442                .ok_or_else(|| anyhow!("must join a room first"))?;
2443
2444            let project = project::Entity::find_by_id(project_id)
2445                .one(&*tx)
2446                .await?
2447                .ok_or_else(|| anyhow!("no such project"))?;
2448            if project.room_id != participant.room_id {
2449                return Err(anyhow!("no such project"))?;
2450            }
2451
2452            let mut collaborators = project
2453                .find_related(project_collaborator::Entity)
2454                .all(&*tx)
2455                .await?;
2456            let replica_ids = collaborators
2457                .iter()
2458                .map(|c| c.replica_id)
2459                .collect::<HashSet<_>>();
2460            let mut replica_id = ReplicaId(1);
2461            while replica_ids.contains(&replica_id) {
2462                replica_id.0 += 1;
2463            }
2464            let new_collaborator = project_collaborator::ActiveModel {
2465                project_id: ActiveValue::set(project_id),
2466                connection_id: ActiveValue::set(connection.id as i32),
2467                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2468                user_id: ActiveValue::set(participant.user_id),
2469                replica_id: ActiveValue::set(replica_id),
2470                is_host: ActiveValue::set(false),
2471                ..Default::default()
2472            }
2473            .insert(&*tx)
2474            .await?;
2475            collaborators.push(new_collaborator);
2476
2477            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2478            let mut worktrees = db_worktrees
2479                .into_iter()
2480                .map(|db_worktree| {
2481                    (
2482                        db_worktree.id as u64,
2483                        Worktree {
2484                            id: db_worktree.id as u64,
2485                            abs_path: db_worktree.abs_path,
2486                            root_name: db_worktree.root_name,
2487                            visible: db_worktree.visible,
2488                            entries: Default::default(),
2489                            diagnostic_summaries: Default::default(),
2490                            scan_id: db_worktree.scan_id as u64,
2491                            completed_scan_id: db_worktree.completed_scan_id as u64,
2492                        },
2493                    )
2494                })
2495                .collect::<BTreeMap<_, _>>();
2496
2497            // Populate worktree entries.
2498            {
2499                let mut db_entries = worktree_entry::Entity::find()
2500                    .filter(
2501                        Condition::all()
2502                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2503                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2504                    )
2505                    .stream(&*tx)
2506                    .await?;
2507                while let Some(db_entry) = db_entries.next().await {
2508                    let db_entry = db_entry?;
2509                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2510                        worktree.entries.push(proto::Entry {
2511                            id: db_entry.id as u64,
2512                            is_dir: db_entry.is_dir,
2513                            path: db_entry.path,
2514                            inode: db_entry.inode as u64,
2515                            mtime: Some(proto::Timestamp {
2516                                seconds: db_entry.mtime_seconds as u64,
2517                                nanos: db_entry.mtime_nanos as u32,
2518                            }),
2519                            is_symlink: db_entry.is_symlink,
2520                            is_ignored: db_entry.is_ignored,
2521                        });
2522                    }
2523                }
2524            }
2525
2526            // Populate worktree diagnostic summaries.
2527            {
2528                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2529                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2530                    .stream(&*tx)
2531                    .await?;
2532                while let Some(db_summary) = db_summaries.next().await {
2533                    let db_summary = db_summary?;
2534                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2535                        worktree
2536                            .diagnostic_summaries
2537                            .push(proto::DiagnosticSummary {
2538                                path: db_summary.path,
2539                                language_server_id: db_summary.language_server_id as u64,
2540                                error_count: db_summary.error_count as u32,
2541                                warning_count: db_summary.warning_count as u32,
2542                            });
2543                    }
2544                }
2545            }
2546
2547            // Populate language servers.
2548            let language_servers = project
2549                .find_related(language_server::Entity)
2550                .all(&*tx)
2551                .await?;
2552
2553            let room_id = project.room_id;
2554            let project = Project {
2555                collaborators: collaborators
2556                    .into_iter()
2557                    .map(|collaborator| ProjectCollaborator {
2558                        connection_id: collaborator.connection(),
2559                        user_id: collaborator.user_id,
2560                        replica_id: collaborator.replica_id,
2561                        is_host: collaborator.is_host,
2562                    })
2563                    .collect(),
2564                worktrees,
2565                language_servers: language_servers
2566                    .into_iter()
2567                    .map(|language_server| proto::LanguageServer {
2568                        id: language_server.id as u64,
2569                        name: language_server.name,
2570                    })
2571                    .collect(),
2572            };
2573            Ok((room_id, (project, replica_id as ReplicaId)))
2574        })
2575        .await
2576    }
2577
2578    pub async fn leave_project(
2579        &self,
2580        project_id: ProjectId,
2581        connection: ConnectionId,
2582    ) -> Result<RoomGuard<LeftProject>> {
2583        self.room_transaction(|tx| async move {
2584            let result = project_collaborator::Entity::delete_many()
2585                .filter(
2586                    Condition::all()
2587                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2588                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2589                        .add(
2590                            project_collaborator::Column::ConnectionServerId
2591                                .eq(connection.owner_id as i32),
2592                        ),
2593                )
2594                .exec(&*tx)
2595                .await?;
2596            if result.rows_affected == 0 {
2597                Err(anyhow!("not a collaborator on this project"))?;
2598            }
2599
2600            let project = project::Entity::find_by_id(project_id)
2601                .one(&*tx)
2602                .await?
2603                .ok_or_else(|| anyhow!("no such project"))?;
2604            let collaborators = project
2605                .find_related(project_collaborator::Entity)
2606                .all(&*tx)
2607                .await?;
2608            let connection_ids = collaborators
2609                .into_iter()
2610                .map(|collaborator| collaborator.connection())
2611                .collect();
2612
2613            let left_project = LeftProject {
2614                id: project_id,
2615                host_user_id: project.host_user_id,
2616                host_connection_id: project.host_connection()?,
2617                connection_ids,
2618            };
2619            Ok((project.room_id, left_project))
2620        })
2621        .await
2622    }
2623
2624    pub async fn project_collaborators(
2625        &self,
2626        project_id: ProjectId,
2627        connection_id: ConnectionId,
2628    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2629        self.room_transaction(|tx| async move {
2630            let project = project::Entity::find_by_id(project_id)
2631                .one(&*tx)
2632                .await?
2633                .ok_or_else(|| anyhow!("no such project"))?;
2634            let collaborators = project_collaborator::Entity::find()
2635                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2636                .all(&*tx)
2637                .await?
2638                .into_iter()
2639                .map(|collaborator| ProjectCollaborator {
2640                    connection_id: collaborator.connection(),
2641                    user_id: collaborator.user_id,
2642                    replica_id: collaborator.replica_id,
2643                    is_host: collaborator.is_host,
2644                })
2645                .collect::<Vec<_>>();
2646
2647            if collaborators
2648                .iter()
2649                .any(|collaborator| collaborator.connection_id == connection_id)
2650            {
2651                Ok((project.room_id, collaborators))
2652            } else {
2653                Err(anyhow!("no such project"))?
2654            }
2655        })
2656        .await
2657    }
2658
2659    pub async fn project_connection_ids(
2660        &self,
2661        project_id: ProjectId,
2662        connection_id: ConnectionId,
2663    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2664        self.room_transaction(|tx| async move {
2665            let project = project::Entity::find_by_id(project_id)
2666                .one(&*tx)
2667                .await?
2668                .ok_or_else(|| anyhow!("no such project"))?;
2669            let mut collaborators = project_collaborator::Entity::find()
2670                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2671                .stream(&*tx)
2672                .await?;
2673
2674            let mut connection_ids = HashSet::default();
2675            while let Some(collaborator) = collaborators.next().await {
2676                let collaborator = collaborator?;
2677                connection_ids.insert(collaborator.connection());
2678            }
2679
2680            if connection_ids.contains(&connection_id) {
2681                Ok((project.room_id, connection_ids))
2682            } else {
2683                Err(anyhow!("no such project"))?
2684            }
2685        })
2686        .await
2687    }
2688
2689    async fn project_guest_connection_ids(
2690        &self,
2691        project_id: ProjectId,
2692        tx: &DatabaseTransaction,
2693    ) -> Result<Vec<ConnectionId>> {
2694        let mut collaborators = project_collaborator::Entity::find()
2695            .filter(
2696                project_collaborator::Column::ProjectId
2697                    .eq(project_id)
2698                    .and(project_collaborator::Column::IsHost.eq(false)),
2699            )
2700            .stream(tx)
2701            .await?;
2702
2703        let mut guest_connection_ids = Vec::new();
2704        while let Some(collaborator) = collaborators.next().await {
2705            let collaborator = collaborator?;
2706            guest_connection_ids.push(collaborator.connection());
2707        }
2708        Ok(guest_connection_ids)
2709    }
2710
2711    // access tokens
2712
2713    pub async fn create_access_token_hash(
2714        &self,
2715        user_id: UserId,
2716        access_token_hash: &str,
2717        max_access_token_count: usize,
2718    ) -> Result<()> {
2719        self.transaction(|tx| async {
2720            let tx = tx;
2721
2722            access_token::ActiveModel {
2723                user_id: ActiveValue::set(user_id),
2724                hash: ActiveValue::set(access_token_hash.into()),
2725                ..Default::default()
2726            }
2727            .insert(&*tx)
2728            .await?;
2729
2730            access_token::Entity::delete_many()
2731                .filter(
2732                    access_token::Column::Id.in_subquery(
2733                        Query::select()
2734                            .column(access_token::Column::Id)
2735                            .from(access_token::Entity)
2736                            .and_where(access_token::Column::UserId.eq(user_id))
2737                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2738                            .limit(10000)
2739                            .offset(max_access_token_count as u64)
2740                            .to_owned(),
2741                    ),
2742                )
2743                .exec(&*tx)
2744                .await?;
2745            Ok(())
2746        })
2747        .await
2748    }
2749
2750    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2751        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2752        enum QueryAs {
2753            Hash,
2754        }
2755
2756        self.transaction(|tx| async move {
2757            Ok(access_token::Entity::find()
2758                .select_only()
2759                .column(access_token::Column::Hash)
2760                .filter(access_token::Column::UserId.eq(user_id))
2761                .order_by_desc(access_token::Column::Id)
2762                .into_values::<_, QueryAs>()
2763                .all(&*tx)
2764                .await?)
2765        })
2766        .await
2767    }
2768
2769    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2770    where
2771        F: Send + Fn(TransactionHandle) -> Fut,
2772        Fut: Send + Future<Output = Result<T>>,
2773    {
2774        let body = async {
2775            loop {
2776                let (tx, result) = self.with_transaction(&f).await?;
2777                match result {
2778                    Ok(result) => {
2779                        match tx.commit().await.map_err(Into::into) {
2780                            Ok(()) => return Ok(result),
2781                            Err(error) => {
2782                                if is_serialization_error(&error) {
2783                                    // Retry (don't break the loop)
2784                                } else {
2785                                    return Err(error);
2786                                }
2787                            }
2788                        }
2789                    }
2790                    Err(error) => {
2791                        tx.rollback().await?;
2792                        if is_serialization_error(&error) {
2793                            // Retry (don't break the loop)
2794                        } else {
2795                            return Err(error);
2796                        }
2797                    }
2798                }
2799            }
2800        };
2801
2802        self.run(body).await
2803    }
2804
2805    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2806    where
2807        F: Send + Fn(TransactionHandle) -> Fut,
2808        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2809    {
2810        let body = async {
2811            loop {
2812                let (tx, result) = self.with_transaction(&f).await?;
2813                match result {
2814                    Ok(Some((room_id, data))) => {
2815                        let lock = self.rooms.entry(room_id).or_default().clone();
2816                        let _guard = lock.lock_owned().await;
2817                        match tx.commit().await.map_err(Into::into) {
2818                            Ok(()) => {
2819                                return Ok(Some(RoomGuard {
2820                                    data,
2821                                    _guard,
2822                                    _not_send: PhantomData,
2823                                }));
2824                            }
2825                            Err(error) => {
2826                                if is_serialization_error(&error) {
2827                                    // Retry (don't break the loop)
2828                                } else {
2829                                    return Err(error);
2830                                }
2831                            }
2832                        }
2833                    }
2834                    Ok(None) => {
2835                        match tx.commit().await.map_err(Into::into) {
2836                            Ok(()) => return Ok(None),
2837                            Err(error) => {
2838                                if is_serialization_error(&error) {
2839                                    // Retry (don't break the loop)
2840                                } else {
2841                                    return Err(error);
2842                                }
2843                            }
2844                        }
2845                    }
2846                    Err(error) => {
2847                        tx.rollback().await?;
2848                        if is_serialization_error(&error) {
2849                            // Retry (don't break the loop)
2850                        } else {
2851                            return Err(error);
2852                        }
2853                    }
2854                }
2855            }
2856        };
2857
2858        self.run(body).await
2859    }
2860
2861    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2862    where
2863        F: Send + Fn(TransactionHandle) -> Fut,
2864        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2865    {
2866        let data = self
2867            .optional_room_transaction(move |tx| {
2868                let future = f(tx);
2869                async {
2870                    let data = future.await?;
2871                    Ok(Some(data))
2872                }
2873            })
2874            .await?;
2875        Ok(data.unwrap())
2876    }
2877
2878    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2879    where
2880        F: Send + Fn(TransactionHandle) -> Fut,
2881        Fut: Send + Future<Output = Result<T>>,
2882    {
2883        let tx = self
2884            .pool
2885            .begin_with_config(Some(IsolationLevel::Serializable), None)
2886            .await?;
2887
2888        let mut tx = Arc::new(Some(tx));
2889        let result = f(TransactionHandle(tx.clone())).await;
2890        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2891            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2892        };
2893
2894        Ok((tx, result))
2895    }
2896
2897    async fn run<F, T>(&self, future: F) -> T
2898    where
2899        F: Future<Output = T>,
2900    {
2901        #[cfg(test)]
2902        {
2903            if let Some(background) = self.background.as_ref() {
2904                background.simulate_random_delay().await;
2905            }
2906
2907            self.runtime.as_ref().unwrap().block_on(future)
2908        }
2909
2910        #[cfg(not(test))]
2911        {
2912            future.await
2913        }
2914    }
2915}
2916
2917fn is_serialization_error(error: &Error) -> bool {
2918    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2919    match error {
2920        Error::Database(
2921            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2922            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2923        ) if error
2924            .as_database_error()
2925            .and_then(|error| error.code())
2926            .as_deref()
2927            == Some(SERIALIZATION_FAILURE_CODE) =>
2928        {
2929            true
2930        }
2931        _ => false,
2932    }
2933}
2934
2935struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2936
2937impl Deref for TransactionHandle {
2938    type Target = DatabaseTransaction;
2939
2940    fn deref(&self) -> &Self::Target {
2941        self.0.as_ref().as_ref().unwrap()
2942    }
2943}
2944
2945pub struct RoomGuard<T> {
2946    data: T,
2947    _guard: OwnedMutexGuard<()>,
2948    _not_send: PhantomData<Rc<()>>,
2949}
2950
2951impl<T> Deref for RoomGuard<T> {
2952    type Target = T;
2953
2954    fn deref(&self) -> &T {
2955        &self.data
2956    }
2957}
2958
2959impl<T> DerefMut for RoomGuard<T> {
2960    fn deref_mut(&mut self) -> &mut T {
2961        &mut self.data
2962    }
2963}
2964
2965#[derive(Debug, Serialize, Deserialize)]
2966pub struct NewUserParams {
2967    pub github_login: String,
2968    pub github_user_id: i32,
2969    pub invite_count: i32,
2970}
2971
2972#[derive(Debug)]
2973pub struct NewUserResult {
2974    pub user_id: UserId,
2975    pub metrics_id: String,
2976    pub inviting_user_id: Option<UserId>,
2977    pub signup_device_id: Option<String>,
2978}
2979
2980fn random_invite_code() -> String {
2981    nanoid::nanoid!(16)
2982}
2983
2984fn random_email_confirmation_code() -> String {
2985    nanoid::nanoid!(64)
2986}
2987
2988macro_rules! id_type {
2989    ($name:ident) => {
2990        #[derive(
2991            Clone,
2992            Copy,
2993            Debug,
2994            Default,
2995            PartialEq,
2996            Eq,
2997            PartialOrd,
2998            Ord,
2999            Hash,
3000            Serialize,
3001            Deserialize,
3002        )]
3003        #[serde(transparent)]
3004        pub struct $name(pub i32);
3005
3006        impl $name {
3007            #[allow(unused)]
3008            pub const MAX: Self = Self(i32::MAX);
3009
3010            #[allow(unused)]
3011            pub fn from_proto(value: u64) -> Self {
3012                Self(value as i32)
3013            }
3014
3015            #[allow(unused)]
3016            pub fn to_proto(self) -> u64 {
3017                self.0 as u64
3018            }
3019        }
3020
3021        impl std::fmt::Display for $name {
3022            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3023                self.0.fmt(f)
3024            }
3025        }
3026
3027        impl From<$name> for sea_query::Value {
3028            fn from(value: $name) -> Self {
3029                sea_query::Value::Int(Some(value.0))
3030            }
3031        }
3032
3033        impl sea_orm::TryGetable for $name {
3034            fn try_get(
3035                res: &sea_orm::QueryResult,
3036                pre: &str,
3037                col: &str,
3038            ) -> Result<Self, sea_orm::TryGetError> {
3039                Ok(Self(i32::try_get(res, pre, col)?))
3040            }
3041        }
3042
3043        impl sea_query::ValueType for $name {
3044            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3045                match v {
3046                    Value::TinyInt(Some(int)) => {
3047                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3048                    }
3049                    Value::SmallInt(Some(int)) => {
3050                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3051                    }
3052                    Value::Int(Some(int)) => {
3053                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3054                    }
3055                    Value::BigInt(Some(int)) => {
3056                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3057                    }
3058                    Value::TinyUnsigned(Some(int)) => {
3059                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3060                    }
3061                    Value::SmallUnsigned(Some(int)) => {
3062                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3063                    }
3064                    Value::Unsigned(Some(int)) => {
3065                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3066                    }
3067                    Value::BigUnsigned(Some(int)) => {
3068                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3069                    }
3070                    _ => Err(sea_query::ValueTypeErr),
3071                }
3072            }
3073
3074            fn type_name() -> String {
3075                stringify!($name).into()
3076            }
3077
3078            fn array_type() -> sea_query::ArrayType {
3079                sea_query::ArrayType::Int
3080            }
3081
3082            fn column_type() -> sea_query::ColumnType {
3083                sea_query::ColumnType::Integer(None)
3084            }
3085        }
3086
3087        impl sea_orm::TryFromU64 for $name {
3088            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3089                Ok(Self(n.try_into().map_err(|_| {
3090                    DbErr::ConvertFromU64(concat!(
3091                        "error converting ",
3092                        stringify!($name),
3093                        " to u64"
3094                    ))
3095                })?))
3096            }
3097        }
3098
3099        impl sea_query::Nullable for $name {
3100            fn null() -> Value {
3101                Value::Int(None)
3102            }
3103        }
3104    };
3105}
3106
3107id_type!(AccessTokenId);
3108id_type!(ContactId);
3109id_type!(FollowerId);
3110id_type!(RoomId);
3111id_type!(RoomParticipantId);
3112id_type!(ProjectId);
3113id_type!(ProjectCollaboratorId);
3114id_type!(ReplicaId);
3115id_type!(ServerId);
3116id_type!(SignupId);
3117id_type!(UserId);
3118
3119pub struct RejoinedRoom {
3120    pub room: proto::Room,
3121    pub rejoined_projects: Vec<RejoinedProject>,
3122    pub reshared_projects: Vec<ResharedProject>,
3123}
3124
3125pub struct ResharedProject {
3126    pub id: ProjectId,
3127    pub old_connection_id: ConnectionId,
3128    pub collaborators: Vec<ProjectCollaborator>,
3129    pub worktrees: Vec<proto::WorktreeMetadata>,
3130}
3131
3132pub struct RejoinedProject {
3133    pub id: ProjectId,
3134    pub old_connection_id: ConnectionId,
3135    pub collaborators: Vec<ProjectCollaborator>,
3136    pub worktrees: Vec<RejoinedWorktree>,
3137    pub language_servers: Vec<proto::LanguageServer>,
3138}
3139
3140#[derive(Debug)]
3141pub struct RejoinedWorktree {
3142    pub id: u64,
3143    pub abs_path: String,
3144    pub root_name: String,
3145    pub visible: bool,
3146    pub updated_entries: Vec<proto::Entry>,
3147    pub removed_entries: Vec<u64>,
3148    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3149    pub scan_id: u64,
3150    pub completed_scan_id: u64,
3151}
3152
3153pub struct LeftRoom {
3154    pub room: proto::Room,
3155    pub left_projects: HashMap<ProjectId, LeftProject>,
3156    pub canceled_calls_to_user_ids: Vec<UserId>,
3157}
3158
3159pub struct RefreshedRoom {
3160    pub room: proto::Room,
3161    pub stale_participant_user_ids: Vec<UserId>,
3162    pub canceled_calls_to_user_ids: Vec<UserId>,
3163}
3164
3165pub struct Project {
3166    pub collaborators: Vec<ProjectCollaborator>,
3167    pub worktrees: BTreeMap<u64, Worktree>,
3168    pub language_servers: Vec<proto::LanguageServer>,
3169}
3170
3171pub struct ProjectCollaborator {
3172    pub connection_id: ConnectionId,
3173    pub user_id: UserId,
3174    pub replica_id: ReplicaId,
3175    pub is_host: bool,
3176}
3177
3178impl ProjectCollaborator {
3179    pub fn to_proto(&self) -> proto::Collaborator {
3180        proto::Collaborator {
3181            peer_id: Some(self.connection_id.into()),
3182            replica_id: self.replica_id.0 as u32,
3183            user_id: self.user_id.to_proto(),
3184        }
3185    }
3186}
3187
3188#[derive(Debug)]
3189pub struct LeftProject {
3190    pub id: ProjectId,
3191    pub host_user_id: UserId,
3192    pub host_connection_id: ConnectionId,
3193    pub connection_ids: Vec<ConnectionId>,
3194}
3195
3196pub struct Worktree {
3197    pub id: u64,
3198    pub abs_path: String,
3199    pub root_name: String,
3200    pub visible: bool,
3201    pub entries: Vec<proto::Entry>,
3202    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3203    pub scan_id: u64,
3204    pub completed_scan_id: u64,
3205}
3206
3207#[cfg(test)]
3208pub use test::*;
3209
3210#[cfg(test)]
3211mod test {
3212    use super::*;
3213    use gpui::executor::Background;
3214    use lazy_static::lazy_static;
3215    use parking_lot::Mutex;
3216    use rand::prelude::*;
3217    use sea_orm::ConnectionTrait;
3218    use sqlx::migrate::MigrateDatabase;
3219    use std::sync::Arc;
3220
3221    pub struct TestDb {
3222        pub db: Option<Arc<Database>>,
3223        pub connection: Option<sqlx::AnyConnection>,
3224    }
3225
3226    impl TestDb {
3227        pub fn sqlite(background: Arc<Background>) -> Self {
3228            let url = format!("sqlite::memory:");
3229            let runtime = tokio::runtime::Builder::new_current_thread()
3230                .enable_io()
3231                .enable_time()
3232                .build()
3233                .unwrap();
3234
3235            let mut db = runtime.block_on(async {
3236                let mut options = ConnectOptions::new(url);
3237                options.max_connections(5);
3238                let db = Database::new(options).await.unwrap();
3239                let sql = include_str!(concat!(
3240                    env!("CARGO_MANIFEST_DIR"),
3241                    "/migrations.sqlite/20221109000000_test_schema.sql"
3242                ));
3243                db.pool
3244                    .execute(sea_orm::Statement::from_string(
3245                        db.pool.get_database_backend(),
3246                        sql.into(),
3247                    ))
3248                    .await
3249                    .unwrap();
3250                db
3251            });
3252
3253            db.background = Some(background);
3254            db.runtime = Some(runtime);
3255
3256            Self {
3257                db: Some(Arc::new(db)),
3258                connection: None,
3259            }
3260        }
3261
3262        pub fn postgres(background: Arc<Background>) -> Self {
3263            lazy_static! {
3264                static ref LOCK: Mutex<()> = Mutex::new(());
3265            }
3266
3267            let _guard = LOCK.lock();
3268            let mut rng = StdRng::from_entropy();
3269            let url = format!(
3270                "postgres://postgres@localhost/zed-test-{}",
3271                rng.gen::<u128>()
3272            );
3273            let runtime = tokio::runtime::Builder::new_current_thread()
3274                .enable_io()
3275                .enable_time()
3276                .build()
3277                .unwrap();
3278
3279            let mut db = runtime.block_on(async {
3280                sqlx::Postgres::create_database(&url)
3281                    .await
3282                    .expect("failed to create test db");
3283                let mut options = ConnectOptions::new(url);
3284                options
3285                    .max_connections(5)
3286                    .idle_timeout(Duration::from_secs(0));
3287                let db = Database::new(options).await.unwrap();
3288                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3289                db.migrate(Path::new(migrations_path), false).await.unwrap();
3290                db
3291            });
3292
3293            db.background = Some(background);
3294            db.runtime = Some(runtime);
3295
3296            Self {
3297                db: Some(Arc::new(db)),
3298                connection: None,
3299            }
3300        }
3301
3302        pub fn db(&self) -> &Arc<Database> {
3303            self.db.as_ref().unwrap()
3304        }
3305    }
3306
3307    impl Drop for TestDb {
3308        fn drop(&mut self) {
3309            let db = self.db.take().unwrap();
3310            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3311                db.runtime.as_ref().unwrap().block_on(async {
3312                    use util::ResultExt;
3313                    let query = "
3314                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3315                        FROM pg_stat_activity
3316                        WHERE
3317                            pg_stat_activity.datname = current_database() AND
3318                            pid <> pg_backend_pid();
3319                    ";
3320                    db.pool
3321                        .execute(sea_orm::Statement::from_string(
3322                            db.pool.get_database_backend(),
3323                            query.into(),
3324                        ))
3325                        .await
3326                        .log_err();
3327                    sqlx::Postgres::drop_database(db.options.get_url())
3328                        .await
3329                        .log_err();
3330                })
3331            }
3332        }
3333    }
3334}