db.rs

   1mod access_token;
   2mod channel;
   3mod channel_member;
   4mod channel_path;
   5mod contact;
   6mod follower;
   7mod language_server;
   8mod project;
   9mod project_collaborator;
  10mod room;
  11mod room_participant;
  12mod server;
  13mod signup;
  14#[cfg(test)]
  15mod tests;
  16mod user;
  17mod worktree;
  18mod worktree_diagnostic_summary;
  19mod worktree_entry;
  20mod worktree_repository;
  21mod worktree_repository_statuses;
  22mod worktree_settings_file;
  23
  24use crate::executor::Executor;
  25use crate::{Error, Result};
  26use anyhow::anyhow;
  27use collections::{BTreeMap, HashMap, HashSet};
  28pub use contact::Contact;
  29use dashmap::DashMap;
  30use futures::StreamExt;
  31use hyper::StatusCode;
  32use rand::prelude::StdRng;
  33use rand::{Rng, SeedableRng};
  34use rpc::{proto, ConnectionId};
  35use sea_orm::Condition;
  36pub use sea_orm::ConnectOptions;
  37use sea_orm::{
  38    entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
  39    DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
  40    Statement, TransactionTrait,
  41};
  42use sea_query::{Alias, Expr, OnConflict, Query};
  43use serde::{Deserialize, Serialize};
  44pub use signup::{Invite, NewSignup, WaitlistSummary};
  45use sqlx::migrate::{Migrate, Migration, MigrationSource};
  46use sqlx::Connection;
  47use std::fmt::Write as _;
  48use std::ops::{Deref, DerefMut};
  49use std::path::Path;
  50use std::time::Duration;
  51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
  52use tokio::sync::{Mutex, OwnedMutexGuard};
  53pub use user::Model as User;
  54
/// Handle to the application's database: a connection pool plus the auxiliary
/// state used by the methods implemented on it.
pub struct Database {
    /// Connection options passed to `new`; `migrate` reads the URL from here
    /// to open its own dedicated connection.
    options: ConnectOptions,
    /// sea-orm connection opened from `options` in `new`.
    pool: DatabaseConnection,
    /// One async mutex per room id.
    // NOTE(review): presumably locked by room-scoped transactions such as
    // `room_transaction` — that code is not visible here, confirm.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Random number generator; `new` seeds it with the fixed value 0.
    rng: Mutex<StdRng>,
    /// Executor handed in by the caller of `new`.
    executor: Executor,
    /// Test-only tokio runtime; `new` leaves it unset.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
  64
  65impl Database {
  66    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
  67        Ok(Self {
  68            options: options.clone(),
  69            pool: sea_orm::Database::connect(options).await?,
  70            rooms: DashMap::with_capacity(16384),
  71            rng: Mutex::new(StdRng::seed_from_u64(0)),
  72            executor,
  73            #[cfg(test)]
  74            runtime: None,
  75        })
  76    }
  77
  78    #[cfg(test)]
  79    pub fn reset(&self) {
  80        self.rooms.clear();
  81    }
  82
  83    pub async fn migrate(
  84        &self,
  85        migrations_path: &Path,
  86        ignore_checksum_mismatch: bool,
  87    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
  88        let migrations = MigrationSource::resolve(migrations_path)
  89            .await
  90            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
  91
  92        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
  93
  94        connection.ensure_migrations_table().await?;
  95        let applied_migrations: HashMap<_, _> = connection
  96            .list_applied_migrations()
  97            .await?
  98            .into_iter()
  99            .map(|m| (m.version, m))
 100            .collect();
 101
 102        let mut new_migrations = Vec::new();
 103        for migration in migrations {
 104            match applied_migrations.get(&migration.version) {
 105                Some(applied_migration) => {
 106                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
 107                    {
 108                        Err(anyhow!(
 109                            "checksum mismatch for applied migration {}",
 110                            migration.description
 111                        ))?;
 112                    }
 113                }
 114                None => {
 115                    let elapsed = connection.apply(&migration).await?;
 116                    new_migrations.push((migration, elapsed));
 117                }
 118            }
 119        }
 120
 121        Ok(new_migrations)
 122    }
 123
 124    pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
 125        self.transaction(|tx| async move {
 126            let server = server::ActiveModel {
 127                environment: ActiveValue::set(environment.into()),
 128                ..Default::default()
 129            }
 130            .insert(&*tx)
 131            .await?;
 132            Ok(server.id)
 133        })
 134        .await
 135    }
 136
 137    pub async fn stale_room_ids(
 138        &self,
 139        environment: &str,
 140        new_server_id: ServerId,
 141    ) -> Result<Vec<RoomId>> {
 142        self.transaction(|tx| async move {
 143            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 144            enum QueryAs {
 145                RoomId,
 146            }
 147
 148            let stale_server_epochs = self
 149                .stale_server_ids(environment, new_server_id, &tx)
 150                .await?;
 151            Ok(room_participant::Entity::find()
 152                .select_only()
 153                .column(room_participant::Column::RoomId)
 154                .distinct()
 155                .filter(
 156                    room_participant::Column::AnsweringConnectionServerId
 157                        .is_in(stale_server_epochs),
 158                )
 159                .into_values::<_, QueryAs>()
 160                .all(&*tx)
 161                .await?)
 162        })
 163        .await
 164    }
 165
    /// Removes from `room_id` every participant whose answering connection
    /// belongs to a server other than `new_server_id`, cancels the pending
    /// calls those participants had placed, and returns the resulting state.
    ///
    /// The work runs inside `room_transaction` for `room_id`. The returned
    /// `RefreshedRoom` carries the room, its channel id, the channel members
    /// (empty when the room has no channel), the user ids of the removed
    /// participants and the user ids whose pending calls were canceled.
    ///
    /// When the room has no channel and ends up without participants, the
    /// room and its projects are deleted as well.
    pub async fn refresh_room(
        &self,
        room_id: RoomId,
        new_server_id: ServerId,
    ) -> Result<RoomGuard<RefreshedRoom>> {
        self.room_transaction(room_id, |tx| async move {
            // A participant is stale when it answered the call (has a
            // connection id) but that connection lives on a different server.
            let stale_participant_filter = Condition::all()
                .add(room_participant::Column::RoomId.eq(room_id))
                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));

            // Remember who is about to be removed before deleting the rows.
            let stale_participant_user_ids = room_participant::Entity::find()
                .filter(stale_participant_filter.clone())
                .all(&*tx)
                .await?
                .into_iter()
                .map(|participant| participant.user_id)
                .collect::<Vec<_>>();

            // Delete participants who failed to reconnect and cancel their calls.
            let mut canceled_calls_to_user_ids = Vec::new();
            room_participant::Entity::delete_many()
                .filter(stale_participant_filter)
                .exec(&*tx)
                .await?;
            // Pending calls: rows that were called by a removed participant
            // and have not been answered yet (no answering connection).
            let called_participants = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::CallingUserId
                                .is_in(stale_participant_user_ids.iter().copied()),
                        )
                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
                )
                .all(&*tx)
                .await?;
            room_participant::Entity::delete_many()
                .filter(
                    room_participant::Column::Id
                        .is_in(called_participants.iter().map(|participant| participant.id)),
                )
                .exec(&*tx)
                .await?;
            canceled_calls_to_user_ids.extend(
                called_participants
                    .into_iter()
                    .map(|participant| participant.user_id),
            );

            // Read the room state only after the deletions above.
            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
            let channel_members;
            if let Some(channel_id) = channel_id {
                channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
            } else {
                channel_members = Vec::new();

                // Delete the room, and the projects that reference it, if it
                // has no participants left. Rooms with a channel are kept.
                if room.participants.is_empty() {
                    project::Entity::delete_many()
                        .filter(project::Column::RoomId.eq(room_id))
                        .exec(&*tx)
                        .await?;
                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
                }
            };

            Ok(RefreshedRoom {
                room,
                channel_id,
                channel_members,
                stale_participant_user_ids,
                canceled_calls_to_user_ids,
            })
        })
        .await
    }
 242
 243    pub async fn delete_stale_servers(
 244        &self,
 245        environment: &str,
 246        new_server_id: ServerId,
 247    ) -> Result<()> {
 248        self.transaction(|tx| async move {
 249            server::Entity::delete_many()
 250                .filter(
 251                    Condition::all()
 252                        .add(server::Column::Environment.eq(environment))
 253                        .add(server::Column::Id.ne(new_server_id)),
 254                )
 255                .exec(&*tx)
 256                .await?;
 257            Ok(())
 258        })
 259        .await
 260    }
 261
 262    async fn stale_server_ids(
 263        &self,
 264        environment: &str,
 265        new_server_id: ServerId,
 266        tx: &DatabaseTransaction,
 267    ) -> Result<Vec<ServerId>> {
 268        let stale_servers = server::Entity::find()
 269            .filter(
 270                Condition::all()
 271                    .add(server::Column::Environment.eq(environment))
 272                    .add(server::Column::Id.ne(new_server_id)),
 273            )
 274            .all(&*tx)
 275            .await?;
 276        Ok(stale_servers.into_iter().map(|server| server.id).collect())
 277    }
 278
 279    // users
 280
 281    pub async fn create_user(
 282        &self,
 283        email_address: &str,
 284        admin: bool,
 285        params: NewUserParams,
 286    ) -> Result<NewUserResult> {
 287        self.transaction(|tx| async {
 288            let tx = tx;
 289            let user = user::Entity::insert(user::ActiveModel {
 290                email_address: ActiveValue::set(Some(email_address.into())),
 291                github_login: ActiveValue::set(params.github_login.clone()),
 292                github_user_id: ActiveValue::set(Some(params.github_user_id)),
 293                admin: ActiveValue::set(admin),
 294                metrics_id: ActiveValue::set(Uuid::new_v4()),
 295                ..Default::default()
 296            })
 297            .on_conflict(
 298                OnConflict::column(user::Column::GithubLogin)
 299                    .update_column(user::Column::GithubLogin)
 300                    .to_owned(),
 301            )
 302            .exec_with_returning(&*tx)
 303            .await?;
 304
 305            Ok(NewUserResult {
 306                user_id: user.id,
 307                metrics_id: user.metrics_id.to_string(),
 308                signup_device_id: None,
 309                inviting_user_id: None,
 310            })
 311        })
 312        .await
 313    }
 314
 315    pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
 316        self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
 317            .await
 318    }
 319
 320    pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
 321        self.transaction(|tx| async {
 322            let tx = tx;
 323            Ok(user::Entity::find()
 324                .filter(user::Column::Id.is_in(ids.iter().copied()))
 325                .all(&*tx)
 326                .await?)
 327        })
 328        .await
 329    }
 330
 331    pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 332        self.transaction(|tx| async move {
 333            Ok(user::Entity::find()
 334                .filter(user::Column::GithubLogin.eq(github_login))
 335                .one(&*tx)
 336                .await?)
 337        })
 338        .await
 339    }
 340
 341    pub async fn get_or_create_user_by_github_account(
 342        &self,
 343        github_login: &str,
 344        github_user_id: Option<i32>,
 345        github_email: Option<&str>,
 346    ) -> Result<Option<User>> {
 347        self.transaction(|tx| async move {
 348            let tx = &*tx;
 349            if let Some(github_user_id) = github_user_id {
 350                if let Some(user_by_github_user_id) = user::Entity::find()
 351                    .filter(user::Column::GithubUserId.eq(github_user_id))
 352                    .one(tx)
 353                    .await?
 354                {
 355                    let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
 356                    user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
 357                    Ok(Some(user_by_github_user_id.update(tx).await?))
 358                } else if let Some(user_by_github_login) = user::Entity::find()
 359                    .filter(user::Column::GithubLogin.eq(github_login))
 360                    .one(tx)
 361                    .await?
 362                {
 363                    let mut user_by_github_login = user_by_github_login.into_active_model();
 364                    user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
 365                    Ok(Some(user_by_github_login.update(tx).await?))
 366                } else {
 367                    let user = user::Entity::insert(user::ActiveModel {
 368                        email_address: ActiveValue::set(github_email.map(|email| email.into())),
 369                        github_login: ActiveValue::set(github_login.into()),
 370                        github_user_id: ActiveValue::set(Some(github_user_id)),
 371                        admin: ActiveValue::set(false),
 372                        invite_count: ActiveValue::set(0),
 373                        invite_code: ActiveValue::set(None),
 374                        metrics_id: ActiveValue::set(Uuid::new_v4()),
 375                        ..Default::default()
 376                    })
 377                    .exec_with_returning(&*tx)
 378                    .await?;
 379                    Ok(Some(user))
 380                }
 381            } else {
 382                Ok(user::Entity::find()
 383                    .filter(user::Column::GithubLogin.eq(github_login))
 384                    .one(tx)
 385                    .await?)
 386            }
 387        })
 388        .await
 389    }
 390
 391    pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 392        self.transaction(|tx| async move {
 393            Ok(user::Entity::find()
 394                .order_by_asc(user::Column::GithubLogin)
 395                .limit(limit as u64)
 396                .offset(page as u64 * limit as u64)
 397                .all(&*tx)
 398                .await?)
 399        })
 400        .await
 401    }
 402
 403    pub async fn get_users_with_no_invites(
 404        &self,
 405        invited_by_another_user: bool,
 406    ) -> Result<Vec<User>> {
 407        self.transaction(|tx| async move {
 408            Ok(user::Entity::find()
 409                .filter(
 410                    user::Column::InviteCount
 411                        .eq(0)
 412                        .and(if invited_by_another_user {
 413                            user::Column::InviterId.is_not_null()
 414                        } else {
 415                            user::Column::InviterId.is_null()
 416                        }),
 417                )
 418                .all(&*tx)
 419                .await?)
 420        })
 421        .await
 422    }
 423
 424    pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
 425        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
 426        enum QueryAs {
 427            MetricsId,
 428        }
 429
 430        self.transaction(|tx| async move {
 431            let metrics_id: Uuid = user::Entity::find_by_id(id)
 432                .select_only()
 433                .column(user::Column::MetricsId)
 434                .into_values::<_, QueryAs>()
 435                .one(&*tx)
 436                .await?
 437                .ok_or_else(|| anyhow!("could not find user"))?;
 438            Ok(metrics_id.to_string())
 439        })
 440        .await
 441    }
 442
 443    pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 444        self.transaction(|tx| async move {
 445            user::Entity::update_many()
 446                .filter(user::Column::Id.eq(id))
 447                .set(user::ActiveModel {
 448                    admin: ActiveValue::set(is_admin),
 449                    ..Default::default()
 450                })
 451                .exec(&*tx)
 452                .await?;
 453            Ok(())
 454        })
 455        .await
 456    }
 457
 458    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 459        self.transaction(|tx| async move {
 460            user::Entity::update_many()
 461                .filter(user::Column::Id.eq(id))
 462                .set(user::ActiveModel {
 463                    connected_once: ActiveValue::set(connected_once),
 464                    ..Default::default()
 465                })
 466                .exec(&*tx)
 467                .await?;
 468            Ok(())
 469        })
 470        .await
 471    }
 472
 473    pub async fn destroy_user(&self, id: UserId) -> Result<()> {
 474        self.transaction(|tx| async move {
 475            access_token::Entity::delete_many()
 476                .filter(access_token::Column::UserId.eq(id))
 477                .exec(&*tx)
 478                .await?;
 479            user::Entity::delete_by_id(id).exec(&*tx).await?;
 480            Ok(())
 481        })
 482        .await
 483    }
 484
 485    // contacts
 486
 487    pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 488        #[derive(Debug, FromQueryResult)]
 489        struct ContactWithUserBusyStatuses {
 490            user_id_a: UserId,
 491            user_id_b: UserId,
 492            a_to_b: bool,
 493            accepted: bool,
 494            should_notify: bool,
 495            user_a_busy: bool,
 496            user_b_busy: bool,
 497        }
 498
 499        self.transaction(|tx| async move {
 500            let user_a_participant = Alias::new("user_a_participant");
 501            let user_b_participant = Alias::new("user_b_participant");
 502            let mut db_contacts = contact::Entity::find()
 503                .column_as(
 504                    Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
 505                        .is_not_null(),
 506                    "user_a_busy",
 507                )
 508                .column_as(
 509                    Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
 510                        .is_not_null(),
 511                    "user_b_busy",
 512                )
 513                .filter(
 514                    contact::Column::UserIdA
 515                        .eq(user_id)
 516                        .or(contact::Column::UserIdB.eq(user_id)),
 517                )
 518                .join_as(
 519                    JoinType::LeftJoin,
 520                    contact::Relation::UserARoomParticipant.def(),
 521                    user_a_participant,
 522                )
 523                .join_as(
 524                    JoinType::LeftJoin,
 525                    contact::Relation::UserBRoomParticipant.def(),
 526                    user_b_participant,
 527                )
 528                .into_model::<ContactWithUserBusyStatuses>()
 529                .stream(&*tx)
 530                .await?;
 531
 532            let mut contacts = Vec::new();
 533            while let Some(db_contact) = db_contacts.next().await {
 534                let db_contact = db_contact?;
 535                if db_contact.user_id_a == user_id {
 536                    if db_contact.accepted {
 537                        contacts.push(Contact::Accepted {
 538                            user_id: db_contact.user_id_b,
 539                            should_notify: db_contact.should_notify && db_contact.a_to_b,
 540                            busy: db_contact.user_b_busy,
 541                        });
 542                    } else if db_contact.a_to_b {
 543                        contacts.push(Contact::Outgoing {
 544                            user_id: db_contact.user_id_b,
 545                        })
 546                    } else {
 547                        contacts.push(Contact::Incoming {
 548                            user_id: db_contact.user_id_b,
 549                            should_notify: db_contact.should_notify,
 550                        });
 551                    }
 552                } else if db_contact.accepted {
 553                    contacts.push(Contact::Accepted {
 554                        user_id: db_contact.user_id_a,
 555                        should_notify: db_contact.should_notify && !db_contact.a_to_b,
 556                        busy: db_contact.user_a_busy,
 557                    });
 558                } else if db_contact.a_to_b {
 559                    contacts.push(Contact::Incoming {
 560                        user_id: db_contact.user_id_a,
 561                        should_notify: db_contact.should_notify,
 562                    });
 563                } else {
 564                    contacts.push(Contact::Outgoing {
 565                        user_id: db_contact.user_id_a,
 566                    });
 567                }
 568            }
 569
 570            contacts.sort_unstable_by_key(|contact| contact.user_id());
 571
 572            Ok(contacts)
 573        })
 574        .await
 575    }
 576
 577    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
 578        self.transaction(|tx| async move {
 579            let participant = room_participant::Entity::find()
 580                .filter(room_participant::Column::UserId.eq(user_id))
 581                .one(&*tx)
 582                .await?;
 583            Ok(participant.is_some())
 584        })
 585        .await
 586    }
 587
 588    pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 589        self.transaction(|tx| async move {
 590            let (id_a, id_b) = if user_id_1 < user_id_2 {
 591                (user_id_1, user_id_2)
 592            } else {
 593                (user_id_2, user_id_1)
 594            };
 595
 596            Ok(contact::Entity::find()
 597                .filter(
 598                    contact::Column::UserIdA
 599                        .eq(id_a)
 600                        .and(contact::Column::UserIdB.eq(id_b))
 601                        .and(contact::Column::Accepted.eq(true)),
 602                )
 603                .one(&*tx)
 604                .await?
 605                .is_some())
 606        })
 607        .await
 608    }
 609
    /// Records a contact request from `sender_id` to `receiver_id`.
    ///
    /// A new pending row (`accepted = false`, `should_notify = true`) is
    /// inserted. If a row for the pair already exists, it is instead marked
    /// accepted (with `should_notify = false`), but only when the conflict
    /// condition below holds.
    ///
    /// Returns an error ("contact already requested") unless exactly one row
    /// was affected.
    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
        self.transaction(|tx| async move {
            // Rows are keyed with the smaller user id in `user_id_a`; `a_to_b`
            // records whether the request travels from user A to user B.
            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
                (sender_id, receiver_id, true)
            } else {
                (receiver_id, sender_id, false)
            };

            let rows_affected = contact::Entity::insert(contact::ActiveModel {
                user_id_a: ActiveValue::set(id_a),
                user_id_b: ActiveValue::set(id_b),
                a_to_b: ActiveValue::set(a_to_b),
                accepted: ActiveValue::set(false),
                should_notify: ActiveValue::set(true),
                ..Default::default()
            })
            .on_conflict(
                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
                    .values([
                        (contact::Column::Accepted, true.into()),
                        (contact::Column::ShouldNotify, false.into()),
                    ])
                    // Update only a not-yet-accepted row for which either
                    //   (same direction AND user_id_a == id_b), or
                    //   (opposite direction AND user_id_a == id_a).
                    // NOTE(review): the second case looks like "the other user
                    // already has a pending request to the sender", which this
                    // call then accepts — confirm.
                    .action_and_where(
                        contact::Column::Accepted.eq(false).and(
                            contact::Column::AToB
                                .eq(a_to_b)
                                .and(contact::Column::UserIdA.eq(id_b))
                                .or(contact::Column::AToB
                                    .ne(a_to_b)
                                    .and(contact::Column::UserIdA.eq(id_a))),
                        ),
                    )
                    .to_owned(),
            )
            .exec_without_returning(&*tx)
            .await?;

            // Anything other than exactly one affected row is reported as a
            // duplicate request.
            if rows_affected == 1 {
                Ok(())
            } else {
                Err(anyhow!("contact already requested"))?
            }
        })
        .await
    }
 655
 656    /// Returns a bool indicating whether the removed contact had originally accepted or not
 657    ///
 658    /// Deletes the contact identified by the requester and responder ids, and then returns
 659    /// whether the deleted contact had originally accepted or was a pending contact request.
 660    ///
 661    /// # Arguments
 662    ///
 663    /// * `requester_id` - The user that initiates this request
 664    /// * `responder_id` - The user that will be removed
 665    pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
 666        self.transaction(|tx| async move {
 667            let (id_a, id_b) = if responder_id < requester_id {
 668                (responder_id, requester_id)
 669            } else {
 670                (requester_id, responder_id)
 671            };
 672
 673            let contact = contact::Entity::find()
 674                .filter(
 675                    contact::Column::UserIdA
 676                        .eq(id_a)
 677                        .and(contact::Column::UserIdB.eq(id_b)),
 678                )
 679                .one(&*tx)
 680                .await?
 681                .ok_or_else(|| anyhow!("no such contact"))?;
 682
 683            contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
 684            Ok(contact.accepted)
 685        })
 686        .await
 687    }
 688
 689    pub async fn dismiss_contact_notification(
 690        &self,
 691        user_id: UserId,
 692        contact_user_id: UserId,
 693    ) -> Result<()> {
 694        self.transaction(|tx| async move {
 695            let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 696                (user_id, contact_user_id, true)
 697            } else {
 698                (contact_user_id, user_id, false)
 699            };
 700
 701            let result = contact::Entity::update_many()
 702                .set(contact::ActiveModel {
 703                    should_notify: ActiveValue::set(false),
 704                    ..Default::default()
 705                })
 706                .filter(
 707                    contact::Column::UserIdA
 708                        .eq(id_a)
 709                        .and(contact::Column::UserIdB.eq(id_b))
 710                        .and(
 711                            contact::Column::AToB
 712                                .eq(a_to_b)
 713                                .and(contact::Column::Accepted.eq(true))
 714                                .or(contact::Column::AToB
 715                                    .ne(a_to_b)
 716                                    .and(contact::Column::Accepted.eq(false))),
 717                        ),
 718                )
 719                .exec(&*tx)
 720                .await?;
 721            if result.rows_affected == 0 {
 722                Err(anyhow!("no such contact request"))?
 723            } else {
 724                Ok(())
 725            }
 726        })
 727        .await
 728    }
 729
 730    pub async fn respond_to_contact_request(
 731        &self,
 732        responder_id: UserId,
 733        requester_id: UserId,
 734        accept: bool,
 735    ) -> Result<()> {
 736        self.transaction(|tx| async move {
 737            let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 738                (responder_id, requester_id, false)
 739            } else {
 740                (requester_id, responder_id, true)
 741            };
 742            let rows_affected = if accept {
 743                let result = contact::Entity::update_many()
 744                    .set(contact::ActiveModel {
 745                        accepted: ActiveValue::set(true),
 746                        should_notify: ActiveValue::set(true),
 747                        ..Default::default()
 748                    })
 749                    .filter(
 750                        contact::Column::UserIdA
 751                            .eq(id_a)
 752                            .and(contact::Column::UserIdB.eq(id_b))
 753                            .and(contact::Column::AToB.eq(a_to_b)),
 754                    )
 755                    .exec(&*tx)
 756                    .await?;
 757                result.rows_affected
 758            } else {
 759                let result = contact::Entity::delete_many()
 760                    .filter(
 761                        contact::Column::UserIdA
 762                            .eq(id_a)
 763                            .and(contact::Column::UserIdB.eq(id_b))
 764                            .and(contact::Column::AToB.eq(a_to_b))
 765                            .and(contact::Column::Accepted.eq(false)),
 766                    )
 767                    .exec(&*tx)
 768                    .await?;
 769
 770                result.rows_affected
 771            };
 772
 773            if rows_affected == 1 {
 774                Ok(())
 775            } else {
 776                Err(anyhow!("no such contact request"))?
 777            }
 778        })
 779        .await
 780    }
 781
 782    pub fn fuzzy_like_string(string: &str) -> String {
 783        let mut result = String::with_capacity(string.len() * 2 + 1);
 784        for c in string.chars() {
 785            if c.is_alphanumeric() {
 786                result.push('%');
 787                result.push(c);
 788            }
 789        }
 790        result.push('%');
 791        result
 792    }
 793
 794    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 795        self.transaction(|tx| async {
 796            let tx = tx;
 797            let like_string = Self::fuzzy_like_string(name_query);
 798            let query = "
 799                SELECT users.*
 800                FROM users
 801                WHERE github_login ILIKE $1
 802                ORDER BY github_login <-> $2
 803                LIMIT $3
 804            ";
 805
 806            Ok(user::Entity::find()
 807                .from_raw_sql(Statement::from_sql_and_values(
 808                    self.pool.get_database_backend(),
 809                    query.into(),
 810                    vec![like_string.into(), name_query.into(), limit.into()],
 811                ))
 812                .all(&*tx)
 813                .await?)
 814        })
 815        .await
 816    }
 817
 818    // signups
 819
 820    pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
 821        self.transaction(|tx| async move {
 822            signup::Entity::insert(signup::ActiveModel {
 823                email_address: ActiveValue::set(signup.email_address.clone()),
 824                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 825                email_confirmation_sent: ActiveValue::set(false),
 826                platform_mac: ActiveValue::set(signup.platform_mac),
 827                platform_windows: ActiveValue::set(signup.platform_windows),
 828                platform_linux: ActiveValue::set(signup.platform_linux),
 829                platform_unknown: ActiveValue::set(false),
 830                editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
 831                programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
 832                device_id: ActiveValue::set(signup.device_id.clone()),
 833                added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
 834                ..Default::default()
 835            })
 836            .on_conflict(
 837                OnConflict::column(signup::Column::EmailAddress)
 838                    .update_columns([
 839                        signup::Column::PlatformMac,
 840                        signup::Column::PlatformWindows,
 841                        signup::Column::PlatformLinux,
 842                        signup::Column::EditorFeatures,
 843                        signup::Column::ProgrammingLanguages,
 844                        signup::Column::DeviceId,
 845                        signup::Column::AddedToMailingList,
 846                    ])
 847                    .to_owned(),
 848            )
 849            .exec(&*tx)
 850            .await?;
 851            Ok(())
 852        })
 853        .await
 854    }
 855
 856    pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
 857        self.transaction(|tx| async move {
 858            let signup = signup::Entity::find()
 859                .filter(signup::Column::EmailAddress.eq(email_address))
 860                .one(&*tx)
 861                .await?
 862                .ok_or_else(|| {
 863                    anyhow!("signup with email address {} doesn't exist", email_address)
 864                })?;
 865
 866            Ok(signup)
 867        })
 868        .await
 869    }
 870
 871    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
 872        self.transaction(|tx| async move {
 873            let query = "
 874                SELECT
 875                    COUNT(*) as count,
 876                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
 877                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
 878                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
 879                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
 880                FROM (
 881                    SELECT *
 882                    FROM signups
 883                    WHERE
 884                        NOT email_confirmation_sent
 885                ) AS unsent
 886            ";
 887            Ok(
 888                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
 889                    self.pool.get_database_backend(),
 890                    query.into(),
 891                    vec![],
 892                ))
 893                .one(&*tx)
 894                .await?
 895                .ok_or_else(|| anyhow!("invalid result"))?,
 896            )
 897        })
 898        .await
 899    }
 900
 901    pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
 902        let emails = invites
 903            .iter()
 904            .map(|s| s.email_address.as_str())
 905            .collect::<Vec<_>>();
 906        self.transaction(|tx| async {
 907            let tx = tx;
 908            signup::Entity::update_many()
 909                .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
 910                .set(signup::ActiveModel {
 911                    email_confirmation_sent: ActiveValue::set(true),
 912                    ..Default::default()
 913                })
 914                .exec(&*tx)
 915                .await?;
 916            Ok(())
 917        })
 918        .await
 919    }
 920
 921    pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
 922        self.transaction(|tx| async move {
 923            Ok(signup::Entity::find()
 924                .select_only()
 925                .column(signup::Column::EmailAddress)
 926                .column(signup::Column::EmailConfirmationCode)
 927                .filter(
 928                    signup::Column::EmailConfirmationSent.eq(false).and(
 929                        signup::Column::PlatformMac
 930                            .eq(true)
 931                            .or(signup::Column::PlatformUnknown.eq(true)),
 932                    ),
 933                )
 934                .order_by_asc(signup::Column::CreatedAt)
 935                .limit(count as u64)
 936                .into_model()
 937                .all(&*tx)
 938                .await?)
 939        })
 940        .await
 941    }
 942
 943    // invite codes
 944
 945    pub async fn create_invite_from_code(
 946        &self,
 947        code: &str,
 948        email_address: &str,
 949        device_id: Option<&str>,
 950        added_to_mailing_list: bool,
 951    ) -> Result<Invite> {
 952        self.transaction(|tx| async move {
 953            let existing_user = user::Entity::find()
 954                .filter(user::Column::EmailAddress.eq(email_address))
 955                .one(&*tx)
 956                .await?;
 957
 958            if existing_user.is_some() {
 959                Err(anyhow!("email address is already in use"))?;
 960            }
 961
 962            let inviting_user_with_invites = match user::Entity::find()
 963                .filter(
 964                    user::Column::InviteCode
 965                        .eq(code)
 966                        .and(user::Column::InviteCount.gt(0)),
 967                )
 968                .one(&*tx)
 969                .await?
 970            {
 971                Some(inviting_user) => inviting_user,
 972                None => {
 973                    return Err(Error::Http(
 974                        StatusCode::UNAUTHORIZED,
 975                        "unable to find an invite code with invites remaining".to_string(),
 976                    ))?
 977                }
 978            };
 979            user::Entity::update_many()
 980                .filter(
 981                    user::Column::Id
 982                        .eq(inviting_user_with_invites.id)
 983                        .and(user::Column::InviteCount.gt(0)),
 984                )
 985                .col_expr(
 986                    user::Column::InviteCount,
 987                    Expr::col(user::Column::InviteCount).sub(1),
 988                )
 989                .exec(&*tx)
 990                .await?;
 991
 992            let signup = signup::Entity::insert(signup::ActiveModel {
 993                email_address: ActiveValue::set(email_address.into()),
 994                email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
 995                email_confirmation_sent: ActiveValue::set(false),
 996                inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
 997                platform_linux: ActiveValue::set(false),
 998                platform_mac: ActiveValue::set(false),
 999                platform_windows: ActiveValue::set(false),
1000                platform_unknown: ActiveValue::set(true),
1001                device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
1002                added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
1003                ..Default::default()
1004            })
1005            .on_conflict(
1006                OnConflict::column(signup::Column::EmailAddress)
1007                    .update_column(signup::Column::InvitingUserId)
1008                    .to_owned(),
1009            )
1010            .exec_with_returning(&*tx)
1011            .await?;
1012
1013            Ok(Invite {
1014                email_address: signup.email_address,
1015                email_confirmation_code: signup.email_confirmation_code,
1016            })
1017        })
1018        .await
1019    }
1020
1021    pub async fn create_user_from_invite(
1022        &self,
1023        invite: &Invite,
1024        user: NewUserParams,
1025    ) -> Result<Option<NewUserResult>> {
1026        self.transaction(|tx| async {
1027            let tx = tx;
1028            let signup = signup::Entity::find()
1029                .filter(
1030                    signup::Column::EmailAddress
1031                        .eq(invite.email_address.as_str())
1032                        .and(
1033                            signup::Column::EmailConfirmationCode
1034                                .eq(invite.email_confirmation_code.as_str()),
1035                        ),
1036                )
1037                .one(&*tx)
1038                .await?
1039                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1040
1041            if signup.user_id.is_some() {
1042                return Ok(None);
1043            }
1044
1045            let user = user::Entity::insert(user::ActiveModel {
1046                email_address: ActiveValue::set(Some(invite.email_address.clone())),
1047                github_login: ActiveValue::set(user.github_login.clone()),
1048                github_user_id: ActiveValue::set(Some(user.github_user_id)),
1049                admin: ActiveValue::set(false),
1050                invite_count: ActiveValue::set(user.invite_count),
1051                invite_code: ActiveValue::set(Some(random_invite_code())),
1052                metrics_id: ActiveValue::set(Uuid::new_v4()),
1053                ..Default::default()
1054            })
1055            .on_conflict(
1056                OnConflict::column(user::Column::GithubLogin)
1057                    .update_columns([
1058                        user::Column::EmailAddress,
1059                        user::Column::GithubUserId,
1060                        user::Column::Admin,
1061                    ])
1062                    .to_owned(),
1063            )
1064            .exec_with_returning(&*tx)
1065            .await?;
1066
1067            let mut signup = signup.into_active_model();
1068            signup.user_id = ActiveValue::set(Some(user.id));
1069            let signup = signup.update(&*tx).await?;
1070
1071            if let Some(inviting_user_id) = signup.inviting_user_id {
1072                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1073                    (inviting_user_id, user.id, true)
1074                } else {
1075                    (user.id, inviting_user_id, false)
1076                };
1077
1078                contact::Entity::insert(contact::ActiveModel {
1079                    user_id_a: ActiveValue::set(user_id_a),
1080                    user_id_b: ActiveValue::set(user_id_b),
1081                    a_to_b: ActiveValue::set(a_to_b),
1082                    should_notify: ActiveValue::set(true),
1083                    accepted: ActiveValue::set(true),
1084                    ..Default::default()
1085                })
1086                .on_conflict(OnConflict::new().do_nothing().to_owned())
1087                .exec_without_returning(&*tx)
1088                .await?;
1089            }
1090
1091            Ok(Some(NewUserResult {
1092                user_id: user.id,
1093                metrics_id: user.metrics_id.to_string(),
1094                inviting_user_id: signup.inviting_user_id,
1095                signup_device_id: signup.device_id,
1096            }))
1097        })
1098        .await
1099    }
1100
1101    pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1102        self.transaction(|tx| async move {
1103            if count > 0 {
1104                user::Entity::update_many()
1105                    .filter(
1106                        user::Column::Id
1107                            .eq(id)
1108                            .and(user::Column::InviteCode.is_null()),
1109                    )
1110                    .set(user::ActiveModel {
1111                        invite_code: ActiveValue::set(Some(random_invite_code())),
1112                        ..Default::default()
1113                    })
1114                    .exec(&*tx)
1115                    .await?;
1116            }
1117
1118            user::Entity::update_many()
1119                .filter(user::Column::Id.eq(id))
1120                .set(user::ActiveModel {
1121                    invite_count: ActiveValue::set(count),
1122                    ..Default::default()
1123                })
1124                .exec(&*tx)
1125                .await?;
1126            Ok(())
1127        })
1128        .await
1129    }
1130
1131    pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1132        self.transaction(|tx| async move {
1133            match user::Entity::find_by_id(id).one(&*tx).await? {
1134                Some(user) if user.invite_code.is_some() => {
1135                    Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1136                }
1137                _ => Ok(None),
1138            }
1139        })
1140        .await
1141    }
1142
1143    pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1144        self.transaction(|tx| async move {
1145            user::Entity::find()
1146                .filter(user::Column::InviteCode.eq(code))
1147                .one(&*tx)
1148                .await?
1149                .ok_or_else(|| {
1150                    Error::Http(
1151                        StatusCode::NOT_FOUND,
1152                        "that invite code does not exist".to_string(),
1153                    )
1154                })
1155        })
1156        .await
1157    }
1158
1159    // rooms
1160
1161    pub async fn incoming_call_for_user(
1162        &self,
1163        user_id: UserId,
1164    ) -> Result<Option<proto::IncomingCall>> {
1165        self.transaction(|tx| async move {
1166            let pending_participant = room_participant::Entity::find()
1167                .filter(
1168                    room_participant::Column::UserId
1169                        .eq(user_id)
1170                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
1171                )
1172                .one(&*tx)
1173                .await?;
1174
1175            if let Some(pending_participant) = pending_participant {
1176                let room = self.get_room(pending_participant.room_id, &tx).await?;
1177                Ok(Self::build_incoming_call(&room, user_id))
1178            } else {
1179                Ok(None)
1180            }
1181        })
1182        .await
1183    }
1184
1185    pub async fn create_room(
1186        &self,
1187        user_id: UserId,
1188        connection: ConnectionId,
1189        live_kit_room: &str,
1190    ) -> Result<proto::Room> {
1191        self.transaction(|tx| async move {
1192            let room = room::ActiveModel {
1193                live_kit_room: ActiveValue::set(live_kit_room.into()),
1194                ..Default::default()
1195            }
1196            .insert(&*tx)
1197            .await?;
1198            room_participant::ActiveModel {
1199                room_id: ActiveValue::set(room.id),
1200                user_id: ActiveValue::set(user_id),
1201                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1202                answering_connection_server_id: ActiveValue::set(Some(ServerId(
1203                    connection.owner_id as i32,
1204                ))),
1205                answering_connection_lost: ActiveValue::set(false),
1206                calling_user_id: ActiveValue::set(user_id),
1207                calling_connection_id: ActiveValue::set(connection.id as i32),
1208                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1209                    connection.owner_id as i32,
1210                ))),
1211                ..Default::default()
1212            }
1213            .insert(&*tx)
1214            .await?;
1215
1216            let room = self.get_room(room.id, &tx).await?;
1217            Ok(room)
1218        })
1219        .await
1220    }
1221
1222    pub async fn call(
1223        &self,
1224        room_id: RoomId,
1225        calling_user_id: UserId,
1226        calling_connection: ConnectionId,
1227        called_user_id: UserId,
1228        initial_project_id: Option<ProjectId>,
1229    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1230        self.room_transaction(room_id, |tx| async move {
1231            room_participant::ActiveModel {
1232                room_id: ActiveValue::set(room_id),
1233                user_id: ActiveValue::set(called_user_id),
1234                answering_connection_lost: ActiveValue::set(false),
1235                calling_user_id: ActiveValue::set(calling_user_id),
1236                calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1237                calling_connection_server_id: ActiveValue::set(Some(ServerId(
1238                    calling_connection.owner_id as i32,
1239                ))),
1240                initial_project_id: ActiveValue::set(initial_project_id),
1241                ..Default::default()
1242            }
1243            .insert(&*tx)
1244            .await?;
1245
1246            let room = self.get_room(room_id, &tx).await?;
1247            let incoming_call = Self::build_incoming_call(&room, called_user_id)
1248                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1249            Ok((room, incoming_call))
1250        })
1251        .await
1252    }
1253
1254    pub async fn call_failed(
1255        &self,
1256        room_id: RoomId,
1257        called_user_id: UserId,
1258    ) -> Result<RoomGuard<proto::Room>> {
1259        self.room_transaction(room_id, |tx| async move {
1260            room_participant::Entity::delete_many()
1261                .filter(
1262                    room_participant::Column::RoomId
1263                        .eq(room_id)
1264                        .and(room_participant::Column::UserId.eq(called_user_id)),
1265                )
1266                .exec(&*tx)
1267                .await?;
1268            let room = self.get_room(room_id, &tx).await?;
1269            Ok(room)
1270        })
1271        .await
1272    }
1273
1274    pub async fn decline_call(
1275        &self,
1276        expected_room_id: Option<RoomId>,
1277        user_id: UserId,
1278    ) -> Result<Option<RoomGuard<proto::Room>>> {
1279        self.optional_room_transaction(|tx| async move {
1280            let mut filter = Condition::all()
1281                .add(room_participant::Column::UserId.eq(user_id))
1282                .add(room_participant::Column::AnsweringConnectionId.is_null());
1283            if let Some(room_id) = expected_room_id {
1284                filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1285            }
1286            let participant = room_participant::Entity::find()
1287                .filter(filter)
1288                .one(&*tx)
1289                .await?;
1290
1291            let participant = if let Some(participant) = participant {
1292                participant
1293            } else if expected_room_id.is_some() {
1294                return Err(anyhow!("could not find call to decline"))?;
1295            } else {
1296                return Ok(None);
1297            };
1298
1299            let room_id = participant.room_id;
1300            room_participant::Entity::delete(participant.into_active_model())
1301                .exec(&*tx)
1302                .await?;
1303
1304            let room = self.get_room(room_id, &tx).await?;
1305            Ok(Some((room_id, room)))
1306        })
1307        .await
1308    }
1309
1310    pub async fn cancel_call(
1311        &self,
1312        room_id: RoomId,
1313        calling_connection: ConnectionId,
1314        called_user_id: UserId,
1315    ) -> Result<RoomGuard<proto::Room>> {
1316        self.room_transaction(room_id, |tx| async move {
1317            let participant = room_participant::Entity::find()
1318                .filter(
1319                    Condition::all()
1320                        .add(room_participant::Column::UserId.eq(called_user_id))
1321                        .add(room_participant::Column::RoomId.eq(room_id))
1322                        .add(
1323                            room_participant::Column::CallingConnectionId
1324                                .eq(calling_connection.id as i32),
1325                        )
1326                        .add(
1327                            room_participant::Column::CallingConnectionServerId
1328                                .eq(calling_connection.owner_id as i32),
1329                        )
1330                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
1331                )
1332                .one(&*tx)
1333                .await?
1334                .ok_or_else(|| anyhow!("no call to cancel"))?;
1335
1336            room_participant::Entity::delete(participant.into_active_model())
1337                .exec(&*tx)
1338                .await?;
1339
1340            let room = self.get_room(room_id, &tx).await?;
1341            Ok(room)
1342        })
1343        .await
1344    }
1345
1346    pub async fn is_current_room_different_channel(
1347        &self,
1348        user_id: UserId,
1349        channel_id: ChannelId,
1350    ) -> Result<bool> {
1351        self.transaction(|tx| async move {
1352            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1353            enum QueryAs {
1354                ChannelId,
1355            }
1356
1357            let channel_id_model: Option<ChannelId> = room_participant::Entity::find()
1358                .select_only()
1359                .column_as(room::Column::ChannelId, QueryAs::ChannelId)
1360                .inner_join(room::Entity)
1361                .filter(room_participant::Column::UserId.eq(user_id))
1362                .into_values::<_, QueryAs>()
1363                .one(&*tx)
1364                .await?;
1365
1366            let result = channel_id_model
1367                .map(|channel_id_model| channel_id_model != channel_id)
1368                .unwrap_or(false);
1369
1370            Ok(result)
1371        })
1372        .await
1373    }
1374
1375    pub async fn join_room(
1376        &self,
1377        room_id: RoomId,
1378        user_id: UserId,
1379        connection: ConnectionId,
1380    ) -> Result<RoomGuard<JoinRoom>> {
1381        self.room_transaction(room_id, |tx| async move {
1382            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1383            enum QueryChannelId {
1384                ChannelId,
1385            }
1386            let channel_id: Option<ChannelId> = room::Entity::find()
1387                .select_only()
1388                .column(room::Column::ChannelId)
1389                .filter(room::Column::Id.eq(room_id))
1390                .into_values::<_, QueryChannelId>()
1391                .one(&*tx)
1392                .await?
1393                .ok_or_else(|| anyhow!("no such room"))?;
1394
1395            if let Some(channel_id) = channel_id {
1396                self.check_user_is_channel_member(channel_id, user_id, &*tx)
1397                    .await?;
1398
1399                room_participant::Entity::insert_many([room_participant::ActiveModel {
1400                    room_id: ActiveValue::set(room_id),
1401                    user_id: ActiveValue::set(user_id),
1402                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1403                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1404                        connection.owner_id as i32,
1405                    ))),
1406                    answering_connection_lost: ActiveValue::set(false),
1407                    calling_user_id: ActiveValue::set(user_id),
1408                    calling_connection_id: ActiveValue::set(connection.id as i32),
1409                    calling_connection_server_id: ActiveValue::set(Some(ServerId(
1410                        connection.owner_id as i32,
1411                    ))),
1412                    ..Default::default()
1413                }])
1414                .on_conflict(
1415                    OnConflict::columns([room_participant::Column::UserId])
1416                        .update_columns([
1417                            room_participant::Column::AnsweringConnectionId,
1418                            room_participant::Column::AnsweringConnectionServerId,
1419                            room_participant::Column::AnsweringConnectionLost,
1420                        ])
1421                        .to_owned(),
1422                )
1423                .exec(&*tx)
1424                .await?;
1425            } else {
1426                let result = room_participant::Entity::update_many()
1427                    .filter(
1428                        Condition::all()
1429                            .add(room_participant::Column::RoomId.eq(room_id))
1430                            .add(room_participant::Column::UserId.eq(user_id))
1431                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1432                    )
1433                    .set(room_participant::ActiveModel {
1434                        answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1435                        answering_connection_server_id: ActiveValue::set(Some(ServerId(
1436                            connection.owner_id as i32,
1437                        ))),
1438                        answering_connection_lost: ActiveValue::set(false),
1439                        ..Default::default()
1440                    })
1441                    .exec(&*tx)
1442                    .await?;
1443                if result.rows_affected == 0 {
1444                    Err(anyhow!("room does not exist or was already joined"))?;
1445                }
1446            }
1447
1448            let room = self.get_room(room_id, &tx).await?;
1449            let channel_members = if let Some(channel_id) = channel_id {
1450                self.get_channel_members_internal(channel_id, &tx).await?
1451            } else {
1452                Vec::new()
1453            };
1454            Ok(JoinRoom {
1455                room,
1456                channel_id,
1457                channel_members,
1458            })
1459        })
1460        .await
1461    }
1462
1463    pub async fn rejoin_room(
1464        &self,
1465        rejoin_room: proto::RejoinRoom,
1466        user_id: UserId,
1467        connection: ConnectionId,
1468    ) -> Result<RoomGuard<RejoinedRoom>> {
1469        let room_id = RoomId::from_proto(rejoin_room.id);
1470        self.room_transaction(room_id, |tx| async {
1471            let tx = tx;
1472            let participant_update = room_participant::Entity::update_many()
1473                .filter(
1474                    Condition::all()
1475                        .add(room_participant::Column::RoomId.eq(room_id))
1476                        .add(room_participant::Column::UserId.eq(user_id))
1477                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1478                        .add(
1479                            Condition::any()
1480                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1481                                .add(
1482                                    room_participant::Column::AnsweringConnectionServerId
1483                                        .ne(connection.owner_id as i32),
1484                                ),
1485                        ),
1486                )
1487                .set(room_participant::ActiveModel {
1488                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1489                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
1490                        connection.owner_id as i32,
1491                    ))),
1492                    answering_connection_lost: ActiveValue::set(false),
1493                    ..Default::default()
1494                })
1495                .exec(&*tx)
1496                .await?;
1497            if participant_update.rows_affected == 0 {
1498                return Err(anyhow!("room does not exist or was already joined"))?;
1499            }
1500
1501            let mut reshared_projects = Vec::new();
1502            for reshared_project in &rejoin_room.reshared_projects {
1503                let project_id = ProjectId::from_proto(reshared_project.project_id);
1504                let project = project::Entity::find_by_id(project_id)
1505                    .one(&*tx)
1506                    .await?
1507                    .ok_or_else(|| anyhow!("project does not exist"))?;
1508                if project.host_user_id != user_id {
1509                    return Err(anyhow!("no such project"))?;
1510                }
1511
1512                let mut collaborators = project
1513                    .find_related(project_collaborator::Entity)
1514                    .all(&*tx)
1515                    .await?;
1516                let host_ix = collaborators
1517                    .iter()
1518                    .position(|collaborator| {
1519                        collaborator.user_id == user_id && collaborator.is_host
1520                    })
1521                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1522                let host = collaborators.swap_remove(host_ix);
1523                let old_connection_id = host.connection();
1524
1525                project::Entity::update(project::ActiveModel {
1526                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1527                    host_connection_server_id: ActiveValue::set(Some(ServerId(
1528                        connection.owner_id as i32,
1529                    ))),
1530                    ..project.into_active_model()
1531                })
1532                .exec(&*tx)
1533                .await?;
1534                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1535                    connection_id: ActiveValue::set(connection.id as i32),
1536                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1537                    ..host.into_active_model()
1538                })
1539                .exec(&*tx)
1540                .await?;
1541
1542                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1543                    .await?;
1544
1545                reshared_projects.push(ResharedProject {
1546                    id: project_id,
1547                    old_connection_id,
1548                    collaborators: collaborators
1549                        .iter()
1550                        .map(|collaborator| ProjectCollaborator {
1551                            connection_id: collaborator.connection(),
1552                            user_id: collaborator.user_id,
1553                            replica_id: collaborator.replica_id,
1554                            is_host: collaborator.is_host,
1555                        })
1556                        .collect(),
1557                    worktrees: reshared_project.worktrees.clone(),
1558                });
1559            }
1560
1561            project::Entity::delete_many()
1562                .filter(
1563                    Condition::all()
1564                        .add(project::Column::RoomId.eq(room_id))
1565                        .add(project::Column::HostUserId.eq(user_id))
1566                        .add(
1567                            project::Column::Id
1568                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
1569                        ),
1570                )
1571                .exec(&*tx)
1572                .await?;
1573
1574            let mut rejoined_projects = Vec::new();
1575            for rejoined_project in &rejoin_room.rejoined_projects {
1576                let project_id = ProjectId::from_proto(rejoined_project.id);
1577                let Some(project) = project::Entity::find_by_id(project_id)
1578                    .one(&*tx)
1579                    .await? else { continue };
1580
1581                let mut worktrees = Vec::new();
1582                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1583                for db_worktree in db_worktrees {
1584                    let mut worktree = RejoinedWorktree {
1585                        id: db_worktree.id as u64,
1586                        abs_path: db_worktree.abs_path,
1587                        root_name: db_worktree.root_name,
1588                        visible: db_worktree.visible,
1589                        updated_entries: Default::default(),
1590                        removed_entries: Default::default(),
1591                        updated_repositories: Default::default(),
1592                        removed_repositories: Default::default(),
1593                        diagnostic_summaries: Default::default(),
1594                        settings_files: Default::default(),
1595                        scan_id: db_worktree.scan_id as u64,
1596                        completed_scan_id: db_worktree.completed_scan_id as u64,
1597                    };
1598
1599                    let rejoined_worktree = rejoined_project
1600                        .worktrees
1601                        .iter()
1602                        .find(|worktree| worktree.id == db_worktree.id as u64);
1603
1604                    // File entries
1605                    {
1606                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1607                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1608                        } else {
1609                            worktree_entry::Column::IsDeleted.eq(false)
1610                        };
1611
1612                        let mut db_entries = worktree_entry::Entity::find()
1613                            .filter(
1614                                Condition::all()
1615                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
1616                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1617                                    .add(entry_filter),
1618                            )
1619                            .stream(&*tx)
1620                            .await?;
1621
1622                        while let Some(db_entry) = db_entries.next().await {
1623                            let db_entry = db_entry?;
1624                            if db_entry.is_deleted {
1625                                worktree.removed_entries.push(db_entry.id as u64);
1626                            } else {
1627                                worktree.updated_entries.push(proto::Entry {
1628                                    id: db_entry.id as u64,
1629                                    is_dir: db_entry.is_dir,
1630                                    path: db_entry.path,
1631                                    inode: db_entry.inode as u64,
1632                                    mtime: Some(proto::Timestamp {
1633                                        seconds: db_entry.mtime_seconds as u64,
1634                                        nanos: db_entry.mtime_nanos as u32,
1635                                    }),
1636                                    is_symlink: db_entry.is_symlink,
1637                                    is_ignored: db_entry.is_ignored,
1638                                    is_external: db_entry.is_external,
1639                                    git_status: db_entry.git_status.map(|status| status as i32),
1640                                });
1641                            }
1642                        }
1643                    }
1644
1645                    // Repository Entries
1646                    {
1647                        let repository_entry_filter =
1648                            if let Some(rejoined_worktree) = rejoined_worktree {
1649                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1650                            } else {
1651                                worktree_repository::Column::IsDeleted.eq(false)
1652                            };
1653
1654                        let mut db_repositories = worktree_repository::Entity::find()
1655                            .filter(
1656                                Condition::all()
1657                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
1658                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1659                                    .add(repository_entry_filter),
1660                            )
1661                            .stream(&*tx)
1662                            .await?;
1663
1664                        while let Some(db_repository) = db_repositories.next().await {
1665                            let db_repository = db_repository?;
1666                            if db_repository.is_deleted {
1667                                worktree
1668                                    .removed_repositories
1669                                    .push(db_repository.work_directory_id as u64);
1670                            } else {
1671                                worktree.updated_repositories.push(proto::RepositoryEntry {
1672                                    work_directory_id: db_repository.work_directory_id as u64,
1673                                    branch: db_repository.branch,
1674                                });
1675                            }
1676                        }
1677                    }
1678
1679                    worktrees.push(worktree);
1680                }
1681
1682                let language_servers = project
1683                    .find_related(language_server::Entity)
1684                    .all(&*tx)
1685                    .await?
1686                    .into_iter()
1687                    .map(|language_server| proto::LanguageServer {
1688                        id: language_server.id as u64,
1689                        name: language_server.name,
1690                    })
1691                    .collect::<Vec<_>>();
1692
1693                {
1694                    let mut db_settings_files = worktree_settings_file::Entity::find()
1695                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1696                        .stream(&*tx)
1697                        .await?;
1698                    while let Some(db_settings_file) = db_settings_files.next().await {
1699                        let db_settings_file = db_settings_file?;
1700                        if let Some(worktree) = worktrees
1701                            .iter_mut()
1702                            .find(|w| w.id == db_settings_file.worktree_id as u64)
1703                        {
1704                            worktree.settings_files.push(WorktreeSettingsFile {
1705                                path: db_settings_file.path,
1706                                content: db_settings_file.content,
1707                            });
1708                        }
1709                    }
1710                }
1711
1712                let mut collaborators = project
1713                    .find_related(project_collaborator::Entity)
1714                    .all(&*tx)
1715                    .await?;
1716                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1717                    .iter()
1718                    .position(|collaborator| collaborator.user_id == user_id)
1719                {
1720                    collaborators.swap_remove(self_collaborator_ix)
1721                } else {
1722                    continue;
1723                };
1724                let old_connection_id = self_collaborator.connection();
1725                project_collaborator::Entity::update(project_collaborator::ActiveModel {
1726                    connection_id: ActiveValue::set(connection.id as i32),
1727                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1728                    ..self_collaborator.into_active_model()
1729                })
1730                .exec(&*tx)
1731                .await?;
1732
1733                let collaborators = collaborators
1734                    .into_iter()
1735                    .map(|collaborator| ProjectCollaborator {
1736                        connection_id: collaborator.connection(),
1737                        user_id: collaborator.user_id,
1738                        replica_id: collaborator.replica_id,
1739                        is_host: collaborator.is_host,
1740                    })
1741                    .collect::<Vec<_>>();
1742
1743                rejoined_projects.push(RejoinedProject {
1744                    id: project_id,
1745                    old_connection_id,
1746                    collaborators,
1747                    worktrees,
1748                    language_servers,
1749                });
1750            }
1751
1752            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1753            let channel_members = if let Some(channel_id) = channel_id {
1754                self.get_channel_members_internal(channel_id, &tx).await?
1755            } else {
1756                Vec::new()
1757            };
1758
1759            Ok(RejoinedRoom {
1760                room,
1761                channel_id,
1762                channel_members,
1763                rejoined_projects,
1764                reshared_projects,
1765            })
1766        })
1767        .await
1768    }
1769
1770    pub async fn leave_room(
1771        &self,
1772        connection: ConnectionId,
1773    ) -> Result<Option<RoomGuard<LeftRoom>>> {
1774        self.optional_room_transaction(|tx| async move {
1775            let leaving_participant = room_participant::Entity::find()
1776                .filter(
1777                    Condition::all()
1778                        .add(
1779                            room_participant::Column::AnsweringConnectionId
1780                                .eq(connection.id as i32),
1781                        )
1782                        .add(
1783                            room_participant::Column::AnsweringConnectionServerId
1784                                .eq(connection.owner_id as i32),
1785                        ),
1786                )
1787                .one(&*tx)
1788                .await?;
1789
1790            if let Some(leaving_participant) = leaving_participant {
1791                // Leave room.
1792                let room_id = leaving_participant.room_id;
1793                room_participant::Entity::delete_by_id(leaving_participant.id)
1794                    .exec(&*tx)
1795                    .await?;
1796
1797                // Cancel pending calls initiated by the leaving user.
1798                let called_participants = room_participant::Entity::find()
1799                    .filter(
1800                        Condition::all()
1801                            .add(
1802                                room_participant::Column::CallingUserId
1803                                    .eq(leaving_participant.user_id),
1804                            )
1805                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
1806                    )
1807                    .all(&*tx)
1808                    .await?;
1809                room_participant::Entity::delete_many()
1810                    .filter(
1811                        room_participant::Column::Id
1812                            .is_in(called_participants.iter().map(|participant| participant.id)),
1813                    )
1814                    .exec(&*tx)
1815                    .await?;
1816                let canceled_calls_to_user_ids = called_participants
1817                    .into_iter()
1818                    .map(|participant| participant.user_id)
1819                    .collect();
1820
1821                // Detect left projects.
1822                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1823                enum QueryProjectIds {
1824                    ProjectId,
1825                }
1826                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1827                    .select_only()
1828                    .column_as(
1829                        project_collaborator::Column::ProjectId,
1830                        QueryProjectIds::ProjectId,
1831                    )
1832                    .filter(
1833                        Condition::all()
1834                            .add(
1835                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1836                            )
1837                            .add(
1838                                project_collaborator::Column::ConnectionServerId
1839                                    .eq(connection.owner_id as i32),
1840                            ),
1841                    )
1842                    .into_values::<_, QueryProjectIds>()
1843                    .all(&*tx)
1844                    .await?;
1845                let mut left_projects = HashMap::default();
1846                let mut collaborators = project_collaborator::Entity::find()
1847                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1848                    .stream(&*tx)
1849                    .await?;
1850                while let Some(collaborator) = collaborators.next().await {
1851                    let collaborator = collaborator?;
1852                    let left_project =
1853                        left_projects
1854                            .entry(collaborator.project_id)
1855                            .or_insert(LeftProject {
1856                                id: collaborator.project_id,
1857                                host_user_id: Default::default(),
1858                                connection_ids: Default::default(),
1859                                host_connection_id: Default::default(),
1860                            });
1861
1862                    let collaborator_connection_id = collaborator.connection();
1863                    if collaborator_connection_id != connection {
1864                        left_project.connection_ids.push(collaborator_connection_id);
1865                    }
1866
1867                    if collaborator.is_host {
1868                        left_project.host_user_id = collaborator.user_id;
1869                        left_project.host_connection_id = collaborator_connection_id;
1870                    }
1871                }
1872                drop(collaborators);
1873
1874                // Leave projects.
1875                project_collaborator::Entity::delete_many()
1876                    .filter(
1877                        Condition::all()
1878                            .add(
1879                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1880                            )
1881                            .add(
1882                                project_collaborator::Column::ConnectionServerId
1883                                    .eq(connection.owner_id as i32),
1884                            ),
1885                    )
1886                    .exec(&*tx)
1887                    .await?;
1888
1889                // Unshare projects.
1890                project::Entity::delete_many()
1891                    .filter(
1892                        Condition::all()
1893                            .add(project::Column::RoomId.eq(room_id))
1894                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
1895                            .add(
1896                                project::Column::HostConnectionServerId
1897                                    .eq(connection.owner_id as i32),
1898                            ),
1899                    )
1900                    .exec(&*tx)
1901                    .await?;
1902
1903                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1904                let deleted = if room.participants.is_empty() {
1905                    let result = room::Entity::delete_by_id(room_id)
1906                        .filter(room::Column::ChannelId.is_null())
1907                        .exec(&*tx)
1908                        .await?;
1909                    result.rows_affected > 0
1910                } else {
1911                    false
1912                };
1913
1914                let channel_members = if let Some(channel_id) = channel_id {
1915                    self.get_channel_members_internal(channel_id, &tx).await?
1916                } else {
1917                    Vec::new()
1918                };
1919                let left_room = LeftRoom {
1920                    room,
1921                    channel_id,
1922                    channel_members,
1923                    left_projects,
1924                    canceled_calls_to_user_ids,
1925                    deleted,
1926                };
1927
1928                if left_room.room.participants.is_empty() {
1929                    self.rooms.remove(&room_id);
1930                }
1931
1932                Ok(Some((room_id, left_room)))
1933            } else {
1934                Ok(None)
1935            }
1936        })
1937        .await
1938    }
1939
1940    pub async fn follow(
1941        &self,
1942        project_id: ProjectId,
1943        leader_connection: ConnectionId,
1944        follower_connection: ConnectionId,
1945    ) -> Result<RoomGuard<proto::Room>> {
1946        let room_id = self.room_id_for_project(project_id).await?;
1947        self.room_transaction(room_id, |tx| async move {
1948            follower::ActiveModel {
1949                room_id: ActiveValue::set(room_id),
1950                project_id: ActiveValue::set(project_id),
1951                leader_connection_server_id: ActiveValue::set(ServerId(
1952                    leader_connection.owner_id as i32,
1953                )),
1954                leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1955                follower_connection_server_id: ActiveValue::set(ServerId(
1956                    follower_connection.owner_id as i32,
1957                )),
1958                follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1959                ..Default::default()
1960            }
1961            .insert(&*tx)
1962            .await?;
1963
1964            let room = self.get_room(room_id, &*tx).await?;
1965            Ok(room)
1966        })
1967        .await
1968    }
1969
1970    pub async fn unfollow(
1971        &self,
1972        project_id: ProjectId,
1973        leader_connection: ConnectionId,
1974        follower_connection: ConnectionId,
1975    ) -> Result<RoomGuard<proto::Room>> {
1976        let room_id = self.room_id_for_project(project_id).await?;
1977        self.room_transaction(room_id, |tx| async move {
1978            follower::Entity::delete_many()
1979                .filter(
1980                    Condition::all()
1981                        .add(follower::Column::ProjectId.eq(project_id))
1982                        .add(
1983                            follower::Column::LeaderConnectionServerId
1984                                .eq(leader_connection.owner_id),
1985                        )
1986                        .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1987                        .add(
1988                            follower::Column::FollowerConnectionServerId
1989                                .eq(follower_connection.owner_id),
1990                        )
1991                        .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1992                )
1993                .exec(&*tx)
1994                .await?;
1995
1996            let room = self.get_room(room_id, &*tx).await?;
1997            Ok(room)
1998        })
1999        .await
2000    }
2001
2002    pub async fn update_room_participant_location(
2003        &self,
2004        room_id: RoomId,
2005        connection: ConnectionId,
2006        location: proto::ParticipantLocation,
2007    ) -> Result<RoomGuard<proto::Room>> {
2008        self.room_transaction(room_id, |tx| async {
2009            let tx = tx;
2010            let location_kind;
2011            let location_project_id;
2012            match location
2013                .variant
2014                .as_ref()
2015                .ok_or_else(|| anyhow!("invalid location"))?
2016            {
2017                proto::participant_location::Variant::SharedProject(project) => {
2018                    location_kind = 0;
2019                    location_project_id = Some(ProjectId::from_proto(project.id));
2020                }
2021                proto::participant_location::Variant::UnsharedProject(_) => {
2022                    location_kind = 1;
2023                    location_project_id = None;
2024                }
2025                proto::participant_location::Variant::External(_) => {
2026                    location_kind = 2;
2027                    location_project_id = None;
2028                }
2029            }
2030
2031            let result = room_participant::Entity::update_many()
2032                .filter(
2033                    Condition::all()
2034                        .add(room_participant::Column::RoomId.eq(room_id))
2035                        .add(
2036                            room_participant::Column::AnsweringConnectionId
2037                                .eq(connection.id as i32),
2038                        )
2039                        .add(
2040                            room_participant::Column::AnsweringConnectionServerId
2041                                .eq(connection.owner_id as i32),
2042                        ),
2043                )
2044                .set(room_participant::ActiveModel {
2045                    location_kind: ActiveValue::set(Some(location_kind)),
2046                    location_project_id: ActiveValue::set(location_project_id),
2047                    ..Default::default()
2048                })
2049                .exec(&*tx)
2050                .await?;
2051
2052            if result.rows_affected == 1 {
2053                let room = self.get_room(room_id, &tx).await?;
2054                Ok(room)
2055            } else {
2056                Err(anyhow!("could not update room participant location"))?
2057            }
2058        })
2059        .await
2060    }
2061
2062    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2063        self.transaction(|tx| async move {
2064            let participant = room_participant::Entity::find()
2065                .filter(
2066                    Condition::all()
2067                        .add(
2068                            room_participant::Column::AnsweringConnectionId
2069                                .eq(connection.id as i32),
2070                        )
2071                        .add(
2072                            room_participant::Column::AnsweringConnectionServerId
2073                                .eq(connection.owner_id as i32),
2074                        ),
2075                )
2076                .one(&*tx)
2077                .await?
2078                .ok_or_else(|| anyhow!("not a participant in any room"))?;
2079
2080            room_participant::Entity::update(room_participant::ActiveModel {
2081                answering_connection_lost: ActiveValue::set(true),
2082                ..participant.into_active_model()
2083            })
2084            .exec(&*tx)
2085            .await?;
2086
2087            Ok(())
2088        })
2089        .await
2090    }
2091
2092    fn build_incoming_call(
2093        room: &proto::Room,
2094        called_user_id: UserId,
2095    ) -> Option<proto::IncomingCall> {
2096        let pending_participant = room
2097            .pending_participants
2098            .iter()
2099            .find(|participant| participant.user_id == called_user_id.to_proto())?;
2100
2101        Some(proto::IncomingCall {
2102            room_id: room.id,
2103            calling_user_id: pending_participant.calling_user_id,
2104            participant_user_ids: room
2105                .participants
2106                .iter()
2107                .map(|participant| participant.user_id)
2108                .collect(),
2109            initial_project: room.participants.iter().find_map(|participant| {
2110                let initial_project_id = pending_participant.initial_project_id?;
2111                participant
2112                    .projects
2113                    .iter()
2114                    .find(|project| project.id == initial_project_id)
2115                    .cloned()
2116            }),
2117        })
2118    }
2119    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2120        let (_, room) = self.get_channel_room(room_id, tx).await?;
2121        Ok(room)
2122    }
2123
2124    async fn get_channel_room(
2125        &self,
2126        room_id: RoomId,
2127        tx: &DatabaseTransaction,
2128    ) -> Result<(Option<ChannelId>, proto::Room)> {
2129        let db_room = room::Entity::find_by_id(room_id)
2130            .one(tx)
2131            .await?
2132            .ok_or_else(|| anyhow!("could not find room"))?;
2133
2134        let mut db_participants = db_room
2135            .find_related(room_participant::Entity)
2136            .stream(tx)
2137            .await?;
2138        let mut participants = HashMap::default();
2139        let mut pending_participants = Vec::new();
2140        while let Some(db_participant) = db_participants.next().await {
2141            let db_participant = db_participant?;
2142            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2143                .answering_connection_id
2144                .zip(db_participant.answering_connection_server_id)
2145            {
2146                let location = match (
2147                    db_participant.location_kind,
2148                    db_participant.location_project_id,
2149                ) {
2150                    (Some(0), Some(project_id)) => {
2151                        Some(proto::participant_location::Variant::SharedProject(
2152                            proto::participant_location::SharedProject {
2153                                id: project_id.to_proto(),
2154                            },
2155                        ))
2156                    }
2157                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2158                        Default::default(),
2159                    )),
2160                    _ => Some(proto::participant_location::Variant::External(
2161                        Default::default(),
2162                    )),
2163                };
2164
2165                let answering_connection = ConnectionId {
2166                    owner_id: answering_connection_server_id.0 as u32,
2167                    id: answering_connection_id as u32,
2168                };
2169                participants.insert(
2170                    answering_connection,
2171                    proto::Participant {
2172                        user_id: db_participant.user_id.to_proto(),
2173                        peer_id: Some(answering_connection.into()),
2174                        projects: Default::default(),
2175                        location: Some(proto::ParticipantLocation { variant: location }),
2176                    },
2177                );
2178            } else {
2179                pending_participants.push(proto::PendingParticipant {
2180                    user_id: db_participant.user_id.to_proto(),
2181                    calling_user_id: db_participant.calling_user_id.to_proto(),
2182                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2183                });
2184            }
2185        }
2186        drop(db_participants);
2187
2188        let mut db_projects = db_room
2189            .find_related(project::Entity)
2190            .find_with_related(worktree::Entity)
2191            .stream(tx)
2192            .await?;
2193
2194        while let Some(row) = db_projects.next().await {
2195            let (db_project, db_worktree) = row?;
2196            let host_connection = db_project.host_connection()?;
2197            if let Some(participant) = participants.get_mut(&host_connection) {
2198                let project = if let Some(project) = participant
2199                    .projects
2200                    .iter_mut()
2201                    .find(|project| project.id == db_project.id.to_proto())
2202                {
2203                    project
2204                } else {
2205                    participant.projects.push(proto::ParticipantProject {
2206                        id: db_project.id.to_proto(),
2207                        worktree_root_names: Default::default(),
2208                    });
2209                    participant.projects.last_mut().unwrap()
2210                };
2211
2212                if let Some(db_worktree) = db_worktree {
2213                    if db_worktree.visible {
2214                        project.worktree_root_names.push(db_worktree.root_name);
2215                    }
2216                }
2217            }
2218        }
2219        drop(db_projects);
2220
2221        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2222        let mut followers = Vec::new();
2223        while let Some(db_follower) = db_followers.next().await {
2224            let db_follower = db_follower?;
2225            followers.push(proto::Follower {
2226                leader_id: Some(db_follower.leader_connection().into()),
2227                follower_id: Some(db_follower.follower_connection().into()),
2228                project_id: db_follower.project_id.to_proto(),
2229            });
2230        }
2231
2232        Ok((
2233            db_room.channel_id,
2234            proto::Room {
2235                id: db_room.id.to_proto(),
2236                live_kit_room: db_room.live_kit_room,
2237                participants: participants.into_values().collect(),
2238                pending_participants,
2239                followers,
2240            },
2241        ))
2242    }
2243
2244    // projects
2245
2246    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2247        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2248        enum QueryAs {
2249            Count,
2250        }
2251
2252        self.transaction(|tx| async move {
2253            Ok(project::Entity::find()
2254                .select_only()
2255                .column_as(project::Column::Id.count(), QueryAs::Count)
2256                .inner_join(user::Entity)
2257                .filter(user::Column::Admin.eq(false))
2258                .into_values::<_, QueryAs>()
2259                .one(&*tx)
2260                .await?
2261                .unwrap_or(0i64) as usize)
2262        })
2263        .await
2264    }
2265
2266    pub async fn share_project(
2267        &self,
2268        room_id: RoomId,
2269        connection: ConnectionId,
2270        worktrees: &[proto::WorktreeMetadata],
2271    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2272        self.room_transaction(room_id, |tx| async move {
2273            let participant = room_participant::Entity::find()
2274                .filter(
2275                    Condition::all()
2276                        .add(
2277                            room_participant::Column::AnsweringConnectionId
2278                                .eq(connection.id as i32),
2279                        )
2280                        .add(
2281                            room_participant::Column::AnsweringConnectionServerId
2282                                .eq(connection.owner_id as i32),
2283                        ),
2284                )
2285                .one(&*tx)
2286                .await?
2287                .ok_or_else(|| anyhow!("could not find participant"))?;
2288            if participant.room_id != room_id {
2289                return Err(anyhow!("shared project on unexpected room"))?;
2290            }
2291
2292            let project = project::ActiveModel {
2293                room_id: ActiveValue::set(participant.room_id),
2294                host_user_id: ActiveValue::set(participant.user_id),
2295                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2296                host_connection_server_id: ActiveValue::set(Some(ServerId(
2297                    connection.owner_id as i32,
2298                ))),
2299                ..Default::default()
2300            }
2301            .insert(&*tx)
2302            .await?;
2303
2304            if !worktrees.is_empty() {
2305                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2306                    worktree::ActiveModel {
2307                        id: ActiveValue::set(worktree.id as i64),
2308                        project_id: ActiveValue::set(project.id),
2309                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
2310                        root_name: ActiveValue::set(worktree.root_name.clone()),
2311                        visible: ActiveValue::set(worktree.visible),
2312                        scan_id: ActiveValue::set(0),
2313                        completed_scan_id: ActiveValue::set(0),
2314                    }
2315                }))
2316                .exec(&*tx)
2317                .await?;
2318            }
2319
2320            project_collaborator::ActiveModel {
2321                project_id: ActiveValue::set(project.id),
2322                connection_id: ActiveValue::set(connection.id as i32),
2323                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2324                user_id: ActiveValue::set(participant.user_id),
2325                replica_id: ActiveValue::set(ReplicaId(0)),
2326                is_host: ActiveValue::set(true),
2327                ..Default::default()
2328            }
2329            .insert(&*tx)
2330            .await?;
2331
2332            let room = self.get_room(room_id, &tx).await?;
2333            Ok((project.id, room))
2334        })
2335        .await
2336    }
2337
2338    pub async fn unshare_project(
2339        &self,
2340        project_id: ProjectId,
2341        connection: ConnectionId,
2342    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2343        let room_id = self.room_id_for_project(project_id).await?;
2344        self.room_transaction(room_id, |tx| async move {
2345            let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2346
2347            let project = project::Entity::find_by_id(project_id)
2348                .one(&*tx)
2349                .await?
2350                .ok_or_else(|| anyhow!("project not found"))?;
2351            if project.host_connection()? == connection {
2352                project::Entity::delete(project.into_active_model())
2353                    .exec(&*tx)
2354                    .await?;
2355                let room = self.get_room(room_id, &tx).await?;
2356                Ok((room, guest_connection_ids))
2357            } else {
2358                Err(anyhow!("cannot unshare a project hosted by another user"))?
2359            }
2360        })
2361        .await
2362    }
2363
2364    pub async fn update_project(
2365        &self,
2366        project_id: ProjectId,
2367        connection: ConnectionId,
2368        worktrees: &[proto::WorktreeMetadata],
2369    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2370        let room_id = self.room_id_for_project(project_id).await?;
2371        self.room_transaction(room_id, |tx| async move {
2372            let project = project::Entity::find_by_id(project_id)
2373                .filter(
2374                    Condition::all()
2375                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2376                        .add(
2377                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2378                        ),
2379                )
2380                .one(&*tx)
2381                .await?
2382                .ok_or_else(|| anyhow!("no such project"))?;
2383
2384            self.update_project_worktrees(project.id, worktrees, &tx)
2385                .await?;
2386
2387            let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2388            let room = self.get_room(project.room_id, &tx).await?;
2389            Ok((room, guest_connection_ids))
2390        })
2391        .await
2392    }
2393
2394    async fn update_project_worktrees(
2395        &self,
2396        project_id: ProjectId,
2397        worktrees: &[proto::WorktreeMetadata],
2398        tx: &DatabaseTransaction,
2399    ) -> Result<()> {
2400        if !worktrees.is_empty() {
2401            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2402                id: ActiveValue::set(worktree.id as i64),
2403                project_id: ActiveValue::set(project_id),
2404                abs_path: ActiveValue::set(worktree.abs_path.clone()),
2405                root_name: ActiveValue::set(worktree.root_name.clone()),
2406                visible: ActiveValue::set(worktree.visible),
2407                scan_id: ActiveValue::set(0),
2408                completed_scan_id: ActiveValue::set(0),
2409            }))
2410            .on_conflict(
2411                OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2412                    .update_column(worktree::Column::RootName)
2413                    .to_owned(),
2414            )
2415            .exec(&*tx)
2416            .await?;
2417        }
2418
2419        worktree::Entity::delete_many()
2420            .filter(worktree::Column::ProjectId.eq(project_id).and(
2421                worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2422            ))
2423            .exec(&*tx)
2424            .await?;
2425
2426        Ok(())
2427    }
2428
2429    pub async fn update_worktree(
2430        &self,
2431        update: &proto::UpdateWorktree,
2432        connection: ConnectionId,
2433    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2434        let project_id = ProjectId::from_proto(update.project_id);
2435        let worktree_id = update.worktree_id as i64;
2436        let room_id = self.room_id_for_project(project_id).await?;
2437        self.room_transaction(room_id, |tx| async move {
2438            // Ensure the update comes from the host.
2439            let _project = project::Entity::find_by_id(project_id)
2440                .filter(
2441                    Condition::all()
2442                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
2443                        .add(
2444                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2445                        ),
2446                )
2447                .one(&*tx)
2448                .await?
2449                .ok_or_else(|| anyhow!("no such project"))?;
2450
2451            // Update metadata.
2452            worktree::Entity::update(worktree::ActiveModel {
2453                id: ActiveValue::set(worktree_id),
2454                project_id: ActiveValue::set(project_id),
2455                root_name: ActiveValue::set(update.root_name.clone()),
2456                scan_id: ActiveValue::set(update.scan_id as i64),
2457                completed_scan_id: if update.is_last_update {
2458                    ActiveValue::set(update.scan_id as i64)
2459                } else {
2460                    ActiveValue::default()
2461                },
2462                abs_path: ActiveValue::set(update.abs_path.clone()),
2463                ..Default::default()
2464            })
2465            .exec(&*tx)
2466            .await?;
2467
2468            if !update.updated_entries.is_empty() {
2469                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2470                    let mtime = entry.mtime.clone().unwrap_or_default();
2471                    worktree_entry::ActiveModel {
2472                        project_id: ActiveValue::set(project_id),
2473                        worktree_id: ActiveValue::set(worktree_id),
2474                        id: ActiveValue::set(entry.id as i64),
2475                        is_dir: ActiveValue::set(entry.is_dir),
2476                        path: ActiveValue::set(entry.path.clone()),
2477                        inode: ActiveValue::set(entry.inode as i64),
2478                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2479                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2480                        is_symlink: ActiveValue::set(entry.is_symlink),
2481                        is_ignored: ActiveValue::set(entry.is_ignored),
2482                        is_external: ActiveValue::set(entry.is_external),
2483                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2484                        is_deleted: ActiveValue::set(false),
2485                        scan_id: ActiveValue::set(update.scan_id as i64),
2486                    }
2487                }))
2488                .on_conflict(
2489                    OnConflict::columns([
2490                        worktree_entry::Column::ProjectId,
2491                        worktree_entry::Column::WorktreeId,
2492                        worktree_entry::Column::Id,
2493                    ])
2494                    .update_columns([
2495                        worktree_entry::Column::IsDir,
2496                        worktree_entry::Column::Path,
2497                        worktree_entry::Column::Inode,
2498                        worktree_entry::Column::MtimeSeconds,
2499                        worktree_entry::Column::MtimeNanos,
2500                        worktree_entry::Column::IsSymlink,
2501                        worktree_entry::Column::IsIgnored,
2502                        worktree_entry::Column::GitStatus,
2503                        worktree_entry::Column::ScanId,
2504                    ])
2505                    .to_owned(),
2506                )
2507                .exec(&*tx)
2508                .await?;
2509            }
2510
2511            if !update.removed_entries.is_empty() {
2512                worktree_entry::Entity::update_many()
2513                    .filter(
2514                        worktree_entry::Column::ProjectId
2515                            .eq(project_id)
2516                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2517                            .and(
2518                                worktree_entry::Column::Id
2519                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2520                            ),
2521                    )
2522                    .set(worktree_entry::ActiveModel {
2523                        is_deleted: ActiveValue::Set(true),
2524                        scan_id: ActiveValue::Set(update.scan_id as i64),
2525                        ..Default::default()
2526                    })
2527                    .exec(&*tx)
2528                    .await?;
2529            }
2530
2531            if !update.updated_repositories.is_empty() {
2532                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2533                    |repository| worktree_repository::ActiveModel {
2534                        project_id: ActiveValue::set(project_id),
2535                        worktree_id: ActiveValue::set(worktree_id),
2536                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2537                        scan_id: ActiveValue::set(update.scan_id as i64),
2538                        branch: ActiveValue::set(repository.branch.clone()),
2539                        is_deleted: ActiveValue::set(false),
2540                    },
2541                ))
2542                .on_conflict(
2543                    OnConflict::columns([
2544                        worktree_repository::Column::ProjectId,
2545                        worktree_repository::Column::WorktreeId,
2546                        worktree_repository::Column::WorkDirectoryId,
2547                    ])
2548                    .update_columns([
2549                        worktree_repository::Column::ScanId,
2550                        worktree_repository::Column::Branch,
2551                    ])
2552                    .to_owned(),
2553                )
2554                .exec(&*tx)
2555                .await?;
2556            }
2557
2558            if !update.removed_repositories.is_empty() {
2559                worktree_repository::Entity::update_many()
2560                    .filter(
2561                        worktree_repository::Column::ProjectId
2562                            .eq(project_id)
2563                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2564                            .and(
2565                                worktree_repository::Column::WorkDirectoryId
2566                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2567                            ),
2568                    )
2569                    .set(worktree_repository::ActiveModel {
2570                        is_deleted: ActiveValue::Set(true),
2571                        scan_id: ActiveValue::Set(update.scan_id as i64),
2572                        ..Default::default()
2573                    })
2574                    .exec(&*tx)
2575                    .await?;
2576            }
2577
2578            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2579            Ok(connection_ids)
2580        })
2581        .await
2582    }
2583
2584    pub async fn update_diagnostic_summary(
2585        &self,
2586        update: &proto::UpdateDiagnosticSummary,
2587        connection: ConnectionId,
2588    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2589        let project_id = ProjectId::from_proto(update.project_id);
2590        let worktree_id = update.worktree_id as i64;
2591        let room_id = self.room_id_for_project(project_id).await?;
2592        self.room_transaction(room_id, |tx| async move {
2593            let summary = update
2594                .summary
2595                .as_ref()
2596                .ok_or_else(|| anyhow!("invalid summary"))?;
2597
2598            // Ensure the update comes from the host.
2599            let project = project::Entity::find_by_id(project_id)
2600                .one(&*tx)
2601                .await?
2602                .ok_or_else(|| anyhow!("no such project"))?;
2603            if project.host_connection()? != connection {
2604                return Err(anyhow!("can't update a project hosted by someone else"))?;
2605            }
2606
2607            // Update summary.
2608            worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2609                project_id: ActiveValue::set(project_id),
2610                worktree_id: ActiveValue::set(worktree_id),
2611                path: ActiveValue::set(summary.path.clone()),
2612                language_server_id: ActiveValue::set(summary.language_server_id as i64),
2613                error_count: ActiveValue::set(summary.error_count as i32),
2614                warning_count: ActiveValue::set(summary.warning_count as i32),
2615                ..Default::default()
2616            })
2617            .on_conflict(
2618                OnConflict::columns([
2619                    worktree_diagnostic_summary::Column::ProjectId,
2620                    worktree_diagnostic_summary::Column::WorktreeId,
2621                    worktree_diagnostic_summary::Column::Path,
2622                ])
2623                .update_columns([
2624                    worktree_diagnostic_summary::Column::LanguageServerId,
2625                    worktree_diagnostic_summary::Column::ErrorCount,
2626                    worktree_diagnostic_summary::Column::WarningCount,
2627                ])
2628                .to_owned(),
2629            )
2630            .exec(&*tx)
2631            .await?;
2632
2633            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2634            Ok(connection_ids)
2635        })
2636        .await
2637    }
2638
2639    pub async fn start_language_server(
2640        &self,
2641        update: &proto::StartLanguageServer,
2642        connection: ConnectionId,
2643    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2644        let project_id = ProjectId::from_proto(update.project_id);
2645        let room_id = self.room_id_for_project(project_id).await?;
2646        self.room_transaction(room_id, |tx| async move {
2647            let server = update
2648                .server
2649                .as_ref()
2650                .ok_or_else(|| anyhow!("invalid language server"))?;
2651
2652            // Ensure the update comes from the host.
2653            let project = project::Entity::find_by_id(project_id)
2654                .one(&*tx)
2655                .await?
2656                .ok_or_else(|| anyhow!("no such project"))?;
2657            if project.host_connection()? != connection {
2658                return Err(anyhow!("can't update a project hosted by someone else"))?;
2659            }
2660
2661            // Add the newly-started language server.
2662            language_server::Entity::insert(language_server::ActiveModel {
2663                project_id: ActiveValue::set(project_id),
2664                id: ActiveValue::set(server.id as i64),
2665                name: ActiveValue::set(server.name.clone()),
2666                ..Default::default()
2667            })
2668            .on_conflict(
2669                OnConflict::columns([
2670                    language_server::Column::ProjectId,
2671                    language_server::Column::Id,
2672                ])
2673                .update_column(language_server::Column::Name)
2674                .to_owned(),
2675            )
2676            .exec(&*tx)
2677            .await?;
2678
2679            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2680            Ok(connection_ids)
2681        })
2682        .await
2683    }
2684
2685    pub async fn update_worktree_settings(
2686        &self,
2687        update: &proto::UpdateWorktreeSettings,
2688        connection: ConnectionId,
2689    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2690        let project_id = ProjectId::from_proto(update.project_id);
2691        let room_id = self.room_id_for_project(project_id).await?;
2692        self.room_transaction(room_id, |tx| async move {
2693            // Ensure the update comes from the host.
2694            let project = project::Entity::find_by_id(project_id)
2695                .one(&*tx)
2696                .await?
2697                .ok_or_else(|| anyhow!("no such project"))?;
2698            if project.host_connection()? != connection {
2699                return Err(anyhow!("can't update a project hosted by someone else"))?;
2700            }
2701
2702            if let Some(content) = &update.content {
2703                worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2704                    project_id: ActiveValue::Set(project_id),
2705                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2706                    path: ActiveValue::Set(update.path.clone()),
2707                    content: ActiveValue::Set(content.clone()),
2708                })
2709                .on_conflict(
2710                    OnConflict::columns([
2711                        worktree_settings_file::Column::ProjectId,
2712                        worktree_settings_file::Column::WorktreeId,
2713                        worktree_settings_file::Column::Path,
2714                    ])
2715                    .update_column(worktree_settings_file::Column::Content)
2716                    .to_owned(),
2717                )
2718                .exec(&*tx)
2719                .await?;
2720            } else {
2721                worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2722                    project_id: ActiveValue::Set(project_id),
2723                    worktree_id: ActiveValue::Set(update.worktree_id as i64),
2724                    path: ActiveValue::Set(update.path.clone()),
2725                    ..Default::default()
2726                })
2727                .exec(&*tx)
2728                .await?;
2729            }
2730
2731            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2732            Ok(connection_ids)
2733        })
2734        .await
2735    }
2736
2737    pub async fn join_project(
2738        &self,
2739        project_id: ProjectId,
2740        connection: ConnectionId,
2741    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2742        let room_id = self.room_id_for_project(project_id).await?;
2743        self.room_transaction(room_id, |tx| async move {
2744            let participant = room_participant::Entity::find()
2745                .filter(
2746                    Condition::all()
2747                        .add(
2748                            room_participant::Column::AnsweringConnectionId
2749                                .eq(connection.id as i32),
2750                        )
2751                        .add(
2752                            room_participant::Column::AnsweringConnectionServerId
2753                                .eq(connection.owner_id as i32),
2754                        ),
2755                )
2756                .one(&*tx)
2757                .await?
2758                .ok_or_else(|| anyhow!("must join a room first"))?;
2759
2760            let project = project::Entity::find_by_id(project_id)
2761                .one(&*tx)
2762                .await?
2763                .ok_or_else(|| anyhow!("no such project"))?;
2764            if project.room_id != participant.room_id {
2765                return Err(anyhow!("no such project"))?;
2766            }
2767
2768            let mut collaborators = project
2769                .find_related(project_collaborator::Entity)
2770                .all(&*tx)
2771                .await?;
2772            let replica_ids = collaborators
2773                .iter()
2774                .map(|c| c.replica_id)
2775                .collect::<HashSet<_>>();
2776            let mut replica_id = ReplicaId(1);
2777            while replica_ids.contains(&replica_id) {
2778                replica_id.0 += 1;
2779            }
2780            let new_collaborator = project_collaborator::ActiveModel {
2781                project_id: ActiveValue::set(project_id),
2782                connection_id: ActiveValue::set(connection.id as i32),
2783                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2784                user_id: ActiveValue::set(participant.user_id),
2785                replica_id: ActiveValue::set(replica_id),
2786                is_host: ActiveValue::set(false),
2787                ..Default::default()
2788            }
2789            .insert(&*tx)
2790            .await?;
2791            collaborators.push(new_collaborator);
2792
2793            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2794            let mut worktrees = db_worktrees
2795                .into_iter()
2796                .map(|db_worktree| {
2797                    (
2798                        db_worktree.id as u64,
2799                        Worktree {
2800                            id: db_worktree.id as u64,
2801                            abs_path: db_worktree.abs_path,
2802                            root_name: db_worktree.root_name,
2803                            visible: db_worktree.visible,
2804                            entries: Default::default(),
2805                            repository_entries: Default::default(),
2806                            diagnostic_summaries: Default::default(),
2807                            settings_files: Default::default(),
2808                            scan_id: db_worktree.scan_id as u64,
2809                            completed_scan_id: db_worktree.completed_scan_id as u64,
2810                        },
2811                    )
2812                })
2813                .collect::<BTreeMap<_, _>>();
2814
2815            // Populate worktree entries.
2816            {
2817                let mut db_entries = worktree_entry::Entity::find()
2818                    .filter(
2819                        Condition::all()
2820                            .add(worktree_entry::Column::ProjectId.eq(project_id))
2821                            .add(worktree_entry::Column::IsDeleted.eq(false)),
2822                    )
2823                    .stream(&*tx)
2824                    .await?;
2825                while let Some(db_entry) = db_entries.next().await {
2826                    let db_entry = db_entry?;
2827                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2828                        worktree.entries.push(proto::Entry {
2829                            id: db_entry.id as u64,
2830                            is_dir: db_entry.is_dir,
2831                            path: db_entry.path,
2832                            inode: db_entry.inode as u64,
2833                            mtime: Some(proto::Timestamp {
2834                                seconds: db_entry.mtime_seconds as u64,
2835                                nanos: db_entry.mtime_nanos as u32,
2836                            }),
2837                            is_symlink: db_entry.is_symlink,
2838                            is_ignored: db_entry.is_ignored,
2839                            is_external: db_entry.is_external,
2840                            git_status: db_entry.git_status.map(|status| status as i32),
2841                        });
2842                    }
2843                }
2844            }
2845
2846            // Populate repository entries.
2847            {
2848                let mut db_repository_entries = worktree_repository::Entity::find()
2849                    .filter(
2850                        Condition::all()
2851                            .add(worktree_repository::Column::ProjectId.eq(project_id))
2852                            .add(worktree_repository::Column::IsDeleted.eq(false)),
2853                    )
2854                    .stream(&*tx)
2855                    .await?;
2856                while let Some(db_repository_entry) = db_repository_entries.next().await {
2857                    let db_repository_entry = db_repository_entry?;
2858                    if let Some(worktree) =
2859                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2860                    {
2861                        worktree.repository_entries.insert(
2862                            db_repository_entry.work_directory_id as u64,
2863                            proto::RepositoryEntry {
2864                                work_directory_id: db_repository_entry.work_directory_id as u64,
2865                                branch: db_repository_entry.branch,
2866                            },
2867                        );
2868                    }
2869                }
2870            }
2871
2872            // Populate worktree diagnostic summaries.
2873            {
2874                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2875                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2876                    .stream(&*tx)
2877                    .await?;
2878                while let Some(db_summary) = db_summaries.next().await {
2879                    let db_summary = db_summary?;
2880                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2881                        worktree
2882                            .diagnostic_summaries
2883                            .push(proto::DiagnosticSummary {
2884                                path: db_summary.path,
2885                                language_server_id: db_summary.language_server_id as u64,
2886                                error_count: db_summary.error_count as u32,
2887                                warning_count: db_summary.warning_count as u32,
2888                            });
2889                    }
2890                }
2891            }
2892
2893            // Populate worktree settings files
2894            {
2895                let mut db_settings_files = worktree_settings_file::Entity::find()
2896                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2897                    .stream(&*tx)
2898                    .await?;
2899                while let Some(db_settings_file) = db_settings_files.next().await {
2900                    let db_settings_file = db_settings_file?;
2901                    if let Some(worktree) =
2902                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2903                    {
2904                        worktree.settings_files.push(WorktreeSettingsFile {
2905                            path: db_settings_file.path,
2906                            content: db_settings_file.content,
2907                        });
2908                    }
2909                }
2910            }
2911
2912            // Populate language servers.
2913            let language_servers = project
2914                .find_related(language_server::Entity)
2915                .all(&*tx)
2916                .await?;
2917
2918            let project = Project {
2919                collaborators: collaborators
2920                    .into_iter()
2921                    .map(|collaborator| ProjectCollaborator {
2922                        connection_id: collaborator.connection(),
2923                        user_id: collaborator.user_id,
2924                        replica_id: collaborator.replica_id,
2925                        is_host: collaborator.is_host,
2926                    })
2927                    .collect(),
2928                worktrees,
2929                language_servers: language_servers
2930                    .into_iter()
2931                    .map(|language_server| proto::LanguageServer {
2932                        id: language_server.id as u64,
2933                        name: language_server.name,
2934                    })
2935                    .collect(),
2936            };
2937            Ok((project, replica_id as ReplicaId))
2938        })
2939        .await
2940    }
2941
2942    pub async fn leave_project(
2943        &self,
2944        project_id: ProjectId,
2945        connection: ConnectionId,
2946    ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2947        let room_id = self.room_id_for_project(project_id).await?;
2948        self.room_transaction(room_id, |tx| async move {
2949            let result = project_collaborator::Entity::delete_many()
2950                .filter(
2951                    Condition::all()
2952                        .add(project_collaborator::Column::ProjectId.eq(project_id))
2953                        .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2954                        .add(
2955                            project_collaborator::Column::ConnectionServerId
2956                                .eq(connection.owner_id as i32),
2957                        ),
2958                )
2959                .exec(&*tx)
2960                .await?;
2961            if result.rows_affected == 0 {
2962                Err(anyhow!("not a collaborator on this project"))?;
2963            }
2964
2965            let project = project::Entity::find_by_id(project_id)
2966                .one(&*tx)
2967                .await?
2968                .ok_or_else(|| anyhow!("no such project"))?;
2969            let collaborators = project
2970                .find_related(project_collaborator::Entity)
2971                .all(&*tx)
2972                .await?;
2973            let connection_ids = collaborators
2974                .into_iter()
2975                .map(|collaborator| collaborator.connection())
2976                .collect();
2977
2978            follower::Entity::delete_many()
2979                .filter(
2980                    Condition::any()
2981                        .add(
2982                            Condition::all()
2983                                .add(follower::Column::ProjectId.eq(project_id))
2984                                .add(
2985                                    follower::Column::LeaderConnectionServerId
2986                                        .eq(connection.owner_id),
2987                                )
2988                                .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2989                        )
2990                        .add(
2991                            Condition::all()
2992                                .add(follower::Column::ProjectId.eq(project_id))
2993                                .add(
2994                                    follower::Column::FollowerConnectionServerId
2995                                        .eq(connection.owner_id),
2996                                )
2997                                .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2998                        ),
2999                )
3000                .exec(&*tx)
3001                .await?;
3002
3003            let room = self.get_room(project.room_id, &tx).await?;
3004            let left_project = LeftProject {
3005                id: project_id,
3006                host_user_id: project.host_user_id,
3007                host_connection_id: project.host_connection()?,
3008                connection_ids,
3009            };
3010            Ok((room, left_project))
3011        })
3012        .await
3013    }
3014
3015    pub async fn project_collaborators(
3016        &self,
3017        project_id: ProjectId,
3018        connection_id: ConnectionId,
3019    ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
3020        let room_id = self.room_id_for_project(project_id).await?;
3021        self.room_transaction(room_id, |tx| async move {
3022            let collaborators = project_collaborator::Entity::find()
3023                .filter(project_collaborator::Column::ProjectId.eq(project_id))
3024                .all(&*tx)
3025                .await?
3026                .into_iter()
3027                .map(|collaborator| ProjectCollaborator {
3028                    connection_id: collaborator.connection(),
3029                    user_id: collaborator.user_id,
3030                    replica_id: collaborator.replica_id,
3031                    is_host: collaborator.is_host,
3032                })
3033                .collect::<Vec<_>>();
3034
3035            if collaborators
3036                .iter()
3037                .any(|collaborator| collaborator.connection_id == connection_id)
3038            {
3039                Ok(collaborators)
3040            } else {
3041                Err(anyhow!("no such project"))?
3042            }
3043        })
3044        .await
3045    }
3046
3047    pub async fn project_connection_ids(
3048        &self,
3049        project_id: ProjectId,
3050        connection_id: ConnectionId,
3051    ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3052        let room_id = self.room_id_for_project(project_id).await?;
3053        self.room_transaction(room_id, |tx| async move {
3054            let mut collaborators = project_collaborator::Entity::find()
3055                .filter(project_collaborator::Column::ProjectId.eq(project_id))
3056                .stream(&*tx)
3057                .await?;
3058
3059            let mut connection_ids = HashSet::default();
3060            while let Some(collaborator) = collaborators.next().await {
3061                let collaborator = collaborator?;
3062                connection_ids.insert(collaborator.connection());
3063            }
3064
3065            if connection_ids.contains(&connection_id) {
3066                Ok(connection_ids)
3067            } else {
3068                Err(anyhow!("no such project"))?
3069            }
3070        })
3071        .await
3072    }
3073
3074    async fn project_guest_connection_ids(
3075        &self,
3076        project_id: ProjectId,
3077        tx: &DatabaseTransaction,
3078    ) -> Result<Vec<ConnectionId>> {
3079        let mut collaborators = project_collaborator::Entity::find()
3080            .filter(
3081                project_collaborator::Column::ProjectId
3082                    .eq(project_id)
3083                    .and(project_collaborator::Column::IsHost.eq(false)),
3084            )
3085            .stream(tx)
3086            .await?;
3087
3088        let mut guest_connection_ids = Vec::new();
3089        while let Some(collaborator) = collaborators.next().await {
3090            let collaborator = collaborator?;
3091            guest_connection_ids.push(collaborator.connection());
3092        }
3093        Ok(guest_connection_ids)
3094    }
3095
3096    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3097        self.transaction(|tx| async move {
3098            let project = project::Entity::find_by_id(project_id)
3099                .one(&*tx)
3100                .await?
3101                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3102            Ok(project.room_id)
3103        })
3104        .await
3105    }
3106
3107    // access tokens
3108
3109    pub async fn create_access_token(
3110        &self,
3111        user_id: UserId,
3112        access_token_hash: &str,
3113        max_access_token_count: usize,
3114    ) -> Result<AccessTokenId> {
3115        self.transaction(|tx| async {
3116            let tx = tx;
3117
3118            let token = access_token::ActiveModel {
3119                user_id: ActiveValue::set(user_id),
3120                hash: ActiveValue::set(access_token_hash.into()),
3121                ..Default::default()
3122            }
3123            .insert(&*tx)
3124            .await?;
3125
3126            access_token::Entity::delete_many()
3127                .filter(
3128                    access_token::Column::Id.in_subquery(
3129                        Query::select()
3130                            .column(access_token::Column::Id)
3131                            .from(access_token::Entity)
3132                            .and_where(access_token::Column::UserId.eq(user_id))
3133                            .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3134                            .limit(10000)
3135                            .offset(max_access_token_count as u64)
3136                            .to_owned(),
3137                    ),
3138                )
3139                .exec(&*tx)
3140                .await?;
3141            Ok(token.id)
3142        })
3143        .await
3144    }
3145
3146    pub async fn get_access_token(
3147        &self,
3148        access_token_id: AccessTokenId,
3149    ) -> Result<access_token::Model> {
3150        self.transaction(|tx| async move {
3151            Ok(access_token::Entity::find_by_id(access_token_id)
3152                .one(&*tx)
3153                .await?
3154                .ok_or_else(|| anyhow!("no such access token"))?)
3155        })
3156        .await
3157    }
3158
3159    // channels
3160
3161    pub async fn create_root_channel(
3162        &self,
3163        name: &str,
3164        live_kit_room: &str,
3165        creator_id: UserId,
3166    ) -> Result<ChannelId> {
3167        self.create_channel(name, None, live_kit_room, creator_id)
3168            .await
3169    }
3170
3171    pub async fn create_channel(
3172        &self,
3173        name: &str,
3174        parent: Option<ChannelId>,
3175        live_kit_room: &str,
3176        creator_id: UserId,
3177    ) -> Result<ChannelId> {
3178        let name = Self::sanitize_channel_name(name)?;
3179        self.transaction(move |tx| async move {
3180            if let Some(parent) = parent {
3181                self.check_user_is_channel_admin(parent, creator_id, &*tx)
3182                    .await?;
3183            }
3184
3185            let channel = channel::ActiveModel {
3186                name: ActiveValue::Set(name.to_string()),
3187                ..Default::default()
3188            }
3189            .insert(&*tx)
3190            .await?;
3191
3192            let channel_paths_stmt;
3193            if let Some(parent) = parent {
3194                let sql = r#"
3195                    INSERT INTO channel_paths
3196                    (id_path, channel_id)
3197                    SELECT
3198                        id_path || $1 || '/', $2
3199                    FROM
3200                        channel_paths
3201                    WHERE
3202                        channel_id = $3
3203                "#;
3204                channel_paths_stmt = Statement::from_sql_and_values(
3205                    self.pool.get_database_backend(),
3206                    sql,
3207                    [
3208                        channel.id.to_proto().into(),
3209                        channel.id.to_proto().into(),
3210                        parent.to_proto().into(),
3211                    ],
3212                );
3213                tx.execute(channel_paths_stmt).await?;
3214            } else {
3215                channel_path::Entity::insert(channel_path::ActiveModel {
3216                    channel_id: ActiveValue::Set(channel.id),
3217                    id_path: ActiveValue::Set(format!("/{}/", channel.id)),
3218                })
3219                .exec(&*tx)
3220                .await?;
3221            }
3222
3223            channel_member::ActiveModel {
3224                channel_id: ActiveValue::Set(channel.id),
3225                user_id: ActiveValue::Set(creator_id),
3226                accepted: ActiveValue::Set(true),
3227                admin: ActiveValue::Set(true),
3228                ..Default::default()
3229            }
3230            .insert(&*tx)
3231            .await?;
3232
3233            room::ActiveModel {
3234                channel_id: ActiveValue::Set(Some(channel.id)),
3235                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3236                ..Default::default()
3237            }
3238            .insert(&*tx)
3239            .await?;
3240
3241            Ok(channel.id)
3242        })
3243        .await
3244    }
3245
3246    pub async fn remove_channel(
3247        &self,
3248        channel_id: ChannelId,
3249        user_id: UserId,
3250    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3251        self.transaction(move |tx| async move {
3252            self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3253                .await?;
3254
3255            // Don't remove descendant channels that have additional parents.
3256            let mut channels_to_remove = self.get_channel_descendants([channel_id], &*tx).await?;
3257            {
3258                let mut channels_to_keep = channel_path::Entity::find()
3259                    .filter(
3260                        channel_path::Column::ChannelId
3261                            .is_in(
3262                                channels_to_remove
3263                                    .keys()
3264                                    .copied()
3265                                    .filter(|&id| id != channel_id),
3266                            )
3267                            .and(
3268                                channel_path::Column::IdPath
3269                                    .not_like(&format!("%/{}/%", channel_id)),
3270                            ),
3271                    )
3272                    .stream(&*tx)
3273                    .await?;
3274                while let Some(row) = channels_to_keep.next().await {
3275                    let row = row?;
3276                    channels_to_remove.remove(&row.channel_id);
3277                }
3278            }
3279
3280            let channel_ancestors = self.get_channel_ancestors(channel_id, &*tx).await?;
3281            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3282                .filter(channel_member::Column::ChannelId.is_in(channel_ancestors))
3283                .select_only()
3284                .column(channel_member::Column::UserId)
3285                .distinct()
3286                .into_values::<_, QueryUserIds>()
3287                .all(&*tx)
3288                .await?;
3289
3290            channel::Entity::delete_many()
3291                .filter(channel::Column::Id.is_in(channels_to_remove.keys().copied()))
3292                .exec(&*tx)
3293                .await?;
3294
3295            Ok((channels_to_remove.into_keys().collect(), members_to_notify))
3296        })
3297        .await
3298    }
3299
3300    pub async fn invite_channel_member(
3301        &self,
3302        channel_id: ChannelId,
3303        invitee_id: UserId,
3304        inviter_id: UserId,
3305        is_admin: bool,
3306    ) -> Result<()> {
3307        self.transaction(move |tx| async move {
3308            self.check_user_is_channel_admin(channel_id, inviter_id, &*tx)
3309                .await?;
3310
3311            channel_member::ActiveModel {
3312                channel_id: ActiveValue::Set(channel_id),
3313                user_id: ActiveValue::Set(invitee_id),
3314                accepted: ActiveValue::Set(false),
3315                admin: ActiveValue::Set(is_admin),
3316                ..Default::default()
3317            }
3318            .insert(&*tx)
3319            .await?;
3320
3321            Ok(())
3322        })
3323        .await
3324    }
3325
3326    fn sanitize_channel_name(name: &str) -> Result<&str> {
3327        let new_name = name.trim().trim_start_matches('#');
3328        if new_name == "" {
3329            Err(anyhow!("channel name can't be blank"))?;
3330        }
3331        Ok(new_name)
3332    }
3333
3334    pub async fn rename_channel(
3335        &self,
3336        channel_id: ChannelId,
3337        user_id: UserId,
3338        new_name: &str,
3339    ) -> Result<String> {
3340        self.transaction(move |tx| async move {
3341            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
3342
3343            self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3344                .await?;
3345
3346            channel::ActiveModel {
3347                id: ActiveValue::Unchanged(channel_id),
3348                name: ActiveValue::Set(new_name.clone()),
3349                ..Default::default()
3350            }
3351            .update(&*tx)
3352            .await?;
3353
3354            Ok(new_name)
3355        })
3356        .await
3357    }
3358
3359    pub async fn respond_to_channel_invite(
3360        &self,
3361        channel_id: ChannelId,
3362        user_id: UserId,
3363        accept: bool,
3364    ) -> Result<()> {
3365        self.transaction(move |tx| async move {
3366            let rows_affected = if accept {
3367                channel_member::Entity::update_many()
3368                    .set(channel_member::ActiveModel {
3369                        accepted: ActiveValue::Set(accept),
3370                        ..Default::default()
3371                    })
3372                    .filter(
3373                        channel_member::Column::ChannelId
3374                            .eq(channel_id)
3375                            .and(channel_member::Column::UserId.eq(user_id))
3376                            .and(channel_member::Column::Accepted.eq(false)),
3377                    )
3378                    .exec(&*tx)
3379                    .await?
3380                    .rows_affected
3381            } else {
3382                channel_member::ActiveModel {
3383                    channel_id: ActiveValue::Unchanged(channel_id),
3384                    user_id: ActiveValue::Unchanged(user_id),
3385                    ..Default::default()
3386                }
3387                .delete(&*tx)
3388                .await?
3389                .rows_affected
3390            };
3391
3392            if rows_affected == 0 {
3393                Err(anyhow!("no such invitation"))?;
3394            }
3395
3396            Ok(())
3397        })
3398        .await
3399    }
3400
3401    pub async fn remove_channel_member(
3402        &self,
3403        channel_id: ChannelId,
3404        member_id: UserId,
3405        remover_id: UserId,
3406    ) -> Result<()> {
3407        self.transaction(|tx| async move {
3408            self.check_user_is_channel_admin(channel_id, remover_id, &*tx)
3409                .await?;
3410
3411            let result = channel_member::Entity::delete_many()
3412                .filter(
3413                    channel_member::Column::ChannelId
3414                        .eq(channel_id)
3415                        .and(channel_member::Column::UserId.eq(member_id)),
3416                )
3417                .exec(&*tx)
3418                .await?;
3419
3420            if result.rows_affected == 0 {
3421                Err(anyhow!("no such member"))?;
3422            }
3423
3424            Ok(())
3425        })
3426        .await
3427    }
3428
3429    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
3430        self.transaction(|tx| async move {
3431            let channel_invites = channel_member::Entity::find()
3432                .filter(
3433                    channel_member::Column::UserId
3434                        .eq(user_id)
3435                        .and(channel_member::Column::Accepted.eq(false)),
3436                )
3437                .all(&*tx)
3438                .await?;
3439
3440            let channels = channel::Entity::find()
3441                .filter(
3442                    channel::Column::Id.is_in(
3443                        channel_invites
3444                            .into_iter()
3445                            .map(|channel_member| channel_member.channel_id),
3446                    ),
3447                )
3448                .all(&*tx)
3449                .await?;
3450
3451            let channels = channels
3452                .into_iter()
3453                .map(|channel| Channel {
3454                    id: channel.id,
3455                    name: channel.name,
3456                    parent_id: None,
3457                })
3458                .collect();
3459
3460            Ok(channels)
3461        })
3462        .await
3463    }
3464
3465    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
3466        self.transaction(|tx| async move {
3467            let tx = tx;
3468
3469            let channel_memberships = channel_member::Entity::find()
3470                .filter(
3471                    channel_member::Column::UserId
3472                        .eq(user_id)
3473                        .and(channel_member::Column::Accepted.eq(true)),
3474                )
3475                .all(&*tx)
3476                .await?;
3477
3478            let parents_by_child_id = self
3479                .get_channel_descendants(channel_memberships.iter().map(|m| m.channel_id), &*tx)
3480                .await?;
3481
3482            let channels_with_admin_privileges = channel_memberships
3483                .iter()
3484                .filter_map(|membership| membership.admin.then_some(membership.channel_id))
3485                .collect();
3486
3487            let mut channels = Vec::with_capacity(parents_by_child_id.len());
3488            {
3489                let mut rows = channel::Entity::find()
3490                    .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3491                    .stream(&*tx)
3492                    .await?;
3493                while let Some(row) = rows.next().await {
3494                    let row = row?;
3495                    channels.push(Channel {
3496                        id: row.id,
3497                        name: row.name,
3498                        parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3499                    });
3500                }
3501            }
3502
3503            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3504            enum QueryUserIdsAndChannelIds {
3505                ChannelId,
3506                UserId,
3507            }
3508
3509            let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3510            {
3511                let mut rows = room_participant::Entity::find()
3512                    .inner_join(room::Entity)
3513                    .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3514                    .select_only()
3515                    .column(room::Column::ChannelId)
3516                    .column(room_participant::Column::UserId)
3517                    .into_values::<_, QueryUserIdsAndChannelIds>()
3518                    .stream(&*tx)
3519                    .await?;
3520                while let Some(row) = rows.next().await {
3521                    let row: (ChannelId, UserId) = row?;
3522                    channel_participants.entry(row.0).or_default().push(row.1)
3523                }
3524            }
3525
3526            Ok(ChannelsForUser {
3527                channels,
3528                channel_participants,
3529                channels_with_admin_privileges,
3530            })
3531        })
3532        .await
3533    }
3534
3535    pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3536        self.transaction(|tx| async move { self.get_channel_members_internal(id, &*tx).await })
3537            .await
3538    }
3539
3540    pub async fn set_channel_member_admin(
3541        &self,
3542        channel_id: ChannelId,
3543        from: UserId,
3544        for_user: UserId,
3545        admin: bool,
3546    ) -> Result<()> {
3547        self.transaction(|tx| async move {
3548            self.check_user_is_channel_admin(channel_id, from, &*tx)
3549                .await?;
3550
3551            let result = channel_member::Entity::update_many()
3552                .filter(
3553                    channel_member::Column::ChannelId
3554                        .eq(channel_id)
3555                        .and(channel_member::Column::UserId.eq(for_user)),
3556                )
3557                .set(channel_member::ActiveModel {
3558                    admin: ActiveValue::set(admin),
3559                    ..Default::default()
3560                })
3561                .exec(&*tx)
3562                .await?;
3563
3564            if result.rows_affected == 0 {
3565                Err(anyhow!("no such member"))?;
3566            }
3567
3568            Ok(())
3569        })
3570        .await
3571    }
3572
3573    pub async fn get_channel_member_details(
3574        &self,
3575        channel_id: ChannelId,
3576        user_id: UserId,
3577    ) -> Result<Vec<proto::ChannelMember>> {
3578        self.transaction(|tx| async move {
3579            self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3580                .await?;
3581
3582            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3583            enum QueryMemberDetails {
3584                UserId,
3585                Admin,
3586                IsDirectMember,
3587                Accepted,
3588            }
3589
3590            let tx = tx;
3591            let ancestor_ids = self.get_channel_ancestors(channel_id, &*tx).await?;
3592            let mut stream = channel_member::Entity::find()
3593                .distinct()
3594                .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3595                .select_only()
3596                .column(channel_member::Column::UserId)
3597                .column(channel_member::Column::Admin)
3598                .column_as(
3599                    channel_member::Column::ChannelId.eq(channel_id),
3600                    QueryMemberDetails::IsDirectMember,
3601                )
3602                .column(channel_member::Column::Accepted)
3603                .order_by_asc(channel_member::Column::UserId)
3604                .into_values::<_, QueryMemberDetails>()
3605                .stream(&*tx)
3606                .await?;
3607
3608            let mut rows = Vec::<proto::ChannelMember>::new();
3609            while let Some(row) = stream.next().await {
3610                let (user_id, is_admin, is_direct_member, is_invite_accepted): (
3611                    UserId,
3612                    bool,
3613                    bool,
3614                    bool,
3615                ) = row?;
3616                let kind = match (is_direct_member, is_invite_accepted) {
3617                    (true, true) => proto::channel_member::Kind::Member,
3618                    (true, false) => proto::channel_member::Kind::Invitee,
3619                    (false, true) => proto::channel_member::Kind::AncestorMember,
3620                    (false, false) => continue,
3621                };
3622                let user_id = user_id.to_proto();
3623                let kind = kind.into();
3624                if let Some(last_row) = rows.last_mut() {
3625                    if last_row.user_id == user_id {
3626                        if is_direct_member {
3627                            last_row.kind = kind;
3628                            last_row.admin = is_admin;
3629                        }
3630                        continue;
3631                    }
3632                }
3633                rows.push(proto::ChannelMember {
3634                    user_id,
3635                    kind,
3636                    admin: is_admin,
3637                });
3638            }
3639
3640            Ok(rows)
3641        })
3642        .await
3643    }
3644
3645    pub async fn get_channel_members_internal(
3646        &self,
3647        id: ChannelId,
3648        tx: &DatabaseTransaction,
3649    ) -> Result<Vec<UserId>> {
3650        let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3651        let user_ids = channel_member::Entity::find()
3652            .distinct()
3653            .filter(
3654                channel_member::Column::ChannelId
3655                    .is_in(ancestor_ids.iter().copied())
3656                    .and(channel_member::Column::Accepted.eq(true)),
3657            )
3658            .select_only()
3659            .column(channel_member::Column::UserId)
3660            .into_values::<_, QueryUserIds>()
3661            .all(&*tx)
3662            .await?;
3663        Ok(user_ids)
3664    }
3665
3666    async fn check_user_is_channel_member(
3667        &self,
3668        channel_id: ChannelId,
3669        user_id: UserId,
3670        tx: &DatabaseTransaction,
3671    ) -> Result<()> {
3672        let channel_ids = self.get_channel_ancestors(channel_id, tx).await?;
3673        channel_member::Entity::find()
3674            .filter(
3675                channel_member::Column::ChannelId
3676                    .is_in(channel_ids)
3677                    .and(channel_member::Column::UserId.eq(user_id)),
3678            )
3679            .one(&*tx)
3680            .await?
3681            .ok_or_else(|| anyhow!("user is not a channel member or channel does not exist"))?;
3682        Ok(())
3683    }
3684
3685    async fn check_user_is_channel_admin(
3686        &self,
3687        channel_id: ChannelId,
3688        user_id: UserId,
3689        tx: &DatabaseTransaction,
3690    ) -> Result<()> {
3691        let channel_ids = self.get_channel_ancestors(channel_id, tx).await?;
3692        channel_member::Entity::find()
3693            .filter(
3694                channel_member::Column::ChannelId
3695                    .is_in(channel_ids)
3696                    .and(channel_member::Column::UserId.eq(user_id))
3697                    .and(channel_member::Column::Admin.eq(true)),
3698            )
3699            .one(&*tx)
3700            .await?
3701            .ok_or_else(|| anyhow!("user is not a channel admin or channel does not exist"))?;
3702        Ok(())
3703    }
3704
3705    async fn get_channel_ancestors(
3706        &self,
3707        channel_id: ChannelId,
3708        tx: &DatabaseTransaction,
3709    ) -> Result<Vec<ChannelId>> {
3710        let paths = channel_path::Entity::find()
3711            .filter(channel_path::Column::ChannelId.eq(channel_id))
3712            .all(tx)
3713            .await?;
3714        let mut channel_ids = Vec::new();
3715        for path in paths {
3716            for id in path.id_path.trim_matches('/').split('/') {
3717                if let Ok(id) = id.parse() {
3718                    let id = ChannelId::from_proto(id);
3719                    if let Err(ix) = channel_ids.binary_search(&id) {
3720                        channel_ids.insert(ix, id);
3721                    }
3722                }
3723            }
3724        }
3725        Ok(channel_ids)
3726    }
3727
3728    async fn get_channel_descendants(
3729        &self,
3730        channel_ids: impl IntoIterator<Item = ChannelId>,
3731        tx: &DatabaseTransaction,
3732    ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3733        let mut values = String::new();
3734        for id in channel_ids {
3735            if !values.is_empty() {
3736                values.push_str(", ");
3737            }
3738            write!(&mut values, "({})", id).unwrap();
3739        }
3740
3741        if values.is_empty() {
3742            return Ok(HashMap::default());
3743        }
3744
3745        let sql = format!(
3746            r#"
3747            SELECT
3748                descendant_paths.*
3749            FROM
3750                channel_paths parent_paths, channel_paths descendant_paths
3751            WHERE
3752                parent_paths.channel_id IN ({values}) AND
3753                descendant_paths.id_path LIKE (parent_paths.id_path || '%')
3754        "#
3755        );
3756
3757        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3758
3759        let mut parents_by_child_id = HashMap::default();
3760        let mut paths = channel_path::Entity::find()
3761            .from_raw_sql(stmt)
3762            .stream(tx)
3763            .await?;
3764
3765        while let Some(path) = paths.next().await {
3766            let path = path?;
3767            let ids = path.id_path.trim_matches('/').split('/');
3768            let mut parent_id = None;
3769            for id in ids {
3770                if let Ok(id) = id.parse() {
3771                    let id = ChannelId::from_proto(id);
3772                    if id == path.channel_id {
3773                        break;
3774                    }
3775                    parent_id = Some(id);
3776                }
3777            }
3778            parents_by_child_id.insert(path.channel_id, parent_id);
3779        }
3780
3781        Ok(parents_by_child_id)
3782    }
3783
3784    /// Returns the channel with the given ID and:
3785    /// - true if the user is a member
3786    /// - false if the user hasn't accepted the invitation yet
3787    pub async fn get_channel(
3788        &self,
3789        channel_id: ChannelId,
3790        user_id: UserId,
3791    ) -> Result<Option<(Channel, bool)>> {
3792        self.transaction(|tx| async move {
3793            let tx = tx;
3794
3795            let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3796
3797            if let Some(channel) = channel {
3798                if self
3799                    .check_user_is_channel_member(channel_id, user_id, &*tx)
3800                    .await
3801                    .is_err()
3802                {
3803                    return Ok(None);
3804                }
3805
3806                let channel_membership = channel_member::Entity::find()
3807                    .filter(
3808                        channel_member::Column::ChannelId
3809                            .eq(channel_id)
3810                            .and(channel_member::Column::UserId.eq(user_id)),
3811                    )
3812                    .one(&*tx)
3813                    .await?;
3814
3815                let is_accepted = channel_membership
3816                    .map(|membership| membership.accepted)
3817                    .unwrap_or(false);
3818
3819                Ok(Some((
3820                    Channel {
3821                        id: channel.id,
3822                        name: channel.name,
3823                        parent_id: None,
3824                    },
3825                    is_accepted,
3826                )))
3827            } else {
3828                Ok(None)
3829            }
3830        })
3831        .await
3832    }
3833
3834    pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3835        self.transaction(|tx| async move {
3836            let tx = tx;
3837            let room = channel::Model {
3838                id: channel_id,
3839                ..Default::default()
3840            }
3841            .find_related(room::Entity)
3842            .one(&*tx)
3843            .await?
3844            .ok_or_else(|| anyhow!("invalid channel"))?;
3845            Ok(room.id)
3846        })
3847        .await
3848    }
3849
3850    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3851    where
3852        F: Send + Fn(TransactionHandle) -> Fut,
3853        Fut: Send + Future<Output = Result<T>>,
3854    {
3855        let body = async {
3856            let mut i = 0;
3857            loop {
3858                let (tx, result) = self.with_transaction(&f).await?;
3859                match result {
3860                    Ok(result) => match tx.commit().await.map_err(Into::into) {
3861                        Ok(()) => return Ok(result),
3862                        Err(error) => {
3863                            if !self.retry_on_serialization_error(&error, i).await {
3864                                return Err(error);
3865                            }
3866                        }
3867                    },
3868                    Err(error) => {
3869                        tx.rollback().await?;
3870                        if !self.retry_on_serialization_error(&error, i).await {
3871                            return Err(error);
3872                        }
3873                    }
3874                }
3875                i += 1;
3876            }
3877        };
3878
3879        self.run(body).await
3880    }
3881
    /// Runs `f` in a transaction whose result may or may not refer to a room.
    ///
    /// Each attempt opens a fresh transaction through `with_transaction` and then
    /// handles the closure's result:
    /// - `Ok(Some((room_id, data)))`: the mutex for `room_id` in `self.rooms` is
    ///   locked, the transaction is committed, and on success `data` is returned
    ///   inside a `RoomGuard` that keeps holding that lock.
    /// - `Ok(None)`: the transaction is committed and `Ok(None)` is returned; no
    ///   room lock is taken.
    /// - `Err(_)`: the transaction is rolled back.
    ///
    /// Whenever the closure or the commit fails, `retry_on_serialization_error`
    /// decides whether to loop for another attempt or return the error.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; feeds the retry back-off.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // The room is only known once `f` has run, so its lock is
                        // acquired after the closure but before the commit.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                // `_guard` is still in scope here, so the room stays
                                // locked while the retry delay elapses.
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3931
    /// Runs `f` in a transaction while holding the lock of room `room_id`.
    ///
    /// On every attempt the room's mutex from `self.rooms` is locked first, then
    /// a fresh transaction is opened through `with_transaction`. When `f` and the
    /// commit both succeed, the closure's value is returned inside a `RoomGuard`
    /// that keeps holding the lock. When `f` fails the transaction is rolled
    /// back. Failures of `f` or of the commit are passed to
    /// `retry_on_serialization_error`, which decides whether to retry or return
    /// the error.
    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; feeds the retry back-off.
            let mut i = 0;
            loop {
                // The lock is (re)acquired at the top of each attempt and lives until
                // the end of the iteration, which includes any retry delay below.
                let lock = self.rooms.entry(room_id).or_default().clone();
                let _guard = lock.lock_owned().await;
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(data) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => {
                            return Ok(RoomGuard {
                                data,
                                _guard,
                                _not_send: PhantomData,
                            });
                        }
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3971
3972    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3973    where
3974        F: Send + Fn(TransactionHandle) -> Fut,
3975        Fut: Send + Future<Output = Result<T>>,
3976    {
3977        let tx = self
3978            .pool
3979            .begin_with_config(Some(IsolationLevel::Serializable), None)
3980            .await?;
3981
3982        let mut tx = Arc::new(Some(tx));
3983        let result = f(TransactionHandle(tx.clone())).await;
3984        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3985            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3986        };
3987
3988        Ok((tx, result))
3989    }
3990
    /// Drives `future` to completion; how depends on the build configuration.
    ///
    /// - Test builds: when the executor is `Executor::Deterministic`, first awaits
    ///   its `simulate_random_delay`; then blocks on `future` using the tokio
    ///   runtime stored in `self.runtime`. Panics if `self.runtime` is `None`.
    /// - Non-test builds: simply awaits `future`.
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        #[cfg(test)]
        {
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            // The future is polled on the database's own tokio runtime rather than
            // by the surrounding executor.
            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
4009
4010    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
4011        // If the error is due to a failure to serialize concurrent transactions, then retry
4012        // this transaction after a delay. With each subsequent retry, double the delay duration.
4013        // Also vary the delay randomly in order to ensure different database connections retry
4014        // at different times.
4015        if is_serialization_error(error) {
4016            let base_delay = 4_u64 << prev_attempt_count.min(16);
4017            let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
4018            log::info!(
4019                "retrying transaction after serialization error. delay: {} ms.",
4020                randomized_delay
4021            );
4022            self.executor
4023                .sleep(Duration::from_millis(randomized_delay as u64))
4024                .await;
4025            true
4026        } else {
4027            false
4028        }
4029    }
4030}
4031
4032fn is_serialization_error(error: &Error) -> bool {
4033    const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
4034    match error {
4035        Error::Database(
4036            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
4037            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
4038        ) if error
4039            .as_database_error()
4040            .and_then(|error| error.code())
4041            .as_deref()
4042            == Some(SERIALIZATION_FAILURE_CODE) =>
4043        {
4044            true
4045        }
4046        _ => false,
4047    }
4048}
4049
4050struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
4051
4052impl Deref for TransactionHandle {
4053    type Target = DatabaseTransaction;
4054
4055    fn deref(&self) -> &Self::Target {
4056        self.0.as_ref().as_ref().unwrap()
4057    }
4058}
4059
/// Couples a value with a held room lock.
///
/// The room transaction helpers in this file construct a `RoomGuard` after a
/// successful commit and move the room's `OwnedMutexGuard` into it, so that
/// mutex stays locked until the `RoomGuard` is dropped or consumed through
/// `into_inner`. The wrapped value is reachable through `Deref`/`DerefMut`.
pub struct RoomGuard<T> {
    data: T,
    // Never read; stored only so the lock is released together with this value.
    _guard: OwnedMutexGuard<()>,
    // `Rc` is `!Send`, so this marker makes `RoomGuard` `!Send` as well.
    _not_send: PhantomData<Rc<()>>,
}

impl<T> Deref for RoomGuard<T> {
    type Target = T;

    /// Borrows the wrapped value.
    fn deref(&self) -> &T {
        &self.data
    }
}

impl<T> DerefMut for RoomGuard<T> {
    /// Mutably borrows the wrapped value.
    fn deref_mut(&mut self) -> &mut T {
        &mut self.data
    }
}

impl<T> RoomGuard<T> {
    /// Consumes the guard and returns the wrapped value; the lock guard is
    /// dropped in the process.
    pub fn into_inner(self) -> T {
        self.data
    }
}
4085
/// Serializable set of values describing a user: a GitHub login, a GitHub user
/// ID and an invite count.
///
/// NOTE(review): presumably the input for creating a user record — confirm
/// against the callers, which are not visible here.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    pub github_login: String,
    pub github_user_id: i32,
    pub invite_count: i32,
}
4092
/// Identifiers associated with a user: the user ID, a metrics ID string and,
/// optionally, an inviting user's ID and a signup device ID.
///
/// NOTE(review): presumably what user creation reports back — confirm against
/// the producer, which is not visible here.
#[derive(Debug)]
pub struct NewUserResult {
    pub user_id: UserId,
    pub metrics_id: String,
    pub inviting_user_id: Option<UserId>,
    pub signup_device_id: Option<String>,
}
4100
/// A channel's ID and name plus an optional parent channel ID.
///
/// Derives `FromQueryResult`, so it can be built from a query result row.
/// `Database::get_channel` builds it by hand and leaves `parent_id` as `None`.
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    pub parent_id: Option<ChannelId>,
}
4107
/// A list of channels together with per-channel user ID lists and a set of
/// channel IDs flagged as having admin privileges.
///
/// NOTE(review): presumably everything one user can see about their channels —
/// confirm against the producer, which is not visible here.
#[derive(Debug, PartialEq)]
pub struct ChannelsForUser {
    pub channels: Vec<Channel>,
    pub channel_participants: HashMap<ChannelId, Vec<UserId>>,
    pub channels_with_admin_privileges: HashSet<ChannelId>,
}
4114
4115fn random_invite_code() -> String {
4116    nanoid::nanoid!(16)
4117}
4118
4119fn random_email_confirmation_code() -> String {
4120    nanoid::nanoid!(64)
4121}
4122
// Defines `$name` as a `Copy` newtype around `i32` for one kind of database
// identifier, together with the conversions needed to use it with serde
// (transparent representation), the protobuf layer (`from_proto`/`to_proto`)
// and sea-orm/sea-query (`Value` conversion, `TryGetable`, `ValueType`,
// `TryFromU64`, `Nullable`).
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable ID.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Converts a protobuf ID. The conversion uses `as`, so values that
            /// do not fit into an `i32` wrap around instead of failing.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts to a protobuf ID. The conversion uses `as`, so a negative
            /// ID is sign-extended instead of failing.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer value that fits into an `i32`.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `DbErr::ConvertFromU64` expects the name of the type that could
                // not be produced from the `u64`; the error supplies the rest of
                // the message itself.
                Ok(Self(
                    n.try_into()
                        .map_err(|_| DbErr::ConvertFromU64(stringify!($name)))?,
                ))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
4241
// One `id_type!` instantiation per kind of identifier. Each line defines a
// distinct `i32` newtype, so IDs of different kinds cannot be mixed up.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
4255
/// Bundles a `proto::Room` with an optional channel ID and a list of user IDs
/// (`channel_members`).
///
/// NOTE(review): presumably the result of joining a room — confirm against the
/// producer, which is not visible here.
#[derive(Clone)]
pub struct JoinRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
}
4262
/// Bundles a `proto::Room` with lists of rejoined and reshared projects, an
/// optional channel ID and a list of user IDs (`channel_members`).
///
/// NOTE(review): presumably the result of rejoining a room after a
/// reconnect — confirm against the producer, which is not visible here.
pub struct RejoinedRoom {
    pub room: proto::Room,
    pub rejoined_projects: Vec<RejoinedProject>,
    pub reshared_projects: Vec<ResharedProject>,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
}
4270
/// A project ID with a previous connection ID, the project's collaborators and
/// its worktree metadata. Used as an element of `RejoinedRoom::reshared_projects`.
pub struct ResharedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
4277
/// A project ID with a previous connection ID, the project's collaborators,
/// per-worktree update data and language servers. Used as an element of
/// `RejoinedRoom::rejoined_projects`.
pub struct RejoinedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<RejoinedWorktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
4285
/// Worktree data carried by a `RejoinedProject`: identity (`id`, `abs_path`,
/// `root_name`, `visible`), updated and removed entries and repositories,
/// diagnostic summaries, settings files and two scan IDs.
///
/// NOTE(review): the updated/removed lists presumably describe what changed
/// while the client was disconnected — confirm against the producer.
#[derive(Debug)]
pub struct RejoinedWorktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub updated_entries: Vec<proto::Entry>,
    pub removed_entries: Vec<u64>,
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    pub removed_repositories: Vec<u64>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
4301
/// Bundles a `proto::Room` with an optional channel ID, a list of user IDs
/// (`channel_members`), the projects that were left keyed by project ID, the
/// user IDs whose calls were canceled and a `deleted` flag.
///
/// NOTE(review): presumably the result of leaving a room, with `deleted`
/// telling whether the room was removed — confirm against the producer.
pub struct LeftRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
    pub left_projects: HashMap<ProjectId, LeftProject>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
    pub deleted: bool,
}
4310
/// Bundles a `proto::Room` with an optional channel ID, a list of user IDs
/// (`channel_members`), the user IDs of stale participants and the user IDs
/// whose calls were canceled.
///
/// NOTE(review): presumably the result of refreshing a room — confirm against
/// the producer, which is not visible here.
pub struct RefreshedRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
    pub stale_participant_user_ids: Vec<UserId>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
4318
/// A project's collaborators, its worktrees keyed by a `u64` (presumably the
/// worktree ID — confirm) and its language servers.
pub struct Project {
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: BTreeMap<u64, Worktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
4324
4325pub struct ProjectCollaborator {
4326    pub connection_id: ConnectionId,
4327    pub user_id: UserId,
4328    pub replica_id: ReplicaId,
4329    pub is_host: bool,
4330}
4331
4332impl ProjectCollaborator {
4333    pub fn to_proto(&self) -> proto::Collaborator {
4334        proto::Collaborator {
4335            peer_id: Some(self.connection_id.into()),
4336            replica_id: self.replica_id.0 as u32,
4337            user_id: self.user_id.to_proto(),
4338        }
4339    }
4340}
4341
/// A project ID with the host's user ID and connection ID and a list of
/// connection IDs. Used as the value type of `LeftRoom::left_projects`.
///
/// NOTE(review): `connection_ids` presumably lists the connections to notify —
/// confirm against the producer, which is not visible here.
#[derive(Debug)]
pub struct LeftProject {
    pub id: ProjectId,
    pub host_user_id: UserId,
    pub host_connection_id: ConnectionId,
    pub connection_ids: Vec<ConnectionId>,
}
4349
/// Worktree data carried by a `Project`: identity (`id`, `abs_path`,
/// `root_name`, `visible`), its entries, repository entries keyed by a `u64`,
/// diagnostic summaries, settings files and two scan IDs.
pub struct Worktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
4362
/// A settings file of a worktree, given as its path and its content.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    pub path: String,
    pub content: String,
}
4368
/// Column selector with the single column `UserId` (derives sea-orm's
/// `DeriveColumn`).
///
/// NOTE(review): presumably used to read only user IDs out of a query — its
/// uses are not visible here.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}
4373
4374#[cfg(test)]
4375pub use test::*;
4376
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A `Database` for tests, backed by either in-memory SQLite or a throwaway
    /// Postgres database that is dropped again when the `TestDb` is dropped.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    /// Builds the single-threaded tokio runtime (I/O and time enabled) on which
    /// the test database performs its work.
    fn new_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build()
            .unwrap()
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and loads the test schema.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let runtime = new_runtime();

            let db = runtime.block_on(async {
                let mut options = ConnectOptions::new("sqlite::memory:".to_string());
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();

                let schema = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                let create_schema = sea_orm::Statement::from_string(
                    db.pool.get_database_backend(),
                    schema.into(),
                );
                db.pool.execute(create_schema).await.unwrap();
                db
            });

            Self::from_parts(db, runtime)
        }

        /// Creates a randomly named Postgres database on localhost and runs the
        /// migrations against it. A process-wide lock is held for the duration
        /// of the call, so concurrent callers set up one after another.
        pub fn postgres(background: Arc<Background>) -> Self {
            static LOCK: Mutex<()> = Mutex::new(());

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = new_runtime();

            let db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");

                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();

                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            Self::from_parts(db, runtime)
        }

        /// Returns the wrapped database. Panics once it has been taken by `drop`.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }

        /// Stores `runtime` inside `db` and wraps the result; `connection` stays
        /// unset.
        fn from_parts(mut db: Database, runtime: tokio::runtime::Runtime) -> Self {
            db.runtime = Some(runtime);
            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }
    }

    impl Drop for TestDb {
        /// For Postgres, terminates every other backend connected to the test
        /// database and then drops the database. Failures are only logged.
        /// Nothing needs cleaning up for other backends.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    let terminate_other_backends = sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        query.into(),
                    );
                    db.pool.execute(terminate_other_backends).await.log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}