db.rs

   1mod access_token;
   2mod channel;
   3mod channel_member;
   4mod channel_parent;
   5mod contact;
   6mod follower;
   7mod language_server;
   8mod project;
   9mod project_collaborator;
  10mod room;
  11mod room_participant;
  12mod server;
  13mod signup;
  14#[cfg(test)]
  15mod tests;
  16mod user;
  17mod worktree;
  18mod worktree_diagnostic_summary;
  19mod worktree_entry;
  20mod worktree_repository;
  21mod worktree_repository_statuses;
  22mod worktree_settings_file;
  23
  24use crate::executor::Executor;
  25use crate::{Error, Result};
  26use anyhow::anyhow;
  27use collections::{BTreeMap, HashMap, HashSet};
  28pub use contact::Contact;
  29use dashmap::DashMap;
  30use futures::StreamExt;
  31use hyper::StatusCode;
  32use rand::prelude::StdRng;
  33use rand::{Rng, SeedableRng};
  34use rpc::{proto, ConnectionId};
  35use sea_orm::Condition;
  36pub use sea_orm::ConnectOptions;
  37use sea_orm::{
  38    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  39    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  40    Statement, TransactionTrait,
  41};
  42use sea_query::{Alias, Expr, OnConflict, Query};
  43use serde::{Deserialize, Serialize};
  44pub use signup::{Invite, NewSignup, WaitlistSummary};
  45use sqlx::migrate::{Migrate, Migration, MigrationSource};
  46use sqlx::Connection;
  47use std::fmt::Write as _;
  48use std::ops::{Deref, DerefMut};
  49use std::path::Path;
  50use std::time::Duration;
  51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  52use tokio::sync::{Mutex, OwnedMutexGuard};
  53pub use user::Model as User;
  54
  55pub struct Database {
  56    options: ConnectOptions,
  57    pool: DatabaseConnection,
  58    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
  59    rng: Mutex<StdRng>,
  60    executor: Executor,
  61    #[cfg(test)]
  62    runtime: Option<tokio::runtime::Runtime>,
  63}
  64
  65impl Database {
  66    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  67        Ok(Self {
  68            options: options.clone(),
  69            pool: sea_orm::Database::connect(options).await?,
  70            rooms: DashMap::with_capacity(16384),
  71            rng: Mutex::new(StdRng::seed_from_u64(0)),
  72            executor,
  73            #[cfg(test)]
  74            runtime: None,
  75        })
  76    }
  77
  78    #[cfg(test)]
  79    pub fn reset(&self) {
  80        self.rooms.clear();
  81    }
  82
  83    pub async fn migrate(
  84        &self,
  85        migrations_path: &Path,
  86        ignore_checksum_mismatch: bool,
  87    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  88        let migrations = MigrationSource::resolve(migrations_path)
  89            .await
  90            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  91
  92        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  93
  94        connection.ensure_migrations_table().await?;
  95        let applied_migrations: HashMap<_, _> = connection
  96            .list_applied_migrations()
  97            .await?
  98            .into_iter()
  99            .map(|m| (m.version, m))
 100            .collect();
 101
 102        let mut new_migrations = Vec::new();
 103        for migration in migrations {
 104            match applied_migrations.get(&migration.version) {
 105                Some(applied_migration) => {
 106                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 107                    {
 108                        Err(anyhow!(
 109                            "checksum mismatch for applied migration {}",
 110                            migration.description
 111                        ))?;
 112                    }
 113                }
 114                None => {
 115                    let elapsed = connection.apply(&migration).await?;
 116                    new_migrations.push((migration, elapsed));
 117                }
 118            }
 119        }
 120
 121        Ok(new_migrations)
 122    }
 123
 124    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 125        self.transaction(|tx| async move {
 126            let server = server::ActiveModel {
 127                environment: ActiveValue::set(environment.into()),
 128                ..Default::default()
 129            }
 130            .insert(&*tx)
 131            .await?;
 132            Ok(server.id)
 133        })
 134        .await
 135    }
 136
 137    pub async fn stale_room_ids(
 138        &self,
 139        environment: &str,
 140        new_server_id: ServerId,
 141    ) -> Result<Vec<RoomId>> {
 142        self.transaction(|tx| async move {
 143            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 144            enum QueryAs {
 145                RoomId,
 146            }
 147
 148            let stale_server_epochs = self
 149                .stale_server_ids(environment, new_server_id, &tx)
 150                .await?;
 151            Ok(room_participant::Entity::find()
 152                .select_only()
 153                .column(room_participant::Column::RoomId)
 154                .distinct()
 155                .filter(
 156                    room_participant::Column::AnsweringConnectionServerId
 157                        .is_in(stale_server_epochs),
 158                )
 159                .into_values::<_, QueryAs>()
 160                .all(&*tx)
 161                .await?)
 162        })
 163        .await
 164    }
 165
 166    pub async fn refresh_room(
 167        &self,
 168        room_id: RoomId,
 169        new_server_id: ServerId,
 170    ) -> Result<RoomGuard<RefreshedRoom>> {
 171        self.room_transaction(room_id, |tx| async move {
 172            let stale_participant_filter = Condition::all()
 173                .add(room_participant::Column::RoomId.eq(room_id))
 174                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
 175                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
 176
 177            let stale_participant_user_ids = room_participant::Entity::find()
 178                .filter(stale_participant_filter.clone())
 179                .all(&*tx)
 180                .await?
 181                .into_iter()
 182                .map(|participant| participant.user_id)
 183                .collect::<Vec<_>>();
 184
 185            // Delete participants who failed to reconnect and cancel their calls.
 186            let mut canceled_calls_to_user_ids = Vec::new();
 187            room_participant::Entity::delete_many()
 188                .filter(stale_participant_filter)
 189                .exec(&*tx)
 190                .await?;
 191            let called_participants = room_participant::Entity::find()
 192                .filter(
 193                    Condition::all()
 194                        .add(
 195                            room_participant::Column::CallingUserId
 196                                .is_in(stale_participant_user_ids.iter().copied()),
 197                        )
 198                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
 199                )
 200                .all(&*tx)
 201                .await?;
 202            room_participant::Entity::delete_many()
 203                .filter(
 204                    room_participant::Column::Id
 205                        .is_in(called_participants.iter().map(|participant| participant.id)),
 206                )
 207                .exec(&*tx)
 208                .await?;
 209            canceled_calls_to_user_ids.extend(
 210                called_participants
 211                    .into_iter()
 212                    .map(|participant| participant.user_id),
 213            );
 214
 215            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
 216            let channel_members;
 217            if let Some(channel_id) = channel_id {
 218                channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
 219            } else {
 220                channel_members = Vec::new();
 221
 222                // Delete the room if it becomes empty.
 223                if room.participants.is_empty() {
 224                    project::Entity::delete_many()
 225                        .filter(project::Column::RoomId.eq(room_id))
 226                        .exec(&*tx)
 227                        .await?;
 228                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
 229                }
 230            };
 231
 232            Ok(RefreshedRoom {
 233                room,
 234                channel_id,
 235                channel_members,
 236                stale_participant_user_ids,
 237                canceled_calls_to_user_ids,
 238            })
 239        })
 240        .await
 241    }
 242
 243    pub async fn delete_stale_servers(
 244        &self,
 245        environment: &str,
 246        new_server_id: ServerId,
 247    ) -> Result<()> {
 248        self.transaction(|tx| async move {
 249            server::Entity::delete_many()
 250                .filter(
 251                    Condition::all()
 252                        .add(server::Column::Environment.eq(environment))
 253                        .add(server::Column::Id.ne(new_server_id)),
 254                )
 255                .exec(&*tx)
 256                .await?;
 257            Ok(())
 258        })
 259        .await
 260    }
 261
 262    async fn stale_server_ids(
 263        &self,
 264        environment: &str,
 265        new_server_id: ServerId,
 266        tx: &DatabaseTransaction,
 267    ) -> Result<Vec<ServerId>> {
 268        let stale_servers = server::Entity::find()
 269            .filter(
 270                Condition::all()
 271                    .add(server::Column::Environment.eq(environment))
 272                    .add(server::Column::Id.ne(new_server_id)),
 273            )
 274            .all(&*tx)
 275            .await?;
 276        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 277    }
 278
 279    // users
 280
 281    pub async fn create_user(
 282        &self,
 283        email_address: &str,
 284        admin: bool,
 285        params: NewUserParams,
 286    ) -> Result<NewUserResult> {
 287        self.transaction(|tx| async {
 288            let tx = tx;
 289            let user = user::Entity::insert(user::ActiveModel {
 290                email_address: ActiveValue::set(Some(email_address.into())),
 291                github_login: ActiveValue::set(params.github_login.clone()),
 292                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 293                admin: ActiveValue::set(admin),
 294                metrics_id: ActiveValue::set(Uuid::new_v4()),
 295                ..Default::default()
 296            })
 297            .on_conflict(
 298                OnConflict::column(user::Column::GithubLogin)
 299                    .update_column(user::Column::GithubLogin)
 300                    .to_owned(),
 301            )
 302            .exec_with_returning(&*tx)
 303            .await?;
 304
 305            Ok(NewUserResult {
 306                user_id: user.id,
 307                metrics_id: user.metrics_id.to_string(),
 308                signup_device_id: None,
 309                inviting_user_id: None,
 310            })
 311        })
 312        .await
 313    }
 314
 315    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 316        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 317            .await
 318    }
 319
 320    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 321        self.transaction(|tx| async {
 322            let tx = tx;
 323            Ok(user::Entity::find()
 324                .filter(user::Column::Id.is_in(ids.iter().copied()))
 325                .all(&*tx)
 326                .await?)
 327        })
 328        .await
 329    }
 330
 331    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 332        self.transaction(|tx| async move {
 333            Ok(user::Entity::find()
 334                .filter(user::Column::GithubLogin.eq(github_login))
 335                .one(&*tx)
 336                .await?)
 337        })
 338        .await
 339    }
 340
 341    pub async fn get_or_create_user_by_github_account(
 342        &self,
 343        github_login: &str,
 344        github_user_id: Option<i32>,
 345        github_email: Option<&str>,
 346    ) -> Result<Option<User>> {
 347        self.transaction(|tx| async move {
 348            let tx = &*tx;
 349            if let Some(github_user_id) = github_user_id {
 350                if let Some(user_by_github_user_id) = user::Entity::find()
 351                    .filter(user::Column::GithubUserId.eq(github_user_id))
 352                    .one(tx)
 353                    .await?
 354                {
 355                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 356                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 357                    Ok(Some(user_by_github_user_id.update(tx).await?))
 358                } else if let Some(user_by_github_login) = user::Entity::find()
 359                    .filter(user::Column::GithubLogin.eq(github_login))
 360                    .one(tx)
 361                    .await?
 362                {
 363                    let mut user_by_github_login = user_by_github_login.into_active_model();
 364                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 365                    Ok(Some(user_by_github_login.update(tx).await?))
 366                } else {
 367                    let user = user::Entity::insert(user::ActiveModel {
 368                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 369                        github_login: ActiveValue::set(github_login.into()),
 370                        github_user_id: ActiveValue::set(Some(github_user_id)),
 371                        admin: ActiveValue::set(false),
 372                        invite_count: ActiveValue::set(0),
 373                        invite_code: ActiveValue::set(None),
 374                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 375                        ..Default::default()
 376                    })
 377                    .exec_with_returning(&*tx)
 378                    .await?;
 379                    Ok(Some(user))
 380                }
 381            } else {
 382                Ok(user::Entity::find()
 383                    .filter(user::Column::GithubLogin.eq(github_login))
 384                    .one(tx)
 385                    .await?)
 386            }
 387        })
 388        .await
 389    }
 390
 391    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 392        self.transaction(|tx| async move {
 393            Ok(user::Entity::find()
 394                .order_by_asc(user::Column::GithubLogin)
 395                .limit(limit as u64)
 396                .offset(page as u64 * limit as u64)
 397                .all(&*tx)
 398                .await?)
 399        })
 400        .await
 401    }
 402
 403    pub async fn get_users_with_no_invites(
 404        &self,
 405        invited_by_another_user: bool,
 406    ) -> Result<Vec<User>> {
 407        self.transaction(|tx| async move {
 408            Ok(user::Entity::find()
 409                .filter(
 410                    user::Column::InviteCount
 411                        .eq(0)
 412                        .and(if invited_by_another_user {
 413                            user::Column::InviterId.is_not_null()
 414                        } else {
 415                            user::Column::InviterId.is_null()
 416                        }),
 417                )
 418                .all(&*tx)
 419                .await?)
 420        })
 421        .await
 422    }
 423
 424    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 425        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 426        enum QueryAs {
 427            MetricsId,
 428        }
 429
 430        self.transaction(|tx| async move {
 431            let metrics_id: Uuid = user::Entity::find_by_id(id)
 432                .select_only()
 433                .column(user::Column::MetricsId)
 434                .into_values::<_, QueryAs>()
 435                .one(&*tx)
 436                .await?
 437                .ok_or_else(|| anyhow!("could not find user"))?;
 438            Ok(metrics_id.to_string())
 439        })
 440        .await
 441    }
 442
 443    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 444        self.transaction(|tx| async move {
 445            user::Entity::update_many()
 446                .filter(user::Column::Id.eq(id))
 447                .set(user::ActiveModel {
 448                    admin: ActiveValue::set(is_admin),
 449                    ..Default::default()
 450                })
 451                .exec(&*tx)
 452                .await?;
 453            Ok(())
 454        })
 455        .await
 456    }
 457
 458    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 459        self.transaction(|tx| async move {
 460            user::Entity::update_many()
 461                .filter(user::Column::Id.eq(id))
 462                .set(user::ActiveModel {
 463                    connected_once: ActiveValue::set(connected_once),
 464                    ..Default::default()
 465                })
 466                .exec(&*tx)
 467                .await?;
 468            Ok(())
 469        })
 470        .await
 471    }
 472
 473    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 474        self.transaction(|tx| async move {
 475            access_token::Entity::delete_many()
 476                .filter(access_token::Column::UserId.eq(id))
 477                .exec(&*tx)
 478                .await?;
 479            user::Entity::delete_by_id(id).exec(&*tx).await?;
 480            Ok(())
 481        })
 482        .await
 483    }
 484
 485    // contacts
 486
 487    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 488        #[derive(Debug, FromQueryResult)]
 489        struct ContactWithUserBusyStatuses {
 490            user_id_a: UserId,
 491            user_id_b: UserId,
 492            a_to_b: bool,
 493            accepted: bool,
 494            should_notify: bool,
 495            user_a_busy: bool,
 496            user_b_busy: bool,
 497        }
 498
 499        self.transaction(|tx| async move {
 500            let user_a_participant = Alias::new("user_a_participant");
 501            let user_b_participant = Alias::new("user_b_participant");
 502            let mut db_contacts = contact::Entity::find()
 503                .column_as(
 504                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 505                        .is_not_null(),
 506                    "user_a_busy",
 507                )
 508                .column_as(
 509                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 510                        .is_not_null(),
 511                    "user_b_busy",
 512                )
 513                .filter(
 514                    contact::Column::UserIdA
 515                        .eq(user_id)
 516                        .or(contact::Column::UserIdB.eq(user_id)),
 517                )
 518                .join_as(
 519                    JoinType::LeftJoin,
 520                    contact::Relation::UserARoomParticipant.def(),
 521                    user_a_participant,
 522                )
 523                .join_as(
 524                    JoinType::LeftJoin,
 525                    contact::Relation::UserBRoomParticipant.def(),
 526                    user_b_participant,
 527                )
 528                .into_model::<ContactWithUserBusyStatuses>()
 529                .stream(&*tx)
 530                .await?;
 531
 532            let mut contacts = Vec::new();
 533            while let Some(db_contact) = db_contacts.next().await {
 534                let db_contact = db_contact?;
 535                if db_contact.user_id_a == user_id {
 536                    if db_contact.accepted {
 537                        contacts.push(Contact::Accepted {
 538                            user_id: db_contact.user_id_b,
 539                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 540                            busy: db_contact.user_b_busy,
 541                        });
 542                    } else if db_contact.a_to_b {
 543                        contacts.push(Contact::Outgoing {
 544                            user_id: db_contact.user_id_b,
 545                        })
 546                    } else {
 547                        contacts.push(Contact::Incoming {
 548                            user_id: db_contact.user_id_b,
 549                            should_notify: db_contact.should_notify,
 550                        });
 551                    }
 552                } else if db_contact.accepted {
 553                    contacts.push(Contact::Accepted {
 554                        user_id: db_contact.user_id_a,
 555                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 556                        busy: db_contact.user_a_busy,
 557                    });
 558                } else if db_contact.a_to_b {
 559                    contacts.push(Contact::Incoming {
 560                        user_id: db_contact.user_id_a,
 561                        should_notify: db_contact.should_notify,
 562                    });
 563                } else {
 564                    contacts.push(Contact::Outgoing {
 565                        user_id: db_contact.user_id_a,
 566                    });
 567                }
 568            }
 569
 570            contacts.sort_unstable_by_key(|contact| contact.user_id());
 571
 572            Ok(contacts)
 573        })
 574        .await
 575    }
 576
 577    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 578        self.transaction(|tx| async move {
 579            let participant = room_participant::Entity::find()
 580                .filter(room_participant::Column::UserId.eq(user_id))
 581                .one(&*tx)
 582                .await?;
 583            Ok(participant.is_some())
 584        })
 585        .await
 586    }
 587
 588    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 589        self.transaction(|tx| async move {
 590            let (id_a, id_b) = if user_id_1 < user_id_2 {
 591                (user_id_1, user_id_2)
 592            } else {
 593                (user_id_2, user_id_1)
 594            };
 595
 596            Ok(contact::Entity::find()
 597                .filter(
 598                    contact::Column::UserIdA
 599                        .eq(id_a)
 600                        .and(contact::Column::UserIdB.eq(id_b))
 601                        .and(contact::Column::Accepted.eq(true)),
 602                )
 603                .one(&*tx)
 604                .await?
 605                .is_some())
 606        })
 607        .await
 608    }
 609
 610    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 611        self.transaction(|tx| async move {
 612            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 613                (sender_id, receiver_id, true)
 614            } else {
 615                (receiver_id, sender_id, false)
 616            };
 617
 618            let rows_affected = contact::Entity::insert(contact::ActiveModel {
 619                user_id_a: ActiveValue::set(id_a),
 620                user_id_b: ActiveValue::set(id_b),
 621                a_to_b: ActiveValue::set(a_to_b),
 622                accepted: ActiveValue::set(false),
 623                should_notify: ActiveValue::set(true),
 624                ..Default::default()
 625            })
 626            .on_conflict(
 627                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
 628                    .values([
 629                        (contact::Column::Accepted, true.into()),
 630                        (contact::Column::ShouldNotify, false.into()),
 631                    ])
 632                    .action_and_where(
 633                        contact::Column::Accepted.eq(false).and(
 634                            contact::Column::AToB
 635                                .eq(a_to_b)
 636                                .and(contact::Column::UserIdA.eq(id_b))
 637                                .or(contact::Column::AToB
 638                                    .ne(a_to_b)
 639                                    .and(contact::Column::UserIdA.eq(id_a))),
 640                        ),
 641                    )
 642                    .to_owned(),
 643            )
 644            .exec_without_returning(&*tx)
 645            .await?;
 646
 647            if rows_affected == 1 {
 648                Ok(())
 649            } else {
 650                Err(anyhow!("contact already requested"))?
 651            }
 652        })
 653        .await
 654    }
 655
 656    /// Returns a bool indicating whether the removed contact had originally accepted or not
 657    ///
 658    /// Deletes the contact identified by the requester and responder ids, and then returns
 659    /// whether the deleted contact had originally accepted or was a pending contact request.
 660    ///
 661    /// # Arguments
 662    ///
 663    /// * `requester_id` - The user that initiates this request
 664    /// * `responder_id` - The user that will be removed
 665    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 666        self.transaction(|tx| async move {
 667            let (id_a, id_b) = if responder_id < requester_id {
 668                (responder_id, requester_id)
 669            } else {
 670                (requester_id, responder_id)
 671            };
 672
 673            let contact = contact::Entity::find()
 674                .filter(
 675                    contact::Column::UserIdA
 676                        .eq(id_a)
 677                        .and(contact::Column::UserIdB.eq(id_b)),
 678                )
 679                .one(&*tx)
 680                .await?
 681                .ok_or_else(|| anyhow!("no such contact"))?;
 682
 683            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 684            Ok(contact.accepted)
 685        })
 686        .await
 687    }
 688
 689    pub async fn dismiss_contact_notification(
 690        &self,
 691        user_id: UserId,
 692        contact_user_id: UserId,
 693    ) -> Result<()> {
 694        self.transaction(|tx| async move {
 695            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 696                (user_id, contact_user_id, true)
 697            } else {
 698                (contact_user_id, user_id, false)
 699            };
 700
 701            let result = contact::Entity::update_many()
 702                .set(contact::ActiveModel {
 703                    should_notify: ActiveValue::set(false),
 704                    ..Default::default()
 705                })
 706                .filter(
 707                    contact::Column::UserIdA
 708                        .eq(id_a)
 709                        .and(contact::Column::UserIdB.eq(id_b))
 710                        .and(
 711                            contact::Column::AToB
 712                                .eq(a_to_b)
 713                                .and(contact::Column::Accepted.eq(true))
 714                                .or(contact::Column::AToB
 715                                    .ne(a_to_b)
 716                                    .and(contact::Column::Accepted.eq(false))),
 717                        ),
 718                )
 719                .exec(&*tx)
 720                .await?;
 721            if result.rows_affected == 0 {
 722                Err(anyhow!("no such contact request"))?
 723            } else {
 724                Ok(())
 725            }
 726        })
 727        .await
 728    }
 729
 730    pub async fn respond_to_contact_request(
 731        &self,
 732        responder_id: UserId,
 733        requester_id: UserId,
 734        accept: bool,
 735    ) -> Result<()> {
 736        self.transaction(|tx| async move {
 737            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 738                (responder_id, requester_id, false)
 739            } else {
 740                (requester_id, responder_id, true)
 741            };
 742            let rows_affected = if accept {
 743                let result = contact::Entity::update_many()
 744                    .set(contact::ActiveModel {
 745                        accepted: ActiveValue::set(true),
 746                        should_notify: ActiveValue::set(true),
 747                        ..Default::default()
 748                    })
 749                    .filter(
 750                        contact::Column::UserIdA
 751                            .eq(id_a)
 752                            .and(contact::Column::UserIdB.eq(id_b))
 753                            .and(contact::Column::AToB.eq(a_to_b)),
 754                    )
 755                    .exec(&*tx)
 756                    .await?;
 757                result.rows_affected
 758            } else {
 759                let result = contact::Entity::delete_many()
 760                    .filter(
 761                        contact::Column::UserIdA
 762                            .eq(id_a)
 763                            .and(contact::Column::UserIdB.eq(id_b))
 764                            .and(contact::Column::AToB.eq(a_to_b))
 765                            .and(contact::Column::Accepted.eq(false)),
 766                    )
 767                    .exec(&*tx)
 768                    .await?;
 769
 770                result.rows_affected
 771            };
 772
 773            if rows_affected == 1 {
 774                Ok(())
 775            } else {
 776                Err(anyhow!("no such contact request"))?
 777            }
 778        })
 779        .await
 780    }
 781
 782    pub fn fuzzy_like_string(string: &str) -> String {
 783        let mut result = String::with_capacity(string.len() * 2 + 1);
 784        for c in string.chars() {
 785            if c.is_alphanumeric() {
 786                result.push('%');
 787                result.push(c);
 788            }
 789        }
 790        result.push('%');
 791        result
 792    }
 793
 794    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 795        self.transaction(|tx| async {
 796            let tx = tx;
 797            let like_string = Self::fuzzy_like_string(name_query);
 798            let query = "
 799                SELECT users.*
 800                FROM users
 801                WHERE github_login ILIKE $1
 802                ORDER BY github_login <-> $2
 803                LIMIT $3
 804            ";
 805
 806            Ok(user::Entity::find()
 807                .from_raw_sql(Statement::from_sql_and_values(
 808                    self.pool.get_database_backend(),
 809                    query.into(),
 810                    vec![like_string.into(), name_query.into(), limit.into()],
 811                ))
 812                .all(&*tx)
 813                .await?)
 814        })
 815        .await
 816    }
 817
 818    // signups
 819
 820    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 821        self.transaction(|tx| async move {
 822            signup::Entity::insert(signup::ActiveModel {
 823                email_address: ActiveValue::set(signup.email_address.clone()),
 824                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 825                email_confirmation_sent: ActiveValue::set(false),
 826                platform_mac: ActiveValue::set(signup.platform_mac),
 827                platform_windows: ActiveValue::set(signup.platform_windows),
 828                platform_linux: ActiveValue::set(signup.platform_linux),
 829                platform_unknown: ActiveValue::set(false),
 830                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 831                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 832                device_id: ActiveValue::set(signup.device_id.clone()),
 833                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 834                ..Default::default()
 835            })
 836            .on_conflict(
 837                OnConflict::column(signup::Column::EmailAddress)
 838                    .update_columns([
 839                        signup::Column::PlatformMac,
 840                        signup::Column::PlatformWindows,
 841                        signup::Column::PlatformLinux,
 842                        signup::Column::EditorFeatures,
 843                        signup::Column::ProgrammingLanguages,
 844                        signup::Column::DeviceId,
 845                        signup::Column::AddedToMailingList,
 846                    ])
 847                    .to_owned(),
 848            )
 849            .exec(&*tx)
 850            .await?;
 851            Ok(())
 852        })
 853        .await
 854    }
 855
 856    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 857        self.transaction(|tx| async move {
 858            let signup = signup::Entity::find()
 859                .filter(signup::Column::EmailAddress.eq(email_address))
 860                .one(&*tx)
 861                .await?
 862                .ok_or_else(|| {
 863                    anyhow!("signup with email address {} doesn't exist", email_address)
 864                })?;
 865
 866            Ok(signup)
 867        })
 868        .await
 869    }
 870
 871    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 872        self.transaction(|tx| async move {
 873            let query = "
 874                SELECT
 875                    COUNT(*) as count,
 876                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 877                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 878                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 879                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 880                FROM (
 881                    SELECT *
 882                    FROM signups
 883                    WHERE
 884                        NOT email_confirmation_sent
 885                ) AS unsent
 886            ";
 887            Ok(
 888                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 889                    self.pool.get_database_backend(),
 890                    query.into(),
 891                    vec![],
 892                ))
 893                .one(&*tx)
 894                .await?
 895                .ok_or_else(|| anyhow!("invalid result"))?,
 896            )
 897        })
 898        .await
 899    }
 900
 901    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 902        let emails = invites
 903            .iter()
 904            .map(|s| s.email_address.as_str())
 905            .collect::<Vec<_>>();
 906        self.transaction(|tx| async {
 907            let tx = tx;
 908            signup::Entity::update_many()
 909                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 910                .set(signup::ActiveModel {
 911                    email_confirmation_sent: ActiveValue::set(true),
 912                    ..Default::default()
 913                })
 914                .exec(&*tx)
 915                .await?;
 916            Ok(())
 917        })
 918        .await
 919    }
 920
 921    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 922        self.transaction(|tx| async move {
 923            Ok(signup::Entity::find()
 924                .select_only()
 925                .column(signup::Column::EmailAddress)
 926                .column(signup::Column::EmailConfirmationCode)
 927                .filter(
 928                    signup::Column::EmailConfirmationSent.eq(false).and(
 929                        signup::Column::PlatformMac
 930                            .eq(true)
 931                            .or(signup::Column::PlatformUnknown.eq(true)),
 932                    ),
 933                )
 934                .order_by_asc(signup::Column::CreatedAt)
 935                .limit(count as u64)
 936                .into_model()
 937                .all(&*tx)
 938                .await?)
 939        })
 940        .await
 941    }
 942
 943    // invite codes
 944
 945    pub async fn create_invite_from_code(
 946        &self,
 947        code: &str,
 948        email_address: &str,
 949        device_id: Option<&str>,
 950        added_to_mailing_list: bool,
 951    ) -> Result<Invite> {
 952        self.transaction(|tx| async move {
 953            let existing_user = user::Entity::find()
 954                .filter(user::Column::EmailAddress.eq(email_address))
 955                .one(&*tx)
 956                .await?;
 957
 958            if existing_user.is_some() {
 959                Err(anyhow!("email address is already in use"))?;
 960            }
 961
 962            let inviting_user_with_invites = match user::Entity::find()
 963                .filter(
 964                    user::Column::InviteCode
 965                        .eq(code)
 966                        .and(user::Column::InviteCount.gt(0)),
 967                )
 968                .one(&*tx)
 969                .await?
 970            {
 971                Some(inviting_user) => inviting_user,
 972                None => {
 973                    return Err(Error::Http(
 974                        StatusCode::UNAUTHORIZED,
 975                        "unable to find an invite code with invites remaining".to_string(),
 976                    ))?
 977                }
 978            };
 979            user::Entity::update_many()
 980                .filter(
 981                    user::Column::Id
 982                        .eq(inviting_user_with_invites.id)
 983                        .and(user::Column::InviteCount.gt(0)),
 984                )
 985                .col_expr(
 986                    user::Column::InviteCount,
 987                    Expr::col(user::Column::InviteCount).sub(1),
 988                )
 989                .exec(&*tx)
 990                .await?;
 991
 992            let signup = signup::Entity::insert(signup::ActiveModel {
 993                email_address: ActiveValue::set(email_address.into()),
 994                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 995                email_confirmation_sent: ActiveValue::set(false),
 996                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 997                platform_linux: ActiveValue::set(false),
 998                platform_mac: ActiveValue::set(false),
 999                platform_windows: ActiveValue::set(false),
1000                platform_unknown: ActiveValue::set(true),
1001                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
1002                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
1003                ..Default::default()
1004            })
1005            .on_conflict(
1006                OnConflict::column(signup::Column::EmailAddress)
1007                    .update_column(signup::Column::InvitingUserId)
1008                    .to_owned(),
1009            )
1010            .exec_with_returning(&*tx)
1011            .await?;
1012
1013            Ok(Invite {
1014                email_address: signup.email_address,
1015                email_confirmation_code: signup.email_confirmation_code,
1016            })
1017        })
1018        .await
1019    }
1020
1021    pub async fn create_user_from_invite(
1022        &self,
1023        invite: &Invite,
1024        user: NewUserParams,
1025    ) -> Result<Option<NewUserResult>> {
1026        self.transaction(|tx| async {
1027            let tx = tx;
1028            let signup = signup::Entity::find()
1029                .filter(
1030                    signup::Column::EmailAddress
1031                        .eq(invite.email_address.as_str())
1032                        .and(
1033                            signup::Column::EmailConfirmationCode
1034                                .eq(invite.email_confirmation_code.as_str()),
1035                        ),
1036                )
1037                .one(&*tx)
1038                .await?
1039                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1040
1041            if signup.user_id.is_some() {
1042                return Ok(None);
1043            }
1044
1045            let user = user::Entity::insert(user::ActiveModel {
1046                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1047                github_login: ActiveValue::set(user.github_login.clone()),
1048                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1049                admin: ActiveValue::set(false),
1050                invite_count: ActiveValue::set(user.invite_count),
1051                invite_code: ActiveValue::set(Some(random_invite_code())),
1052                metrics_id: ActiveValue::set(Uuid::new_v4()),
1053                ..Default::default()
1054            })
1055            .on_conflict(
1056                OnConflict::column(user::Column::GithubLogin)
1057                    .update_columns([
1058                        user::Column::EmailAddress,
1059                        user::Column::GithubUserId,
1060                        user::Column::Admin,
1061                    ])
1062                    .to_owned(),
1063            )
1064            .exec_with_returning(&*tx)
1065            .await?;
1066
1067            let mut signup = signup.into_active_model();
1068            signup.user_id = ActiveValue::set(Some(user.id));
1069            let signup = signup.update(&*tx).await?;
1070
1071            if let Some(inviting_user_id) = signup.inviting_user_id {
1072                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1073                    (inviting_user_id, user.id, true)
1074                } else {
1075                    (user.id, inviting_user_id, false)
1076                };
1077
1078                contact::Entity::insert(contact::ActiveModel {
1079                    user_id_a: ActiveValue::set(user_id_a),
1080                    user_id_b: ActiveValue::set(user_id_b),
1081                    a_to_b: ActiveValue::set(a_to_b),
1082                    should_notify: ActiveValue::set(true),
1083                    accepted: ActiveValue::set(true),
1084                    ..Default::default()
1085                })
1086                .on_conflict(OnConflict::new().do_nothing().to_owned())
1087                .exec_without_returning(&*tx)
1088                .await?;
1089            }
1090
1091            Ok(Some(NewUserResult {
1092                user_id: user.id,
1093                metrics_id: user.metrics_id.to_string(),
1094                inviting_user_id: signup.inviting_user_id,
1095                signup_device_id: signup.device_id,
1096            }))
1097        })
1098        .await
1099    }
1100
1101    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1102        self.transaction(|tx| async move {
1103            if count > 0 {
1104                user::Entity::update_many()
1105                    .filter(
1106                        user::Column::Id
1107                            .eq(id)
1108                            .and(user::Column::InviteCode.is_null()),
1109                    )
1110                    .set(user::ActiveModel {
1111                        invite_code: ActiveValue::set(Some(random_invite_code())),
1112                        ..Default::default()
1113                    })
1114                    .exec(&*tx)
1115                    .await?;
1116            }
1117
1118            user::Entity::update_many()
1119                .filter(user::Column::Id.eq(id))
1120                .set(user::ActiveModel {
1121                    invite_count: ActiveValue::set(count),
1122                    ..Default::default()
1123                })
1124                .exec(&*tx)
1125                .await?;
1126            Ok(())
1127        })
1128        .await
1129    }
1130
1131    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1132        self.transaction(|tx| async move {
1133            match user::Entity::find_by_id(id).one(&*tx).await? {
1134                Some(user) if user.invite_code.is_some() => {
1135                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1136                }
1137                _ => Ok(None),
1138            }
1139        })
1140        .await
1141    }
1142
1143    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1144        self.transaction(|tx| async move {
1145            user::Entity::find()
1146                .filter(user::Column::InviteCode.eq(code))
1147                .one(&*tx)
1148                .await?
1149                .ok_or_else(|| {
1150                    Error::Http(
1151                        StatusCode::NOT_FOUND,
1152                        "that invite code does not exist".to_string(),
1153                    )
1154                })
1155        })
1156        .await
1157    }
1158
1159    // rooms
1160
1161    pub async fn incoming_call_for_user(
1162        &self,
1163        user_id: UserId,
1164    ) -> Result<Option<proto::IncomingCall>> {
1165        self.transaction(|tx| async move {
1166            let pending_participant = room_participant::Entity::find()
1167                .filter(
1168                    room_participant::Column::UserId
1169                        .eq(user_id)
1170                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1171                )
1172                .one(&*tx)
1173                .await?;
1174
1175            if let Some(pending_participant) = pending_participant {
1176                let room = self.get_room(pending_participant.room_id, &tx).await?;
1177                Ok(Self::build_incoming_call(&room, user_id))
1178            } else {
1179                Ok(None)
1180            }
1181        })
1182        .await
1183    }
1184
1185    pub async fn create_room(
1186        &self,
1187        user_id: UserId,
1188        connection: ConnectionId,
1189        live_kit_room: &str,
1190    ) -> Result<proto::Room> {
1191        self.transaction(|tx| async move {
1192            let room = room::ActiveModel {
1193                live_kit_room: ActiveValue::set(live_kit_room.into()),
1194                ..Default::default()
1195            }
1196            .insert(&*tx)
1197            .await?;
1198            room_participant::ActiveModel {
1199                room_id: ActiveValue::set(room.id),
1200                user_id: ActiveValue::set(user_id),
1201                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1202                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1203                    connection.owner_id as i32,
1204                ))),
1205                answering_connection_lost: ActiveValue::set(false),
1206                calling_user_id: ActiveValue::set(user_id),
1207                calling_connection_id: ActiveValue::set(connection.id as i32),
1208                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1209                    connection.owner_id as i32,
1210                ))),
1211                ..Default::default()
1212            }
1213            .insert(&*tx)
1214            .await?;
1215
1216            let room = self.get_room(room.id, &tx).await?;
1217            Ok(room)
1218        })
1219        .await
1220    }
1221
1222    pub async fn call(
1223        &self,
1224        room_id: RoomId,
1225        calling_user_id: UserId,
1226        calling_connection: ConnectionId,
1227        called_user_id: UserId,
1228        initial_project_id: Option<ProjectId>,
1229    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1230        self.room_transaction(room_id, |tx| async move {
1231            room_participant::ActiveModel {
1232                room_id: ActiveValue::set(room_id),
1233                user_id: ActiveValue::set(called_user_id),
1234                answering_connection_lost: ActiveValue::set(false),
1235                calling_user_id: ActiveValue::set(calling_user_id),
1236                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1237                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1238                    calling_connection.owner_id as i32,
1239                ))),
1240                initial_project_id: ActiveValue::set(initial_project_id),
1241                ..Default::default()
1242            }
1243            .insert(&*tx)
1244            .await?;
1245
1246            let room = self.get_room(room_id, &tx).await?;
1247            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1248                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1249            Ok((room, incoming_call))
1250        })
1251        .await
1252    }
1253
1254    pub async fn call_failed(
1255        &self,
1256        room_id: RoomId,
1257        called_user_id: UserId,
1258    ) -> Result<RoomGuard<proto::Room>> {
1259        self.room_transaction(room_id, |tx| async move {
1260            room_participant::Entity::delete_many()
1261                .filter(
1262                    room_participant::Column::RoomId
1263                        .eq(room_id)
1264                        .and(room_participant::Column::UserId.eq(called_user_id)),
1265                )
1266                .exec(&*tx)
1267                .await?;
1268            let room = self.get_room(room_id, &tx).await?;
1269            Ok(room)
1270        })
1271        .await
1272    }
1273
1274    pub async fn decline_call(
1275        &self,
1276        expected_room_id: Option<RoomId>,
1277        user_id: UserId,
1278    ) -> Result<Option<RoomGuard<proto::Room>>> {
1279        self.optional_room_transaction(|tx| async move {
1280            let mut filter = Condition::all()
1281                .add(room_participant::Column::UserId.eq(user_id))
1282                .add(room_participant::Column::AnsweringConnectionId.is_null());
1283            if let Some(room_id) = expected_room_id {
1284                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1285            }
1286            let participant = room_participant::Entity::find()
1287                .filter(filter)
1288                .one(&*tx)
1289                .await?;
1290
1291            let participant = if let Some(participant) = participant {
1292                participant
1293            } else if expected_room_id.is_some() {
1294                return Err(anyhow!("could not find call to decline"))?;
1295            } else {
1296                return Ok(None);
1297            };
1298
1299            let room_id = participant.room_id;
1300            room_participant::Entity::delete(participant.into_active_model())
1301                .exec(&*tx)
1302                .await?;
1303
1304            let room = self.get_room(room_id, &tx).await?;
1305            Ok(Some((room_id, room)))
1306        })
1307        .await
1308    }
1309
1310    pub async fn cancel_call(
1311        &self,
1312        room_id: RoomId,
1313        calling_connection: ConnectionId,
1314        called_user_id: UserId,
1315    ) -> Result<RoomGuard<proto::Room>> {
1316        self.room_transaction(room_id, |tx| async move {
1317            let participant = room_participant::Entity::find()
1318                .filter(
1319                    Condition::all()
1320                        .add(room_participant::Column::UserId.eq(called_user_id))
1321                        .add(room_participant::Column::RoomId.eq(room_id))
1322                        .add(
1323                            room_participant::Column::CallingConnectionId
1324                                .eq(calling_connection.id as i32),
1325                        )
1326                        .add(
1327                            room_participant::Column::CallingConnectionServerId
1328                                .eq(calling_connection.owner_id as i32),
1329                        )
1330                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1331                )
1332                .one(&*tx)
1333                .await?
1334                .ok_or_else(|| anyhow!("no call to cancel"))?;
1335
1336            room_participant::Entity::delete(participant.into_active_model())
1337                .exec(&*tx)
1338                .await?;
1339
1340            let room = self.get_room(room_id, &tx).await?;
1341            Ok(room)
1342        })
1343        .await
1344    }
1345
1346    pub async fn is_current_room_different_channel(
1347        &self,
1348        user_id: UserId,
1349        channel_id: ChannelId,
1350    ) -> Result<bool> {
1351        self.transaction(|tx| async move {
1352            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1353            enum QueryAs {
1354                ChannelId,
1355            }
1356
1357            let channel_id_model: Option<ChannelId> = room_participant::Entity::find()
1358                .select_only()
1359                .column_as(room::Column::ChannelId, QueryAs::ChannelId)
1360                .inner_join(room::Entity)
1361                .filter(room_participant::Column::UserId.eq(user_id))
1362                .into_values::<_, QueryAs>()
1363                .one(&*tx)
1364                .await?;
1365
1366            let result = channel_id_model
1367                .map(|channel_id_model| channel_id_model != channel_id)
1368                .unwrap_or(false);
1369
1370            Ok(result)
1371        })
1372        .await
1373    }
1374
1375    pub async fn join_room(
1376        &self,
1377        room_id: RoomId,
1378        user_id: UserId,
1379        channel_id: Option<ChannelId>,
1380        connection: ConnectionId,
1381    ) -> Result<RoomGuard<JoinRoom>> {
1382        self.room_transaction(room_id, |tx| async move {
1383            if let Some(channel_id) = channel_id {
1384                channel_member::Entity::find()
1385                    .filter(
1386                        channel_member::Column::ChannelId
1387                            .eq(channel_id)
1388                            .and(channel_member::Column::UserId.eq(user_id))
1389                            .and(channel_member::Column::Accepted.eq(true)),
1390                    )
1391                    .one(&*tx)
1392                    .await?
1393                    .ok_or_else(|| anyhow!("no such channel membership"))?;
1394
1395                room_participant::ActiveModel {
1396                    room_id: ActiveValue::set(room_id),
1397                    user_id: ActiveValue::set(user_id),
1398                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1399                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1400                        connection.owner_id as i32,
1401                    ))),
1402                    answering_connection_lost: ActiveValue::set(false),
1403                    // Redundant for the channel join use case, used for channel and call invitations
1404                    calling_user_id: ActiveValue::set(user_id),
1405                    calling_connection_id: ActiveValue::set(connection.id as i32),
1406                    calling_connection_server_id: ActiveValue::set(Some(ServerId(
1407                        connection.owner_id as i32,
1408                    ))),
1409                    ..Default::default()
1410                }
1411                .insert(&*tx)
1412                .await?;
1413            } else {
1414                let result = room_participant::Entity::update_many()
1415                    .filter(
1416                        Condition::all()
1417                            .add(room_participant::Column::RoomId.eq(room_id))
1418                            .add(room_participant::Column::UserId.eq(user_id))
1419                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1420                    )
1421                    .set(room_participant::ActiveModel {
1422                        answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1423                        answering_connection_server_id: ActiveValue::set(Some(ServerId(
1424                            connection.owner_id as i32,
1425                        ))),
1426                        answering_connection_lost: ActiveValue::set(false),
1427                        ..Default::default()
1428                    })
1429                    .exec(&*tx)
1430                    .await?;
1431                if result.rows_affected == 0 {
1432                    Err(anyhow!("room does not exist or was already joined"))?;
1433                }
1434            }
1435
1436            let room = self.get_room(room_id, &tx).await?;
1437            let channel_members = if let Some(channel_id) = channel_id {
1438                self.get_channel_members_internal(channel_id, &tx).await?
1439            } else {
1440                Vec::new()
1441            };
1442            Ok(JoinRoom {
1443                room,
1444                channel_id,
1445                channel_members,
1446            })
1447        })
1448        .await
1449    }
1450
1451    pub async fn rejoin_room(
1452        &self,
1453        rejoin_room: proto::RejoinRoom,
1454        user_id: UserId,
1455        connection: ConnectionId,
1456    ) -> Result<RoomGuard<RejoinedRoom>> {
1457        let room_id = RoomId::from_proto(rejoin_room.id);
1458        self.room_transaction(room_id, |tx| async {
1459            let tx = tx;
1460            let participant_update = room_participant::Entity::update_many()
1461                .filter(
1462                    Condition::all()
1463                        .add(room_participant::Column::RoomId.eq(room_id))
1464                        .add(room_participant::Column::UserId.eq(user_id))
1465                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1466                        .add(
1467                            Condition::any()
1468                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1469                                .add(
1470                                    room_participant::Column::AnsweringConnectionServerId
1471                                        .ne(connection.owner_id as i32),
1472                                ),
1473                        ),
1474                )
1475                .set(room_participant::ActiveModel {
1476                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1477                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1478                        connection.owner_id as i32,
1479                    ))),
1480                    answering_connection_lost: ActiveValue::set(false),
1481                    ..Default::default()
1482                })
1483                .exec(&*tx)
1484                .await?;
1485            if participant_update.rows_affected == 0 {
1486                return Err(anyhow!("room does not exist or was already joined"))?;
1487            }
1488
1489            let mut reshared_projects = Vec::new();
1490            for reshared_project in &rejoin_room.reshared_projects {
1491                let project_id = ProjectId::from_proto(reshared_project.project_id);
1492                let project = project::Entity::find_by_id(project_id)
1493                    .one(&*tx)
1494                    .await?
1495                    .ok_or_else(|| anyhow!("project does not exist"))?;
1496                if project.host_user_id != user_id {
1497                    return Err(anyhow!("no such project"))?;
1498                }
1499
1500                let mut collaborators = project
1501                    .find_related(project_collaborator::Entity)
1502                    .all(&*tx)
1503                    .await?;
1504                let host_ix = collaborators
1505                    .iter()
1506                    .position(|collaborator| {
1507                        collaborator.user_id == user_id && collaborator.is_host
1508                    })
1509                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1510                let host = collaborators.swap_remove(host_ix);
1511                let old_connection_id = host.connection();
1512
1513                project::Entity::update(project::ActiveModel {
1514                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1515                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1516                        connection.owner_id as i32,
1517                    ))),
1518                    ..project.into_active_model()
1519                })
1520                .exec(&*tx)
1521                .await?;
1522                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1523                    connection_id: ActiveValue::set(connection.id as i32),
1524                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1525                    ..host.into_active_model()
1526                })
1527                .exec(&*tx)
1528                .await?;
1529
1530                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1531                    .await?;
1532
1533                reshared_projects.push(ResharedProject {
1534                    id: project_id,
1535                    old_connection_id,
1536                    collaborators: collaborators
1537                        .iter()
1538                        .map(|collaborator| ProjectCollaborator {
1539                            connection_id: collaborator.connection(),
1540                            user_id: collaborator.user_id,
1541                            replica_id: collaborator.replica_id,
1542                            is_host: collaborator.is_host,
1543                        })
1544                        .collect(),
1545                    worktrees: reshared_project.worktrees.clone(),
1546                });
1547            }
1548
1549            project::Entity::delete_many()
1550                .filter(
1551                    Condition::all()
1552                        .add(project::Column::RoomId.eq(room_id))
1553                        .add(project::Column::HostUserId.eq(user_id))
1554                        .add(
1555                            project::Column::Id
1556                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1557                        ),
1558                )
1559                .exec(&*tx)
1560                .await?;
1561
1562            let mut rejoined_projects = Vec::new();
1563            for rejoined_project in &rejoin_room.rejoined_projects {
1564                let project_id = ProjectId::from_proto(rejoined_project.id);
1565                let Some(project) = project::Entity::find_by_id(project_id)
1566                    .one(&*tx)
1567                    .await? else { continue };
1568
1569                let mut worktrees = Vec::new();
1570                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1571                for db_worktree in db_worktrees {
1572                    let mut worktree = RejoinedWorktree {
1573                        id: db_worktree.id as u64,
1574                        abs_path: db_worktree.abs_path,
1575                        root_name: db_worktree.root_name,
1576                        visible: db_worktree.visible,
1577                        updated_entries: Default::default(),
1578                        removed_entries: Default::default(),
1579                        updated_repositories: Default::default(),
1580                        removed_repositories: Default::default(),
1581                        diagnostic_summaries: Default::default(),
1582                        settings_files: Default::default(),
1583                        scan_id: db_worktree.scan_id as u64,
1584                        completed_scan_id: db_worktree.completed_scan_id as u64,
1585                    };
1586
1587                    let rejoined_worktree = rejoined_project
1588                        .worktrees
1589                        .iter()
1590                        .find(|worktree| worktree.id == db_worktree.id as u64);
1591
1592                    // File entries
1593                    {
1594                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1595                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1596                        } else {
1597                            worktree_entry::Column::IsDeleted.eq(false)
1598                        };
1599
1600                        let mut db_entries = worktree_entry::Entity::find()
1601                            .filter(
1602                                Condition::all()
1603                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
1604                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1605                                    .add(entry_filter),
1606                            )
1607                            .stream(&*tx)
1608                            .await?;
1609
1610                        while let Some(db_entry) = db_entries.next().await {
1611                            let db_entry = db_entry?;
1612                            if db_entry.is_deleted {
1613                                worktree.removed_entries.push(db_entry.id as u64);
1614                            } else {
1615                                worktree.updated_entries.push(proto::Entry {
1616                                    id: db_entry.id as u64,
1617                                    is_dir: db_entry.is_dir,
1618                                    path: db_entry.path,
1619                                    inode: db_entry.inode as u64,
1620                                    mtime: Some(proto::Timestamp {
1621                                        seconds: db_entry.mtime_seconds as u64,
1622                                        nanos: db_entry.mtime_nanos as u32,
1623                                    }),
1624                                    is_symlink: db_entry.is_symlink,
1625                                    is_ignored: db_entry.is_ignored,
1626                                    is_external: db_entry.is_external,
1627                                    git_status: db_entry.git_status.map(|status| status as i32),
1628                                });
1629                            }
1630                        }
1631                    }
1632
1633                    // Repository Entries
1634                    {
1635                        let repository_entry_filter =
1636                            if let Some(rejoined_worktree) = rejoined_worktree {
1637                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1638                            } else {
1639                                worktree_repository::Column::IsDeleted.eq(false)
1640                            };
1641
1642                        let mut db_repositories = worktree_repository::Entity::find()
1643                            .filter(
1644                                Condition::all()
1645                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
1646                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1647                                    .add(repository_entry_filter),
1648                            )
1649                            .stream(&*tx)
1650                            .await?;
1651
1652                        while let Some(db_repository) = db_repositories.next().await {
1653                            let db_repository = db_repository?;
1654                            if db_repository.is_deleted {
1655                                worktree
1656                                    .removed_repositories
1657                                    .push(db_repository.work_directory_id as u64);
1658                            } else {
1659                                worktree.updated_repositories.push(proto::RepositoryEntry {
1660                                    work_directory_id: db_repository.work_directory_id as u64,
1661                                    branch: db_repository.branch,
1662                                });
1663                            }
1664                        }
1665                    }
1666
1667                    worktrees.push(worktree);
1668                }
1669
1670                let language_servers = project
1671                    .find_related(language_server::Entity)
1672                    .all(&*tx)
1673                    .await?
1674                    .into_iter()
1675                    .map(|language_server| proto::LanguageServer {
1676                        id: language_server.id as u64,
1677                        name: language_server.name,
1678                    })
1679                    .collect::<Vec<_>>();
1680
1681                {
1682                    let mut db_settings_files = worktree_settings_file::Entity::find()
1683                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1684                        .stream(&*tx)
1685                        .await?;
1686                    while let Some(db_settings_file) = db_settings_files.next().await {
1687                        let db_settings_file = db_settings_file?;
1688                        if let Some(worktree) = worktrees
1689                            .iter_mut()
1690                            .find(|w| w.id == db_settings_file.worktree_id as u64)
1691                        {
1692                            worktree.settings_files.push(WorktreeSettingsFile {
1693                                path: db_settings_file.path,
1694                                content: db_settings_file.content,
1695                            });
1696                        }
1697                    }
1698                }
1699
1700                let mut collaborators = project
1701                    .find_related(project_collaborator::Entity)
1702                    .all(&*tx)
1703                    .await?;
1704                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1705                    .iter()
1706                    .position(|collaborator| collaborator.user_id == user_id)
1707                {
1708                    collaborators.swap_remove(self_collaborator_ix)
1709                } else {
1710                    continue;
1711                };
1712                let old_connection_id = self_collaborator.connection();
1713                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1714                    connection_id: ActiveValue::set(connection.id as i32),
1715                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1716                    ..self_collaborator.into_active_model()
1717                })
1718                .exec(&*tx)
1719                .await?;
1720
1721                let collaborators = collaborators
1722                    .into_iter()
1723                    .map(|collaborator| ProjectCollaborator {
1724                        connection_id: collaborator.connection(),
1725                        user_id: collaborator.user_id,
1726                        replica_id: collaborator.replica_id,
1727                        is_host: collaborator.is_host,
1728                    })
1729                    .collect::<Vec<_>>();
1730
1731                rejoined_projects.push(RejoinedProject {
1732                    id: project_id,
1733                    old_connection_id,
1734                    collaborators,
1735                    worktrees,
1736                    language_servers,
1737                });
1738            }
1739
1740            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1741
1742            let channel_members = if let Some(channel_id) = channel_id {
1743                self.get_channel_members_internal(channel_id, &tx).await?
1744            } else {
1745                Vec::new()
1746            };
1747
1748            Ok(RejoinedRoom {
1749                room,
1750                channel_id,
1751                channel_members,
1752                rejoined_projects,
1753                reshared_projects,
1754            })
1755        })
1756        .await
1757    }
1758
1759    pub async fn leave_room(
1760        &self,
1761        connection: ConnectionId,
1762    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1763        self.optional_room_transaction(|tx| async move {
1764            let leaving_participant = room_participant::Entity::find()
1765                .filter(
1766                    Condition::all()
1767                        .add(
1768                            room_participant::Column::AnsweringConnectionId
1769                                .eq(connection.id as i32),
1770                        )
1771                        .add(
1772                            room_participant::Column::AnsweringConnectionServerId
1773                                .eq(connection.owner_id as i32),
1774                        ),
1775                )
1776                .one(&*tx)
1777                .await?;
1778
1779            if let Some(leaving_participant) = leaving_participant {
1780                // Leave room.
1781                let room_id = leaving_participant.room_id;
1782                room_participant::Entity::delete_by_id(leaving_participant.id)
1783                    .exec(&*tx)
1784                    .await?;
1785
1786                // Cancel pending calls initiated by the leaving user.
1787                let called_participants = room_participant::Entity::find()
1788                    .filter(
1789                        Condition::all()
1790                            .add(
1791                                room_participant::Column::CallingUserId
1792                                    .eq(leaving_participant.user_id),
1793                            )
1794                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1795                    )
1796                    .all(&*tx)
1797                    .await?;
1798                room_participant::Entity::delete_many()
1799                    .filter(
1800                        room_participant::Column::Id
1801                            .is_in(called_participants.iter().map(|participant| participant.id)),
1802                    )
1803                    .exec(&*tx)
1804                    .await?;
1805                let canceled_calls_to_user_ids = called_participants
1806                    .into_iter()
1807                    .map(|participant| participant.user_id)
1808                    .collect();
1809
1810                // Detect left projects.
1811                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1812                enum QueryProjectIds {
1813                    ProjectId,
1814                }
1815                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1816                    .select_only()
1817                    .column_as(
1818                        project_collaborator::Column::ProjectId,
1819                        QueryProjectIds::ProjectId,
1820                    )
1821                    .filter(
1822                        Condition::all()
1823                            .add(
1824                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1825                            )
1826                            .add(
1827                                project_collaborator::Column::ConnectionServerId
1828                                    .eq(connection.owner_id as i32),
1829                            ),
1830                    )
1831                    .into_values::<_, QueryProjectIds>()
1832                    .all(&*tx)
1833                    .await?;
1834                let mut left_projects = HashMap::default();
1835                let mut collaborators = project_collaborator::Entity::find()
1836                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1837                    .stream(&*tx)
1838                    .await?;
1839                while let Some(collaborator) = collaborators.next().await {
1840                    let collaborator = collaborator?;
1841                    let left_project =
1842                        left_projects
1843                            .entry(collaborator.project_id)
1844                            .or_insert(LeftProject {
1845                                id: collaborator.project_id,
1846                                host_user_id: Default::default(),
1847                                connection_ids: Default::default(),
1848                                host_connection_id: Default::default(),
1849                            });
1850
1851                    let collaborator_connection_id = collaborator.connection();
1852                    if collaborator_connection_id != connection {
1853                        left_project.connection_ids.push(collaborator_connection_id);
1854                    }
1855
1856                    if collaborator.is_host {
1857                        left_project.host_user_id = collaborator.user_id;
1858                        left_project.host_connection_id = collaborator_connection_id;
1859                    }
1860                }
1861                drop(collaborators);
1862
1863                // Leave projects.
1864                project_collaborator::Entity::delete_many()
1865                    .filter(
1866                        Condition::all()
1867                            .add(
1868                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1869                            )
1870                            .add(
1871                                project_collaborator::Column::ConnectionServerId
1872                                    .eq(connection.owner_id as i32),
1873                            ),
1874                    )
1875                    .exec(&*tx)
1876                    .await?;
1877
1878                // Unshare projects.
1879                project::Entity::delete_many()
1880                    .filter(
1881                        Condition::all()
1882                            .add(project::Column::RoomId.eq(room_id))
1883                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1884                            .add(
1885                                project::Column::HostConnectionServerId
1886                                    .eq(connection.owner_id as i32),
1887                            ),
1888                    )
1889                    .exec(&*tx)
1890                    .await?;
1891
1892                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1893                let deleted = if room.participants.is_empty() {
1894                    let result = room::Entity::delete_by_id(room_id)
1895                        .filter(room::Column::ChannelId.is_null())
1896                        .exec(&*tx)
1897                        .await?;
1898                    result.rows_affected > 0
1899                } else {
1900                    false
1901                };
1902
1903                let channel_members = if let Some(channel_id) = channel_id {
1904                    self.get_channel_members_internal(channel_id, &tx).await?
1905                } else {
1906                    Vec::new()
1907                };
1908                let left_room = LeftRoom {
1909                    room,
1910                    channel_id,
1911                    channel_members,
1912                    left_projects,
1913                    canceled_calls_to_user_ids,
1914                    deleted,
1915                };
1916
1917                if left_room.room.participants.is_empty() {
1918                    self.rooms.remove(&room_id);
1919                }
1920
1921                Ok(Some((room_id, left_room)))
1922            } else {
1923                Ok(None)
1924            }
1925        })
1926        .await
1927    }
1928
1929    pub async fn follow(
1930        &self,
1931        project_id: ProjectId,
1932        leader_connection: ConnectionId,
1933        follower_connection: ConnectionId,
1934    ) -> Result<RoomGuard<proto::Room>> {
1935        let room_id = self.room_id_for_project(project_id).await?;
1936        self.room_transaction(room_id, |tx| async move {
1937            follower::ActiveModel {
1938                room_id: ActiveValue::set(room_id),
1939                project_id: ActiveValue::set(project_id),
1940                leader_connection_server_id: ActiveValue::set(ServerId(
1941                    leader_connection.owner_id as i32,
1942                )),
1943                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1944                follower_connection_server_id: ActiveValue::set(ServerId(
1945                    follower_connection.owner_id as i32,
1946                )),
1947                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1948                ..Default::default()
1949            }
1950            .insert(&*tx)
1951            .await?;
1952
1953            let room = self.get_room(room_id, &*tx).await?;
1954            Ok(room)
1955        })
1956        .await
1957    }
1958
1959    pub async fn unfollow(
1960        &self,
1961        project_id: ProjectId,
1962        leader_connection: ConnectionId,
1963        follower_connection: ConnectionId,
1964    ) -> Result<RoomGuard<proto::Room>> {
1965        let room_id = self.room_id_for_project(project_id).await?;
1966        self.room_transaction(room_id, |tx| async move {
1967            follower::Entity::delete_many()
1968                .filter(
1969                    Condition::all()
1970                        .add(follower::Column::ProjectId.eq(project_id))
1971                        .add(
1972                            follower::Column::LeaderConnectionServerId
1973                                .eq(leader_connection.owner_id),
1974                        )
1975                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1976                        .add(
1977                            follower::Column::FollowerConnectionServerId
1978                                .eq(follower_connection.owner_id),
1979                        )
1980                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1981                )
1982                .exec(&*tx)
1983                .await?;
1984
1985            let room = self.get_room(room_id, &*tx).await?;
1986            Ok(room)
1987        })
1988        .await
1989    }
1990
1991    pub async fn update_room_participant_location(
1992        &self,
1993        room_id: RoomId,
1994        connection: ConnectionId,
1995        location: proto::ParticipantLocation,
1996    ) -> Result<RoomGuard<proto::Room>> {
1997        self.room_transaction(room_id, |tx| async {
1998            let tx = tx;
1999            let location_kind;
2000            let location_project_id;
2001            match location
2002                .variant
2003                .as_ref()
2004                .ok_or_else(|| anyhow!("invalid location"))?
2005            {
2006                proto::participant_location::Variant::SharedProject(project) => {
2007                    location_kind = 0;
2008                    location_project_id = Some(ProjectId::from_proto(project.id));
2009                }
2010                proto::participant_location::Variant::UnsharedProject(_) => {
2011                    location_kind = 1;
2012                    location_project_id = None;
2013                }
2014                proto::participant_location::Variant::External(_) => {
2015                    location_kind = 2;
2016                    location_project_id = None;
2017                }
2018            }
2019
2020            let result = room_participant::Entity::update_many()
2021                .filter(
2022                    Condition::all()
2023                        .add(room_participant::Column::RoomId.eq(room_id))
2024                        .add(
2025                            room_participant::Column::AnsweringConnectionId
2026                                .eq(connection.id as i32),
2027                        )
2028                        .add(
2029                            room_participant::Column::AnsweringConnectionServerId
2030                                .eq(connection.owner_id as i32),
2031                        ),
2032                )
2033                .set(room_participant::ActiveModel {
2034                    location_kind: ActiveValue::set(Some(location_kind)),
2035                    location_project_id: ActiveValue::set(location_project_id),
2036                    ..Default::default()
2037                })
2038                .exec(&*tx)
2039                .await?;
2040
2041            if result.rows_affected == 1 {
2042                let room = self.get_room(room_id, &tx).await?;
2043                Ok(room)
2044            } else {
2045                Err(anyhow!("could not update room participant location"))?
2046            }
2047        })
2048        .await
2049    }
2050
2051    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2052        self.transaction(|tx| async move {
2053            let participant = room_participant::Entity::find()
2054                .filter(
2055                    Condition::all()
2056                        .add(
2057                            room_participant::Column::AnsweringConnectionId
2058                                .eq(connection.id as i32),
2059                        )
2060                        .add(
2061                            room_participant::Column::AnsweringConnectionServerId
2062                                .eq(connection.owner_id as i32),
2063                        ),
2064                )
2065                .one(&*tx)
2066                .await?
2067                .ok_or_else(|| anyhow!("not a participant in any room"))?;
2068
2069            room_participant::Entity::update(room_participant::ActiveModel {
2070                answering_connection_lost: ActiveValue::set(true),
2071                ..participant.into_active_model()
2072            })
2073            .exec(&*tx)
2074            .await?;
2075
2076            Ok(())
2077        })
2078        .await
2079    }
2080
2081    fn build_incoming_call(
2082        room: &proto::Room,
2083        called_user_id: UserId,
2084    ) -> Option<proto::IncomingCall> {
2085        let pending_participant = room
2086            .pending_participants
2087            .iter()
2088            .find(|participant| participant.user_id == called_user_id.to_proto())?;
2089
2090        Some(proto::IncomingCall {
2091            room_id: room.id,
2092            calling_user_id: pending_participant.calling_user_id,
2093            participant_user_ids: room
2094                .participants
2095                .iter()
2096                .map(|participant| participant.user_id)
2097                .collect(),
2098            initial_project: room.participants.iter().find_map(|participant| {
2099                let initial_project_id = pending_participant.initial_project_id?;
2100                participant
2101                    .projects
2102                    .iter()
2103                    .find(|project| project.id == initial_project_id)
2104                    .cloned()
2105            }),
2106        })
2107    }
2108    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2109        let (_, room) = self.get_channel_room(room_id, tx).await?;
2110        Ok(room)
2111    }
2112
2113    async fn get_channel_room(
2114        &self,
2115        room_id: RoomId,
2116        tx: &DatabaseTransaction,
2117    ) -> Result<(Option<ChannelId>, proto::Room)> {
2118        let db_room = room::Entity::find_by_id(room_id)
2119            .one(tx)
2120            .await?
2121            .ok_or_else(|| anyhow!("could not find room"))?;
2122
2123        let mut db_participants = db_room
2124            .find_related(room_participant::Entity)
2125            .stream(tx)
2126            .await?;
2127        let mut participants = HashMap::default();
2128        let mut pending_participants = Vec::new();
2129        while let Some(db_participant) = db_participants.next().await {
2130            let db_participant = db_participant?;
2131            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2132                .answering_connection_id
2133                .zip(db_participant.answering_connection_server_id)
2134            {
2135                let location = match (
2136                    db_participant.location_kind,
2137                    db_participant.location_project_id,
2138                ) {
2139                    (Some(0), Some(project_id)) => {
2140                        Some(proto::participant_location::Variant::SharedProject(
2141                            proto::participant_location::SharedProject {
2142                                id: project_id.to_proto(),
2143                            },
2144                        ))
2145                    }
2146                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2147                        Default::default(),
2148                    )),
2149                    _ => Some(proto::participant_location::Variant::External(
2150                        Default::default(),
2151                    )),
2152                };
2153
2154                let answering_connection = ConnectionId {
2155                    owner_id: answering_connection_server_id.0 as u32,
2156                    id: answering_connection_id as u32,
2157                };
2158                participants.insert(
2159                    answering_connection,
2160                    proto::Participant {
2161                        user_id: db_participant.user_id.to_proto(),
2162                        peer_id: Some(answering_connection.into()),
2163                        projects: Default::default(),
2164                        location: Some(proto::ParticipantLocation { variant: location }),
2165                    },
2166                );
2167            } else {
2168                pending_participants.push(proto::PendingParticipant {
2169                    user_id: db_participant.user_id.to_proto(),
2170                    calling_user_id: db_participant.calling_user_id.to_proto(),
2171                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2172                });
2173            }
2174        }
2175        drop(db_participants);
2176
2177        let mut db_projects = db_room
2178            .find_related(project::Entity)
2179            .find_with_related(worktree::Entity)
2180            .stream(tx)
2181            .await?;
2182
2183        while let Some(row) = db_projects.next().await {
2184            let (db_project, db_worktree) = row?;
2185            let host_connection = db_project.host_connection()?;
2186            if let Some(participant) = participants.get_mut(&host_connection) {
2187                let project = if let Some(project) = participant
2188                    .projects
2189                    .iter_mut()
2190                    .find(|project| project.id == db_project.id.to_proto())
2191                {
2192                    project
2193                } else {
2194                    participant.projects.push(proto::ParticipantProject {
2195                        id: db_project.id.to_proto(),
2196                        worktree_root_names: Default::default(),
2197                    });
2198                    participant.projects.last_mut().unwrap()
2199                };
2200
2201                if let Some(db_worktree) = db_worktree {
2202                    if db_worktree.visible {
2203                        project.worktree_root_names.push(db_worktree.root_name);
2204                    }
2205                }
2206            }
2207        }
2208        drop(db_projects);
2209
2210        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2211        let mut followers = Vec::new();
2212        while let Some(db_follower) = db_followers.next().await {
2213            let db_follower = db_follower?;
2214            followers.push(proto::Follower {
2215                leader_id: Some(db_follower.leader_connection().into()),
2216                follower_id: Some(db_follower.follower_connection().into()),
2217                project_id: db_follower.project_id.to_proto(),
2218            });
2219        }
2220
2221        Ok((
2222            db_room.channel_id,
2223            proto::Room {
2224                id: db_room.id.to_proto(),
2225                live_kit_room: db_room.live_kit_room,
2226                participants: participants.into_values().collect(),
2227                pending_participants,
2228                followers,
2229            },
2230        ))
2231    }
2232
2233    // projects
2234
2235    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2236        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2237        enum QueryAs {
2238            Count,
2239        }
2240
2241        self.transaction(|tx| async move {
2242            Ok(project::Entity::find()
2243                .select_only()
2244                .column_as(project::Column::Id.count(), QueryAs::Count)
2245                .inner_join(user::Entity)
2246                .filter(user::Column::Admin.eq(false))
2247                .into_values::<_, QueryAs>()
2248                .one(&*tx)
2249                .await?
2250                .unwrap_or(0i64) as usize)
2251        })
2252        .await
2253    }
2254
2255    pub async fn share_project(
2256        &self,
2257        room_id: RoomId,
2258        connection: ConnectionId,
2259        worktrees: &[proto::WorktreeMetadata],
2260    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2261        self.room_transaction(room_id, |tx| async move {
2262            let participant = room_participant::Entity::find()
2263                .filter(
2264                    Condition::all()
2265                        .add(
2266                            room_participant::Column::AnsweringConnectionId
2267                                .eq(connection.id as i32),
2268                        )
2269                        .add(
2270                            room_participant::Column::AnsweringConnectionServerId
2271                                .eq(connection.owner_id as i32),
2272                        ),
2273                )
2274                .one(&*tx)
2275                .await?
2276                .ok_or_else(|| anyhow!("could not find participant"))?;
2277            if participant.room_id != room_id {
2278                return Err(anyhow!("shared project on unexpected room"))?;
2279            }
2280
2281            let project = project::ActiveModel {
2282                room_id: ActiveValue::set(participant.room_id),
2283                host_user_id: ActiveValue::set(participant.user_id),
2284                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2285                host_connection_server_id: ActiveValue::set(Some(ServerId(
2286                    connection.owner_id as i32,
2287                ))),
2288                ..Default::default()
2289            }
2290            .insert(&*tx)
2291            .await?;
2292
2293            if !worktrees.is_empty() {
2294                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2295                    worktree::ActiveModel {
2296                        id: ActiveValue::set(worktree.id as i64),
2297                        project_id: ActiveValue::set(project.id),
2298                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2299                        root_name: ActiveValue::set(worktree.root_name.clone()),
2300                        visible: ActiveValue::set(worktree.visible),
2301                        scan_id: ActiveValue::set(0),
2302                        completed_scan_id: ActiveValue::set(0),
2303                    }
2304                }))
2305                .exec(&*tx)
2306                .await?;
2307            }
2308
2309            project_collaborator::ActiveModel {
2310                project_id: ActiveValue::set(project.id),
2311                connection_id: ActiveValue::set(connection.id as i32),
2312                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2313                user_id: ActiveValue::set(participant.user_id),
2314                replica_id: ActiveValue::set(ReplicaId(0)),
2315                is_host: ActiveValue::set(true),
2316                ..Default::default()
2317            }
2318            .insert(&*tx)
2319            .await?;
2320
2321            let room = self.get_room(room_id, &tx).await?;
2322            Ok((project.id, room))
2323        })
2324        .await
2325    }
2326
2327    pub async fn unshare_project(
2328        &self,
2329        project_id: ProjectId,
2330        connection: ConnectionId,
2331    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2332        let room_id = self.room_id_for_project(project_id).await?;
2333        self.room_transaction(room_id, |tx| async move {
2334            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2335
2336            let project = project::Entity::find_by_id(project_id)
2337                .one(&*tx)
2338                .await?
2339                .ok_or_else(|| anyhow!("project not found"))?;
2340            if project.host_connection()? == connection {
2341                project::Entity::delete(project.into_active_model())
2342                    .exec(&*tx)
2343                    .await?;
2344                let room = self.get_room(room_id, &tx).await?;
2345                Ok((room, guest_connection_ids))
2346            } else {
2347                Err(anyhow!("cannot unshare a project hosted by another user"))?
2348            }
2349        })
2350        .await
2351    }
2352
2353    pub async fn update_project(
2354        &self,
2355        project_id: ProjectId,
2356        connection: ConnectionId,
2357        worktrees: &[proto::WorktreeMetadata],
2358    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2359        let room_id = self.room_id_for_project(project_id).await?;
2360        self.room_transaction(room_id, |tx| async move {
2361            let project = project::Entity::find_by_id(project_id)
2362                .filter(
2363                    Condition::all()
2364                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2365                        .add(
2366                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2367                        ),
2368                )
2369                .one(&*tx)
2370                .await?
2371                .ok_or_else(|| anyhow!("no such project"))?;
2372
2373            self.update_project_worktrees(project.id, worktrees, &tx)
2374                .await?;
2375
2376            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2377            let room = self.get_room(project.room_id, &tx).await?;
2378            Ok((room, guest_connection_ids))
2379        })
2380        .await
2381    }
2382
2383    async fn update_project_worktrees(
2384        &self,
2385        project_id: ProjectId,
2386        worktrees: &[proto::WorktreeMetadata],
2387        tx: &DatabaseTransaction,
2388    ) -> Result<()> {
2389        if !worktrees.is_empty() {
2390            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2391                id: ActiveValue::set(worktree.id as i64),
2392                project_id: ActiveValue::set(project_id),
2393                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2394                root_name: ActiveValue::set(worktree.root_name.clone()),
2395                visible: ActiveValue::set(worktree.visible),
2396                scan_id: ActiveValue::set(0),
2397                completed_scan_id: ActiveValue::set(0),
2398            }))
2399            .on_conflict(
2400                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2401                    .update_column(worktree::Column::RootName)
2402                    .to_owned(),
2403            )
2404            .exec(&*tx)
2405            .await?;
2406        }
2407
2408        worktree::Entity::delete_many()
2409            .filter(worktree::Column::ProjectId.eq(project_id).and(
2410                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2411            ))
2412            .exec(&*tx)
2413            .await?;
2414
2415        Ok(())
2416    }
2417
2418    pub async fn update_worktree(
2419        &self,
2420        update: &proto::UpdateWorktree,
2421        connection: ConnectionId,
2422    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2423        let project_id = ProjectId::from_proto(update.project_id);
2424        let worktree_id = update.worktree_id as i64;
2425        let room_id = self.room_id_for_project(project_id).await?;
2426        self.room_transaction(room_id, |tx| async move {
2427            // Ensure the update comes from the host.
2428            let _project = project::Entity::find_by_id(project_id)
2429                .filter(
2430                    Condition::all()
2431                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2432                        .add(
2433                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2434                        ),
2435                )
2436                .one(&*tx)
2437                .await?
2438                .ok_or_else(|| anyhow!("no such project"))?;
2439
2440            // Update metadata.
2441            worktree::Entity::update(worktree::ActiveModel {
2442                id: ActiveValue::set(worktree_id),
2443                project_id: ActiveValue::set(project_id),
2444                root_name: ActiveValue::set(update.root_name.clone()),
2445                scan_id: ActiveValue::set(update.scan_id as i64),
2446                completed_scan_id: if update.is_last_update {
2447                    ActiveValue::set(update.scan_id as i64)
2448                } else {
2449                    ActiveValue::default()
2450                },
2451                abs_path: ActiveValue::set(update.abs_path.clone()),
2452                ..Default::default()
2453            })
2454            .exec(&*tx)
2455            .await?;
2456
2457            if !update.updated_entries.is_empty() {
2458                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2459                    let mtime = entry.mtime.clone().unwrap_or_default();
2460                    worktree_entry::ActiveModel {
2461                        project_id: ActiveValue::set(project_id),
2462                        worktree_id: ActiveValue::set(worktree_id),
2463                        id: ActiveValue::set(entry.id as i64),
2464                        is_dir: ActiveValue::set(entry.is_dir),
2465                        path: ActiveValue::set(entry.path.clone()),
2466                        inode: ActiveValue::set(entry.inode as i64),
2467                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2468                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2469                        is_symlink: ActiveValue::set(entry.is_symlink),
2470                        is_ignored: ActiveValue::set(entry.is_ignored),
2471                        is_external: ActiveValue::set(entry.is_external),
2472                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2473                        is_deleted: ActiveValue::set(false),
2474                        scan_id: ActiveValue::set(update.scan_id as i64),
2475                    }
2476                }))
2477                .on_conflict(
2478                    OnConflict::columns([
2479                        worktree_entry::Column::ProjectId,
2480                        worktree_entry::Column::WorktreeId,
2481                        worktree_entry::Column::Id,
2482                    ])
2483                    .update_columns([
2484                        worktree_entry::Column::IsDir,
2485                        worktree_entry::Column::Path,
2486                        worktree_entry::Column::Inode,
2487                        worktree_entry::Column::MtimeSeconds,
2488                        worktree_entry::Column::MtimeNanos,
2489                        worktree_entry::Column::IsSymlink,
2490                        worktree_entry::Column::IsIgnored,
2491                        worktree_entry::Column::GitStatus,
2492                        worktree_entry::Column::ScanId,
2493                    ])
2494                    .to_owned(),
2495                )
2496                .exec(&*tx)
2497                .await?;
2498            }
2499
2500            if !update.removed_entries.is_empty() {
2501                worktree_entry::Entity::update_many()
2502                    .filter(
2503                        worktree_entry::Column::ProjectId
2504                            .eq(project_id)
2505                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2506                            .and(
2507                                worktree_entry::Column::Id
2508                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2509                            ),
2510                    )
2511                    .set(worktree_entry::ActiveModel {
2512                        is_deleted: ActiveValue::Set(true),
2513                        scan_id: ActiveValue::Set(update.scan_id as i64),
2514                        ..Default::default()
2515                    })
2516                    .exec(&*tx)
2517                    .await?;
2518            }
2519
2520            if !update.updated_repositories.is_empty() {
2521                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2522                    |repository| worktree_repository::ActiveModel {
2523                        project_id: ActiveValue::set(project_id),
2524                        worktree_id: ActiveValue::set(worktree_id),
2525                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2526                        scan_id: ActiveValue::set(update.scan_id as i64),
2527                        branch: ActiveValue::set(repository.branch.clone()),
2528                        is_deleted: ActiveValue::set(false),
2529                    },
2530                ))
2531                .on_conflict(
2532                    OnConflict::columns([
2533                        worktree_repository::Column::ProjectId,
2534                        worktree_repository::Column::WorktreeId,
2535                        worktree_repository::Column::WorkDirectoryId,
2536                    ])
2537                    .update_columns([
2538                        worktree_repository::Column::ScanId,
2539                        worktree_repository::Column::Branch,
2540                    ])
2541                    .to_owned(),
2542                )
2543                .exec(&*tx)
2544                .await?;
2545            }
2546
2547            if !update.removed_repositories.is_empty() {
2548                worktree_repository::Entity::update_many()
2549                    .filter(
2550                        worktree_repository::Column::ProjectId
2551                            .eq(project_id)
2552                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2553                            .and(
2554                                worktree_repository::Column::WorkDirectoryId
2555                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2556                            ),
2557                    )
2558                    .set(worktree_repository::ActiveModel {
2559                        is_deleted: ActiveValue::Set(true),
2560                        scan_id: ActiveValue::Set(update.scan_id as i64),
2561                        ..Default::default()
2562                    })
2563                    .exec(&*tx)
2564                    .await?;
2565            }
2566
2567            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2568            Ok(connection_ids)
2569        })
2570        .await
2571    }
2572
2573    pub async fn update_diagnostic_summary(
2574        &self,
2575        update: &proto::UpdateDiagnosticSummary,
2576        connection: ConnectionId,
2577    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2578        let project_id = ProjectId::from_proto(update.project_id);
2579        let worktree_id = update.worktree_id as i64;
2580        let room_id = self.room_id_for_project(project_id).await?;
2581        self.room_transaction(room_id, |tx| async move {
2582            let summary = update
2583                .summary
2584                .as_ref()
2585                .ok_or_else(|| anyhow!("invalid summary"))?;
2586
2587            // Ensure the update comes from the host.
2588            let project = project::Entity::find_by_id(project_id)
2589                .one(&*tx)
2590                .await?
2591                .ok_or_else(|| anyhow!("no such project"))?;
2592            if project.host_connection()? != connection {
2593                return Err(anyhow!("can't update a project hosted by someone else"))?;
2594            }
2595
2596            // Update summary.
2597            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2598                project_id: ActiveValue::set(project_id),
2599                worktree_id: ActiveValue::set(worktree_id),
2600                path: ActiveValue::set(summary.path.clone()),
2601                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2602                error_count: ActiveValue::set(summary.error_count as i32),
2603                warning_count: ActiveValue::set(summary.warning_count as i32),
2604                ..Default::default()
2605            })
2606            .on_conflict(
2607                OnConflict::columns([
2608                    worktree_diagnostic_summary::Column::ProjectId,
2609                    worktree_diagnostic_summary::Column::WorktreeId,
2610                    worktree_diagnostic_summary::Column::Path,
2611                ])
2612                .update_columns([
2613                    worktree_diagnostic_summary::Column::LanguageServerId,
2614                    worktree_diagnostic_summary::Column::ErrorCount,
2615                    worktree_diagnostic_summary::Column::WarningCount,
2616                ])
2617                .to_owned(),
2618            )
2619            .exec(&*tx)
2620            .await?;
2621
2622            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2623            Ok(connection_ids)
2624        })
2625        .await
2626    }
2627
2628    pub async fn start_language_server(
2629        &self,
2630        update: &proto::StartLanguageServer,
2631        connection: ConnectionId,
2632    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2633        let project_id = ProjectId::from_proto(update.project_id);
2634        let room_id = self.room_id_for_project(project_id).await?;
2635        self.room_transaction(room_id, |tx| async move {
2636            let server = update
2637                .server
2638                .as_ref()
2639                .ok_or_else(|| anyhow!("invalid language server"))?;
2640
2641            // Ensure the update comes from the host.
2642            let project = project::Entity::find_by_id(project_id)
2643                .one(&*tx)
2644                .await?
2645                .ok_or_else(|| anyhow!("no such project"))?;
2646            if project.host_connection()? != connection {
2647                return Err(anyhow!("can't update a project hosted by someone else"))?;
2648            }
2649
2650            // Add the newly-started language server.
2651            language_server::Entity::insert(language_server::ActiveModel {
2652                project_id: ActiveValue::set(project_id),
2653                id: ActiveValue::set(server.id as i64),
2654                name: ActiveValue::set(server.name.clone()),
2655                ..Default::default()
2656            })
2657            .on_conflict(
2658                OnConflict::columns([
2659                    language_server::Column::ProjectId,
2660                    language_server::Column::Id,
2661                ])
2662                .update_column(language_server::Column::Name)
2663                .to_owned(),
2664            )
2665            .exec(&*tx)
2666            .await?;
2667
2668            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2669            Ok(connection_ids)
2670        })
2671        .await
2672    }
2673
2674    pub async fn update_worktree_settings(
2675        &self,
2676        update: &proto::UpdateWorktreeSettings,
2677        connection: ConnectionId,
2678    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2679        let project_id = ProjectId::from_proto(update.project_id);
2680        let room_id = self.room_id_for_project(project_id).await?;
2681        self.room_transaction(room_id, |tx| async move {
2682            // Ensure the update comes from the host.
2683            let project = project::Entity::find_by_id(project_id)
2684                .one(&*tx)
2685                .await?
2686                .ok_or_else(|| anyhow!("no such project"))?;
2687            if project.host_connection()? != connection {
2688                return Err(anyhow!("can't update a project hosted by someone else"))?;
2689            }
2690
2691            if let Some(content) = &update.content {
2692                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2693                    project_id: ActiveValue::Set(project_id),
2694                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2695                    path: ActiveValue::Set(update.path.clone()),
2696                    content: ActiveValue::Set(content.clone()),
2697                })
2698                .on_conflict(
2699                    OnConflict::columns([
2700                        worktree_settings_file::Column::ProjectId,
2701                        worktree_settings_file::Column::WorktreeId,
2702                        worktree_settings_file::Column::Path,
2703                    ])
2704                    .update_column(worktree_settings_file::Column::Content)
2705                    .to_owned(),
2706                )
2707                .exec(&*tx)
2708                .await?;
2709            } else {
2710                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2711                    project_id: ActiveValue::Set(project_id),
2712                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2713                    path: ActiveValue::Set(update.path.clone()),
2714                    ..Default::default()
2715                })
2716                .exec(&*tx)
2717                .await?;
2718            }
2719
2720            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2721            Ok(connection_ids)
2722        })
2723        .await
2724    }
2725
2726    pub async fn join_project(
2727        &self,
2728        project_id: ProjectId,
2729        connection: ConnectionId,
2730    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2731        let room_id = self.room_id_for_project(project_id).await?;
2732        self.room_transaction(room_id, |tx| async move {
2733            let participant = room_participant::Entity::find()
2734                .filter(
2735                    Condition::all()
2736                        .add(
2737                            room_participant::Column::AnsweringConnectionId
2738                                .eq(connection.id as i32),
2739                        )
2740                        .add(
2741                            room_participant::Column::AnsweringConnectionServerId
2742                                .eq(connection.owner_id as i32),
2743                        ),
2744                )
2745                .one(&*tx)
2746                .await?
2747                .ok_or_else(|| anyhow!("must join a room first"))?;
2748
2749            let project = project::Entity::find_by_id(project_id)
2750                .one(&*tx)
2751                .await?
2752                .ok_or_else(|| anyhow!("no such project"))?;
2753            if project.room_id != participant.room_id {
2754                return Err(anyhow!("no such project"))?;
2755            }
2756
2757            let mut collaborators = project
2758                .find_related(project_collaborator::Entity)
2759                .all(&*tx)
2760                .await?;
2761            let replica_ids = collaborators
2762                .iter()
2763                .map(|c| c.replica_id)
2764                .collect::<HashSet<_>>();
2765            let mut replica_id = ReplicaId(1);
2766            while replica_ids.contains(&replica_id) {
2767                replica_id.0 += 1;
2768            }
2769            let new_collaborator = project_collaborator::ActiveModel {
2770                project_id: ActiveValue::set(project_id),
2771                connection_id: ActiveValue::set(connection.id as i32),
2772                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2773                user_id: ActiveValue::set(participant.user_id),
2774                replica_id: ActiveValue::set(replica_id),
2775                is_host: ActiveValue::set(false),
2776                ..Default::default()
2777            }
2778            .insert(&*tx)
2779            .await?;
2780            collaborators.push(new_collaborator);
2781
2782            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2783            let mut worktrees = db_worktrees
2784                .into_iter()
2785                .map(|db_worktree| {
2786                    (
2787                        db_worktree.id as u64,
2788                        Worktree {
2789                            id: db_worktree.id as u64,
2790                            abs_path: db_worktree.abs_path,
2791                            root_name: db_worktree.root_name,
2792                            visible: db_worktree.visible,
2793                            entries: Default::default(),
2794                            repository_entries: Default::default(),
2795                            diagnostic_summaries: Default::default(),
2796                            settings_files: Default::default(),
2797                            scan_id: db_worktree.scan_id as u64,
2798                            completed_scan_id: db_worktree.completed_scan_id as u64,
2799                        },
2800                    )
2801                })
2802                .collect::<BTreeMap<_, _>>();
2803
2804            // Populate worktree entries.
2805            {
2806                let mut db_entries = worktree_entry::Entity::find()
2807                    .filter(
2808                        Condition::all()
2809                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2810                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2811                    )
2812                    .stream(&*tx)
2813                    .await?;
2814                while let Some(db_entry) = db_entries.next().await {
2815                    let db_entry = db_entry?;
2816                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2817                        worktree.entries.push(proto::Entry {
2818                            id: db_entry.id as u64,
2819                            is_dir: db_entry.is_dir,
2820                            path: db_entry.path,
2821                            inode: db_entry.inode as u64,
2822                            mtime: Some(proto::Timestamp {
2823                                seconds: db_entry.mtime_seconds as u64,
2824                                nanos: db_entry.mtime_nanos as u32,
2825                            }),
2826                            is_symlink: db_entry.is_symlink,
2827                            is_ignored: db_entry.is_ignored,
2828                            is_external: db_entry.is_external,
2829                            git_status: db_entry.git_status.map(|status| status as i32),
2830                        });
2831                    }
2832                }
2833            }
2834
2835            // Populate repository entries.
2836            {
2837                let mut db_repository_entries = worktree_repository::Entity::find()
2838                    .filter(
2839                        Condition::all()
2840                            .add(worktree_repository::Column::ProjectId.eq(project_id))
2841                            .add(worktree_repository::Column::IsDeleted.eq(false)),
2842                    )
2843                    .stream(&*tx)
2844                    .await?;
2845                while let Some(db_repository_entry) = db_repository_entries.next().await {
2846                    let db_repository_entry = db_repository_entry?;
2847                    if let Some(worktree) =
2848                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2849                    {
2850                        worktree.repository_entries.insert(
2851                            db_repository_entry.work_directory_id as u64,
2852                            proto::RepositoryEntry {
2853                                work_directory_id: db_repository_entry.work_directory_id as u64,
2854                                branch: db_repository_entry.branch,
2855                            },
2856                        );
2857                    }
2858                }
2859            }
2860
2861            // Populate worktree diagnostic summaries.
2862            {
2863                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2864                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2865                    .stream(&*tx)
2866                    .await?;
2867                while let Some(db_summary) = db_summaries.next().await {
2868                    let db_summary = db_summary?;
2869                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2870                        worktree
2871                            .diagnostic_summaries
2872                            .push(proto::DiagnosticSummary {
2873                                path: db_summary.path,
2874                                language_server_id: db_summary.language_server_id as u64,
2875                                error_count: db_summary.error_count as u32,
2876                                warning_count: db_summary.warning_count as u32,
2877                            });
2878                    }
2879                }
2880            }
2881
2882            // Populate worktree settings files
2883            {
2884                let mut db_settings_files = worktree_settings_file::Entity::find()
2885                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2886                    .stream(&*tx)
2887                    .await?;
2888                while let Some(db_settings_file) = db_settings_files.next().await {
2889                    let db_settings_file = db_settings_file?;
2890                    if let Some(worktree) =
2891                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2892                    {
2893                        worktree.settings_files.push(WorktreeSettingsFile {
2894                            path: db_settings_file.path,
2895                            content: db_settings_file.content,
2896                        });
2897                    }
2898                }
2899            }
2900
2901            // Populate language servers.
2902            let language_servers = project
2903                .find_related(language_server::Entity)
2904                .all(&*tx)
2905                .await?;
2906
2907            let project = Project {
2908                collaborators: collaborators
2909                    .into_iter()
2910                    .map(|collaborator| ProjectCollaborator {
2911                        connection_id: collaborator.connection(),
2912                        user_id: collaborator.user_id,
2913                        replica_id: collaborator.replica_id,
2914                        is_host: collaborator.is_host,
2915                    })
2916                    .collect(),
2917                worktrees,
2918                language_servers: language_servers
2919                    .into_iter()
2920                    .map(|language_server| proto::LanguageServer {
2921                        id: language_server.id as u64,
2922                        name: language_server.name,
2923                    })
2924                    .collect(),
2925            };
2926            Ok((project, replica_id as ReplicaId))
2927        })
2928        .await
2929    }
2930
2931    pub async fn leave_project(
2932        &self,
2933        project_id: ProjectId,
2934        connection: ConnectionId,
2935    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2936        let room_id = self.room_id_for_project(project_id).await?;
2937        self.room_transaction(room_id, |tx| async move {
2938            let result = project_collaborator::Entity::delete_many()
2939                .filter(
2940                    Condition::all()
2941                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2942                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2943                        .add(
2944                            project_collaborator::Column::ConnectionServerId
2945                                .eq(connection.owner_id as i32),
2946                        ),
2947                )
2948                .exec(&*tx)
2949                .await?;
2950            if result.rows_affected == 0 {
2951                Err(anyhow!("not a collaborator on this project"))?;
2952            }
2953
2954            let project = project::Entity::find_by_id(project_id)
2955                .one(&*tx)
2956                .await?
2957                .ok_or_else(|| anyhow!("no such project"))?;
2958            let collaborators = project
2959                .find_related(project_collaborator::Entity)
2960                .all(&*tx)
2961                .await?;
2962            let connection_ids = collaborators
2963                .into_iter()
2964                .map(|collaborator| collaborator.connection())
2965                .collect();
2966
2967            follower::Entity::delete_many()
2968                .filter(
2969                    Condition::any()
2970                        .add(
2971                            Condition::all()
2972                                .add(follower::Column::ProjectId.eq(project_id))
2973                                .add(
2974                                    follower::Column::LeaderConnectionServerId
2975                                        .eq(connection.owner_id),
2976                                )
2977                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2978                        )
2979                        .add(
2980                            Condition::all()
2981                                .add(follower::Column::ProjectId.eq(project_id))
2982                                .add(
2983                                    follower::Column::FollowerConnectionServerId
2984                                        .eq(connection.owner_id),
2985                                )
2986                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2987                        ),
2988                )
2989                .exec(&*tx)
2990                .await?;
2991
2992            let room = self.get_room(project.room_id, &tx).await?;
2993            let left_project = LeftProject {
2994                id: project_id,
2995                host_user_id: project.host_user_id,
2996                host_connection_id: project.host_connection()?,
2997                connection_ids,
2998            };
2999            Ok((room, left_project))
3000        })
3001        .await
3002    }
3003
3004    pub async fn project_collaborators(
3005        &self,
3006        project_id: ProjectId,
3007        connection_id: ConnectionId,
3008    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
3009        let room_id = self.room_id_for_project(project_id).await?;
3010        self.room_transaction(room_id, |tx| async move {
3011            let collaborators = project_collaborator::Entity::find()
3012                .filter(project_collaborator::Column::ProjectId.eq(project_id))
3013                .all(&*tx)
3014                .await?
3015                .into_iter()
3016                .map(|collaborator| ProjectCollaborator {
3017                    connection_id: collaborator.connection(),
3018                    user_id: collaborator.user_id,
3019                    replica_id: collaborator.replica_id,
3020                    is_host: collaborator.is_host,
3021                })
3022                .collect::<Vec<_>>();
3023
3024            if collaborators
3025                .iter()
3026                .any(|collaborator| collaborator.connection_id == connection_id)
3027            {
3028                Ok(collaborators)
3029            } else {
3030                Err(anyhow!("no such project"))?
3031            }
3032        })
3033        .await
3034    }
3035
3036    pub async fn project_connection_ids(
3037        &self,
3038        project_id: ProjectId,
3039        connection_id: ConnectionId,
3040    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3041        let room_id = self.room_id_for_project(project_id).await?;
3042        self.room_transaction(room_id, |tx| async move {
3043            let mut collaborators = project_collaborator::Entity::find()
3044                .filter(project_collaborator::Column::ProjectId.eq(project_id))
3045                .stream(&*tx)
3046                .await?;
3047
3048            let mut connection_ids = HashSet::default();
3049            while let Some(collaborator) = collaborators.next().await {
3050                let collaborator = collaborator?;
3051                connection_ids.insert(collaborator.connection());
3052            }
3053
3054            if connection_ids.contains(&connection_id) {
3055                Ok(connection_ids)
3056            } else {
3057                Err(anyhow!("no such project"))?
3058            }
3059        })
3060        .await
3061    }
3062
3063    async fn project_guest_connection_ids(
3064        &self,
3065        project_id: ProjectId,
3066        tx: &DatabaseTransaction,
3067    ) -> Result<Vec<ConnectionId>> {
3068        let mut collaborators = project_collaborator::Entity::find()
3069            .filter(
3070                project_collaborator::Column::ProjectId
3071                    .eq(project_id)
3072                    .and(project_collaborator::Column::IsHost.eq(false)),
3073            )
3074            .stream(tx)
3075            .await?;
3076
3077        let mut guest_connection_ids = Vec::new();
3078        while let Some(collaborator) = collaborators.next().await {
3079            let collaborator = collaborator?;
3080            guest_connection_ids.push(collaborator.connection());
3081        }
3082        Ok(guest_connection_ids)
3083    }
3084
3085    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3086        self.transaction(|tx| async move {
3087            let project = project::Entity::find_by_id(project_id)
3088                .one(&*tx)
3089                .await?
3090                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3091            Ok(project.room_id)
3092        })
3093        .await
3094    }
3095
3096    // access tokens
3097
3098    pub async fn create_access_token(
3099        &self,
3100        user_id: UserId,
3101        access_token_hash: &str,
3102        max_access_token_count: usize,
3103    ) -> Result<AccessTokenId> {
3104        self.transaction(|tx| async {
3105            let tx = tx;
3106
3107            let token = access_token::ActiveModel {
3108                user_id: ActiveValue::set(user_id),
3109                hash: ActiveValue::set(access_token_hash.into()),
3110                ..Default::default()
3111            }
3112            .insert(&*tx)
3113            .await?;
3114
3115            access_token::Entity::delete_many()
3116                .filter(
3117                    access_token::Column::Id.in_subquery(
3118                        Query::select()
3119                            .column(access_token::Column::Id)
3120                            .from(access_token::Entity)
3121                            .and_where(access_token::Column::UserId.eq(user_id))
3122                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3123                            .limit(10000)
3124                            .offset(max_access_token_count as u64)
3125                            .to_owned(),
3126                    ),
3127                )
3128                .exec(&*tx)
3129                .await?;
3130            Ok(token.id)
3131        })
3132        .await
3133    }
3134
3135    pub async fn get_access_token(
3136        &self,
3137        access_token_id: AccessTokenId,
3138    ) -> Result<access_token::Model> {
3139        self.transaction(|tx| async move {
3140            Ok(access_token::Entity::find_by_id(access_token_id)
3141                .one(&*tx)
3142                .await?
3143                .ok_or_else(|| anyhow!("no such access token"))?)
3144        })
3145        .await
3146    }
3147
3148    // channels
3149
3150    pub async fn create_root_channel(
3151        &self,
3152        name: &str,
3153        live_kit_room: &str,
3154        creator_id: UserId,
3155    ) -> Result<ChannelId> {
3156        self.create_channel(name, None, live_kit_room, creator_id)
3157            .await
3158    }
3159
3160    pub async fn create_channel(
3161        &self,
3162        name: &str,
3163        parent: Option<ChannelId>,
3164        live_kit_room: &str,
3165        creator_id: UserId,
3166    ) -> Result<ChannelId> {
3167        self.transaction(move |tx| async move {
3168            if let Some(parent) = parent {
3169                self.check_user_is_channel_admin(parent, creator_id, &*tx)
3170                    .await?;
3171            }
3172
3173            let channel = channel::ActiveModel {
3174                name: ActiveValue::Set(name.to_string()),
3175                ..Default::default()
3176            }
3177            .insert(&*tx)
3178            .await?;
3179
3180            if let Some(parent) = parent {
3181                channel_parent::ActiveModel {
3182                    child_id: ActiveValue::Set(channel.id),
3183                    parent_id: ActiveValue::Set(parent),
3184                }
3185                .insert(&*tx)
3186                .await?;
3187            }
3188
3189            channel_member::ActiveModel {
3190                channel_id: ActiveValue::Set(channel.id),
3191                user_id: ActiveValue::Set(creator_id),
3192                accepted: ActiveValue::Set(true),
3193                admin: ActiveValue::Set(true),
3194                ..Default::default()
3195            }
3196            .insert(&*tx)
3197            .await?;
3198
3199            room::ActiveModel {
3200                channel_id: ActiveValue::Set(Some(channel.id)),
3201                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3202                ..Default::default()
3203            }
3204            .insert(&*tx)
3205            .await?;
3206
3207            Ok(channel.id)
3208        })
3209        .await
3210    }
3211
3212    pub async fn remove_channel(
3213        &self,
3214        channel_id: ChannelId,
3215        user_id: UserId,
3216    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3217        self.transaction(move |tx| async move {
3218            self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3219                .await?;
3220
3221            // Don't remove descendant channels that have additional parents.
3222            let mut channels_to_remove = self.get_channel_descendants([channel_id], &*tx).await?;
3223            {
3224                let mut channels_to_keep = channel_parent::Entity::find()
3225                    .filter(
3226                        channel_parent::Column::ChildId
3227                            .is_in(
3228                                channels_to_remove
3229                                    .keys()
3230                                    .copied()
3231                                    .filter(|&id| id != channel_id),
3232                            )
3233                            .and(
3234                                channel_parent::Column::ParentId
3235                                    .is_not_in(channels_to_remove.keys().copied()),
3236                            ),
3237                    )
3238                    .stream(&*tx)
3239                    .await?;
3240                while let Some(row) = channels_to_keep.next().await {
3241                    let row = row?;
3242                    channels_to_remove.remove(&row.child_id);
3243                }
3244            }
3245
3246            let channel_ancestors = self.get_channel_ancestors(channel_id, &*tx).await?;
3247            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3248                .filter(channel_member::Column::ChannelId.is_in(channel_ancestors))
3249                .select_only()
3250                .column(channel_member::Column::UserId)
3251                .distinct()
3252                .into_values::<_, QueryUserIds>()
3253                .all(&*tx)
3254                .await?;
3255
3256            channel::Entity::delete_many()
3257                .filter(channel::Column::Id.is_in(channels_to_remove.keys().copied()))
3258                .exec(&*tx)
3259                .await?;
3260
3261            Ok((channels_to_remove.into_keys().collect(), members_to_notify))
3262        })
3263        .await
3264    }
3265
3266    pub async fn invite_channel_member(
3267        &self,
3268        channel_id: ChannelId,
3269        invitee_id: UserId,
3270        inviter_id: UserId,
3271        is_admin: bool,
3272    ) -> Result<()> {
3273        self.transaction(move |tx| async move {
3274            self.check_user_is_channel_admin(channel_id, inviter_id, &*tx)
3275                .await?;
3276
3277            channel_member::ActiveModel {
3278                channel_id: ActiveValue::Set(channel_id),
3279                user_id: ActiveValue::Set(invitee_id),
3280                accepted: ActiveValue::Set(false),
3281                admin: ActiveValue::Set(is_admin),
3282                ..Default::default()
3283            }
3284            .insert(&*tx)
3285            .await?;
3286
3287            Ok(())
3288        })
3289        .await
3290    }
3291
3292    pub async fn respond_to_channel_invite(
3293        &self,
3294        channel_id: ChannelId,
3295        user_id: UserId,
3296        accept: bool,
3297    ) -> Result<()> {
3298        self.transaction(move |tx| async move {
3299            let rows_affected = if accept {
3300                channel_member::Entity::update_many()
3301                    .set(channel_member::ActiveModel {
3302                        accepted: ActiveValue::Set(accept),
3303                        ..Default::default()
3304                    })
3305                    .filter(
3306                        channel_member::Column::ChannelId
3307                            .eq(channel_id)
3308                            .and(channel_member::Column::UserId.eq(user_id))
3309                            .and(channel_member::Column::Accepted.eq(false)),
3310                    )
3311                    .exec(&*tx)
3312                    .await?
3313                    .rows_affected
3314            } else {
3315                channel_member::ActiveModel {
3316                    channel_id: ActiveValue::Unchanged(channel_id),
3317                    user_id: ActiveValue::Unchanged(user_id),
3318                    ..Default::default()
3319                }
3320                .delete(&*tx)
3321                .await?
3322                .rows_affected
3323            };
3324
3325            if rows_affected == 0 {
3326                Err(anyhow!("no such invitation"))?;
3327            }
3328
3329            Ok(())
3330        })
3331        .await
3332    }
3333
3334    pub async fn remove_channel_member(
3335        &self,
3336        channel_id: ChannelId,
3337        member_id: UserId,
3338        remover_id: UserId,
3339    ) -> Result<()> {
3340        self.transaction(|tx| async move {
3341            self.check_user_is_channel_admin(channel_id, remover_id, &*tx)
3342                .await?;
3343
3344            let result = channel_member::Entity::delete_many()
3345                .filter(
3346                    channel_member::Column::ChannelId
3347                        .eq(channel_id)
3348                        .and(channel_member::Column::UserId.eq(member_id)),
3349                )
3350                .exec(&*tx)
3351                .await?;
3352
3353            if result.rows_affected == 0 {
3354                Err(anyhow!("no such member"))?;
3355            }
3356
3357            Ok(())
3358        })
3359        .await
3360    }
3361
3362    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
3363        self.transaction(|tx| async move {
3364            let channel_invites = channel_member::Entity::find()
3365                .filter(
3366                    channel_member::Column::UserId
3367                        .eq(user_id)
3368                        .and(channel_member::Column::Accepted.eq(false)),
3369                )
3370                .all(&*tx)
3371                .await?;
3372
3373            let channels = channel::Entity::find()
3374                .filter(
3375                    channel::Column::Id.is_in(
3376                        channel_invites
3377                            .into_iter()
3378                            .map(|channel_member| channel_member.channel_id),
3379                    ),
3380                )
3381                .all(&*tx)
3382                .await?;
3383
3384            let channels = channels
3385                .into_iter()
3386                .map(|channel| Channel {
3387                    id: channel.id,
3388                    name: channel.name,
3389                    user_is_admin: false,
3390                    parent_id: None,
3391                })
3392                .collect();
3393
3394            Ok(channels)
3395        })
3396        .await
3397    }
3398
3399    pub async fn get_channels_for_user(
3400        &self,
3401        user_id: UserId,
3402    ) -> Result<(Vec<Channel>, HashMap<ChannelId, Vec<UserId>>)> {
3403        self.transaction(|tx| async move {
3404            let tx = tx;
3405
3406            let channel_memberships = channel_member::Entity::find()
3407                .filter(
3408                    channel_member::Column::UserId
3409                        .eq(user_id)
3410                        .and(channel_member::Column::Accepted.eq(true)),
3411                )
3412                .all(&*tx)
3413                .await?;
3414
3415            let admin_channel_ids = channel_memberships
3416                .iter()
3417                .filter_map(|m| m.admin.then_some(m.channel_id))
3418                .collect::<HashSet<_>>();
3419            let parents_by_child_id = self
3420                .get_channel_descendants(channel_memberships.iter().map(|m| m.channel_id), &*tx)
3421                .await?;
3422
3423            let mut channels = Vec::with_capacity(parents_by_child_id.len());
3424            {
3425                let mut rows = channel::Entity::find()
3426                    .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3427                    .stream(&*tx)
3428                    .await?;
3429                while let Some(row) = rows.next().await {
3430                    let row = row?;
3431                    channels.push(Channel {
3432                        id: row.id,
3433                        name: row.name,
3434                        user_is_admin: admin_channel_ids.contains(&row.id),
3435                        parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3436                    });
3437                }
3438            }
3439
3440            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3441            enum QueryUserIdsAndChannelIds {
3442                ChannelId,
3443                UserId,
3444            }
3445
3446            let mut participants_by_channel: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3447            {
3448                let mut rows = room_participant::Entity::find()
3449                    .inner_join(room::Entity)
3450                    .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3451                    .select_only()
3452                    .column(room::Column::ChannelId)
3453                    .column(room_participant::Column::UserId)
3454                    .into_values::<_, QueryUserIdsAndChannelIds>()
3455                    .stream(&*tx)
3456                    .await?;
3457                while let Some(row) = rows.next().await {
3458                    let row: (ChannelId, UserId) = row?;
3459                    participants_by_channel
3460                        .entry(row.0)
3461                        .or_default()
3462                        .push(row.1)
3463                }
3464            }
3465
3466            Ok((channels, participants_by_channel))
3467        })
3468        .await
3469    }
3470
3471    pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3472        self.transaction(|tx| async move { self.get_channel_members_internal(id, &*tx).await })
3473            .await
3474    }
3475
3476    pub async fn set_channel_member_admin(
3477        &self,
3478        channel_id: ChannelId,
3479        from: UserId,
3480        for_user: UserId,
3481        admin: bool,
3482    ) -> Result<()> {
3483        self.transaction(|tx| async move {
3484            self.check_user_is_channel_admin(channel_id, from, &*tx)
3485                .await?;
3486
3487            let result = channel_member::Entity::update_many()
3488                .filter(
3489                    channel_member::Column::ChannelId
3490                        .eq(channel_id)
3491                        .and(channel_member::Column::UserId.eq(for_user)),
3492                )
3493                .set(channel_member::ActiveModel {
3494                    admin: ActiveValue::set(admin),
3495                    ..Default::default()
3496                })
3497                .exec(&*tx)
3498                .await?;
3499
3500            if result.rows_affected == 0 {
3501                Err(anyhow!("no such member"))?;
3502            }
3503
3504            Ok(())
3505        })
3506        .await
3507    }
3508
3509    pub async fn get_channel_member_details(
3510        &self,
3511        channel_id: ChannelId,
3512        user_id: UserId,
3513    ) -> Result<Vec<proto::ChannelMember>> {
3514        self.transaction(|tx| async move {
3515            self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3516                .await?;
3517
3518            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3519            enum QueryMemberDetails {
3520                UserId,
3521                Admin,
3522                IsDirectMember,
3523                Accepted,
3524            }
3525
3526            let tx = tx;
3527            let ancestor_ids = self.get_channel_ancestors(channel_id, &*tx).await?;
3528            let mut stream = channel_member::Entity::find()
3529                .distinct()
3530                .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3531                .select_only()
3532                .column(channel_member::Column::UserId)
3533                .column(channel_member::Column::Admin)
3534                .column_as(
3535                    channel_member::Column::ChannelId.eq(channel_id),
3536                    QueryMemberDetails::IsDirectMember,
3537                )
3538                .column(channel_member::Column::Accepted)
3539                .order_by_asc(channel_member::Column::UserId)
3540                .into_values::<_, QueryMemberDetails>()
3541                .stream(&*tx)
3542                .await?;
3543
3544            let mut rows = Vec::<proto::ChannelMember>::new();
3545            while let Some(row) = stream.next().await {
3546                let (user_id, is_admin, is_direct_member, is_invite_accepted): (
3547                    UserId,
3548                    bool,
3549                    bool,
3550                    bool,
3551                ) = row?;
3552                let kind = match (is_direct_member, is_invite_accepted) {
3553                    (true, true) => proto::channel_member::Kind::Member,
3554                    (true, false) => proto::channel_member::Kind::Invitee,
3555                    (false, true) => proto::channel_member::Kind::AncestorMember,
3556                    (false, false) => continue,
3557                };
3558                let user_id = user_id.to_proto();
3559                let kind = kind.into();
3560                if let Some(last_row) = rows.last_mut() {
3561                    if last_row.user_id == user_id {
3562                        if is_direct_member {
3563                            last_row.kind = kind;
3564                            last_row.admin = is_admin;
3565                        }
3566                        continue;
3567                    }
3568                }
3569                rows.push(proto::ChannelMember {
3570                    user_id,
3571                    kind,
3572                    admin: is_admin,
3573                });
3574            }
3575
3576            Ok(rows)
3577        })
3578        .await
3579    }
3580
3581    pub async fn get_channel_members_internal(
3582        &self,
3583        id: ChannelId,
3584        tx: &DatabaseTransaction,
3585    ) -> Result<Vec<UserId>> {
3586        let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3587        let user_ids = channel_member::Entity::find()
3588            .distinct()
3589            .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3590            .select_only()
3591            .column(channel_member::Column::UserId)
3592            .into_values::<_, QueryUserIds>()
3593            .all(&*tx)
3594            .await?;
3595        Ok(user_ids)
3596    }
3597
3598    async fn check_user_is_channel_admin(
3599        &self,
3600        channel_id: ChannelId,
3601        user_id: UserId,
3602        tx: &DatabaseTransaction,
3603    ) -> Result<()> {
3604        let channel_ids = self.get_channel_ancestors(channel_id, tx).await?;
3605        channel_member::Entity::find()
3606            .filter(
3607                channel_member::Column::ChannelId
3608                    .is_in(channel_ids)
3609                    .and(channel_member::Column::UserId.eq(user_id))
3610                    .and(channel_member::Column::Admin.eq(true)),
3611            )
3612            .one(&*tx)
3613            .await?
3614            .ok_or_else(|| anyhow!("user is not allowed to remove this channel"))?;
3615        Ok(())
3616    }
3617
3618    async fn get_channel_ancestors(
3619        &self,
3620        channel_id: ChannelId,
3621        tx: &DatabaseTransaction,
3622    ) -> Result<Vec<ChannelId>> {
3623        let sql = format!(
3624            r#"
3625            WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3626                    SELECT CAST(NULL as INTEGER) as child_id, root_ids.column1 as parent_id
3627                    FROM (VALUES ({})) as root_ids
3628                UNION
3629                    SELECT channel_parents.child_id, channel_parents.parent_id
3630                    FROM channel_parents, channel_tree
3631                    WHERE channel_parents.child_id = channel_tree.parent_id
3632            )
3633            SELECT DISTINCT channel_tree.parent_id
3634            FROM channel_tree
3635            "#,
3636            channel_id
3637        );
3638
3639        #[derive(FromQueryResult, Debug, PartialEq)]
3640        pub struct ChannelParent {
3641            pub parent_id: ChannelId,
3642        }
3643
3644        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3645
3646        let mut channel_ids_stream = channel_parent::Entity::find()
3647            .from_raw_sql(stmt)
3648            .into_model::<ChannelParent>()
3649            .stream(&*tx)
3650            .await?;
3651
3652        let mut channel_ids = vec![];
3653        while let Some(channel_id) = channel_ids_stream.next().await {
3654            channel_ids.push(channel_id?.parent_id);
3655        }
3656
3657        Ok(channel_ids)
3658    }
3659
3660    async fn get_channel_descendants(
3661        &self,
3662        channel_ids: impl IntoIterator<Item = ChannelId>,
3663        tx: &DatabaseTransaction,
3664    ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3665        let mut values = String::new();
3666        for id in channel_ids {
3667            if !values.is_empty() {
3668                values.push_str(", ");
3669            }
3670            write!(&mut values, "({})", id).unwrap();
3671        }
3672
3673        if values.is_empty() {
3674            return Ok(HashMap::default());
3675        }
3676
3677        let sql = format!(
3678            r#"
3679            WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3680                    SELECT root_ids.column1 as child_id, CAST(NULL as INTEGER) as parent_id
3681                    FROM (VALUES {values}) as root_ids
3682                UNION
3683                    SELECT channel_parents.child_id, channel_parents.parent_id
3684                    FROM channel_parents, channel_tree
3685                    WHERE channel_parents.parent_id = channel_tree.child_id
3686            )
3687            SELECT channel_tree.child_id, channel_tree.parent_id
3688            FROM channel_tree
3689            ORDER BY child_id, parent_id IS NOT NULL
3690            "#,
3691        );
3692
3693        #[derive(FromQueryResult, Debug, PartialEq)]
3694        pub struct ChannelParent {
3695            pub child_id: ChannelId,
3696            pub parent_id: Option<ChannelId>,
3697        }
3698
3699        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3700
3701        let mut parents_by_child_id = HashMap::default();
3702        let mut parents = channel_parent::Entity::find()
3703            .from_raw_sql(stmt)
3704            .into_model::<ChannelParent>()
3705            .stream(tx)
3706            .await?;
3707
3708        while let Some(parent) = parents.next().await {
3709            let parent = parent?;
3710            parents_by_child_id.insert(parent.child_id, parent.parent_id);
3711        }
3712
3713        Ok(parents_by_child_id)
3714    }
3715
3716    pub async fn get_channel(
3717        &self,
3718        channel_id: ChannelId,
3719        user_id: UserId,
3720    ) -> Result<Option<Channel>> {
3721        self.transaction(|tx| async move {
3722            let tx = tx;
3723            let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3724            let user_is_admin = channel_member::Entity::find()
3725                .filter(
3726                    channel_member::Column::ChannelId
3727                        .eq(channel_id)
3728                        .and(channel_member::Column::UserId.eq(user_id))
3729                        .and(channel_member::Column::Admin.eq(true)),
3730                )
3731                .count(&*tx)
3732                .await?
3733                > 0;
3734
3735            Ok(channel.map(|channel| Channel {
3736                id: channel.id,
3737                name: channel.name,
3738                user_is_admin,
3739                parent_id: None,
3740            }))
3741        })
3742        .await
3743    }
3744
3745    pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3746        self.transaction(|tx| async move {
3747            let tx = tx;
3748            let room = channel::Model {
3749                id: channel_id,
3750                ..Default::default()
3751            }
3752            .find_related(room::Entity)
3753            .one(&*tx)
3754            .await?
3755            .ok_or_else(|| anyhow!("invalid channel"))?;
3756            Ok(room.id)
3757        })
3758        .await
3759    }
3760
3761    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3762    where
3763        F: Send + Fn(TransactionHandle) -> Fut,
3764        Fut: Send + Future<Output = Result<T>>,
3765    {
3766        let body = async {
3767            let mut i = 0;
3768            loop {
3769                let (tx, result) = self.with_transaction(&f).await?;
3770                match result {
3771                    Ok(result) => match tx.commit().await.map_err(Into::into) {
3772                        Ok(()) => return Ok(result),
3773                        Err(error) => {
3774                            if !self.retry_on_serialization_error(&error, i).await {
3775                                return Err(error);
3776                            }
3777                        }
3778                    },
3779                    Err(error) => {
3780                        tx.rollback().await?;
3781                        if !self.retry_on_serialization_error(&error, i).await {
3782                            return Err(error);
3783                        }
3784                    }
3785                }
3786                i += 1;
3787            }
3788        };
3789
3790        self.run(body).await
3791    }
3792
3793    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
3794    where
3795        F: Send + Fn(TransactionHandle) -> Fut,
3796        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
3797    {
3798        let body = async {
3799            let mut i = 0;
3800            loop {
3801                let (tx, result) = self.with_transaction(&f).await?;
3802                match result {
3803                    Ok(Some((room_id, data))) => {
3804                        let lock = self.rooms.entry(room_id).or_default().clone();
3805                        let _guard = lock.lock_owned().await;
3806                        match tx.commit().await.map_err(Into::into) {
3807                            Ok(()) => {
3808                                return Ok(Some(RoomGuard {
3809                                    data,
3810                                    _guard,
3811                                    _not_send: PhantomData,
3812                                }));
3813                            }
3814                            Err(error) => {
3815                                if !self.retry_on_serialization_error(&error, i).await {
3816                                    return Err(error);
3817                                }
3818                            }
3819                        }
3820                    }
3821                    Ok(None) => match tx.commit().await.map_err(Into::into) {
3822                        Ok(()) => return Ok(None),
3823                        Err(error) => {
3824                            if !self.retry_on_serialization_error(&error, i).await {
3825                                return Err(error);
3826                            }
3827                        }
3828                    },
3829                    Err(error) => {
3830                        tx.rollback().await?;
3831                        if !self.retry_on_serialization_error(&error, i).await {
3832                            return Err(error);
3833                        }
3834                    }
3835                }
3836                i += 1;
3837            }
3838        };
3839
3840        self.run(body).await
3841    }
3842
3843    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3844    where
3845        F: Send + Fn(TransactionHandle) -> Fut,
3846        Fut: Send + Future<Output = Result<T>>,
3847    {
3848        let body = async {
3849            let mut i = 0;
3850            loop {
3851                let lock = self.rooms.entry(room_id).or_default().clone();
3852                let _guard = lock.lock_owned().await;
3853                let (tx, result) = self.with_transaction(&f).await?;
3854                match result {
3855                    Ok(data) => match tx.commit().await.map_err(Into::into) {
3856                        Ok(()) => {
3857                            return Ok(RoomGuard {
3858                                data,
3859                                _guard,
3860                                _not_send: PhantomData,
3861                            });
3862                        }
3863                        Err(error) => {
3864                            if !self.retry_on_serialization_error(&error, i).await {
3865                                return Err(error);
3866                            }
3867                        }
3868                    },
3869                    Err(error) => {
3870                        tx.rollback().await?;
3871                        if !self.retry_on_serialization_error(&error, i).await {
3872                            return Err(error);
3873                        }
3874                    }
3875                }
3876                i += 1;
3877            }
3878        };
3879
3880        self.run(body).await
3881    }
3882
3883    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3884    where
3885        F: Send + Fn(TransactionHandle) -> Fut,
3886        Fut: Send + Future<Output = Result<T>>,
3887    {
3888        let tx = self
3889            .pool
3890            .begin_with_config(Some(IsolationLevel::Serializable), None)
3891            .await?;
3892
3893        let mut tx = Arc::new(Some(tx));
3894        let result = f(TransactionHandle(tx.clone())).await;
3895        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3896            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3897        };
3898
3899        Ok((tx, result))
3900    }
3901
3902    async fn run<F, T>(&self, future: F) -> Result<T>
3903    where
3904        F: Future<Output = Result<T>>,
3905    {
3906        #[cfg(test)]
3907        {
3908            if let Executor::Deterministic(executor) = &self.executor {
3909                executor.simulate_random_delay().await;
3910            }
3911
3912            self.runtime.as_ref().unwrap().block_on(future)
3913        }
3914
3915        #[cfg(not(test))]
3916        {
3917            future.await
3918        }
3919    }
3920
3921    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3922        // If the error is due to a failure to serialize concurrent transactions, then retry
3923        // this transaction after a delay. With each subsequent retry, double the delay duration.
3924        // Also vary the delay randomly in order to ensure different database connections retry
3925        // at different times.
3926        if is_serialization_error(error) {
3927            let base_delay = 4_u64 << prev_attempt_count.min(16);
3928            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3929            log::info!(
3930                "retrying transaction after serialization error. delay: {} ms.",
3931                randomized_delay
3932            );
3933            self.executor
3934                .sleep(Duration::from_millis(randomized_delay as u64))
3935                .await;
3936            true
3937        } else {
3938            false
3939        }
3940    }
3941}
3942
3943fn is_serialization_error(error: &Error) -> bool {
3944    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3945    match error {
3946        Error::Database(
3947            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3948            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3949        ) if error
3950            .as_database_error()
3951            .and_then(|error| error.code())
3952            .as_deref()
3953            == Some(SERIALIZATION_FAILURE_CODE) =>
3954        {
3955            true
3956        }
3957        _ => false,
3958    }
3959}
3960
3961struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3962
3963impl Deref for TransactionHandle {
3964    type Target = DatabaseTransaction;
3965
3966    fn deref(&self) -> &Self::Target {
3967        self.0.as_ref().as_ref().unwrap()
3968    }
3969}
3970
3971pub struct RoomGuard<T> {
3972    data: T,
3973    _guard: OwnedMutexGuard<()>,
3974    _not_send: PhantomData<Rc<()>>,
3975}
3976
3977impl<T> Deref for RoomGuard<T> {
3978    type Target = T;
3979
3980    fn deref(&self) -> &T {
3981        &self.data
3982    }
3983}
3984
3985impl<T> DerefMut for RoomGuard<T> {
3986    fn deref_mut(&mut self) -> &mut T {
3987        &mut self.data
3988    }
3989}
3990
3991#[derive(Debug, Serialize, Deserialize)]
3992pub struct NewUserParams {
3993    pub github_login: String,
3994    pub github_user_id: i32,
3995    pub invite_count: i32,
3996}
3997
3998#[derive(Debug)]
3999pub struct NewUserResult {
4000    pub user_id: UserId,
4001    pub metrics_id: String,
4002    pub inviting_user_id: Option<UserId>,
4003    pub signup_device_id: Option<String>,
4004}
4005
4006#[derive(FromQueryResult, Debug, PartialEq)]
4007pub struct Channel {
4008    pub id: ChannelId,
4009    pub name: String,
4010    pub user_is_admin: bool,
4011    pub parent_id: Option<ChannelId>,
4012}
4013
4014fn random_invite_code() -> String {
4015    nanoid::nanoid!(16)
4016}
4017
4018fn random_email_confirmation_code() -> String {
4019    nanoid::nanoid!(64)
4020}
4021
4022macro_rules! id_type {
4023    ($name:ident) => {
4024        #[derive(
4025            Clone,
4026            Copy,
4027            Debug,
4028            Default,
4029            PartialEq,
4030            Eq,
4031            PartialOrd,
4032            Ord,
4033            Hash,
4034            Serialize,
4035            Deserialize,
4036        )]
4037        #[serde(transparent)]
4038        pub struct $name(pub i32);
4039
4040        impl $name {
4041            #[allow(unused)]
4042            pub const MAX: Self = Self(i32::MAX);
4043
4044            #[allow(unused)]
4045            pub fn from_proto(value: u64) -> Self {
4046                Self(value as i32)
4047            }
4048
4049            #[allow(unused)]
4050            pub fn to_proto(self) -> u64 {
4051                self.0 as u64
4052            }
4053        }
4054
4055        impl std::fmt::Display for $name {
4056            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
4057                self.0.fmt(f)
4058            }
4059        }
4060
4061        impl From<$name> for sea_query::Value {
4062            fn from(value: $name) -> Self {
4063                sea_query::Value::Int(Some(value.0))
4064            }
4065        }
4066
4067        impl sea_orm::TryGetable for $name {
4068            fn try_get(
4069                res: &sea_orm::QueryResult,
4070                pre: &str,
4071                col: &str,
4072            ) -> Result<Self, sea_orm::TryGetError> {
4073                Ok(Self(i32::try_get(res, pre, col)?))
4074            }
4075        }
4076
4077        impl sea_query::ValueType for $name {
4078            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
4079                match v {
4080                    Value::TinyInt(Some(int)) => {
4081                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
4082                    }
4083                    Value::SmallInt(Some(int)) => {
4084                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
4085                    }
4086                    Value::Int(Some(int)) => {
4087                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
4088                    }
4089                    Value::BigInt(Some(int)) => {
4090                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
4091                    }
4092                    Value::TinyUnsigned(Some(int)) => {
4093                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
4094                    }
4095                    Value::SmallUnsigned(Some(int)) => {
4096                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
4097                    }
4098                    Value::Unsigned(Some(int)) => {
4099                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
4100                    }
4101                    Value::BigUnsigned(Some(int)) => {
4102                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
4103                    }
4104                    _ => Err(sea_query::ValueTypeErr),
4105                }
4106            }
4107
4108            fn type_name() -> String {
4109                stringify!($name).into()
4110            }
4111
4112            fn array_type() -> sea_query::ArrayType {
4113                sea_query::ArrayType::Int
4114            }
4115
4116            fn column_type() -> sea_query::ColumnType {
4117                sea_query::ColumnType::Integer(None)
4118            }
4119        }
4120
4121        impl sea_orm::TryFromU64 for $name {
4122            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
4123                Ok(Self(n.try_into().map_err(|_| {
4124                    DbErr::ConvertFromU64(concat!(
4125                        "error converting ",
4126                        stringify!($name),
4127                        " to u64"
4128                    ))
4129                })?))
4130            }
4131        }
4132
4133        impl sea_query::Nullable for $name {
4134            fn null() -> Value {
4135                Value::Int(None)
4136            }
4137        }
4138    };
4139}
4140
4141id_type!(AccessTokenId);
4142id_type!(ChannelId);
4143id_type!(ChannelMemberId);
4144id_type!(ContactId);
4145id_type!(FollowerId);
4146id_type!(RoomId);
4147id_type!(RoomParticipantId);
4148id_type!(ProjectId);
4149id_type!(ProjectCollaboratorId);
4150id_type!(ReplicaId);
4151id_type!(ServerId);
4152id_type!(SignupId);
4153id_type!(UserId);
4154
4155#[derive(Clone)]
4156pub struct JoinRoom {
4157    pub room: proto::Room,
4158    pub channel_id: Option<ChannelId>,
4159    pub channel_members: Vec<UserId>,
4160}
4161
4162pub struct RejoinedRoom {
4163    pub room: proto::Room,
4164    pub rejoined_projects: Vec<RejoinedProject>,
4165    pub reshared_projects: Vec<ResharedProject>,
4166    pub channel_id: Option<ChannelId>,
4167    pub channel_members: Vec<UserId>,
4168}
4169
4170pub struct ResharedProject {
4171    pub id: ProjectId,
4172    pub old_connection_id: ConnectionId,
4173    pub collaborators: Vec<ProjectCollaborator>,
4174    pub worktrees: Vec<proto::WorktreeMetadata>,
4175}
4176
4177pub struct RejoinedProject {
4178    pub id: ProjectId,
4179    pub old_connection_id: ConnectionId,
4180    pub collaborators: Vec<ProjectCollaborator>,
4181    pub worktrees: Vec<RejoinedWorktree>,
4182    pub language_servers: Vec<proto::LanguageServer>,
4183}
4184
4185#[derive(Debug)]
4186pub struct RejoinedWorktree {
4187    pub id: u64,
4188    pub abs_path: String,
4189    pub root_name: String,
4190    pub visible: bool,
4191    pub updated_entries: Vec<proto::Entry>,
4192    pub removed_entries: Vec<u64>,
4193    pub updated_repositories: Vec<proto::RepositoryEntry>,
4194    pub removed_repositories: Vec<u64>,
4195    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
4196    pub settings_files: Vec<WorktreeSettingsFile>,
4197    pub scan_id: u64,
4198    pub completed_scan_id: u64,
4199}
4200
4201pub struct LeftRoom {
4202    pub room: proto::Room,
4203    pub channel_id: Option<ChannelId>,
4204    pub channel_members: Vec<UserId>,
4205    pub left_projects: HashMap<ProjectId, LeftProject>,
4206    pub canceled_calls_to_user_ids: Vec<UserId>,
4207    pub deleted: bool,
4208}
4209
4210pub struct RefreshedRoom {
4211    pub room: proto::Room,
4212    pub channel_id: Option<ChannelId>,
4213    pub channel_members: Vec<UserId>,
4214    pub stale_participant_user_ids: Vec<UserId>,
4215    pub canceled_calls_to_user_ids: Vec<UserId>,
4216}
4217
4218pub struct Project {
4219    pub collaborators: Vec<ProjectCollaborator>,
4220    pub worktrees: BTreeMap<u64, Worktree>,
4221    pub language_servers: Vec<proto::LanguageServer>,
4222}
4223
4224pub struct ProjectCollaborator {
4225    pub connection_id: ConnectionId,
4226    pub user_id: UserId,
4227    pub replica_id: ReplicaId,
4228    pub is_host: bool,
4229}
4230
4231impl ProjectCollaborator {
4232    pub fn to_proto(&self) -> proto::Collaborator {
4233        proto::Collaborator {
4234            peer_id: Some(self.connection_id.into()),
4235            replica_id: self.replica_id.0 as u32,
4236            user_id: self.user_id.to_proto(),
4237        }
4238    }
4239}
4240
4241#[derive(Debug)]
4242pub struct LeftProject {
4243    pub id: ProjectId,
4244    pub host_user_id: UserId,
4245    pub host_connection_id: ConnectionId,
4246    pub connection_ids: Vec<ConnectionId>,
4247}
4248
4249pub struct Worktree {
4250    pub id: u64,
4251    pub abs_path: String,
4252    pub root_name: String,
4253    pub visible: bool,
4254    pub entries: Vec<proto::Entry>,
4255    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
4256    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
4257    pub settings_files: Vec<WorktreeSettingsFile>,
4258    pub scan_id: u64,
4259    pub completed_scan_id: u64,
4260}
4261
4262#[derive(Debug)]
4263pub struct WorktreeSettingsFile {
4264    pub path: String,
4265    pub content: String,
4266}
4267
4268#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
4269enum QueryUserIds {
4270    UserId,
4271}
4272
4273#[cfg(test)]
4274pub use test::*;
4275
4276#[cfg(test)]
4277mod test {
4278    use super::*;
4279    use gpui::executor::Background;
4280    use parking_lot::Mutex;
4281    use sea_orm::ConnectionTrait;
4282    use sqlx::migrate::MigrateDatabase;
4283    use std::sync::Arc;
4284
4285    pub struct TestDb {
4286        pub db: Option<Arc<Database>>,
4287        pub connection: Option<sqlx::AnyConnection>,
4288    }
4289
4290    impl TestDb {
4291        pub fn sqlite(background: Arc<Background>) -> Self {
4292            let url = format!("sqlite::memory:");
4293            let runtime = tokio::runtime::Builder::new_current_thread()
4294                .enable_io()
4295                .enable_time()
4296                .build()
4297                .unwrap();
4298
4299            let mut db = runtime.block_on(async {
4300                let mut options = ConnectOptions::new(url);
4301                options.max_connections(5);
4302                let db = Database::new(options, Executor::Deterministic(background))
4303                    .await
4304                    .unwrap();
4305                let sql = include_str!(concat!(
4306                    env!("CARGO_MANIFEST_DIR"),
4307                    "/migrations.sqlite/20221109000000_test_schema.sql"
4308                ));
4309                db.pool
4310                    .execute(sea_orm::Statement::from_string(
4311                        db.pool.get_database_backend(),
4312                        sql.into(),
4313                    ))
4314                    .await
4315                    .unwrap();
4316                db
4317            });
4318
4319            db.runtime = Some(runtime);
4320
4321            Self {
4322                db: Some(Arc::new(db)),
4323                connection: None,
4324            }
4325        }
4326
4327        pub fn postgres(background: Arc<Background>) -> Self {
4328            static LOCK: Mutex<()> = Mutex::new(());
4329
4330            let _guard = LOCK.lock();
4331            let mut rng = StdRng::from_entropy();
4332            let url = format!(
4333                "postgres://postgres@localhost/zed-test-{}",
4334                rng.gen::<u128>()
4335            );
4336            let runtime = tokio::runtime::Builder::new_current_thread()
4337                .enable_io()
4338                .enable_time()
4339                .build()
4340                .unwrap();
4341
4342            let mut db = runtime.block_on(async {
4343                sqlx::Postgres::create_database(&url)
4344                    .await
4345                    .expect("failed to create test db");
4346                let mut options = ConnectOptions::new(url);
4347                options
4348                    .max_connections(5)
4349                    .idle_timeout(Duration::from_secs(0));
4350                let db = Database::new(options, Executor::Deterministic(background))
4351                    .await
4352                    .unwrap();
4353                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
4354                db.migrate(Path::new(migrations_path), false).await.unwrap();
4355                db
4356            });
4357
4358            db.runtime = Some(runtime);
4359
4360            Self {
4361                db: Some(Arc::new(db)),
4362                connection: None,
4363            }
4364        }
4365
4366        pub fn db(&self) -> &Arc<Database> {
4367            self.db.as_ref().unwrap()
4368        }
4369    }
4370
4371    impl Drop for TestDb {
4372        fn drop(&mut self) {
4373            let db = self.db.take().unwrap();
4374            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
4375                db.runtime.as_ref().unwrap().block_on(async {
4376                    use util::ResultExt;
4377                    let query = "
4378                        SELECT pg_terminate_backend(pg_stat_activity.pid)
4379                        FROM pg_stat_activity
4380                        WHERE
4381                            pg_stat_activity.datname = current_database() AND
4382                            pid <> pg_backend_pid();
4383                    ";
4384                    db.pool
4385                        .execute(sea_orm::Statement::from_string(
4386                            db.pool.get_database_backend(),
4387                            query.into(),
4388                        ))
4389                        .await
4390                        .log_err();
4391                    sqlx::Postgres::drop_database(db.options.get_url())
4392                        .await
4393                        .log_err();
4394                })
4395            }
4396        }
4397    }
4398}