db.rs

   1mod access_token;
   2mod contact;
   3mod follower;
   4mod language_server;
   5mod project;
   6mod project_collaborator;
   7mod room;
   8mod room_participant;
   9mod server;
  10mod signup;
  11#[cfg(test)]
  12mod tests;
  13mod user;
  14mod worktree;
  15mod worktree_diagnostic_summary;
  16mod worktree_entry;
  17
  18use crate::{Error, Result};
  19use anyhow::anyhow;
  20use collections::{BTreeMap, HashMap, HashSet};
  21pub use contact::Contact;
  22use dashmap::DashMap;
  23use futures::StreamExt;
  24use hyper::StatusCode;
  25use rpc::{proto, ConnectionId};
  26use sea_orm::Condition;
  27pub use sea_orm::ConnectOptions;
  28use sea_orm::{
  29    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  30    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  31    Statement, TransactionTrait,
  32};
  33use sea_query::{Alias, Expr, OnConflict, Query};
  34use serde::{Deserialize, Serialize};
  35pub use signup::{Invite, NewSignup, WaitlistSummary};
  36use sqlx::migrate::{Migrate, Migration, MigrationSource};
  37use sqlx::Connection;
  38use std::ops::{Deref, DerefMut};
  39use std::path::Path;
  40use std::time::Duration;
  41use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  42use tokio::sync::{Mutex, OwnedMutexGuard};
  43pub use user::Model as User;
  44
  45pub struct Database {
  46    options: ConnectOptions,
  47    pool: DatabaseConnection,
  48    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  49    #[cfg(test)]
  50    background: Option<std::sync::Arc<gpui::executor::Background>>,
  51    #[cfg(test)]
  52    runtime: Option<tokio::runtime::Runtime>,
  53}
  54
  55impl Database {
  56    pub async fn new(options: ConnectOptions) -> Result<Self> {
  57        Ok(Self {
  58            options: options.clone(),
  59            pool: sea_orm::Database::connect(options).await?,
  60            rooms: DashMap::with_capacity(16384),
  61            #[cfg(test)]
  62            background: None,
  63            #[cfg(test)]
  64            runtime: None,
  65        })
  66    }
  67
  68    #[cfg(test)]
  69    pub fn reset(&self) {
  70        self.rooms.clear();
  71    }
  72
  73    pub async fn migrate(
  74        &self,
  75        migrations_path: &Path,
  76        ignore_checksum_mismatch: bool,
  77    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  78        let migrations = MigrationSource::resolve(migrations_path)
  79            .await
  80            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  81
  82        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  83
  84        connection.ensure_migrations_table().await?;
  85        let applied_migrations: HashMap<_, _> = connection
  86            .list_applied_migrations()
  87            .await?
  88            .into_iter()
  89            .map(|m| (m.version, m))
  90            .collect();
  91
  92        let mut new_migrations = Vec::new();
  93        for migration in migrations {
  94            match applied_migrations.get(&migration.version) {
  95                Some(applied_migration) => {
  96                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
  97                    {
  98                        Err(anyhow!(
  99                            "checksum mismatch for applied migration {}",
 100                            migration.description
 101                        ))?;
 102                    }
 103                }
 104                None => {
 105                    let elapsed = connection.apply(&migration).await?;
 106                    new_migrations.push((migration, elapsed));
 107                }
 108            }
 109        }
 110
 111        Ok(new_migrations)
 112    }
 113
 114    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 115        self.transaction(|tx| async move {
 116            let server = server::ActiveModel {
 117                environment: ActiveValue::set(environment.into()),
 118                ..Default::default()
 119            }
 120            .insert(&*tx)
 121            .await?;
 122            Ok(server.id)
 123        })
 124        .await
 125    }
 126
 127    pub async fn stale_room_ids(
 128        &self,
 129        environment: &str,
 130        new_server_id: ServerId,
 131    ) -> Result<Vec<RoomId>> {
 132        self.transaction(|tx| async move {
 133            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 134            enum QueryAs {
 135                RoomId,
 136            }
 137
 138            let stale_server_epochs = self
 139                .stale_server_ids(environment, new_server_id, &tx)
 140                .await?;
 141            Ok(room_participant::Entity::find()
 142                .select_only()
 143                .column(room_participant::Column::RoomId)
 144                .distinct()
 145                .filter(
 146                    room_participant::Column::AnsweringConnectionServerId
 147                        .is_in(stale_server_epochs),
 148                )
 149                .into_values::<_, QueryAs>()
 150                .all(&*tx)
 151                .await?)
 152        })
 153        .await
 154    }
 155
 156    pub async fn refresh_room(
 157        &self,
 158        room_id: RoomId,
 159        new_server_id: ServerId,
 160    ) -> Result<RoomGuard<RefreshedRoom>> {
 161        self.room_transaction(|tx| async move {
 162            let stale_participant_filter = Condition::all()
 163                .add(room_participant::Column::RoomId.eq(room_id))
 164                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 165                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 166
 167            let stale_participant_user_ids = room_participant::Entity::find()
 168                .filter(stale_participant_filter.clone())
 169                .all(&*tx)
 170                .await?
 171                .into_iter()
 172                .map(|participant| participant.user_id)
 173                .collect::<Vec<_>>();
 174
 175            // Delete participants who failed to reconnect.
 176            room_participant::Entity::delete_many()
 177                .filter(stale_participant_filter)
 178                .exec(&*tx)
 179                .await?;
 180
 181            let room = self.get_room(room_id, &tx).await?;
 182            let mut canceled_calls_to_user_ids = Vec::new();
 183            // Delete the room if it becomes empty and cancel pending calls.
 184            if room.participants.is_empty() {
 185                canceled_calls_to_user_ids.extend(
 186                    room.pending_participants
 187                        .iter()
 188                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
 189                );
 190                room_participant::Entity::delete_many()
 191                    .filter(room_participant::Column::RoomId.eq(room_id))
 192                    .exec(&*tx)
 193                    .await?;
 194                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 195            }
 196
 197            Ok((
 198                room_id,
 199                RefreshedRoom {
 200                    room,
 201                    stale_participant_user_ids,
 202                    canceled_calls_to_user_ids,
 203                },
 204            ))
 205        })
 206        .await
 207    }
 208
 209    pub async fn delete_stale_servers(
 210        &self,
 211        environment: &str,
 212        new_server_id: ServerId,
 213    ) -> Result<()> {
 214        self.transaction(|tx| async move {
 215            server::Entity::delete_many()
 216                .filter(
 217                    Condition::all()
 218                        .add(server::Column::Environment.eq(environment))
 219                        .add(server::Column::Id.ne(new_server_id)),
 220                )
 221                .exec(&*tx)
 222                .await?;
 223            Ok(())
 224        })
 225        .await
 226    }
 227
 228    async fn stale_server_ids(
 229        &self,
 230        environment: &str,
 231        new_server_id: ServerId,
 232        tx: &DatabaseTransaction,
 233    ) -> Result<Vec<ServerId>> {
 234        let stale_servers = server::Entity::find()
 235            .filter(
 236                Condition::all()
 237                    .add(server::Column::Environment.eq(environment))
 238                    .add(server::Column::Id.ne(new_server_id)),
 239            )
 240            .all(&*tx)
 241            .await?;
 242        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 243    }
 244
 245    // users
 246
 247    pub async fn create_user(
 248        &self,
 249        email_address: &str,
 250        admin: bool,
 251        params: NewUserParams,
 252    ) -> Result<NewUserResult> {
 253        self.transaction(|tx| async {
 254            let tx = tx;
 255            let user = user::Entity::insert(user::ActiveModel {
 256                email_address: ActiveValue::set(Some(email_address.into())),
 257                github_login: ActiveValue::set(params.github_login.clone()),
 258                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 259                admin: ActiveValue::set(admin),
 260                metrics_id: ActiveValue::set(Uuid::new_v4()),
 261                ..Default::default()
 262            })
 263            .on_conflict(
 264                OnConflict::column(user::Column::GithubLogin)
 265                    .update_column(user::Column::GithubLogin)
 266                    .to_owned(),
 267            )
 268            .exec_with_returning(&*tx)
 269            .await?;
 270
 271            Ok(NewUserResult {
 272                user_id: user.id,
 273                metrics_id: user.metrics_id.to_string(),
 274                signup_device_id: None,
 275                inviting_user_id: None,
 276            })
 277        })
 278        .await
 279    }
 280
 281    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 282        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 283            .await
 284    }
 285
 286    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 287        self.transaction(|tx| async {
 288            let tx = tx;
 289            Ok(user::Entity::find()
 290                .filter(user::Column::Id.is_in(ids.iter().copied()))
 291                .all(&*tx)
 292                .await?)
 293        })
 294        .await
 295    }
 296
 297    pub async fn get_user_by_github_account(
 298        &self,
 299        github_login: &str,
 300        github_user_id: Option<i32>,
 301    ) -> Result<Option<User>> {
 302        self.transaction(|tx| async move {
 303            let tx = &*tx;
 304            if let Some(github_user_id) = github_user_id {
 305                if let Some(user_by_github_user_id) = user::Entity::find()
 306                    .filter(user::Column::GithubUserId.eq(github_user_id))
 307                    .one(tx)
 308                    .await?
 309                {
 310                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 311                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 312                    Ok(Some(user_by_github_user_id.update(tx).await?))
 313                } else if let Some(user_by_github_login) = user::Entity::find()
 314                    .filter(user::Column::GithubLogin.eq(github_login))
 315                    .one(tx)
 316                    .await?
 317                {
 318                    let mut user_by_github_login = user_by_github_login.into_active_model();
 319                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 320                    Ok(Some(user_by_github_login.update(tx).await?))
 321                } else {
 322                    Ok(None)
 323                }
 324            } else {
 325                Ok(user::Entity::find()
 326                    .filter(user::Column::GithubLogin.eq(github_login))
 327                    .one(tx)
 328                    .await?)
 329            }
 330        })
 331        .await
 332    }
 333
 334    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 335        self.transaction(|tx| async move {
 336            Ok(user::Entity::find()
 337                .order_by_asc(user::Column::GithubLogin)
 338                .limit(limit as u64)
 339                .offset(page as u64 * limit as u64)
 340                .all(&*tx)
 341                .await?)
 342        })
 343        .await
 344    }
 345
 346    pub async fn get_users_with_no_invites(
 347        &self,
 348        invited_by_another_user: bool,
 349    ) -> Result<Vec<User>> {
 350        self.transaction(|tx| async move {
 351            Ok(user::Entity::find()
 352                .filter(
 353                    user::Column::InviteCount
 354                        .eq(0)
 355                        .and(if invited_by_another_user {
 356                            user::Column::InviterId.is_not_null()
 357                        } else {
 358                            user::Column::InviterId.is_null()
 359                        }),
 360                )
 361                .all(&*tx)
 362                .await?)
 363        })
 364        .await
 365    }
 366
 367    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 368        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 369        enum QueryAs {
 370            MetricsId,
 371        }
 372
 373        self.transaction(|tx| async move {
 374            let metrics_id: Uuid = user::Entity::find_by_id(id)
 375                .select_only()
 376                .column(user::Column::MetricsId)
 377                .into_values::<_, QueryAs>()
 378                .one(&*tx)
 379                .await?
 380                .ok_or_else(|| anyhow!("could not find user"))?;
 381            Ok(metrics_id.to_string())
 382        })
 383        .await
 384    }
 385
 386    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 387        self.transaction(|tx| async move {
 388            user::Entity::update_many()
 389                .filter(user::Column::Id.eq(id))
 390                .set(user::ActiveModel {
 391                    admin: ActiveValue::set(is_admin),
 392                    ..Default::default()
 393                })
 394                .exec(&*tx)
 395                .await?;
 396            Ok(())
 397        })
 398        .await
 399    }
 400
 401    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 402        self.transaction(|tx| async move {
 403            user::Entity::update_many()
 404                .filter(user::Column::Id.eq(id))
 405                .set(user::ActiveModel {
 406                    connected_once: ActiveValue::set(connected_once),
 407                    ..Default::default()
 408                })
 409                .exec(&*tx)
 410                .await?;
 411            Ok(())
 412        })
 413        .await
 414    }
 415
 416    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 417        self.transaction(|tx| async move {
 418            access_token::Entity::delete_many()
 419                .filter(access_token::Column::UserId.eq(id))
 420                .exec(&*tx)
 421                .await?;
 422            user::Entity::delete_by_id(id).exec(&*tx).await?;
 423            Ok(())
 424        })
 425        .await
 426    }
 427
 428    // contacts
 429
 430    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 431        #[derive(Debug, FromQueryResult)]
 432        struct ContactWithUserBusyStatuses {
 433            user_id_a: UserId,
 434            user_id_b: UserId,
 435            a_to_b: bool,
 436            accepted: bool,
 437            should_notify: bool,
 438            user_a_busy: bool,
 439            user_b_busy: bool,
 440        }
 441
 442        self.transaction(|tx| async move {
 443            let user_a_participant = Alias::new("user_a_participant");
 444            let user_b_participant = Alias::new("user_b_participant");
 445            let mut db_contacts = contact::Entity::find()
 446                .column_as(
 447                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 448                        .is_not_null(),
 449                    "user_a_busy",
 450                )
 451                .column_as(
 452                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 453                        .is_not_null(),
 454                    "user_b_busy",
 455                )
 456                .filter(
 457                    contact::Column::UserIdA
 458                        .eq(user_id)
 459                        .or(contact::Column::UserIdB.eq(user_id)),
 460                )
 461                .join_as(
 462                    JoinType::LeftJoin,
 463                    contact::Relation::UserARoomParticipant.def(),
 464                    user_a_participant,
 465                )
 466                .join_as(
 467                    JoinType::LeftJoin,
 468                    contact::Relation::UserBRoomParticipant.def(),
 469                    user_b_participant,
 470                )
 471                .into_model::<ContactWithUserBusyStatuses>()
 472                .stream(&*tx)
 473                .await?;
 474
 475            let mut contacts = Vec::new();
 476            while let Some(db_contact) = db_contacts.next().await {
 477                let db_contact = db_contact?;
 478                if db_contact.user_id_a == user_id {
 479                    if db_contact.accepted {
 480                        contacts.push(Contact::Accepted {
 481                            user_id: db_contact.user_id_b,
 482                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 483                            busy: db_contact.user_b_busy,
 484                        });
 485                    } else if db_contact.a_to_b {
 486                        contacts.push(Contact::Outgoing {
 487                            user_id: db_contact.user_id_b,
 488                        })
 489                    } else {
 490                        contacts.push(Contact::Incoming {
 491                            user_id: db_contact.user_id_b,
 492                            should_notify: db_contact.should_notify,
 493                        });
 494                    }
 495                } else if db_contact.accepted {
 496                    contacts.push(Contact::Accepted {
 497                        user_id: db_contact.user_id_a,
 498                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 499                        busy: db_contact.user_a_busy,
 500                    });
 501                } else if db_contact.a_to_b {
 502                    contacts.push(Contact::Incoming {
 503                        user_id: db_contact.user_id_a,
 504                        should_notify: db_contact.should_notify,
 505                    });
 506                } else {
 507                    contacts.push(Contact::Outgoing {
 508                        user_id: db_contact.user_id_a,
 509                    });
 510                }
 511            }
 512
 513            contacts.sort_unstable_by_key(|contact| contact.user_id());
 514
 515            Ok(contacts)
 516        })
 517        .await
 518    }
 519
 520    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 521        self.transaction(|tx| async move {
 522            let participant = room_participant::Entity::find()
 523                .filter(room_participant::Column::UserId.eq(user_id))
 524                .one(&*tx)
 525                .await?;
 526            Ok(participant.is_some())
 527        })
 528        .await
 529    }
 530
 531    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 532        self.transaction(|tx| async move {
 533            let (id_a, id_b) = if user_id_1 < user_id_2 {
 534                (user_id_1, user_id_2)
 535            } else {
 536                (user_id_2, user_id_1)
 537            };
 538
 539            Ok(contact::Entity::find()
 540                .filter(
 541                    contact::Column::UserIdA
 542                        .eq(id_a)
 543                        .and(contact::Column::UserIdB.eq(id_b))
 544                        .and(contact::Column::Accepted.eq(true)),
 545                )
 546                .one(&*tx)
 547                .await?
 548                .is_some())
 549        })
 550        .await
 551    }
 552
 553    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 554        self.transaction(|tx| async move {
 555            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 556                (sender_id, receiver_id, true)
 557            } else {
 558                (receiver_id, sender_id, false)
 559            };
 560
 561            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 562                user_id_a: ActiveValue::set(id_a),
 563                user_id_b: ActiveValue::set(id_b),
 564                a_to_b: ActiveValue::set(a_to_b),
 565                accepted: ActiveValue::set(false),
 566                should_notify: ActiveValue::set(true),
 567                ..Default::default()
 568            })
 569            .on_conflict(
 570                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 571                    .values([
 572                        (contact::Column::Accepted, true.into()),
 573                        (contact::Column::ShouldNotify, false.into()),
 574                    ])
 575                    .action_and_where(
 576                        contact::Column::Accepted.eq(false).and(
 577                            contact::Column::AToB
 578                                .eq(a_to_b)
 579                                .and(contact::Column::UserIdA.eq(id_b))
 580                                .or(contact::Column::AToB
 581                                    .ne(a_to_b)
 582                                    .and(contact::Column::UserIdA.eq(id_a))),
 583                        ),
 584                    )
 585                    .to_owned(),
 586            )
 587            .exec_without_returning(&*tx)
 588            .await?;
 589
 590            if rows_affected == 1 {
 591                Ok(())
 592            } else {
 593                Err(anyhow!("contact already requested"))?
 594            }
 595        })
 596        .await
 597    }
 598
 599    /// Returns a bool indicating whether the removed contact had originally accepted or not
 600    ///
 601    /// Deletes the contact identified by the requester and responder ids, and then returns
 602    /// whether the deleted contact had originally accepted or was a pending contact request.
 603    ///
 604    /// # Arguments
 605    ///
 606    /// * `requester_id` - The user that initiates this request
 607    /// * `responder_id` - The user that will be removed
 608    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 609        self.transaction(|tx| async move {
 610            let (id_a, id_b) = if responder_id < requester_id {
 611                (responder_id, requester_id)
 612            } else {
 613                (requester_id, responder_id)
 614            };
 615
 616            let contact = contact::Entity::find()
 617                .filter(
 618                    contact::Column::UserIdA
 619                        .eq(id_a)
 620                        .and(contact::Column::UserIdB.eq(id_b)),
 621                )
 622                .one(&*tx)
 623                .await?
 624                .ok_or_else(|| anyhow!("no such contact"))?;
 625
 626            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 627            Ok(contact.accepted)
 628        })
 629        .await
 630    }
 631
 632    pub async fn dismiss_contact_notification(
 633        &self,
 634        user_id: UserId,
 635        contact_user_id: UserId,
 636    ) -> Result<()> {
 637        self.transaction(|tx| async move {
 638            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 639                (user_id, contact_user_id, true)
 640            } else {
 641                (contact_user_id, user_id, false)
 642            };
 643
 644            let result = contact::Entity::update_many()
 645                .set(contact::ActiveModel {
 646                    should_notify: ActiveValue::set(false),
 647                    ..Default::default()
 648                })
 649                .filter(
 650                    contact::Column::UserIdA
 651                        .eq(id_a)
 652                        .and(contact::Column::UserIdB.eq(id_b))
 653                        .and(
 654                            contact::Column::AToB
 655                                .eq(a_to_b)
 656                                .and(contact::Column::Accepted.eq(true))
 657                                .or(contact::Column::AToB
 658                                    .ne(a_to_b)
 659                                    .and(contact::Column::Accepted.eq(false))),
 660                        ),
 661                )
 662                .exec(&*tx)
 663                .await?;
 664            if result.rows_affected == 0 {
 665                Err(anyhow!("no such contact request"))?
 666            } else {
 667                Ok(())
 668            }
 669        })
 670        .await
 671    }
 672
 673    pub async fn respond_to_contact_request(
 674        &self,
 675        responder_id: UserId,
 676        requester_id: UserId,
 677        accept: bool,
 678    ) -> Result<()> {
 679        self.transaction(|tx| async move {
 680            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 681                (responder_id, requester_id, false)
 682            } else {
 683                (requester_id, responder_id, true)
 684            };
 685            let rows_affected = if accept {
 686                let result = contact::Entity::update_many()
 687                    .set(contact::ActiveModel {
 688                        accepted: ActiveValue::set(true),
 689                        should_notify: ActiveValue::set(true),
 690                        ..Default::default()
 691                    })
 692                    .filter(
 693                        contact::Column::UserIdA
 694                            .eq(id_a)
 695                            .and(contact::Column::UserIdB.eq(id_b))
 696                            .and(contact::Column::AToB.eq(a_to_b)),
 697                    )
 698                    .exec(&*tx)
 699                    .await?;
 700                result.rows_affected
 701            } else {
 702                let result = contact::Entity::delete_many()
 703                    .filter(
 704                        contact::Column::UserIdA
 705                            .eq(id_a)
 706                            .and(contact::Column::UserIdB.eq(id_b))
 707                            .and(contact::Column::AToB.eq(a_to_b))
 708                            .and(contact::Column::Accepted.eq(false)),
 709                    )
 710                    .exec(&*tx)
 711                    .await?;
 712
 713                result.rows_affected
 714            };
 715
 716            if rows_affected == 1 {
 717                Ok(())
 718            } else {
 719                Err(anyhow!("no such contact request"))?
 720            }
 721        })
 722        .await
 723    }
 724
 725    pub fn fuzzy_like_string(string: &str) -> String {
 726        let mut result = String::with_capacity(string.len() * 2 + 1);
 727        for c in string.chars() {
 728            if c.is_alphanumeric() {
 729                result.push('%');
 730                result.push(c);
 731            }
 732        }
 733        result.push('%');
 734        result
 735    }
 736
 737    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 738        self.transaction(|tx| async {
 739            let tx = tx;
 740            let like_string = Self::fuzzy_like_string(name_query);
 741            let query = "
 742                SELECT users.*
 743                FROM users
 744                WHERE github_login ILIKE $1
 745                ORDER BY github_login <-> $2
 746                LIMIT $3
 747            ";
 748
 749            Ok(user::Entity::find()
 750                .from_raw_sql(Statement::from_sql_and_values(
 751                    self.pool.get_database_backend(),
 752                    query.into(),
 753                    vec![like_string.into(), name_query.into(), limit.into()],
 754                ))
 755                .all(&*tx)
 756                .await?)
 757        })
 758        .await
 759    }
 760
 761    // signups
 762
 763    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 764        self.transaction(|tx| async move {
 765            signup::Entity::insert(signup::ActiveModel {
 766                email_address: ActiveValue::set(signup.email_address.clone()),
 767                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 768                email_confirmation_sent: ActiveValue::set(false),
 769                platform_mac: ActiveValue::set(signup.platform_mac),
 770                platform_windows: ActiveValue::set(signup.platform_windows),
 771                platform_linux: ActiveValue::set(signup.platform_linux),
 772                platform_unknown: ActiveValue::set(false),
 773                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 774                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 775                device_id: ActiveValue::set(signup.device_id.clone()),
 776                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 777                ..Default::default()
 778            })
 779            .on_conflict(
 780                OnConflict::column(signup::Column::EmailAddress)
 781                    .update_columns([
 782                        signup::Column::PlatformMac,
 783                        signup::Column::PlatformWindows,
 784                        signup::Column::PlatformLinux,
 785                        signup::Column::EditorFeatures,
 786                        signup::Column::ProgrammingLanguages,
 787                        signup::Column::DeviceId,
 788                        signup::Column::AddedToMailingList,
 789                    ])
 790                    .to_owned(),
 791            )
 792            .exec(&*tx)
 793            .await?;
 794            Ok(())
 795        })
 796        .await
 797    }
 798
 799    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 800        self.transaction(|tx| async move {
 801            let signup = signup::Entity::find()
 802                .filter(signup::Column::EmailAddress.eq(email_address))
 803                .one(&*tx)
 804                .await?
 805                .ok_or_else(|| {
 806                    anyhow!("signup with email address {} doesn't exist", email_address)
 807                })?;
 808
 809            Ok(signup)
 810        })
 811        .await
 812    }
 813
 814    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 815        self.transaction(|tx| async move {
 816            let query = "
 817                SELECT
 818                    COUNT(*) as count,
 819                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 820                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 821                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 822                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 823                FROM (
 824                    SELECT *
 825                    FROM signups
 826                    WHERE
 827                        NOT email_confirmation_sent
 828                ) AS unsent
 829            ";
 830            Ok(
 831                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 832                    self.pool.get_database_backend(),
 833                    query.into(),
 834                    vec![],
 835                ))
 836                .one(&*tx)
 837                .await?
 838                .ok_or_else(|| anyhow!("invalid result"))?,
 839            )
 840        })
 841        .await
 842    }
 843
 844    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 845        let emails = invites
 846            .iter()
 847            .map(|s| s.email_address.as_str())
 848            .collect::<Vec<_>>();
 849        self.transaction(|tx| async {
 850            let tx = tx;
 851            signup::Entity::update_many()
 852                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 853                .set(signup::ActiveModel {
 854                    email_confirmation_sent: ActiveValue::set(true),
 855                    ..Default::default()
 856                })
 857                .exec(&*tx)
 858                .await?;
 859            Ok(())
 860        })
 861        .await
 862    }
 863
 864    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 865        self.transaction(|tx| async move {
 866            Ok(signup::Entity::find()
 867                .select_only()
 868                .column(signup::Column::EmailAddress)
 869                .column(signup::Column::EmailConfirmationCode)
 870                .filter(
 871                    signup::Column::EmailConfirmationSent.eq(false).and(
 872                        signup::Column::PlatformMac
 873                            .eq(true)
 874                            .or(signup::Column::PlatformUnknown.eq(true)),
 875                    ),
 876                )
 877                .order_by_asc(signup::Column::CreatedAt)
 878                .limit(count as u64)
 879                .into_model()
 880                .all(&*tx)
 881                .await?)
 882        })
 883        .await
 884    }
 885
 886    // invite codes
 887
 888    pub async fn create_invite_from_code(
 889        &self,
 890        code: &str,
 891        email_address: &str,
 892        device_id: Option<&str>,
 893        added_to_mailing_list: bool,
 894    ) -> Result<Invite> {
 895        self.transaction(|tx| async move {
 896            let existing_user = user::Entity::find()
 897                .filter(user::Column::EmailAddress.eq(email_address))
 898                .one(&*tx)
 899                .await?;
 900
 901            if existing_user.is_some() {
 902                Err(anyhow!("email address is already in use"))?;
 903            }
 904
 905            let inviting_user_with_invites = match user::Entity::find()
 906                .filter(
 907                    user::Column::InviteCode
 908                        .eq(code)
 909                        .and(user::Column::InviteCount.gt(0)),
 910                )
 911                .one(&*tx)
 912                .await?
 913            {
 914                Some(inviting_user) => inviting_user,
 915                None => {
 916                    return Err(Error::Http(
 917                        StatusCode::UNAUTHORIZED,
 918                        "unable to find an invite code with invites remaining".to_string(),
 919                    ))?
 920                }
 921            };
 922            user::Entity::update_many()
 923                .filter(
 924                    user::Column::Id
 925                        .eq(inviting_user_with_invites.id)
 926                        .and(user::Column::InviteCount.gt(0)),
 927                )
 928                .col_expr(
 929                    user::Column::InviteCount,
 930                    Expr::col(user::Column::InviteCount).sub(1),
 931                )
 932                .exec(&*tx)
 933                .await?;
 934
 935            let signup = signup::Entity::insert(signup::ActiveModel {
 936                email_address: ActiveValue::set(email_address.into()),
 937                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 938                email_confirmation_sent: ActiveValue::set(false),
 939                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 940                platform_linux: ActiveValue::set(false),
 941                platform_mac: ActiveValue::set(false),
 942                platform_windows: ActiveValue::set(false),
 943                platform_unknown: ActiveValue::set(true),
 944                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
 945                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
 946                ..Default::default()
 947            })
 948            .on_conflict(
 949                OnConflict::column(signup::Column::EmailAddress)
 950                    .update_column(signup::Column::InvitingUserId)
 951                    .to_owned(),
 952            )
 953            .exec_with_returning(&*tx)
 954            .await?;
 955
 956            Ok(Invite {
 957                email_address: signup.email_address,
 958                email_confirmation_code: signup.email_confirmation_code,
 959            })
 960        })
 961        .await
 962    }
 963
 964    pub async fn create_user_from_invite(
 965        &self,
 966        invite: &Invite,
 967        user: NewUserParams,
 968    ) -> Result<Option<NewUserResult>> {
 969        self.transaction(|tx| async {
 970            let tx = tx;
 971            let signup = signup::Entity::find()
 972                .filter(
 973                    signup::Column::EmailAddress
 974                        .eq(invite.email_address.as_str())
 975                        .and(
 976                            signup::Column::EmailConfirmationCode
 977                                .eq(invite.email_confirmation_code.as_str()),
 978                        ),
 979                )
 980                .one(&*tx)
 981                .await?
 982                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
 983
 984            if signup.user_id.is_some() {
 985                return Ok(None);
 986            }
 987
 988            let user = user::Entity::insert(user::ActiveModel {
 989                email_address: ActiveValue::set(Some(invite.email_address.clone())),
 990                github_login: ActiveValue::set(user.github_login.clone()),
 991                github_user_id: ActiveValue::set(Some(user.github_user_id)),
 992                admin: ActiveValue::set(false),
 993                invite_count: ActiveValue::set(user.invite_count),
 994                invite_code: ActiveValue::set(Some(random_invite_code())),
 995                metrics_id: ActiveValue::set(Uuid::new_v4()),
 996                ..Default::default()
 997            })
 998            .on_conflict(
 999                OnConflict::column(user::Column::GithubLogin)
1000                    .update_columns([
1001                        user::Column::EmailAddress,
1002                        user::Column::GithubUserId,
1003                        user::Column::Admin,
1004                    ])
1005                    .to_owned(),
1006            )
1007            .exec_with_returning(&*tx)
1008            .await?;
1009
1010            let mut signup = signup.into_active_model();
1011            signup.user_id = ActiveValue::set(Some(user.id));
1012            let signup = signup.update(&*tx).await?;
1013
1014            if let Some(inviting_user_id) = signup.inviting_user_id {
1015                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1016                    (inviting_user_id, user.id, true)
1017                } else {
1018                    (user.id, inviting_user_id, false)
1019                };
1020
1021                contact::Entity::insert(contact::ActiveModel {
1022                    user_id_a: ActiveValue::set(user_id_a),
1023                    user_id_b: ActiveValue::set(user_id_b),
1024                    a_to_b: ActiveValue::set(a_to_b),
1025                    should_notify: ActiveValue::set(true),
1026                    accepted: ActiveValue::set(true),
1027                    ..Default::default()
1028                })
1029                .on_conflict(OnConflict::new().do_nothing().to_owned())
1030                .exec_without_returning(&*tx)
1031                .await?;
1032            }
1033
1034            Ok(Some(NewUserResult {
1035                user_id: user.id,
1036                metrics_id: user.metrics_id.to_string(),
1037                inviting_user_id: signup.inviting_user_id,
1038                signup_device_id: signup.device_id,
1039            }))
1040        })
1041        .await
1042    }
1043
1044    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1045        self.transaction(|tx| async move {
1046            if count > 0 {
1047                user::Entity::update_many()
1048                    .filter(
1049                        user::Column::Id
1050                            .eq(id)
1051                            .and(user::Column::InviteCode.is_null()),
1052                    )
1053                    .set(user::ActiveModel {
1054                        invite_code: ActiveValue::set(Some(random_invite_code())),
1055                        ..Default::default()
1056                    })
1057                    .exec(&*tx)
1058                    .await?;
1059            }
1060
1061            user::Entity::update_many()
1062                .filter(user::Column::Id.eq(id))
1063                .set(user::ActiveModel {
1064                    invite_count: ActiveValue::set(count),
1065                    ..Default::default()
1066                })
1067                .exec(&*tx)
1068                .await?;
1069            Ok(())
1070        })
1071        .await
1072    }
1073
1074    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1075        self.transaction(|tx| async move {
1076            match user::Entity::find_by_id(id).one(&*tx).await? {
1077                Some(user) if user.invite_code.is_some() => {
1078                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1079                }
1080                _ => Ok(None),
1081            }
1082        })
1083        .await
1084    }
1085
1086    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1087        self.transaction(|tx| async move {
1088            user::Entity::find()
1089                .filter(user::Column::InviteCode.eq(code))
1090                .one(&*tx)
1091                .await?
1092                .ok_or_else(|| {
1093                    Error::Http(
1094                        StatusCode::NOT_FOUND,
1095                        "that invite code does not exist".to_string(),
1096                    )
1097                })
1098        })
1099        .await
1100    }
1101
1102    // rooms
1103
1104    pub async fn incoming_call_for_user(
1105        &self,
1106        user_id: UserId,
1107    ) -> Result<Option<proto::IncomingCall>> {
1108        self.transaction(|tx| async move {
1109            let pending_participant = room_participant::Entity::find()
1110                .filter(
1111                    room_participant::Column::UserId
1112                        .eq(user_id)
1113                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1114                )
1115                .one(&*tx)
1116                .await?;
1117
1118            if let Some(pending_participant) = pending_participant {
1119                let room = self.get_room(pending_participant.room_id, &tx).await?;
1120                Ok(Self::build_incoming_call(&room, user_id))
1121            } else {
1122                Ok(None)
1123            }
1124        })
1125        .await
1126    }
1127
1128    pub async fn create_room(
1129        &self,
1130        user_id: UserId,
1131        connection: ConnectionId,
1132        live_kit_room: &str,
1133    ) -> Result<RoomGuard<proto::Room>> {
1134        self.room_transaction(|tx| async move {
1135            let room = room::ActiveModel {
1136                live_kit_room: ActiveValue::set(live_kit_room.into()),
1137                ..Default::default()
1138            }
1139            .insert(&*tx)
1140            .await?;
1141            let room_id = room.id;
1142
1143            room_participant::ActiveModel {
1144                room_id: ActiveValue::set(room_id),
1145                user_id: ActiveValue::set(user_id),
1146                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1147                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1148                    connection.owner_id as i32,
1149                ))),
1150                answering_connection_lost: ActiveValue::set(false),
1151                calling_user_id: ActiveValue::set(user_id),
1152                calling_connection_id: ActiveValue::set(connection.id as i32),
1153                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1154                    connection.owner_id as i32,
1155                ))),
1156                ..Default::default()
1157            }
1158            .insert(&*tx)
1159            .await?;
1160
1161            let room = self.get_room(room_id, &tx).await?;
1162            Ok((room_id, room))
1163        })
1164        .await
1165    }
1166
1167    pub async fn call(
1168        &self,
1169        room_id: RoomId,
1170        calling_user_id: UserId,
1171        calling_connection: ConnectionId,
1172        called_user_id: UserId,
1173        initial_project_id: Option<ProjectId>,
1174    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1175        self.room_transaction(|tx| async move {
1176            room_participant::ActiveModel {
1177                room_id: ActiveValue::set(room_id),
1178                user_id: ActiveValue::set(called_user_id),
1179                answering_connection_lost: ActiveValue::set(false),
1180                calling_user_id: ActiveValue::set(calling_user_id),
1181                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1182                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1183                    calling_connection.owner_id as i32,
1184                ))),
1185                initial_project_id: ActiveValue::set(initial_project_id),
1186                ..Default::default()
1187            }
1188            .insert(&*tx)
1189            .await?;
1190
1191            let room = self.get_room(room_id, &tx).await?;
1192            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1193                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1194            Ok((room_id, (room, incoming_call)))
1195        })
1196        .await
1197    }
1198
1199    pub async fn call_failed(
1200        &self,
1201        room_id: RoomId,
1202        called_user_id: UserId,
1203    ) -> Result<RoomGuard<proto::Room>> {
1204        self.room_transaction(|tx| async move {
1205            room_participant::Entity::delete_many()
1206                .filter(
1207                    room_participant::Column::RoomId
1208                        .eq(room_id)
1209                        .and(room_participant::Column::UserId.eq(called_user_id)),
1210                )
1211                .exec(&*tx)
1212                .await?;
1213            let room = self.get_room(room_id, &tx).await?;
1214            Ok((room_id, room))
1215        })
1216        .await
1217    }
1218
1219    pub async fn decline_call(
1220        &self,
1221        expected_room_id: Option<RoomId>,
1222        user_id: UserId,
1223    ) -> Result<Option<RoomGuard<proto::Room>>> {
1224        self.optional_room_transaction(|tx| async move {
1225            let mut filter = Condition::all()
1226                .add(room_participant::Column::UserId.eq(user_id))
1227                .add(room_participant::Column::AnsweringConnectionId.is_null());
1228            if let Some(room_id) = expected_room_id {
1229                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1230            }
1231            let participant = room_participant::Entity::find()
1232                .filter(filter)
1233                .one(&*tx)
1234                .await?;
1235
1236            let participant = if let Some(participant) = participant {
1237                participant
1238            } else if expected_room_id.is_some() {
1239                return Err(anyhow!("could not find call to decline"))?;
1240            } else {
1241                return Ok(None);
1242            };
1243
1244            let room_id = participant.room_id;
1245            room_participant::Entity::delete(participant.into_active_model())
1246                .exec(&*tx)
1247                .await?;
1248
1249            let room = self.get_room(room_id, &tx).await?;
1250            Ok(Some((room_id, room)))
1251        })
1252        .await
1253    }
1254
1255    pub async fn cancel_call(
1256        &self,
1257        room_id: RoomId,
1258        calling_connection: ConnectionId,
1259        called_user_id: UserId,
1260    ) -> Result<RoomGuard<proto::Room>> {
1261        self.room_transaction(|tx| async move {
1262            let participant = room_participant::Entity::find()
1263                .filter(
1264                    Condition::all()
1265                        .add(room_participant::Column::UserId.eq(called_user_id))
1266                        .add(room_participant::Column::RoomId.eq(room_id))
1267                        .add(
1268                            room_participant::Column::CallingConnectionId
1269                                .eq(calling_connection.id as i32),
1270                        )
1271                        .add(
1272                            room_participant::Column::CallingConnectionServerId
1273                                .eq(calling_connection.owner_id as i32),
1274                        )
1275                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1276                )
1277                .one(&*tx)
1278                .await?
1279                .ok_or_else(|| anyhow!("no call to cancel"))?;
1280            let room_id = participant.room_id;
1281
1282            room_participant::Entity::delete(participant.into_active_model())
1283                .exec(&*tx)
1284                .await?;
1285
1286            let room = self.get_room(room_id, &tx).await?;
1287            Ok((room_id, room))
1288        })
1289        .await
1290    }
1291
1292    pub async fn join_room(
1293        &self,
1294        room_id: RoomId,
1295        user_id: UserId,
1296        connection: ConnectionId,
1297    ) -> Result<RoomGuard<proto::Room>> {
1298        self.room_transaction(|tx| async move {
1299            let result = room_participant::Entity::update_many()
1300                .filter(
1301                    Condition::all()
1302                        .add(room_participant::Column::RoomId.eq(room_id))
1303                        .add(room_participant::Column::UserId.eq(user_id))
1304                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1305                )
1306                .set(room_participant::ActiveModel {
1307                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1308                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1309                        connection.owner_id as i32,
1310                    ))),
1311                    answering_connection_lost: ActiveValue::set(false),
1312                    ..Default::default()
1313                })
1314                .exec(&*tx)
1315                .await?;
1316            if result.rows_affected == 0 {
1317                Err(anyhow!("room does not exist or was already joined"))?
1318            } else {
1319                let room = self.get_room(room_id, &tx).await?;
1320                Ok((room_id, room))
1321            }
1322        })
1323        .await
1324    }
1325
1326    pub async fn rejoin_room(
1327        &self,
1328        rejoin_room: proto::RejoinRoom,
1329        user_id: UserId,
1330        connection: ConnectionId,
1331    ) -> Result<RoomGuard<RejoinedRoom>> {
1332        self.room_transaction(|tx| async {
1333            let tx = tx;
1334            let room_id = RoomId::from_proto(rejoin_room.id);
1335            let participant_update = room_participant::Entity::update_many()
1336                .filter(
1337                    Condition::all()
1338                        .add(room_participant::Column::RoomId.eq(room_id))
1339                        .add(room_participant::Column::UserId.eq(user_id))
1340                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1341                        .add(
1342                            Condition::any()
1343                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1344                                .add(
1345                                    room_participant::Column::AnsweringConnectionServerId
1346                                        .ne(connection.owner_id as i32),
1347                                ),
1348                        ),
1349                )
1350                .set(room_participant::ActiveModel {
1351                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1352                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1353                        connection.owner_id as i32,
1354                    ))),
1355                    answering_connection_lost: ActiveValue::set(false),
1356                    ..Default::default()
1357                })
1358                .exec(&*tx)
1359                .await?;
1360            if participant_update.rows_affected == 0 {
1361                return Err(anyhow!("room does not exist or was already joined"))?;
1362            }
1363
1364            let mut reshared_projects = Vec::new();
1365            for reshared_project in &rejoin_room.reshared_projects {
1366                let project_id = ProjectId::from_proto(reshared_project.project_id);
1367                let project = project::Entity::find_by_id(project_id)
1368                    .one(&*tx)
1369                    .await?
1370                    .ok_or_else(|| anyhow!("project does not exist"))?;
1371                if project.host_user_id != user_id {
1372                    return Err(anyhow!("no such project"))?;
1373                }
1374
1375                let mut collaborators = project
1376                    .find_related(project_collaborator::Entity)
1377                    .all(&*tx)
1378                    .await?;
1379                let host_ix = collaborators
1380                    .iter()
1381                    .position(|collaborator| {
1382                        collaborator.user_id == user_id && collaborator.is_host
1383                    })
1384                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1385                let host = collaborators.swap_remove(host_ix);
1386                let old_connection_id = host.connection();
1387
1388                project::Entity::update(project::ActiveModel {
1389                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1390                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1391                        connection.owner_id as i32,
1392                    ))),
1393                    ..project.into_active_model()
1394                })
1395                .exec(&*tx)
1396                .await?;
1397                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1398                    connection_id: ActiveValue::set(connection.id as i32),
1399                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1400                    ..host.into_active_model()
1401                })
1402                .exec(&*tx)
1403                .await?;
1404
1405                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1406                    .await?;
1407
1408                reshared_projects.push(ResharedProject {
1409                    id: project_id,
1410                    old_connection_id,
1411                    collaborators: collaborators
1412                        .iter()
1413                        .map(|collaborator| ProjectCollaborator {
1414                            connection_id: collaborator.connection(),
1415                            user_id: collaborator.user_id,
1416                            replica_id: collaborator.replica_id,
1417                            is_host: collaborator.is_host,
1418                        })
1419                        .collect(),
1420                    worktrees: reshared_project.worktrees.clone(),
1421                });
1422            }
1423
1424            project::Entity::delete_many()
1425                .filter(
1426                    Condition::all()
1427                        .add(project::Column::RoomId.eq(room_id))
1428                        .add(project::Column::HostUserId.eq(user_id))
1429                        .add(
1430                            project::Column::Id
1431                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1432                        ),
1433                )
1434                .exec(&*tx)
1435                .await?;
1436
1437            let mut rejoined_projects = Vec::new();
1438            for rejoined_project in &rejoin_room.rejoined_projects {
1439                let project_id = ProjectId::from_proto(rejoined_project.id);
1440                let Some(project) = project::Entity::find_by_id(project_id)
1441                    .one(&*tx)
1442                    .await? else { continue };
1443
1444                let mut worktrees = Vec::new();
1445                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1446                for db_worktree in db_worktrees {
1447                    let mut worktree = RejoinedWorktree {
1448                        id: db_worktree.id as u64,
1449                        abs_path: db_worktree.abs_path,
1450                        root_name: db_worktree.root_name,
1451                        visible: db_worktree.visible,
1452                        updated_entries: Default::default(),
1453                        removed_entries: Default::default(),
1454                        diagnostic_summaries: Default::default(),
1455                        scan_id: db_worktree.scan_id as u64,
1456                        completed_scan_id: db_worktree.completed_scan_id as u64,
1457                    };
1458
1459                    let rejoined_worktree = rejoined_project
1460                        .worktrees
1461                        .iter()
1462                        .find(|worktree| worktree.id == db_worktree.id as u64);
1463                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1464                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1465                    } else {
1466                        worktree_entry::Column::IsDeleted.eq(false)
1467                    };
1468
1469                    let mut db_entries = worktree_entry::Entity::find()
1470                        .filter(
1471                            Condition::all()
1472                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1473                                .add(entry_filter),
1474                        )
1475                        .stream(&*tx)
1476                        .await?;
1477
1478                    while let Some(db_entry) = db_entries.next().await {
1479                        let db_entry = db_entry?;
1480                        if db_entry.is_deleted {
1481                            worktree.removed_entries.push(db_entry.id as u64);
1482                        } else {
1483                            worktree.updated_entries.push(proto::Entry {
1484                                id: db_entry.id as u64,
1485                                is_dir: db_entry.is_dir,
1486                                path: db_entry.path,
1487                                inode: db_entry.inode as u64,
1488                                mtime: Some(proto::Timestamp {
1489                                    seconds: db_entry.mtime_seconds as u64,
1490                                    nanos: db_entry.mtime_nanos as u32,
1491                                }),
1492                                is_symlink: db_entry.is_symlink,
1493                                is_ignored: db_entry.is_ignored,
1494                            });
1495                        }
1496                    }
1497
1498                    worktrees.push(worktree);
1499                }
1500
1501                let language_servers = project
1502                    .find_related(language_server::Entity)
1503                    .all(&*tx)
1504                    .await?
1505                    .into_iter()
1506                    .map(|language_server| proto::LanguageServer {
1507                        id: language_server.id as u64,
1508                        name: language_server.name,
1509                    })
1510                    .collect::<Vec<_>>();
1511
1512                let mut collaborators = project
1513                    .find_related(project_collaborator::Entity)
1514                    .all(&*tx)
1515                    .await?;
1516                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1517                    .iter()
1518                    .position(|collaborator| collaborator.user_id == user_id)
1519                {
1520                    collaborators.swap_remove(self_collaborator_ix)
1521                } else {
1522                    continue;
1523                };
1524                let old_connection_id = self_collaborator.connection();
1525                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1526                    connection_id: ActiveValue::set(connection.id as i32),
1527                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1528                    ..self_collaborator.into_active_model()
1529                })
1530                .exec(&*tx)
1531                .await?;
1532
1533                let collaborators = collaborators
1534                    .into_iter()
1535                    .map(|collaborator| ProjectCollaborator {
1536                        connection_id: collaborator.connection(),
1537                        user_id: collaborator.user_id,
1538                        replica_id: collaborator.replica_id,
1539                        is_host: collaborator.is_host,
1540                    })
1541                    .collect::<Vec<_>>();
1542
1543                rejoined_projects.push(RejoinedProject {
1544                    id: project_id,
1545                    old_connection_id,
1546                    collaborators,
1547                    worktrees,
1548                    language_servers,
1549                });
1550            }
1551
1552            let room = self.get_room(room_id, &tx).await?;
1553            Ok((
1554                room_id,
1555                RejoinedRoom {
1556                    room,
1557                    rejoined_projects,
1558                    reshared_projects,
1559                },
1560            ))
1561        })
1562        .await
1563    }
1564
1565    pub async fn leave_room(
1566        &self,
1567        connection: ConnectionId,
1568    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1569        self.optional_room_transaction(|tx| async move {
1570            let leaving_participant = room_participant::Entity::find()
1571                .filter(
1572                    Condition::all()
1573                        .add(
1574                            room_participant::Column::AnsweringConnectionId
1575                                .eq(connection.id as i32),
1576                        )
1577                        .add(
1578                            room_participant::Column::AnsweringConnectionServerId
1579                                .eq(connection.owner_id as i32),
1580                        ),
1581                )
1582                .one(&*tx)
1583                .await?;
1584
1585            if let Some(leaving_participant) = leaving_participant {
1586                // Leave room.
1587                let room_id = leaving_participant.room_id;
1588                room_participant::Entity::delete_by_id(leaving_participant.id)
1589                    .exec(&*tx)
1590                    .await?;
1591
1592                // Cancel pending calls initiated by the leaving user.
1593                let called_participants = room_participant::Entity::find()
1594                    .filter(
1595                        Condition::all()
1596                            .add(
1597                                room_participant::Column::CallingUserId
1598                                    .eq(leaving_participant.user_id),
1599                            )
1600                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1601                    )
1602                    .all(&*tx)
1603                    .await?;
1604                room_participant::Entity::delete_many()
1605                    .filter(
1606                        room_participant::Column::Id
1607                            .is_in(called_participants.iter().map(|participant| participant.id)),
1608                    )
1609                    .exec(&*tx)
1610                    .await?;
1611                let canceled_calls_to_user_ids = called_participants
1612                    .into_iter()
1613                    .map(|participant| participant.user_id)
1614                    .collect();
1615
1616                // Detect left projects.
1617                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1618                enum QueryProjectIds {
1619                    ProjectId,
1620                }
1621                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1622                    .select_only()
1623                    .column_as(
1624                        project_collaborator::Column::ProjectId,
1625                        QueryProjectIds::ProjectId,
1626                    )
1627                    .filter(
1628                        Condition::all()
1629                            .add(
1630                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1631                            )
1632                            .add(
1633                                project_collaborator::Column::ConnectionServerId
1634                                    .eq(connection.owner_id as i32),
1635                            ),
1636                    )
1637                    .into_values::<_, QueryProjectIds>()
1638                    .all(&*tx)
1639                    .await?;
1640                let mut left_projects = HashMap::default();
1641                let mut collaborators = project_collaborator::Entity::find()
1642                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1643                    .stream(&*tx)
1644                    .await?;
1645                while let Some(collaborator) = collaborators.next().await {
1646                    let collaborator = collaborator?;
1647                    let left_project =
1648                        left_projects
1649                            .entry(collaborator.project_id)
1650                            .or_insert(LeftProject {
1651                                id: collaborator.project_id,
1652                                host_user_id: Default::default(),
1653                                connection_ids: Default::default(),
1654                                host_connection_id: Default::default(),
1655                            });
1656
1657                    let collaborator_connection_id = collaborator.connection();
1658                    if collaborator_connection_id != connection {
1659                        left_project.connection_ids.push(collaborator_connection_id);
1660                    }
1661
1662                    if collaborator.is_host {
1663                        left_project.host_user_id = collaborator.user_id;
1664                        left_project.host_connection_id = collaborator_connection_id;
1665                    }
1666                }
1667                drop(collaborators);
1668
1669                // Leave projects.
1670                project_collaborator::Entity::delete_many()
1671                    .filter(
1672                        Condition::all()
1673                            .add(
1674                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1675                            )
1676                            .add(
1677                                project_collaborator::Column::ConnectionServerId
1678                                    .eq(connection.owner_id as i32),
1679                            ),
1680                    )
1681                    .exec(&*tx)
1682                    .await?;
1683
1684                // Unshare projects.
1685                project::Entity::delete_many()
1686                    .filter(
1687                        Condition::all()
1688                            .add(project::Column::RoomId.eq(room_id))
1689                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1690                            .add(
1691                                project::Column::HostConnectionServerId
1692                                    .eq(connection.owner_id as i32),
1693                            ),
1694                    )
1695                    .exec(&*tx)
1696                    .await?;
1697
1698                let room = self.get_room(room_id, &tx).await?;
1699                if room.participants.is_empty() {
1700                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1701                }
1702
1703                let left_room = LeftRoom {
1704                    room,
1705                    left_projects,
1706                    canceled_calls_to_user_ids,
1707                };
1708
1709                if left_room.room.participants.is_empty() {
1710                    self.rooms.remove(&room_id);
1711                }
1712
1713                Ok(Some((room_id, left_room)))
1714            } else {
1715                Ok(None)
1716            }
1717        })
1718        .await
1719    }
1720
1721    pub async fn follow(
1722        &self,
1723        room_id: RoomId,
1724        project_id: ProjectId,
1725        leader_connection: ConnectionId,
1726        follower_connection: ConnectionId,
1727    ) -> Result<RoomGuard<proto::Room>> {
1728        self.room_transaction(|tx| async move {
1729            follower::ActiveModel {
1730                room_id: ActiveValue::set(room_id),
1731                project_id: ActiveValue::set(project_id),
1732                leader_connection_server_id: ActiveValue::set(ServerId(
1733                    leader_connection.owner_id as i32,
1734                )),
1735                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1736                follower_connection_server_id: ActiveValue::set(ServerId(
1737                    follower_connection.owner_id as i32,
1738                )),
1739                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1740                ..Default::default()
1741            }
1742            .insert(&*tx)
1743            .await?;
1744
1745            Ok((room_id, self.get_room(room_id, &*tx).await?))
1746        })
1747        .await
1748    }
1749
1750    pub async fn unfollow(
1751        &self,
1752        room_id: RoomId,
1753        project_id: ProjectId,
1754        leader_connection: ConnectionId,
1755        follower_connection: ConnectionId,
1756    ) -> Result<RoomGuard<proto::Room>> {
1757        self.room_transaction(|tx| async move {
1758            follower::Entity::delete_many()
1759                .filter(
1760                    Condition::all()
1761                        .add(follower::Column::RoomId.eq(room_id))
1762                        .add(follower::Column::ProjectId.eq(project_id))
1763                        .add(
1764                            Condition::any()
1765                                .add(
1766                                    follower::Column::LeaderConnectionServerId
1767                                        .eq(leader_connection.owner_id)
1768                                        .and(
1769                                            follower::Column::LeaderConnectionId
1770                                                .eq(leader_connection.id),
1771                                        ),
1772                                )
1773                                .add(
1774                                    follower::Column::FollowerConnectionServerId
1775                                        .eq(follower_connection.owner_id)
1776                                        .and(
1777                                            follower::Column::FollowerConnectionId
1778                                                .eq(follower_connection.id),
1779                                        ),
1780                                ),
1781                        ),
1782                )
1783                .exec(&*tx)
1784                .await?;
1785
1786            Ok((room_id, self.get_room(room_id, &*tx).await?))
1787        })
1788        .await
1789    }
1790
1791    pub async fn update_room_participant_location(
1792        &self,
1793        room_id: RoomId,
1794        connection: ConnectionId,
1795        location: proto::ParticipantLocation,
1796    ) -> Result<RoomGuard<proto::Room>> {
1797        self.room_transaction(|tx| async {
1798            let tx = tx;
1799            let location_kind;
1800            let location_project_id;
1801            match location
1802                .variant
1803                .as_ref()
1804                .ok_or_else(|| anyhow!("invalid location"))?
1805            {
1806                proto::participant_location::Variant::SharedProject(project) => {
1807                    location_kind = 0;
1808                    location_project_id = Some(ProjectId::from_proto(project.id));
1809                }
1810                proto::participant_location::Variant::UnsharedProject(_) => {
1811                    location_kind = 1;
1812                    location_project_id = None;
1813                }
1814                proto::participant_location::Variant::External(_) => {
1815                    location_kind = 2;
1816                    location_project_id = None;
1817                }
1818            }
1819
1820            let result = room_participant::Entity::update_many()
1821                .filter(
1822                    Condition::all()
1823                        .add(room_participant::Column::RoomId.eq(room_id))
1824                        .add(
1825                            room_participant::Column::AnsweringConnectionId
1826                                .eq(connection.id as i32),
1827                        )
1828                        .add(
1829                            room_participant::Column::AnsweringConnectionServerId
1830                                .eq(connection.owner_id as i32),
1831                        ),
1832                )
1833                .set(room_participant::ActiveModel {
1834                    location_kind: ActiveValue::set(Some(location_kind)),
1835                    location_project_id: ActiveValue::set(location_project_id),
1836                    ..Default::default()
1837                })
1838                .exec(&*tx)
1839                .await?;
1840
1841            if result.rows_affected == 1 {
1842                let room = self.get_room(room_id, &tx).await?;
1843                Ok((room_id, room))
1844            } else {
1845                Err(anyhow!("could not update room participant location"))?
1846            }
1847        })
1848        .await
1849    }
1850
1851    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1852        self.transaction(|tx| async move {
1853            let participant = room_participant::Entity::find()
1854                .filter(
1855                    Condition::all()
1856                        .add(
1857                            room_participant::Column::AnsweringConnectionId
1858                                .eq(connection.id as i32),
1859                        )
1860                        .add(
1861                            room_participant::Column::AnsweringConnectionServerId
1862                                .eq(connection.owner_id as i32),
1863                        ),
1864                )
1865                .one(&*tx)
1866                .await?
1867                .ok_or_else(|| anyhow!("not a participant in any room"))?;
1868
1869            room_participant::Entity::update(room_participant::ActiveModel {
1870                answering_connection_lost: ActiveValue::set(true),
1871                ..participant.into_active_model()
1872            })
1873            .exec(&*tx)
1874            .await?;
1875
1876            Ok(())
1877        })
1878        .await
1879    }
1880
1881    fn build_incoming_call(
1882        room: &proto::Room,
1883        called_user_id: UserId,
1884    ) -> Option<proto::IncomingCall> {
1885        let pending_participant = room
1886            .pending_participants
1887            .iter()
1888            .find(|participant| participant.user_id == called_user_id.to_proto())?;
1889
1890        Some(proto::IncomingCall {
1891            room_id: room.id,
1892            calling_user_id: pending_participant.calling_user_id,
1893            participant_user_ids: room
1894                .participants
1895                .iter()
1896                .map(|participant| participant.user_id)
1897                .collect(),
1898            initial_project: room.participants.iter().find_map(|participant| {
1899                let initial_project_id = pending_participant.initial_project_id?;
1900                participant
1901                    .projects
1902                    .iter()
1903                    .find(|project| project.id == initial_project_id)
1904                    .cloned()
1905            }),
1906        })
1907    }
1908
1909    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1910        let db_room = room::Entity::find_by_id(room_id)
1911            .one(tx)
1912            .await?
1913            .ok_or_else(|| anyhow!("could not find room"))?;
1914
1915        let mut db_participants = db_room
1916            .find_related(room_participant::Entity)
1917            .stream(tx)
1918            .await?;
1919        let mut participants = HashMap::default();
1920        let mut pending_participants = Vec::new();
1921        while let Some(db_participant) = db_participants.next().await {
1922            let db_participant = db_participant?;
1923            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1924                .answering_connection_id
1925                .zip(db_participant.answering_connection_server_id)
1926            {
1927                let location = match (
1928                    db_participant.location_kind,
1929                    db_participant.location_project_id,
1930                ) {
1931                    (Some(0), Some(project_id)) => {
1932                        Some(proto::participant_location::Variant::SharedProject(
1933                            proto::participant_location::SharedProject {
1934                                id: project_id.to_proto(),
1935                            },
1936                        ))
1937                    }
1938                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1939                        Default::default(),
1940                    )),
1941                    _ => Some(proto::participant_location::Variant::External(
1942                        Default::default(),
1943                    )),
1944                };
1945
1946                let answering_connection = ConnectionId {
1947                    owner_id: answering_connection_server_id.0 as u32,
1948                    id: answering_connection_id as u32,
1949                };
1950                participants.insert(
1951                    answering_connection,
1952                    proto::Participant {
1953                        user_id: db_participant.user_id.to_proto(),
1954                        peer_id: Some(answering_connection.into()),
1955                        projects: Default::default(),
1956                        location: Some(proto::ParticipantLocation { variant: location }),
1957                    },
1958                );
1959            } else {
1960                pending_participants.push(proto::PendingParticipant {
1961                    user_id: db_participant.user_id.to_proto(),
1962                    calling_user_id: db_participant.calling_user_id.to_proto(),
1963                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1964                });
1965            }
1966        }
1967        drop(db_participants);
1968
1969        let mut db_projects = db_room
1970            .find_related(project::Entity)
1971            .find_with_related(worktree::Entity)
1972            .stream(tx)
1973            .await?;
1974
1975        while let Some(row) = db_projects.next().await {
1976            let (db_project, db_worktree) = row?;
1977            let host_connection = db_project.host_connection()?;
1978            if let Some(participant) = participants.get_mut(&host_connection) {
1979                let project = if let Some(project) = participant
1980                    .projects
1981                    .iter_mut()
1982                    .find(|project| project.id == db_project.id.to_proto())
1983                {
1984                    project
1985                } else {
1986                    participant.projects.push(proto::ParticipantProject {
1987                        id: db_project.id.to_proto(),
1988                        worktree_root_names: Default::default(),
1989                    });
1990                    participant.projects.last_mut().unwrap()
1991                };
1992
1993                if let Some(db_worktree) = db_worktree {
1994                    if db_worktree.visible {
1995                        project.worktree_root_names.push(db_worktree.root_name);
1996                    }
1997                }
1998            }
1999        }        
2000        drop(db_projects);
2001
2002        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2003        let mut followers = Vec::new();
2004        while let Some(db_follower) = db_followers.next().await {
2005            let db_follower = db_follower?;
2006            followers.push(proto::Follower {
2007                leader_id: Some(db_follower.leader_connection().into()),
2008                follower_id: Some(db_follower.follower_connection().into()),
2009            });
2010        }
2011
2012        Ok(proto::Room {
2013            id: db_room.id.to_proto(),
2014            live_kit_room: db_room.live_kit_room,
2015            participants: participants.into_values().collect(),
2016            pending_participants,
2017            followers,
2018        })
2019    }
2020
2021    // projects
2022
2023    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2024        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2025        enum QueryAs {
2026            Count,
2027        }
2028
2029        self.transaction(|tx| async move {
2030            Ok(project::Entity::find()
2031                .select_only()
2032                .column_as(project::Column::Id.count(), QueryAs::Count)
2033                .inner_join(user::Entity)
2034                .filter(user::Column::Admin.eq(false))
2035                .into_values::<_, QueryAs>()
2036                .one(&*tx)
2037                .await?
2038                .unwrap_or(0i64) as usize)
2039        })
2040        .await
2041    }
2042
2043    pub async fn share_project(
2044        &self,
2045        room_id: RoomId,
2046        connection: ConnectionId,
2047        worktrees: &[proto::WorktreeMetadata],
2048    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2049        self.room_transaction(|tx| async move {
2050            let participant = room_participant::Entity::find()
2051                .filter(
2052                    Condition::all()
2053                        .add(
2054                            room_participant::Column::AnsweringConnectionId
2055                                .eq(connection.id as i32),
2056                        )
2057                        .add(
2058                            room_participant::Column::AnsweringConnectionServerId
2059                                .eq(connection.owner_id as i32),
2060                        ),
2061                )
2062                .one(&*tx)
2063                .await?
2064                .ok_or_else(|| anyhow!("could not find participant"))?;
2065            if participant.room_id != room_id {
2066                return Err(anyhow!("shared project on unexpected room"))?;
2067            }
2068
2069            let project = project::ActiveModel {
2070                room_id: ActiveValue::set(participant.room_id),
2071                host_user_id: ActiveValue::set(participant.user_id),
2072                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2073                host_connection_server_id: ActiveValue::set(Some(ServerId(
2074                    connection.owner_id as i32,
2075                ))),
2076                ..Default::default()
2077            }
2078            .insert(&*tx)
2079            .await?;
2080
2081            if !worktrees.is_empty() {
2082                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2083                    worktree::ActiveModel {
2084                        id: ActiveValue::set(worktree.id as i64),
2085                        project_id: ActiveValue::set(project.id),
2086                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2087                        root_name: ActiveValue::set(worktree.root_name.clone()),
2088                        visible: ActiveValue::set(worktree.visible),
2089                        scan_id: ActiveValue::set(0),
2090                        completed_scan_id: ActiveValue::set(0),
2091                    }
2092                }))
2093                .exec(&*tx)
2094                .await?;
2095            }
2096
2097            project_collaborator::ActiveModel {
2098                project_id: ActiveValue::set(project.id),
2099                connection_id: ActiveValue::set(connection.id as i32),
2100                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2101                user_id: ActiveValue::set(participant.user_id),
2102                replica_id: ActiveValue::set(ReplicaId(0)),
2103                is_host: ActiveValue::set(true),
2104                ..Default::default()
2105            }
2106            .insert(&*tx)
2107            .await?;
2108
2109            let room = self.get_room(room_id, &tx).await?;
2110            Ok((room_id, (project.id, room)))
2111        })
2112        .await
2113    }
2114
2115    pub async fn unshare_project(
2116        &self,
2117        project_id: ProjectId,
2118        connection: ConnectionId,
2119    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2120        self.room_transaction(|tx| async move {
2121            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2122
2123            let project = project::Entity::find_by_id(project_id)
2124                .one(&*tx)
2125                .await?
2126                .ok_or_else(|| anyhow!("project not found"))?;
2127            if project.host_connection()? == connection {
2128                let room_id = project.room_id;
2129                project::Entity::delete(project.into_active_model())
2130                    .exec(&*tx)
2131                    .await?;
2132                let room = self.get_room(room_id, &tx).await?;
2133                Ok((room_id, (room, guest_connection_ids)))
2134            } else {
2135                Err(anyhow!("cannot unshare a project hosted by another user"))?
2136            }
2137        })
2138        .await
2139    }
2140
2141    pub async fn update_project(
2142        &self,
2143        project_id: ProjectId,
2144        connection: ConnectionId,
2145        worktrees: &[proto::WorktreeMetadata],
2146    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2147        self.room_transaction(|tx| async move {
2148            let project = project::Entity::find_by_id(project_id)
2149                .filter(
2150                    Condition::all()
2151                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2152                        .add(
2153                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2154                        ),
2155                )
2156                .one(&*tx)
2157                .await?
2158                .ok_or_else(|| anyhow!("no such project"))?;
2159
2160            self.update_project_worktrees(project.id, worktrees, &tx)
2161                .await?;
2162
2163            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2164            let room = self.get_room(project.room_id, &tx).await?;
2165            Ok((project.room_id, (room, guest_connection_ids)))
2166        })
2167        .await
2168    }
2169
2170    async fn update_project_worktrees(
2171        &self,
2172        project_id: ProjectId,
2173        worktrees: &[proto::WorktreeMetadata],
2174        tx: &DatabaseTransaction,
2175    ) -> Result<()> {
2176        if !worktrees.is_empty() {
2177            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2178                id: ActiveValue::set(worktree.id as i64),
2179                project_id: ActiveValue::set(project_id),
2180                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2181                root_name: ActiveValue::set(worktree.root_name.clone()),
2182                visible: ActiveValue::set(worktree.visible),
2183                scan_id: ActiveValue::set(0),
2184                completed_scan_id: ActiveValue::set(0),
2185            }))
2186            .on_conflict(
2187                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2188                    .update_column(worktree::Column::RootName)
2189                    .to_owned(),
2190            )
2191            .exec(&*tx)
2192            .await?;
2193        }
2194
2195        worktree::Entity::delete_many()
2196            .filter(worktree::Column::ProjectId.eq(project_id).and(
2197                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2198            ))
2199            .exec(&*tx)
2200            .await?;
2201
2202        Ok(())
2203    }
2204
2205    pub async fn update_worktree(
2206        &self,
2207        update: &proto::UpdateWorktree,
2208        connection: ConnectionId,
2209    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2210        self.room_transaction(|tx| async move {
2211            let project_id = ProjectId::from_proto(update.project_id);
2212            let worktree_id = update.worktree_id as i64;
2213
2214            // Ensure the update comes from the host.
2215            let project = project::Entity::find_by_id(project_id)
2216                .filter(
2217                    Condition::all()
2218                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2219                        .add(
2220                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2221                        ),
2222                )
2223                .one(&*tx)
2224                .await?
2225                .ok_or_else(|| anyhow!("no such project"))?;
2226            let room_id = project.room_id;
2227
2228            // Update metadata.
2229            worktree::Entity::update(worktree::ActiveModel {
2230                id: ActiveValue::set(worktree_id),
2231                project_id: ActiveValue::set(project_id),
2232                root_name: ActiveValue::set(update.root_name.clone()),
2233                scan_id: ActiveValue::set(update.scan_id as i64),
2234                completed_scan_id: if update.is_last_update {
2235                    ActiveValue::set(update.scan_id as i64)
2236                } else {
2237                    ActiveValue::default()
2238                },
2239                abs_path: ActiveValue::set(update.abs_path.clone()),
2240                ..Default::default()
2241            })
2242            .exec(&*tx)
2243            .await?;
2244
2245            if !update.updated_entries.is_empty() {
2246                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2247                    let mtime = entry.mtime.clone().unwrap_or_default();
2248                    worktree_entry::ActiveModel {
2249                        project_id: ActiveValue::set(project_id),
2250                        worktree_id: ActiveValue::set(worktree_id),
2251                        id: ActiveValue::set(entry.id as i64),
2252                        is_dir: ActiveValue::set(entry.is_dir),
2253                        path: ActiveValue::set(entry.path.clone()),
2254                        inode: ActiveValue::set(entry.inode as i64),
2255                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2256                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2257                        is_symlink: ActiveValue::set(entry.is_symlink),
2258                        is_ignored: ActiveValue::set(entry.is_ignored),
2259                        is_deleted: ActiveValue::set(false),
2260                        scan_id: ActiveValue::set(update.scan_id as i64),
2261                    }
2262                }))
2263                .on_conflict(
2264                    OnConflict::columns([
2265                        worktree_entry::Column::ProjectId,
2266                        worktree_entry::Column::WorktreeId,
2267                        worktree_entry::Column::Id,
2268                    ])
2269                    .update_columns([
2270                        worktree_entry::Column::IsDir,
2271                        worktree_entry::Column::Path,
2272                        worktree_entry::Column::Inode,
2273                        worktree_entry::Column::MtimeSeconds,
2274                        worktree_entry::Column::MtimeNanos,
2275                        worktree_entry::Column::IsSymlink,
2276                        worktree_entry::Column::IsIgnored,
2277                        worktree_entry::Column::ScanId,
2278                    ])
2279                    .to_owned(),
2280                )
2281                .exec(&*tx)
2282                .await?;
2283            }
2284
2285            if !update.removed_entries.is_empty() {
2286                worktree_entry::Entity::update_many()
2287                    .filter(
2288                        worktree_entry::Column::ProjectId
2289                            .eq(project_id)
2290                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2291                            .and(
2292                                worktree_entry::Column::Id
2293                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2294                            ),
2295                    )
2296                    .set(worktree_entry::ActiveModel {
2297                        is_deleted: ActiveValue::Set(true),
2298                        scan_id: ActiveValue::Set(update.scan_id as i64),
2299                        ..Default::default()
2300                    })
2301                    .exec(&*tx)
2302                    .await?;
2303            }
2304
2305            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2306            Ok((room_id, connection_ids))
2307        })
2308        .await
2309    }
2310
2311    pub async fn update_diagnostic_summary(
2312        &self,
2313        update: &proto::UpdateDiagnosticSummary,
2314        connection: ConnectionId,
2315    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2316        self.room_transaction(|tx| async move {
2317            let project_id = ProjectId::from_proto(update.project_id);
2318            let worktree_id = update.worktree_id as i64;
2319            let summary = update
2320                .summary
2321                .as_ref()
2322                .ok_or_else(|| anyhow!("invalid summary"))?;
2323
2324            // Ensure the update comes from the host.
2325            let project = project::Entity::find_by_id(project_id)
2326                .one(&*tx)
2327                .await?
2328                .ok_or_else(|| anyhow!("no such project"))?;
2329            if project.host_connection()? != connection {
2330                return Err(anyhow!("can't update a project hosted by someone else"))?;
2331            }
2332
2333            // Update summary.
2334            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2335                project_id: ActiveValue::set(project_id),
2336                worktree_id: ActiveValue::set(worktree_id),
2337                path: ActiveValue::set(summary.path.clone()),
2338                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2339                error_count: ActiveValue::set(summary.error_count as i32),
2340                warning_count: ActiveValue::set(summary.warning_count as i32),
2341                ..Default::default()
2342            })
2343            .on_conflict(
2344                OnConflict::columns([
2345                    worktree_diagnostic_summary::Column::ProjectId,
2346                    worktree_diagnostic_summary::Column::WorktreeId,
2347                    worktree_diagnostic_summary::Column::Path,
2348                ])
2349                .update_columns([
2350                    worktree_diagnostic_summary::Column::LanguageServerId,
2351                    worktree_diagnostic_summary::Column::ErrorCount,
2352                    worktree_diagnostic_summary::Column::WarningCount,
2353                ])
2354                .to_owned(),
2355            )
2356            .exec(&*tx)
2357            .await?;
2358
2359            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2360            Ok((project.room_id, connection_ids))
2361        })
2362        .await
2363    }
2364
2365    pub async fn start_language_server(
2366        &self,
2367        update: &proto::StartLanguageServer,
2368        connection: ConnectionId,
2369    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2370        self.room_transaction(|tx| async move {
2371            let project_id = ProjectId::from_proto(update.project_id);
2372            let server = update
2373                .server
2374                .as_ref()
2375                .ok_or_else(|| anyhow!("invalid language server"))?;
2376
2377            // Ensure the update comes from the host.
2378            let project = project::Entity::find_by_id(project_id)
2379                .one(&*tx)
2380                .await?
2381                .ok_or_else(|| anyhow!("no such project"))?;
2382            if project.host_connection()? != connection {
2383                return Err(anyhow!("can't update a project hosted by someone else"))?;
2384            }
2385
2386            // Add the newly-started language server.
2387            language_server::Entity::insert(language_server::ActiveModel {
2388                project_id: ActiveValue::set(project_id),
2389                id: ActiveValue::set(server.id as i64),
2390                name: ActiveValue::set(server.name.clone()),
2391                ..Default::default()
2392            })
2393            .on_conflict(
2394                OnConflict::columns([
2395                    language_server::Column::ProjectId,
2396                    language_server::Column::Id,
2397                ])
2398                .update_column(language_server::Column::Name)
2399                .to_owned(),
2400            )
2401            .exec(&*tx)
2402            .await?;
2403
2404            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2405            Ok((project.room_id, connection_ids))
2406        })
2407        .await
2408    }
2409
2410    pub async fn join_project(
2411        &self,
2412        project_id: ProjectId,
2413        connection: ConnectionId,
2414    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2415        self.room_transaction(|tx| async move {
2416            let participant = room_participant::Entity::find()
2417                .filter(
2418                    Condition::all()
2419                        .add(
2420                            room_participant::Column::AnsweringConnectionId
2421                                .eq(connection.id as i32),
2422                        )
2423                        .add(
2424                            room_participant::Column::AnsweringConnectionServerId
2425                                .eq(connection.owner_id as i32),
2426                        ),
2427                )
2428                .one(&*tx)
2429                .await?
2430                .ok_or_else(|| anyhow!("must join a room first"))?;
2431
2432            let project = project::Entity::find_by_id(project_id)
2433                .one(&*tx)
2434                .await?
2435                .ok_or_else(|| anyhow!("no such project"))?;
2436            if project.room_id != participant.room_id {
2437                return Err(anyhow!("no such project"))?;
2438            }
2439
2440            let mut collaborators = project
2441                .find_related(project_collaborator::Entity)
2442                .all(&*tx)
2443                .await?;
2444            let replica_ids = collaborators
2445                .iter()
2446                .map(|c| c.replica_id)
2447                .collect::<HashSet<_>>();
2448            let mut replica_id = ReplicaId(1);
2449            while replica_ids.contains(&replica_id) {
2450                replica_id.0 += 1;
2451            }
2452            let new_collaborator = project_collaborator::ActiveModel {
2453                project_id: ActiveValue::set(project_id),
2454                connection_id: ActiveValue::set(connection.id as i32),
2455                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2456                user_id: ActiveValue::set(participant.user_id),
2457                replica_id: ActiveValue::set(replica_id),
2458                is_host: ActiveValue::set(false),
2459                ..Default::default()
2460            }
2461            .insert(&*tx)
2462            .await?;
2463            collaborators.push(new_collaborator);
2464
2465            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2466            let mut worktrees = db_worktrees
2467                .into_iter()
2468                .map(|db_worktree| {
2469                    (
2470                        db_worktree.id as u64,
2471                        Worktree {
2472                            id: db_worktree.id as u64,
2473                            abs_path: db_worktree.abs_path,
2474                            root_name: db_worktree.root_name,
2475                            visible: db_worktree.visible,
2476                            entries: Default::default(),
2477                            diagnostic_summaries: Default::default(),
2478                            scan_id: db_worktree.scan_id as u64,
2479                            completed_scan_id: db_worktree.completed_scan_id as u64,
2480                        },
2481                    )
2482                })
2483                .collect::<BTreeMap<_, _>>();
2484
2485            // Populate worktree entries.
2486            {
2487                let mut db_entries = worktree_entry::Entity::find()
2488                    .filter(
2489                        Condition::all()
2490                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2491                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2492                    )
2493                    .stream(&*tx)
2494                    .await?;
2495                while let Some(db_entry) = db_entries.next().await {
2496                    let db_entry = db_entry?;
2497                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2498                        worktree.entries.push(proto::Entry {
2499                            id: db_entry.id as u64,
2500                            is_dir: db_entry.is_dir,
2501                            path: db_entry.path,
2502                            inode: db_entry.inode as u64,
2503                            mtime: Some(proto::Timestamp {
2504                                seconds: db_entry.mtime_seconds as u64,
2505                                nanos: db_entry.mtime_nanos as u32,
2506                            }),
2507                            is_symlink: db_entry.is_symlink,
2508                            is_ignored: db_entry.is_ignored,
2509                        });
2510                    }
2511                }
2512            }
2513
2514            // Populate worktree diagnostic summaries.
2515            {
2516                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2517                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2518                    .stream(&*tx)
2519                    .await?;
2520                while let Some(db_summary) = db_summaries.next().await {
2521                    let db_summary = db_summary?;
2522                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2523                        worktree
2524                            .diagnostic_summaries
2525                            .push(proto::DiagnosticSummary {
2526                                path: db_summary.path,
2527                                language_server_id: db_summary.language_server_id as u64,
2528                                error_count: db_summary.error_count as u32,
2529                                warning_count: db_summary.warning_count as u32,
2530                            });
2531                    }
2532                }
2533            }
2534
2535            // Populate language servers.
2536            let language_servers = project
2537                .find_related(language_server::Entity)
2538                .all(&*tx)
2539                .await?;
2540
2541            let room_id = project.room_id;
2542            let project = Project {
2543                collaborators: collaborators
2544                    .into_iter()
2545                    .map(|collaborator| ProjectCollaborator {
2546                        connection_id: collaborator.connection(),
2547                        user_id: collaborator.user_id,
2548                        replica_id: collaborator.replica_id,
2549                        is_host: collaborator.is_host,
2550                    })
2551                    .collect(),
2552                worktrees,
2553                language_servers: language_servers
2554                    .into_iter()
2555                    .map(|language_server| proto::LanguageServer {
2556                        id: language_server.id as u64,
2557                        name: language_server.name,
2558                    })
2559                    .collect(),
2560            };
2561            Ok((room_id, (project, replica_id as ReplicaId)))
2562        })
2563        .await
2564    }
2565
2566    pub async fn leave_project(
2567        &self,
2568        project_id: ProjectId,
2569        connection: ConnectionId,
2570    ) -> Result<RoomGuard<LeftProject>> {
2571        self.room_transaction(|tx| async move {
2572            let result = project_collaborator::Entity::delete_many()
2573                .filter(
2574                    Condition::all()
2575                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2576                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2577                        .add(
2578                            project_collaborator::Column::ConnectionServerId
2579                                .eq(connection.owner_id as i32),
2580                        ),
2581                )
2582                .exec(&*tx)
2583                .await?;
2584            if result.rows_affected == 0 {
2585                Err(anyhow!("not a collaborator on this project"))?;
2586            }
2587
2588            let project = project::Entity::find_by_id(project_id)
2589                .one(&*tx)
2590                .await?
2591                .ok_or_else(|| anyhow!("no such project"))?;
2592            let collaborators = project
2593                .find_related(project_collaborator::Entity)
2594                .all(&*tx)
2595                .await?;
2596            let connection_ids = collaborators
2597                .into_iter()
2598                .map(|collaborator| collaborator.connection())
2599                .collect();
2600
2601            let left_project = LeftProject {
2602                id: project_id,
2603                host_user_id: project.host_user_id,
2604                host_connection_id: project.host_connection()?,
2605                connection_ids,
2606            };
2607            Ok((project.room_id, left_project))
2608        })
2609        .await
2610    }
2611
2612    pub async fn project_collaborators(
2613        &self,
2614        project_id: ProjectId,
2615        connection_id: ConnectionId,
2616    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2617        self.room_transaction(|tx| async move {
2618            let project = project::Entity::find_by_id(project_id)
2619                .one(&*tx)
2620                .await?
2621                .ok_or_else(|| anyhow!("no such project"))?;
2622            let collaborators = project_collaborator::Entity::find()
2623                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2624                .all(&*tx)
2625                .await?
2626                .into_iter()
2627                .map(|collaborator| ProjectCollaborator {
2628                    connection_id: collaborator.connection(),
2629                    user_id: collaborator.user_id,
2630                    replica_id: collaborator.replica_id,
2631                    is_host: collaborator.is_host,
2632                })
2633                .collect::<Vec<_>>();
2634
2635            if collaborators
2636                .iter()
2637                .any(|collaborator| collaborator.connection_id == connection_id)
2638            {
2639                Ok((project.room_id, collaborators))
2640            } else {
2641                Err(anyhow!("no such project"))?
2642            }
2643        })
2644        .await
2645    }
2646
2647    pub async fn project_connection_ids(
2648        &self,
2649        project_id: ProjectId,
2650        connection_id: ConnectionId,
2651    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2652        self.room_transaction(|tx| async move {
2653            let project = project::Entity::find_by_id(project_id)
2654                .one(&*tx)
2655                .await?
2656                .ok_or_else(|| anyhow!("no such project"))?;
2657            let mut collaborators = project_collaborator::Entity::find()
2658                .filter(project_collaborator::Column::ProjectId.eq(project_id))
2659                .stream(&*tx)
2660                .await?;
2661
2662            let mut connection_ids = HashSet::default();
2663            while let Some(collaborator) = collaborators.next().await {
2664                let collaborator = collaborator?;
2665                connection_ids.insert(collaborator.connection());
2666            }
2667
2668            if connection_ids.contains(&connection_id) {
2669                Ok((project.room_id, connection_ids))
2670            } else {
2671                Err(anyhow!("no such project"))?
2672            }
2673        })
2674        .await
2675    }
2676
2677    async fn project_guest_connection_ids(
2678        &self,
2679        project_id: ProjectId,
2680        tx: &DatabaseTransaction,
2681    ) -> Result<Vec<ConnectionId>> {
2682        let mut collaborators = project_collaborator::Entity::find()
2683            .filter(
2684                project_collaborator::Column::ProjectId
2685                    .eq(project_id)
2686                    .and(project_collaborator::Column::IsHost.eq(false)),
2687            )
2688            .stream(tx)
2689            .await?;
2690
2691        let mut guest_connection_ids = Vec::new();
2692        while let Some(collaborator) = collaborators.next().await {
2693            let collaborator = collaborator?;
2694            guest_connection_ids.push(collaborator.connection());
2695        }
2696        Ok(guest_connection_ids)
2697    }
2698
2699    // access tokens
2700
2701    pub async fn create_access_token_hash(
2702        &self,
2703        user_id: UserId,
2704        access_token_hash: &str,
2705        max_access_token_count: usize,
2706    ) -> Result<()> {
2707        self.transaction(|tx| async {
2708            let tx = tx;
2709
2710            access_token::ActiveModel {
2711                user_id: ActiveValue::set(user_id),
2712                hash: ActiveValue::set(access_token_hash.into()),
2713                ..Default::default()
2714            }
2715            .insert(&*tx)
2716            .await?;
2717
2718            access_token::Entity::delete_many()
2719                .filter(
2720                    access_token::Column::Id.in_subquery(
2721                        Query::select()
2722                            .column(access_token::Column::Id)
2723                            .from(access_token::Entity)
2724                            .and_where(access_token::Column::UserId.eq(user_id))
2725                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2726                            .limit(10000)
2727                            .offset(max_access_token_count as u64)
2728                            .to_owned(),
2729                    ),
2730                )
2731                .exec(&*tx)
2732                .await?;
2733            Ok(())
2734        })
2735        .await
2736    }
2737
2738    pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2739        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2740        enum QueryAs {
2741            Hash,
2742        }
2743
2744        self.transaction(|tx| async move {
2745            Ok(access_token::Entity::find()
2746                .select_only()
2747                .column(access_token::Column::Hash)
2748                .filter(access_token::Column::UserId.eq(user_id))
2749                .order_by_desc(access_token::Column::Id)
2750                .into_values::<_, QueryAs>()
2751                .all(&*tx)
2752                .await?)
2753        })
2754        .await
2755    }
2756
2757    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2758    where
2759        F: Send + Fn(TransactionHandle) -> Fut,
2760        Fut: Send + Future<Output = Result<T>>,
2761    {
2762        let body = async {
2763            loop {
2764                let (tx, result) = self.with_transaction(&f).await?;
2765                match result {
2766                    Ok(result) => {
2767                        match tx.commit().await.map_err(Into::into) {
2768                            Ok(()) => return Ok(result),
2769                            Err(error) => {
2770                                if is_serialization_error(&error) {
2771                                    // Retry (don't break the loop)
2772                                } else {
2773                                    return Err(error);
2774                                }
2775                            }
2776                        }
2777                    }
2778                    Err(error) => {
2779                        tx.rollback().await?;
2780                        if is_serialization_error(&error) {
2781                            // Retry (don't break the loop)
2782                        } else {
2783                            return Err(error);
2784                        }
2785                    }
2786                }
2787            }
2788        };
2789
2790        self.run(body).await
2791    }
2792
2793    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2794    where
2795        F: Send + Fn(TransactionHandle) -> Fut,
2796        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2797    {
2798        let body = async {
2799            loop {
2800                let (tx, result) = self.with_transaction(&f).await?;
2801                match result {
2802                    Ok(Some((room_id, data))) => {
2803                        let lock = self.rooms.entry(room_id).or_default().clone();
2804                        let _guard = lock.lock_owned().await;
2805                        match tx.commit().await.map_err(Into::into) {
2806                            Ok(()) => {
2807                                return Ok(Some(RoomGuard {
2808                                    data,
2809                                    _guard,
2810                                    _not_send: PhantomData,
2811                                }));
2812                            }
2813                            Err(error) => {
2814                                if is_serialization_error(&error) {
2815                                    // Retry (don't break the loop)
2816                                } else {
2817                                    return Err(error);
2818                                }
2819                            }
2820                        }
2821                    }
2822                    Ok(None) => {
2823                        match tx.commit().await.map_err(Into::into) {
2824                            Ok(()) => return Ok(None),
2825                            Err(error) => {
2826                                if is_serialization_error(&error) {
2827                                    // Retry (don't break the loop)
2828                                } else {
2829                                    return Err(error);
2830                                }
2831                            }
2832                        }
2833                    }
2834                    Err(error) => {
2835                        tx.rollback().await?;
2836                        if is_serialization_error(&error) {
2837                            // Retry (don't break the loop)
2838                        } else {
2839                            return Err(error);
2840                        }
2841                    }
2842                }
2843            }
2844        };
2845
2846        self.run(body).await
2847    }
2848
2849    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2850    where
2851        F: Send + Fn(TransactionHandle) -> Fut,
2852        Fut: Send + Future<Output = Result<(RoomId, T)>>,
2853    {
2854        let data = self
2855            .optional_room_transaction(move |tx| {
2856                let future = f(tx);
2857                async {
2858                    let data = future.await?;
2859                    Ok(Some(data))
2860                }
2861            })
2862            .await?;
2863        Ok(data.unwrap())
2864    }
2865
2866    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2867    where
2868        F: Send + Fn(TransactionHandle) -> Fut,
2869        Fut: Send + Future<Output = Result<T>>,
2870    {
2871        let tx = self
2872            .pool
2873            .begin_with_config(Some(IsolationLevel::Serializable), None)
2874            .await?;
2875
2876        let mut tx = Arc::new(Some(tx));
2877        let result = f(TransactionHandle(tx.clone())).await;
2878        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2879            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2880        };
2881
2882        Ok((tx, result))
2883    }
2884
2885    async fn run<F, T>(&self, future: F) -> T
2886    where
2887        F: Future<Output = T>,
2888    {
2889        #[cfg(test)]
2890        {
2891            if let Some(background) = self.background.as_ref() {
2892                background.simulate_random_delay().await;
2893            }
2894
2895            self.runtime.as_ref().unwrap().block_on(future)
2896        }
2897
2898        #[cfg(not(test))]
2899        {
2900            future.await
2901        }
2902    }
2903}
2904
2905fn is_serialization_error(error: &Error) -> bool {
2906    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2907    match error {
2908        Error::Database(
2909            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2910            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2911        ) if error
2912            .as_database_error()
2913            .and_then(|error| error.code())
2914            .as_deref()
2915            == Some(SERIALIZATION_FAILURE_CODE) =>
2916        {
2917            true
2918        }
2919        _ => false,
2920    }
2921}
2922
2923struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2924
2925impl Deref for TransactionHandle {
2926    type Target = DatabaseTransaction;
2927
2928    fn deref(&self) -> &Self::Target {
2929        self.0.as_ref().as_ref().unwrap()
2930    }
2931}
2932
2933pub struct RoomGuard<T> {
2934    data: T,
2935    _guard: OwnedMutexGuard<()>,
2936    _not_send: PhantomData<Rc<()>>,
2937}
2938
2939impl<T> Deref for RoomGuard<T> {
2940    type Target = T;
2941
2942    fn deref(&self) -> &T {
2943        &self.data
2944    }
2945}
2946
2947impl<T> DerefMut for RoomGuard<T> {
2948    fn deref_mut(&mut self) -> &mut T {
2949        &mut self.data
2950    }
2951}
2952
/// Attributes supplied when creating a user; serializable and deserializable with serde.
// NOTE(review): the consumers of this struct are outside this part of the file — confirm
// how `invite_count` is interpreted against the user-creation code.
2953#[derive(Debug, Serialize, Deserialize)]
2954pub struct NewUserParams {
    /// GitHub login of the user.
2955    pub github_login: String,
    /// Numeric GitHub user id.
2956    pub github_user_id: i32,
    /// Invite count for the user (presumably the number of invites granted — TODO confirm).
2957    pub invite_count: i32,
2958}
2959
/// Values describing a newly created user.
// NOTE(review): the code that fills this struct is outside this part of the file; the
// field notes below are derived from names and types only — confirm against the producer.
2960#[derive(Debug)]
2961pub struct NewUserResult {
    /// Id of the user.
2962    pub user_id: UserId,
    /// Metrics identifier associated with the user.
2963    pub metrics_id: String,
    /// Id of the inviting user, if any.
2964    pub inviting_user_id: Option<UserId>,
    /// Device id recorded at signup, if any.
2965    pub signup_device_id: Option<String>,
2966}
2967
2968fn random_invite_code() -> String {
2969    nanoid::nanoid!(16)
2970}
2971
2972fn random_email_confirmation_code() -> String {
2973    nanoid::nanoid!(64)
2974}
2975
/// Defines a public newtype identifier wrapping an `i32`.
///
/// The generated type:
///
/// - derives the common value traits and (de)serializes transparently as its inner integer;
/// - exposes `MAX`, `from_proto` and `to_proto`;
/// - formats like its inner integer;
/// - implements the sea-query / sea-orm traits (`From<_> for Value`, `TryGetable`,
///   `ValueType`, `TryFromU64`, `Nullable`) so it can be used directly as an integer column.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            // `as` keeps only the low 32 bits of `value`; out-of-range inputs wrap.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            // `as` sign-extends, so a negative id becomes a very large `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                Self::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                i32::try_get(res, pre, col).map(Self)
            }
        }

        impl sea_query::ValueType for $name {
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                // Narrows any integer payload to `i32`, rejecting values that don't fit.
                fn narrow<T>(int: T) -> Result<i32, sea_query::ValueTypeErr>
                where
                    T: std::convert::TryInto<i32>,
                {
                    int.try_into().map_err(|_| sea_query::ValueTypeErr)
                }

                // Null values and non-integer variants are rejected by the fallback arm.
                let raw = match v {
                    Value::TinyInt(Some(int)) => narrow(int)?,
                    Value::SmallInt(Some(int)) => narrow(int)?,
                    Value::Int(Some(int)) => narrow(int)?,
                    Value::BigInt(Some(int)) => narrow(int)?,
                    Value::TinyUnsigned(Some(int)) => narrow(int)?,
                    Value::SmallUnsigned(Some(int)) => narrow(int)?,
                    Value::Unsigned(Some(int)) => narrow(int)?,
                    Value::BigUnsigned(Some(int)) => narrow(int)?,
                    _ => return Err(sea_query::ValueTypeErr),
                };
                Ok(Self(raw))
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // NOTE(review): the message reads "... to u64" although the conversion
                // is *from* u64; kept verbatim here.
                let raw: i32 = n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(concat!(
                        "error converting ",
                        stringify!($name),
                        " to u64"
                    ))
                })?;
                Ok(Self(raw))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
3094
3095id_type!(AccessTokenId);
3096id_type!(ContactId);
3097id_type!(FollowerId);
3098id_type!(RoomId);
3099id_type!(RoomParticipantId);
3100id_type!(ProjectId);
3101id_type!(ProjectCollaboratorId);
3102id_type!(ReplicaId);
3103id_type!(ServerId);
3104id_type!(SignupId);
3105id_type!(UserId);
3106
/// A room together with the projects affected by a rejoin.
///
/// NOTE(review): the code that builds this value is outside this part of the
/// file; the field notes below are inferred from names and types — confirm
/// against the producer.
pub struct RejoinedRoom {
    /// The room, as a `proto::Room` message.
    pub room: proto::Room,
    /// Projects reported as rejoined (see `RejoinedProject`).
    pub rejoined_projects: Vec<RejoinedProject>,
    /// Projects reported as reshared (see `ResharedProject`).
    pub reshared_projects: Vec<ResharedProject>,
}
3112
/// One project entry of `RejoinedRoom::reshared_projects`.
///
/// NOTE(review): field meanings are inferred from names and types only.
pub struct ResharedProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Presumably the connection previously associated with the project —
    /// confirm against the code that fills this in.
    pub old_connection_id: ConnectionId,
    /// Collaborators attached to the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Worktree metadata messages for the project.
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
3119
/// One project entry of `RejoinedRoom::rejoined_projects`.
///
/// Unlike `ResharedProject`, worktrees are carried as `RejoinedWorktree`
/// values and the project's language servers are included.
///
/// NOTE(review): field meanings are inferred from names and types only.
pub struct RejoinedProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Presumably the connection previously associated with the project —
    /// confirm against the code that fills this in.
    pub old_connection_id: ConnectionId,
    /// Collaborators attached to the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Per-worktree state (see `RejoinedWorktree`).
    pub worktrees: Vec<RejoinedWorktree>,
    /// Language server messages for the project.
    pub language_servers: Vec<proto::LanguageServer>,
}
3127
/// Worktree state carried inside a `RejoinedProject`.
///
/// Compared with `Worktree`, entries are split into an updated list and a
/// removed list instead of a single `entries` list.
///
/// NOTE(review): field meanings are inferred from names and types only.
#[derive(Debug)]
pub struct RejoinedWorktree {
    /// Numeric id of the worktree.
    pub id: u64,
    /// Absolute path of the worktree, as a string.
    pub abs_path: String,
    /// Name of the worktree root.
    pub root_name: String,
    /// Visibility flag of the worktree.
    pub visible: bool,
    /// Entries reported as updated.
    pub updated_entries: Vec<proto::Entry>,
    /// Presumably the ids of entries that were removed — TODO confirm.
    pub removed_entries: Vec<u64>,
    /// Diagnostic summary messages for the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Scan id recorded for the worktree.
    pub scan_id: u64,
    /// Completed-scan id recorded for the worktree.
    pub completed_scan_id: u64,
}
3140
/// A room together with the projects and calls affected by leaving it.
///
/// NOTE(review): the producer is outside this part of the file; field
/// meanings are inferred from names and types only.
pub struct LeftRoom {
    /// The room, as a `proto::Room` message.
    pub room: proto::Room,
    /// Projects that were left, keyed by project id (see `LeftProject`).
    pub left_projects: HashMap<ProjectId, LeftProject>,
    /// Presumably the users whose incoming calls were canceled — TODO confirm.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3146
/// A room together with the user ids affected by refreshing it.
///
/// NOTE(review): the producer is outside this part of the file; field
/// meanings are inferred from names and types only.
pub struct RefreshedRoom {
    /// The room, as a `proto::Room` message.
    pub room: proto::Room,
    /// Presumably the users whose participation was found stale — TODO confirm.
    pub stale_participant_user_ids: Vec<UserId>,
    /// Presumably the users whose incoming calls were canceled — TODO confirm.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3152
/// A project's collaborators, worktrees and language servers.
///
/// NOTE(review): field meanings are inferred from names and types only.
pub struct Project {
    /// Collaborators attached to the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Worktrees in key order; the `u64` key is presumably the worktree id
    /// (matching `Worktree::id`) — TODO confirm.
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Language server messages for the project.
    pub language_servers: Vec<proto::LanguageServer>,
}
3158
/// A participant in a project.
///
/// `ProjectCollaborator::to_proto` converts this into a
/// `proto::Collaborator`, using every field except `is_host`.
pub struct ProjectCollaborator {
    /// Connection of the collaborator; becomes the `peer_id` of the proto message.
    pub connection_id: ConnectionId,
    /// User behind the connection.
    pub user_id: UserId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Host flag; presumably marks the collaborator hosting the project — TODO confirm.
    pub is_host: bool,
}
3165
3166impl ProjectCollaborator {
3167    pub fn to_proto(&self) -> proto::Collaborator {
3168        proto::Collaborator {
3169            peer_id: Some(self.connection_id.into()),
3170            replica_id: self.replica_id.0 as u32,
3171            user_id: self.user_id.to_proto(),
3172        }
3173    }
3174}
3175
/// A project that was left, as stored in `LeftRoom::left_projects`.
///
/// NOTE(review): field meanings are inferred from names and types only.
#[derive(Debug)]
pub struct LeftProject {
    /// Id of the project.
    pub id: ProjectId,
    /// User id of the project's host.
    pub host_user_id: UserId,
    /// Connection id of the project's host.
    pub host_connection_id: ConnectionId,
    /// Connection ids associated with the project; presumably the peers to
    /// notify — TODO confirm against callers.
    pub connection_ids: Vec<ConnectionId>,
}
3183
/// A worktree of a `Project`, including its entries and diagnostics.
///
/// NOTE(review): field meanings are inferred from names and types only.
pub struct Worktree {
    /// Numeric id of the worktree.
    pub id: u64,
    /// Absolute path of the worktree, as a string.
    pub abs_path: String,
    /// Name of the worktree root.
    pub root_name: String,
    /// Visibility flag of the worktree.
    pub visible: bool,
    /// Entry messages belonging to the worktree.
    pub entries: Vec<proto::Entry>,
    /// Diagnostic summary messages for the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Scan id recorded for the worktree.
    pub scan_id: u64,
    /// Completed-scan id recorded for the worktree.
    pub completed_scan_id: u64,
}
3194
3195#[cfg(test)]
3196pub use test::*;
3197
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use lazy_static::lazy_static;
    use parking_lot::Mutex;
    use rand::prelude::*;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A throwaway database for tests, backed either by in-memory SQLite or
    /// by a freshly created, randomly named Postgres database.
    ///
    /// Dropping a Postgres-backed `TestDb` terminates the other sessions
    /// connected to that database and then drops the database itself.
    pub struct TestDb {
        /// The database handle. Set by both constructors and taken when the
        /// `TestDb` is dropped.
        pub db: Option<Arc<Database>>,
        /// Both constructors in this module initialize this to `None`.
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and applies the bundled test
        /// schema to it.
        ///
        /// # Panics
        ///
        /// Panics if the runtime cannot be built, the connection cannot be
        /// opened, or the schema statement fails.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let runtime = Self::new_runtime();

            let db = runtime.block_on(async {
                let mut options = ConnectOptions::new(String::from("sqlite::memory:"));
                options.max_connections(5);
                let db = Database::new(options).await.unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            Self::assemble(db, background, runtime)
        }

        /// Creates a new Postgres database with a random name on `localhost`
        /// and runs the migrations from the crate's `migrations` directory.
        ///
        /// A process-wide lock is held for the duration of the call, so
        /// concurrent callers set up their databases one at a time.
        ///
        /// # Panics
        ///
        /// Panics if the runtime cannot be built, the database cannot be
        /// created or connected to, or the migrations fail.
        pub fn postgres(background: Arc<Background>) -> Self {
            lazy_static! {
                static ref LOCK: Mutex<()> = Mutex::new(());
            }

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = Self::new_runtime();

            let db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options).await.unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            Self::assemble(db, background, runtime)
        }

        /// Returns the database handle.
        ///
        /// # Panics
        ///
        /// Panics if the `db` field has been set to `None`.
        pub fn db(&self) -> &Arc<Database> {
            self.db
                .as_ref()
                .expect("TestDb::db called after the database handle was taken")
        }

        /// Builds the current-thread tokio runtime (I/O and time enabled)
        /// that drives a test database's async setup and teardown.
        fn new_runtime() -> tokio::runtime::Runtime {
            tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap()
        }

        /// Stores the executor and runtime on `db` and wraps it in a `TestDb`.
        fn assemble(
            mut db: Database,
            background: Arc<Background>,
            runtime: tokio::runtime::Runtime,
        ) -> Self {
            db.background = Some(background);
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }
    }

    impl Drop for TestDb {
        fn drop(&mut self) {
            // `db` is a public field and may already have been taken. A panic
            // here while a failing test is unwinding would abort the process
            // and hide the original failure, so bail out quietly instead.
            let db = match self.db.take() {
                Some(db) => db,
                None => return,
            };

            // Only Postgres-backed databases get explicit teardown.
            if !matches!(
                db.pool.get_database_backend(),
                sea_orm::DatabaseBackend::Postgres
            ) {
                return;
            }

            // Without a runtime the async cleanup cannot be driven; skip it
            // rather than panic inside a destructor.
            let runtime = match db.runtime.as_ref() {
                Some(runtime) => runtime,
                None => return,
            };

            runtime.block_on(async {
                use util::ResultExt;
                // Terminate every other backend attached to this database
                // before asking the server to drop it.
                let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        query.into(),
                    ))
                    .await
                    .log_err();
                // Cleanup is best-effort: failures are logged, not propagated.
                sqlx::Postgres::drop_database(db.options.get_url())
                    .await
                    .log_err();
            });
        }
    }
}