db.rs

   1use std::{ops::Range, time::Duration};
   2
   3use crate::{Error, Result};
   4use anyhow::{anyhow, Context};
   5use async_trait::async_trait;
   6use axum::http::StatusCode;
   7use collections::HashMap;
   8use futures::StreamExt;
   9use nanoid::nanoid;
  10use serde::Serialize;
  11pub use sqlx::postgres::PgPoolOptions as DbOptions;
  12use sqlx::{types::Uuid, FromRow, QueryBuilder, Row};
  13use time::OffsetDateTime;
  14
  15#[async_trait]
  16pub trait Db: Send + Sync {
  17    async fn create_user(
  18        &self,
  19        github_login: &str,
  20        email_address: Option<&str>,
  21        admin: bool,
  22    ) -> Result<UserId>;
  23    async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>>;
  24    async fn create_users(&self, users: Vec<(String, String, usize)>) -> Result<Vec<UserId>>;
  25    async fn fuzzy_search_users(&self, query: &str, limit: u32) -> Result<Vec<User>>;
  26    async fn get_user_by_id(&self, id: UserId) -> Result<Option<User>>;
  27    async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<User>>;
  28    async fn get_users_with_no_invites(&self, invited_by_another_user: bool) -> Result<Vec<User>>;
  29    async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>>;
  30    async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()>;
  31    async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()>;
  32    async fn destroy_user(&self, id: UserId) -> Result<()>;
  33
  34    async fn set_invite_count(&self, id: UserId, count: u32) -> Result<()>;
  35    async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, u32)>>;
  36    async fn get_user_for_invite_code(&self, code: &str) -> Result<User>;
  37    async fn redeem_invite_code(
  38        &self,
  39        code: &str,
  40        login: &str,
  41        email_address: Option<&str>,
  42    ) -> Result<UserId>;
  43
  44    /// Registers a new project for the given user.
  45    async fn register_project(&self, host_user_id: UserId) -> Result<ProjectId>;
  46
  47    /// Unregisters a project for the given project id.
  48    async fn unregister_project(&self, project_id: ProjectId) -> Result<()>;
  49
  50    /// Update file counts by extension for the given project and worktree.
  51    async fn update_worktree_extensions(
  52        &self,
  53        project_id: ProjectId,
  54        worktree_id: u64,
  55        extensions: HashMap<String, usize>,
  56    ) -> Result<()>;
  57
  58    /// Get the file counts on the given project keyed by their worktree and extension.
  59    async fn get_project_extensions(
  60        &self,
  61        project_id: ProjectId,
  62    ) -> Result<HashMap<u64, HashMap<String, usize>>>;
  63
  64    /// Record which users have been active in which projects during
  65    /// a given period of time.
  66    async fn record_project_activity(
  67        &self,
  68        time_period: Range<OffsetDateTime>,
  69        active_projects: &[(UserId, ProjectId)],
  70    ) -> Result<()>;
  71
  72    /// Get the users that have been most active during the given time period,
  73    /// along with the amount of time they have been active in each project.
  74    async fn summarize_project_activity(
  75        &self,
  76        time_period: Range<OffsetDateTime>,
  77        max_user_count: usize,
  78    ) -> Result<Vec<UserActivitySummary>>;
  79
  80    async fn get_contacts(&self, id: UserId) -> Result<Vec<Contact>>;
  81    async fn has_contact(&self, user_id_a: UserId, user_id_b: UserId) -> Result<bool>;
  82    async fn send_contact_request(&self, requester_id: UserId, responder_id: UserId) -> Result<()>;
  83    async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()>;
  84    async fn dismiss_contact_notification(
  85        &self,
  86        responder_id: UserId,
  87        requester_id: UserId,
  88    ) -> Result<()>;
  89    async fn respond_to_contact_request(
  90        &self,
  91        responder_id: UserId,
  92        requester_id: UserId,
  93        accept: bool,
  94    ) -> Result<()>;
  95
  96    async fn create_access_token_hash(
  97        &self,
  98        user_id: UserId,
  99        access_token_hash: &str,
 100        max_access_token_count: usize,
 101    ) -> Result<()>;
 102    async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>>;
 103    #[cfg(any(test, feature = "seed-support"))]
 104
 105    async fn find_org_by_slug(&self, slug: &str) -> Result<Option<Org>>;
 106    #[cfg(any(test, feature = "seed-support"))]
 107    async fn create_org(&self, name: &str, slug: &str) -> Result<OrgId>;
 108    #[cfg(any(test, feature = "seed-support"))]
 109    async fn add_org_member(&self, org_id: OrgId, user_id: UserId, is_admin: bool) -> Result<()>;
 110    #[cfg(any(test, feature = "seed-support"))]
 111    async fn create_org_channel(&self, org_id: OrgId, name: &str) -> Result<ChannelId>;
 112    #[cfg(any(test, feature = "seed-support"))]
 113
 114    async fn get_org_channels(&self, org_id: OrgId) -> Result<Vec<Channel>>;
 115    async fn get_accessible_channels(&self, user_id: UserId) -> Result<Vec<Channel>>;
 116    async fn can_user_access_channel(&self, user_id: UserId, channel_id: ChannelId)
 117        -> Result<bool>;
 118    #[cfg(any(test, feature = "seed-support"))]
 119    async fn add_channel_member(
 120        &self,
 121        channel_id: ChannelId,
 122        user_id: UserId,
 123        is_admin: bool,
 124    ) -> Result<()>;
 125    async fn create_channel_message(
 126        &self,
 127        channel_id: ChannelId,
 128        sender_id: UserId,
 129        body: &str,
 130        timestamp: OffsetDateTime,
 131        nonce: u128,
 132    ) -> Result<MessageId>;
 133    async fn get_channel_messages(
 134        &self,
 135        channel_id: ChannelId,
 136        count: usize,
 137        before_id: Option<MessageId>,
 138    ) -> Result<Vec<ChannelMessage>>;
 139    #[cfg(test)]
 140    async fn teardown(&self, url: &str);
 141    #[cfg(test)]
 142    fn as_fake<'a>(&'a self) -> Option<&'a tests::FakeDb>;
 143}
 144
 145pub struct PostgresDb {
 146    pool: sqlx::PgPool,
 147}
 148
 149impl PostgresDb {
 150    pub async fn new(url: &str, max_connections: u32) -> Result<Self> {
 151        let pool = DbOptions::new()
 152            .max_connections(max_connections)
 153            .connect(&url)
 154            .await
 155            .context("failed to connect to postgres database")?;
 156        Ok(Self { pool })
 157    }
 158}
 159
 160#[async_trait]
 161impl Db for PostgresDb {
 162    // users
 163
 164    async fn create_user(
 165        &self,
 166        github_login: &str,
 167        email_address: Option<&str>,
 168        admin: bool,
 169    ) -> Result<UserId> {
 170        let query = "
 171            INSERT INTO users (github_login, email_address, admin)
 172            VALUES ($1, $2, $3)
 173            ON CONFLICT (github_login) DO UPDATE SET github_login = excluded.github_login
 174            RETURNING id
 175        ";
 176        Ok(sqlx::query_scalar(query)
 177            .bind(github_login)
 178            .bind(email_address)
 179            .bind(admin)
 180            .fetch_one(&self.pool)
 181            .await
 182            .map(UserId)?)
 183    }
 184
 185    async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
 186        let query = "SELECT * FROM users ORDER BY github_login ASC LIMIT $1 OFFSET $2";
 187        Ok(sqlx::query_as(query)
 188            .bind(limit as i32)
 189            .bind((page * limit) as i32)
 190            .fetch_all(&self.pool)
 191            .await?)
 192    }
 193
 194    async fn create_users(&self, users: Vec<(String, String, usize)>) -> Result<Vec<UserId>> {
 195        let mut query = QueryBuilder::new(
 196            "INSERT INTO users (github_login, email_address, admin, invite_code, invite_count)",
 197        );
 198        query.push_values(
 199            users,
 200            |mut query, (github_login, email_address, invite_count)| {
 201                query
 202                    .push_bind(github_login)
 203                    .push_bind(email_address)
 204                    .push_bind(false)
 205                    .push_bind(nanoid!(16))
 206                    .push_bind(invite_count as i32);
 207            },
 208        );
 209        query.push(
 210            "
 211            ON CONFLICT (github_login) DO UPDATE SET
 212                github_login = excluded.github_login,
 213                invite_count = excluded.invite_count,
 214                invite_code = CASE WHEN users.invite_code IS NULL
 215                                   THEN excluded.invite_code
 216                                   ELSE users.invite_code
 217                              END
 218            RETURNING id
 219            ",
 220        );
 221
 222        let rows = query.build().fetch_all(&self.pool).await?;
 223        Ok(rows
 224            .into_iter()
 225            .filter_map(|row| row.try_get::<UserId, _>(0).ok())
 226            .collect())
 227    }
 228
 229    async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
 230        let like_string = fuzzy_like_string(name_query);
 231        let query = "
 232            SELECT users.*
 233            FROM users
 234            WHERE github_login ILIKE $1
 235            ORDER BY github_login <-> $2
 236            LIMIT $3
 237        ";
 238        Ok(sqlx::query_as(query)
 239            .bind(like_string)
 240            .bind(name_query)
 241            .bind(limit as i32)
 242            .fetch_all(&self.pool)
 243            .await?)
 244    }
 245
 246    async fn get_user_by_id(&self, id: UserId) -> Result<Option<User>> {
 247        let users = self.get_users_by_ids(vec![id]).await?;
 248        Ok(users.into_iter().next())
 249    }
 250
 251    async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<User>> {
 252        let ids = ids.into_iter().map(|id| id.0).collect::<Vec<_>>();
 253        let query = "
 254            SELECT users.*
 255            FROM users
 256            WHERE users.id = ANY ($1)
 257        ";
 258        Ok(sqlx::query_as(query)
 259            .bind(&ids)
 260            .fetch_all(&self.pool)
 261            .await?)
 262    }
 263
 264    async fn get_users_with_no_invites(&self, invited_by_another_user: bool) -> Result<Vec<User>> {
 265        let query = format!(
 266            "
 267            SELECT users.*
 268            FROM users
 269            WHERE invite_count = 0
 270            AND inviter_id IS{} NULL
 271            ",
 272            if invited_by_another_user { " NOT" } else { "" }
 273        );
 274
 275        Ok(sqlx::query_as(&query).fetch_all(&self.pool).await?)
 276    }
 277
 278    async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
 279        let query = "SELECT * FROM users WHERE github_login = $1 LIMIT 1";
 280        Ok(sqlx::query_as(query)
 281            .bind(github_login)
 282            .fetch_optional(&self.pool)
 283            .await?)
 284    }
 285
 286    async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
 287        let query = "UPDATE users SET admin = $1 WHERE id = $2";
 288        Ok(sqlx::query(query)
 289            .bind(is_admin)
 290            .bind(id.0)
 291            .execute(&self.pool)
 292            .await
 293            .map(drop)?)
 294    }
 295
 296    async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
 297        let query = "UPDATE users SET connected_once = $1 WHERE id = $2";
 298        Ok(sqlx::query(query)
 299            .bind(connected_once)
 300            .bind(id.0)
 301            .execute(&self.pool)
 302            .await
 303            .map(drop)?)
 304    }
 305
 306    async fn destroy_user(&self, id: UserId) -> Result<()> {
 307        let query = "DELETE FROM access_tokens WHERE user_id = $1;";
 308        sqlx::query(query)
 309            .bind(id.0)
 310            .execute(&self.pool)
 311            .await
 312            .map(drop)?;
 313        let query = "DELETE FROM users WHERE id = $1;";
 314        Ok(sqlx::query(query)
 315            .bind(id.0)
 316            .execute(&self.pool)
 317            .await
 318            .map(drop)?)
 319    }
 320
 321    // invite codes
 322
 323    async fn set_invite_count(&self, id: UserId, count: u32) -> Result<()> {
 324        let mut tx = self.pool.begin().await?;
 325        if count > 0 {
 326            sqlx::query(
 327                "
 328                UPDATE users
 329                SET invite_code = $1
 330                WHERE id = $2 AND invite_code IS NULL
 331            ",
 332            )
 333            .bind(nanoid!(16))
 334            .bind(id)
 335            .execute(&mut tx)
 336            .await?;
 337        }
 338
 339        sqlx::query(
 340            "
 341            UPDATE users
 342            SET invite_count = $1
 343            WHERE id = $2
 344            ",
 345        )
 346        .bind(count as i32)
 347        .bind(id)
 348        .execute(&mut tx)
 349        .await?;
 350        tx.commit().await?;
 351        Ok(())
 352    }
 353
 354    async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, u32)>> {
 355        let result: Option<(String, i32)> = sqlx::query_as(
 356            "
 357                SELECT invite_code, invite_count
 358                FROM users
 359                WHERE id = $1 AND invite_code IS NOT NULL 
 360            ",
 361        )
 362        .bind(id)
 363        .fetch_optional(&self.pool)
 364        .await?;
 365        if let Some((code, count)) = result {
 366            Ok(Some((code, count.try_into().map_err(anyhow::Error::new)?)))
 367        } else {
 368            Ok(None)
 369        }
 370    }
 371
 372    async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
 373        sqlx::query_as(
 374            "
 375                SELECT *
 376                FROM users
 377                WHERE invite_code = $1
 378            ",
 379        )
 380        .bind(code)
 381        .fetch_optional(&self.pool)
 382        .await?
 383        .ok_or_else(|| {
 384            Error::Http(
 385                StatusCode::NOT_FOUND,
 386                "that invite code does not exist".to_string(),
 387            )
 388        })
 389    }
 390
 391    async fn redeem_invite_code(
 392        &self,
 393        code: &str,
 394        login: &str,
 395        email_address: Option<&str>,
 396    ) -> Result<UserId> {
 397        let mut tx = self.pool.begin().await?;
 398
 399        let inviter_id: Option<UserId> = sqlx::query_scalar(
 400            "
 401                UPDATE users
 402                SET invite_count = invite_count - 1
 403                WHERE
 404                    invite_code = $1 AND
 405                    invite_count > 0
 406                RETURNING id
 407            ",
 408        )
 409        .bind(code)
 410        .fetch_optional(&mut tx)
 411        .await?;
 412
 413        let inviter_id = match inviter_id {
 414            Some(inviter_id) => inviter_id,
 415            None => {
 416                if sqlx::query_scalar::<_, i32>("SELECT 1 FROM users WHERE invite_code = $1")
 417                    .bind(code)
 418                    .fetch_optional(&mut tx)
 419                    .await?
 420                    .is_some()
 421                {
 422                    Err(Error::Http(
 423                        StatusCode::UNAUTHORIZED,
 424                        "no invites remaining".to_string(),
 425                    ))?
 426                } else {
 427                    Err(Error::Http(
 428                        StatusCode::NOT_FOUND,
 429                        "invite code not found".to_string(),
 430                    ))?
 431                }
 432            }
 433        };
 434
 435        let invitee_id = sqlx::query_scalar(
 436            "
 437                INSERT INTO users
 438                    (github_login, email_address, admin, inviter_id)
 439                VALUES
 440                    ($1, $2, 'f', $3)
 441                RETURNING id
 442            ",
 443        )
 444        .bind(login)
 445        .bind(email_address)
 446        .bind(inviter_id)
 447        .fetch_one(&mut tx)
 448        .await
 449        .map(UserId)?;
 450
 451        sqlx::query(
 452            "
 453                INSERT INTO contacts
 454                    (user_id_a, user_id_b, a_to_b, should_notify, accepted)
 455                VALUES
 456                    ($1, $2, 't', 't', 't')
 457            ",
 458        )
 459        .bind(inviter_id)
 460        .bind(invitee_id)
 461        .execute(&mut tx)
 462        .await?;
 463
 464        tx.commit().await?;
 465        Ok(invitee_id)
 466    }
 467
 468    // projects
 469
 470    async fn register_project(&self, host_user_id: UserId) -> Result<ProjectId> {
 471        Ok(sqlx::query_scalar(
 472            "
 473            INSERT INTO projects(host_user_id)
 474            VALUES ($1)
 475            RETURNING id
 476            ",
 477        )
 478        .bind(host_user_id)
 479        .fetch_one(&self.pool)
 480        .await
 481        .map(ProjectId)?)
 482    }
 483
 484    async fn unregister_project(&self, project_id: ProjectId) -> Result<()> {
 485        sqlx::query(
 486            "
 487            UPDATE projects
 488            SET unregistered = 't'
 489            WHERE id = $1
 490            ",
 491        )
 492        .bind(project_id)
 493        .execute(&self.pool)
 494        .await?;
 495        Ok(())
 496    }
 497
 498    async fn update_worktree_extensions(
 499        &self,
 500        project_id: ProjectId,
 501        worktree_id: u64,
 502        extensions: HashMap<String, usize>,
 503    ) -> Result<()> {
 504        if extensions.is_empty() {
 505            return Ok(());
 506        }
 507
 508        let mut query = QueryBuilder::new(
 509            "INSERT INTO worktree_extensions (project_id, worktree_id, extension, count)",
 510        );
 511        query.push_values(extensions, |mut query, (extension, count)| {
 512            query
 513                .push_bind(project_id)
 514                .push_bind(worktree_id as i32)
 515                .push_bind(extension)
 516                .push_bind(count as i32);
 517        });
 518        query.push(
 519            "
 520            ON CONFLICT (project_id, worktree_id, extension) DO UPDATE SET
 521            count = excluded.count
 522            ",
 523        );
 524        query.build().execute(&self.pool).await?;
 525
 526        Ok(())
 527    }
 528
 529    async fn get_project_extensions(
 530        &self,
 531        project_id: ProjectId,
 532    ) -> Result<HashMap<u64, HashMap<String, usize>>> {
 533        #[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
 534        struct WorktreeExtension {
 535            worktree_id: i32,
 536            extension: String,
 537            count: i32,
 538        }
 539
 540        let query = "
 541            SELECT worktree_id, extension, count
 542            FROM worktree_extensions
 543            WHERE project_id = $1
 544        ";
 545        let counts = sqlx::query_as::<_, WorktreeExtension>(query)
 546            .bind(&project_id)
 547            .fetch_all(&self.pool)
 548            .await?;
 549
 550        let mut extension_counts = HashMap::default();
 551        for count in counts {
 552            extension_counts
 553                .entry(count.worktree_id as u64)
 554                .or_insert(HashMap::default())
 555                .insert(count.extension, count.count as usize);
 556        }
 557        Ok(extension_counts)
 558    }
 559
 560    async fn record_project_activity(
 561        &self,
 562        time_period: Range<OffsetDateTime>,
 563        projects: &[(UserId, ProjectId)],
 564    ) -> Result<()> {
 565        let query = "
 566            INSERT INTO project_activity_periods
 567            (ended_at, duration_millis, user_id, project_id)
 568            VALUES
 569            ($1, $2, $3, $4);
 570        ";
 571
 572        let mut tx = self.pool.begin().await?;
 573        let duration_millis =
 574            ((time_period.end - time_period.start).as_seconds_f64() * 1000.0) as i32;
 575        for (user_id, project_id) in projects {
 576            sqlx::query(query)
 577                .bind(time_period.end)
 578                .bind(duration_millis)
 579                .bind(user_id)
 580                .bind(project_id)
 581                .execute(&mut tx)
 582                .await?;
 583        }
 584        tx.commit().await?;
 585
 586        Ok(())
 587    }
 588
 589    async fn summarize_project_activity(
 590        &self,
 591        time_period: Range<OffsetDateTime>,
 592        max_user_count: usize,
 593    ) -> Result<Vec<UserActivitySummary>> {
 594        let query = "
 595            WITH
 596                project_durations AS (
 597                    SELECT user_id, project_id, SUM(duration_millis) AS project_duration
 598                    FROM project_activity_periods
 599                    WHERE $1 <= ended_at AND ended_at <= $2
 600                    GROUP BY user_id, project_id
 601                ),
 602                user_durations AS (
 603                    SELECT user_id, SUM(project_duration) as total_duration
 604                    FROM project_durations
 605                    GROUP BY user_id
 606                    ORDER BY total_duration DESC
 607                    LIMIT $3
 608                )
 609            SELECT user_durations.user_id, users.github_login, project_id, project_duration
 610            FROM user_durations, project_durations, users
 611            WHERE
 612                user_durations.user_id = project_durations.user_id AND
 613                user_durations.user_id = users.id
 614            ORDER BY user_id ASC, project_duration DESC
 615        ";
 616
 617        let mut rows = sqlx::query_as::<_, (UserId, String, ProjectId, i64)>(query)
 618            .bind(time_period.start)
 619            .bind(time_period.end)
 620            .bind(max_user_count as i32)
 621            .fetch(&self.pool);
 622
 623        let mut result = Vec::<UserActivitySummary>::new();
 624        while let Some(row) = rows.next().await {
 625            let (user_id, github_login, project_id, duration_millis) = row?;
 626            let project_id = project_id;
 627            let duration = Duration::from_millis(duration_millis as u64);
 628            if let Some(last_summary) = result.last_mut() {
 629                if last_summary.id == user_id {
 630                    last_summary.project_activity.push((project_id, duration));
 631                    continue;
 632                }
 633            }
 634            result.push(UserActivitySummary {
 635                id: user_id,
 636                project_activity: vec![(project_id, duration)],
 637                github_login,
 638            });
 639        }
 640
 641        Ok(result)
 642    }
 643
 644    // contacts
 645
 646    async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
 647        let query = "
 648            SELECT user_id_a, user_id_b, a_to_b, accepted, should_notify
 649            FROM contacts
 650            WHERE user_id_a = $1 OR user_id_b = $1;
 651        ";
 652
 653        let mut rows = sqlx::query_as::<_, (UserId, UserId, bool, bool, bool)>(query)
 654            .bind(user_id)
 655            .fetch(&self.pool);
 656
 657        let mut contacts = vec![Contact::Accepted {
 658            user_id,
 659            should_notify: false,
 660        }];
 661        while let Some(row) = rows.next().await {
 662            let (user_id_a, user_id_b, a_to_b, accepted, should_notify) = row?;
 663
 664            if user_id_a == user_id {
 665                if accepted {
 666                    contacts.push(Contact::Accepted {
 667                        user_id: user_id_b,
 668                        should_notify: should_notify && a_to_b,
 669                    });
 670                } else if a_to_b {
 671                    contacts.push(Contact::Outgoing { user_id: user_id_b })
 672                } else {
 673                    contacts.push(Contact::Incoming {
 674                        user_id: user_id_b,
 675                        should_notify,
 676                    });
 677                }
 678            } else {
 679                if accepted {
 680                    contacts.push(Contact::Accepted {
 681                        user_id: user_id_a,
 682                        should_notify: should_notify && !a_to_b,
 683                    });
 684                } else if a_to_b {
 685                    contacts.push(Contact::Incoming {
 686                        user_id: user_id_a,
 687                        should_notify,
 688                    });
 689                } else {
 690                    contacts.push(Contact::Outgoing { user_id: user_id_a });
 691                }
 692            }
 693        }
 694
 695        contacts.sort_unstable_by_key(|contact| contact.user_id());
 696
 697        Ok(contacts)
 698    }
 699
 700    async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
 701        let (id_a, id_b) = if user_id_1 < user_id_2 {
 702            (user_id_1, user_id_2)
 703        } else {
 704            (user_id_2, user_id_1)
 705        };
 706
 707        let query = "
 708            SELECT 1 FROM contacts
 709            WHERE user_id_a = $1 AND user_id_b = $2 AND accepted = 't'
 710            LIMIT 1
 711        ";
 712        Ok(sqlx::query_scalar::<_, i32>(query)
 713            .bind(id_a.0)
 714            .bind(id_b.0)
 715            .fetch_optional(&self.pool)
 716            .await?
 717            .is_some())
 718    }
 719
 720    async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
 721        let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
 722            (sender_id, receiver_id, true)
 723        } else {
 724            (receiver_id, sender_id, false)
 725        };
 726        let query = "
 727            INSERT into contacts (user_id_a, user_id_b, a_to_b, accepted, should_notify)
 728            VALUES ($1, $2, $3, 'f', 't')
 729            ON CONFLICT (user_id_a, user_id_b) DO UPDATE
 730            SET
 731                accepted = 't',
 732                should_notify = 'f'
 733            WHERE
 734                NOT contacts.accepted AND
 735                ((contacts.a_to_b = excluded.a_to_b AND contacts.user_id_a = excluded.user_id_b) OR
 736                (contacts.a_to_b != excluded.a_to_b AND contacts.user_id_a = excluded.user_id_a));
 737        ";
 738        let result = sqlx::query(query)
 739            .bind(id_a.0)
 740            .bind(id_b.0)
 741            .bind(a_to_b)
 742            .execute(&self.pool)
 743            .await?;
 744
 745        if result.rows_affected() == 1 {
 746            Ok(())
 747        } else {
 748            Err(anyhow!("contact already requested"))?
 749        }
 750    }
 751
 752    async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
 753        let (id_a, id_b) = if responder_id < requester_id {
 754            (responder_id, requester_id)
 755        } else {
 756            (requester_id, responder_id)
 757        };
 758        let query = "
 759            DELETE FROM contacts
 760            WHERE user_id_a = $1 AND user_id_b = $2;
 761        ";
 762        let result = sqlx::query(query)
 763            .bind(id_a.0)
 764            .bind(id_b.0)
 765            .execute(&self.pool)
 766            .await?;
 767
 768        if result.rows_affected() == 1 {
 769            Ok(())
 770        } else {
 771            Err(anyhow!("no such contact"))?
 772        }
 773    }
 774
 775    async fn dismiss_contact_notification(
 776        &self,
 777        user_id: UserId,
 778        contact_user_id: UserId,
 779    ) -> Result<()> {
 780        let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
 781            (user_id, contact_user_id, true)
 782        } else {
 783            (contact_user_id, user_id, false)
 784        };
 785
 786        let query = "
 787            UPDATE contacts
 788            SET should_notify = 'f'
 789            WHERE
 790                user_id_a = $1 AND user_id_b = $2 AND
 791                (
 792                    (a_to_b = $3 AND accepted) OR
 793                    (a_to_b != $3 AND NOT accepted)
 794                );
 795        ";
 796
 797        let result = sqlx::query(query)
 798            .bind(id_a.0)
 799            .bind(id_b.0)
 800            .bind(a_to_b)
 801            .execute(&self.pool)
 802            .await?;
 803
 804        if result.rows_affected() == 0 {
 805            Err(anyhow!("no such contact request"))?;
 806        }
 807
 808        Ok(())
 809    }
 810
 811    async fn respond_to_contact_request(
 812        &self,
 813        responder_id: UserId,
 814        requester_id: UserId,
 815        accept: bool,
 816    ) -> Result<()> {
 817        let (id_a, id_b, a_to_b) = if responder_id < requester_id {
 818            (responder_id, requester_id, false)
 819        } else {
 820            (requester_id, responder_id, true)
 821        };
 822        let result = if accept {
 823            let query = "
 824                UPDATE contacts
 825                SET accepted = 't', should_notify = 't'
 826                WHERE user_id_a = $1 AND user_id_b = $2 AND a_to_b = $3;
 827            ";
 828            sqlx::query(query)
 829                .bind(id_a.0)
 830                .bind(id_b.0)
 831                .bind(a_to_b)
 832                .execute(&self.pool)
 833                .await?
 834        } else {
 835            let query = "
 836                DELETE FROM contacts
 837                WHERE user_id_a = $1 AND user_id_b = $2 AND a_to_b = $3 AND NOT accepted;
 838            ";
 839            sqlx::query(query)
 840                .bind(id_a.0)
 841                .bind(id_b.0)
 842                .bind(a_to_b)
 843                .execute(&self.pool)
 844                .await?
 845        };
 846        if result.rows_affected() == 1 {
 847            Ok(())
 848        } else {
 849            Err(anyhow!("no such contact request"))?
 850        }
 851    }
 852
 853    // access tokens
 854
 855    async fn create_access_token_hash(
 856        &self,
 857        user_id: UserId,
 858        access_token_hash: &str,
 859        max_access_token_count: usize,
 860    ) -> Result<()> {
 861        let insert_query = "
 862            INSERT INTO access_tokens (user_id, hash)
 863            VALUES ($1, $2);
 864        ";
 865        let cleanup_query = "
 866            DELETE FROM access_tokens
 867            WHERE id IN (
 868                SELECT id from access_tokens
 869                WHERE user_id = $1
 870                ORDER BY id DESC
 871                OFFSET $3
 872            )
 873        ";
 874
 875        let mut tx = self.pool.begin().await?;
 876        sqlx::query(insert_query)
 877            .bind(user_id.0)
 878            .bind(access_token_hash)
 879            .execute(&mut tx)
 880            .await?;
 881        sqlx::query(cleanup_query)
 882            .bind(user_id.0)
 883            .bind(access_token_hash)
 884            .bind(max_access_token_count as i32)
 885            .execute(&mut tx)
 886            .await?;
 887        Ok(tx.commit().await?)
 888    }
 889
 890    async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
 891        let query = "
 892            SELECT hash
 893            FROM access_tokens
 894            WHERE user_id = $1
 895            ORDER BY id DESC
 896        ";
 897        Ok(sqlx::query_scalar(query)
 898            .bind(user_id.0)
 899            .fetch_all(&self.pool)
 900            .await?)
 901    }
 902
 903    // orgs
 904
 905    #[allow(unused)] // Help rust-analyzer
 906    #[cfg(any(test, feature = "seed-support"))]
 907    async fn find_org_by_slug(&self, slug: &str) -> Result<Option<Org>> {
 908        let query = "
 909            SELECT *
 910            FROM orgs
 911            WHERE slug = $1
 912        ";
 913        Ok(sqlx::query_as(query)
 914            .bind(slug)
 915            .fetch_optional(&self.pool)
 916            .await?)
 917    }
 918
 919    #[cfg(any(test, feature = "seed-support"))]
 920    async fn create_org(&self, name: &str, slug: &str) -> Result<OrgId> {
 921        let query = "
 922            INSERT INTO orgs (name, slug)
 923            VALUES ($1, $2)
 924            RETURNING id
 925        ";
 926        Ok(sqlx::query_scalar(query)
 927            .bind(name)
 928            .bind(slug)
 929            .fetch_one(&self.pool)
 930            .await
 931            .map(OrgId)?)
 932    }
 933
 934    #[cfg(any(test, feature = "seed-support"))]
 935    async fn add_org_member(&self, org_id: OrgId, user_id: UserId, is_admin: bool) -> Result<()> {
 936        let query = "
 937            INSERT INTO org_memberships (org_id, user_id, admin)
 938            VALUES ($1, $2, $3)
 939            ON CONFLICT DO NOTHING
 940        ";
 941        Ok(sqlx::query(query)
 942            .bind(org_id.0)
 943            .bind(user_id.0)
 944            .bind(is_admin)
 945            .execute(&self.pool)
 946            .await
 947            .map(drop)?)
 948    }
 949
 950    // channels
 951
 952    #[cfg(any(test, feature = "seed-support"))]
 953    async fn create_org_channel(&self, org_id: OrgId, name: &str) -> Result<ChannelId> {
 954        let query = "
 955            INSERT INTO channels (owner_id, owner_is_user, name)
 956            VALUES ($1, false, $2)
 957            RETURNING id
 958        ";
 959        Ok(sqlx::query_scalar(query)
 960            .bind(org_id.0)
 961            .bind(name)
 962            .fetch_one(&self.pool)
 963            .await
 964            .map(ChannelId)?)
 965    }
 966
 967    #[allow(unused)] // Help rust-analyzer
 968    #[cfg(any(test, feature = "seed-support"))]
 969    async fn get_org_channels(&self, org_id: OrgId) -> Result<Vec<Channel>> {
 970        let query = "
 971            SELECT *
 972            FROM channels
 973            WHERE
 974                channels.owner_is_user = false AND
 975                channels.owner_id = $1
 976        ";
 977        Ok(sqlx::query_as(query)
 978            .bind(org_id.0)
 979            .fetch_all(&self.pool)
 980            .await?)
 981    }
 982
 983    async fn get_accessible_channels(&self, user_id: UserId) -> Result<Vec<Channel>> {
 984        let query = "
 985            SELECT
 986                channels.*
 987            FROM
 988                channel_memberships, channels
 989            WHERE
 990                channel_memberships.user_id = $1 AND
 991                channel_memberships.channel_id = channels.id
 992        ";
 993        Ok(sqlx::query_as(query)
 994            .bind(user_id.0)
 995            .fetch_all(&self.pool)
 996            .await?)
 997    }
 998
 999    async fn can_user_access_channel(
1000        &self,
1001        user_id: UserId,
1002        channel_id: ChannelId,
1003    ) -> Result<bool> {
1004        let query = "
1005            SELECT id
1006            FROM channel_memberships
1007            WHERE user_id = $1 AND channel_id = $2
1008            LIMIT 1
1009        ";
1010        Ok(sqlx::query_scalar::<_, i32>(query)
1011            .bind(user_id.0)
1012            .bind(channel_id.0)
1013            .fetch_optional(&self.pool)
1014            .await
1015            .map(|e| e.is_some())?)
1016    }
1017
1018    #[cfg(any(test, feature = "seed-support"))]
1019    async fn add_channel_member(
1020        &self,
1021        channel_id: ChannelId,
1022        user_id: UserId,
1023        is_admin: bool,
1024    ) -> Result<()> {
1025        let query = "
1026            INSERT INTO channel_memberships (channel_id, user_id, admin)
1027            VALUES ($1, $2, $3)
1028            ON CONFLICT DO NOTHING
1029        ";
1030        Ok(sqlx::query(query)
1031            .bind(channel_id.0)
1032            .bind(user_id.0)
1033            .bind(is_admin)
1034            .execute(&self.pool)
1035            .await
1036            .map(drop)?)
1037    }
1038
1039    // messages
1040
1041    async fn create_channel_message(
1042        &self,
1043        channel_id: ChannelId,
1044        sender_id: UserId,
1045        body: &str,
1046        timestamp: OffsetDateTime,
1047        nonce: u128,
1048    ) -> Result<MessageId> {
1049        let query = "
1050            INSERT INTO channel_messages (channel_id, sender_id, body, sent_at, nonce)
1051            VALUES ($1, $2, $3, $4, $5)
1052            ON CONFLICT (nonce) DO UPDATE SET nonce = excluded.nonce
1053            RETURNING id
1054        ";
1055        Ok(sqlx::query_scalar(query)
1056            .bind(channel_id.0)
1057            .bind(sender_id.0)
1058            .bind(body)
1059            .bind(timestamp)
1060            .bind(Uuid::from_u128(nonce))
1061            .fetch_one(&self.pool)
1062            .await
1063            .map(MessageId)?)
1064    }
1065
1066    async fn get_channel_messages(
1067        &self,
1068        channel_id: ChannelId,
1069        count: usize,
1070        before_id: Option<MessageId>,
1071    ) -> Result<Vec<ChannelMessage>> {
1072        let query = r#"
1073            SELECT * FROM (
1074                SELECT
1075                    id, channel_id, sender_id, body, sent_at AT TIME ZONE 'UTC' as sent_at, nonce
1076                FROM
1077                    channel_messages
1078                WHERE
1079                    channel_id = $1 AND
1080                    id < $2
1081                ORDER BY id DESC
1082                LIMIT $3
1083            ) as recent_messages
1084            ORDER BY id ASC
1085        "#;
1086        Ok(sqlx::query_as(query)
1087            .bind(channel_id.0)
1088            .bind(before_id.unwrap_or(MessageId::MAX))
1089            .bind(count as i64)
1090            .fetch_all(&self.pool)
1091            .await?)
1092    }
1093
    /// Test-only cleanup: terminates the other backends connected to the
    /// current database, closes the pool, then drops the database at `url`.
    ///
    /// Nothing is returned; the results of the terminate query and of the drop
    /// are handed to `log_err` (from `util::ResultExt`) instead.
    #[cfg(test)]
    async fn teardown(&self, url: &str) {
        use util::ResultExt;

        // `pid <> pg_backend_pid()` excludes the backend running this query.
        let query = "
            SELECT pg_terminate_backend(pg_stat_activity.pid)
            FROM pg_stat_activity
            WHERE pg_stat_activity.datname = current_database() AND pid <> pg_backend_pid();
        ";
        sqlx::query(query).execute(&self.pool).await.log_err();
        // Close our own connections before attempting the drop.
        self.pool.close().await;
        <sqlx::Postgres as sqlx::migrate::MigrateDatabase>::drop_database(url)
            .await
            .log_err();
    }
1109
    /// Test-only downcast hook to `tests::FakeDb`; this implementation always
    /// returns `None`.
    #[cfg(test)]
    fn as_fake(&self) -> Option<&tests::FakeDb> {
        None
    }
1114}
1115
// Declares a public newtype `$name(pub i32)` for a database id.
//
// The generated type is transparent for both sqlx and serde (it is encoded
// exactly like the inner `i32`), gets a `MAX` constant, conversions to and
// from `u64`, and a `Display` impl that forwards to the inner integer.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type, Serialize,
        )]
        #[sqlx(transparent)]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable id (`i32::MAX`).
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds an id from a `u64`.
            ///
            /// The value is converted with an `as` cast, so anything that does
            /// not fit in an `i32` is truncated rather than rejected.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts the id to a `u64` with an `as` cast (a negative inner
            /// value is sign-extended, not rejected).
            #[allow(unused)]
            pub fn to_proto(&self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                // Delegate to the inner integer's formatting.
                self.0.fmt(f)
            }
        }
    };
}
1147
id_type!(UserId);
/// A user record.
///
/// Derives `FromRow`, so it can be built from a SQL row, and `Default`, which
/// the tests use to fill in the fields they do not care about.
#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
pub struct User {
    pub id: UserId,
    pub github_login: String,
    // Optional: users created via `create_user(.., None, ..)` have no address.
    pub email_address: Option<String>,
    pub admin: bool,
    // `None` until a code exists for this user.
    // NOTE(review): when codes get assigned is not visible here — confirm.
    pub invite_code: Option<String>,
    pub invite_count: i32,
    pub connected_once: bool,
}
1159
id_type!(ProjectId);
/// A project record, loadable from a SQL row via `FromRow`.
#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
pub struct Project {
    pub id: ProjectId,
    // Id of the user hosting the project.
    pub host_user_id: UserId,
    // NOTE(review): presumably set once the project is no longer registered —
    // confirm against the code that writes this column.
    pub unregistered: bool,
}
1167
/// Per-user activity summary, as returned by `summarize_project_activity` in
/// the tests.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct UserActivitySummary {
    pub id: UserId,
    pub github_login: String,
    // One `(project, total duration)` pair per project the user was active in.
    pub project_activity: Vec<(ProjectId, Duration)>,
}
1174
id_type!(OrgId);
/// A row of the `orgs` table (see `find_org_by_slug`, which selects `*` from it).
#[derive(FromRow)]
pub struct Org {
    pub id: OrgId,
    pub name: String,
    // The value `find_org_by_slug` matches on.
    pub slug: String,
}
1182
id_type!(ChannelId);
/// A row of the `channels` table.
#[derive(Clone, Debug, FromRow, Serialize)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    // Raw owner id; `create_org_channel` stores an org id here together with
    // `owner_is_user = false`.
    pub owner_id: i32,
    // Discriminates what `owner_id` refers to (`false` for org-owned channels).
    pub owner_is_user: bool,
}
1191
id_type!(MessageId);
/// A row of the `channel_messages` table, as selected by `get_channel_messages`.
#[derive(Clone, Debug, FromRow)]
pub struct ChannelMessage {
    pub id: MessageId,
    pub channel_id: ChannelId,
    pub sender_id: UserId,
    pub body: String,
    // `get_channel_messages` selects this column `AT TIME ZONE 'UTC'`.
    pub sent_at: OffsetDateTime,
    // The insert statement treats a repeated nonce as a conflict, so re-sending
    // a message with the same nonce does not create a second row.
    pub nonce: Uuid,
}
1202
/// One entry of a user's contact list, as returned by `get_contacts`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Contact {
    /// An established contact. The tests show each user also listed as an
    /// accepted contact of themselves.
    Accepted {
        user_id: UserId,
        should_notify: bool,
    },
    /// A request this user sent to `user_id` that is still pending.
    Outgoing {
        user_id: UserId,
    },
    /// A pending request that `user_id` sent to this user.
    Incoming {
        user_id: UserId,
        should_notify: bool,
    },
}
1217
1218impl Contact {
1219    pub fn user_id(&self) -> UserId {
1220        match self {
1221            Contact::Accepted { user_id, .. } => *user_id,
1222            Contact::Outgoing { user_id } => *user_id,
1223            Contact::Incoming { user_id, .. } => *user_id,
1224        }
1225    }
1226}
1227
/// A contact request received from another user.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct IncomingContactRequest {
    // The user who sent the request.
    pub requester_id: UserId,
    pub should_notify: bool,
}
1233
/// Builds a SQL `LIKE` pattern that matches the alphanumeric characters of
/// `string` in order, with anything in between.
///
/// Every alphanumeric character is prefixed with `%`, all other characters are
/// dropped, and a final `%` is appended; e.g. `"x y"` becomes `"%x%y%"`.
fn fuzzy_like_string(string: &str) -> String {
    // Worst case: one `%` per input byte plus the trailing `%`.
    let mut pattern = String::with_capacity(string.len() * 2 + 1);
    string
        .chars()
        .filter(|ch| ch.is_alphanumeric())
        .for_each(|ch| pattern.extend(['%', ch]));
    pattern.push('%');
    pattern
}
1245
1246#[cfg(test)]
1247pub mod tests {
1248    use super::*;
1249    use anyhow::anyhow;
1250    use collections::BTreeMap;
1251    use gpui::executor::Background;
1252    use lazy_static::lazy_static;
1253    use parking_lot::Mutex;
1254    use rand::prelude::*;
1255    use sqlx::{
1256        migrate::{MigrateDatabase, Migrator},
1257        Postgres,
1258    };
1259    use std::{path::Path, sync::Arc};
1260    use util::post_inc;
1261
1262    #[tokio::test(flavor = "multi_thread")]
1263    async fn test_get_users_by_ids() {
1264        for test_db in [
1265            TestDb::postgres().await,
1266            TestDb::fake(Arc::new(gpui::executor::Background::new())),
1267        ] {
1268            let db = test_db.db();
1269
1270            let user = db.create_user("user", None, false).await.unwrap();
1271            let friend1 = db.create_user("friend-1", None, false).await.unwrap();
1272            let friend2 = db.create_user("friend-2", None, false).await.unwrap();
1273            let friend3 = db.create_user("friend-3", None, false).await.unwrap();
1274
1275            assert_eq!(
1276                db.get_users_by_ids(vec![user, friend1, friend2, friend3])
1277                    .await
1278                    .unwrap(),
1279                vec![
1280                    User {
1281                        id: user,
1282                        github_login: "user".to_string(),
1283                        admin: false,
1284                        ..Default::default()
1285                    },
1286                    User {
1287                        id: friend1,
1288                        github_login: "friend-1".to_string(),
1289                        admin: false,
1290                        ..Default::default()
1291                    },
1292                    User {
1293                        id: friend2,
1294                        github_login: "friend-2".to_string(),
1295                        admin: false,
1296                        ..Default::default()
1297                    },
1298                    User {
1299                        id: friend3,
1300                        github_login: "friend-3".to_string(),
1301                        admin: false,
1302                        ..Default::default()
1303                    }
1304                ]
1305            );
1306        }
1307    }
1308
    /// Exercises `create_users` in two batches against Postgres: new users get
    /// the requested invite counts and distinct invite codes, and re-submitting
    /// an existing user updates the count while keeping the id and the code.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_create_users() {
        let db = TestDb::postgres().await;
        let db = db.db();

        // Create the first batch of users, ensuring invite counts are assigned
        // correctly and the respective invite codes are unique.
        let user_ids_batch_1 = db
            .create_users(vec![
                ("user1".to_string(), "hi@user1.com".to_string(), 5),
                ("user2".to_string(), "hi@user2.com".to_string(), 4),
                ("user3".to_string(), "hi@user3.com".to_string(), 3),
            ])
            .await
            .unwrap();
        assert_eq!(user_ids_batch_1.len(), 3);

        let users = db.get_users_by_ids(user_ids_batch_1.clone()).await.unwrap();
        assert_eq!(users.len(), 3);
        assert_eq!(users[0].github_login, "user1");
        assert_eq!(users[0].email_address.as_deref(), Some("hi@user1.com"));
        assert_eq!(users[0].invite_count, 5);
        assert_eq!(users[1].github_login, "user2");
        assert_eq!(users[1].email_address.as_deref(), Some("hi@user2.com"));
        assert_eq!(users[1].invite_count, 4);
        assert_eq!(users[2].github_login, "user3");
        assert_eq!(users[2].email_address.as_deref(), Some("hi@user3.com"));
        assert_eq!(users[2].invite_count, 3);

        // Every user in the batch must have received its own invite code.
        let invite_code_1 = users[0].invite_code.clone().unwrap();
        let invite_code_2 = users[1].invite_code.clone().unwrap();
        let invite_code_3 = users[2].invite_code.clone().unwrap();
        assert_ne!(invite_code_1, invite_code_2);
        assert_ne!(invite_code_1, invite_code_3);
        assert_ne!(invite_code_2, invite_code_3);

        // Create the second batch of users and include a user that is already in the database, ensuring
        // the invite count for the existing user is updated without changing their invite code.
        let user_ids_batch_2 = db
            .create_users(vec![
                ("user2".to_string(), "hi@user2.com".to_string(), 10),
                ("user4".to_string(), "hi@user4.com".to_string(), 2),
            ])
            .await
            .unwrap();
        assert_eq!(user_ids_batch_2.len(), 2);
        // "user2" keeps the id it was given in the first batch.
        assert_eq!(user_ids_batch_2[0], user_ids_batch_1[1]);

        let users = db.get_users_by_ids(user_ids_batch_2).await.unwrap();
        assert_eq!(users.len(), 2);
        assert_eq!(users[0].github_login, "user2");
        assert_eq!(users[0].email_address.as_deref(), Some("hi@user2.com"));
        assert_eq!(users[0].invite_count, 10);
        assert_eq!(users[0].invite_code, Some(invite_code_2.clone()));
        assert_eq!(users[1].github_login, "user4");
        assert_eq!(users[1].email_address.as_deref(), Some("hi@user4.com"));
        assert_eq!(users[1].invite_count, 2);

        // The new user's code must differ from all codes issued earlier.
        let invite_code_4 = users[1].invite_code.clone().unwrap();
        assert_ne!(invite_code_4, invite_code_1);
        assert_ne!(invite_code_4, invite_code_2);
        assert_ne!(invite_code_4, invite_code_3);
    }
1372
    /// Updates the extension counts of two worktrees (ids 100 and 101) several
    /// times and checks that `get_project_extensions` reports, per worktree,
    /// the counts from the most recent update.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_worktree_extensions() {
        let test_db = TestDb::postgres().await;
        let db = test_db.db();

        let user = db.create_user("user_1", None, false).await.unwrap();
        let project = db.register_project(user).await.unwrap();

        // Start with an empty update for worktree 100.
        db.update_worktree_extensions(project, 100, Default::default())
            .await
            .unwrap();
        db.update_worktree_extensions(
            project,
            100,
            [("rs".to_string(), 5), ("md".to_string(), 3)]
                .into_iter()
                .collect(),
        )
        .await
        .unwrap();
        // Same worktree again with higher counts; these are the values expected below.
        db.update_worktree_extensions(
            project,
            100,
            [("rs".to_string(), 6), ("md".to_string(), 5)]
                .into_iter()
                .collect(),
        )
        .await
        .unwrap();
        // A second worktree with its own set of extensions.
        db.update_worktree_extensions(
            project,
            101,
            [("ts".to_string(), 2), ("md".to_string(), 1)]
                .into_iter()
                .collect(),
        )
        .await
        .unwrap();

        assert_eq!(
            db.get_project_extensions(project).await.unwrap(),
            [
                (
                    100,
                    [("rs".into(), 6), ("md".into(), 5),]
                        .into_iter()
                        .collect::<HashMap<_, _>>()
                ),
                (
                    101,
                    [("ts".into(), 2), ("md".into(), 1),]
                        .into_iter()
                        .collect::<HashMap<_, _>>()
                )
            ]
            .into_iter()
            .collect()
        );
    }
1432
    /// Records project activity over six consecutive periods (t0..t6) and
    /// checks the per-user, per-project totals produced by
    /// `summarize_project_activity`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_project_activity() {
        let test_db = TestDb::postgres().await;
        let db = test_db.db();

        let user_1 = db.create_user("user_1", None, false).await.unwrap();
        let user_2 = db.create_user("user_2", None, false).await.unwrap();
        let user_3 = db.create_user("user_3", None, false).await.unwrap();
        let project_1 = db.register_project(user_1).await.unwrap();
        let project_2 = db.register_project(user_2).await.unwrap();
        // The timeline starts one hour in the past.
        let t0 = OffsetDateTime::now_utc() - Duration::from_secs(60 * 60);

        // User 2 opens a project
        let t1 = t0 + Duration::from_secs(10);
        db.record_project_activity(t0..t1, &[(user_2, project_2)])
            .await
            .unwrap();

        let t2 = t1 + Duration::from_secs(10);
        db.record_project_activity(t1..t2, &[(user_2, project_2)])
            .await
            .unwrap();

        // User 1 joins the project
        let t3 = t2 + Duration::from_secs(10);
        db.record_project_activity(t2..t3, &[(user_2, project_2), (user_1, project_2)])
            .await
            .unwrap();

        // User 1 opens another project
        let t4 = t3 + Duration::from_secs(10);
        db.record_project_activity(
            t3..t4,
            &[
                (user_2, project_2),
                (user_1, project_2),
                (user_1, project_1),
            ],
        )
        .await
        .unwrap();

        // User 3 joins that project
        let t5 = t4 + Duration::from_secs(10);
        db.record_project_activity(
            t4..t5,
            &[
                (user_2, project_2),
                (user_1, project_2),
                (user_1, project_1),
                (user_3, project_1),
            ],
        )
        .await
        .unwrap();

        // User 2 leaves
        let t6 = t5 + Duration::from_secs(5);
        db.record_project_activity(t5..t6, &[(user_1, project_1), (user_3, project_1)])
            .await
            .unwrap();

        // Expected totals are the sums of the periods each pair appears in:
        //   user_1 / project_2: t2..t5 = 30s    user_1 / project_1: t3..t6 = 25s
        //   user_2 / project_2: t0..t5 = 50s    user_3 / project_1: t4..t6 = 15s
        let summary = db.summarize_project_activity(t0..t6, 10).await.unwrap();
        assert_eq!(
            summary,
            &[
                UserActivitySummary {
                    id: user_1,
                    github_login: "user_1".to_string(),
                    project_activity: vec![
                        (project_2, Duration::from_secs(30)),
                        (project_1, Duration::from_secs(25))
                    ]
                },
                UserActivitySummary {
                    id: user_2,
                    github_login: "user_2".to_string(),
                    project_activity: vec![(project_2, Duration::from_secs(50))]
                },
                UserActivitySummary {
                    id: user_3,
                    github_login: "user_3".to_string(),
                    project_activity: vec![(project_1, Duration::from_secs(15))]
                },
            ]
        );
    }
1520
1521    #[tokio::test(flavor = "multi_thread")]
1522    async fn test_recent_channel_messages() {
1523        for test_db in [
1524            TestDb::postgres().await,
1525            TestDb::fake(Arc::new(gpui::executor::Background::new())),
1526        ] {
1527            let db = test_db.db();
1528            let user = db.create_user("user", None, false).await.unwrap();
1529            let org = db.create_org("org", "org").await.unwrap();
1530            let channel = db.create_org_channel(org, "channel").await.unwrap();
1531            for i in 0..10 {
1532                db.create_channel_message(
1533                    channel,
1534                    user,
1535                    &i.to_string(),
1536                    OffsetDateTime::now_utc(),
1537                    i,
1538                )
1539                .await
1540                .unwrap();
1541            }
1542
1543            let messages = db.get_channel_messages(channel, 5, None).await.unwrap();
1544            assert_eq!(
1545                messages.iter().map(|m| &m.body).collect::<Vec<_>>(),
1546                ["5", "6", "7", "8", "9"]
1547            );
1548
1549            let prev_messages = db
1550                .get_channel_messages(channel, 4, Some(messages[0].id))
1551                .await
1552                .unwrap();
1553            assert_eq!(
1554                prev_messages.iter().map(|m| &m.body).collect::<Vec<_>>(),
1555                ["1", "2", "3", "4"]
1556            );
1557        }
1558    }
1559
1560    #[tokio::test(flavor = "multi_thread")]
1561    async fn test_channel_message_nonces() {
1562        for test_db in [
1563            TestDb::postgres().await,
1564            TestDb::fake(Arc::new(gpui::executor::Background::new())),
1565        ] {
1566            let db = test_db.db();
1567            let user = db.create_user("user", None, false).await.unwrap();
1568            let org = db.create_org("org", "org").await.unwrap();
1569            let channel = db.create_org_channel(org, "channel").await.unwrap();
1570
1571            let msg1_id = db
1572                .create_channel_message(channel, user, "1", OffsetDateTime::now_utc(), 1)
1573                .await
1574                .unwrap();
1575            let msg2_id = db
1576                .create_channel_message(channel, user, "2", OffsetDateTime::now_utc(), 2)
1577                .await
1578                .unwrap();
1579            let msg3_id = db
1580                .create_channel_message(channel, user, "3", OffsetDateTime::now_utc(), 1)
1581                .await
1582                .unwrap();
1583            let msg4_id = db
1584                .create_channel_message(channel, user, "4", OffsetDateTime::now_utc(), 2)
1585                .await
1586                .unwrap();
1587
1588            assert_ne!(msg1_id, msg2_id);
1589            assert_eq!(msg1_id, msg3_id);
1590            assert_eq!(msg2_id, msg4_id);
1591        }
1592    }
1593
1594    #[tokio::test(flavor = "multi_thread")]
1595    async fn test_create_access_tokens() {
1596        let test_db = TestDb::postgres().await;
1597        let db = test_db.db();
1598        let user = db.create_user("the-user", None, false).await.unwrap();
1599
1600        db.create_access_token_hash(user, "h1", 3).await.unwrap();
1601        db.create_access_token_hash(user, "h2", 3).await.unwrap();
1602        assert_eq!(
1603            db.get_access_token_hashes(user).await.unwrap(),
1604            &["h2".to_string(), "h1".to_string()]
1605        );
1606
1607        db.create_access_token_hash(user, "h3", 3).await.unwrap();
1608        assert_eq!(
1609            db.get_access_token_hashes(user).await.unwrap(),
1610            &["h3".to_string(), "h2".to_string(), "h1".to_string(),]
1611        );
1612
1613        db.create_access_token_hash(user, "h4", 3).await.unwrap();
1614        assert_eq!(
1615            db.get_access_token_hashes(user).await.unwrap(),
1616            &["h4".to_string(), "h3".to_string(), "h2".to_string(),]
1617        );
1618
1619        db.create_access_token_hash(user, "h5", 3).await.unwrap();
1620        assert_eq!(
1621            db.get_access_token_hashes(user).await.unwrap(),
1622            &["h5".to_string(), "h4".to_string(), "h3".to_string()]
1623        );
1624    }
1625
1626    #[test]
1627    fn test_fuzzy_like_string() {
1628        assert_eq!(fuzzy_like_string("abcd"), "%a%b%c%d%");
1629        assert_eq!(fuzzy_like_string("x y"), "%x%y%");
1630        assert_eq!(fuzzy_like_string(" z  "), "%z%");
1631    }
1632
1633    #[tokio::test(flavor = "multi_thread")]
1634    async fn test_fuzzy_search_users() {
1635        let test_db = TestDb::postgres().await;
1636        let db = test_db.db();
1637        for github_login in [
1638            "California",
1639            "colorado",
1640            "oregon",
1641            "washington",
1642            "florida",
1643            "delaware",
1644            "rhode-island",
1645        ] {
1646            db.create_user(github_login, None, false).await.unwrap();
1647        }
1648
1649        assert_eq!(
1650            fuzzy_search_user_names(db, "clr").await,
1651            &["colorado", "California"]
1652        );
1653        assert_eq!(
1654            fuzzy_search_user_names(db, "ro").await,
1655            &["rhode-island", "colorado", "oregon"],
1656        );
1657
1658        async fn fuzzy_search_user_names(db: &Arc<dyn Db>, query: &str) -> Vec<String> {
1659            db.fuzzy_search_users(query, 10)
1660                .await
1661                .unwrap()
1662                .into_iter()
1663                .map(|user| user.github_login)
1664                .collect::<Vec<_>>()
1665        }
1666    }
1667
1668    #[tokio::test(flavor = "multi_thread")]
1669    async fn test_add_contacts() {
1670        for test_db in [
1671            TestDb::postgres().await,
1672            TestDb::fake(Arc::new(gpui::executor::Background::new())),
1673        ] {
1674            let db = test_db.db();
1675
1676            let user_1 = db.create_user("user1", None, false).await.unwrap();
1677            let user_2 = db.create_user("user2", None, false).await.unwrap();
1678            let user_3 = db.create_user("user3", None, false).await.unwrap();
1679
1680            // User starts with no contacts
1681            assert_eq!(
1682                db.get_contacts(user_1).await.unwrap(),
1683                vec![Contact::Accepted {
1684                    user_id: user_1,
1685                    should_notify: false
1686                }],
1687            );
1688
1689            // User requests a contact. Both users see the pending request.
1690            db.send_contact_request(user_1, user_2).await.unwrap();
1691            assert!(!db.has_contact(user_1, user_2).await.unwrap());
1692            assert!(!db.has_contact(user_2, user_1).await.unwrap());
1693            assert_eq!(
1694                db.get_contacts(user_1).await.unwrap(),
1695                &[
1696                    Contact::Accepted {
1697                        user_id: user_1,
1698                        should_notify: false
1699                    },
1700                    Contact::Outgoing { user_id: user_2 }
1701                ],
1702            );
1703            assert_eq!(
1704                db.get_contacts(user_2).await.unwrap(),
1705                &[
1706                    Contact::Incoming {
1707                        user_id: user_1,
1708                        should_notify: true
1709                    },
1710                    Contact::Accepted {
1711                        user_id: user_2,
1712                        should_notify: false
1713                    },
1714                ]
1715            );
1716
1717            // User 2 dismisses the contact request notification without accepting or rejecting.
1718            // We shouldn't notify them again.
1719            db.dismiss_contact_notification(user_1, user_2)
1720                .await
1721                .unwrap_err();
1722            db.dismiss_contact_notification(user_2, user_1)
1723                .await
1724                .unwrap();
1725            assert_eq!(
1726                db.get_contacts(user_2).await.unwrap(),
1727                &[
1728                    Contact::Incoming {
1729                        user_id: user_1,
1730                        should_notify: false
1731                    },
1732                    Contact::Accepted {
1733                        user_id: user_2,
1734                        should_notify: false
1735                    },
1736                ]
1737            );
1738
1739            // User can't accept their own contact request
1740            db.respond_to_contact_request(user_1, user_2, true)
1741                .await
1742                .unwrap_err();
1743
1744            // User accepts a contact request. Both users see the contact.
1745            db.respond_to_contact_request(user_2, user_1, true)
1746                .await
1747                .unwrap();
1748            assert_eq!(
1749                db.get_contacts(user_1).await.unwrap(),
1750                &[
1751                    Contact::Accepted {
1752                        user_id: user_1,
1753                        should_notify: false
1754                    },
1755                    Contact::Accepted {
1756                        user_id: user_2,
1757                        should_notify: true
1758                    }
1759                ],
1760            );
1761            assert!(db.has_contact(user_1, user_2).await.unwrap());
1762            assert!(db.has_contact(user_2, user_1).await.unwrap());
1763            assert_eq!(
1764                db.get_contacts(user_2).await.unwrap(),
1765                &[
1766                    Contact::Accepted {
1767                        user_id: user_1,
1768                        should_notify: false,
1769                    },
1770                    Contact::Accepted {
1771                        user_id: user_2,
1772                        should_notify: false,
1773                    },
1774                ]
1775            );
1776
1777            // Users cannot re-request existing contacts.
1778            db.send_contact_request(user_1, user_2).await.unwrap_err();
1779            db.send_contact_request(user_2, user_1).await.unwrap_err();
1780
1781            // Users can't dismiss notifications of them accepting other users' requests.
1782            db.dismiss_contact_notification(user_2, user_1)
1783                .await
1784                .unwrap_err();
1785            assert_eq!(
1786                db.get_contacts(user_1).await.unwrap(),
1787                &[
1788                    Contact::Accepted {
1789                        user_id: user_1,
1790                        should_notify: false
1791                    },
1792                    Contact::Accepted {
1793                        user_id: user_2,
1794                        should_notify: true,
1795                    },
1796                ]
1797            );
1798
1799            // Users can dismiss notifications of other users accepting their requests.
1800            db.dismiss_contact_notification(user_1, user_2)
1801                .await
1802                .unwrap();
1803            assert_eq!(
1804                db.get_contacts(user_1).await.unwrap(),
1805                &[
1806                    Contact::Accepted {
1807                        user_id: user_1,
1808                        should_notify: false
1809                    },
1810                    Contact::Accepted {
1811                        user_id: user_2,
1812                        should_notify: false,
1813                    },
1814                ]
1815            );
1816
1817            // Users send each other concurrent contact requests and
1818            // see that they are immediately accepted.
1819            db.send_contact_request(user_1, user_3).await.unwrap();
1820            db.send_contact_request(user_3, user_1).await.unwrap();
1821            assert_eq!(
1822                db.get_contacts(user_1).await.unwrap(),
1823                &[
1824                    Contact::Accepted {
1825                        user_id: user_1,
1826                        should_notify: false
1827                    },
1828                    Contact::Accepted {
1829                        user_id: user_2,
1830                        should_notify: false,
1831                    },
1832                    Contact::Accepted {
1833                        user_id: user_3,
1834                        should_notify: false
1835                    },
1836                ]
1837            );
1838            assert_eq!(
1839                db.get_contacts(user_3).await.unwrap(),
1840                &[
1841                    Contact::Accepted {
1842                        user_id: user_1,
1843                        should_notify: false
1844                    },
1845                    Contact::Accepted {
1846                        user_id: user_3,
1847                        should_notify: false
1848                    }
1849                ],
1850            );
1851
1852            // User declines a contact request. Both users see that it is gone.
1853            db.send_contact_request(user_2, user_3).await.unwrap();
1854            db.respond_to_contact_request(user_3, user_2, false)
1855                .await
1856                .unwrap();
1857            assert!(!db.has_contact(user_2, user_3).await.unwrap());
1858            assert!(!db.has_contact(user_3, user_2).await.unwrap());
1859            assert_eq!(
1860                db.get_contacts(user_2).await.unwrap(),
1861                &[
1862                    Contact::Accepted {
1863                        user_id: user_1,
1864                        should_notify: false
1865                    },
1866                    Contact::Accepted {
1867                        user_id: user_2,
1868                        should_notify: false
1869                    }
1870                ]
1871            );
1872            assert_eq!(
1873                db.get_contacts(user_3).await.unwrap(),
1874                &[
1875                    Contact::Accepted {
1876                        user_id: user_1,
1877                        should_notify: false
1878                    },
1879                    Contact::Accepted {
1880                        user_id: user_3,
1881                        should_notify: false
1882                    }
1883                ],
1884            );
1885        }
1886    }
1887
1888    #[tokio::test(flavor = "multi_thread")]
1889    async fn test_invite_codes() {
1890        let postgres = TestDb::postgres().await;
1891        let db = postgres.db();
1892        let user1 = db.create_user("user-1", None, false).await.unwrap();
1893
1894        // Initially, user 1 has no invite code
1895        assert_eq!(db.get_invite_code_for_user(user1).await.unwrap(), None);
1896
1897        // Setting invite count to 0 when no code is assigned does not assign a new code
1898        db.set_invite_count(user1, 0).await.unwrap();
1899        assert!(db.get_invite_code_for_user(user1).await.unwrap().is_none());
1900
1901        // User 1 creates an invite code that can be used twice.
1902        db.set_invite_count(user1, 2).await.unwrap();
1903        let (invite_code, invite_count) =
1904            db.get_invite_code_for_user(user1).await.unwrap().unwrap();
1905        assert_eq!(invite_count, 2);
1906
1907        // User 2 redeems the invite code and becomes a contact of user 1.
1908        let user2 = db
1909            .redeem_invite_code(&invite_code, "user-2", None)
1910            .await
1911            .unwrap();
1912        let (_, invite_count) = db.get_invite_code_for_user(user1).await.unwrap().unwrap();
1913        assert_eq!(invite_count, 1);
1914        assert_eq!(
1915            db.get_contacts(user1).await.unwrap(),
1916            [
1917                Contact::Accepted {
1918                    user_id: user1,
1919                    should_notify: false
1920                },
1921                Contact::Accepted {
1922                    user_id: user2,
1923                    should_notify: true
1924                }
1925            ]
1926        );
1927        assert_eq!(
1928            db.get_contacts(user2).await.unwrap(),
1929            [
1930                Contact::Accepted {
1931                    user_id: user1,
1932                    should_notify: false
1933                },
1934                Contact::Accepted {
1935                    user_id: user2,
1936                    should_notify: false
1937                }
1938            ]
1939        );
1940
1941        // User 3 redeems the invite code and becomes a contact of user 1.
1942        let user3 = db
1943            .redeem_invite_code(&invite_code, "user-3", None)
1944            .await
1945            .unwrap();
1946        let (_, invite_count) = db.get_invite_code_for_user(user1).await.unwrap().unwrap();
1947        assert_eq!(invite_count, 0);
1948        assert_eq!(
1949            db.get_contacts(user1).await.unwrap(),
1950            [
1951                Contact::Accepted {
1952                    user_id: user1,
1953                    should_notify: false
1954                },
1955                Contact::Accepted {
1956                    user_id: user2,
1957                    should_notify: true
1958                },
1959                Contact::Accepted {
1960                    user_id: user3,
1961                    should_notify: true
1962                }
1963            ]
1964        );
1965        assert_eq!(
1966            db.get_contacts(user3).await.unwrap(),
1967            [
1968                Contact::Accepted {
1969                    user_id: user1,
1970                    should_notify: false
1971                },
1972                Contact::Accepted {
1973                    user_id: user3,
1974                    should_notify: false
1975                },
1976            ]
1977        );
1978
1979        // Trying to reedem the code for the third time results in an error.
1980        db.redeem_invite_code(&invite_code, "user-4", None)
1981            .await
1982            .unwrap_err();
1983
1984        // Invite count can be updated after the code has been created.
1985        db.set_invite_count(user1, 2).await.unwrap();
1986        let (latest_code, invite_count) =
1987            db.get_invite_code_for_user(user1).await.unwrap().unwrap();
1988        assert_eq!(latest_code, invite_code); // Invite code doesn't change when we increment above 0
1989        assert_eq!(invite_count, 2);
1990
1991        // User 4 can now redeem the invite code and becomes a contact of user 1.
1992        let user4 = db
1993            .redeem_invite_code(&invite_code, "user-4", None)
1994            .await
1995            .unwrap();
1996        let (_, invite_count) = db.get_invite_code_for_user(user1).await.unwrap().unwrap();
1997        assert_eq!(invite_count, 1);
1998        assert_eq!(
1999            db.get_contacts(user1).await.unwrap(),
2000            [
2001                Contact::Accepted {
2002                    user_id: user1,
2003                    should_notify: false
2004                },
2005                Contact::Accepted {
2006                    user_id: user2,
2007                    should_notify: true
2008                },
2009                Contact::Accepted {
2010                    user_id: user3,
2011                    should_notify: true
2012                },
2013                Contact::Accepted {
2014                    user_id: user4,
2015                    should_notify: true
2016                }
2017            ]
2018        );
2019        assert_eq!(
2020            db.get_contacts(user4).await.unwrap(),
2021            [
2022                Contact::Accepted {
2023                    user_id: user1,
2024                    should_notify: false
2025                },
2026                Contact::Accepted {
2027                    user_id: user4,
2028                    should_notify: false
2029                },
2030            ]
2031        );
2032
2033        // An existing user cannot redeem invite codes.
2034        db.redeem_invite_code(&invite_code, "user-2", None)
2035            .await
2036            .unwrap_err();
2037        let (_, invite_count) = db.get_invite_code_for_user(user1).await.unwrap().unwrap();
2038        assert_eq!(invite_count, 1);
2039    }
2040
2041    pub struct TestDb {
2042        pub db: Option<Arc<dyn Db>>,
2043        pub url: String,
2044    }
2045
2046    impl TestDb {
2047        pub async fn postgres() -> Self {
2048            lazy_static! {
2049                static ref LOCK: Mutex<()> = Mutex::new(());
2050            }
2051
2052            let _guard = LOCK.lock();
2053            let mut rng = StdRng::from_entropy();
2054            let name = format!("zed-test-{}", rng.gen::<u128>());
2055            let url = format!("postgres://postgres@localhost/{}", name);
2056            let migrations_path = Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/migrations"));
2057            Postgres::create_database(&url)
2058                .await
2059                .expect("failed to create test db");
2060            let db = PostgresDb::new(&url, 5).await.unwrap();
2061            let migrator = Migrator::new(migrations_path).await.unwrap();
2062            migrator.run(&db.pool).await.unwrap();
2063            Self {
2064                db: Some(Arc::new(db)),
2065                url,
2066            }
2067        }
2068
2069        pub fn fake(background: Arc<Background>) -> Self {
2070            Self {
2071                db: Some(Arc::new(FakeDb::new(background))),
2072                url: Default::default(),
2073            }
2074        }
2075
2076        pub fn db(&self) -> &Arc<dyn Db> {
2077            self.db.as_ref().unwrap()
2078        }
2079    }
2080
2081    impl Drop for TestDb {
2082        fn drop(&mut self) {
2083            if let Some(db) = self.db.take() {
2084                futures::executor::block_on(db.teardown(&self.url));
2085            }
2086        }
2087    }
2088
2089    pub struct FakeDb {
2090        background: Arc<Background>,
2091        pub users: Mutex<BTreeMap<UserId, User>>,
2092        pub projects: Mutex<BTreeMap<ProjectId, Project>>,
2093        pub worktree_extensions: Mutex<BTreeMap<(ProjectId, u64, String), usize>>,
2094        pub orgs: Mutex<BTreeMap<OrgId, Org>>,
2095        pub org_memberships: Mutex<BTreeMap<(OrgId, UserId), bool>>,
2096        pub channels: Mutex<BTreeMap<ChannelId, Channel>>,
2097        pub channel_memberships: Mutex<BTreeMap<(ChannelId, UserId), bool>>,
2098        pub channel_messages: Mutex<BTreeMap<MessageId, ChannelMessage>>,
2099        pub contacts: Mutex<Vec<FakeContact>>,
2100        next_channel_message_id: Mutex<i32>,
2101        next_user_id: Mutex<i32>,
2102        next_org_id: Mutex<i32>,
2103        next_channel_id: Mutex<i32>,
2104        next_project_id: Mutex<i32>,
2105    }
2106
2107    #[derive(Debug)]
2108    pub struct FakeContact {
2109        pub requester_id: UserId,
2110        pub responder_id: UserId,
2111        pub accepted: bool,
2112        pub should_notify: bool,
2113    }
2114
2115    impl FakeDb {
2116        pub fn new(background: Arc<Background>) -> Self {
2117            Self {
2118                background,
2119                users: Default::default(),
2120                next_user_id: Mutex::new(0),
2121                projects: Default::default(),
2122                worktree_extensions: Default::default(),
2123                next_project_id: Mutex::new(1),
2124                orgs: Default::default(),
2125                next_org_id: Mutex::new(1),
2126                org_memberships: Default::default(),
2127                channels: Default::default(),
2128                next_channel_id: Mutex::new(1),
2129                channel_memberships: Default::default(),
2130                channel_messages: Default::default(),
2131                next_channel_message_id: Mutex::new(1),
2132                contacts: Default::default(),
2133            }
2134        }
2135    }
2136
2137    #[async_trait]
2138    impl Db for FakeDb {
2139        async fn create_user(
2140            &self,
2141            github_login: &str,
2142            email_address: Option<&str>,
2143            admin: bool,
2144        ) -> Result<UserId> {
2145            self.background.simulate_random_delay().await;
2146
2147            let mut users = self.users.lock();
2148            if let Some(user) = users
2149                .values()
2150                .find(|user| user.github_login == github_login)
2151            {
2152                Ok(user.id)
2153            } else {
2154                let user_id = UserId(post_inc(&mut *self.next_user_id.lock()));
2155                users.insert(
2156                    user_id,
2157                    User {
2158                        id: user_id,
2159                        github_login: github_login.to_string(),
2160                        email_address: email_address.map(str::to_string),
2161                        admin,
2162                        invite_code: None,
2163                        invite_count: 0,
2164                        connected_once: false,
2165                    },
2166                );
2167                Ok(user_id)
2168            }
2169        }
2170
2171        async fn get_all_users(&self, _page: u32, _limit: u32) -> Result<Vec<User>> {
2172            unimplemented!()
2173        }
2174
2175        async fn create_users(&self, _users: Vec<(String, String, usize)>) -> Result<Vec<UserId>> {
2176            unimplemented!()
2177        }
2178
2179        async fn fuzzy_search_users(&self, _: &str, _: u32) -> Result<Vec<User>> {
2180            unimplemented!()
2181        }
2182
2183        async fn get_user_by_id(&self, id: UserId) -> Result<Option<User>> {
2184            self.background.simulate_random_delay().await;
2185            Ok(self.get_users_by_ids(vec![id]).await?.into_iter().next())
2186        }
2187
2188        async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<User>> {
2189            self.background.simulate_random_delay().await;
2190            let users = self.users.lock();
2191            Ok(ids.iter().filter_map(|id| users.get(id).cloned()).collect())
2192        }
2193
2194        async fn get_users_with_no_invites(&self, _: bool) -> Result<Vec<User>> {
2195            unimplemented!()
2196        }
2197
2198        async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
2199            self.background.simulate_random_delay().await;
2200            Ok(self
2201                .users
2202                .lock()
2203                .values()
2204                .find(|user| user.github_login == github_login)
2205                .cloned())
2206        }
2207
2208        async fn set_user_is_admin(&self, _id: UserId, _is_admin: bool) -> Result<()> {
2209            unimplemented!()
2210        }
2211
2212        async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
2213            self.background.simulate_random_delay().await;
2214            let mut users = self.users.lock();
2215            let mut user = users
2216                .get_mut(&id)
2217                .ok_or_else(|| anyhow!("user not found"))?;
2218            user.connected_once = connected_once;
2219            Ok(())
2220        }
2221
2222        async fn destroy_user(&self, _id: UserId) -> Result<()> {
2223            unimplemented!()
2224        }
2225
2226        // invite codes
2227
2228        async fn set_invite_count(&self, _id: UserId, _count: u32) -> Result<()> {
2229            unimplemented!()
2230        }
2231
2232        async fn get_invite_code_for_user(&self, _id: UserId) -> Result<Option<(String, u32)>> {
2233            self.background.simulate_random_delay().await;
2234            Ok(None)
2235        }
2236
2237        async fn get_user_for_invite_code(&self, _code: &str) -> Result<User> {
2238            unimplemented!()
2239        }
2240
2241        async fn redeem_invite_code(
2242            &self,
2243            _code: &str,
2244            _login: &str,
2245            _email_address: Option<&str>,
2246        ) -> Result<UserId> {
2247            unimplemented!()
2248        }
2249
2250        // projects
2251
2252        async fn register_project(&self, host_user_id: UserId) -> Result<ProjectId> {
2253            self.background.simulate_random_delay().await;
2254            if !self.users.lock().contains_key(&host_user_id) {
2255                Err(anyhow!("no such user"))?;
2256            }
2257
2258            let project_id = ProjectId(post_inc(&mut *self.next_project_id.lock()));
2259            self.projects.lock().insert(
2260                project_id,
2261                Project {
2262                    id: project_id,
2263                    host_user_id,
2264                    unregistered: false,
2265                },
2266            );
2267            Ok(project_id)
2268        }
2269
2270        async fn unregister_project(&self, project_id: ProjectId) -> Result<()> {
2271            self.background.simulate_random_delay().await;
2272            self.projects
2273                .lock()
2274                .get_mut(&project_id)
2275                .ok_or_else(|| anyhow!("no such project"))?
2276                .unregistered = true;
2277            Ok(())
2278        }
2279
2280        async fn update_worktree_extensions(
2281            &self,
2282            project_id: ProjectId,
2283            worktree_id: u64,
2284            extensions: HashMap<String, usize>,
2285        ) -> Result<()> {
2286            self.background.simulate_random_delay().await;
2287            if !self.projects.lock().contains_key(&project_id) {
2288                Err(anyhow!("no such project"))?;
2289            }
2290
2291            for (extension, count) in extensions {
2292                self.worktree_extensions
2293                    .lock()
2294                    .insert((project_id, worktree_id, extension), count);
2295            }
2296
2297            Ok(())
2298        }
2299
2300        async fn get_project_extensions(
2301            &self,
2302            _project_id: ProjectId,
2303        ) -> Result<HashMap<u64, HashMap<String, usize>>> {
2304            unimplemented!()
2305        }
2306
2307        async fn record_project_activity(
2308            &self,
2309            _period: Range<OffsetDateTime>,
2310            _active_projects: &[(UserId, ProjectId)],
2311        ) -> Result<()> {
2312            unimplemented!()
2313        }
2314
2315        async fn summarize_project_activity(
2316            &self,
2317            _period: Range<OffsetDateTime>,
2318            _limit: usize,
2319        ) -> Result<Vec<UserActivitySummary>> {
2320            unimplemented!()
2321        }
2322
2323        // contacts
2324
2325        async fn get_contacts(&self, id: UserId) -> Result<Vec<Contact>> {
2326            self.background.simulate_random_delay().await;
2327            let mut contacts = vec![Contact::Accepted {
2328                user_id: id,
2329                should_notify: false,
2330            }];
2331
2332            for contact in self.contacts.lock().iter() {
2333                if contact.requester_id == id {
2334                    if contact.accepted {
2335                        contacts.push(Contact::Accepted {
2336                            user_id: contact.responder_id,
2337                            should_notify: contact.should_notify,
2338                        });
2339                    } else {
2340                        contacts.push(Contact::Outgoing {
2341                            user_id: contact.responder_id,
2342                        });
2343                    }
2344                } else if contact.responder_id == id {
2345                    if contact.accepted {
2346                        contacts.push(Contact::Accepted {
2347                            user_id: contact.requester_id,
2348                            should_notify: false,
2349                        });
2350                    } else {
2351                        contacts.push(Contact::Incoming {
2352                            user_id: contact.requester_id,
2353                            should_notify: contact.should_notify,
2354                        });
2355                    }
2356                }
2357            }
2358
2359            contacts.sort_unstable_by_key(|contact| contact.user_id());
2360            Ok(contacts)
2361        }
2362
2363        async fn has_contact(&self, user_id_a: UserId, user_id_b: UserId) -> Result<bool> {
2364            self.background.simulate_random_delay().await;
2365            Ok(self.contacts.lock().iter().any(|contact| {
2366                contact.accepted
2367                    && ((contact.requester_id == user_id_a && contact.responder_id == user_id_b)
2368                        || (contact.requester_id == user_id_b && contact.responder_id == user_id_a))
2369            }))
2370        }
2371
2372        async fn send_contact_request(
2373            &self,
2374            requester_id: UserId,
2375            responder_id: UserId,
2376        ) -> Result<()> {
2377            self.background.simulate_random_delay().await;
2378            let mut contacts = self.contacts.lock();
2379            for contact in contacts.iter_mut() {
2380                if contact.requester_id == requester_id && contact.responder_id == responder_id {
2381                    if contact.accepted {
2382                        Err(anyhow!("contact already exists"))?;
2383                    } else {
2384                        Err(anyhow!("contact already requested"))?;
2385                    }
2386                }
2387                if contact.responder_id == requester_id && contact.requester_id == responder_id {
2388                    if contact.accepted {
2389                        Err(anyhow!("contact already exists"))?;
2390                    } else {
2391                        contact.accepted = true;
2392                        contact.should_notify = false;
2393                        return Ok(());
2394                    }
2395                }
2396            }
2397            contacts.push(FakeContact {
2398                requester_id,
2399                responder_id,
2400                accepted: false,
2401                should_notify: true,
2402            });
2403            Ok(())
2404        }
2405
2406        async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
2407            self.background.simulate_random_delay().await;
2408            self.contacts.lock().retain(|contact| {
2409                !(contact.requester_id == requester_id && contact.responder_id == responder_id)
2410            });
2411            Ok(())
2412        }
2413
2414        async fn dismiss_contact_notification(
2415            &self,
2416            user_id: UserId,
2417            contact_user_id: UserId,
2418        ) -> Result<()> {
2419            self.background.simulate_random_delay().await;
2420            let mut contacts = self.contacts.lock();
2421            for contact in contacts.iter_mut() {
2422                if contact.requester_id == contact_user_id
2423                    && contact.responder_id == user_id
2424                    && !contact.accepted
2425                {
2426                    contact.should_notify = false;
2427                    return Ok(());
2428                }
2429                if contact.requester_id == user_id
2430                    && contact.responder_id == contact_user_id
2431                    && contact.accepted
2432                {
2433                    contact.should_notify = false;
2434                    return Ok(());
2435                }
2436            }
2437            Err(anyhow!("no such notification"))?
2438        }
2439
2440        async fn respond_to_contact_request(
2441            &self,
2442            responder_id: UserId,
2443            requester_id: UserId,
2444            accept: bool,
2445        ) -> Result<()> {
2446            self.background.simulate_random_delay().await;
2447            let mut contacts = self.contacts.lock();
2448            for (ix, contact) in contacts.iter_mut().enumerate() {
2449                if contact.requester_id == requester_id && contact.responder_id == responder_id {
2450                    if contact.accepted {
2451                        Err(anyhow!("contact already confirmed"))?;
2452                    }
2453                    if accept {
2454                        contact.accepted = true;
2455                        contact.should_notify = true;
2456                    } else {
2457                        contacts.remove(ix);
2458                    }
2459                    return Ok(());
2460                }
2461            }
2462            Err(anyhow!("no such contact request"))?
2463        }
2464
2465        async fn create_access_token_hash(
2466            &self,
2467            _user_id: UserId,
2468            _access_token_hash: &str,
2469            _max_access_token_count: usize,
2470        ) -> Result<()> {
2471            unimplemented!()
2472        }
2473
2474        async fn get_access_token_hashes(&self, _user_id: UserId) -> Result<Vec<String>> {
2475            unimplemented!()
2476        }
2477
2478        async fn find_org_by_slug(&self, _slug: &str) -> Result<Option<Org>> {
2479            unimplemented!()
2480        }
2481
2482        async fn create_org(&self, name: &str, slug: &str) -> Result<OrgId> {
2483            self.background.simulate_random_delay().await;
2484            let mut orgs = self.orgs.lock();
2485            if orgs.values().any(|org| org.slug == slug) {
2486                Err(anyhow!("org already exists"))?
2487            } else {
2488                let org_id = OrgId(post_inc(&mut *self.next_org_id.lock()));
2489                orgs.insert(
2490                    org_id,
2491                    Org {
2492                        id: org_id,
2493                        name: name.to_string(),
2494                        slug: slug.to_string(),
2495                    },
2496                );
2497                Ok(org_id)
2498            }
2499        }
2500
2501        async fn add_org_member(
2502            &self,
2503            org_id: OrgId,
2504            user_id: UserId,
2505            is_admin: bool,
2506        ) -> Result<()> {
2507            self.background.simulate_random_delay().await;
2508            if !self.orgs.lock().contains_key(&org_id) {
2509                Err(anyhow!("org does not exist"))?;
2510            }
2511            if !self.users.lock().contains_key(&user_id) {
2512                Err(anyhow!("user does not exist"))?;
2513            }
2514
2515            self.org_memberships
2516                .lock()
2517                .entry((org_id, user_id))
2518                .or_insert(is_admin);
2519            Ok(())
2520        }
2521
2522        async fn create_org_channel(&self, org_id: OrgId, name: &str) -> Result<ChannelId> {
2523            self.background.simulate_random_delay().await;
2524            if !self.orgs.lock().contains_key(&org_id) {
2525                Err(anyhow!("org does not exist"))?;
2526            }
2527
2528            let mut channels = self.channels.lock();
2529            let channel_id = ChannelId(post_inc(&mut *self.next_channel_id.lock()));
2530            channels.insert(
2531                channel_id,
2532                Channel {
2533                    id: channel_id,
2534                    name: name.to_string(),
2535                    owner_id: org_id.0,
2536                    owner_is_user: false,
2537                },
2538            );
2539            Ok(channel_id)
2540        }
2541
2542        async fn get_org_channels(&self, org_id: OrgId) -> Result<Vec<Channel>> {
2543            self.background.simulate_random_delay().await;
2544            Ok(self
2545                .channels
2546                .lock()
2547                .values()
2548                .filter(|channel| !channel.owner_is_user && channel.owner_id == org_id.0)
2549                .cloned()
2550                .collect())
2551        }
2552
2553        async fn get_accessible_channels(&self, user_id: UserId) -> Result<Vec<Channel>> {
2554            self.background.simulate_random_delay().await;
2555            let channels = self.channels.lock();
2556            let memberships = self.channel_memberships.lock();
2557            Ok(channels
2558                .values()
2559                .filter(|channel| memberships.contains_key(&(channel.id, user_id)))
2560                .cloned()
2561                .collect())
2562        }
2563
2564        async fn can_user_access_channel(
2565            &self,
2566            user_id: UserId,
2567            channel_id: ChannelId,
2568        ) -> Result<bool> {
2569            self.background.simulate_random_delay().await;
2570            Ok(self
2571                .channel_memberships
2572                .lock()
2573                .contains_key(&(channel_id, user_id)))
2574        }
2575
2576        async fn add_channel_member(
2577            &self,
2578            channel_id: ChannelId,
2579            user_id: UserId,
2580            is_admin: bool,
2581        ) -> Result<()> {
2582            self.background.simulate_random_delay().await;
2583            if !self.channels.lock().contains_key(&channel_id) {
2584                Err(anyhow!("channel does not exist"))?;
2585            }
2586            if !self.users.lock().contains_key(&user_id) {
2587                Err(anyhow!("user does not exist"))?;
2588            }
2589
2590            self.channel_memberships
2591                .lock()
2592                .entry((channel_id, user_id))
2593                .or_insert(is_admin);
2594            Ok(())
2595        }
2596
2597        async fn create_channel_message(
2598            &self,
2599            channel_id: ChannelId,
2600            sender_id: UserId,
2601            body: &str,
2602            timestamp: OffsetDateTime,
2603            nonce: u128,
2604        ) -> Result<MessageId> {
2605            self.background.simulate_random_delay().await;
2606            if !self.channels.lock().contains_key(&channel_id) {
2607                Err(anyhow!("channel does not exist"))?;
2608            }
2609            if !self.users.lock().contains_key(&sender_id) {
2610                Err(anyhow!("user does not exist"))?;
2611            }
2612
2613            let mut messages = self.channel_messages.lock();
2614            if let Some(message) = messages
2615                .values()
2616                .find(|message| message.nonce.as_u128() == nonce)
2617            {
2618                Ok(message.id)
2619            } else {
2620                let message_id = MessageId(post_inc(&mut *self.next_channel_message_id.lock()));
2621                messages.insert(
2622                    message_id,
2623                    ChannelMessage {
2624                        id: message_id,
2625                        channel_id,
2626                        sender_id,
2627                        body: body.to_string(),
2628                        sent_at: timestamp,
2629                        nonce: Uuid::from_u128(nonce),
2630                    },
2631                );
2632                Ok(message_id)
2633            }
2634        }
2635
2636        async fn get_channel_messages(
2637            &self,
2638            channel_id: ChannelId,
2639            count: usize,
2640            before_id: Option<MessageId>,
2641        ) -> Result<Vec<ChannelMessage>> {
2642            self.background.simulate_random_delay().await;
2643            let mut messages = self
2644                .channel_messages
2645                .lock()
2646                .values()
2647                .rev()
2648                .filter(|message| {
2649                    message.channel_id == channel_id
2650                        && message.id < before_id.unwrap_or(MessageId::MAX)
2651                })
2652                .take(count)
2653                .cloned()
2654                .collect::<Vec<_>>();
2655            messages.sort_unstable_by_key(|message| message.id);
2656            Ok(messages)
2657        }
2658
2659        async fn teardown(&self, _: &str) {}
2660
2661        #[cfg(test)]
2662        fn as_fake(&self) -> Option<&FakeDb> {
2663            Some(self)
2664        }
2665    }
2666}