1use crate::{Error, Result};
2use anyhow::anyhow;
3use axum::http::StatusCode;
4use collections::{BTreeMap, HashMap, HashSet};
5use futures::{future::BoxFuture, FutureExt, StreamExt};
6use rpc::{proto, ConnectionId};
7use serde::{Deserialize, Serialize};
8use sqlx::{
9 migrate::{Migrate as _, Migration, MigrationSource},
10 types::Uuid,
11 FromRow,
12};
13use std::{future::Future, path::Path, time::Duration};
14use time::{OffsetDateTime, PrimitiveDateTime};
15
16#[cfg(test)]
17pub type DefaultDb = Db<sqlx::Sqlite>;
18
19#[cfg(not(test))]
20pub type DefaultDb = Db<sqlx::Postgres>;
21
22pub struct Db<D: sqlx::Database> {
23 pool: sqlx::Pool<D>,
24 #[cfg(test)]
25 background: Option<std::sync::Arc<gpui::executor::Background>>,
26 #[cfg(test)]
27 runtime: Option<tokio::runtime::Runtime>,
28}
29
30pub trait BeginTransaction: Send + Sync {
31 type Database: sqlx::Database;
32
33 fn begin_transaction(&self) -> BoxFuture<Result<sqlx::Transaction<'static, Self::Database>>>;
34}
35
36// In Postgres, serializable transactions are opt-in
37impl BeginTransaction for Db<sqlx::Postgres> {
38 type Database = sqlx::Postgres;
39
40 fn begin_transaction(&self) -> BoxFuture<Result<sqlx::Transaction<'static, sqlx::Postgres>>> {
41 async move {
42 let mut tx = self.pool.begin().await?;
43 sqlx::Executor::execute(&mut tx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;")
44 .await?;
45 Ok(tx)
46 }
47 .boxed()
48 }
49}
50
51// In Sqlite, transactions are inherently serializable.
52#[cfg(test)]
53impl BeginTransaction for Db<sqlx::Sqlite> {
54 type Database = sqlx::Sqlite;
55
56 fn begin_transaction(&self) -> BoxFuture<Result<sqlx::Transaction<'static, sqlx::Sqlite>>> {
57 async move { Ok(self.pool.begin().await?) }.boxed()
58 }
59}
60
61pub trait RowsAffected {
62 fn rows_affected(&self) -> u64;
63}
64
65#[cfg(test)]
66impl RowsAffected for sqlx::sqlite::SqliteQueryResult {
67 fn rows_affected(&self) -> u64 {
68 self.rows_affected()
69 }
70}
71
72impl RowsAffected for sqlx::postgres::PgQueryResult {
73 fn rows_affected(&self) -> u64 {
74 self.rows_affected()
75 }
76}
77
78#[cfg(test)]
79impl Db<sqlx::Sqlite> {
80 pub async fn new(url: &str, max_connections: u32) -> Result<Self> {
81 use std::str::FromStr as _;
82 let options = sqlx::sqlite::SqliteConnectOptions::from_str(url)
83 .unwrap()
84 .create_if_missing(true)
85 .shared_cache(true);
86 let pool = sqlx::sqlite::SqlitePoolOptions::new()
87 .min_connections(2)
88 .max_connections(max_connections)
89 .connect_with(options)
90 .await?;
91 Ok(Self {
92 pool,
93 background: None,
94 runtime: None,
95 })
96 }
97
98 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<User>> {
99 self.transact(|tx| async {
100 let mut tx = tx;
101 let query = "
102 SELECT users.*
103 FROM users
104 WHERE users.id IN (SELECT value from json_each($1))
105 ";
106 Ok(sqlx::query_as(query)
107 .bind(&serde_json::json!(ids))
108 .fetch_all(&mut tx)
109 .await?)
110 })
111 .await
112 }
113
114 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
115 self.transact(|mut tx| async move {
116 let query = "
117 SELECT metrics_id
118 FROM users
119 WHERE id = $1
120 ";
121 Ok(sqlx::query_scalar(query)
122 .bind(id)
123 .fetch_one(&mut tx)
124 .await?)
125 })
126 .await
127 }
128
129 pub async fn create_user(
130 &self,
131 email_address: &str,
132 admin: bool,
133 params: NewUserParams,
134 ) -> Result<NewUserResult> {
135 self.transact(|mut tx| async {
136 let query = "
137 INSERT INTO users (email_address, github_login, github_user_id, admin, metrics_id)
138 VALUES ($1, $2, $3, $4, $5)
139 ON CONFLICT (github_login) DO UPDATE SET github_login = excluded.github_login
140 RETURNING id, metrics_id
141 ";
142
143 let (user_id, metrics_id): (UserId, String) = sqlx::query_as(query)
144 .bind(email_address)
145 .bind(¶ms.github_login)
146 .bind(¶ms.github_user_id)
147 .bind(admin)
148 .bind(Uuid::new_v4().to_string())
149 .fetch_one(&mut tx)
150 .await?;
151 tx.commit().await?;
152 Ok(NewUserResult {
153 user_id,
154 metrics_id,
155 signup_device_id: None,
156 inviting_user_id: None,
157 })
158 })
159 .await
160 }
161
162 pub async fn fuzzy_search_users(&self, _name_query: &str, _limit: u32) -> Result<Vec<User>> {
163 unimplemented!()
164 }
165
166 pub async fn create_user_from_invite(
167 &self,
168 _invite: &Invite,
169 _user: NewUserParams,
170 ) -> Result<Option<NewUserResult>> {
171 unimplemented!()
172 }
173
174 pub async fn create_signup(&self, _signup: Signup) -> Result<()> {
175 unimplemented!()
176 }
177
178 pub async fn create_invite_from_code(
179 &self,
180 _code: &str,
181 _email_address: &str,
182 _device_id: Option<&str>,
183 ) -> Result<Invite> {
184 unimplemented!()
185 }
186
187 pub async fn record_sent_invites(&self, _invites: &[Invite]) -> Result<()> {
188 unimplemented!()
189 }
190}
191
192impl Db<sqlx::Postgres> {
193 pub async fn new(url: &str, max_connections: u32) -> Result<Self> {
194 let pool = sqlx::postgres::PgPoolOptions::new()
195 .max_connections(max_connections)
196 .connect(url)
197 .await?;
198 Ok(Self {
199 pool,
200 #[cfg(test)]
201 background: None,
202 #[cfg(test)]
203 runtime: None,
204 })
205 }
206
207 #[cfg(test)]
208 pub fn teardown(&self, url: &str) {
209 self.runtime.as_ref().unwrap().block_on(async {
210 use util::ResultExt;
211 let query = "
212 SELECT pg_terminate_backend(pg_stat_activity.pid)
213 FROM pg_stat_activity
214 WHERE pg_stat_activity.datname = current_database() AND pid <> pg_backend_pid();
215 ";
216 sqlx::query(query).execute(&self.pool).await.log_err();
217 self.pool.close().await;
218 <sqlx::Sqlite as sqlx::migrate::MigrateDatabase>::drop_database(url)
219 .await
220 .log_err();
221 })
222 }
223
224 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
225 self.transact(|tx| async {
226 let mut tx = tx;
227 let like_string = Self::fuzzy_like_string(name_query);
228 let query = "
229 SELECT users.*
230 FROM users
231 WHERE github_login ILIKE $1
232 ORDER BY github_login <-> $2
233 LIMIT $3
234 ";
235 Ok(sqlx::query_as(query)
236 .bind(like_string)
237 .bind(name_query)
238 .bind(limit as i32)
239 .fetch_all(&mut tx)
240 .await?)
241 })
242 .await
243 }
244
245 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<User>> {
246 let ids = ids.iter().map(|id| id.0).collect::<Vec<_>>();
247 self.transact(|tx| async {
248 let mut tx = tx;
249 let query = "
250 SELECT users.*
251 FROM users
252 WHERE users.id = ANY ($1)
253 ";
254 Ok(sqlx::query_as(query).bind(&ids).fetch_all(&mut tx).await?)
255 })
256 .await
257 }
258
259 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
260 self.transact(|mut tx| async move {
261 let query = "
262 SELECT metrics_id::text
263 FROM users
264 WHERE id = $1
265 ";
266 Ok(sqlx::query_scalar(query)
267 .bind(id)
268 .fetch_one(&mut tx)
269 .await?)
270 })
271 .await
272 }
273
274 pub async fn create_user(
275 &self,
276 email_address: &str,
277 admin: bool,
278 params: NewUserParams,
279 ) -> Result<NewUserResult> {
280 self.transact(|mut tx| async {
281 let query = "
282 INSERT INTO users (email_address, github_login, github_user_id, admin)
283 VALUES ($1, $2, $3, $4)
284 ON CONFLICT (github_login) DO UPDATE SET github_login = excluded.github_login
285 RETURNING id, metrics_id::text
286 ";
287
288 let (user_id, metrics_id): (UserId, String) = sqlx::query_as(query)
289 .bind(email_address)
290 .bind(¶ms.github_login)
291 .bind(params.github_user_id)
292 .bind(admin)
293 .fetch_one(&mut tx)
294 .await?;
295 tx.commit().await?;
296
297 Ok(NewUserResult {
298 user_id,
299 metrics_id,
300 signup_device_id: None,
301 inviting_user_id: None,
302 })
303 })
304 .await
305 }
306
307 pub async fn create_user_from_invite(
308 &self,
309 invite: &Invite,
310 user: NewUserParams,
311 ) -> Result<Option<NewUserResult>> {
312 self.transact(|mut tx| async {
313 let (signup_id, existing_user_id, inviting_user_id, signup_device_id): (
314 i32,
315 Option<UserId>,
316 Option<UserId>,
317 Option<String>,
318 ) = sqlx::query_as(
319 "
320 SELECT id, user_id, inviting_user_id, device_id
321 FROM signups
322 WHERE
323 email_address = $1 AND
324 email_confirmation_code = $2
325 ",
326 )
327 .bind(&invite.email_address)
328 .bind(&invite.email_confirmation_code)
329 .fetch_optional(&mut tx)
330 .await?
331 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
332
333 if existing_user_id.is_some() {
334 return Ok(None);
335 }
336
337 let (user_id, metrics_id): (UserId, String) = sqlx::query_as(
338 "
339 INSERT INTO users
340 (email_address, github_login, github_user_id, admin, invite_count, invite_code)
341 VALUES
342 ($1, $2, $3, FALSE, $4, $5)
343 ON CONFLICT (github_login) DO UPDATE SET
344 email_address = excluded.email_address,
345 github_user_id = excluded.github_user_id,
346 admin = excluded.admin
347 RETURNING id, metrics_id::text
348 ",
349 )
350 .bind(&invite.email_address)
351 .bind(&user.github_login)
352 .bind(&user.github_user_id)
353 .bind(&user.invite_count)
354 .bind(random_invite_code())
355 .fetch_one(&mut tx)
356 .await?;
357
358 sqlx::query(
359 "
360 UPDATE signups
361 SET user_id = $1
362 WHERE id = $2
363 ",
364 )
365 .bind(&user_id)
366 .bind(&signup_id)
367 .execute(&mut tx)
368 .await?;
369
370 if let Some(inviting_user_id) = inviting_user_id {
371 let id: Option<UserId> = sqlx::query_scalar(
372 "
373 UPDATE users
374 SET invite_count = invite_count - 1
375 WHERE id = $1 AND invite_count > 0
376 RETURNING id
377 ",
378 )
379 .bind(&inviting_user_id)
380 .fetch_optional(&mut tx)
381 .await?;
382
383 if id.is_none() {
384 Err(Error::Http(
385 StatusCode::UNAUTHORIZED,
386 "no invites remaining".to_string(),
387 ))?;
388 }
389
390 sqlx::query(
391 "
392 INSERT INTO contacts
393 (user_id_a, user_id_b, a_to_b, should_notify, accepted)
394 VALUES
395 ($1, $2, TRUE, TRUE, TRUE)
396 ON CONFLICT DO NOTHING
397 ",
398 )
399 .bind(inviting_user_id)
400 .bind(user_id)
401 .execute(&mut tx)
402 .await?;
403 }
404
405 tx.commit().await?;
406 Ok(Some(NewUserResult {
407 user_id,
408 metrics_id,
409 inviting_user_id,
410 signup_device_id,
411 }))
412 })
413 .await
414 }
415
416 pub async fn create_signup(&self, signup: Signup) -> Result<()> {
417 self.transact(|mut tx| async {
418 sqlx::query(
419 "
420 INSERT INTO signups
421 (
422 email_address,
423 email_confirmation_code,
424 email_confirmation_sent,
425 platform_linux,
426 platform_mac,
427 platform_windows,
428 platform_unknown,
429 editor_features,
430 programming_languages,
431 device_id
432 )
433 VALUES
434 ($1, $2, FALSE, $3, $4, $5, FALSE, $6, $7, $8)
435 RETURNING id
436 ",
437 )
438 .bind(&signup.email_address)
439 .bind(&random_email_confirmation_code())
440 .bind(&signup.platform_linux)
441 .bind(&signup.platform_mac)
442 .bind(&signup.platform_windows)
443 .bind(&signup.editor_features)
444 .bind(&signup.programming_languages)
445 .bind(&signup.device_id)
446 .execute(&mut tx)
447 .await?;
448 tx.commit().await?;
449 Ok(())
450 })
451 .await
452 }
453
454 pub async fn create_invite_from_code(
455 &self,
456 code: &str,
457 email_address: &str,
458 device_id: Option<&str>,
459 ) -> Result<Invite> {
460 self.transact(|mut tx| async {
461 let existing_user: Option<UserId> = sqlx::query_scalar(
462 "
463 SELECT id
464 FROM users
465 WHERE email_address = $1
466 ",
467 )
468 .bind(email_address)
469 .fetch_optional(&mut tx)
470 .await?;
471 if existing_user.is_some() {
472 Err(anyhow!("email address is already in use"))?;
473 }
474
475 let row: Option<(UserId, i32)> = sqlx::query_as(
476 "
477 SELECT id, invite_count
478 FROM users
479 WHERE invite_code = $1
480 ",
481 )
482 .bind(code)
483 .fetch_optional(&mut tx)
484 .await?;
485
486 let (inviter_id, invite_count) = match row {
487 Some(row) => row,
488 None => Err(Error::Http(
489 StatusCode::NOT_FOUND,
490 "invite code not found".to_string(),
491 ))?,
492 };
493
494 if invite_count == 0 {
495 Err(Error::Http(
496 StatusCode::UNAUTHORIZED,
497 "no invites remaining".to_string(),
498 ))?;
499 }
500
501 let email_confirmation_code: String = sqlx::query_scalar(
502 "
503 INSERT INTO signups
504 (
505 email_address,
506 email_confirmation_code,
507 email_confirmation_sent,
508 inviting_user_id,
509 platform_linux,
510 platform_mac,
511 platform_windows,
512 platform_unknown,
513 device_id
514 )
515 VALUES
516 ($1, $2, FALSE, $3, FALSE, FALSE, FALSE, TRUE, $4)
517 ON CONFLICT (email_address)
518 DO UPDATE SET
519 inviting_user_id = excluded.inviting_user_id
520 RETURNING email_confirmation_code
521 ",
522 )
523 .bind(&email_address)
524 .bind(&random_email_confirmation_code())
525 .bind(&inviter_id)
526 .bind(&device_id)
527 .fetch_one(&mut tx)
528 .await?;
529
530 tx.commit().await?;
531
532 Ok(Invite {
533 email_address: email_address.into(),
534 email_confirmation_code,
535 })
536 })
537 .await
538 }
539
540 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
541 self.transact(|mut tx| async {
542 let emails = invites
543 .iter()
544 .map(|s| s.email_address.as_str())
545 .collect::<Vec<_>>();
546 sqlx::query(
547 "
548 UPDATE signups
549 SET email_confirmation_sent = TRUE
550 WHERE email_address = ANY ($1)
551 ",
552 )
553 .bind(&emails)
554 .execute(&mut tx)
555 .await?;
556 tx.commit().await?;
557 Ok(())
558 })
559 .await
560 }
561}
562
563impl<D> Db<D>
564where
565 Self: BeginTransaction<Database = D>,
566 D: sqlx::Database + sqlx::migrate::MigrateDatabase,
567 D::Connection: sqlx::migrate::Migrate,
568 for<'a> <D as sqlx::database::HasArguments<'a>>::Arguments: sqlx::IntoArguments<'a, D>,
569 for<'a> &'a mut D::Connection: sqlx::Executor<'a, Database = D>,
570 for<'a, 'b> &'b mut sqlx::Transaction<'a, D>: sqlx::Executor<'b, Database = D>,
571 D::QueryResult: RowsAffected,
572 String: sqlx::Type<D>,
573 i32: sqlx::Type<D>,
574 i64: sqlx::Type<D>,
575 bool: sqlx::Type<D>,
576 str: sqlx::Type<D>,
577 Uuid: sqlx::Type<D>,
578 sqlx::types::Json<serde_json::Value>: sqlx::Type<D>,
579 OffsetDateTime: sqlx::Type<D>,
580 PrimitiveDateTime: sqlx::Type<D>,
581 usize: sqlx::ColumnIndex<D::Row>,
582 for<'a> &'a str: sqlx::ColumnIndex<D::Row>,
583 for<'a> &'a str: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
584 for<'a> String: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
585 for<'a> Option<String>: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
586 for<'a> Option<&'a str>: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
587 for<'a> i32: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
588 for<'a> i64: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
589 for<'a> bool: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
590 for<'a> Uuid: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
591 for<'a> Option<ProjectId>: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
592 for<'a> sqlx::types::JsonValue: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
593 for<'a> OffsetDateTime: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
594 for<'a> PrimitiveDateTime: sqlx::Decode<'a, D> + sqlx::Decode<'a, D>,
595{
596 pub async fn migrate(
597 &self,
598 migrations_path: &Path,
599 ignore_checksum_mismatch: bool,
600 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
601 let migrations = MigrationSource::resolve(migrations_path)
602 .await
603 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
604
605 let mut conn = self.pool.acquire().await?;
606
607 conn.ensure_migrations_table().await?;
608 let applied_migrations: HashMap<_, _> = conn
609 .list_applied_migrations()
610 .await?
611 .into_iter()
612 .map(|m| (m.version, m))
613 .collect();
614
615 let mut new_migrations = Vec::new();
616 for migration in migrations {
617 match applied_migrations.get(&migration.version) {
618 Some(applied_migration) => {
619 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
620 {
621 Err(anyhow!(
622 "checksum mismatch for applied migration {}",
623 migration.description
624 ))?;
625 }
626 }
627 None => {
628 let elapsed = conn.apply(&migration).await?;
629 new_migrations.push((migration, elapsed));
630 }
631 }
632 }
633
634 Ok(new_migrations)
635 }
636
637 pub fn fuzzy_like_string(string: &str) -> String {
638 let mut result = String::with_capacity(string.len() * 2 + 1);
639 for c in string.chars() {
640 if c.is_alphanumeric() {
641 result.push('%');
642 result.push(c);
643 }
644 }
645 result.push('%');
646 result
647 }
648
649 // users
650
651 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
652 self.transact(|tx| async {
653 let mut tx = tx;
654 let query = "SELECT * FROM users ORDER BY github_login ASC LIMIT $1 OFFSET $2";
655 Ok(sqlx::query_as(query)
656 .bind(limit as i32)
657 .bind((page * limit) as i32)
658 .fetch_all(&mut tx)
659 .await?)
660 })
661 .await
662 }
663
664 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<User>> {
665 self.transact(|tx| async {
666 let mut tx = tx;
667 let query = "
668 SELECT users.*
669 FROM users
670 WHERE id = $1
671 LIMIT 1
672 ";
673 Ok(sqlx::query_as(query)
674 .bind(&id)
675 .fetch_optional(&mut tx)
676 .await?)
677 })
678 .await
679 }
680
681 pub async fn get_users_with_no_invites(
682 &self,
683 invited_by_another_user: bool,
684 ) -> Result<Vec<User>> {
685 self.transact(|tx| async {
686 let mut tx = tx;
687 let query = format!(
688 "
689 SELECT users.*
690 FROM users
691 WHERE invite_count = 0
692 AND inviter_id IS{} NULL
693 ",
694 if invited_by_another_user { " NOT" } else { "" }
695 );
696
697 Ok(sqlx::query_as(&query).fetch_all(&mut tx).await?)
698 })
699 .await
700 }
701
702 pub async fn get_user_by_github_account(
703 &self,
704 github_login: &str,
705 github_user_id: Option<i32>,
706 ) -> Result<Option<User>> {
707 self.transact(|tx| async {
708 let mut tx = tx;
709 if let Some(github_user_id) = github_user_id {
710 let mut user = sqlx::query_as::<_, User>(
711 "
712 UPDATE users
713 SET github_login = $1
714 WHERE github_user_id = $2
715 RETURNING *
716 ",
717 )
718 .bind(github_login)
719 .bind(github_user_id)
720 .fetch_optional(&mut tx)
721 .await?;
722
723 if user.is_none() {
724 user = sqlx::query_as::<_, User>(
725 "
726 UPDATE users
727 SET github_user_id = $1
728 WHERE github_login = $2
729 RETURNING *
730 ",
731 )
732 .bind(github_user_id)
733 .bind(github_login)
734 .fetch_optional(&mut tx)
735 .await?;
736 }
737
738 Ok(user)
739 } else {
740 let user = sqlx::query_as(
741 "
742 SELECT * FROM users
743 WHERE github_login = $1
744 LIMIT 1
745 ",
746 )
747 .bind(github_login)
748 .fetch_optional(&mut tx)
749 .await?;
750 Ok(user)
751 }
752 })
753 .await
754 }
755
756 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
757 self.transact(|mut tx| async {
758 let query = "UPDATE users SET admin = $1 WHERE id = $2";
759 sqlx::query(query)
760 .bind(is_admin)
761 .bind(id.0)
762 .execute(&mut tx)
763 .await?;
764 tx.commit().await?;
765 Ok(())
766 })
767 .await
768 }
769
770 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
771 self.transact(|mut tx| async move {
772 let query = "UPDATE users SET connected_once = $1 WHERE id = $2";
773 sqlx::query(query)
774 .bind(connected_once)
775 .bind(id.0)
776 .execute(&mut tx)
777 .await?;
778 tx.commit().await?;
779 Ok(())
780 })
781 .await
782 }
783
784 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
785 self.transact(|mut tx| async move {
786 let query = "DELETE FROM access_tokens WHERE user_id = $1;";
787 sqlx::query(query)
788 .bind(id.0)
789 .execute(&mut tx)
790 .await
791 .map(drop)?;
792 let query = "DELETE FROM users WHERE id = $1;";
793 sqlx::query(query).bind(id.0).execute(&mut tx).await?;
794 tx.commit().await?;
795 Ok(())
796 })
797 .await
798 }
799
800 // signups
801
802 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
803 self.transact(|mut tx| async move {
804 Ok(sqlx::query_as(
805 "
806 SELECT
807 COUNT(*) as count,
808 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
809 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
810 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
811 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
812 FROM (
813 SELECT *
814 FROM signups
815 WHERE
816 NOT email_confirmation_sent
817 ) AS unsent
818 ",
819 )
820 .fetch_one(&mut tx)
821 .await?)
822 })
823 .await
824 }
825
826 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
827 self.transact(|mut tx| async move {
828 Ok(sqlx::query_as(
829 "
830 SELECT
831 email_address, email_confirmation_code
832 FROM signups
833 WHERE
834 NOT email_confirmation_sent AND
835 (platform_mac OR platform_unknown)
836 LIMIT $1
837 ",
838 )
839 .bind(count as i32)
840 .fetch_all(&mut tx)
841 .await?)
842 })
843 .await
844 }
845
846 // invite codes
847
848 pub async fn set_invite_count_for_user(&self, id: UserId, count: u32) -> Result<()> {
849 self.transact(|mut tx| async move {
850 if count > 0 {
851 sqlx::query(
852 "
853 UPDATE users
854 SET invite_code = $1
855 WHERE id = $2 AND invite_code IS NULL
856 ",
857 )
858 .bind(random_invite_code())
859 .bind(id)
860 .execute(&mut tx)
861 .await?;
862 }
863
864 sqlx::query(
865 "
866 UPDATE users
867 SET invite_count = $1
868 WHERE id = $2
869 ",
870 )
871 .bind(count as i32)
872 .bind(id)
873 .execute(&mut tx)
874 .await?;
875 tx.commit().await?;
876 Ok(())
877 })
878 .await
879 }
880
881 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, u32)>> {
882 self.transact(|mut tx| async move {
883 let result: Option<(String, i32)> = sqlx::query_as(
884 "
885 SELECT invite_code, invite_count
886 FROM users
887 WHERE id = $1 AND invite_code IS NOT NULL
888 ",
889 )
890 .bind(id)
891 .fetch_optional(&mut tx)
892 .await?;
893 if let Some((code, count)) = result {
894 Ok(Some((code, count.try_into().map_err(anyhow::Error::new)?)))
895 } else {
896 Ok(None)
897 }
898 })
899 .await
900 }
901
902 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
903 self.transact(|tx| async {
904 let mut tx = tx;
905 sqlx::query_as(
906 "
907 SELECT *
908 FROM users
909 WHERE invite_code = $1
910 ",
911 )
912 .bind(code)
913 .fetch_optional(&mut tx)
914 .await?
915 .ok_or_else(|| {
916 Error::Http(
917 StatusCode::NOT_FOUND,
918 "that invite code does not exist".to_string(),
919 )
920 })
921 })
922 .await
923 }
924
925 pub async fn create_room(
926 &self,
927 user_id: UserId,
928 connection_id: ConnectionId,
929 ) -> Result<proto::Room> {
930 self.transact(|mut tx| async move {
931 let live_kit_room = nanoid::nanoid!(30);
932 let room_id = sqlx::query_scalar(
933 "
934 INSERT INTO rooms (live_kit_room, version)
935 VALUES ($1, $2)
936 RETURNING id
937 ",
938 )
939 .bind(&live_kit_room)
940 .bind(0)
941 .fetch_one(&mut tx)
942 .await
943 .map(RoomId)?;
944
945 sqlx::query(
946 "
947 INSERT INTO room_participants (room_id, user_id, answering_connection_id, calling_user_id, calling_connection_id)
948 VALUES ($1, $2, $3, $4, $5)
949 ",
950 )
951 .bind(room_id)
952 .bind(user_id)
953 .bind(connection_id.0 as i32)
954 .bind(user_id)
955 .bind(connection_id.0 as i32)
956 .execute(&mut tx)
957 .await?;
958
959 self.commit_room_transaction(room_id, tx).await
960 }).await
961 }
962
963 pub async fn call(
964 &self,
965 room_id: RoomId,
966 calling_user_id: UserId,
967 calling_connection_id: ConnectionId,
968 called_user_id: UserId,
969 initial_project_id: Option<ProjectId>,
970 ) -> Result<(proto::Room, proto::IncomingCall)> {
971 self.transact(|mut tx| async move {
972 sqlx::query(
973 "
974 INSERT INTO room_participants (room_id, user_id, calling_user_id, calling_connection_id, initial_project_id)
975 VALUES ($1, $2, $3, $4, $5)
976 ",
977 )
978 .bind(room_id)
979 .bind(called_user_id)
980 .bind(calling_user_id)
981 .bind(calling_connection_id.0 as i32)
982 .bind(initial_project_id)
983 .execute(&mut tx)
984 .await?;
985
986 let room = self.commit_room_transaction(room_id, tx).await?;
987 let incoming_call = Self::build_incoming_call(&room, called_user_id)
988 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
989 Ok((room, incoming_call))
990 }).await
991 }
992
993 pub async fn incoming_call_for_user(
994 &self,
995 user_id: UserId,
996 ) -> Result<Option<proto::IncomingCall>> {
997 self.transact(|mut tx| async move {
998 let room_id = sqlx::query_scalar::<_, RoomId>(
999 "
1000 SELECT room_id
1001 FROM room_participants
1002 WHERE user_id = $1 AND answering_connection_id IS NULL
1003 ",
1004 )
1005 .bind(user_id)
1006 .fetch_optional(&mut tx)
1007 .await?;
1008
1009 if let Some(room_id) = room_id {
1010 let room = self.get_room(room_id, &mut tx).await?;
1011 Ok(Self::build_incoming_call(&room, user_id))
1012 } else {
1013 Ok(None)
1014 }
1015 })
1016 .await
1017 }
1018
1019 fn build_incoming_call(
1020 room: &proto::Room,
1021 called_user_id: UserId,
1022 ) -> Option<proto::IncomingCall> {
1023 let pending_participant = room
1024 .pending_participants
1025 .iter()
1026 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1027
1028 Some(proto::IncomingCall {
1029 room_id: room.id,
1030 calling_user_id: pending_participant.calling_user_id,
1031 participant_user_ids: room
1032 .participants
1033 .iter()
1034 .map(|participant| participant.user_id)
1035 .collect(),
1036 initial_project: room.participants.iter().find_map(|participant| {
1037 let initial_project_id = pending_participant.initial_project_id?;
1038 participant
1039 .projects
1040 .iter()
1041 .find(|project| project.id == initial_project_id)
1042 .cloned()
1043 }),
1044 })
1045 }
1046
1047 pub async fn call_failed(
1048 &self,
1049 room_id: RoomId,
1050 called_user_id: UserId,
1051 ) -> Result<proto::Room> {
1052 self.transact(|mut tx| async move {
1053 sqlx::query(
1054 "
1055 DELETE FROM room_participants
1056 WHERE room_id = $1 AND user_id = $2
1057 ",
1058 )
1059 .bind(room_id)
1060 .bind(called_user_id)
1061 .execute(&mut tx)
1062 .await?;
1063
1064 self.commit_room_transaction(room_id, tx).await
1065 })
1066 .await
1067 }
1068
1069 pub async fn decline_call(
1070 &self,
1071 expected_room_id: Option<RoomId>,
1072 user_id: UserId,
1073 ) -> Result<proto::Room> {
1074 self.transact(|mut tx| async move {
1075 let room_id = sqlx::query_scalar(
1076 "
1077 DELETE FROM room_participants
1078 WHERE user_id = $1 AND answering_connection_id IS NULL
1079 RETURNING room_id
1080 ",
1081 )
1082 .bind(user_id)
1083 .fetch_one(&mut tx)
1084 .await?;
1085 if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1086 return Err(anyhow!("declining call on unexpected room"))?;
1087 }
1088
1089 self.commit_room_transaction(room_id, tx).await
1090 })
1091 .await
1092 }
1093
1094 pub async fn cancel_call(
1095 &self,
1096 expected_room_id: Option<RoomId>,
1097 calling_connection_id: ConnectionId,
1098 called_user_id: UserId,
1099 ) -> Result<proto::Room> {
1100 self.transact(|mut tx| async move {
1101 let room_id = sqlx::query_scalar(
1102 "
1103 DELETE FROM room_participants
1104 WHERE user_id = $1 AND calling_connection_id = $2 AND answering_connection_id IS NULL
1105 RETURNING room_id
1106 ",
1107 )
1108 .bind(called_user_id)
1109 .bind(calling_connection_id.0 as i32)
1110 .fetch_one(&mut tx)
1111 .await?;
1112 if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1113 return Err(anyhow!("canceling call on unexpected room"))?;
1114 }
1115
1116 self.commit_room_transaction(room_id, tx).await
1117 }).await
1118 }
1119
1120 pub async fn join_room(
1121 &self,
1122 room_id: RoomId,
1123 user_id: UserId,
1124 connection_id: ConnectionId,
1125 ) -> Result<proto::Room> {
1126 self.transact(|mut tx| async move {
1127 sqlx::query(
1128 "
1129 UPDATE room_participants
1130 SET answering_connection_id = $1
1131 WHERE room_id = $2 AND user_id = $3
1132 RETURNING 1
1133 ",
1134 )
1135 .bind(connection_id.0 as i32)
1136 .bind(room_id)
1137 .bind(user_id)
1138 .fetch_one(&mut tx)
1139 .await?;
1140 self.commit_room_transaction(room_id, tx).await
1141 })
1142 .await
1143 }
1144
1145 pub async fn leave_room(&self, connection_id: ConnectionId) -> Result<Option<LeftRoom>> {
1146 self.transact(|mut tx| async move {
1147 // Leave room.
1148 let room_id = sqlx::query_scalar::<_, RoomId>(
1149 "
1150 DELETE FROM room_participants
1151 WHERE answering_connection_id = $1
1152 RETURNING room_id
1153 ",
1154 )
1155 .bind(connection_id.0 as i32)
1156 .fetch_optional(&mut tx)
1157 .await?;
1158
1159 if let Some(room_id) = room_id {
1160 // Cancel pending calls initiated by the leaving user.
1161 let canceled_calls_to_user_ids: Vec<UserId> = sqlx::query_scalar(
1162 "
1163 DELETE FROM room_participants
1164 WHERE calling_connection_id = $1 AND answering_connection_id IS NULL
1165 RETURNING user_id
1166 ",
1167 )
1168 .bind(connection_id.0 as i32)
1169 .fetch_all(&mut tx)
1170 .await?;
1171
1172 let project_ids = sqlx::query_scalar::<_, ProjectId>(
1173 "
1174 SELECT project_id
1175 FROM project_collaborators
1176 WHERE connection_id = $1
1177 ",
1178 )
1179 .bind(connection_id.0 as i32)
1180 .fetch_all(&mut tx)
1181 .await?;
1182
1183 // Leave projects.
1184 let mut left_projects = HashMap::default();
1185 if !project_ids.is_empty() {
1186 let mut params = "?,".repeat(project_ids.len());
1187 params.pop();
1188 let query = format!(
1189 "
1190 SELECT *
1191 FROM project_collaborators
1192 WHERE project_id IN ({params})
1193 "
1194 );
1195 let mut query = sqlx::query_as::<_, ProjectCollaborator>(&query);
1196 for project_id in project_ids {
1197 query = query.bind(project_id);
1198 }
1199
1200 let mut project_collaborators = query.fetch(&mut tx);
1201 while let Some(collaborator) = project_collaborators.next().await {
1202 let collaborator = collaborator?;
1203 let left_project =
1204 left_projects
1205 .entry(collaborator.project_id)
1206 .or_insert(LeftProject {
1207 id: collaborator.project_id,
1208 host_user_id: Default::default(),
1209 connection_ids: Default::default(),
1210 host_connection_id: Default::default(),
1211 });
1212
1213 let collaborator_connection_id =
1214 ConnectionId(collaborator.connection_id as u32);
1215 if collaborator_connection_id != connection_id {
1216 left_project.connection_ids.push(collaborator_connection_id);
1217 }
1218
1219 if collaborator.is_host {
1220 left_project.host_user_id = collaborator.user_id;
1221 left_project.host_connection_id =
1222 ConnectionId(collaborator.connection_id as u32);
1223 }
1224 }
1225 }
1226 sqlx::query(
1227 "
1228 DELETE FROM project_collaborators
1229 WHERE connection_id = $1
1230 ",
1231 )
1232 .bind(connection_id.0 as i32)
1233 .execute(&mut tx)
1234 .await?;
1235
1236 // Unshare projects.
1237 sqlx::query(
1238 "
1239 DELETE FROM projects
1240 WHERE room_id = $1 AND host_connection_id = $2
1241 ",
1242 )
1243 .bind(room_id)
1244 .bind(connection_id.0 as i32)
1245 .execute(&mut tx)
1246 .await?;
1247
1248 let room = self.commit_room_transaction(room_id, tx).await?;
1249 Ok(Some(LeftRoom {
1250 room,
1251 left_projects,
1252 canceled_calls_to_user_ids,
1253 }))
1254 } else {
1255 Ok(None)
1256 }
1257 })
1258 .await
1259 }
1260
1261 pub async fn update_room_participant_location(
1262 &self,
1263 room_id: RoomId,
1264 connection_id: ConnectionId,
1265 location: proto::ParticipantLocation,
1266 ) -> Result<proto::Room> {
1267 self.transact(|tx| async {
1268 let mut tx = tx;
1269 let location_kind;
1270 let location_project_id;
1271 match location
1272 .variant
1273 .as_ref()
1274 .ok_or_else(|| anyhow!("invalid location"))?
1275 {
1276 proto::participant_location::Variant::SharedProject(project) => {
1277 location_kind = 0;
1278 location_project_id = Some(ProjectId::from_proto(project.id));
1279 }
1280 proto::participant_location::Variant::UnsharedProject(_) => {
1281 location_kind = 1;
1282 location_project_id = None;
1283 }
1284 proto::participant_location::Variant::External(_) => {
1285 location_kind = 2;
1286 location_project_id = None;
1287 }
1288 }
1289
1290 sqlx::query(
1291 "
1292 UPDATE room_participants
1293 SET location_kind = $1, location_project_id = $2
1294 WHERE room_id = $3 AND answering_connection_id = $4
1295 RETURNING 1
1296 ",
1297 )
1298 .bind(location_kind)
1299 .bind(location_project_id)
1300 .bind(room_id)
1301 .bind(connection_id.0 as i32)
1302 .fetch_one(&mut tx)
1303 .await?;
1304
1305 self.commit_room_transaction(room_id, tx).await
1306 })
1307 .await
1308 }
1309
1310 async fn commit_room_transaction(
1311 &self,
1312 room_id: RoomId,
1313 mut tx: sqlx::Transaction<'_, D>,
1314 ) -> Result<proto::Room> {
1315 sqlx::query(
1316 "
1317 UPDATE rooms
1318 SET version = version + 1
1319 WHERE id = $1
1320 ",
1321 )
1322 .bind(room_id)
1323 .execute(&mut tx)
1324 .await?;
1325 let room = self.get_room(room_id, &mut tx).await?;
1326 tx.commit().await?;
1327
1328 Ok(room)
1329 }
1330
1331 async fn get_guest_connection_ids(
1332 &self,
1333 project_id: ProjectId,
1334 tx: &mut sqlx::Transaction<'_, D>,
1335 ) -> Result<Vec<ConnectionId>> {
1336 let mut guest_connection_ids = Vec::new();
1337 let mut db_guest_connection_ids = sqlx::query_scalar::<_, i32>(
1338 "
1339 SELECT connection_id
1340 FROM project_collaborators
1341 WHERE project_id = $1 AND is_host = FALSE
1342 ",
1343 )
1344 .bind(project_id)
1345 .fetch(tx);
1346 while let Some(connection_id) = db_guest_connection_ids.next().await {
1347 guest_connection_ids.push(ConnectionId(connection_id? as u32));
1348 }
1349 Ok(guest_connection_ids)
1350 }
1351
1352 async fn get_room(
1353 &self,
1354 room_id: RoomId,
1355 tx: &mut sqlx::Transaction<'_, D>,
1356 ) -> Result<proto::Room> {
1357 let room: Room = sqlx::query_as(
1358 "
1359 SELECT *
1360 FROM rooms
1361 WHERE id = $1
1362 ",
1363 )
1364 .bind(room_id)
1365 .fetch_one(&mut *tx)
1366 .await?;
1367
1368 let mut db_participants =
1369 sqlx::query_as::<_, (UserId, Option<i32>, Option<i32>, Option<ProjectId>, UserId, Option<ProjectId>)>(
1370 "
1371 SELECT user_id, answering_connection_id, location_kind, location_project_id, calling_user_id, initial_project_id
1372 FROM room_participants
1373 WHERE room_id = $1
1374 ",
1375 )
1376 .bind(room_id)
1377 .fetch(&mut *tx);
1378
1379 let mut participants = HashMap::default();
1380 let mut pending_participants = Vec::new();
1381 while let Some(participant) = db_participants.next().await {
1382 let (
1383 user_id,
1384 answering_connection_id,
1385 location_kind,
1386 location_project_id,
1387 calling_user_id,
1388 initial_project_id,
1389 ) = participant?;
1390 if let Some(answering_connection_id) = answering_connection_id {
1391 let location = match (location_kind, location_project_id) {
1392 (Some(0), Some(project_id)) => {
1393 Some(proto::participant_location::Variant::SharedProject(
1394 proto::participant_location::SharedProject {
1395 id: project_id.to_proto(),
1396 },
1397 ))
1398 }
1399 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1400 Default::default(),
1401 )),
1402 _ => Some(proto::participant_location::Variant::External(
1403 Default::default(),
1404 )),
1405 };
1406 participants.insert(
1407 answering_connection_id,
1408 proto::Participant {
1409 user_id: user_id.to_proto(),
1410 peer_id: answering_connection_id as u32,
1411 projects: Default::default(),
1412 location: Some(proto::ParticipantLocation { variant: location }),
1413 },
1414 );
1415 } else {
1416 pending_participants.push(proto::PendingParticipant {
1417 user_id: user_id.to_proto(),
1418 calling_user_id: calling_user_id.to_proto(),
1419 initial_project_id: initial_project_id.map(|id| id.to_proto()),
1420 });
1421 }
1422 }
1423 drop(db_participants);
1424
1425 let mut rows = sqlx::query_as::<_, (i32, ProjectId, Option<String>)>(
1426 "
1427 SELECT host_connection_id, projects.id, worktrees.root_name
1428 FROM projects
1429 LEFT JOIN worktrees ON projects.id = worktrees.project_id
1430 WHERE room_id = $1
1431 ",
1432 )
1433 .bind(room_id)
1434 .fetch(&mut *tx);
1435
1436 while let Some(row) = rows.next().await {
1437 let (connection_id, project_id, worktree_root_name) = row?;
1438 if let Some(participant) = participants.get_mut(&connection_id) {
1439 let project = if let Some(project) = participant
1440 .projects
1441 .iter_mut()
1442 .find(|project| project.id == project_id.to_proto())
1443 {
1444 project
1445 } else {
1446 participant.projects.push(proto::ParticipantProject {
1447 id: project_id.to_proto(),
1448 worktree_root_names: Default::default(),
1449 });
1450 participant.projects.last_mut().unwrap()
1451 };
1452 project.worktree_root_names.extend(worktree_root_name);
1453 }
1454 }
1455
1456 Ok(proto::Room {
1457 id: room.id.to_proto(),
1458 version: room.version as u64,
1459 live_kit_room: room.live_kit_room,
1460 participants: participants.into_values().collect(),
1461 pending_participants,
1462 })
1463 }
1464
1465 // projects
1466
1467 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1468 self.transact(|mut tx| async move {
1469 Ok(sqlx::query_scalar::<_, i32>(
1470 "
1471 SELECT COUNT(*)
1472 FROM projects, users
1473 WHERE projects.host_user_id = users.id AND users.admin IS FALSE
1474 ",
1475 )
1476 .fetch_one(&mut tx)
1477 .await? as usize)
1478 })
1479 .await
1480 }
1481
1482 pub async fn share_project(
1483 &self,
1484 expected_room_id: RoomId,
1485 connection_id: ConnectionId,
1486 worktrees: &[proto::WorktreeMetadata],
1487 ) -> Result<(ProjectId, proto::Room)> {
1488 self.transact(|mut tx| async move {
1489 let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>(
1490 "
1491 SELECT room_id, user_id
1492 FROM room_participants
1493 WHERE answering_connection_id = $1
1494 ",
1495 )
1496 .bind(connection_id.0 as i32)
1497 .fetch_one(&mut tx)
1498 .await?;
1499 if room_id != expected_room_id {
1500 return Err(anyhow!("shared project on unexpected room"))?;
1501 }
1502
1503 let project_id: ProjectId = sqlx::query_scalar(
1504 "
1505 INSERT INTO projects (room_id, host_user_id, host_connection_id)
1506 VALUES ($1, $2, $3)
1507 RETURNING id
1508 ",
1509 )
1510 .bind(room_id)
1511 .bind(user_id)
1512 .bind(connection_id.0 as i32)
1513 .fetch_one(&mut tx)
1514 .await?;
1515
1516 if !worktrees.is_empty() {
1517 let mut params = "(?, ?, ?, ?, ?, ?, ?),".repeat(worktrees.len());
1518 params.pop();
1519 let query = format!(
1520 "
1521 INSERT INTO worktrees (
1522 project_id,
1523 id,
1524 root_name,
1525 abs_path,
1526 visible,
1527 scan_id,
1528 is_complete
1529 )
1530 VALUES {params}
1531 "
1532 );
1533
1534 let mut query = sqlx::query(&query);
1535 for worktree in worktrees {
1536 query = query
1537 .bind(project_id)
1538 .bind(worktree.id as i32)
1539 .bind(&worktree.root_name)
1540 .bind(&worktree.abs_path)
1541 .bind(worktree.visible)
1542 .bind(0)
1543 .bind(false);
1544 }
1545 query.execute(&mut tx).await?;
1546 }
1547
1548 sqlx::query(
1549 "
1550 INSERT INTO project_collaborators (
1551 project_id,
1552 connection_id,
1553 user_id,
1554 replica_id,
1555 is_host
1556 )
1557 VALUES ($1, $2, $3, $4, $5)
1558 ",
1559 )
1560 .bind(project_id)
1561 .bind(connection_id.0 as i32)
1562 .bind(user_id)
1563 .bind(0)
1564 .bind(true)
1565 .execute(&mut tx)
1566 .await?;
1567
1568 let room = self.commit_room_transaction(room_id, tx).await?;
1569 Ok((project_id, room))
1570 })
1571 .await
1572 }
1573
1574 pub async fn unshare_project(
1575 &self,
1576 project_id: ProjectId,
1577 connection_id: ConnectionId,
1578 ) -> Result<(proto::Room, Vec<ConnectionId>)> {
1579 self.transact(|mut tx| async move {
1580 let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1581 let room_id: RoomId = sqlx::query_scalar(
1582 "
1583 DELETE FROM projects
1584 WHERE id = $1 AND host_connection_id = $2
1585 RETURNING room_id
1586 ",
1587 )
1588 .bind(project_id)
1589 .bind(connection_id.0 as i32)
1590 .fetch_one(&mut tx)
1591 .await?;
1592 let room = self.commit_room_transaction(room_id, tx).await?;
1593
1594 Ok((room, guest_connection_ids))
1595 })
1596 .await
1597 }
1598
1599 pub async fn update_project(
1600 &self,
1601 project_id: ProjectId,
1602 connection_id: ConnectionId,
1603 worktrees: &[proto::WorktreeMetadata],
1604 ) -> Result<(proto::Room, Vec<ConnectionId>)> {
1605 self.transact(|mut tx| async move {
1606 let room_id: RoomId = sqlx::query_scalar(
1607 "
1608 SELECT room_id
1609 FROM projects
1610 WHERE id = $1 AND host_connection_id = $2
1611 ",
1612 )
1613 .bind(project_id)
1614 .bind(connection_id.0 as i32)
1615 .fetch_one(&mut tx)
1616 .await?;
1617
1618 if !worktrees.is_empty() {
1619 let mut params = "(?, ?, ?, ?, ?, ?, ?),".repeat(worktrees.len());
1620 params.pop();
1621 let query = format!(
1622 "
1623 INSERT INTO worktrees (
1624 project_id,
1625 id,
1626 root_name,
1627 abs_path,
1628 visible,
1629 scan_id,
1630 is_complete
1631 )
1632 VALUES {params}
1633 ON CONFLICT (project_id, id) DO UPDATE SET root_name = excluded.root_name
1634 "
1635 );
1636
1637 let mut query = sqlx::query(&query);
1638 for worktree in worktrees {
1639 query = query
1640 .bind(project_id)
1641 .bind(worktree.id as i32)
1642 .bind(&worktree.root_name)
1643 .bind(&worktree.abs_path)
1644 .bind(worktree.visible)
1645 .bind(0)
1646 .bind(false)
1647 }
1648 query.execute(&mut tx).await?;
1649 }
1650
1651 let mut params = "?,".repeat(worktrees.len());
1652 if !worktrees.is_empty() {
1653 params.pop();
1654 }
1655 let query = format!(
1656 "
1657 DELETE FROM worktrees
1658 WHERE project_id = ? AND id NOT IN ({params})
1659 ",
1660 );
1661
1662 let mut query = sqlx::query(&query).bind(project_id);
1663 for worktree in worktrees {
1664 query = query.bind(WorktreeId(worktree.id as i32));
1665 }
1666 query.execute(&mut tx).await?;
1667
1668 let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1669 let room = self.commit_room_transaction(room_id, tx).await?;
1670
1671 Ok((room, guest_connection_ids))
1672 })
1673 .await
1674 }
1675
1676 pub async fn update_worktree(
1677 &self,
1678 update: &proto::UpdateWorktree,
1679 connection_id: ConnectionId,
1680 ) -> Result<Vec<ConnectionId>> {
1681 self.transact(|mut tx| async move {
1682 let project_id = ProjectId::from_proto(update.project_id);
1683 let worktree_id = WorktreeId::from_proto(update.worktree_id);
1684
1685 // Ensure the update comes from the host.
1686 sqlx::query(
1687 "
1688 SELECT 1
1689 FROM projects
1690 WHERE id = $1 AND host_connection_id = $2
1691 ",
1692 )
1693 .bind(project_id)
1694 .bind(connection_id.0 as i32)
1695 .fetch_one(&mut tx)
1696 .await?;
1697
1698 // Update metadata.
1699 sqlx::query(
1700 "
1701 UPDATE worktrees
1702 SET
1703 root_name = $1,
1704 scan_id = $2,
1705 is_complete = $3,
1706 abs_path = $4
1707 WHERE project_id = $5 AND id = $6
1708 RETURNING 1
1709 ",
1710 )
1711 .bind(&update.root_name)
1712 .bind(update.scan_id as i64)
1713 .bind(update.is_last_update)
1714 .bind(&update.abs_path)
1715 .bind(project_id)
1716 .bind(worktree_id)
1717 .fetch_one(&mut tx)
1718 .await?;
1719
1720 if !update.updated_entries.is_empty() {
1721 let mut params =
1722 "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?),".repeat(update.updated_entries.len());
1723 params.pop();
1724
1725 let query = format!(
1726 "
1727 INSERT INTO worktree_entries (
1728 project_id,
1729 worktree_id,
1730 id,
1731 is_dir,
1732 path,
1733 inode,
1734 mtime_seconds,
1735 mtime_nanos,
1736 is_symlink,
1737 is_ignored
1738 )
1739 VALUES {params}
1740 ON CONFLICT (project_id, worktree_id, id) DO UPDATE SET
1741 is_dir = excluded.is_dir,
1742 path = excluded.path,
1743 inode = excluded.inode,
1744 mtime_seconds = excluded.mtime_seconds,
1745 mtime_nanos = excluded.mtime_nanos,
1746 is_symlink = excluded.is_symlink,
1747 is_ignored = excluded.is_ignored
1748 "
1749 );
1750 let mut query = sqlx::query(&query);
1751 for entry in &update.updated_entries {
1752 let mtime = entry.mtime.clone().unwrap_or_default();
1753 query = query
1754 .bind(project_id)
1755 .bind(worktree_id)
1756 .bind(entry.id as i64)
1757 .bind(entry.is_dir)
1758 .bind(&entry.path)
1759 .bind(entry.inode as i64)
1760 .bind(mtime.seconds as i64)
1761 .bind(mtime.nanos as i32)
1762 .bind(entry.is_symlink)
1763 .bind(entry.is_ignored);
1764 }
1765 query.execute(&mut tx).await?;
1766 }
1767
1768 if !update.removed_entries.is_empty() {
1769 let mut params = "?,".repeat(update.removed_entries.len());
1770 params.pop();
1771 let query = format!(
1772 "
1773 DELETE FROM worktree_entries
1774 WHERE project_id = ? AND worktree_id = ? AND id IN ({params})
1775 "
1776 );
1777
1778 let mut query = sqlx::query(&query).bind(project_id).bind(worktree_id);
1779 for entry_id in &update.removed_entries {
1780 query = query.bind(*entry_id as i64);
1781 }
1782 query.execute(&mut tx).await?;
1783 }
1784
1785 let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1786 tx.commit().await?;
1787 Ok(connection_ids)
1788 })
1789 .await
1790 }
1791
1792 pub async fn update_diagnostic_summary(
1793 &self,
1794 update: &proto::UpdateDiagnosticSummary,
1795 connection_id: ConnectionId,
1796 ) -> Result<Vec<ConnectionId>> {
1797 self.transact(|mut tx| async {
1798 let project_id = ProjectId::from_proto(update.project_id);
1799 let worktree_id = WorktreeId::from_proto(update.worktree_id);
1800 let summary = update
1801 .summary
1802 .as_ref()
1803 .ok_or_else(|| anyhow!("invalid summary"))?;
1804
1805 // Ensure the update comes from the host.
1806 sqlx::query(
1807 "
1808 SELECT 1
1809 FROM projects
1810 WHERE id = $1 AND host_connection_id = $2
1811 ",
1812 )
1813 .bind(project_id)
1814 .bind(connection_id.0 as i32)
1815 .fetch_one(&mut tx)
1816 .await?;
1817
1818 // Update summary.
1819 sqlx::query(
1820 "
1821 INSERT INTO worktree_diagnostic_summaries (
1822 project_id,
1823 worktree_id,
1824 path,
1825 language_server_id,
1826 error_count,
1827 warning_count
1828 )
1829 VALUES ($1, $2, $3, $4, $5, $6)
1830 ON CONFLICT (project_id, worktree_id, path) DO UPDATE SET
1831 language_server_id = excluded.language_server_id,
1832 error_count = excluded.error_count,
1833 warning_count = excluded.warning_count
1834 ",
1835 )
1836 .bind(project_id)
1837 .bind(worktree_id)
1838 .bind(&summary.path)
1839 .bind(summary.language_server_id as i64)
1840 .bind(summary.error_count as i32)
1841 .bind(summary.warning_count as i32)
1842 .execute(&mut tx)
1843 .await?;
1844
1845 let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1846 tx.commit().await?;
1847 Ok(connection_ids)
1848 })
1849 .await
1850 }
1851
1852 pub async fn start_language_server(
1853 &self,
1854 update: &proto::StartLanguageServer,
1855 connection_id: ConnectionId,
1856 ) -> Result<Vec<ConnectionId>> {
1857 self.transact(|mut tx| async {
1858 let project_id = ProjectId::from_proto(update.project_id);
1859 let server = update
1860 .server
1861 .as_ref()
1862 .ok_or_else(|| anyhow!("invalid language server"))?;
1863
1864 // Ensure the update comes from the host.
1865 sqlx::query(
1866 "
1867 SELECT 1
1868 FROM projects
1869 WHERE id = $1 AND host_connection_id = $2
1870 ",
1871 )
1872 .bind(project_id)
1873 .bind(connection_id.0 as i32)
1874 .fetch_one(&mut tx)
1875 .await?;
1876
1877 // Add the newly-started language server.
1878 sqlx::query(
1879 "
1880 INSERT INTO language_servers (project_id, id, name)
1881 VALUES ($1, $2, $3)
1882 ON CONFLICT (project_id, id) DO UPDATE SET
1883 name = excluded.name
1884 ",
1885 )
1886 .bind(project_id)
1887 .bind(server.id as i64)
1888 .bind(&server.name)
1889 .execute(&mut tx)
1890 .await?;
1891
1892 let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
1893 tx.commit().await?;
1894 Ok(connection_ids)
1895 })
1896 .await
1897 }
1898
1899 pub async fn join_project(
1900 &self,
1901 project_id: ProjectId,
1902 connection_id: ConnectionId,
1903 ) -> Result<(Project, ReplicaId)> {
1904 self.transact(|mut tx| async move {
1905 let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>(
1906 "
1907 SELECT room_id, user_id
1908 FROM room_participants
1909 WHERE answering_connection_id = $1
1910 ",
1911 )
1912 .bind(connection_id.0 as i32)
1913 .fetch_one(&mut tx)
1914 .await?;
1915
1916 // Ensure project id was shared on this room.
1917 sqlx::query(
1918 "
1919 SELECT 1
1920 FROM projects
1921 WHERE id = $1 AND room_id = $2
1922 ",
1923 )
1924 .bind(project_id)
1925 .bind(room_id)
1926 .fetch_one(&mut tx)
1927 .await?;
1928
1929 let mut collaborators = sqlx::query_as::<_, ProjectCollaborator>(
1930 "
1931 SELECT *
1932 FROM project_collaborators
1933 WHERE project_id = $1
1934 ",
1935 )
1936 .bind(project_id)
1937 .fetch_all(&mut tx)
1938 .await?;
1939 let replica_ids = collaborators
1940 .iter()
1941 .map(|c| c.replica_id)
1942 .collect::<HashSet<_>>();
1943 let mut replica_id = ReplicaId(1);
1944 while replica_ids.contains(&replica_id) {
1945 replica_id.0 += 1;
1946 }
1947 let new_collaborator = ProjectCollaborator {
1948 project_id,
1949 connection_id: connection_id.0 as i32,
1950 user_id,
1951 replica_id,
1952 is_host: false,
1953 };
1954
1955 sqlx::query(
1956 "
1957 INSERT INTO project_collaborators (
1958 project_id,
1959 connection_id,
1960 user_id,
1961 replica_id,
1962 is_host
1963 )
1964 VALUES ($1, $2, $3, $4, $5)
1965 ",
1966 )
1967 .bind(new_collaborator.project_id)
1968 .bind(new_collaborator.connection_id)
1969 .bind(new_collaborator.user_id)
1970 .bind(new_collaborator.replica_id)
1971 .bind(new_collaborator.is_host)
1972 .execute(&mut tx)
1973 .await?;
1974 collaborators.push(new_collaborator);
1975
1976 let worktree_rows = sqlx::query_as::<_, WorktreeRow>(
1977 "
1978 SELECT *
1979 FROM worktrees
1980 WHERE project_id = $1
1981 ",
1982 )
1983 .bind(project_id)
1984 .fetch_all(&mut tx)
1985 .await?;
1986 let mut worktrees = worktree_rows
1987 .into_iter()
1988 .map(|worktree_row| {
1989 (
1990 worktree_row.id,
1991 Worktree {
1992 id: worktree_row.id,
1993 abs_path: worktree_row.abs_path,
1994 root_name: worktree_row.root_name,
1995 visible: worktree_row.visible,
1996 entries: Default::default(),
1997 diagnostic_summaries: Default::default(),
1998 scan_id: worktree_row.scan_id as u64,
1999 is_complete: worktree_row.is_complete,
2000 },
2001 )
2002 })
2003 .collect::<BTreeMap<_, _>>();
2004
2005 // Populate worktree entries.
2006 {
2007 let mut entries = sqlx::query_as::<_, WorktreeEntry>(
2008 "
2009 SELECT *
2010 FROM worktree_entries
2011 WHERE project_id = $1
2012 ",
2013 )
2014 .bind(project_id)
2015 .fetch(&mut tx);
2016 while let Some(entry) = entries.next().await {
2017 let entry = entry?;
2018 if let Some(worktree) = worktrees.get_mut(&entry.worktree_id) {
2019 worktree.entries.push(proto::Entry {
2020 id: entry.id as u64,
2021 is_dir: entry.is_dir,
2022 path: entry.path,
2023 inode: entry.inode as u64,
2024 mtime: Some(proto::Timestamp {
2025 seconds: entry.mtime_seconds as u64,
2026 nanos: entry.mtime_nanos as u32,
2027 }),
2028 is_symlink: entry.is_symlink,
2029 is_ignored: entry.is_ignored,
2030 });
2031 }
2032 }
2033 }
2034
2035 // Populate worktree diagnostic summaries.
2036 {
2037 let mut summaries = sqlx::query_as::<_, WorktreeDiagnosticSummary>(
2038 "
2039 SELECT *
2040 FROM worktree_diagnostic_summaries
2041 WHERE project_id = $1
2042 ",
2043 )
2044 .bind(project_id)
2045 .fetch(&mut tx);
2046 while let Some(summary) = summaries.next().await {
2047 let summary = summary?;
2048 if let Some(worktree) = worktrees.get_mut(&summary.worktree_id) {
2049 worktree
2050 .diagnostic_summaries
2051 .push(proto::DiagnosticSummary {
2052 path: summary.path,
2053 language_server_id: summary.language_server_id as u64,
2054 error_count: summary.error_count as u32,
2055 warning_count: summary.warning_count as u32,
2056 });
2057 }
2058 }
2059 }
2060
2061 // Populate language servers.
2062 let language_servers = sqlx::query_as::<_, LanguageServer>(
2063 "
2064 SELECT *
2065 FROM language_servers
2066 WHERE project_id = $1
2067 ",
2068 )
2069 .bind(project_id)
2070 .fetch_all(&mut tx)
2071 .await?;
2072
2073 tx.commit().await?;
2074 Ok((
2075 Project {
2076 collaborators,
2077 worktrees,
2078 language_servers: language_servers
2079 .into_iter()
2080 .map(|language_server| proto::LanguageServer {
2081 id: language_server.id.to_proto(),
2082 name: language_server.name,
2083 })
2084 .collect(),
2085 },
2086 replica_id as ReplicaId,
2087 ))
2088 })
2089 .await
2090 }
2091
2092 pub async fn leave_project(
2093 &self,
2094 project_id: ProjectId,
2095 connection_id: ConnectionId,
2096 ) -> Result<LeftProject> {
2097 self.transact(|mut tx| async move {
2098 let result = sqlx::query(
2099 "
2100 DELETE FROM project_collaborators
2101 WHERE project_id = $1 AND connection_id = $2
2102 ",
2103 )
2104 .bind(project_id)
2105 .bind(connection_id.0 as i32)
2106 .execute(&mut tx)
2107 .await?;
2108
2109 if result.rows_affected() == 0 {
2110 Err(anyhow!("not a collaborator on this project"))?;
2111 }
2112
2113 let connection_ids = sqlx::query_scalar::<_, i32>(
2114 "
2115 SELECT connection_id
2116 FROM project_collaborators
2117 WHERE project_id = $1
2118 ",
2119 )
2120 .bind(project_id)
2121 .fetch_all(&mut tx)
2122 .await?
2123 .into_iter()
2124 .map(|id| ConnectionId(id as u32))
2125 .collect();
2126
2127 let (host_user_id, host_connection_id) = sqlx::query_as::<_, (i32, i32)>(
2128 "
2129 SELECT host_user_id, host_connection_id
2130 FROM projects
2131 WHERE id = $1
2132 ",
2133 )
2134 .bind(project_id)
2135 .fetch_one(&mut tx)
2136 .await?;
2137
2138 tx.commit().await?;
2139
2140 Ok(LeftProject {
2141 id: project_id,
2142 host_user_id: UserId(host_user_id),
2143 host_connection_id: ConnectionId(host_connection_id as u32),
2144 connection_ids,
2145 })
2146 })
2147 .await
2148 }
2149
2150 pub async fn project_collaborators(
2151 &self,
2152 project_id: ProjectId,
2153 connection_id: ConnectionId,
2154 ) -> Result<Vec<ProjectCollaborator>> {
2155 self.transact(|mut tx| async move {
2156 let collaborators = sqlx::query_as::<_, ProjectCollaborator>(
2157 "
2158 SELECT *
2159 FROM project_collaborators
2160 WHERE project_id = $1
2161 ",
2162 )
2163 .bind(project_id)
2164 .fetch_all(&mut tx)
2165 .await?;
2166
2167 if collaborators
2168 .iter()
2169 .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
2170 {
2171 Ok(collaborators)
2172 } else {
2173 Err(anyhow!("no such project"))?
2174 }
2175 })
2176 .await
2177 }
2178
2179 pub async fn project_connection_ids(
2180 &self,
2181 project_id: ProjectId,
2182 connection_id: ConnectionId,
2183 ) -> Result<HashSet<ConnectionId>> {
2184 self.transact(|mut tx| async move {
2185 let connection_ids = sqlx::query_scalar::<_, i32>(
2186 "
2187 SELECT connection_id
2188 FROM project_collaborators
2189 WHERE project_id = $1
2190 ",
2191 )
2192 .bind(project_id)
2193 .fetch_all(&mut tx)
2194 .await?;
2195
2196 if connection_ids.contains(&(connection_id.0 as i32)) {
2197 Ok(connection_ids
2198 .into_iter()
2199 .map(|connection_id| ConnectionId(connection_id as u32))
2200 .collect())
2201 } else {
2202 Err(anyhow!("no such project"))?
2203 }
2204 })
2205 .await
2206 }
2207
2208 // contacts
2209
2210 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
2211 self.transact(|mut tx| async move {
2212 let query = "
2213 SELECT user_id_a, user_id_b, a_to_b, accepted, should_notify, (room_participants.id IS NOT NULL) as busy
2214 FROM contacts
2215 LEFT JOIN room_participants ON room_participants.user_id = $1
2216 WHERE user_id_a = $1 OR user_id_b = $1;
2217 ";
2218
2219 let mut rows = sqlx::query_as::<_, (UserId, UserId, bool, bool, bool, bool)>(query)
2220 .bind(user_id)
2221 .fetch(&mut tx);
2222
2223 let mut contacts = Vec::new();
2224 while let Some(row) = rows.next().await {
2225 let (user_id_a, user_id_b, a_to_b, accepted, should_notify, busy) = row?;
2226 if user_id_a == user_id {
2227 if accepted {
2228 contacts.push(Contact::Accepted {
2229 user_id: user_id_b,
2230 should_notify: should_notify && a_to_b,
2231 busy
2232 });
2233 } else if a_to_b {
2234 contacts.push(Contact::Outgoing { user_id: user_id_b })
2235 } else {
2236 contacts.push(Contact::Incoming {
2237 user_id: user_id_b,
2238 should_notify,
2239 });
2240 }
2241 } else if accepted {
2242 contacts.push(Contact::Accepted {
2243 user_id: user_id_a,
2244 should_notify: should_notify && !a_to_b,
2245 busy
2246 });
2247 } else if a_to_b {
2248 contacts.push(Contact::Incoming {
2249 user_id: user_id_a,
2250 should_notify,
2251 });
2252 } else {
2253 contacts.push(Contact::Outgoing { user_id: user_id_a });
2254 }
2255 }
2256
2257 contacts.sort_unstable_by_key(|contact| contact.user_id());
2258
2259 Ok(contacts)
2260 })
2261 .await
2262 }
2263
2264 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
2265 self.transact(|mut tx| async move {
2266 Ok(sqlx::query_scalar::<_, i32>(
2267 "
2268 SELECT 1
2269 FROM room_participants
2270 WHERE room_participants.user_id = $1
2271 ",
2272 )
2273 .bind(user_id)
2274 .fetch_optional(&mut tx)
2275 .await?
2276 .is_some())
2277 })
2278 .await
2279 }
2280
2281 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
2282 self.transact(|mut tx| async move {
2283 let (id_a, id_b) = if user_id_1 < user_id_2 {
2284 (user_id_1, user_id_2)
2285 } else {
2286 (user_id_2, user_id_1)
2287 };
2288
2289 let query = "
2290 SELECT 1 FROM contacts
2291 WHERE user_id_a = $1 AND user_id_b = $2 AND accepted = TRUE
2292 LIMIT 1
2293 ";
2294 Ok(sqlx::query_scalar::<_, i32>(query)
2295 .bind(id_a.0)
2296 .bind(id_b.0)
2297 .fetch_optional(&mut tx)
2298 .await?
2299 .is_some())
2300 })
2301 .await
2302 }
2303
2304 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
2305 self.transact(|mut tx| async move {
2306 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
2307 (sender_id, receiver_id, true)
2308 } else {
2309 (receiver_id, sender_id, false)
2310 };
2311 let query = "
2312 INSERT into contacts (user_id_a, user_id_b, a_to_b, accepted, should_notify)
2313 VALUES ($1, $2, $3, FALSE, TRUE)
2314 ON CONFLICT (user_id_a, user_id_b) DO UPDATE
2315 SET
2316 accepted = TRUE,
2317 should_notify = FALSE
2318 WHERE
2319 NOT contacts.accepted AND
2320 ((contacts.a_to_b = excluded.a_to_b AND contacts.user_id_a = excluded.user_id_b) OR
2321 (contacts.a_to_b != excluded.a_to_b AND contacts.user_id_a = excluded.user_id_a));
2322 ";
2323 let result = sqlx::query(query)
2324 .bind(id_a.0)
2325 .bind(id_b.0)
2326 .bind(a_to_b)
2327 .execute(&mut tx)
2328 .await?;
2329
2330 if result.rows_affected() == 1 {
2331 tx.commit().await?;
2332 Ok(())
2333 } else {
2334 Err(anyhow!("contact already requested"))?
2335 }
2336 }).await
2337 }
2338
2339 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
2340 self.transact(|mut tx| async move {
2341 let (id_a, id_b) = if responder_id < requester_id {
2342 (responder_id, requester_id)
2343 } else {
2344 (requester_id, responder_id)
2345 };
2346 let query = "
2347 DELETE FROM contacts
2348 WHERE user_id_a = $1 AND user_id_b = $2;
2349 ";
2350 let result = sqlx::query(query)
2351 .bind(id_a.0)
2352 .bind(id_b.0)
2353 .execute(&mut tx)
2354 .await?;
2355
2356 if result.rows_affected() == 1 {
2357 tx.commit().await?;
2358 Ok(())
2359 } else {
2360 Err(anyhow!("no such contact"))?
2361 }
2362 })
2363 .await
2364 }
2365
2366 pub async fn dismiss_contact_notification(
2367 &self,
2368 user_id: UserId,
2369 contact_user_id: UserId,
2370 ) -> Result<()> {
2371 self.transact(|mut tx| async move {
2372 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
2373 (user_id, contact_user_id, true)
2374 } else {
2375 (contact_user_id, user_id, false)
2376 };
2377
2378 let query = "
2379 UPDATE contacts
2380 SET should_notify = FALSE
2381 WHERE
2382 user_id_a = $1 AND user_id_b = $2 AND
2383 (
2384 (a_to_b = $3 AND accepted) OR
2385 (a_to_b != $3 AND NOT accepted)
2386 );
2387 ";
2388
2389 let result = sqlx::query(query)
2390 .bind(id_a.0)
2391 .bind(id_b.0)
2392 .bind(a_to_b)
2393 .execute(&mut tx)
2394 .await?;
2395
2396 if result.rows_affected() == 0 {
2397 Err(anyhow!("no such contact request"))?
2398 } else {
2399 tx.commit().await?;
2400 Ok(())
2401 }
2402 })
2403 .await
2404 }
2405
2406 pub async fn respond_to_contact_request(
2407 &self,
2408 responder_id: UserId,
2409 requester_id: UserId,
2410 accept: bool,
2411 ) -> Result<()> {
2412 self.transact(|mut tx| async move {
2413 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
2414 (responder_id, requester_id, false)
2415 } else {
2416 (requester_id, responder_id, true)
2417 };
2418 let result = if accept {
2419 let query = "
2420 UPDATE contacts
2421 SET accepted = TRUE, should_notify = TRUE
2422 WHERE user_id_a = $1 AND user_id_b = $2 AND a_to_b = $3;
2423 ";
2424 sqlx::query(query)
2425 .bind(id_a.0)
2426 .bind(id_b.0)
2427 .bind(a_to_b)
2428 .execute(&mut tx)
2429 .await?
2430 } else {
2431 let query = "
2432 DELETE FROM contacts
2433 WHERE user_id_a = $1 AND user_id_b = $2 AND a_to_b = $3 AND NOT accepted;
2434 ";
2435 sqlx::query(query)
2436 .bind(id_a.0)
2437 .bind(id_b.0)
2438 .bind(a_to_b)
2439 .execute(&mut tx)
2440 .await?
2441 };
2442 if result.rows_affected() == 1 {
2443 tx.commit().await?;
2444 Ok(())
2445 } else {
2446 Err(anyhow!("no such contact request"))?
2447 }
2448 })
2449 .await
2450 }
2451
2452 // access tokens
2453
2454 pub async fn create_access_token_hash(
2455 &self,
2456 user_id: UserId,
2457 access_token_hash: &str,
2458 max_access_token_count: usize,
2459 ) -> Result<()> {
2460 self.transact(|tx| async {
2461 let mut tx = tx;
2462 let insert_query = "
2463 INSERT INTO access_tokens (user_id, hash)
2464 VALUES ($1, $2);
2465 ";
2466 let cleanup_query = "
2467 DELETE FROM access_tokens
2468 WHERE id IN (
2469 SELECT id from access_tokens
2470 WHERE user_id = $1
2471 ORDER BY id DESC
2472 LIMIT 10000
2473 OFFSET $3
2474 )
2475 ";
2476
2477 sqlx::query(insert_query)
2478 .bind(user_id.0)
2479 .bind(access_token_hash)
2480 .execute(&mut tx)
2481 .await?;
2482 sqlx::query(cleanup_query)
2483 .bind(user_id.0)
2484 .bind(access_token_hash)
2485 .bind(max_access_token_count as i32)
2486 .execute(&mut tx)
2487 .await?;
2488 Ok(tx.commit().await?)
2489 })
2490 .await
2491 }
2492
2493 pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2494 self.transact(|mut tx| async move {
2495 let query = "
2496 SELECT hash
2497 FROM access_tokens
2498 WHERE user_id = $1
2499 ORDER BY id DESC
2500 ";
2501 Ok(sqlx::query_scalar(query)
2502 .bind(user_id.0)
2503 .fetch_all(&mut tx)
2504 .await?)
2505 })
2506 .await
2507 }
2508
2509 async fn transact<F, Fut, T>(&self, f: F) -> Result<T>
2510 where
2511 F: Send + Fn(sqlx::Transaction<'static, D>) -> Fut,
2512 Fut: Send + Future<Output = Result<T>>,
2513 {
2514 let body = async {
2515 loop {
2516 let tx = self.begin_transaction().await?;
2517 match f(tx).await {
2518 Ok(result) => return Ok(result),
2519 Err(error) => match error {
2520 Error::Database(error)
2521 if error
2522 .as_database_error()
2523 .and_then(|error| error.code())
2524 .as_deref()
2525 == Some("hey") =>
2526 {
2527 // Retry (don't break the loop)
2528 }
2529 error @ _ => return Err(error),
2530 },
2531 }
2532 }
2533 };
2534
2535 #[cfg(test)]
2536 {
2537 if let Some(background) = self.background.as_ref() {
2538 background.simulate_random_delay().await;
2539 }
2540
2541 let result = self.runtime.as_ref().unwrap().block_on(body);
2542
2543 if let Some(background) = self.background.as_ref() {
2544 background.simulate_random_delay().await;
2545 }
2546
2547 result
2548 }
2549
2550 #[cfg(not(test))]
2551 {
2552 body.await
2553 }
2554 }
2555}
2556
2557macro_rules! id_type {
2558 ($name:ident) => {
2559 #[derive(
2560 Clone,
2561 Copy,
2562 Debug,
2563 Default,
2564 PartialEq,
2565 Eq,
2566 PartialOrd,
2567 Ord,
2568 Hash,
2569 sqlx::Type,
2570 Serialize,
2571 Deserialize,
2572 )]
2573 #[sqlx(transparent)]
2574 #[serde(transparent)]
2575 pub struct $name(pub i32);
2576
2577 impl $name {
2578 #[allow(unused)]
2579 pub const MAX: Self = Self(i32::MAX);
2580
2581 #[allow(unused)]
2582 pub fn from_proto(value: u64) -> Self {
2583 Self(value as i32)
2584 }
2585
2586 #[allow(unused)]
2587 pub fn to_proto(self) -> u64 {
2588 self.0 as u64
2589 }
2590 }
2591
2592 impl std::fmt::Display for $name {
2593 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2594 self.0.fmt(f)
2595 }
2596 }
2597 };
2598}
2599
2600id_type!(UserId);
2601#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
2602pub struct User {
2603 pub id: UserId,
2604 pub github_login: String,
2605 pub github_user_id: Option<i32>,
2606 pub email_address: Option<String>,
2607 pub admin: bool,
2608 pub invite_code: Option<String>,
2609 pub invite_count: i32,
2610 pub connected_once: bool,
2611}
2612
2613id_type!(RoomId);
2614#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
2615pub struct Room {
2616 pub id: RoomId,
2617 pub version: i32,
2618 pub live_kit_room: String,
2619}
2620
2621id_type!(ProjectId);
2622pub struct Project {
2623 pub collaborators: Vec<ProjectCollaborator>,
2624 pub worktrees: BTreeMap<WorktreeId, Worktree>,
2625 pub language_servers: Vec<proto::LanguageServer>,
2626}
2627
2628id_type!(ReplicaId);
2629#[derive(Clone, Debug, Default, FromRow, PartialEq)]
2630pub struct ProjectCollaborator {
2631 pub project_id: ProjectId,
2632 pub connection_id: i32,
2633 pub user_id: UserId,
2634 pub replica_id: ReplicaId,
2635 pub is_host: bool,
2636}
2637
2638id_type!(WorktreeId);
2639#[derive(Clone, Debug, Default, FromRow, PartialEq)]
2640struct WorktreeRow {
2641 pub id: WorktreeId,
2642 pub abs_path: String,
2643 pub root_name: String,
2644 pub visible: bool,
2645 pub scan_id: i64,
2646 pub is_complete: bool,
2647}
2648
2649pub struct Worktree {
2650 pub id: WorktreeId,
2651 pub abs_path: String,
2652 pub root_name: String,
2653 pub visible: bool,
2654 pub entries: Vec<proto::Entry>,
2655 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
2656 pub scan_id: u64,
2657 pub is_complete: bool,
2658}
2659
2660#[derive(Clone, Debug, Default, FromRow, PartialEq)]
2661struct WorktreeEntry {
2662 id: i64,
2663 worktree_id: WorktreeId,
2664 is_dir: bool,
2665 path: String,
2666 inode: i64,
2667 mtime_seconds: i64,
2668 mtime_nanos: i32,
2669 is_symlink: bool,
2670 is_ignored: bool,
2671}
2672
2673#[derive(Clone, Debug, Default, FromRow, PartialEq)]
2674struct WorktreeDiagnosticSummary {
2675 worktree_id: WorktreeId,
2676 path: String,
2677 language_server_id: i64,
2678 error_count: i32,
2679 warning_count: i32,
2680}
2681
2682id_type!(LanguageServerId);
2683#[derive(Clone, Debug, Default, FromRow, PartialEq)]
2684struct LanguageServer {
2685 id: LanguageServerId,
2686 name: String,
2687}
2688
2689pub struct LeftProject {
2690 pub id: ProjectId,
2691 pub host_user_id: UserId,
2692 pub host_connection_id: ConnectionId,
2693 pub connection_ids: Vec<ConnectionId>,
2694}
2695
2696pub struct LeftRoom {
2697 pub room: proto::Room,
2698 pub left_projects: HashMap<ProjectId, LeftProject>,
2699 pub canceled_calls_to_user_ids: Vec<UserId>,
2700}
2701
2702#[derive(Clone, Debug, PartialEq, Eq)]
2703pub enum Contact {
2704 Accepted {
2705 user_id: UserId,
2706 should_notify: bool,
2707 busy: bool,
2708 },
2709 Outgoing {
2710 user_id: UserId,
2711 },
2712 Incoming {
2713 user_id: UserId,
2714 should_notify: bool,
2715 },
2716}
2717
2718impl Contact {
2719 pub fn user_id(&self) -> UserId {
2720 match self {
2721 Contact::Accepted { user_id, .. } => *user_id,
2722 Contact::Outgoing { user_id } => *user_id,
2723 Contact::Incoming { user_id, .. } => *user_id,
2724 }
2725 }
2726}
2727
2728#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
2729pub struct IncomingContactRequest {
2730 pub requester_id: UserId,
2731 pub should_notify: bool,
2732}
2733
2734#[derive(Clone, Deserialize)]
2735pub struct Signup {
2736 pub email_address: String,
2737 pub platform_mac: bool,
2738 pub platform_windows: bool,
2739 pub platform_linux: bool,
2740 pub editor_features: Vec<String>,
2741 pub programming_languages: Vec<String>,
2742 pub device_id: Option<String>,
2743}
2744
2745#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, FromRow)]
2746pub struct WaitlistSummary {
2747 #[sqlx(default)]
2748 pub count: i64,
2749 #[sqlx(default)]
2750 pub linux_count: i64,
2751 #[sqlx(default)]
2752 pub mac_count: i64,
2753 #[sqlx(default)]
2754 pub windows_count: i64,
2755 #[sqlx(default)]
2756 pub unknown_count: i64,
2757}
2758
2759#[derive(FromRow, PartialEq, Debug, Serialize, Deserialize)]
2760pub struct Invite {
2761 pub email_address: String,
2762 pub email_confirmation_code: String,
2763}
2764
2765#[derive(Debug, Serialize, Deserialize)]
2766pub struct NewUserParams {
2767 pub github_login: String,
2768 pub github_user_id: i32,
2769 pub invite_count: i32,
2770}
2771
2772#[derive(Debug)]
2773pub struct NewUserResult {
2774 pub user_id: UserId,
2775 pub metrics_id: String,
2776 pub inviting_user_id: Option<UserId>,
2777 pub signup_device_id: Option<String>,
2778}
2779
2780fn random_invite_code() -> String {
2781 nanoid::nanoid!(16)
2782}
2783
2784fn random_email_confirmation_code() -> String {
2785 nanoid::nanoid!(64)
2786}
2787
2788#[cfg(test)]
2789pub use test::*;
2790
2791#[cfg(test)]
2792mod test {
2793 use super::*;
2794 use gpui::executor::Background;
2795 use lazy_static::lazy_static;
2796 use parking_lot::Mutex;
2797 use rand::prelude::*;
2798 use sqlx::migrate::MigrateDatabase;
2799 use std::sync::Arc;
2800
2801 pub struct SqliteTestDb {
2802 pub db: Option<Arc<Db<sqlx::Sqlite>>>,
2803 pub conn: sqlx::sqlite::SqliteConnection,
2804 }
2805
2806 pub struct PostgresTestDb {
2807 pub db: Option<Arc<Db<sqlx::Postgres>>>,
2808 pub url: String,
2809 }
2810
2811 impl SqliteTestDb {
2812 pub fn new(background: Arc<Background>) -> Self {
2813 let mut rng = StdRng::from_entropy();
2814 let url = format!("file:zed-test-{}?mode=memory", rng.gen::<u128>());
2815 let runtime = tokio::runtime::Builder::new_current_thread()
2816 .enable_io()
2817 .enable_time()
2818 .build()
2819 .unwrap();
2820
2821 let (mut db, conn) = runtime.block_on(async {
2822 let db = Db::<sqlx::Sqlite>::new(&url, 5).await.unwrap();
2823 let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations.sqlite");
2824 db.migrate(migrations_path.as_ref(), false).await.unwrap();
2825 let conn = db.pool.acquire().await.unwrap().detach();
2826 (db, conn)
2827 });
2828
2829 db.background = Some(background);
2830 db.runtime = Some(runtime);
2831
2832 Self {
2833 db: Some(Arc::new(db)),
2834 conn,
2835 }
2836 }
2837
2838 pub fn db(&self) -> &Arc<Db<sqlx::Sqlite>> {
2839 self.db.as_ref().unwrap()
2840 }
2841 }
2842
2843 impl PostgresTestDb {
2844 pub fn new(background: Arc<Background>) -> Self {
2845 lazy_static! {
2846 static ref LOCK: Mutex<()> = Mutex::new(());
2847 }
2848
2849 let _guard = LOCK.lock();
2850 let mut rng = StdRng::from_entropy();
2851 let url = format!(
2852 "postgres://postgres@localhost/zed-test-{}",
2853 rng.gen::<u128>()
2854 );
2855 let runtime = tokio::runtime::Builder::new_current_thread()
2856 .enable_io()
2857 .enable_time()
2858 .build()
2859 .unwrap();
2860
2861 let mut db = runtime.block_on(async {
2862 sqlx::Postgres::create_database(&url)
2863 .await
2864 .expect("failed to create test db");
2865 let db = Db::<sqlx::Postgres>::new(&url, 5).await.unwrap();
2866 let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
2867 db.migrate(Path::new(migrations_path), false).await.unwrap();
2868 db
2869 });
2870
2871 db.background = Some(background);
2872 db.runtime = Some(runtime);
2873
2874 Self {
2875 db: Some(Arc::new(db)),
2876 url,
2877 }
2878 }
2879
2880 pub fn db(&self) -> &Arc<Db<sqlx::Postgres>> {
2881 self.db.as_ref().unwrap()
2882 }
2883 }
2884
2885 impl Drop for PostgresTestDb {
2886 fn drop(&mut self) {
2887 let db = self.db.take().unwrap();
2888 db.teardown(&self.url);
2889 }
2890 }
2891}