db.rs

   1mod access_token;
   2mod contact;
   3mod follower;
   4mod language_server;
   5mod project;
   6mod project_collaborator;
   7mod room;
   8mod room_participant;
   9mod server;
  10mod signup;
  11#[cfg(test)]
  12mod tests;
  13mod user;
  14mod worktree;
  15mod worktree_diagnostic_summary;
  16mod worktree_entry;
  17
  18use crate::{Error, Result};
  19use anyhow::anyhow;
  20use collections::{BTreeMap, HashMap, HashSet};
  21pub use contact::Contact;
  22use dashmap::DashMap;
  23use futures::StreamExt;
  24use hyper::StatusCode;
  25use rpc::{proto, ConnectionId};
  26use sea_orm::Condition;
  27pub use sea_orm::ConnectOptions;
  28use sea_orm::{
  29    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  30    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  31    Statement, TransactionTrait,
  32};
  33use sea_query::{Alias, Expr, OnConflict, Query};
  34use serde::{Deserialize, Serialize};
  35pub use signup::{Invite, NewSignup, WaitlistSummary};
  36use sqlx::migrate::{Migrate, Migration, MigrationSource};
  37use sqlx::Connection;
  38use std::ops::{Deref, DerefMut};
  39use std::path::Path;
  40use std::time::Duration;
  41use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  42use tokio::sync::{Mutex, OwnedMutexGuard};
  43pub use user::Model as User;
  44
  45pub struct Database {
  46    options: ConnectOptions,
  47    pool: DatabaseConnection,
  48    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  49    #[cfg(test)]
  50    background: Option<std::sync::Arc<gpui::executor::Background>>,
  51    #[cfg(test)]
  52    runtime: Option<tokio::runtime::Runtime>,
  53}
  54
  55impl Database {
  56    pub async fn new(options: ConnectOptions) -> Result<Self> {
  57        Ok(Self {
  58            options: options.clone(),
  59            pool: sea_orm::Database::connect(options).await?,
  60            rooms: DashMap::with_capacity(16384),
  61            #[cfg(test)]
  62            background: None,
  63            #[cfg(test)]
  64            runtime: None,
  65        })
  66    }
  67
  68    #[cfg(test)]
  69    pub fn reset(&self) {
  70        self.rooms.clear();
  71    }
  72
  73    pub async fn migrate(
  74        &self,
  75        migrations_path: &Path,
  76        ignore_checksum_mismatch: bool,
  77    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  78        let migrations = MigrationSource::resolve(migrations_path)
  79            .await
  80            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  81
  82        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  83
  84        connection.ensure_migrations_table().await?;
  85        let applied_migrations: HashMap<_, _> = connection
  86            .list_applied_migrations()
  87            .await?
  88            .into_iter()
  89            .map(|m| (m.version, m))
  90            .collect();
  91
  92        let mut new_migrations = Vec::new();
  93        for migration in migrations {
  94            match applied_migrations.get(&migration.version) {
  95                Some(applied_migration) => {
  96                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  97                    {
  98                        Err(anyhow!(
  99                            "checksum mismatch for applied migration {}",
 100                            migration.description
 101                        ))?;
 102                    }
 103                }
 104                None => {
 105                    let elapsed = connection.apply(&migration).await?;
 106                    new_migrations.push((migration, elapsed));
 107                }
 108            }
 109        }
 110
 111        Ok(new_migrations)
 112    }
 113
 114    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 115        self.transaction(|tx| async move {
 116            let server = server::ActiveModel {
 117                environment: ActiveValue::set(environment.into()),
 118                ..Default::default()
 119            }
 120            .insert(&*tx)
 121            .await?;
 122            Ok(server.id)
 123        })
 124        .await
 125    }
 126
 127    pub async fn stale_room_ids(
 128        &self,
 129        environment: &str,
 130        new_server_id: ServerId,
 131    ) -> Result<Vec<RoomId>> {
 132        self.transaction(|tx| async move {
 133            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 134            enum QueryAs {
 135                RoomId,
 136            }
 137
 138            let stale_server_epochs = self
 139                .stale_server_ids(environment, new_server_id, &tx)
 140                .await?;
 141            Ok(room_participant::Entity::find()
 142                .select_only()
 143                .column(room_participant::Column::RoomId)
 144                .distinct()
 145                .filter(
 146                    room_participant::Column::AnsweringConnectionServerId
 147                        .is_in(stale_server_epochs),
 148                )
 149                .into_values::<_, QueryAs>()
 150                .all(&*tx)
 151                .await?)
 152        })
 153        .await
 154    }
 155
 156    pub async fn refresh_room(
 157        &self,
 158        room_id: RoomId,
 159        new_server_id: ServerId,
 160    ) -> Result<RoomGuard<RefreshedRoom>> {
 161        self.room_transaction(|tx| async move {
 162            let stale_participant_filter = Condition::all()
 163                .add(room_participant::Column::RoomId.eq(room_id))
 164                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 165                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 166
 167            let stale_participant_user_ids = room_participant::Entity::find()
 168                .filter(stale_participant_filter.clone())
 169                .all(&*tx)
 170                .await?
 171                .into_iter()
 172                .map(|participant| participant.user_id)
 173                .collect::<Vec<_>>();
 174
 175            // Delete participants who failed to reconnect.
 176            room_participant::Entity::delete_many()
 177                .filter(stale_participant_filter)
 178                .exec(&*tx)
 179                .await?;
 180
 181            let room = self.get_room(room_id, &tx).await?;
 182            let mut canceled_calls_to_user_ids = Vec::new();
 183            // Delete the room if it becomes empty and cancel pending calls.
 184            if room.participants.is_empty() {
 185                canceled_calls_to_user_ids.extend(
 186                    room.pending_participants
 187                        .iter()
 188                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 189                );
 190                room_participant::Entity::delete_many()
 191                    .filter(room_participant::Column::RoomId.eq(room_id))
 192                    .exec(&*tx)
 193                    .await?;
 194                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 195            }
 196
 197            Ok((
 198                room_id,
 199                RefreshedRoom {
 200                    room,
 201                    stale_participant_user_ids,
 202                    canceled_calls_to_user_ids,
 203                },
 204            ))
 205        })
 206        .await
 207    }
 208
 209    pub async fn delete_stale_servers(
 210        &self,
 211        environment: &str,
 212        new_server_id: ServerId,
 213    ) -> Result<()> {
 214        self.transaction(|tx| async move {
 215            server::Entity::delete_many()
 216                .filter(
 217                    Condition::all()
 218                        .add(server::Column::Environment.eq(environment))
 219                        .add(server::Column::Id.ne(new_server_id)),
 220                )
 221                .exec(&*tx)
 222                .await?;
 223            Ok(())
 224        })
 225        .await
 226    }
 227
 228    async fn stale_server_ids(
 229        &self,
 230        environment: &str,
 231        new_server_id: ServerId,
 232        tx: &DatabaseTransaction,
 233    ) -> Result<Vec<ServerId>> {
 234        let stale_servers = server::Entity::find()
 235            .filter(
 236                Condition::all()
 237                    .add(server::Column::Environment.eq(environment))
 238                    .add(server::Column::Id.ne(new_server_id)),
 239            )
 240            .all(&*tx)
 241            .await?;
 242        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 243    }
 244
 245    // users
 246
 247    pub async fn create_user(
 248        &self,
 249        email_address: &str,
 250        admin: bool,
 251        params: NewUserParams,
 252    ) -> Result<NewUserResult> {
 253        self.transaction(|tx| async {
 254            let tx = tx;
 255            let user = user::Entity::insert(user::ActiveModel {
 256                email_address: ActiveValue::set(Some(email_address.into())),
 257                github_login: ActiveValue::set(params.github_login.clone()),
 258                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 259                admin: ActiveValue::set(admin),
 260                metrics_id: ActiveValue::set(Uuid::new_v4()),
 261                ..Default::default()
 262            })
 263            .on_conflict(
 264                OnConflict::column(user::Column::GithubLogin)
 265                    .update_column(user::Column::GithubLogin)
 266                    .to_owned(),
 267            )
 268            .exec_with_returning(&*tx)
 269            .await?;
 270
 271            Ok(NewUserResult {
 272                user_id: user.id,
 273                metrics_id: user.metrics_id.to_string(),
 274                signup_device_id: None,
 275                inviting_user_id: None,
 276            })
 277        })
 278        .await
 279    }
 280
 281    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 282        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 283            .await
 284    }
 285
 286    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 287        self.transaction(|tx| async {
 288            let tx = tx;
 289            Ok(user::Entity::find()
 290                .filter(user::Column::Id.is_in(ids.iter().copied()))
 291                .all(&*tx)
 292                .await?)
 293        })
 294        .await
 295    }
 296
 297    pub async fn get_user_by_github_account(
 298        &self,
 299        github_login: &str,
 300        github_user_id: Option<i32>,
 301    ) -> Result<Option<User>> {
 302        self.transaction(|tx| async move {
 303            let tx = &*tx;
 304            if let Some(github_user_id) = github_user_id {
 305                if let Some(user_by_github_user_id) = user::Entity::find()
 306                    .filter(user::Column::GithubUserId.eq(github_user_id))
 307                    .one(tx)
 308                    .await?
 309                {
 310                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 311                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 312                    Ok(Some(user_by_github_user_id.update(tx).await?))
 313                } else if let Some(user_by_github_login) = user::Entity::find()
 314                    .filter(user::Column::GithubLogin.eq(github_login))
 315                    .one(tx)
 316                    .await?
 317                {
 318                    let mut user_by_github_login = user_by_github_login.into_active_model();
 319                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 320                    Ok(Some(user_by_github_login.update(tx).await?))
 321                } else {
 322                    Ok(None)
 323                }
 324            } else {
 325                Ok(user::Entity::find()
 326                    .filter(user::Column::GithubLogin.eq(github_login))
 327                    .one(tx)
 328                    .await?)
 329            }
 330        })
 331        .await
 332    }
 333
 334    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 335        self.transaction(|tx| async move {
 336            Ok(user::Entity::find()
 337                .order_by_asc(user::Column::GithubLogin)
 338                .limit(limit as u64)
 339                .offset(page as u64 * limit as u64)
 340                .all(&*tx)
 341                .await?)
 342        })
 343        .await
 344    }
 345
 346    pub async fn get_users_with_no_invites(
 347        &self,
 348        invited_by_another_user: bool,
 349    ) -> Result<Vec<User>> {
 350        self.transaction(|tx| async move {
 351            Ok(user::Entity::find()
 352                .filter(
 353                    user::Column::InviteCount
 354                        .eq(0)
 355                        .and(if invited_by_another_user {
 356                            user::Column::InviterId.is_not_null()
 357                        } else {
 358                            user::Column::InviterId.is_null()
 359                        }),
 360                )
 361                .all(&*tx)
 362                .await?)
 363        })
 364        .await
 365    }
 366
 367    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 368        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 369        enum QueryAs {
 370            MetricsId,
 371        }
 372
 373        self.transaction(|tx| async move {
 374            let metrics_id: Uuid = user::Entity::find_by_id(id)
 375                .select_only()
 376                .column(user::Column::MetricsId)
 377                .into_values::<_, QueryAs>()
 378                .one(&*tx)
 379                .await?
 380                .ok_or_else(|| anyhow!("could not find user"))?;
 381            Ok(metrics_id.to_string())
 382        })
 383        .await
 384    }
 385
 386    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 387        self.transaction(|tx| async move {
 388            user::Entity::update_many()
 389                .filter(user::Column::Id.eq(id))
 390                .set(user::ActiveModel {
 391                    admin: ActiveValue::set(is_admin),
 392                    ..Default::default()
 393                })
 394                .exec(&*tx)
 395                .await?;
 396            Ok(())
 397        })
 398        .await
 399    }
 400
 401    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 402        self.transaction(|tx| async move {
 403            user::Entity::update_many()
 404                .filter(user::Column::Id.eq(id))
 405                .set(user::ActiveModel {
 406                    connected_once: ActiveValue::set(connected_once),
 407                    ..Default::default()
 408                })
 409                .exec(&*tx)
 410                .await?;
 411            Ok(())
 412        })
 413        .await
 414    }
 415
 416    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 417        self.transaction(|tx| async move {
 418            access_token::Entity::delete_many()
 419                .filter(access_token::Column::UserId.eq(id))
 420                .exec(&*tx)
 421                .await?;
 422            user::Entity::delete_by_id(id).exec(&*tx).await?;
 423            Ok(())
 424        })
 425        .await
 426    }
 427
 428    // contacts
 429
 430    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 431        #[derive(Debug, FromQueryResult)]
 432        struct ContactWithUserBusyStatuses {
 433            user_id_a: UserId,
 434            user_id_b: UserId,
 435            a_to_b: bool,
 436            accepted: bool,
 437            should_notify: bool,
 438            user_a_busy: bool,
 439            user_b_busy: bool,
 440        }
 441
 442        self.transaction(|tx| async move {
 443            let user_a_participant = Alias::new("user_a_participant");
 444            let user_b_participant = Alias::new("user_b_participant");
 445            let mut db_contacts = contact::Entity::find()
 446                .column_as(
 447                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 448                        .is_not_null(),
 449                    "user_a_busy",
 450                )
 451                .column_as(
 452                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 453                        .is_not_null(),
 454                    "user_b_busy",
 455                )
 456                .filter(
 457                    contact::Column::UserIdA
 458                        .eq(user_id)
 459                        .or(contact::Column::UserIdB.eq(user_id)),
 460                )
 461                .join_as(
 462                    JoinType::LeftJoin,
 463                    contact::Relation::UserARoomParticipant.def(),
 464                    user_a_participant,
 465                )
 466                .join_as(
 467                    JoinType::LeftJoin,
 468                    contact::Relation::UserBRoomParticipant.def(),
 469                    user_b_participant,
 470                )
 471                .into_model::<ContactWithUserBusyStatuses>()
 472                .stream(&*tx)
 473                .await?;
 474
 475            let mut contacts = Vec::new();
 476            while let Some(db_contact) = db_contacts.next().await {
 477                let db_contact = db_contact?;
 478                if db_contact.user_id_a == user_id {
 479                    if db_contact.accepted {
 480                        contacts.push(Contact::Accepted {
 481                            user_id: db_contact.user_id_b,
 482                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 483                            busy: db_contact.user_b_busy,
 484                        });
 485                    } else if db_contact.a_to_b {
 486                        contacts.push(Contact::Outgoing {
 487                            user_id: db_contact.user_id_b,
 488                        })
 489                    } else {
 490                        contacts.push(Contact::Incoming {
 491                            user_id: db_contact.user_id_b,
 492                            should_notify: db_contact.should_notify,
 493                        });
 494                    }
 495                } else if db_contact.accepted {
 496                    contacts.push(Contact::Accepted {
 497                        user_id: db_contact.user_id_a,
 498                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 499                        busy: db_contact.user_a_busy,
 500                    });
 501                } else if db_contact.a_to_b {
 502                    contacts.push(Contact::Incoming {
 503                        user_id: db_contact.user_id_a,
 504                        should_notify: db_contact.should_notify,
 505                    });
 506                } else {
 507                    contacts.push(Contact::Outgoing {
 508                        user_id: db_contact.user_id_a,
 509                    });
 510                }
 511            }
 512
 513            contacts.sort_unstable_by_key(|contact| contact.user_id());
 514
 515            Ok(contacts)
 516        })
 517        .await
 518    }
 519
 520    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 521        self.transaction(|tx| async move {
 522            let participant = room_participant::Entity::find()
 523                .filter(room_participant::Column::UserId.eq(user_id))
 524                .one(&*tx)
 525                .await?;
 526            Ok(participant.is_some())
 527        })
 528        .await
 529    }
 530
 531    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 532        self.transaction(|tx| async move {
 533            let (id_a, id_b) = if user_id_1 < user_id_2 {
 534                (user_id_1, user_id_2)
 535            } else {
 536                (user_id_2, user_id_1)
 537            };
 538
 539            Ok(contact::Entity::find()
 540                .filter(
 541                    contact::Column::UserIdA
 542                        .eq(id_a)
 543                        .and(contact::Column::UserIdB.eq(id_b))
 544                        .and(contact::Column::Accepted.eq(true)),
 545                )
 546                .one(&*tx)
 547                .await?
 548                .is_some())
 549        })
 550        .await
 551    }
 552
 553    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 554        self.transaction(|tx| async move {
 555            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 556                (sender_id, receiver_id, true)
 557            } else {
 558                (receiver_id, sender_id, false)
 559            };
 560
 561            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 562                user_id_a: ActiveValue::set(id_a),
 563                user_id_b: ActiveValue::set(id_b),
 564                a_to_b: ActiveValue::set(a_to_b),
 565                accepted: ActiveValue::set(false),
 566                should_notify: ActiveValue::set(true),
 567                ..Default::default()
 568            })
 569            .on_conflict(
 570                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 571                    .values([
 572                        (contact::Column::Accepted, true.into()),
 573                        (contact::Column::ShouldNotify, false.into()),
 574                    ])
 575                    .action_and_where(
 576                        contact::Column::Accepted.eq(false).and(
 577                            contact::Column::AToB
 578                                .eq(a_to_b)
 579                                .and(contact::Column::UserIdA.eq(id_b))
 580                                .or(contact::Column::AToB
 581                                    .ne(a_to_b)
 582                                    .and(contact::Column::UserIdA.eq(id_a))),
 583                        ),
 584                    )
 585                    .to_owned(),
 586            )
 587            .exec_without_returning(&*tx)
 588            .await?;
 589
 590            if rows_affected == 1 {
 591                Ok(())
 592            } else {
 593                Err(anyhow!("contact already requested"))?
 594            }
 595        })
 596        .await
 597    }
 598
 599    /// Returns a bool indicating whether the removed contact had originally accepted or not
 600    ///
 601    /// Deletes the contact identified by the requester and responder ids, and then returns
 602    /// whether the deleted contact had originally accepted or was a pending contact request.
 603    ///
 604    /// # Arguments
 605    ///
 606    /// * `requester_id` - The user that initiates this request
 607    /// * `responder_id` - The user that will be removed
 608    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 609        self.transaction(|tx| async move {
 610            let (id_a, id_b) = if responder_id < requester_id {
 611                (responder_id, requester_id)
 612            } else {
 613                (requester_id, responder_id)
 614            };
 615
 616            let contact = contact::Entity::find()
 617                .filter(
 618                    contact::Column::UserIdA
 619                        .eq(id_a)
 620                        .and(contact::Column::UserIdB.eq(id_b)),
 621                )
 622                .one(&*tx)
 623                .await?
 624                .ok_or_else(|| anyhow!("no such contact"))?;
 625
 626            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 627            Ok(contact.accepted)
 628        })
 629        .await
 630    }
 631
 632    pub async fn dismiss_contact_notification(
 633        &self,
 634        user_id: UserId,
 635        contact_user_id: UserId,
 636    ) -> Result<()> {
 637        self.transaction(|tx| async move {
 638            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 639                (user_id, contact_user_id, true)
 640            } else {
 641                (contact_user_id, user_id, false)
 642            };
 643
 644            let result = contact::Entity::update_many()
 645                .set(contact::ActiveModel {
 646                    should_notify: ActiveValue::set(false),
 647                    ..Default::default()
 648                })
 649                .filter(
 650                    contact::Column::UserIdA
 651                        .eq(id_a)
 652                        .and(contact::Column::UserIdB.eq(id_b))
 653                        .and(
 654                            contact::Column::AToB
 655                                .eq(a_to_b)
 656                                .and(contact::Column::Accepted.eq(true))
 657                                .or(contact::Column::AToB
 658                                    .ne(a_to_b)
 659                                    .and(contact::Column::Accepted.eq(false))),
 660                        ),
 661                )
 662                .exec(&*tx)
 663                .await?;
 664            if result.rows_affected == 0 {
 665                Err(anyhow!("no such contact request"))?
 666            } else {
 667                Ok(())
 668            }
 669        })
 670        .await
 671    }
 672
 673    pub async fn respond_to_contact_request(
 674        &self,
 675        responder_id: UserId,
 676        requester_id: UserId,
 677        accept: bool,
 678    ) -> Result<()> {
 679        self.transaction(|tx| async move {
 680            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 681                (responder_id, requester_id, false)
 682            } else {
 683                (requester_id, responder_id, true)
 684            };
 685            let rows_affected = if accept {
 686                let result = contact::Entity::update_many()
 687                    .set(contact::ActiveModel {
 688                        accepted: ActiveValue::set(true),
 689                        should_notify: ActiveValue::set(true),
 690                        ..Default::default()
 691                    })
 692                    .filter(
 693                        contact::Column::UserIdA
 694                            .eq(id_a)
 695                            .and(contact::Column::UserIdB.eq(id_b))
 696                            .and(contact::Column::AToB.eq(a_to_b)),
 697                    )
 698                    .exec(&*tx)
 699                    .await?;
 700                result.rows_affected
 701            } else {
 702                let result = contact::Entity::delete_many()
 703                    .filter(
 704                        contact::Column::UserIdA
 705                            .eq(id_a)
 706                            .and(contact::Column::UserIdB.eq(id_b))
 707                            .and(contact::Column::AToB.eq(a_to_b))
 708                            .and(contact::Column::Accepted.eq(false)),
 709                    )
 710                    .exec(&*tx)
 711                    .await?;
 712
 713                result.rows_affected
 714            };
 715
 716            if rows_affected == 1 {
 717                Ok(())
 718            } else {
 719                Err(anyhow!("no such contact request"))?
 720            }
 721        })
 722        .await
 723    }
 724
 725    pub fn fuzzy_like_string(string: &str) -> String {
 726        let mut result = String::with_capacity(string.len() * 2 + 1);
 727        for c in string.chars() {
 728            if c.is_alphanumeric() {
 729                result.push('%');
 730                result.push(c);
 731            }
 732        }
 733        result.push('%');
 734        result
 735    }
 736
 737    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 738        self.transaction(|tx| async {
 739            let tx = tx;
 740            let like_string = Self::fuzzy_like_string(name_query);
 741            let query = "
 742                SELECT users.*
 743                FROM users
 744                WHERE github_login ILIKE $1
 745                ORDER BY github_login <-> $2
 746                LIMIT $3
 747            ";
 748
 749            Ok(user::Entity::find()
 750                .from_raw_sql(Statement::from_sql_and_values(
 751                    self.pool.get_database_backend(),
 752                    query.into(),
 753                    vec![like_string.into(), name_query.into(), limit.into()],
 754                ))
 755                .all(&*tx)
 756                .await?)
 757        })
 758        .await
 759    }
 760
 761    // signups
 762
 763    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 764        self.transaction(|tx| async move {
 765            signup::Entity::insert(signup::ActiveModel {
 766                email_address: ActiveValue::set(signup.email_address.clone()),
 767                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 768                email_confirmation_sent: ActiveValue::set(false),
 769                platform_mac: ActiveValue::set(signup.platform_mac),
 770                platform_windows: ActiveValue::set(signup.platform_windows),
 771                platform_linux: ActiveValue::set(signup.platform_linux),
 772                platform_unknown: ActiveValue::set(false),
 773                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 774                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 775                device_id: ActiveValue::set(signup.device_id.clone()),
 776                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 777                ..Default::default()
 778            })
 779            .on_conflict(
 780                OnConflict::column(signup::Column::EmailAddress)
 781                    .update_columns([
 782                        signup::Column::PlatformMac,
 783                        signup::Column::PlatformWindows,
 784                        signup::Column::PlatformLinux,
 785                        signup::Column::EditorFeatures,
 786                        signup::Column::ProgrammingLanguages,
 787                        signup::Column::DeviceId,
 788                        signup::Column::AddedToMailingList,
 789                    ])
 790                    .to_owned(),
 791            )
 792            .exec(&*tx)
 793            .await?;
 794            Ok(())
 795        })
 796        .await
 797    }
 798
 799    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 800        self.transaction(|tx| async move {
 801            let signup = signup::Entity::find()
 802                .filter(signup::Column::EmailAddress.eq(email_address))
 803                .one(&*tx)
 804                .await?
 805                .ok_or_else(|| {
 806                    anyhow!("signup with email address {} doesn't exist", email_address)
 807                })?;
 808
 809            Ok(signup)
 810        })
 811        .await
 812    }
 813
 814    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 815        self.transaction(|tx| async move {
 816            let query = "
 817                SELECT
 818                    COUNT(*) as count,
 819                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 820                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 821                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 822                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 823                FROM (
 824                    SELECT *
 825                    FROM signups
 826                    WHERE
 827                        NOT email_confirmation_sent
 828                ) AS unsent
 829            ";
 830            Ok(
 831                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 832                    self.pool.get_database_backend(),
 833                    query.into(),
 834                    vec![],
 835                ))
 836                .one(&*tx)
 837                .await?
 838                .ok_or_else(|| anyhow!("invalid result"))?,
 839            )
 840        })
 841        .await
 842    }
 843
 844    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 845        let emails = invites
 846            .iter()
 847            .map(|s| s.email_address.as_str())
 848            .collect::<Vec<_>>();
 849        self.transaction(|tx| async {
 850            let tx = tx;
 851            signup::Entity::update_many()
 852                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 853                .set(signup::ActiveModel {
 854                    email_confirmation_sent: ActiveValue::set(true),
 855                    ..Default::default()
 856                })
 857                .exec(&*tx)
 858                .await?;
 859            Ok(())
 860        })
 861        .await
 862    }
 863
 864    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 865        self.transaction(|tx| async move {
 866            Ok(signup::Entity::find()
 867                .select_only()
 868                .column(signup::Column::EmailAddress)
 869                .column(signup::Column::EmailConfirmationCode)
 870                .filter(
 871                    signup::Column::EmailConfirmationSent.eq(false).and(
 872                        signup::Column::PlatformMac
 873                            .eq(true)
 874                            .or(signup::Column::PlatformUnknown.eq(true)),
 875                    ),
 876                )
 877                .order_by_asc(signup::Column::CreatedAt)
 878                .limit(count as u64)
 879                .into_model()
 880                .all(&*tx)
 881                .await?)
 882        })
 883        .await
 884    }
 885
 886    // invite codes
 887
 888    pub async fn create_invite_from_code(
 889        &self,
 890        code: &str,
 891        email_address: &str,
 892        device_id: Option<&str>,
 893        added_to_mailing_list: bool,
 894    ) -> Result<Invite> {
 895        self.transaction(|tx| async move {
 896            let existing_user = user::Entity::find()
 897                .filter(user::Column::EmailAddress.eq(email_address))
 898                .one(&*tx)
 899                .await?;
 900
 901            if existing_user.is_some() {
 902                Err(anyhow!("email address is already in use"))?;
 903            }
 904
 905            let inviting_user_with_invites = match user::Entity::find()
 906                .filter(
 907                    user::Column::InviteCode
 908                        .eq(code)
 909                        .and(user::Column::InviteCount.gt(0)),
 910                )
 911                .one(&*tx)
 912                .await?
 913            {
 914                Some(inviting_user) => inviting_user,
 915                None => {
 916                    return Err(Error::Http(
 917                        StatusCode::UNAUTHORIZED,
 918                        "unable to find an invite code with invites remaining".to_string(),
 919                    ))?
 920                }
 921            };
 922            user::Entity::update_many()
 923                .filter(
 924                    user::Column::Id
 925                        .eq(inviting_user_with_invites.id)
 926                        .and(user::Column::InviteCount.gt(0)),
 927                )
 928                .col_expr(
 929                    user::Column::InviteCount,
 930                    Expr::col(user::Column::InviteCount).sub(1),
 931                )
 932                .exec(&*tx)
 933                .await?;
 934
 935            let signup = signup::Entity::insert(signup::ActiveModel {
 936                email_address: ActiveValue::set(email_address.into()),
 937                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 938                email_confirmation_sent: ActiveValue::set(false),
 939                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 940                platform_linux: ActiveValue::set(false),
 941                platform_mac: ActiveValue::set(false),
 942                platform_windows: ActiveValue::set(false),
 943                platform_unknown: ActiveValue::set(true),
 944                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 945                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 946                ..Default::default()
 947            })
 948            .on_conflict(
 949                OnConflict::column(signup::Column::EmailAddress)
 950                    .update_column(signup::Column::InvitingUserId)
 951                    .to_owned(),
 952            )
 953            .exec_with_returning(&*tx)
 954            .await?;
 955
 956            Ok(Invite {
 957                email_address: signup.email_address,
 958                email_confirmation_code: signup.email_confirmation_code,
 959            })
 960        })
 961        .await
 962    }
 963
 964    pub async fn create_user_from_invite(
 965        &self,
 966        invite: &Invite,
 967        user: NewUserParams,
 968    ) -> Result<Option<NewUserResult>> {
 969        self.transaction(|tx| async {
 970            let tx = tx;
 971            let signup = signup::Entity::find()
 972                .filter(
 973                    signup::Column::EmailAddress
 974                        .eq(invite.email_address.as_str())
 975                        .and(
 976                            signup::Column::EmailConfirmationCode
 977                                .eq(invite.email_confirmation_code.as_str()),
 978                        ),
 979                )
 980                .one(&*tx)
 981                .await?
 982                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 983
 984            if signup.user_id.is_some() {
 985                return Ok(None);
 986            }
 987
 988            let user = user::Entity::insert(user::ActiveModel {
 989                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 990                github_login: ActiveValue::set(user.github_login.clone()),
 991                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 992                admin: ActiveValue::set(false),
 993                invite_count: ActiveValue::set(user.invite_count),
 994                invite_code: ActiveValue::set(Some(random_invite_code())),
 995                metrics_id: ActiveValue::set(Uuid::new_v4()),
 996                ..Default::default()
 997            })
 998            .on_conflict(
 999                OnConflict::column(user::Column::GithubLogin)
1000                    .update_columns([
1001                        user::Column::EmailAddress,
1002                        user::Column::GithubUserId,
1003                        user::Column::Admin,
1004                    ])
1005                    .to_owned(),
1006            )
1007            .exec_with_returning(&*tx)
1008            .await?;
1009
1010            let mut signup = signup.into_active_model();
1011            signup.user_id = ActiveValue::set(Some(user.id));
1012            let signup = signup.update(&*tx).await?;
1013
1014            if let Some(inviting_user_id) = signup.inviting_user_id {
1015                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1016                    (inviting_user_id, user.id, true)
1017                } else {
1018                    (user.id, inviting_user_id, false)
1019                };
1020
1021                contact::Entity::insert(contact::ActiveModel {
1022                    user_id_a: ActiveValue::set(user_id_a),
1023                    user_id_b: ActiveValue::set(user_id_b),
1024                    a_to_b: ActiveValue::set(a_to_b),
1025                    should_notify: ActiveValue::set(true),
1026                    accepted: ActiveValue::set(true),
1027                    ..Default::default()
1028                })
1029                .on_conflict(OnConflict::new().do_nothing().to_owned())
1030                .exec_without_returning(&*tx)
1031                .await?;
1032            }
1033
1034            Ok(Some(NewUserResult {
1035                user_id: user.id,
1036                metrics_id: user.metrics_id.to_string(),
1037                inviting_user_id: signup.inviting_user_id,
1038                signup_device_id: signup.device_id,
1039            }))
1040        })
1041        .await
1042    }
1043
1044    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1045        self.transaction(|tx| async move {
1046            if count > 0 {
1047                user::Entity::update_many()
1048                    .filter(
1049                        user::Column::Id
1050                            .eq(id)
1051                            .and(user::Column::InviteCode.is_null()),
1052                    )
1053                    .set(user::ActiveModel {
1054                        invite_code: ActiveValue::set(Some(random_invite_code())),
1055                        ..Default::default()
1056                    })
1057                    .exec(&*tx)
1058                    .await?;
1059            }
1060
1061            user::Entity::update_many()
1062                .filter(user::Column::Id.eq(id))
1063                .set(user::ActiveModel {
1064                    invite_count: ActiveValue::set(count),
1065                    ..Default::default()
1066                })
1067                .exec(&*tx)
1068                .await?;
1069            Ok(())
1070        })
1071        .await
1072    }
1073
1074    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1075        self.transaction(|tx| async move {
1076            match user::Entity::find_by_id(id).one(&*tx).await? {
1077                Some(user) if user.invite_code.is_some() => {
1078                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1079                }
1080                _ => Ok(None),
1081            }
1082        })
1083        .await
1084    }
1085
1086    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1087        self.transaction(|tx| async move {
1088            user::Entity::find()
1089                .filter(user::Column::InviteCode.eq(code))
1090                .one(&*tx)
1091                .await?
1092                .ok_or_else(|| {
1093                    Error::Http(
1094                        StatusCode::NOT_FOUND,
1095                        "that invite code does not exist".to_string(),
1096                    )
1097                })
1098        })
1099        .await
1100    }
1101
1102    // rooms
1103
1104    pub async fn incoming_call_for_user(
1105        &self,
1106        user_id: UserId,
1107    ) -> Result<Option<proto::IncomingCall>> {
1108        self.transaction(|tx| async move {
1109            let pending_participant = room_participant::Entity::find()
1110                .filter(
1111                    room_participant::Column::UserId
1112                        .eq(user_id)
1113                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1114                )
1115                .one(&*tx)
1116                .await?;
1117
1118            if let Some(pending_participant) = pending_participant {
1119                let room = self.get_room(pending_participant.room_id, &tx).await?;
1120                Ok(Self::build_incoming_call(&room, user_id))
1121            } else {
1122                Ok(None)
1123            }
1124        })
1125        .await
1126    }
1127
1128    pub async fn create_room(
1129        &self,
1130        user_id: UserId,
1131        connection: ConnectionId,
1132        live_kit_room: &str,
1133    ) -> Result<RoomGuard<proto::Room>> {
1134        self.room_transaction(|tx| async move {
1135            let room = room::ActiveModel {
1136                live_kit_room: ActiveValue::set(live_kit_room.into()),
1137                ..Default::default()
1138            }
1139            .insert(&*tx)
1140            .await?;
1141            let room_id = room.id;
1142
1143            room_participant::ActiveModel {
1144                room_id: ActiveValue::set(room_id),
1145                user_id: ActiveValue::set(user_id),
1146                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1147                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1148                    connection.owner_id as i32,
1149                ))),
1150                answering_connection_lost: ActiveValue::set(false),
1151                calling_user_id: ActiveValue::set(user_id),
1152                calling_connection_id: ActiveValue::set(connection.id as i32),
1153                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1154                    connection.owner_id as i32,
1155                ))),
1156                ..Default::default()
1157            }
1158            .insert(&*tx)
1159            .await?;
1160
1161            let room = self.get_room(room_id, &tx).await?;
1162            Ok((room_id, room))
1163        })
1164        .await
1165    }
1166
1167    pub async fn call(
1168        &self,
1169        room_id: RoomId,
1170        calling_user_id: UserId,
1171        calling_connection: ConnectionId,
1172        called_user_id: UserId,
1173        initial_project_id: Option<ProjectId>,
1174    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1175        self.room_transaction(|tx| async move {
1176            room_participant::ActiveModel {
1177                room_id: ActiveValue::set(room_id),
1178                user_id: ActiveValue::set(called_user_id),
1179                answering_connection_lost: ActiveValue::set(false),
1180                calling_user_id: ActiveValue::set(calling_user_id),
1181                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1182                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1183                    calling_connection.owner_id as i32,
1184                ))),
1185                initial_project_id: ActiveValue::set(initial_project_id),
1186                ..Default::default()
1187            }
1188            .insert(&*tx)
1189            .await?;
1190
1191            let room = self.get_room(room_id, &tx).await?;
1192            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1193                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1194            Ok((room_id, (room, incoming_call)))
1195        })
1196        .await
1197    }
1198
1199    pub async fn call_failed(
1200        &self,
1201        room_id: RoomId,
1202        called_user_id: UserId,
1203    ) -> Result<RoomGuard<proto::Room>> {
1204        self.room_transaction(|tx| async move {
1205            room_participant::Entity::delete_many()
1206                .filter(
1207                    room_participant::Column::RoomId
1208                        .eq(room_id)
1209                        .and(room_participant::Column::UserId.eq(called_user_id)),
1210                )
1211                .exec(&*tx)
1212                .await?;
1213            let room = self.get_room(room_id, &tx).await?;
1214            Ok((room_id, room))
1215        })
1216        .await
1217    }
1218
1219    pub async fn decline_call(
1220        &self,
1221        expected_room_id: Option<RoomId>,
1222        user_id: UserId,
1223    ) -> Result<Option<RoomGuard<proto::Room>>> {
1224        self.optional_room_transaction(|tx| async move {
1225            let mut filter = Condition::all()
1226                .add(room_participant::Column::UserId.eq(user_id))
1227                .add(room_participant::Column::AnsweringConnectionId.is_null());
1228            if let Some(room_id) = expected_room_id {
1229                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1230            }
1231            let participant = room_participant::Entity::find()
1232                .filter(filter)
1233                .one(&*tx)
1234                .await?;
1235
1236            let participant = if let Some(participant) = participant {
1237                participant
1238            } else if expected_room_id.is_some() {
1239                return Err(anyhow!("could not find call to decline"))?;
1240            } else {
1241                return Ok(None);
1242            };
1243
1244            let room_id = participant.room_id;
1245            room_participant::Entity::delete(participant.into_active_model())
1246                .exec(&*tx)
1247                .await?;
1248
1249            let room = self.get_room(room_id, &tx).await?;
1250            Ok(Some((room_id, room)))
1251        })
1252        .await
1253    }
1254
1255    pub async fn cancel_call(
1256        &self,
1257        room_id: RoomId,
1258        calling_connection: ConnectionId,
1259        called_user_id: UserId,
1260    ) -> Result<RoomGuard<proto::Room>> {
1261        self.room_transaction(|tx| async move {
1262            let participant = room_participant::Entity::find()
1263                .filter(
1264                    Condition::all()
1265                        .add(room_participant::Column::UserId.eq(called_user_id))
1266                        .add(room_participant::Column::RoomId.eq(room_id))
1267                        .add(
1268                            room_participant::Column::CallingConnectionId
1269                                .eq(calling_connection.id as i32),
1270                        )
1271                        .add(
1272                            room_participant::Column::CallingConnectionServerId
1273                                .eq(calling_connection.owner_id as i32),
1274                        )
1275                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1276                )
1277                .one(&*tx)
1278                .await?
1279                .ok_or_else(|| anyhow!("no call to cancel"))?;
1280            let room_id = participant.room_id;
1281
1282            room_participant::Entity::delete(participant.into_active_model())
1283                .exec(&*tx)
1284                .await?;
1285
1286            let room = self.get_room(room_id, &tx).await?;
1287            Ok((room_id, room))
1288        })
1289        .await
1290    }
1291
1292    pub async fn join_room(
1293        &self,
1294        room_id: RoomId,
1295        user_id: UserId,
1296        connection: ConnectionId,
1297    ) -> Result<RoomGuard<proto::Room>> {
1298        self.room_transaction(|tx| async move {
1299            let result = room_participant::Entity::update_many()
1300                .filter(
1301                    Condition::all()
1302                        .add(room_participant::Column::RoomId.eq(room_id))
1303                        .add(room_participant::Column::UserId.eq(user_id))
1304                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1305                )
1306                .set(room_participant::ActiveModel {
1307                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1308                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1309                        connection.owner_id as i32,
1310                    ))),
1311                    answering_connection_lost: ActiveValue::set(false),
1312                    ..Default::default()
1313                })
1314                .exec(&*tx)
1315                .await?;
1316            if result.rows_affected == 0 {
1317                Err(anyhow!("room does not exist or was already joined"))?
1318            } else {
1319                let room = self.get_room(room_id, &tx).await?;
1320                Ok((room_id, room))
1321            }
1322        })
1323        .await
1324    }
1325
1326    pub async fn rejoin_room(
1327        &self,
1328        rejoin_room: proto::RejoinRoom,
1329        user_id: UserId,
1330        connection: ConnectionId,
1331    ) -> Result<RoomGuard<RejoinedRoom>> {
1332        self.room_transaction(|tx| async {
1333            let tx = tx;
1334            let room_id = RoomId::from_proto(rejoin_room.id);
1335            let participant_update = room_participant::Entity::update_many()
1336                .filter(
1337                    Condition::all()
1338                        .add(room_participant::Column::RoomId.eq(room_id))
1339                        .add(room_participant::Column::UserId.eq(user_id))
1340                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1341                        .add(
1342                            Condition::any()
1343                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1344                                .add(
1345                                    room_participant::Column::AnsweringConnectionServerId
1346                                        .ne(connection.owner_id as i32),
1347                                ),
1348                        ),
1349                )
1350                .set(room_participant::ActiveModel {
1351                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1352                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1353                        connection.owner_id as i32,
1354                    ))),
1355                    answering_connection_lost: ActiveValue::set(false),
1356                    ..Default::default()
1357                })
1358                .exec(&*tx)
1359                .await?;
1360            if participant_update.rows_affected == 0 {
1361                return Err(anyhow!("room does not exist or was already joined"))?;
1362            }
1363
1364            let mut reshared_projects = Vec::new();
1365            for reshared_project in &rejoin_room.reshared_projects {
1366                let project_id = ProjectId::from_proto(reshared_project.project_id);
1367                let project = project::Entity::find_by_id(project_id)
1368                    .one(&*tx)
1369                    .await?
1370                    .ok_or_else(|| anyhow!("project does not exist"))?;
1371                if project.host_user_id != user_id {
1372                    return Err(anyhow!("no such project"))?;
1373                }
1374
1375                let mut collaborators = project
1376                    .find_related(project_collaborator::Entity)
1377                    .all(&*tx)
1378                    .await?;
1379                let host_ix = collaborators
1380                    .iter()
1381                    .position(|collaborator| {
1382                        collaborator.user_id == user_id && collaborator.is_host
1383                    })
1384                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1385                let host = collaborators.swap_remove(host_ix);
1386                let old_connection_id = host.connection();
1387
1388                project::Entity::update(project::ActiveModel {
1389                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1390                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1391                        connection.owner_id as i32,
1392                    ))),
1393                    ..project.into_active_model()
1394                })
1395                .exec(&*tx)
1396                .await?;
1397                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1398                    connection_id: ActiveValue::set(connection.id as i32),
1399                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1400                    ..host.into_active_model()
1401                })
1402                .exec(&*tx)
1403                .await?;
1404
1405                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1406                    .await?;
1407
1408                reshared_projects.push(ResharedProject {
1409                    id: project_id,
1410                    old_connection_id,
1411                    collaborators: collaborators
1412                        .iter()
1413                        .map(|collaborator| ProjectCollaborator {
1414                            connection_id: collaborator.connection(),
1415                            user_id: collaborator.user_id,
1416                            replica_id: collaborator.replica_id,
1417                            is_host: collaborator.is_host,
1418                        })
1419                        .collect(),
1420                    worktrees: reshared_project.worktrees.clone(),
1421                });
1422            }
1423
1424            project::Entity::delete_many()
1425                .filter(
1426                    Condition::all()
1427                        .add(project::Column::RoomId.eq(room_id))
1428                        .add(project::Column::HostUserId.eq(user_id))
1429                        .add(
1430                            project::Column::Id
1431                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1432                        ),
1433                )
1434                .exec(&*tx)
1435                .await?;
1436
1437            let mut rejoined_projects = Vec::new();
1438            for rejoined_project in &rejoin_room.rejoined_projects {
1439                let project_id = ProjectId::from_proto(rejoined_project.id);
1440                let Some(project) = project::Entity::find_by_id(project_id)
1441                    .one(&*tx)
1442                    .await? else { continue };
1443
1444                let mut worktrees = Vec::new();
1445                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1446                for db_worktree in db_worktrees {
1447                    let mut worktree = RejoinedWorktree {
1448                        id: db_worktree.id as u64,
1449                        abs_path: db_worktree.abs_path,
1450                        root_name: db_worktree.root_name,
1451                        visible: db_worktree.visible,
1452                        updated_entries: Default::default(),
1453                        removed_entries: Default::default(),
1454                        diagnostic_summaries: Default::default(),
1455                        scan_id: db_worktree.scan_id as u64,
1456                        completed_scan_id: db_worktree.completed_scan_id as u64,
1457                    };
1458
1459                    let rejoined_worktree = rejoined_project
1460                        .worktrees
1461                        .iter()
1462                        .find(|worktree| worktree.id == db_worktree.id as u64);
1463                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1464                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1465                    } else {
1466                        worktree_entry::Column::IsDeleted.eq(false)
1467                    };
1468
1469                    let mut db_entries = worktree_entry::Entity::find()
1470                        .filter(
1471                            Condition::all()
1472                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1473                                .add(entry_filter),
1474                        )
1475                        .stream(&*tx)
1476                        .await?;
1477
1478                    while let Some(db_entry) = db_entries.next().await {
1479                        let db_entry = db_entry?;
1480                        if db_entry.is_deleted {
1481                            worktree.removed_entries.push(db_entry.id as u64);
1482                        } else {
1483                            worktree.updated_entries.push(proto::Entry {
1484                                id: db_entry.id as u64,
1485                                is_dir: db_entry.is_dir,
1486                                path: db_entry.path,
1487                                inode: db_entry.inode as u64,
1488                                mtime: Some(proto::Timestamp {
1489                                    seconds: db_entry.mtime_seconds as u64,
1490                                    nanos: db_entry.mtime_nanos as u32,
1491                                }),
1492                                is_symlink: db_entry.is_symlink,
1493                                is_ignored: db_entry.is_ignored,
1494                            });
1495                        }
1496                    }
1497
1498                    worktrees.push(worktree);
1499                }
1500
1501                let language_servers = project
1502                    .find_related(language_server::Entity)
1503                    .all(&*tx)
1504                    .await?
1505                    .into_iter()
1506                    .map(|language_server| proto::LanguageServer {
1507                        id: language_server.id as u64,
1508                        name: language_server.name,
1509                    })
1510                    .collect::<Vec<_>>();
1511
1512                let mut collaborators = project
1513                    .find_related(project_collaborator::Entity)
1514                    .all(&*tx)
1515                    .await?;
1516                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1517                    .iter()
1518                    .position(|collaborator| collaborator.user_id == user_id)
1519                {
1520                    collaborators.swap_remove(self_collaborator_ix)
1521                } else {
1522                    continue;
1523                };
1524                let old_connection_id = self_collaborator.connection();
1525                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1526                    connection_id: ActiveValue::set(connection.id as i32),
1527                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1528                    ..self_collaborator.into_active_model()
1529                })
1530                .exec(&*tx)
1531                .await?;
1532
1533                let collaborators = collaborators
1534                    .into_iter()
1535                    .map(|collaborator| ProjectCollaborator {
1536                        connection_id: collaborator.connection(),
1537                        user_id: collaborator.user_id,
1538                        replica_id: collaborator.replica_id,
1539                        is_host: collaborator.is_host,
1540                    })
1541                    .collect::<Vec<_>>();
1542
1543                rejoined_projects.push(RejoinedProject {
1544                    id: project_id,
1545                    old_connection_id,
1546                    collaborators,
1547                    worktrees,
1548                    language_servers,
1549                });
1550            }
1551
1552            let room = self.get_room(room_id, &tx).await?;
1553            Ok((
1554                room_id,
1555                RejoinedRoom {
1556                    room,
1557                    rejoined_projects,
1558                    reshared_projects,
1559                },
1560            ))
1561        })
1562        .await
1563    }
1564
1565    pub async fn leave_room(
1566        &self,
1567        connection: ConnectionId,
1568    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1569        self.optional_room_transaction(|tx| async move {
1570            let leaving_participant = room_participant::Entity::find()
1571                .filter(
1572                    Condition::all()
1573                        .add(
1574                            room_participant::Column::AnsweringConnectionId
1575                                .eq(connection.id as i32),
1576                        )
1577                        .add(
1578                            room_participant::Column::AnsweringConnectionServerId
1579                                .eq(connection.owner_id as i32),
1580                        ),
1581                )
1582                .one(&*tx)
1583                .await?;
1584
1585            if let Some(leaving_participant) = leaving_participant {
1586                // Leave room.
1587                let room_id = leaving_participant.room_id;
1588                room_participant::Entity::delete_by_id(leaving_participant.id)
1589                    .exec(&*tx)
1590                    .await?;
1591
1592                // Cancel pending calls initiated by the leaving user.
1593                let called_participants = room_participant::Entity::find()
1594                    .filter(
1595                        Condition::all()
1596                            .add(
1597                                room_participant::Column::CallingUserId
1598                                    .eq(leaving_participant.user_id),
1599                            )
1600                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1601                    )
1602                    .all(&*tx)
1603                    .await?;
1604                room_participant::Entity::delete_many()
1605                    .filter(
1606                        room_participant::Column::Id
1607                            .is_in(called_participants.iter().map(|participant| participant.id)),
1608                    )
1609                    .exec(&*tx)
1610                    .await?;
1611                let canceled_calls_to_user_ids = called_participants
1612                    .into_iter()
1613                    .map(|participant| participant.user_id)
1614                    .collect();
1615
1616                // Detect left projects.
1617                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1618                enum QueryProjectIds {
1619                    ProjectId,
1620                }
1621                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1622                    .select_only()
1623                    .column_as(
1624                        project_collaborator::Column::ProjectId,
1625                        QueryProjectIds::ProjectId,
1626                    )
1627                    .filter(
1628                        Condition::all()
1629                            .add(
1630                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1631                            )
1632                            .add(
1633                                project_collaborator::Column::ConnectionServerId
1634                                    .eq(connection.owner_id as i32),
1635                            ),
1636                    )
1637                    .into_values::<_, QueryProjectIds>()
1638                    .all(&*tx)
1639                    .await?;
1640                let mut left_projects = HashMap::default();
1641                let mut collaborators = project_collaborator::Entity::find()
1642                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1643                    .stream(&*tx)
1644                    .await?;
1645                while let Some(collaborator) = collaborators.next().await {
1646                    let collaborator = collaborator?;
1647                    let left_project =
1648                        left_projects
1649                            .entry(collaborator.project_id)
1650                            .or_insert(LeftProject {
1651                                id: collaborator.project_id,
1652                                host_user_id: Default::default(),
1653                                connection_ids: Default::default(),
1654                                host_connection_id: Default::default(),
1655                            });
1656
1657                    let collaborator_connection_id = collaborator.connection();
1658                    if collaborator_connection_id != connection {
1659                        left_project.connection_ids.push(collaborator_connection_id);
1660                    }
1661
1662                    if collaborator.is_host {
1663                        left_project.host_user_id = collaborator.user_id;
1664                        left_project.host_connection_id = collaborator_connection_id;
1665                    }
1666                }
1667                drop(collaborators);
1668
1669                // Leave projects.
1670                project_collaborator::Entity::delete_many()
1671                    .filter(
1672                        Condition::all()
1673                            .add(
1674                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1675                            )
1676                            .add(
1677                                project_collaborator::Column::ConnectionServerId
1678                                    .eq(connection.owner_id as i32),
1679                            ),
1680                    )
1681                    .exec(&*tx)
1682                    .await?;
1683
1684                // Unshare projects.
1685                project::Entity::delete_many()
1686                    .filter(
1687                        Condition::all()
1688                            .add(project::Column::RoomId.eq(room_id))
1689                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1690                            .add(
1691                                project::Column::HostConnectionServerId
1692                                    .eq(connection.owner_id as i32),
1693                            ),
1694                    )
1695                    .exec(&*tx)
1696                    .await?;
1697
1698                let room = self.get_room(room_id, &tx).await?;
1699                if room.participants.is_empty() {
1700                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1701                }
1702
1703                let left_room = LeftRoom {
1704                    room,
1705                    left_projects,
1706                    canceled_calls_to_user_ids,
1707                };
1708
1709                if left_room.room.participants.is_empty() {
1710                    self.rooms.remove(&room_id);
1711                }
1712
1713                Ok(Some((room_id, left_room)))
1714            } else {
1715                Ok(None)
1716            }
1717        })
1718        .await
1719    }
1720
1721    pub async fn follow(
1722        &self,
1723        project_id: ProjectId,
1724        leader_connection: ConnectionId,
1725        follower_connection: ConnectionId,
1726    ) -> Result<RoomGuard<proto::Room>> {
1727        let room_id = self.room_id_for_project(project_id).await?;
1728        self.room_transaction(room_id, |tx| async move {
1729            follower::ActiveModel {
1730                room_id: ActiveValue::set(room_id),
1731                project_id: ActiveValue::set(project_id),
1732                leader_connection_server_id: ActiveValue::set(ServerId(
1733                    leader_connection.owner_id as i32,
1734                )),
1735                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1736                follower_connection_server_id: ActiveValue::set(ServerId(
1737                    follower_connection.owner_id as i32,
1738                )),
1739                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1740                ..Default::default()
1741            }
1742            .insert(&*tx)
1743            .await?;
1744
1745            let room = self.get_room(room_id, &*tx).await?;
1746            Ok(room)
1747        })
1748        .await
1749    }
1750
1751    pub async fn unfollow(
1752        &self,
1753        project_id: ProjectId,
1754        leader_connection: ConnectionId,
1755        follower_connection: ConnectionId,
1756    ) -> Result<RoomGuard<proto::Room>> {
1757        let room_id = self.room_id_for_project(project_id).await?;
1758        self.room_transaction(room_id, |tx| async move {
1759            follower::Entity::delete_many()
1760                .filter(
1761                    Condition::all()
1762                        .add(follower::Column::ProjectId.eq(project_id))
1763                        .add(
1764                            follower::Column::LeaderConnectionServerId
1765                                .eq(leader_connection.owner_id)
1766                                .and(follower::Column::LeaderConnectionId.eq(leader_connection.id)),
1767                        )
1768                        .add(
1769                            follower::Column::FollowerConnectionServerId
1770                                .eq(follower_connection.owner_id)
1771                                .and(
1772                                    follower::Column::FollowerConnectionId
1773                                        .eq(follower_connection.id),
1774                                ),
1775                        ),
1776                )
1777                .exec(&*tx)
1778                .await?;
1779
1780            let room = self.get_room(room_id, &*tx).await?;
1781            Ok(room)
1782        })
1783        .await
1784    }
1785
1786    pub async fn update_room_participant_location(
1787        &self,
1788        room_id: RoomId,
1789        connection: ConnectionId,
1790        location: proto::ParticipantLocation,
1791    ) -> Result<RoomGuard<proto::Room>> {
1792        self.room_transaction(|tx| async {
1793            let tx = tx;
1794            let location_kind;
1795            let location_project_id;
1796            match location
1797                .variant
1798                .as_ref()
1799                .ok_or_else(|| anyhow!("invalid location"))?
1800            {
1801                proto::participant_location::Variant::SharedProject(project) => {
1802                    location_kind = 0;
1803                    location_project_id = Some(ProjectId::from_proto(project.id));
1804                }
1805                proto::participant_location::Variant::UnsharedProject(_) => {
1806                    location_kind = 1;
1807                    location_project_id = None;
1808                }
1809                proto::participant_location::Variant::External(_) => {
1810                    location_kind = 2;
1811                    location_project_id = None;
1812                }
1813            }
1814
1815            let result = room_participant::Entity::update_many()
1816                .filter(
1817                    Condition::all()
1818                        .add(room_participant::Column::RoomId.eq(room_id))
1819                        .add(
1820                            room_participant::Column::AnsweringConnectionId
1821                                .eq(connection.id as i32),
1822                        )
1823                        .add(
1824                            room_participant::Column::AnsweringConnectionServerId
1825                                .eq(connection.owner_id as i32),
1826                        ),
1827                )
1828                .set(room_participant::ActiveModel {
1829                    location_kind: ActiveValue::set(Some(location_kind)),
1830                    location_project_id: ActiveValue::set(location_project_id),
1831                    ..Default::default()
1832                })
1833                .exec(&*tx)
1834                .await?;
1835
1836            if result.rows_affected == 1 {
1837                let room = self.get_room(room_id, &tx).await?;
1838                Ok((room_id, room))
1839            } else {
1840                Err(anyhow!("could not update room participant location"))?
1841            }
1842        })
1843        .await
1844    }
1845
1846    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1847        self.transaction(|tx| async move {
1848            let participant = room_participant::Entity::find()
1849                .filter(
1850                    Condition::all()
1851                        .add(
1852                            room_participant::Column::AnsweringConnectionId
1853                                .eq(connection.id as i32),
1854                        )
1855                        .add(
1856                            room_participant::Column::AnsweringConnectionServerId
1857                                .eq(connection.owner_id as i32),
1858                        ),
1859                )
1860                .one(&*tx)
1861                .await?
1862                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1863
1864            room_participant::Entity::update(room_participant::ActiveModel {
1865                answering_connection_lost: ActiveValue::set(true),
1866                ..participant.into_active_model()
1867            })
1868            .exec(&*tx)
1869            .await?;
1870
1871            Ok(())
1872        })
1873        .await
1874    }
1875
1876    fn build_incoming_call(
1877        room: &proto::Room,
1878        called_user_id: UserId,
1879    ) -> Option<proto::IncomingCall> {
1880        let pending_participant = room
1881            .pending_participants
1882            .iter()
1883            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1884
1885        Some(proto::IncomingCall {
1886            room_id: room.id,
1887            calling_user_id: pending_participant.calling_user_id,
1888            participant_user_ids: room
1889                .participants
1890                .iter()
1891                .map(|participant| participant.user_id)
1892                .collect(),
1893            initial_project: room.participants.iter().find_map(|participant| {
1894                let initial_project_id = pending_participant.initial_project_id?;
1895                participant
1896                    .projects
1897                    .iter()
1898                    .find(|project| project.id == initial_project_id)
1899                    .cloned()
1900            }),
1901        })
1902    }
1903
1904    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1905        let db_room = room::Entity::find_by_id(room_id)
1906            .one(tx)
1907            .await?
1908            .ok_or_else(|| anyhow!("could not find room"))?;
1909
1910        let mut db_participants = db_room
1911            .find_related(room_participant::Entity)
1912            .stream(tx)
1913            .await?;
1914        let mut participants = HashMap::default();
1915        let mut pending_participants = Vec::new();
1916        while let Some(db_participant) = db_participants.next().await {
1917            let db_participant = db_participant?;
1918            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1919                .answering_connection_id
1920                .zip(db_participant.answering_connection_server_id)
1921            {
1922                let location = match (
1923                    db_participant.location_kind,
1924                    db_participant.location_project_id,
1925                ) {
1926                    (Some(0), Some(project_id)) => {
1927                        Some(proto::participant_location::Variant::SharedProject(
1928                            proto::participant_location::SharedProject {
1929                                id: project_id.to_proto(),
1930                            },
1931                        ))
1932                    }
1933                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1934                        Default::default(),
1935                    )),
1936                    _ => Some(proto::participant_location::Variant::External(
1937                        Default::default(),
1938                    )),
1939                };
1940
1941                let answering_connection = ConnectionId {
1942                    owner_id: answering_connection_server_id.0 as u32,
1943                    id: answering_connection_id as u32,
1944                };
1945                participants.insert(
1946                    answering_connection,
1947                    proto::Participant {
1948                        user_id: db_participant.user_id.to_proto(),
1949                        peer_id: Some(answering_connection.into()),
1950                        projects: Default::default(),
1951                        location: Some(proto::ParticipantLocation { variant: location }),
1952                    },
1953                );
1954            } else {
1955                pending_participants.push(proto::PendingParticipant {
1956                    user_id: db_participant.user_id.to_proto(),
1957                    calling_user_id: db_participant.calling_user_id.to_proto(),
1958                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1959                });
1960            }
1961        }
1962        drop(db_participants);
1963
1964        let mut db_projects = db_room
1965            .find_related(project::Entity)
1966            .find_with_related(worktree::Entity)
1967            .stream(tx)
1968            .await?;
1969
1970        while let Some(row) = db_projects.next().await {
1971            let (db_project, db_worktree) = row?;
1972            let host_connection = db_project.host_connection()?;
1973            if let Some(participant) = participants.get_mut(&host_connection) {
1974                let project = if let Some(project) = participant
1975                    .projects
1976                    .iter_mut()
1977                    .find(|project| project.id == db_project.id.to_proto())
1978                {
1979                    project
1980                } else {
1981                    participant.projects.push(proto::ParticipantProject {
1982                        id: db_project.id.to_proto(),
1983                        worktree_root_names: Default::default(),
1984                    });
1985                    participant.projects.last_mut().unwrap()
1986                };
1987
1988                if let Some(db_worktree) = db_worktree {
1989                    if db_worktree.visible {
1990                        project.worktree_root_names.push(db_worktree.root_name);
1991                    }
1992                }
1993            }
1994        }
1995        drop(db_projects);
1996
1997        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1998        let mut followers = Vec::new();
1999        while let Some(db_follower) = db_followers.next().await {
2000            let db_follower = db_follower?;
2001            followers.push(proto::Follower {
2002                leader_id: Some(db_follower.leader_connection().into()),
2003                follower_id: Some(db_follower.follower_connection().into()),
2004            });
2005        }
2006
2007        Ok(proto::Room {
2008            id: db_room.id.to_proto(),
2009            live_kit_room: db_room.live_kit_room,
2010            participants: participants.into_values().collect(),
2011            pending_participants,
2012            followers,
2013        })
2014    }
2015
2016    // projects
2017
2018    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2019        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2020        enum QueryAs {
2021            Count,
2022        }
2023
2024        self.transaction(|tx| async move {
2025            Ok(project::Entity::find()
2026                .select_only()
2027                .column_as(project::Column::Id.count(), QueryAs::Count)
2028                .inner_join(user::Entity)
2029                .filter(user::Column::Admin.eq(false))
2030                .into_values::<_, QueryAs>()
2031                .one(&*tx)
2032                .await?
2033                .unwrap_or(0i64) as usize)
2034        })
2035        .await
2036    }
2037
2038    pub async fn share_project(
2039        &self,
2040        room_id: RoomId,
2041        connection: ConnectionId,
2042        worktrees: &[proto::WorktreeMetadata],
2043    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2044        self.room_transaction(|tx| async move {
2045            let participant = room_participant::Entity::find()
2046                .filter(
2047                    Condition::all()
2048                        .add(
2049                            room_participant::Column::AnsweringConnectionId
2050                                .eq(connection.id as i32),
2051                        )
2052                        .add(
2053                            room_participant::Column::AnsweringConnectionServerId
2054                                .eq(connection.owner_id as i32),
2055                        ),
2056                )
2057                .one(&*tx)
2058                .await?
2059                .ok_or_else(|| anyhow!("could not find participant"))?;
2060            if participant.room_id != room_id {
2061                return Err(anyhow!("shared project on unexpected room"))?;
2062            }
2063
2064            let project = project::ActiveModel {
2065                room_id: ActiveValue::set(participant.room_id),
2066                host_user_id: ActiveValue::set(participant.user_id),
2067                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2068                host_connection_server_id: ActiveValue::set(Some(ServerId(
2069                    connection.owner_id as i32,
2070                ))),
2071                ..Default::default()
2072            }
2073            .insert(&*tx)
2074            .await?;
2075
2076            if !worktrees.is_empty() {
2077                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2078                    worktree::ActiveModel {
2079                        id: ActiveValue::set(worktree.id as i64),
2080                        project_id: ActiveValue::set(project.id),
2081                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2082                        root_name: ActiveValue::set(worktree.root_name.clone()),
2083                        visible: ActiveValue::set(worktree.visible),
2084                        scan_id: ActiveValue::set(0),
2085                        completed_scan_id: ActiveValue::set(0),
2086                    }
2087                }))
2088                .exec(&*tx)
2089                .await?;
2090            }
2091
2092            project_collaborator::ActiveModel {
2093                project_id: ActiveValue::set(project.id),
2094                connection_id: ActiveValue::set(connection.id as i32),
2095                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2096                user_id: ActiveValue::set(participant.user_id),
2097                replica_id: ActiveValue::set(ReplicaId(0)),
2098                is_host: ActiveValue::set(true),
2099                ..Default::default()
2100            }
2101            .insert(&*tx)
2102            .await?;
2103
2104            let room = self.get_room(room_id, &tx).await?;
2105            Ok((room_id, (project.id, room)))
2106        })
2107        .await
2108    }
2109
2110    pub async fn unshare_project(
2111        &self,
2112        project_id: ProjectId,
2113        connection: ConnectionId,
2114    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2115        self.room_transaction(|tx| async move {
2116            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2117
2118            let project = project::Entity::find_by_id(project_id)
2119                .one(&*tx)
2120                .await?
2121                .ok_or_else(|| anyhow!("project not found"))?;
2122            if project.host_connection()? == connection {
2123                let room_id = project.room_id;
2124                project::Entity::delete(project.into_active_model())
2125                    .exec(&*tx)
2126                    .await?;
2127                let room = self.get_room(room_id, &tx).await?;
2128                Ok((room_id, (room, guest_connection_ids)))
2129            } else {
2130                Err(anyhow!("cannot unshare a project hosted by another user"))?
2131            }
2132        })
2133        .await
2134    }
2135
2136    pub async fn update_project(
2137        &self,
2138        project_id: ProjectId,
2139        connection: ConnectionId,
2140        worktrees: &[proto::WorktreeMetadata],
2141    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2142        self.room_transaction(|tx| async move {
2143            let project = project::Entity::find_by_id(project_id)
2144                .filter(
2145                    Condition::all()
2146                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2147                        .add(
2148                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2149                        ),
2150                )
2151                .one(&*tx)
2152                .await?
2153                .ok_or_else(|| anyhow!("no such project"))?;
2154
2155            self.update_project_worktrees(project.id, worktrees, &tx)
2156                .await?;
2157
2158            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2159            let room = self.get_room(project.room_id, &tx).await?;
2160            Ok((project.room_id, (room, guest_connection_ids)))
2161        })
2162        .await
2163    }
2164
2165    async fn update_project_worktrees(
2166        &self,
2167        project_id: ProjectId,
2168        worktrees: &[proto::WorktreeMetadata],
2169        tx: &DatabaseTransaction,
2170    ) -> Result<()> {
2171        if !worktrees.is_empty() {
2172            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2173                id: ActiveValue::set(worktree.id as i64),
2174                project_id: ActiveValue::set(project_id),
2175                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2176                root_name: ActiveValue::set(worktree.root_name.clone()),
2177                visible: ActiveValue::set(worktree.visible),
2178                scan_id: ActiveValue::set(0),
2179                completed_scan_id: ActiveValue::set(0),
2180            }))
2181            .on_conflict(
2182                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2183                    .update_column(worktree::Column::RootName)
2184                    .to_owned(),
2185            )
2186            .exec(&*tx)
2187            .await?;
2188        }
2189
2190        worktree::Entity::delete_many()
2191            .filter(worktree::Column::ProjectId.eq(project_id).and(
2192                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2193            ))
2194            .exec(&*tx)
2195            .await?;
2196
2197        Ok(())
2198    }
2199
2200    pub async fn update_worktree(
2201        &self,
2202        update: &proto::UpdateWorktree,
2203        connection: ConnectionId,
2204    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2205        self.room_transaction(|tx| async move {
2206            let project_id = ProjectId::from_proto(update.project_id);
2207            let worktree_id = update.worktree_id as i64;
2208
2209            // Ensure the update comes from the host.
2210            let project = project::Entity::find_by_id(project_id)
2211                .filter(
2212                    Condition::all()
2213                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2214                        .add(
2215                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2216                        ),
2217                )
2218                .one(&*tx)
2219                .await?
2220                .ok_or_else(|| anyhow!("no such project"))?;
2221            let room_id = project.room_id;
2222
2223            // Update metadata.
2224            worktree::Entity::update(worktree::ActiveModel {
2225                id: ActiveValue::set(worktree_id),
2226                project_id: ActiveValue::set(project_id),
2227                root_name: ActiveValue::set(update.root_name.clone()),
2228                scan_id: ActiveValue::set(update.scan_id as i64),
2229                completed_scan_id: if update.is_last_update {
2230                    ActiveValue::set(update.scan_id as i64)
2231                } else {
2232                    ActiveValue::default()
2233                },
2234                abs_path: ActiveValue::set(update.abs_path.clone()),
2235                ..Default::default()
2236            })
2237            .exec(&*tx)
2238            .await?;
2239
2240            if !update.updated_entries.is_empty() {
2241                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2242                    let mtime = entry.mtime.clone().unwrap_or_default();
2243                    worktree_entry::ActiveModel {
2244                        project_id: ActiveValue::set(project_id),
2245                        worktree_id: ActiveValue::set(worktree_id),
2246                        id: ActiveValue::set(entry.id as i64),
2247                        is_dir: ActiveValue::set(entry.is_dir),
2248                        path: ActiveValue::set(entry.path.clone()),
2249                        inode: ActiveValue::set(entry.inode as i64),
2250                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2251                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2252                        is_symlink: ActiveValue::set(entry.is_symlink),
2253                        is_ignored: ActiveValue::set(entry.is_ignored),
2254                        is_deleted: ActiveValue::set(false),
2255                        scan_id: ActiveValue::set(update.scan_id as i64),
2256                    }
2257                }))
2258                .on_conflict(
2259                    OnConflict::columns([
2260                        worktree_entry::Column::ProjectId,
2261                        worktree_entry::Column::WorktreeId,
2262                        worktree_entry::Column::Id,
2263                    ])
2264                    .update_columns([
2265                        worktree_entry::Column::IsDir,
2266                        worktree_entry::Column::Path,
2267                        worktree_entry::Column::Inode,
2268                        worktree_entry::Column::MtimeSeconds,
2269                        worktree_entry::Column::MtimeNanos,
2270                        worktree_entry::Column::IsSymlink,
2271                        worktree_entry::Column::IsIgnored,
2272                        worktree_entry::Column::ScanId,
2273                    ])
2274                    .to_owned(),
2275                )
2276                .exec(&*tx)
2277                .await?;
2278            }
2279
2280            if !update.removed_entries.is_empty() {
2281                worktree_entry::Entity::update_many()
2282                    .filter(
2283                        worktree_entry::Column::ProjectId
2284                            .eq(project_id)
2285                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2286                            .and(
2287                                worktree_entry::Column::Id
2288                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2289                            ),
2290                    )
2291                    .set(worktree_entry::ActiveModel {
2292                        is_deleted: ActiveValue::Set(true),
2293                        scan_id: ActiveValue::Set(update.scan_id as i64),
2294                        ..Default::default()
2295                    })
2296                    .exec(&*tx)
2297                    .await?;
2298            }
2299
2300            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2301            Ok((room_id, connection_ids))
2302        })
2303        .await
2304    }
2305
2306    pub async fn update_diagnostic_summary(
2307        &self,
2308        update: &proto::UpdateDiagnosticSummary,
2309        connection: ConnectionId,
2310    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2311        self.room_transaction(|tx| async move {
2312            let project_id = ProjectId::from_proto(update.project_id);
2313            let worktree_id = update.worktree_id as i64;
2314            let summary = update
2315                .summary
2316                .as_ref()
2317                .ok_or_else(|| anyhow!("invalid summary"))?;
2318
2319            // Ensure the update comes from the host.
2320            let project = project::Entity::find_by_id(project_id)
2321                .one(&*tx)
2322                .await?
2323                .ok_or_else(|| anyhow!("no such project"))?;
2324            if project.host_connection()? != connection {
2325                return Err(anyhow!("can't update a project hosted by someone else"))?;
2326            }
2327
2328            // Update summary.
2329            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2330                project_id: ActiveValue::set(project_id),
2331                worktree_id: ActiveValue::set(worktree_id),
2332                path: ActiveValue::set(summary.path.clone()),
2333                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2334                error_count: ActiveValue::set(summary.error_count as i32),
2335                warning_count: ActiveValue::set(summary.warning_count as i32),
2336                ..Default::default()
2337            })
2338            .on_conflict(
2339                OnConflict::columns([
2340                    worktree_diagnostic_summary::Column::ProjectId,
2341                    worktree_diagnostic_summary::Column::WorktreeId,
2342                    worktree_diagnostic_summary::Column::Path,
2343                ])
2344                .update_columns([
2345                    worktree_diagnostic_summary::Column::LanguageServerId,
2346                    worktree_diagnostic_summary::Column::ErrorCount,
2347                    worktree_diagnostic_summary::Column::WarningCount,
2348                ])
2349                .to_owned(),
2350            )
2351            .exec(&*tx)
2352            .await?;
2353
2354            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2355            Ok((project.room_id, connection_ids))
2356        })
2357        .await
2358    }
2359
2360    pub async fn start_language_server(
2361        &self,
2362        update: &proto::StartLanguageServer,
2363        connection: ConnectionId,
2364    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2365        self.room_transaction(|tx| async move {
2366            let project_id = ProjectId::from_proto(update.project_id);
2367            let server = update
2368                .server
2369                .as_ref()
2370                .ok_or_else(|| anyhow!("invalid language server"))?;
2371
2372            // Ensure the update comes from the host.
2373            let project = project::Entity::find_by_id(project_id)
2374                .one(&*tx)
2375                .await?
2376                .ok_or_else(|| anyhow!("no such project"))?;
2377            if project.host_connection()? != connection {
2378                return Err(anyhow!("can't update a project hosted by someone else"))?;
2379            }
2380
2381            // Add the newly-started language server.
2382            language_server::Entity::insert(language_server::ActiveModel {
2383                project_id: ActiveValue::set(project_id),
2384                id: ActiveValue::set(server.id as i64),
2385                name: ActiveValue::set(server.name.clone()),
2386                ..Default::default()
2387            })
2388            .on_conflict(
2389                OnConflict::columns([
2390                    language_server::Column::ProjectId,
2391                    language_server::Column::Id,
2392                ])
2393                .update_column(language_server::Column::Name)
2394                .to_owned(),
2395            )
2396            .exec(&*tx)
2397            .await?;
2398
2399            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2400            Ok((project.room_id, connection_ids))
2401        })
2402        .await
2403    }
2404
2405    pub async fn join_project(
2406        &self,
2407        project_id: ProjectId,
2408        connection: ConnectionId,
2409    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2410        self.room_transaction(|tx| async move {
2411            let participant = room_participant::Entity::find()
2412                .filter(
2413                    Condition::all()
2414                        .add(
2415                            room_participant::Column::AnsweringConnectionId
2416                                .eq(connection.id as i32),
2417                        )
2418                        .add(
2419                            room_participant::Column::AnsweringConnectionServerId
2420                                .eq(connection.owner_id as i32),
2421                        ),
2422                )
2423                .one(&*tx)
2424                .await?
2425                .ok_or_else(|| anyhow!("must join a room first"))?;
2426
2427            let project = project::Entity::find_by_id(project_id)
2428                .one(&*tx)
2429                .await?
2430                .ok_or_else(|| anyhow!("no such project"))?;
2431            if project.room_id != participant.room_id {
2432                return Err(anyhow!("no such project"))?;
2433            }
2434
2435            let mut collaborators = project
2436                .find_related(project_collaborator::Entity)
2437                .all(&*tx)
2438                .await?;
2439            let replica_ids = collaborators
2440                .iter()
2441                .map(|c| c.replica_id)
2442                .collect::<HashSet<_>>();
2443            let mut replica_id = ReplicaId(1);
2444            while replica_ids.contains(&replica_id) {
2445                replica_id.0 += 1;
2446            }
2447            let new_collaborator = project_collaborator::ActiveModel {
2448                project_id: ActiveValue::set(project_id),
2449                connection_id: ActiveValue::set(connection.id as i32),
2450                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2451                user_id: ActiveValue::set(participant.user_id),
2452                replica_id: ActiveValue::set(replica_id),
2453                is_host: ActiveValue::set(false),
2454                ..Default::default()
2455            }
2456            .insert(&*tx)
2457            .await?;
2458            collaborators.push(new_collaborator);
2459
2460            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2461            let mut worktrees = db_worktrees
2462                .into_iter()
2463                .map(|db_worktree| {
2464                    (
2465                        db_worktree.id as u64,
2466                        Worktree {
2467                            id: db_worktree.id as u64,
2468                            abs_path: db_worktree.abs_path,
2469                            root_name: db_worktree.root_name,
2470                            visible: db_worktree.visible,
2471                            entries: Default::default(),
2472                            diagnostic_summaries: Default::default(),
2473                            scan_id: db_worktree.scan_id as u64,
2474                            completed_scan_id: db_worktree.completed_scan_id as u64,
2475                        },
2476                    )
2477                })
2478                .collect::<BTreeMap<_, _>>();
2479
2480            // Populate worktree entries.
2481            {
2482                let mut db_entries = worktree_entry::Entity::find()
2483                    .filter(
2484                        Condition::all()
2485                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2486                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2487                    )
2488                    .stream(&*tx)
2489                    .await?;
2490                while let Some(db_entry) = db_entries.next().await {
2491                    let db_entry = db_entry?;
2492                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2493                        worktree.entries.push(proto::Entry {
2494                            id: db_entry.id as u64,
2495                            is_dir: db_entry.is_dir,
2496                            path: db_entry.path,
2497                            inode: db_entry.inode as u64,
2498                            mtime: Some(proto::Timestamp {
2499                                seconds: db_entry.mtime_seconds as u64,
2500                                nanos: db_entry.mtime_nanos as u32,
2501                            }),
2502                            is_symlink: db_entry.is_symlink,
2503                            is_ignored: db_entry.is_ignored,
2504                        });
2505                    }
2506                }
2507            }
2508
2509            // Populate worktree diagnostic summaries.
2510            {
2511                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2512                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2513                    .stream(&*tx)
2514                    .await?;
2515                while let Some(db_summary) = db_summaries.next().await {
2516                    let db_summary = db_summary?;
2517                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2518                        worktree
2519                            .diagnostic_summaries
2520                            .push(proto::DiagnosticSummary {
2521                                path: db_summary.path,
2522                                language_server_id: db_summary.language_server_id as u64,
2523                                error_count: db_summary.error_count as u32,
2524                                warning_count: db_summary.warning_count as u32,
2525                            });
2526                    }
2527                }
2528            }
2529
2530            // Populate language servers.
2531            let language_servers = project
2532                .find_related(language_server::Entity)
2533                .all(&*tx)
2534                .await?;
2535
2536            let room_id = project.room_id;
2537            let project = Project {
2538                collaborators: collaborators
2539                    .into_iter()
2540                    .map(|collaborator| ProjectCollaborator {
2541                        connection_id: collaborator.connection(),
2542                        user_id: collaborator.user_id,
2543                        replica_id: collaborator.replica_id,
2544                        is_host: collaborator.is_host,
2545                    })
2546                    .collect(),
2547                worktrees,
2548                language_servers: language_servers
2549                    .into_iter()
2550                    .map(|language_server| proto::LanguageServer {
2551                        id: language_server.id as u64,
2552                        name: language_server.name,
2553                    })
2554                    .collect(),
2555            };
2556            Ok((room_id, (project, replica_id as ReplicaId)))
2557        })
2558        .await
2559    }
2560
2561    pub async fn leave_project(
2562        &self,
2563        project_id: ProjectId,
2564        connection: ConnectionId,
2565    ) -> Result<RoomGuard<LeftProject>> {
2566        self.room_transaction(|tx| async move {
2567            let result = project_collaborator::Entity::delete_many()
2568                .filter(
2569                    Condition::all()
2570                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2571                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2572                        .add(
2573                            project_collaborator::Column::ConnectionServerId
2574                                .eq(connection.owner_id as i32),
2575                        ),
2576                )
2577                .exec(&*tx)
2578                .await?;
2579            if result.rows_affected == 0 {
2580                Err(anyhow!("not a collaborator on this project"))?;
2581            }
2582
2583            let project = project::Entity::find_by_id(project_id)
2584                .one(&*tx)
2585                .await?
2586                .ok_or_else(|| anyhow!("no such project"))?;
2587            let collaborators = project
2588                .find_related(project_collaborator::Entity)
2589                .all(&*tx)
2590                .await?;
2591            let connection_ids = collaborators
2592                .into_iter()
2593                .map(|collaborator| collaborator.connection())
2594                .collect();
2595
2596            let left_project = LeftProject {
2597                id: project_id,
2598                host_user_id: project.host_user_id,
2599                host_connection_id: project.host_connection()?,
2600                connection_ids,
2601            };
2602            Ok((project.room_id, left_project))
2603        })
2604        .await
2605    }
2606
2607    pub async fn project_collaborators(
2608        &self,
2609        project_id: ProjectId,
2610        connection_id: ConnectionId,
2611    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2612        self.room_transaction(|tx| async move {
2613            let project = project::Entity::find_by_id(project_id)
2614                .one(&*tx)
2615                .await?
2616                .ok_or_else(|| anyhow!("no such project"))?;
2617            let collaborators = project_collaborator::Entity::find()
2618                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2619                .all(&*tx)
2620                .await?
2621                .into_iter()
2622                .map(|collaborator| ProjectCollaborator {
2623                    connection_id: collaborator.connection(),
2624                    user_id: collaborator.user_id,
2625                    replica_id: collaborator.replica_id,
2626                    is_host: collaborator.is_host,
2627                })
2628                .collect::<Vec<_>>();
2629
2630            if collaborators
2631                .iter()
2632                .any(|collaborator| collaborator.connection_id == connection_id)
2633            {
2634                Ok((project.room_id, collaborators))
2635            } else {
2636                Err(anyhow!("no such project"))?
2637            }
2638        })
2639        .await
2640    }
2641
2642    pub async fn project_connection_ids(
2643        &self,
2644        project_id: ProjectId,
2645        connection_id: ConnectionId,
2646    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2647        self.room_transaction(|tx| async move {
2648            let project = project::Entity::find_by_id(project_id)
2649                .one(&*tx)
2650                .await?
2651                .ok_or_else(|| anyhow!("no such project"))?;
2652            let mut collaborators = project_collaborator::Entity::find()
2653                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2654                .stream(&*tx)
2655                .await?;
2656
2657            let mut connection_ids = HashSet::default();
2658            while let Some(collaborator) = collaborators.next().await {
2659                let collaborator = collaborator?;
2660                connection_ids.insert(collaborator.connection());
2661            }
2662
2663            if connection_ids.contains(&connection_id) {
2664                Ok((project.room_id, connection_ids))
2665            } else {
2666                Err(anyhow!("no such project"))?
2667            }
2668        })
2669        .await
2670    }
2671
2672    async fn project_guest_connection_ids(
2673        &self,
2674        project_id: ProjectId,
2675        tx: &DatabaseTransaction,
2676    ) -> Result<Vec<ConnectionId>> {
2677        let mut collaborators = project_collaborator::Entity::find()
2678            .filter(
2679                project_collaborator::Column::ProjectId
2680                    .eq(project_id)
2681                    .and(project_collaborator::Column::IsHost.eq(false)),
2682            )
2683            .stream(tx)
2684            .await?;
2685
2686        let mut guest_connection_ids = Vec::new();
2687        while let Some(collaborator) = collaborators.next().await {
2688            let collaborator = collaborator?;
2689            guest_connection_ids.push(collaborator.connection());
2690        }
2691        Ok(guest_connection_ids)
2692    }
2693
2694    // access tokens
2695
2696    pub async fn create_access_token_hash(
2697        &self,
2698        user_id: UserId,
2699        access_token_hash: &str,
2700        max_access_token_count: usize,
2701    ) -> Result<()> {
2702        self.transaction(|tx| async {
2703            let tx = tx;
2704
2705            access_token::ActiveModel {
2706                user_id: ActiveValue::set(user_id),
2707                hash: ActiveValue::set(access_token_hash.into()),
2708                ..Default::default()
2709            }
2710            .insert(&*tx)
2711            .await?;
2712
2713            access_token::Entity::delete_many()
2714                .filter(
2715                    access_token::Column::Id.in_subquery(
2716                        Query::select()
2717                            .column(access_token::Column::Id)
2718                            .from(access_token::Entity)
2719                            .and_where(access_token::Column::UserId.eq(user_id))
2720                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2721                            .limit(10000)
2722                            .offset(max_access_token_count as u64)
2723                            .to_owned(),
2724                    ),
2725                )
2726                .exec(&*tx)
2727                .await?;
2728            Ok(())
2729        })
2730        .await
2731    }
2732
2733    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2734        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2735        enum QueryAs {
2736            Hash,
2737        }
2738
2739        self.transaction(|tx| async move {
2740            Ok(access_token::Entity::find()
2741                .select_only()
2742                .column(access_token::Column::Hash)
2743                .filter(access_token::Column::UserId.eq(user_id))
2744                .order_by_desc(access_token::Column::Id)
2745                .into_values::<_, QueryAs>()
2746                .all(&*tx)
2747                .await?)
2748        })
2749        .await
2750    }
2751
2752    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2753    where
2754        F: Send + Fn(TransactionHandle) -> Fut,
2755        Fut: Send + Future<Output = Result<T>>,
2756    {
2757        let body = async {
2758            loop {
2759                let (tx, result) = self.with_transaction(&f).await?;
2760                match result {
2761                    Ok(result) => {
2762                        match tx.commit().await.map_err(Into::into) {
2763                            Ok(()) => return Ok(result),
2764                            Err(error) => {
2765                                if is_serialization_error(&error) {
2766                                    // Retry (don't break the loop)
2767                                } else {
2768                                    return Err(error);
2769                                }
2770                            }
2771                        }
2772                    }
2773                    Err(error) => {
2774                        tx.rollback().await?;
2775                        if is_serialization_error(&error) {
2776                            // Retry (don't break the loop)
2777                        } else {
2778                            return Err(error);
2779                        }
2780                    }
2781                }
2782            }
2783        };
2784
2785        self.run(body).await
2786    }
2787
2788    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2789    where
2790        F: Send + Fn(TransactionHandle) -> Fut,
2791        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2792    {
2793        let body = async {
2794            loop {
2795                let (tx, result) = self.with_transaction(&f).await?;
2796                match result {
2797                    Ok(Some((room_id, data))) => {
2798                        let lock = self.rooms.entry(room_id).or_default().clone();
2799                        let _guard = lock.lock_owned().await;
2800                        match tx.commit().await.map_err(Into::into) {
2801                            Ok(()) => {
2802                                return Ok(Some(RoomGuard {
2803                                    data,
2804                                    _guard,
2805                                    _not_send: PhantomData,
2806                                }));
2807                            }
2808                            Err(error) => {
2809                                if is_serialization_error(&error) {
2810                                    // Retry (don't break the loop)
2811                                } else {
2812                                    return Err(error);
2813                                }
2814                            }
2815                        }
2816                    }
2817                    Ok(None) => {
2818                        match tx.commit().await.map_err(Into::into) {
2819                            Ok(()) => return Ok(None),
2820                            Err(error) => {
2821                                if is_serialization_error(&error) {
2822                                    // Retry (don't break the loop)
2823                                } else {
2824                                    return Err(error);
2825                                }
2826                            }
2827                        }
2828                    }
2829                    Err(error) => {
2830                        tx.rollback().await?;
2831                        if is_serialization_error(&error) {
2832                            // Retry (don't break the loop)
2833                        } else {
2834                            return Err(error);
2835                        }
2836                    }
2837                }
2838            }
2839        };
2840
2841        self.run(body).await
2842    }
2843
2844    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2845    where
2846        F: Send + Fn(TransactionHandle) -> Fut,
2847        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2848    {
2849        let data = self
2850            .optional_room_transaction(move |tx| {
2851                let future = f(tx);
2852                async {
2853                    let data = future.await?;
2854                    Ok(Some(data))
2855                }
2856            })
2857            .await?;
2858        Ok(data.unwrap())
2859    }
2860
2861    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2862    where
2863        F: Send + Fn(TransactionHandle) -> Fut,
2864        Fut: Send + Future<Output = Result<T>>,
2865    {
2866        let tx = self
2867            .pool
2868            .begin_with_config(Some(IsolationLevel::Serializable), None)
2869            .await?;
2870
2871        let mut tx = Arc::new(Some(tx));
2872        let result = f(TransactionHandle(tx.clone())).await;
2873        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2874            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2875        };
2876
2877        Ok((tx, result))
2878    }
2879
2880    async fn run<F, T>(&self, future: F) -> T
2881    where
2882        F: Future<Output = T>,
2883    {
2884        #[cfg(test)]
2885        {
2886            if let Some(background) = self.background.as_ref() {
2887                background.simulate_random_delay().await;
2888            }
2889
2890            self.runtime.as_ref().unwrap().block_on(future)
2891        }
2892
2893        #[cfg(not(test))]
2894        {
2895            future.await
2896        }
2897    }
2898}
2899
2900fn is_serialization_error(error: &Error) -> bool {
2901    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2902    match error {
2903        Error::Database(
2904            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2905            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2906        ) if error
2907            .as_database_error()
2908            .and_then(|error| error.code())
2909            .as_deref()
2910            == Some(SERIALIZATION_FAILURE_CODE) =>
2911        {
2912            true
2913        }
2914        _ => false,
2915    }
2916}
2917
2918struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2919
2920impl Deref for TransactionHandle {
2921    type Target = DatabaseTransaction;
2922
2923    fn deref(&self) -> &Self::Target {
2924        self.0.as_ref().as_ref().unwrap()
2925    }
2926}
2927
2928pub struct RoomGuard<T> {
2929    data: T,
2930    _guard: OwnedMutexGuard<()>,
2931    _not_send: PhantomData<Rc<()>>,
2932}
2933
2934impl<T> Deref for RoomGuard<T> {
2935    type Target = T;
2936
2937    fn deref(&self) -> &T {
2938        &self.data
2939    }
2940}
2941
2942impl<T> DerefMut for RoomGuard<T> {
2943    fn deref_mut(&mut self) -> &mut T {
2944        &mut self.data
2945    }
2946}
2947
2948#[derive(Debug, Serialize, Deserialize)]
2949pub struct NewUserParams {
2950    pub github_login: String,
2951    pub github_user_id: i32,
2952    pub invite_count: i32,
2953}
2954
2955#[derive(Debug)]
2956pub struct NewUserResult {
2957    pub user_id: UserId,
2958    pub metrics_id: String,
2959    pub inviting_user_id: Option<UserId>,
2960    pub signup_device_id: Option<String>,
2961}
2962
2963fn random_invite_code() -> String {
2964    nanoid::nanoid!(16)
2965}
2966
2967fn random_email_confirmation_code() -> String {
2968    nanoid::nanoid!(64)
2969}
2970
2971macro_rules! id_type {
2972    ($name:ident) => {
2973        #[derive(
2974            Clone,
2975            Copy,
2976            Debug,
2977            Default,
2978            PartialEq,
2979            Eq,
2980            PartialOrd,
2981            Ord,
2982            Hash,
2983            Serialize,
2984            Deserialize,
2985        )]
2986        #[serde(transparent)]
2987        pub struct $name(pub i32);
2988
2989        impl $name {
2990            #[allow(unused)]
2991            pub const MAX: Self = Self(i32::MAX);
2992
2993            #[allow(unused)]
2994            pub fn from_proto(value: u64) -> Self {
2995                Self(value as i32)
2996            }
2997
2998            #[allow(unused)]
2999            pub fn to_proto(self) -> u64 {
3000                self.0 as u64
3001            }
3002        }
3003
3004        impl std::fmt::Display for $name {
3005            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3006                self.0.fmt(f)
3007            }
3008        }
3009
3010        impl From<$name> for sea_query::Value {
3011            fn from(value: $name) -> Self {
3012                sea_query::Value::Int(Some(value.0))
3013            }
3014        }
3015
3016        impl sea_orm::TryGetable for $name {
3017            fn try_get(
3018                res: &sea_orm::QueryResult,
3019                pre: &str,
3020                col: &str,
3021            ) -> Result<Self, sea_orm::TryGetError> {
3022                Ok(Self(i32::try_get(res, pre, col)?))
3023            }
3024        }
3025
3026        impl sea_query::ValueType for $name {
3027            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3028                match v {
3029                    Value::TinyInt(Some(int)) => {
3030                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3031                    }
3032                    Value::SmallInt(Some(int)) => {
3033                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3034                    }
3035                    Value::Int(Some(int)) => {
3036                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3037                    }
3038                    Value::BigInt(Some(int)) => {
3039                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3040                    }
3041                    Value::TinyUnsigned(Some(int)) => {
3042                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3043                    }
3044                    Value::SmallUnsigned(Some(int)) => {
3045                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3046                    }
3047                    Value::Unsigned(Some(int)) => {
3048                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3049                    }
3050                    Value::BigUnsigned(Some(int)) => {
3051                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3052                    }
3053                    _ => Err(sea_query::ValueTypeErr),
3054                }
3055            }
3056
3057            fn type_name() -> String {
3058                stringify!($name).into()
3059            }
3060
3061            fn array_type() -> sea_query::ArrayType {
3062                sea_query::ArrayType::Int
3063            }
3064
3065            fn column_type() -> sea_query::ColumnType {
3066                sea_query::ColumnType::Integer(None)
3067            }
3068        }
3069
3070        impl sea_orm::TryFromU64 for $name {
3071            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3072                Ok(Self(n.try_into().map_err(|_| {
3073                    DbErr::ConvertFromU64(concat!(
3074                        "error converting ",
3075                        stringify!($name),
3076                        " to u64"
3077                    ))
3078                })?))
3079            }
3080        }
3081
3082        impl sea_query::Nullable for $name {
3083            fn null() -> Value {
3084                Value::Int(None)
3085            }
3086        }
3087    };
3088}
3089
3090id_type!(AccessTokenId);
3091id_type!(ContactId);
3092id_type!(FollowerId);
3093id_type!(RoomId);
3094id_type!(RoomParticipantId);
3095id_type!(ProjectId);
3096id_type!(ProjectCollaboratorId);
3097id_type!(ReplicaId);
3098id_type!(ServerId);
3099id_type!(SignupId);
3100id_type!(UserId);
3101
3102pub struct RejoinedRoom {
3103    pub room: proto::Room,
3104    pub rejoined_projects: Vec<RejoinedProject>,
3105    pub reshared_projects: Vec<ResharedProject>,
3106}
3107
3108pub struct ResharedProject {
3109    pub id: ProjectId,
3110    pub old_connection_id: ConnectionId,
3111    pub collaborators: Vec<ProjectCollaborator>,
3112    pub worktrees: Vec<proto::WorktreeMetadata>,
3113}
3114
3115pub struct RejoinedProject {
3116    pub id: ProjectId,
3117    pub old_connection_id: ConnectionId,
3118    pub collaborators: Vec<ProjectCollaborator>,
3119    pub worktrees: Vec<RejoinedWorktree>,
3120    pub language_servers: Vec<proto::LanguageServer>,
3121}
3122
3123#[derive(Debug)]
3124pub struct RejoinedWorktree {
3125    pub id: u64,
3126    pub abs_path: String,
3127    pub root_name: String,
3128    pub visible: bool,
3129    pub updated_entries: Vec<proto::Entry>,
3130    pub removed_entries: Vec<u64>,
3131    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3132    pub scan_id: u64,
3133    pub completed_scan_id: u64,
3134}
3135
3136pub struct LeftRoom {
3137    pub room: proto::Room,
3138    pub left_projects: HashMap<ProjectId, LeftProject>,
3139    pub canceled_calls_to_user_ids: Vec<UserId>,
3140}
3141
3142pub struct RefreshedRoom {
3143    pub room: proto::Room,
3144    pub stale_participant_user_ids: Vec<UserId>,
3145    pub canceled_calls_to_user_ids: Vec<UserId>,
3146}
3147
3148pub struct Project {
3149    pub collaborators: Vec<ProjectCollaborator>,
3150    pub worktrees: BTreeMap<u64, Worktree>,
3151    pub language_servers: Vec<proto::LanguageServer>,
3152}
3153
3154pub struct ProjectCollaborator {
3155    pub connection_id: ConnectionId,
3156    pub user_id: UserId,
3157    pub replica_id: ReplicaId,
3158    pub is_host: bool,
3159}
3160
3161impl ProjectCollaborator {
3162    pub fn to_proto(&self) -> proto::Collaborator {
3163        proto::Collaborator {
3164            peer_id: Some(self.connection_id.into()),
3165            replica_id: self.replica_id.0 as u32,
3166            user_id: self.user_id.to_proto(),
3167        }
3168    }
3169}
3170
3171#[derive(Debug)]
3172pub struct LeftProject {
3173    pub id: ProjectId,
3174    pub host_user_id: UserId,
3175    pub host_connection_id: ConnectionId,
3176    pub connection_ids: Vec<ConnectionId>,
3177}
3178
3179pub struct Worktree {
3180    pub id: u64,
3181    pub abs_path: String,
3182    pub root_name: String,
3183    pub visible: bool,
3184    pub entries: Vec<proto::Entry>,
3185    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3186    pub scan_id: u64,
3187    pub completed_scan_id: u64,
3188}
3189
3190#[cfg(test)]
3191pub use test::*;
3192
3193#[cfg(test)]
3194mod test {
3195    use super::*;
3196    use gpui::executor::Background;
3197    use lazy_static::lazy_static;
3198    use parking_lot::Mutex;
3199    use rand::prelude::*;
3200    use sea_orm::ConnectionTrait;
3201    use sqlx::migrate::MigrateDatabase;
3202    use std::sync::Arc;
3203
3204    pub struct TestDb {
3205        pub db: Option<Arc<Database>>,
3206        pub connection: Option<sqlx::AnyConnection>,
3207    }
3208
3209    impl TestDb {
3210        pub fn sqlite(background: Arc<Background>) -> Self {
3211            let url = format!("sqlite::memory:");
3212            let runtime = tokio::runtime::Builder::new_current_thread()
3213                .enable_io()
3214                .enable_time()
3215                .build()
3216                .unwrap();
3217
3218            let mut db = runtime.block_on(async {
3219                let mut options = ConnectOptions::new(url);
3220                options.max_connections(5);
3221                let db = Database::new(options).await.unwrap();
3222                let sql = include_str!(concat!(
3223                    env!("CARGO_MANIFEST_DIR"),
3224                    "/migrations.sqlite/20221109000000_test_schema.sql"
3225                ));
3226                db.pool
3227                    .execute(sea_orm::Statement::from_string(
3228                        db.pool.get_database_backend(),
3229                        sql.into(),
3230                    ))
3231                    .await
3232                    .unwrap();
3233                db
3234            });
3235
3236            db.background = Some(background);
3237            db.runtime = Some(runtime);
3238
3239            Self {
3240                db: Some(Arc::new(db)),
3241                connection: None,
3242            }
3243        }
3244
3245        pub fn postgres(background: Arc<Background>) -> Self {
3246            lazy_static! {
3247                static ref LOCK: Mutex<()> = Mutex::new(());
3248            }
3249
3250            let _guard = LOCK.lock();
3251            let mut rng = StdRng::from_entropy();
3252            let url = format!(
3253                "postgres://postgres@localhost/zed-test-{}",
3254                rng.gen::<u128>()
3255            );
3256            let runtime = tokio::runtime::Builder::new_current_thread()
3257                .enable_io()
3258                .enable_time()
3259                .build()
3260                .unwrap();
3261
3262            let mut db = runtime.block_on(async {
3263                sqlx::Postgres::create_database(&url)
3264                    .await
3265                    .expect("failed to create test db");
3266                let mut options = ConnectOptions::new(url);
3267                options
3268                    .max_connections(5)
3269                    .idle_timeout(Duration::from_secs(0));
3270                let db = Database::new(options).await.unwrap();
3271                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3272                db.migrate(Path::new(migrations_path), false).await.unwrap();
3273                db
3274            });
3275
3276            db.background = Some(background);
3277            db.runtime = Some(runtime);
3278
3279            Self {
3280                db: Some(Arc::new(db)),
3281                connection: None,
3282            }
3283        }
3284
3285        pub fn db(&self) -> &Arc<Database> {
3286            self.db.as_ref().unwrap()
3287        }
3288    }
3289
3290    impl Drop for TestDb {
3291        fn drop(&mut self) {
3292            let db = self.db.take().unwrap();
3293            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3294                db.runtime.as_ref().unwrap().block_on(async {
3295                    use util::ResultExt;
3296                    let query = "
3297                        SELECT pg_terminate_backend(pg_stat_activity.pid)
3298                        FROM pg_stat_activity
3299                        WHERE
3300                            pg_stat_activity.datname = current_database() AND
3301                            pid <> pg_backend_pid();
3302                    ";
3303                    db.pool
3304                        .execute(sea_orm::Statement::from_string(
3305                            db.pool.get_database_backend(),
3306                            query.into(),
3307                        ))
3308                        .await
3309                        .log_err();
3310                    sqlx::Postgres::drop_database(db.options.get_url())
3311                        .await
3312                        .log_err();
3313                })
3314            }
3315        }
3316    }
3317}