1use crate::{Error, Result};
2use anyhow::{anyhow, Context};
3use async_trait::async_trait;
4use axum::http::StatusCode;
5use collections::HashMap;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8pub use sqlx::postgres::PgPoolOptions as DbOptions;
9use sqlx::{types::Uuid, FromRow, QueryBuilder};
10use std::{cmp, ops::Range, time::Duration};
11use time::{OffsetDateTime, PrimitiveDateTime};
12
/// Storage interface for the server's persistent data: users, signups and
/// invites, projects and their activity, contacts, access tokens, orgs and
/// channels.
///
/// Every method is async and implementors must be `Send + Sync`.
#[async_trait]
pub trait Db: Send + Sync {
    /// Creates a user with the given GitHub login, email address and admin
    /// flag, returning the user's id.
    async fn create_user(
        &self,
        github_login: &str,
        email_address: &str,
        admin: bool,
    ) -> Result<UserId>;
    /// Returns one page of users; `page` and `limit` select the window.
    async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>>;
    /// Returns at most `limit` users matching `query` approximately.
    async fn fuzzy_search_users(&self, query: &str, limit: u32) -> Result<Vec<User>>;
    /// Looks up a single user by id; `None` if there is no such user.
    async fn get_user_by_id(&self, id: UserId) -> Result<Option<User>>;
    /// Looks up all users whose id appears in `ids`.
    async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<User>>;
    /// Returns users that have no invites, filtered by whether they were
    /// themselves invited by another user.
    async fn get_users_with_no_invites(&self, invited_by_another_user: bool) -> Result<Vec<User>>;
    /// Looks up a single user by GitHub login; `None` if there is no such user.
    async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>>;
    /// Sets the admin flag of the given user.
    async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()>;
    /// Sets the `connected_once` flag of the given user.
    async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()>;
    /// Deletes the given user.
    async fn destroy_user(&self, id: UserId) -> Result<()>;

    /// Sets the number of invites the given user has.
    async fn set_invite_count_for_user(&self, id: UserId, count: u32) -> Result<()>;
    /// Returns the user's invite code together with their invite count, or
    /// `None` if there is nothing to return for this user.
    async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, u32)>>;
    /// Returns the user that owns the given invite code.
    async fn get_user_for_invite_code(&self, code: &str) -> Result<User>;
    /// Creates an invite for `email_address` on behalf of the owner of `code`.
    async fn create_invite_from_code(&self, code: &str, email_address: &str) -> Result<Invite>;

    /// Records a new signup.
    async fn create_signup(&self, signup: Signup) -> Result<()>;
    /// Returns a summary of the current waitlist.
    async fn get_waitlist_summary(&self) -> Result<WaitlistSummary>;
    /// Returns up to `count` invites that have not been sent yet.
    async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>>;
    /// Marks the given invites as sent.
    async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()>;
    /// Creates a user from an invite. Returns the new user's id and, if
    /// there was one, the id of the user who invited them.
    async fn create_user_from_invite(
        &self,
        invite: &Invite,
        user: NewUserParams,
    ) -> Result<(UserId, Option<UserId>)>;

    /// Registers a new project for the given user.
    async fn register_project(&self, host_user_id: UserId) -> Result<ProjectId>;

    /// Unregisters a project for the given project id.
    async fn unregister_project(&self, project_id: ProjectId) -> Result<()>;

    /// Update file counts by extension for the given project and worktree.
    async fn update_worktree_extensions(
        &self,
        project_id: ProjectId,
        worktree_id: u64,
        extensions: HashMap<String, u32>,
    ) -> Result<()>;

    /// Get the file counts on the given project keyed by their worktree and extension.
    async fn get_project_extensions(
        &self,
        project_id: ProjectId,
    ) -> Result<HashMap<u64, HashMap<String, usize>>>;

    /// Record which users have been active in which projects during
    /// a given period of time.
    async fn record_user_activity(
        &self,
        time_period: Range<OffsetDateTime>,
        active_projects: &[(UserId, ProjectId)],
    ) -> Result<()>;

    /// Get the number of users who have been active in the given
    /// time period for at least the given time duration.
    async fn get_active_user_count(
        &self,
        time_period: Range<OffsetDateTime>,
        min_duration: Duration,
        only_collaborative: bool,
    ) -> Result<usize>;

    /// Get the users that have been most active during the given time period,
    /// along with the amount of time they have been active in each project.
    async fn get_top_users_activity_summary(
        &self,
        time_period: Range<OffsetDateTime>,
        max_user_count: usize,
    ) -> Result<Vec<UserActivitySummary>>;

    /// Get the project activity for the given user and time period.
    async fn get_user_activity_timeline(
        &self,
        time_period: Range<OffsetDateTime>,
        user_id: UserId,
    ) -> Result<Vec<UserActivityPeriod>>;

    /// Returns the contacts of the given user.
    async fn get_contacts(&self, id: UserId) -> Result<Vec<Contact>>;
    /// Reports whether the two users are contacts of each other.
    async fn has_contact(&self, user_id_a: UserId, user_id_b: UserId) -> Result<bool>;
    /// Records a contact request from `requester_id` to `responder_id`.
    async fn send_contact_request(&self, requester_id: UserId, responder_id: UserId) -> Result<()>;
    /// Removes the contact relationship between the two users.
    async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()>;
    /// Clears the pending notification for the contact between the two users.
    async fn dismiss_contact_notification(
        &self,
        responder_id: UserId,
        requester_id: UserId,
    ) -> Result<()>;
    /// Accepts (`accept == true`) or declines the contact request that
    /// `requester_id` sent to `responder_id`.
    async fn respond_to_contact_request(
        &self,
        responder_id: UserId,
        requester_id: UserId,
        accept: bool,
    ) -> Result<()>;

    /// Stores a new access token hash for the user; `max_access_token_count`
    /// bounds how many hashes are kept per user.
    async fn create_access_token_hash(
        &self,
        user_id: UserId,
        access_token_hash: &str,
        max_access_token_count: usize,
    ) -> Result<()>;
    /// Returns the access token hashes stored for the user.
    async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>>;

    // The org helpers below only exist in test builds or when the
    // "seed-support" feature is enabled.
    #[cfg(any(test, feature = "seed-support"))]
    async fn find_org_by_slug(&self, slug: &str) -> Result<Option<Org>>;
    #[cfg(any(test, feature = "seed-support"))]
    async fn create_org(&self, name: &str, slug: &str) -> Result<OrgId>;
    #[cfg(any(test, feature = "seed-support"))]
    async fn add_org_member(&self, org_id: OrgId, user_id: UserId, is_admin: bool) -> Result<()>;
    #[cfg(any(test, feature = "seed-support"))]
    async fn create_org_channel(&self, org_id: OrgId, name: &str) -> Result<ChannelId>;
    // NOTE(review): the blank line after the next attribute does not detach it;
    // the `cfg` still applies to `get_org_channels`, which is therefore only
    // available in test / "seed-support" builds. Confirm that this is intended.
    #[cfg(any(test, feature = "seed-support"))]

    async fn get_org_channels(&self, org_id: OrgId) -> Result<Vec<Channel>>;
    /// Returns the channels the given user can access.
    async fn get_accessible_channels(&self, user_id: UserId) -> Result<Vec<Channel>>;
    /// Reports whether the given user can access the given channel.
    async fn can_user_access_channel(&self, user_id: UserId, channel_id: ChannelId)
        -> Result<bool>;

    #[cfg(any(test, feature = "seed-support"))]
    async fn add_channel_member(
        &self,
        channel_id: ChannelId,
        user_id: UserId,
        is_admin: bool,
    ) -> Result<()>;
    /// Stores a message sent by `sender_id` to the channel and returns its id.
    async fn create_channel_message(
        &self,
        channel_id: ChannelId,
        sender_id: UserId,
        body: &str,
        timestamp: OffsetDateTime,
        nonce: u128,
    ) -> Result<MessageId>;
    /// Returns up to `count` messages of the channel, optionally restricted
    /// to those before `before_id`.
    async fn get_channel_messages(
        &self,
        channel_id: ChannelId,
        count: usize,
        before_id: Option<MessageId>,
    ) -> Result<Vec<ChannelMessage>>;

    // Test-only hooks.
    #[cfg(test)]
    async fn teardown(&self, url: &str);

    #[cfg(test)]
    fn as_fake(&self) -> Option<&FakeDb>;
}
165
/// [`Db`] implementation that runs its queries against a Postgres
/// connection pool.
pub struct PostgresDb {
    /// Connection pool that every query in this type is executed on.
    pool: sqlx::PgPool,
}
169
170impl PostgresDb {
171 pub async fn new(url: &str, max_connections: u32) -> Result<Self> {
172 let pool = DbOptions::new()
173 .max_connections(max_connections)
174 .connect(url)
175 .await
176 .context("failed to connect to postgres database")?;
177 Ok(Self { pool })
178 }
179
180 pub fn fuzzy_like_string(string: &str) -> String {
181 let mut result = String::with_capacity(string.len() * 2 + 1);
182 for c in string.chars() {
183 if c.is_alphanumeric() {
184 result.push('%');
185 result.push(c);
186 }
187 }
188 result.push('%');
189 result
190 }
191}
192
193#[async_trait]
194impl Db for PostgresDb {
195 // users
196
197 async fn create_user(
198 &self,
199 github_login: &str,
200 email_address: &str,
201 admin: bool,
202 ) -> Result<UserId> {
203 let query = "
204 INSERT INTO users (github_login, email_address, admin)
205 VALUES ($1, $2, $3)
206 ON CONFLICT (github_login) DO UPDATE SET github_login = excluded.github_login
207 RETURNING id
208 ";
209 Ok(sqlx::query_scalar(query)
210 .bind(github_login)
211 .bind(email_address)
212 .bind(admin)
213 .fetch_one(&self.pool)
214 .await
215 .map(UserId)?)
216 }
217
218 async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
219 let query = "SELECT * FROM users ORDER BY github_login ASC LIMIT $1 OFFSET $2";
220 Ok(sqlx::query_as(query)
221 .bind(limit as i32)
222 .bind((page * limit) as i32)
223 .fetch_all(&self.pool)
224 .await?)
225 }
226
227 async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
228 let like_string = Self::fuzzy_like_string(name_query);
229 let query = "
230 SELECT users.*
231 FROM users
232 WHERE github_login ILIKE $1
233 ORDER BY github_login <-> $2
234 LIMIT $3
235 ";
236 Ok(sqlx::query_as(query)
237 .bind(like_string)
238 .bind(name_query)
239 .bind(limit as i32)
240 .fetch_all(&self.pool)
241 .await?)
242 }
243
244 async fn get_user_by_id(&self, id: UserId) -> Result<Option<User>> {
245 let users = self.get_users_by_ids(vec![id]).await?;
246 Ok(users.into_iter().next())
247 }
248
249 async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<User>> {
250 let ids = ids.into_iter().map(|id| id.0).collect::<Vec<_>>();
251 let query = "
252 SELECT users.*
253 FROM users
254 WHERE users.id = ANY ($1)
255 ";
256 Ok(sqlx::query_as(query)
257 .bind(&ids)
258 .fetch_all(&self.pool)
259 .await?)
260 }
261
262 async fn get_users_with_no_invites(&self, invited_by_another_user: bool) -> Result<Vec<User>> {
263 let query = format!(
264 "
265 SELECT users.*
266 FROM users
267 WHERE invite_count = 0
268 AND inviter_id IS{} NULL
269 ",
270 if invited_by_another_user { " NOT" } else { "" }
271 );
272
273 Ok(sqlx::query_as(&query).fetch_all(&self.pool).await?)
274 }
275
276 async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
277 let query = "SELECT * FROM users WHERE github_login = $1 LIMIT 1";
278 Ok(sqlx::query_as(query)
279 .bind(github_login)
280 .fetch_optional(&self.pool)
281 .await?)
282 }
283
284 async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
285 let query = "UPDATE users SET admin = $1 WHERE id = $2";
286 Ok(sqlx::query(query)
287 .bind(is_admin)
288 .bind(id.0)
289 .execute(&self.pool)
290 .await
291 .map(drop)?)
292 }
293
294 async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
295 let query = "UPDATE users SET connected_once = $1 WHERE id = $2";
296 Ok(sqlx::query(query)
297 .bind(connected_once)
298 .bind(id.0)
299 .execute(&self.pool)
300 .await
301 .map(drop)?)
302 }
303
304 async fn destroy_user(&self, id: UserId) -> Result<()> {
305 let query = "DELETE FROM access_tokens WHERE user_id = $1;";
306 sqlx::query(query)
307 .bind(id.0)
308 .execute(&self.pool)
309 .await
310 .map(drop)?;
311 let query = "DELETE FROM users WHERE id = $1;";
312 Ok(sqlx::query(query)
313 .bind(id.0)
314 .execute(&self.pool)
315 .await
316 .map(drop)?)
317 }
318
319 // signups
320
321 async fn create_signup(&self, signup: Signup) -> Result<()> {
322 sqlx::query(
323 "
324 INSERT INTO signups
325 (
326 email_address,
327 email_confirmation_code,
328 email_confirmation_sent,
329 platform_linux,
330 platform_mac,
331 platform_windows,
332 platform_unknown,
333 editor_features,
334 programming_languages
335 )
336 VALUES
337 ($1, $2, 'f', $3, $4, $5, 'f', $6, $7)
338 ",
339 )
340 .bind(&signup.email_address)
341 .bind(&random_email_confirmation_code())
342 .bind(&signup.platform_linux)
343 .bind(&signup.platform_mac)
344 .bind(&signup.platform_windows)
345 .bind(&signup.editor_features)
346 .bind(&signup.programming_languages)
347 .execute(&self.pool)
348 .await?;
349 Ok(())
350 }
351
352 async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
353 Ok(sqlx::query_as(
354 "
355 SELECT
356 COUNT(*) as count,
357 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
358 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
359 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count
360 FROM (
361 SELECT *
362 FROM signups
363 WHERE
364 NOT email_confirmation_sent
365 ) AS unsent
366 ",
367 )
368 .fetch_one(&self.pool)
369 .await?)
370 }
371
372 async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
373 Ok(sqlx::query_as(
374 "
375 SELECT
376 email_address, email_confirmation_code
377 FROM signups
378 WHERE
379 NOT email_confirmation_sent AND
380 platform_mac
381 LIMIT $1
382 ",
383 )
384 .bind(count as i32)
385 .fetch_all(&self.pool)
386 .await?)
387 }
388
389 async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
390 sqlx::query(
391 "
392 UPDATE signups
393 SET email_confirmation_sent = 't'
394 WHERE email_address = ANY ($1)
395 ",
396 )
397 .bind(
398 &invites
399 .iter()
400 .map(|s| s.email_address.as_str())
401 .collect::<Vec<_>>(),
402 )
403 .execute(&self.pool)
404 .await?;
405 Ok(())
406 }
407
    /// Turns a signup invite into a user account, inside one transaction.
    ///
    /// Steps, in order:
    /// 1. find the signup matching the invite's email address and
    ///    confirmation code that has no user attached yet;
    /// 2. insert the user, carrying over the signup's `metrics_id` and
    ///    assigning a fresh invite code;
    /// 3. link the signup to the new user;
    /// 4. if the signup names an inviting user, consume one of that user's
    ///    invites and create an accepted contact between the two.
    ///
    /// Returns the new user's id and the inviting user's id, if any.
    ///
    /// # Errors
    ///
    /// Fails with "no such invite" when no matching, unclaimed signup exists,
    /// and with an HTTP 401 "no invites remaining" error when the inviting
    /// user has no invites left.
    async fn create_user_from_invite(
        &self,
        invite: &Invite,
        user: NewUserParams,
    ) -> Result<(UserId, Option<UserId>)> {
        let mut tx = self.pool.begin().await?;

        // Only signups without a `user_id` match, so an invite that was
        // already redeemed is reported as "no such invite".
        let (signup_id, metrics_id, inviting_user_id): (i32, i32, Option<UserId>) = sqlx::query_as(
            "
            SELECT id, metrics_id, inviting_user_id
            FROM signups
            WHERE
                email_address = $1 AND
                email_confirmation_code = $2 AND
                user_id is NULL
            ",
        )
        .bind(&invite.email_address)
        .bind(&invite.email_confirmation_code)
        .fetch_optional(&mut tx)
        .await?
        .ok_or_else(|| anyhow!("no such invite"))?;

        // New users are never admins; their invite count comes from `user`.
        let user_id: UserId = sqlx::query_scalar(
            "
            INSERT INTO users
            (email_address, github_login, admin, invite_count, invite_code, metrics_id)
            VALUES
            ($1, $2, 'f', $3, $4, $5)
            RETURNING id
            ",
        )
        .bind(&invite.email_address)
        .bind(&user.github_login)
        .bind(&user.invite_count)
        .bind(random_invite_code())
        .bind(metrics_id)
        .fetch_one(&mut tx)
        .await?;

        // Claim the signup so that it cannot be redeemed a second time.
        sqlx::query(
            "
            UPDATE signups
            SET user_id = $1
            WHERE id = $2
            ",
        )
        .bind(&user_id)
        .bind(&signup_id)
        .execute(&mut tx)
        .await?;

        if let Some(inviting_user_id) = inviting_user_id {
            // The decrement only touches the row while invites remain, so a
            // missing returned id means the inviter has none left.
            let id: Option<UserId> = sqlx::query_scalar(
                "
                UPDATE users
                SET invite_count = invite_count - 1
                WHERE id = $1 AND invite_count > 0
                RETURNING id
                ",
            )
            .bind(&inviting_user_id)
            .fetch_optional(&mut tx)
            .await?;

            // Returning here skips `commit`, so nothing done above is committed.
            if id.is_none() {
                Err(Error::Http(
                    StatusCode::UNAUTHORIZED,
                    "no invites remaining".to_string(),
                ))?;
            }

            // Inviter and invitee start out as accepted contacts, with a
            // notification pending.
            // NOTE(review): other queries on `contacts` put the smaller id in
            // `user_id_a`; this presumably relies on the inviter's id being
            // smaller than the newly created user's id — confirm.
            sqlx::query(
                "
                INSERT INTO contacts
                    (user_id_a, user_id_b, a_to_b, should_notify, accepted)
                VALUES
                    ($1, $2, 't', 't', 't')
                ",
            )
            .bind(inviting_user_id)
            .bind(user_id)
            .execute(&mut tx)
            .await?;
        }

        tx.commit().await?;
        Ok((user_id, inviting_user_id))
    }
497
498 // invite codes
499
500 async fn set_invite_count_for_user(&self, id: UserId, count: u32) -> Result<()> {
501 let mut tx = self.pool.begin().await?;
502 if count > 0 {
503 sqlx::query(
504 "
505 UPDATE users
506 SET invite_code = $1
507 WHERE id = $2 AND invite_code IS NULL
508 ",
509 )
510 .bind(random_invite_code())
511 .bind(id)
512 .execute(&mut tx)
513 .await?;
514 }
515
516 sqlx::query(
517 "
518 UPDATE users
519 SET invite_count = $1
520 WHERE id = $2
521 ",
522 )
523 .bind(count as i32)
524 .bind(id)
525 .execute(&mut tx)
526 .await?;
527 tx.commit().await?;
528 Ok(())
529 }
530
531 async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, u32)>> {
532 let result: Option<(String, i32)> = sqlx::query_as(
533 "
534 SELECT invite_code, invite_count
535 FROM users
536 WHERE id = $1 AND invite_code IS NOT NULL
537 ",
538 )
539 .bind(id)
540 .fetch_optional(&self.pool)
541 .await?;
542 if let Some((code, count)) = result {
543 Ok(Some((code, count.try_into().map_err(anyhow::Error::new)?)))
544 } else {
545 Ok(None)
546 }
547 }
548
549 async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
550 sqlx::query_as(
551 "
552 SELECT *
553 FROM users
554 WHERE invite_code = $1
555 ",
556 )
557 .bind(code)
558 .fetch_optional(&self.pool)
559 .await?
560 .ok_or_else(|| {
561 Error::Http(
562 StatusCode::NOT_FOUND,
563 "that invite code does not exist".to_string(),
564 )
565 })
566 }
567
568 async fn create_invite_from_code(&self, code: &str, email_address: &str) -> Result<Invite> {
569 let mut tx = self.pool.begin().await?;
570
571 let existing_user: Option<UserId> = sqlx::query_scalar(
572 "
573 SELECT id
574 FROM users
575 WHERE email_address = $1
576 ",
577 )
578 .bind(email_address)
579 .fetch_optional(&mut tx)
580 .await?;
581 if existing_user.is_some() {
582 Err(anyhow!("email address is already in use"))?;
583 }
584
585 let row: Option<(UserId, i32)> = sqlx::query_as(
586 "
587 SELECT id, invite_count
588 FROM users
589 WHERE invite_code = $1
590 ",
591 )
592 .bind(code)
593 .fetch_optional(&mut tx)
594 .await?;
595
596 let (inviter_id, invite_count) = match row {
597 Some(row) => row,
598 None => Err(Error::Http(
599 StatusCode::NOT_FOUND,
600 "invite code not found".to_string(),
601 ))?,
602 };
603
604 if invite_count == 0 {
605 Err(Error::Http(
606 StatusCode::UNAUTHORIZED,
607 "no invites remaining".to_string(),
608 ))?;
609 }
610
611 let email_confirmation_code: String = sqlx::query_scalar(
612 "
613 INSERT INTO signups
614 (
615 email_address,
616 email_confirmation_code,
617 email_confirmation_sent,
618 inviting_user_id,
619 platform_linux,
620 platform_mac,
621 platform_windows,
622 platform_unknown
623 )
624 VALUES
625 ($1, $2, 'f', $3, 'f', 'f', 'f', 't')
626 ON CONFLICT (email_address)
627 DO UPDATE SET
628 inviting_user_id = excluded.inviting_user_id
629 RETURNING email_confirmation_code
630 ",
631 )
632 .bind(&email_address)
633 .bind(&random_email_confirmation_code())
634 .bind(&inviter_id)
635 .fetch_one(&mut tx)
636 .await?;
637
638 tx.commit().await?;
639
640 Ok(Invite {
641 email_address: email_address.into(),
642 email_confirmation_code,
643 })
644 }
645
646 // projects
647
648 async fn register_project(&self, host_user_id: UserId) -> Result<ProjectId> {
649 Ok(sqlx::query_scalar(
650 "
651 INSERT INTO projects(host_user_id)
652 VALUES ($1)
653 RETURNING id
654 ",
655 )
656 .bind(host_user_id)
657 .fetch_one(&self.pool)
658 .await
659 .map(ProjectId)?)
660 }
661
662 async fn unregister_project(&self, project_id: ProjectId) -> Result<()> {
663 sqlx::query(
664 "
665 UPDATE projects
666 SET unregistered = 't'
667 WHERE id = $1
668 ",
669 )
670 .bind(project_id)
671 .execute(&self.pool)
672 .await?;
673 Ok(())
674 }
675
676 async fn update_worktree_extensions(
677 &self,
678 project_id: ProjectId,
679 worktree_id: u64,
680 extensions: HashMap<String, u32>,
681 ) -> Result<()> {
682 if extensions.is_empty() {
683 return Ok(());
684 }
685
686 let mut query = QueryBuilder::new(
687 "INSERT INTO worktree_extensions (project_id, worktree_id, extension, count)",
688 );
689 query.push_values(extensions, |mut query, (extension, count)| {
690 query
691 .push_bind(project_id)
692 .push_bind(worktree_id as i32)
693 .push_bind(extension)
694 .push_bind(count as i32);
695 });
696 query.push(
697 "
698 ON CONFLICT (project_id, worktree_id, extension) DO UPDATE SET
699 count = excluded.count
700 ",
701 );
702 query.build().execute(&self.pool).await?;
703
704 Ok(())
705 }
706
707 async fn get_project_extensions(
708 &self,
709 project_id: ProjectId,
710 ) -> Result<HashMap<u64, HashMap<String, usize>>> {
711 #[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
712 struct WorktreeExtension {
713 worktree_id: i32,
714 extension: String,
715 count: i32,
716 }
717
718 let query = "
719 SELECT worktree_id, extension, count
720 FROM worktree_extensions
721 WHERE project_id = $1
722 ";
723 let counts = sqlx::query_as::<_, WorktreeExtension>(query)
724 .bind(&project_id)
725 .fetch_all(&self.pool)
726 .await?;
727
728 let mut extension_counts = HashMap::default();
729 for count in counts {
730 extension_counts
731 .entry(count.worktree_id as u64)
732 .or_insert_with(HashMap::default)
733 .insert(count.extension, count.count as usize);
734 }
735 Ok(extension_counts)
736 }
737
738 async fn record_user_activity(
739 &self,
740 time_period: Range<OffsetDateTime>,
741 projects: &[(UserId, ProjectId)],
742 ) -> Result<()> {
743 let query = "
744 INSERT INTO project_activity_periods
745 (ended_at, duration_millis, user_id, project_id)
746 VALUES
747 ($1, $2, $3, $4);
748 ";
749
750 let mut tx = self.pool.begin().await?;
751 let duration_millis =
752 ((time_period.end - time_period.start).as_seconds_f64() * 1000.0) as i32;
753 for (user_id, project_id) in projects {
754 sqlx::query(query)
755 .bind(time_period.end)
756 .bind(duration_millis)
757 .bind(user_id)
758 .bind(project_id)
759 .execute(&mut tx)
760 .await?;
761 }
762 tx.commit().await?;
763
764 Ok(())
765 }
766
767 async fn get_active_user_count(
768 &self,
769 time_period: Range<OffsetDateTime>,
770 min_duration: Duration,
771 only_collaborative: bool,
772 ) -> Result<usize> {
773 let mut with_clause = String::new();
774 with_clause.push_str("WITH\n");
775 with_clause.push_str(
776 "
777 project_durations AS (
778 SELECT user_id, project_id, SUM(duration_millis) AS project_duration
779 FROM project_activity_periods
780 WHERE $1 < ended_at AND ended_at <= $2
781 GROUP BY user_id, project_id
782 ),
783 ",
784 );
785 with_clause.push_str(
786 "
787 project_collaborators as (
788 SELECT project_id, COUNT(DISTINCT user_id) as max_collaborators
789 FROM project_durations
790 GROUP BY project_id
791 ),
792 ",
793 );
794
795 if only_collaborative {
796 with_clause.push_str(
797 "
798 user_durations AS (
799 SELECT user_id, SUM(project_duration) as total_duration
800 FROM project_durations, project_collaborators
801 WHERE
802 project_durations.project_id = project_collaborators.project_id AND
803 max_collaborators > 1
804 GROUP BY user_id
805 ORDER BY total_duration DESC
806 LIMIT $3
807 )
808 ",
809 );
810 } else {
811 with_clause.push_str(
812 "
813 user_durations AS (
814 SELECT user_id, SUM(project_duration) as total_duration
815 FROM project_durations
816 GROUP BY user_id
817 ORDER BY total_duration DESC
818 LIMIT $3
819 )
820 ",
821 );
822 }
823
824 let query = format!(
825 "
826 {with_clause}
827 SELECT count(user_durations.user_id)
828 FROM user_durations
829 WHERE user_durations.total_duration >= $3
830 "
831 );
832
833 let count: i64 = sqlx::query_scalar(&query)
834 .bind(time_period.start)
835 .bind(time_period.end)
836 .bind(min_duration.as_millis() as i64)
837 .fetch_one(&self.pool)
838 .await?;
839 Ok(count as usize)
840 }
841
842 async fn get_top_users_activity_summary(
843 &self,
844 time_period: Range<OffsetDateTime>,
845 max_user_count: usize,
846 ) -> Result<Vec<UserActivitySummary>> {
847 let query = "
848 WITH
849 project_durations AS (
850 SELECT user_id, project_id, SUM(duration_millis) AS project_duration
851 FROM project_activity_periods
852 WHERE $1 < ended_at AND ended_at <= $2
853 GROUP BY user_id, project_id
854 ),
855 user_durations AS (
856 SELECT user_id, SUM(project_duration) as total_duration
857 FROM project_durations
858 GROUP BY user_id
859 ORDER BY total_duration DESC
860 LIMIT $3
861 ),
862 project_collaborators as (
863 SELECT project_id, COUNT(DISTINCT user_id) as max_collaborators
864 FROM project_durations
865 GROUP BY project_id
866 )
867 SELECT user_durations.user_id, users.github_login, project_durations.project_id, project_duration, max_collaborators
868 FROM user_durations, project_durations, project_collaborators, users
869 WHERE
870 user_durations.user_id = project_durations.user_id AND
871 user_durations.user_id = users.id AND
872 project_durations.project_id = project_collaborators.project_id
873 ORDER BY total_duration DESC, user_id ASC, project_id ASC
874 ";
875
876 let mut rows = sqlx::query_as::<_, (UserId, String, ProjectId, i64, i64)>(query)
877 .bind(time_period.start)
878 .bind(time_period.end)
879 .bind(max_user_count as i32)
880 .fetch(&self.pool);
881
882 let mut result = Vec::<UserActivitySummary>::new();
883 while let Some(row) = rows.next().await {
884 let (user_id, github_login, project_id, duration_millis, project_collaborators) = row?;
885 let project_id = project_id;
886 let duration = Duration::from_millis(duration_millis as u64);
887 let project_activity = ProjectActivitySummary {
888 id: project_id,
889 duration,
890 max_collaborators: project_collaborators as usize,
891 };
892 if let Some(last_summary) = result.last_mut() {
893 if last_summary.id == user_id {
894 last_summary.project_activity.push(project_activity);
895 continue;
896 }
897 }
898 result.push(UserActivitySummary {
899 id: user_id,
900 project_activity: vec![project_activity],
901 github_login,
902 });
903 }
904
905 Ok(result)
906 }
907
    /// Builds the activity timeline of `user_id` within `time_period`.
    ///
    /// Recorded activity periods are grouped per project. A period that
    /// starts no later than `COALESCE_THRESHOLD` after the end of the
    /// previous period for the same project is merged into it. Each
    /// resulting period carries the extension counts of its project. The
    /// result is sorted by period start.
    async fn get_user_activity_timeline(
        &self,
        time_period: Range<OffsetDateTime>,
        user_id: UserId,
    ) -> Result<Vec<UserActivityPeriod>> {
        // Maximum gap between two periods of one project that still merges them.
        const COALESCE_THRESHOLD: Duration = Duration::from_secs(30);

        // The LEFT OUTER JOIN yields one row per (activity period, extension)
        // pair, and a single row with NULL extension columns for projects
        // without recorded extensions.
        let query = "
            SELECT
                project_activity_periods.ended_at,
                project_activity_periods.duration_millis,
                project_activity_periods.project_id,
                worktree_extensions.extension,
                worktree_extensions.count
            FROM project_activity_periods
            LEFT OUTER JOIN
                worktree_extensions
            ON
                project_activity_periods.project_id = worktree_extensions.project_id
            WHERE
                project_activity_periods.user_id = $1 AND
                $2 < project_activity_periods.ended_at AND
                project_activity_periods.ended_at <= $3
            ORDER BY project_activity_periods.id ASC
        ";

        let mut rows = sqlx::query_as::<
            _,
            (
                PrimitiveDateTime,
                i32,
                ProjectId,
                Option<String>,
                Option<i32>,
            ),
        >(query)
        .bind(user_id)
        .bind(time_period.start)
        .bind(time_period.end)
        .fetch(&self.pool);

        let mut time_periods: HashMap<ProjectId, Vec<UserActivityPeriod>> = Default::default();
        while let Some(row) = rows.next().await {
            let (ended_at, duration_millis, project_id, extension, extension_count) = row?;
            // The timestamp is decoded without an offset and treated as UTC.
            let ended_at = ended_at.assume_utc();
            // NOTE(review): assumes `duration_millis` is never negative; a
            // negative value would wrap in the `as u64` cast — confirm.
            let duration = Duration::from_millis(duration_millis as u64);
            let started_at = ended_at - duration;
            let project_time_periods = time_periods.entry(project_id).or_default();

            if let Some(prev_duration) = project_time_periods.last_mut() {
                // Merge into the previous period when this one starts within
                // the threshold of its end. Repeated rows of the same period
                // (one per extension) also take this branch, so they do not
                // create duplicate entries.
                if started_at <= prev_duration.end + COALESCE_THRESHOLD
                    && ended_at >= prev_duration.start
                {
                    prev_duration.end = cmp::max(prev_duration.end, ended_at);
                } else {
                    project_time_periods.push(UserActivityPeriod {
                        project_id,
                        start: started_at,
                        end: ended_at,
                        extensions: Default::default(),
                    });
                }
            } else {
                project_time_periods.push(UserActivityPeriod {
                    project_id,
                    start: started_at,
                    end: ended_at,
                    extensions: Default::default(),
                });
            }

            // Attach this row's extension count to the period it belongs to.
            // `last_mut` cannot be `None`: the list was non-empty or was just
            // pushed to above.
            if let Some((extension, extension_count)) = extension.zip(extension_count) {
                project_time_periods
                    .last_mut()
                    .unwrap()
                    .extensions
                    .insert(extension, extension_count as usize);
            }
        }

        // Flatten the per-project lists into one timeline ordered by start.
        let mut durations = time_periods.into_values().flatten().collect::<Vec<_>>();
        durations.sort_unstable_by_key(|duration| duration.start);
        Ok(durations)
    }
992
993 // contacts
994
995 async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
996 let query = "
997 SELECT user_id_a, user_id_b, a_to_b, accepted, should_notify
998 FROM contacts
999 WHERE user_id_a = $1 OR user_id_b = $1;
1000 ";
1001
1002 let mut rows = sqlx::query_as::<_, (UserId, UserId, bool, bool, bool)>(query)
1003 .bind(user_id)
1004 .fetch(&self.pool);
1005
1006 let mut contacts = vec![Contact::Accepted {
1007 user_id,
1008 should_notify: false,
1009 }];
1010 while let Some(row) = rows.next().await {
1011 let (user_id_a, user_id_b, a_to_b, accepted, should_notify) = row?;
1012
1013 if user_id_a == user_id {
1014 if accepted {
1015 contacts.push(Contact::Accepted {
1016 user_id: user_id_b,
1017 should_notify: should_notify && a_to_b,
1018 });
1019 } else if a_to_b {
1020 contacts.push(Contact::Outgoing { user_id: user_id_b })
1021 } else {
1022 contacts.push(Contact::Incoming {
1023 user_id: user_id_b,
1024 should_notify,
1025 });
1026 }
1027 } else if accepted {
1028 contacts.push(Contact::Accepted {
1029 user_id: user_id_a,
1030 should_notify: should_notify && !a_to_b,
1031 });
1032 } else if a_to_b {
1033 contacts.push(Contact::Incoming {
1034 user_id: user_id_a,
1035 should_notify,
1036 });
1037 } else {
1038 contacts.push(Contact::Outgoing { user_id: user_id_a });
1039 }
1040 }
1041
1042 contacts.sort_unstable_by_key(|contact| contact.user_id());
1043
1044 Ok(contacts)
1045 }
1046
1047 async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
1048 let (id_a, id_b) = if user_id_1 < user_id_2 {
1049 (user_id_1, user_id_2)
1050 } else {
1051 (user_id_2, user_id_1)
1052 };
1053
1054 let query = "
1055 SELECT 1 FROM contacts
1056 WHERE user_id_a = $1 AND user_id_b = $2 AND accepted = 't'
1057 LIMIT 1
1058 ";
1059 Ok(sqlx::query_scalar::<_, i32>(query)
1060 .bind(id_a.0)
1061 .bind(id_b.0)
1062 .fetch_optional(&self.pool)
1063 .await?
1064 .is_some())
1065 }
1066
1067 async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
1068 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
1069 (sender_id, receiver_id, true)
1070 } else {
1071 (receiver_id, sender_id, false)
1072 };
1073 let query = "
1074 INSERT into contacts (user_id_a, user_id_b, a_to_b, accepted, should_notify)
1075 VALUES ($1, $2, $3, 'f', 't')
1076 ON CONFLICT (user_id_a, user_id_b) DO UPDATE
1077 SET
1078 accepted = 't',
1079 should_notify = 'f'
1080 WHERE
1081 NOT contacts.accepted AND
1082 ((contacts.a_to_b = excluded.a_to_b AND contacts.user_id_a = excluded.user_id_b) OR
1083 (contacts.a_to_b != excluded.a_to_b AND contacts.user_id_a = excluded.user_id_a));
1084 ";
1085 let result = sqlx::query(query)
1086 .bind(id_a.0)
1087 .bind(id_b.0)
1088 .bind(a_to_b)
1089 .execute(&self.pool)
1090 .await?;
1091
1092 if result.rows_affected() == 1 {
1093 Ok(())
1094 } else {
1095 Err(anyhow!("contact already requested"))?
1096 }
1097 }
1098
1099 async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
1100 let (id_a, id_b) = if responder_id < requester_id {
1101 (responder_id, requester_id)
1102 } else {
1103 (requester_id, responder_id)
1104 };
1105 let query = "
1106 DELETE FROM contacts
1107 WHERE user_id_a = $1 AND user_id_b = $2;
1108 ";
1109 let result = sqlx::query(query)
1110 .bind(id_a.0)
1111 .bind(id_b.0)
1112 .execute(&self.pool)
1113 .await?;
1114
1115 if result.rows_affected() == 1 {
1116 Ok(())
1117 } else {
1118 Err(anyhow!("no such contact"))?
1119 }
1120 }
1121
1122 async fn dismiss_contact_notification(
1123 &self,
1124 user_id: UserId,
1125 contact_user_id: UserId,
1126 ) -> Result<()> {
1127 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
1128 (user_id, contact_user_id, true)
1129 } else {
1130 (contact_user_id, user_id, false)
1131 };
1132
1133 let query = "
1134 UPDATE contacts
1135 SET should_notify = 'f'
1136 WHERE
1137 user_id_a = $1 AND user_id_b = $2 AND
1138 (
1139 (a_to_b = $3 AND accepted) OR
1140 (a_to_b != $3 AND NOT accepted)
1141 );
1142 ";
1143
1144 let result = sqlx::query(query)
1145 .bind(id_a.0)
1146 .bind(id_b.0)
1147 .bind(a_to_b)
1148 .execute(&self.pool)
1149 .await?;
1150
1151 if result.rows_affected() == 0 {
1152 Err(anyhow!("no such contact request"))?;
1153 }
1154
1155 Ok(())
1156 }
1157
1158 async fn respond_to_contact_request(
1159 &self,
1160 responder_id: UserId,
1161 requester_id: UserId,
1162 accept: bool,
1163 ) -> Result<()> {
1164 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
1165 (responder_id, requester_id, false)
1166 } else {
1167 (requester_id, responder_id, true)
1168 };
1169 let result = if accept {
1170 let query = "
1171 UPDATE contacts
1172 SET accepted = 't', should_notify = 't'
1173 WHERE user_id_a = $1 AND user_id_b = $2 AND a_to_b = $3;
1174 ";
1175 sqlx::query(query)
1176 .bind(id_a.0)
1177 .bind(id_b.0)
1178 .bind(a_to_b)
1179 .execute(&self.pool)
1180 .await?
1181 } else {
1182 let query = "
1183 DELETE FROM contacts
1184 WHERE user_id_a = $1 AND user_id_b = $2 AND a_to_b = $3 AND NOT accepted;
1185 ";
1186 sqlx::query(query)
1187 .bind(id_a.0)
1188 .bind(id_b.0)
1189 .bind(a_to_b)
1190 .execute(&self.pool)
1191 .await?
1192 };
1193 if result.rows_affected() == 1 {
1194 Ok(())
1195 } else {
1196 Err(anyhow!("no such contact request"))?
1197 }
1198 }
1199
1200 // access tokens
1201
    /// Stores `access_token_hash` for the user, then deletes the user's
    /// oldest hashes so that at most `max_access_token_count` remain.
    ///
    /// "Oldest" is decided by row id: rows are ordered by id descending and
    /// everything past the first `max_access_token_count` is deleted. Insert
    /// and cleanup run in one transaction.
    async fn create_access_token_hash(
        &self,
        user_id: UserId,
        access_token_hash: &str,
        max_access_token_count: usize,
    ) -> Result<()> {
        let insert_query = "
            INSERT INTO access_tokens (user_id, hash)
            VALUES ($1, $2);
        ";
        // This statement references only $1 and $3; $2 is bound below but
        // not used by it.
        let cleanup_query = "
            DELETE FROM access_tokens
            WHERE id IN (
                SELECT id from access_tokens
                WHERE user_id = $1
                ORDER BY id DESC
                OFFSET $3
            )
        ";

        let mut tx = self.pool.begin().await?;
        sqlx::query(insert_query)
            .bind(user_id.0)
            .bind(access_token_hash)
            .execute(&mut tx)
            .await?;
        sqlx::query(cleanup_query)
            .bind(user_id.0)
            // Bound only to fill $2 so that the count lands on $3.
            .bind(access_token_hash)
            // NOTE(review): `as i32` truncates very large counts — presumably
            // callers pass small values; confirm.
            .bind(max_access_token_count as i32)
            .execute(&mut tx)
            .await?;
        Ok(tx.commit().await?)
    }
1236
1237 async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
1238 let query = "
1239 SELECT hash
1240 FROM access_tokens
1241 WHERE user_id = $1
1242 ORDER BY id DESC
1243 ";
1244 Ok(sqlx::query_scalar(query)
1245 .bind(user_id.0)
1246 .fetch_all(&self.pool)
1247 .await?)
1248 }
1249
1250 // orgs
1251
1252 #[allow(unused)] // Help rust-analyzer
1253 #[cfg(any(test, feature = "seed-support"))]
1254 async fn find_org_by_slug(&self, slug: &str) -> Result<Option<Org>> {
1255 let query = "
1256 SELECT *
1257 FROM orgs
1258 WHERE slug = $1
1259 ";
1260 Ok(sqlx::query_as(query)
1261 .bind(slug)
1262 .fetch_optional(&self.pool)
1263 .await?)
1264 }
1265
1266 #[cfg(any(test, feature = "seed-support"))]
1267 async fn create_org(&self, name: &str, slug: &str) -> Result<OrgId> {
1268 let query = "
1269 INSERT INTO orgs (name, slug)
1270 VALUES ($1, $2)
1271 RETURNING id
1272 ";
1273 Ok(sqlx::query_scalar(query)
1274 .bind(name)
1275 .bind(slug)
1276 .fetch_one(&self.pool)
1277 .await
1278 .map(OrgId)?)
1279 }
1280
1281 #[cfg(any(test, feature = "seed-support"))]
1282 async fn add_org_member(&self, org_id: OrgId, user_id: UserId, is_admin: bool) -> Result<()> {
1283 let query = "
1284 INSERT INTO org_memberships (org_id, user_id, admin)
1285 VALUES ($1, $2, $3)
1286 ON CONFLICT DO NOTHING
1287 ";
1288 Ok(sqlx::query(query)
1289 .bind(org_id.0)
1290 .bind(user_id.0)
1291 .bind(is_admin)
1292 .execute(&self.pool)
1293 .await
1294 .map(drop)?)
1295 }
1296
1297 // channels
1298
1299 #[cfg(any(test, feature = "seed-support"))]
1300 async fn create_org_channel(&self, org_id: OrgId, name: &str) -> Result<ChannelId> {
1301 let query = "
1302 INSERT INTO channels (owner_id, owner_is_user, name)
1303 VALUES ($1, false, $2)
1304 RETURNING id
1305 ";
1306 Ok(sqlx::query_scalar(query)
1307 .bind(org_id.0)
1308 .bind(name)
1309 .fetch_one(&self.pool)
1310 .await
1311 .map(ChannelId)?)
1312 }
1313
1314 #[allow(unused)] // Help rust-analyzer
1315 #[cfg(any(test, feature = "seed-support"))]
1316 async fn get_org_channels(&self, org_id: OrgId) -> Result<Vec<Channel>> {
1317 let query = "
1318 SELECT *
1319 FROM channels
1320 WHERE
1321 channels.owner_is_user = false AND
1322 channels.owner_id = $1
1323 ";
1324 Ok(sqlx::query_as(query)
1325 .bind(org_id.0)
1326 .fetch_all(&self.pool)
1327 .await?)
1328 }
1329
1330 async fn get_accessible_channels(&self, user_id: UserId) -> Result<Vec<Channel>> {
1331 let query = "
1332 SELECT
1333 channels.*
1334 FROM
1335 channel_memberships, channels
1336 WHERE
1337 channel_memberships.user_id = $1 AND
1338 channel_memberships.channel_id = channels.id
1339 ";
1340 Ok(sqlx::query_as(query)
1341 .bind(user_id.0)
1342 .fetch_all(&self.pool)
1343 .await?)
1344 }
1345
1346 async fn can_user_access_channel(
1347 &self,
1348 user_id: UserId,
1349 channel_id: ChannelId,
1350 ) -> Result<bool> {
1351 let query = "
1352 SELECT id
1353 FROM channel_memberships
1354 WHERE user_id = $1 AND channel_id = $2
1355 LIMIT 1
1356 ";
1357 Ok(sqlx::query_scalar::<_, i32>(query)
1358 .bind(user_id.0)
1359 .bind(channel_id.0)
1360 .fetch_optional(&self.pool)
1361 .await
1362 .map(|e| e.is_some())?)
1363 }
1364
1365 #[cfg(any(test, feature = "seed-support"))]
1366 async fn add_channel_member(
1367 &self,
1368 channel_id: ChannelId,
1369 user_id: UserId,
1370 is_admin: bool,
1371 ) -> Result<()> {
1372 let query = "
1373 INSERT INTO channel_memberships (channel_id, user_id, admin)
1374 VALUES ($1, $2, $3)
1375 ON CONFLICT DO NOTHING
1376 ";
1377 Ok(sqlx::query(query)
1378 .bind(channel_id.0)
1379 .bind(user_id.0)
1380 .bind(is_admin)
1381 .execute(&self.pool)
1382 .await
1383 .map(drop)?)
1384 }
1385
1386 // messages
1387
1388 async fn create_channel_message(
1389 &self,
1390 channel_id: ChannelId,
1391 sender_id: UserId,
1392 body: &str,
1393 timestamp: OffsetDateTime,
1394 nonce: u128,
1395 ) -> Result<MessageId> {
1396 let query = "
1397 INSERT INTO channel_messages (channel_id, sender_id, body, sent_at, nonce)
1398 VALUES ($1, $2, $3, $4, $5)
1399 ON CONFLICT (nonce) DO UPDATE SET nonce = excluded.nonce
1400 RETURNING id
1401 ";
1402 Ok(sqlx::query_scalar(query)
1403 .bind(channel_id.0)
1404 .bind(sender_id.0)
1405 .bind(body)
1406 .bind(timestamp)
1407 .bind(Uuid::from_u128(nonce))
1408 .fetch_one(&self.pool)
1409 .await
1410 .map(MessageId)?)
1411 }
1412
1413 async fn get_channel_messages(
1414 &self,
1415 channel_id: ChannelId,
1416 count: usize,
1417 before_id: Option<MessageId>,
1418 ) -> Result<Vec<ChannelMessage>> {
1419 let query = r#"
1420 SELECT * FROM (
1421 SELECT
1422 id, channel_id, sender_id, body, sent_at AT TIME ZONE 'UTC' as sent_at, nonce
1423 FROM
1424 channel_messages
1425 WHERE
1426 channel_id = $1 AND
1427 id < $2
1428 ORDER BY id DESC
1429 LIMIT $3
1430 ) as recent_messages
1431 ORDER BY id ASC
1432 "#;
1433 Ok(sqlx::query_as(query)
1434 .bind(channel_id.0)
1435 .bind(before_id.unwrap_or(MessageId::MAX))
1436 .bind(count as i64)
1437 .fetch_all(&self.pool)
1438 .await?)
1439 }
1440
1441 #[cfg(test)]
1442 async fn teardown(&self, url: &str) {
1443 use util::ResultExt;
1444
1445 let query = "
1446 SELECT pg_terminate_backend(pg_stat_activity.pid)
1447 FROM pg_stat_activity
1448 WHERE pg_stat_activity.datname = current_database() AND pid <> pg_backend_pid();
1449 ";
1450 sqlx::query(query).execute(&self.pool).await.log_err();
1451 self.pool.close().await;
1452 <sqlx::Postgres as sqlx::migrate::MigrateDatabase>::drop_database(url)
1453 .await
1454 .log_err();
1455 }
1456
    /// Test-only downcast hook; this implementation always returns `None`.
    #[cfg(test)]
    fn as_fake(&self) -> Option<&FakeDb> {
        None
    }
1461}
1462
/// Declares a public newtype around `i32` for use as a typed database id.
///
/// The generated type is transparent for both `sqlx` and `serde`, exposes a `MAX`
/// constant, converts to and from the `u64` representation via plain `as` casts
/// (so out-of-range values wrap rather than fail), and displays as the inner
/// integer.
macro_rules! id_type {
    ($id_name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            sqlx::Type,
            Serialize,
            Deserialize,
        )]
        #[sqlx(transparent)]
        #[serde(transparent)]
        pub struct $id_name(pub i32);

        impl $id_name {
            /// Largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds an id from a `u64`, keeping only the low 32 bits.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                let raw = value as i32;
                Self(raw)
            }

            /// Converts the id to `u64` with an `as` cast of the inner value.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                let Self(raw) = self;
                raw as u64
            }
        }

        impl std::fmt::Display for $id_name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                // Delegate to the inner integer's own formatting.
                std::fmt::Display::fmt(&self.0, f)
            }
        }
    };
}
1505
id_type!(UserId);
/// A user account as returned by the user-related `Db` methods.
///
/// Derives `FromRow`, so it can be built directly from a query row whose columns
/// match the field names.
#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
pub struct User {
    /// Identifier of this user.
    pub id: UserId,
    /// GitHub login of the user.
    pub github_login: String,
    /// Email address, if one is recorded.
    pub email_address: Option<String>,
    /// Admin flag of the account.
    pub admin: bool,
    /// Invite code associated with the user, if any.
    pub invite_code: Option<String>,
    /// Invite count associated with the user.
    pub invite_count: i32,
    /// Flag toggled through `set_user_connected_once`.
    pub connected_once: bool,
}
1517
id_type!(ProjectId);
/// A registered project and the user hosting it.
#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
pub struct Project {
    /// Identifier of this project.
    pub id: ProjectId,
    /// The user that registered (hosts) the project.
    pub host_user_id: UserId,
    /// Set once the project has been unregistered.
    pub unregistered: bool,
}
1525
/// Per-user activity summary, as returned by `get_top_users_activity_summary`.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct UserActivitySummary {
    /// The user this summary describes.
    pub id: UserId,
    /// GitHub login of that user.
    pub github_login: String,
    /// One entry per project included in the summary.
    pub project_activity: Vec<ProjectActivitySummary>,
}
1532
/// Activity figures for a single project inside a `UserActivitySummary`.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct ProjectActivitySummary {
    /// The project these figures belong to.
    pub id: ProjectId,
    /// Duration attributed to the project.
    // NOTE(review): presumably total active time within the queried period; confirm
    // against the query that fills it.
    pub duration: Duration,
    /// Maximum collaborator count recorded for the project.
    pub max_collaborators: usize,
}
1539
/// One entry of a user's activity timeline, as returned by
/// `get_user_activity_timeline`.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct UserActivityPeriod {
    /// Project the period refers to.
    pub project_id: ProjectId,
    /// Start of the period; serialized as ISO 8601.
    #[serde(with = "time::serde::iso8601")]
    pub start: OffsetDateTime,
    /// End of the period; serialized as ISO 8601.
    #[serde(with = "time::serde::iso8601")]
    pub end: OffsetDateTime,
    /// Counts keyed by extension name.
    // NOTE(review): presumably file extension -> file count; confirm against the
    // code that builds this map.
    pub extensions: HashMap<String, usize>,
}
1549
id_type!(OrgId);
/// An organization row, loadable from the `orgs` table via `FromRow`.
#[derive(FromRow)]
pub struct Org {
    /// Identifier of this org.
    pub id: OrgId,
    /// Display name given when the org was created.
    pub name: String,
    /// Slug used to look the org up (see `find_org_by_slug`).
    pub slug: String,
}
1557
id_type!(ChannelId);
/// A chat channel row, loadable from the `channels` table via `FromRow`.
#[derive(Clone, Debug, FromRow, Serialize)]
pub struct Channel {
    /// Identifier of this channel.
    pub id: ChannelId,
    /// Channel name.
    pub name: String,
    /// Raw id of the owner; org channels store the org's id here.
    pub owner_id: i32,
    /// `false` for channels owned by an org.
    pub owner_is_user: bool,
}
1566
id_type!(MessageId);
/// A message posted to a channel, loadable from a query row via `FromRow`.
#[derive(Clone, Debug, FromRow)]
pub struct ChannelMessage {
    /// Identifier of this message.
    pub id: MessageId,
    /// Channel the message was posted to.
    pub channel_id: ChannelId,
    /// User that sent the message.
    pub sender_id: UserId,
    /// Message text.
    pub body: String,
    /// Timestamp supplied when the message was created.
    pub sent_at: OffsetDateTime,
    /// Caller-supplied nonce; creating a message with an existing nonce yields
    /// the existing message's id.
    pub nonce: Uuid,
}
1577
/// A contact relationship as seen from one particular user.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Contact {
    /// The contact has been accepted.
    Accepted {
        /// The other user in the relationship.
        user_id: UserId,
        /// Whether a notification about this contact is still pending.
        should_notify: bool,
    },
    /// A request this user sent that has not been accepted yet.
    Outgoing {
        /// The user the request was sent to.
        user_id: UserId,
    },
    /// A request this user received that has not been accepted yet.
    Incoming {
        /// The user that sent the request.
        user_id: UserId,
        /// Whether a notification about this request is still pending.
        should_notify: bool,
    },
}
1592
1593impl Contact {
1594 pub fn user_id(&self) -> UserId {
1595 match self {
1596 Contact::Accepted { user_id, .. } => *user_id,
1597 Contact::Outgoing { user_id } => *user_id,
1598 Contact::Incoming { user_id, .. } => *user_id,
1599 }
1600 }
1601}
1602
/// An incoming contact request.
// NOTE(review): no use of this type is visible in this part of the file; confirm
// where it is produced before relying on these field descriptions.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct IncomingContactRequest {
    /// The user that sent the request.
    pub requester_id: UserId,
    /// Whether a notification about the request is still pending.
    pub should_notify: bool,
}
1608
/// Deserializable signup payload accepted by `create_signup`.
#[derive(Clone, Deserialize)]
pub struct Signup {
    /// Email address given at signup.
    pub email_address: String,
    /// Platform flag for macOS.
    pub platform_mac: bool,
    /// Platform flag for Windows.
    pub platform_windows: bool,
    /// Platform flag for Linux.
    pub platform_linux: bool,
    /// Editor features listed in the signup.
    pub editor_features: Vec<String>,
    /// Programming languages listed in the signup.
    pub programming_languages: Vec<String>,
}
1618
/// Waitlist counters, as returned by `get_waitlist_summary`.
///
/// Every field is marked `#[sqlx(default)]`, so a column missing from the row
/// falls back to the field type's default value.
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, FromRow)]
pub struct WaitlistSummary {
    /// Overall count.
    #[sqlx(default)]
    pub count: i64,
    /// Count for Linux.
    #[sqlx(default)]
    pub linux_count: i64,
    /// Count for macOS.
    #[sqlx(default)]
    pub mac_count: i64,
    /// Count for Windows.
    #[sqlx(default)]
    pub windows_count: i64,
}
1630
/// An invite: an email address paired with its confirmation code.
#[derive(FromRow, PartialEq, Debug, Serialize, Deserialize)]
pub struct Invite {
    /// Address the invite is for.
    pub email_address: String,
    /// Confirmation code belonging to that address.
    pub email_confirmation_code: String,
}
1636
/// Parameters passed to `create_user_from_invite` for the user being created.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    /// GitHub login of the new user.
    pub github_login: String,
    /// Invite count for the new user.
    pub invite_count: i32,
}
1642
/// Generates a random 16-character invite code with `nanoid`.
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
1646
/// Generates a random 64-character email confirmation code with `nanoid`.
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
1650
1651#[cfg(test)]
1652pub use test::*;
1653
1654#[cfg(test)]
1655mod test {
1656 use super::*;
1657 use anyhow::anyhow;
1658 use collections::BTreeMap;
1659 use gpui::executor::Background;
1660 use lazy_static::lazy_static;
1661 use parking_lot::Mutex;
1662 use rand::prelude::*;
1663 use sqlx::{
1664 migrate::{MigrateDatabase, Migrator},
1665 Postgres,
1666 };
1667 use std::{path::Path, sync::Arc};
1668 use util::post_inc;
1669
    /// In-memory stand-in for the database, used by tests.
    ///
    /// Each collection sits behind its own mutex; the `next_*` counters hold the id
    /// that will be handed out next for the corresponding collection.
    pub struct FakeDb {
        /// Executor used to inject random delays before each operation.
        background: Arc<Background>,
        /// Users keyed by id.
        pub users: Mutex<BTreeMap<UserId, User>>,
        /// Projects keyed by id.
        pub projects: Mutex<BTreeMap<ProjectId, Project>>,
        /// Extension counts keyed by (project, worktree id, extension).
        pub worktree_extensions: Mutex<BTreeMap<(ProjectId, u64, String), u32>>,
        /// Orgs keyed by id.
        pub orgs: Mutex<BTreeMap<OrgId, Org>>,
        /// Org memberships; the value is the member's admin flag.
        pub org_memberships: Mutex<BTreeMap<(OrgId, UserId), bool>>,
        /// Channels keyed by id.
        pub channels: Mutex<BTreeMap<ChannelId, Channel>>,
        /// Channel memberships; the value is the member's admin flag.
        pub channel_memberships: Mutex<BTreeMap<(ChannelId, UserId), bool>>,
        /// Channel messages keyed by id.
        pub channel_messages: Mutex<BTreeMap<MessageId, ChannelMessage>>,
        /// Contact requests and accepted contacts, in insertion order.
        pub contacts: Mutex<Vec<FakeContact>>,
        next_channel_message_id: Mutex<i32>,
        next_user_id: Mutex<i32>,
        next_org_id: Mutex<i32>,
        next_channel_id: Mutex<i32>,
        next_project_id: Mutex<i32>,
    }
1687
    /// A contact entry stored by `FakeDb`, directed from requester to responder.
    #[derive(Debug)]
    pub struct FakeContact {
        /// User that sent the contact request.
        pub requester_id: UserId,
        /// User that received the contact request.
        pub responder_id: UserId,
        /// Whether the request has been accepted.
        pub accepted: bool,
        /// Whether a notification about this entry is still pending.
        pub should_notify: bool,
    }
1695
1696 impl FakeDb {
1697 pub fn new(background: Arc<Background>) -> Self {
1698 Self {
1699 background,
1700 users: Default::default(),
1701 next_user_id: Mutex::new(0),
1702 projects: Default::default(),
1703 worktree_extensions: Default::default(),
1704 next_project_id: Mutex::new(1),
1705 orgs: Default::default(),
1706 next_org_id: Mutex::new(1),
1707 org_memberships: Default::default(),
1708 channels: Default::default(),
1709 next_channel_id: Mutex::new(1),
1710 channel_memberships: Default::default(),
1711 channel_messages: Default::default(),
1712 next_channel_message_id: Mutex::new(1),
1713 contacts: Default::default(),
1714 }
1715 }
1716 }
1717
1718 #[async_trait]
1719 impl Db for FakeDb {
1720 async fn create_user(
1721 &self,
1722 github_login: &str,
1723 email_address: &str,
1724 admin: bool,
1725 ) -> Result<UserId> {
1726 self.background.simulate_random_delay().await;
1727
1728 let mut users = self.users.lock();
1729 if let Some(user) = users
1730 .values()
1731 .find(|user| user.github_login == github_login)
1732 {
1733 Ok(user.id)
1734 } else {
1735 let user_id = UserId(post_inc(&mut *self.next_user_id.lock()));
1736 users.insert(
1737 user_id,
1738 User {
1739 id: user_id,
1740 github_login: github_login.to_string(),
1741 email_address: Some(email_address.to_string()),
1742 admin,
1743 invite_code: None,
1744 invite_count: 0,
1745 connected_once: false,
1746 },
1747 );
1748 Ok(user_id)
1749 }
1750 }
1751
1752 async fn get_all_users(&self, _page: u32, _limit: u32) -> Result<Vec<User>> {
1753 unimplemented!()
1754 }
1755
1756 async fn fuzzy_search_users(&self, _: &str, _: u32) -> Result<Vec<User>> {
1757 unimplemented!()
1758 }
1759
1760 async fn get_user_by_id(&self, id: UserId) -> Result<Option<User>> {
1761 self.background.simulate_random_delay().await;
1762 Ok(self.get_users_by_ids(vec![id]).await?.into_iter().next())
1763 }
1764
1765 async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<User>> {
1766 self.background.simulate_random_delay().await;
1767 let users = self.users.lock();
1768 Ok(ids.iter().filter_map(|id| users.get(id).cloned()).collect())
1769 }
1770
1771 async fn get_users_with_no_invites(&self, _: bool) -> Result<Vec<User>> {
1772 unimplemented!()
1773 }
1774
1775 async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
1776 self.background.simulate_random_delay().await;
1777 Ok(self
1778 .users
1779 .lock()
1780 .values()
1781 .find(|user| user.github_login == github_login)
1782 .cloned())
1783 }
1784
1785 async fn set_user_is_admin(&self, _id: UserId, _is_admin: bool) -> Result<()> {
1786 unimplemented!()
1787 }
1788
1789 async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
1790 self.background.simulate_random_delay().await;
1791 let mut users = self.users.lock();
1792 let mut user = users
1793 .get_mut(&id)
1794 .ok_or_else(|| anyhow!("user not found"))?;
1795 user.connected_once = connected_once;
1796 Ok(())
1797 }
1798
1799 async fn destroy_user(&self, _id: UserId) -> Result<()> {
1800 unimplemented!()
1801 }
1802
1803 // signups
1804
1805 async fn create_signup(&self, _signup: Signup) -> Result<()> {
1806 unimplemented!()
1807 }
1808
1809 async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
1810 unimplemented!()
1811 }
1812
1813 async fn get_unsent_invites(&self, _count: usize) -> Result<Vec<Invite>> {
1814 unimplemented!()
1815 }
1816
1817 async fn record_sent_invites(&self, _invites: &[Invite]) -> Result<()> {
1818 unimplemented!()
1819 }
1820
1821 async fn create_user_from_invite(
1822 &self,
1823 _invite: &Invite,
1824 _user: NewUserParams,
1825 ) -> Result<(UserId, Option<UserId>)> {
1826 unimplemented!()
1827 }
1828
1829 // invite codes
1830
1831 async fn set_invite_count_for_user(&self, _id: UserId, _count: u32) -> Result<()> {
1832 unimplemented!()
1833 }
1834
1835 async fn get_invite_code_for_user(&self, _id: UserId) -> Result<Option<(String, u32)>> {
1836 self.background.simulate_random_delay().await;
1837 Ok(None)
1838 }
1839
1840 async fn get_user_for_invite_code(&self, _code: &str) -> Result<User> {
1841 unimplemented!()
1842 }
1843
1844 async fn create_invite_from_code(
1845 &self,
1846 _code: &str,
1847 _email_address: &str,
1848 ) -> Result<Invite> {
1849 unimplemented!()
1850 }
1851
1852 // projects
1853
1854 async fn register_project(&self, host_user_id: UserId) -> Result<ProjectId> {
1855 self.background.simulate_random_delay().await;
1856 if !self.users.lock().contains_key(&host_user_id) {
1857 Err(anyhow!("no such user"))?;
1858 }
1859
1860 let project_id = ProjectId(post_inc(&mut *self.next_project_id.lock()));
1861 self.projects.lock().insert(
1862 project_id,
1863 Project {
1864 id: project_id,
1865 host_user_id,
1866 unregistered: false,
1867 },
1868 );
1869 Ok(project_id)
1870 }
1871
1872 async fn unregister_project(&self, project_id: ProjectId) -> Result<()> {
1873 self.background.simulate_random_delay().await;
1874 self.projects
1875 .lock()
1876 .get_mut(&project_id)
1877 .ok_or_else(|| anyhow!("no such project"))?
1878 .unregistered = true;
1879 Ok(())
1880 }
1881
1882 async fn update_worktree_extensions(
1883 &self,
1884 project_id: ProjectId,
1885 worktree_id: u64,
1886 extensions: HashMap<String, u32>,
1887 ) -> Result<()> {
1888 self.background.simulate_random_delay().await;
1889 if !self.projects.lock().contains_key(&project_id) {
1890 Err(anyhow!("no such project"))?;
1891 }
1892
1893 for (extension, count) in extensions {
1894 self.worktree_extensions
1895 .lock()
1896 .insert((project_id, worktree_id, extension), count);
1897 }
1898
1899 Ok(())
1900 }
1901
1902 async fn get_project_extensions(
1903 &self,
1904 _project_id: ProjectId,
1905 ) -> Result<HashMap<u64, HashMap<String, usize>>> {
1906 unimplemented!()
1907 }
1908
1909 async fn record_user_activity(
1910 &self,
1911 _time_period: Range<OffsetDateTime>,
1912 _active_projects: &[(UserId, ProjectId)],
1913 ) -> Result<()> {
1914 unimplemented!()
1915 }
1916
1917 async fn get_active_user_count(
1918 &self,
1919 _time_period: Range<OffsetDateTime>,
1920 _min_duration: Duration,
1921 _only_collaborative: bool,
1922 ) -> Result<usize> {
1923 unimplemented!()
1924 }
1925
1926 async fn get_top_users_activity_summary(
1927 &self,
1928 _time_period: Range<OffsetDateTime>,
1929 _limit: usize,
1930 ) -> Result<Vec<UserActivitySummary>> {
1931 unimplemented!()
1932 }
1933
1934 async fn get_user_activity_timeline(
1935 &self,
1936 _time_period: Range<OffsetDateTime>,
1937 _user_id: UserId,
1938 ) -> Result<Vec<UserActivityPeriod>> {
1939 unimplemented!()
1940 }
1941
1942 // contacts
1943
1944 async fn get_contacts(&self, id: UserId) -> Result<Vec<Contact>> {
1945 self.background.simulate_random_delay().await;
1946 let mut contacts = vec![Contact::Accepted {
1947 user_id: id,
1948 should_notify: false,
1949 }];
1950
1951 for contact in self.contacts.lock().iter() {
1952 if contact.requester_id == id {
1953 if contact.accepted {
1954 contacts.push(Contact::Accepted {
1955 user_id: contact.responder_id,
1956 should_notify: contact.should_notify,
1957 });
1958 } else {
1959 contacts.push(Contact::Outgoing {
1960 user_id: contact.responder_id,
1961 });
1962 }
1963 } else if contact.responder_id == id {
1964 if contact.accepted {
1965 contacts.push(Contact::Accepted {
1966 user_id: contact.requester_id,
1967 should_notify: false,
1968 });
1969 } else {
1970 contacts.push(Contact::Incoming {
1971 user_id: contact.requester_id,
1972 should_notify: contact.should_notify,
1973 });
1974 }
1975 }
1976 }
1977
1978 contacts.sort_unstable_by_key(|contact| contact.user_id());
1979 Ok(contacts)
1980 }
1981
1982 async fn has_contact(&self, user_id_a: UserId, user_id_b: UserId) -> Result<bool> {
1983 self.background.simulate_random_delay().await;
1984 Ok(self.contacts.lock().iter().any(|contact| {
1985 contact.accepted
1986 && ((contact.requester_id == user_id_a && contact.responder_id == user_id_b)
1987 || (contact.requester_id == user_id_b && contact.responder_id == user_id_a))
1988 }))
1989 }
1990
1991 async fn send_contact_request(
1992 &self,
1993 requester_id: UserId,
1994 responder_id: UserId,
1995 ) -> Result<()> {
1996 self.background.simulate_random_delay().await;
1997 let mut contacts = self.contacts.lock();
1998 for contact in contacts.iter_mut() {
1999 if contact.requester_id == requester_id && contact.responder_id == responder_id {
2000 if contact.accepted {
2001 Err(anyhow!("contact already exists"))?;
2002 } else {
2003 Err(anyhow!("contact already requested"))?;
2004 }
2005 }
2006 if contact.responder_id == requester_id && contact.requester_id == responder_id {
2007 if contact.accepted {
2008 Err(anyhow!("contact already exists"))?;
2009 } else {
2010 contact.accepted = true;
2011 contact.should_notify = false;
2012 return Ok(());
2013 }
2014 }
2015 }
2016 contacts.push(FakeContact {
2017 requester_id,
2018 responder_id,
2019 accepted: false,
2020 should_notify: true,
2021 });
2022 Ok(())
2023 }
2024
2025 async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
2026 self.background.simulate_random_delay().await;
2027 self.contacts.lock().retain(|contact| {
2028 !(contact.requester_id == requester_id && contact.responder_id == responder_id)
2029 });
2030 Ok(())
2031 }
2032
2033 async fn dismiss_contact_notification(
2034 &self,
2035 user_id: UserId,
2036 contact_user_id: UserId,
2037 ) -> Result<()> {
2038 self.background.simulate_random_delay().await;
2039 let mut contacts = self.contacts.lock();
2040 for contact in contacts.iter_mut() {
2041 if contact.requester_id == contact_user_id
2042 && contact.responder_id == user_id
2043 && !contact.accepted
2044 {
2045 contact.should_notify = false;
2046 return Ok(());
2047 }
2048 if contact.requester_id == user_id
2049 && contact.responder_id == contact_user_id
2050 && contact.accepted
2051 {
2052 contact.should_notify = false;
2053 return Ok(());
2054 }
2055 }
2056 Err(anyhow!("no such notification"))?
2057 }
2058
2059 async fn respond_to_contact_request(
2060 &self,
2061 responder_id: UserId,
2062 requester_id: UserId,
2063 accept: bool,
2064 ) -> Result<()> {
2065 self.background.simulate_random_delay().await;
2066 let mut contacts = self.contacts.lock();
2067 for (ix, contact) in contacts.iter_mut().enumerate() {
2068 if contact.requester_id == requester_id && contact.responder_id == responder_id {
2069 if contact.accepted {
2070 Err(anyhow!("contact already confirmed"))?;
2071 }
2072 if accept {
2073 contact.accepted = true;
2074 contact.should_notify = true;
2075 } else {
2076 contacts.remove(ix);
2077 }
2078 return Ok(());
2079 }
2080 }
2081 Err(anyhow!("no such contact request"))?
2082 }
2083
2084 async fn create_access_token_hash(
2085 &self,
2086 _user_id: UserId,
2087 _access_token_hash: &str,
2088 _max_access_token_count: usize,
2089 ) -> Result<()> {
2090 unimplemented!()
2091 }
2092
2093 async fn get_access_token_hashes(&self, _user_id: UserId) -> Result<Vec<String>> {
2094 unimplemented!()
2095 }
2096
2097 async fn find_org_by_slug(&self, _slug: &str) -> Result<Option<Org>> {
2098 unimplemented!()
2099 }
2100
2101 async fn create_org(&self, name: &str, slug: &str) -> Result<OrgId> {
2102 self.background.simulate_random_delay().await;
2103 let mut orgs = self.orgs.lock();
2104 if orgs.values().any(|org| org.slug == slug) {
2105 Err(anyhow!("org already exists"))?
2106 } else {
2107 let org_id = OrgId(post_inc(&mut *self.next_org_id.lock()));
2108 orgs.insert(
2109 org_id,
2110 Org {
2111 id: org_id,
2112 name: name.to_string(),
2113 slug: slug.to_string(),
2114 },
2115 );
2116 Ok(org_id)
2117 }
2118 }
2119
2120 async fn add_org_member(
2121 &self,
2122 org_id: OrgId,
2123 user_id: UserId,
2124 is_admin: bool,
2125 ) -> Result<()> {
2126 self.background.simulate_random_delay().await;
2127 if !self.orgs.lock().contains_key(&org_id) {
2128 Err(anyhow!("org does not exist"))?;
2129 }
2130 if !self.users.lock().contains_key(&user_id) {
2131 Err(anyhow!("user does not exist"))?;
2132 }
2133
2134 self.org_memberships
2135 .lock()
2136 .entry((org_id, user_id))
2137 .or_insert(is_admin);
2138 Ok(())
2139 }
2140
2141 async fn create_org_channel(&self, org_id: OrgId, name: &str) -> Result<ChannelId> {
2142 self.background.simulate_random_delay().await;
2143 if !self.orgs.lock().contains_key(&org_id) {
2144 Err(anyhow!("org does not exist"))?;
2145 }
2146
2147 let mut channels = self.channels.lock();
2148 let channel_id = ChannelId(post_inc(&mut *self.next_channel_id.lock()));
2149 channels.insert(
2150 channel_id,
2151 Channel {
2152 id: channel_id,
2153 name: name.to_string(),
2154 owner_id: org_id.0,
2155 owner_is_user: false,
2156 },
2157 );
2158 Ok(channel_id)
2159 }
2160
2161 async fn get_org_channels(&self, org_id: OrgId) -> Result<Vec<Channel>> {
2162 self.background.simulate_random_delay().await;
2163 Ok(self
2164 .channels
2165 .lock()
2166 .values()
2167 .filter(|channel| !channel.owner_is_user && channel.owner_id == org_id.0)
2168 .cloned()
2169 .collect())
2170 }
2171
2172 async fn get_accessible_channels(&self, user_id: UserId) -> Result<Vec<Channel>> {
2173 self.background.simulate_random_delay().await;
2174 let channels = self.channels.lock();
2175 let memberships = self.channel_memberships.lock();
2176 Ok(channels
2177 .values()
2178 .filter(|channel| memberships.contains_key(&(channel.id, user_id)))
2179 .cloned()
2180 .collect())
2181 }
2182
2183 async fn can_user_access_channel(
2184 &self,
2185 user_id: UserId,
2186 channel_id: ChannelId,
2187 ) -> Result<bool> {
2188 self.background.simulate_random_delay().await;
2189 Ok(self
2190 .channel_memberships
2191 .lock()
2192 .contains_key(&(channel_id, user_id)))
2193 }
2194
2195 async fn add_channel_member(
2196 &self,
2197 channel_id: ChannelId,
2198 user_id: UserId,
2199 is_admin: bool,
2200 ) -> Result<()> {
2201 self.background.simulate_random_delay().await;
2202 if !self.channels.lock().contains_key(&channel_id) {
2203 Err(anyhow!("channel does not exist"))?;
2204 }
2205 if !self.users.lock().contains_key(&user_id) {
2206 Err(anyhow!("user does not exist"))?;
2207 }
2208
2209 self.channel_memberships
2210 .lock()
2211 .entry((channel_id, user_id))
2212 .or_insert(is_admin);
2213 Ok(())
2214 }
2215
2216 async fn create_channel_message(
2217 &self,
2218 channel_id: ChannelId,
2219 sender_id: UserId,
2220 body: &str,
2221 timestamp: OffsetDateTime,
2222 nonce: u128,
2223 ) -> Result<MessageId> {
2224 self.background.simulate_random_delay().await;
2225 if !self.channels.lock().contains_key(&channel_id) {
2226 Err(anyhow!("channel does not exist"))?;
2227 }
2228 if !self.users.lock().contains_key(&sender_id) {
2229 Err(anyhow!("user does not exist"))?;
2230 }
2231
2232 let mut messages = self.channel_messages.lock();
2233 if let Some(message) = messages
2234 .values()
2235 .find(|message| message.nonce.as_u128() == nonce)
2236 {
2237 Ok(message.id)
2238 } else {
2239 let message_id = MessageId(post_inc(&mut *self.next_channel_message_id.lock()));
2240 messages.insert(
2241 message_id,
2242 ChannelMessage {
2243 id: message_id,
2244 channel_id,
2245 sender_id,
2246 body: body.to_string(),
2247 sent_at: timestamp,
2248 nonce: Uuid::from_u128(nonce),
2249 },
2250 );
2251 Ok(message_id)
2252 }
2253 }
2254
2255 async fn get_channel_messages(
2256 &self,
2257 channel_id: ChannelId,
2258 count: usize,
2259 before_id: Option<MessageId>,
2260 ) -> Result<Vec<ChannelMessage>> {
2261 self.background.simulate_random_delay().await;
2262 let mut messages = self
2263 .channel_messages
2264 .lock()
2265 .values()
2266 .rev()
2267 .filter(|message| {
2268 message.channel_id == channel_id
2269 && message.id < before_id.unwrap_or(MessageId::MAX)
2270 })
2271 .take(count)
2272 .cloned()
2273 .collect::<Vec<_>>();
2274 messages.sort_unstable_by_key(|message| message.id);
2275 Ok(messages)
2276 }
2277
2278 async fn teardown(&self, _: &str) {}
2279
2280 #[cfg(test)]
2281 fn as_fake(&self) -> Option<&FakeDb> {
2282 Some(self)
2283 }
2284 }
2285
    /// Owns a database handle for the duration of a test and tears it down on drop.
    pub struct TestDb {
        /// The wrapped database; taken out (left as `None`) when the value is dropped.
        pub db: Option<Arc<dyn Db>>,
        /// Connection URL passed to `teardown`; empty for the fake database.
        pub url: String,
    }
2290
2291 impl TestDb {
2292 #[allow(clippy::await_holding_lock)]
2293 pub async fn postgres() -> Self {
2294 lazy_static! {
2295 static ref LOCK: Mutex<()> = Mutex::new(());
2296 }
2297
2298 let _guard = LOCK.lock();
2299 let mut rng = StdRng::from_entropy();
2300 let name = format!("zed-test-{}", rng.gen::<u128>());
2301 let url = format!("postgres://postgres@localhost/{}", name);
2302 let migrations_path = Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/migrations"));
2303 Postgres::create_database(&url)
2304 .await
2305 .expect("failed to create test db");
2306 let db = PostgresDb::new(&url, 5).await.unwrap();
2307 let migrator = Migrator::new(migrations_path).await.unwrap();
2308 migrator.run(&db.pool).await.unwrap();
2309 Self {
2310 db: Some(Arc::new(db)),
2311 url,
2312 }
2313 }
2314
2315 pub fn fake(background: Arc<Background>) -> Self {
2316 Self {
2317 db: Some(Arc::new(FakeDb::new(background))),
2318 url: Default::default(),
2319 }
2320 }
2321
2322 pub fn db(&self) -> &Arc<dyn Db> {
2323 self.db.as_ref().unwrap()
2324 }
2325 }
2326
    /// Tears the wrapped database down when the test value goes out of scope.
    impl Drop for TestDb {
        fn drop(&mut self) {
            // `take` leaves `None` behind, so the teardown only runs if the handle
            // is still present.
            if let Some(db) = self.db.take() {
                // `drop` cannot be async, so block on the teardown future.
                futures::executor::block_on(db.teardown(&self.url));
            }
        }
    }
2334}