1mod access_token;
2mod contact;
3mod project;
4mod project_collaborator;
5mod room;
6mod room_participant;
7mod signup;
8#[cfg(test)]
9mod tests;
10mod user;
11mod worktree;
12
13use crate::{Error, Result};
14use anyhow::anyhow;
15use collections::{BTreeMap, HashMap, HashSet};
16pub use contact::Contact;
17use dashmap::DashMap;
18use futures::StreamExt;
19use hyper::StatusCode;
20use rpc::{proto, ConnectionId};
21pub use sea_orm::ConnectOptions;
22use sea_orm::{
23 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseBackend, DatabaseConnection,
24 DatabaseTransaction, DbErr, FromQueryResult, IntoActiveModel, JoinType, QueryOrder,
25 QuerySelect, Statement, TransactionTrait,
26};
27use sea_query::{Alias, Expr, OnConflict, Query};
28use serde::{Deserialize, Serialize};
29pub use signup::{Invite, NewSignup, WaitlistSummary};
30use sqlx::migrate::{Migrate, Migration, MigrationSource};
31use sqlx::Connection;
32use std::ops::{Deref, DerefMut};
33use std::path::Path;
34use std::time::Duration;
35use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
36use tokio::sync::{Mutex, OwnedMutexGuard};
37pub use user::Model as User;
38
39pub struct Database {
40 options: ConnectOptions,
41 pool: DatabaseConnection,
42 rooms: DashMap<RoomId, Arc<Mutex<()>>>,
43 #[cfg(test)]
44 background: Option<std::sync::Arc<gpui::executor::Background>>,
45 #[cfg(test)]
46 runtime: Option<tokio::runtime::Runtime>,
47}
48
49impl Database {
50 pub async fn new(options: ConnectOptions) -> Result<Self> {
51 Ok(Self {
52 options: options.clone(),
53 pool: sea_orm::Database::connect(options).await?,
54 rooms: DashMap::with_capacity(16384),
55 #[cfg(test)]
56 background: None,
57 #[cfg(test)]
58 runtime: None,
59 })
60 }
61
62 pub async fn migrate(
63 &self,
64 migrations_path: &Path,
65 ignore_checksum_mismatch: bool,
66 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
67 let migrations = MigrationSource::resolve(migrations_path)
68 .await
69 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
70
71 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
72
73 connection.ensure_migrations_table().await?;
74 let applied_migrations: HashMap<_, _> = connection
75 .list_applied_migrations()
76 .await?
77 .into_iter()
78 .map(|m| (m.version, m))
79 .collect();
80
81 let mut new_migrations = Vec::new();
82 for migration in migrations {
83 match applied_migrations.get(&migration.version) {
84 Some(applied_migration) => {
85 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
86 {
87 Err(anyhow!(
88 "checksum mismatch for applied migration {}",
89 migration.description
90 ))?;
91 }
92 }
93 None => {
94 let elapsed = connection.apply(&migration).await?;
95 new_migrations.push((migration, elapsed));
96 }
97 }
98 }
99
100 Ok(new_migrations)
101 }
102
103 // users
104
105 pub async fn create_user(
106 &self,
107 email_address: &str,
108 admin: bool,
109 params: NewUserParams,
110 ) -> Result<NewUserResult> {
111 self.transact(|tx| async {
112 let user = user::Entity::insert(user::ActiveModel {
113 email_address: ActiveValue::set(Some(email_address.into())),
114 github_login: ActiveValue::set(params.github_login.clone()),
115 github_user_id: ActiveValue::set(Some(params.github_user_id)),
116 admin: ActiveValue::set(admin),
117 metrics_id: ActiveValue::set(Uuid::new_v4()),
118 ..Default::default()
119 })
120 .on_conflict(
121 OnConflict::column(user::Column::GithubLogin)
122 .update_column(user::Column::GithubLogin)
123 .to_owned(),
124 )
125 .exec_with_returning(&tx)
126 .await?;
127
128 tx.commit().await?;
129
130 Ok(NewUserResult {
131 user_id: user.id,
132 metrics_id: user.metrics_id.to_string(),
133 signup_device_id: None,
134 inviting_user_id: None,
135 })
136 })
137 .await
138 }
139
140 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
141 self.transact(|tx| async move { Ok(user::Entity::find_by_id(id).one(&tx).await?) })
142 .await
143 }
144
145 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
146 self.transact(|tx| async {
147 let tx = tx;
148 Ok(user::Entity::find()
149 .filter(user::Column::Id.is_in(ids.iter().copied()))
150 .all(&tx)
151 .await?)
152 })
153 .await
154 }
155
156 pub async fn get_user_by_github_account(
157 &self,
158 github_login: &str,
159 github_user_id: Option<i32>,
160 ) -> Result<Option<User>> {
161 self.transact(|tx| async {
162 let tx = tx;
163 if let Some(github_user_id) = github_user_id {
164 if let Some(user_by_github_user_id) = user::Entity::find()
165 .filter(user::Column::GithubUserId.eq(github_user_id))
166 .one(&tx)
167 .await?
168 {
169 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
170 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
171 Ok(Some(user_by_github_user_id.update(&tx).await?))
172 } else if let Some(user_by_github_login) = user::Entity::find()
173 .filter(user::Column::GithubLogin.eq(github_login))
174 .one(&tx)
175 .await?
176 {
177 let mut user_by_github_login = user_by_github_login.into_active_model();
178 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
179 Ok(Some(user_by_github_login.update(&tx).await?))
180 } else {
181 Ok(None)
182 }
183 } else {
184 Ok(user::Entity::find()
185 .filter(user::Column::GithubLogin.eq(github_login))
186 .one(&tx)
187 .await?)
188 }
189 })
190 .await
191 }
192
193 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
194 self.transact(|tx| async move {
195 Ok(user::Entity::find()
196 .order_by_asc(user::Column::GithubLogin)
197 .limit(limit as u64)
198 .offset(page as u64 * limit as u64)
199 .all(&tx)
200 .await?)
201 })
202 .await
203 }
204
205 pub async fn get_users_with_no_invites(
206 &self,
207 invited_by_another_user: bool,
208 ) -> Result<Vec<User>> {
209 self.transact(|tx| async move {
210 Ok(user::Entity::find()
211 .filter(
212 user::Column::InviteCount
213 .eq(0)
214 .and(if invited_by_another_user {
215 user::Column::InviterId.is_not_null()
216 } else {
217 user::Column::InviterId.is_null()
218 }),
219 )
220 .all(&tx)
221 .await?)
222 })
223 .await
224 }
225
226 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
227 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
228 enum QueryAs {
229 MetricsId,
230 }
231
232 self.transact(|tx| async move {
233 let metrics_id: Uuid = user::Entity::find_by_id(id)
234 .select_only()
235 .column(user::Column::MetricsId)
236 .into_values::<_, QueryAs>()
237 .one(&tx)
238 .await?
239 .ok_or_else(|| anyhow!("could not find user"))?;
240 Ok(metrics_id.to_string())
241 })
242 .await
243 }
244
245 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
246 self.transact(|tx| async move {
247 user::Entity::update_many()
248 .filter(user::Column::Id.eq(id))
249 .col_expr(user::Column::Admin, is_admin.into())
250 .exec(&tx)
251 .await?;
252 tx.commit().await?;
253 Ok(())
254 })
255 .await
256 }
257
258 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
259 self.transact(|tx| async move {
260 user::Entity::update_many()
261 .filter(user::Column::Id.eq(id))
262 .col_expr(user::Column::ConnectedOnce, connected_once.into())
263 .exec(&tx)
264 .await?;
265 tx.commit().await?;
266 Ok(())
267 })
268 .await
269 }
270
271 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
272 self.transact(|tx| async move {
273 access_token::Entity::delete_many()
274 .filter(access_token::Column::UserId.eq(id))
275 .exec(&tx)
276 .await?;
277 user::Entity::delete_by_id(id).exec(&tx).await?;
278 tx.commit().await?;
279 Ok(())
280 })
281 .await
282 }
283
284 // contacts
285
286 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
287 #[derive(Debug, FromQueryResult)]
288 struct ContactWithUserBusyStatuses {
289 user_id_a: UserId,
290 user_id_b: UserId,
291 a_to_b: bool,
292 accepted: bool,
293 should_notify: bool,
294 user_a_busy: bool,
295 user_b_busy: bool,
296 }
297
298 self.transact(|tx| async move {
299 let user_a_participant = Alias::new("user_a_participant");
300 let user_b_participant = Alias::new("user_b_participant");
301 let mut db_contacts = contact::Entity::find()
302 .column_as(
303 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
304 .is_not_null(),
305 "user_a_busy",
306 )
307 .column_as(
308 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
309 .is_not_null(),
310 "user_b_busy",
311 )
312 .filter(
313 contact::Column::UserIdA
314 .eq(user_id)
315 .or(contact::Column::UserIdB.eq(user_id)),
316 )
317 .join_as(
318 JoinType::LeftJoin,
319 contact::Relation::UserARoomParticipant.def(),
320 user_a_participant,
321 )
322 .join_as(
323 JoinType::LeftJoin,
324 contact::Relation::UserBRoomParticipant.def(),
325 user_b_participant,
326 )
327 .into_model::<ContactWithUserBusyStatuses>()
328 .stream(&tx)
329 .await?;
330
331 let mut contacts = Vec::new();
332 while let Some(db_contact) = db_contacts.next().await {
333 let db_contact = db_contact?;
334 if db_contact.user_id_a == user_id {
335 if db_contact.accepted {
336 contacts.push(Contact::Accepted {
337 user_id: db_contact.user_id_b,
338 should_notify: db_contact.should_notify && db_contact.a_to_b,
339 busy: db_contact.user_b_busy,
340 });
341 } else if db_contact.a_to_b {
342 contacts.push(Contact::Outgoing {
343 user_id: db_contact.user_id_b,
344 })
345 } else {
346 contacts.push(Contact::Incoming {
347 user_id: db_contact.user_id_b,
348 should_notify: db_contact.should_notify,
349 });
350 }
351 } else if db_contact.accepted {
352 contacts.push(Contact::Accepted {
353 user_id: db_contact.user_id_a,
354 should_notify: db_contact.should_notify && !db_contact.a_to_b,
355 busy: db_contact.user_a_busy,
356 });
357 } else if db_contact.a_to_b {
358 contacts.push(Contact::Incoming {
359 user_id: db_contact.user_id_a,
360 should_notify: db_contact.should_notify,
361 });
362 } else {
363 contacts.push(Contact::Outgoing {
364 user_id: db_contact.user_id_a,
365 });
366 }
367 }
368
369 contacts.sort_unstable_by_key(|contact| contact.user_id());
370
371 Ok(contacts)
372 })
373 .await
374 }
375
376 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
377 self.transact(|tx| async move {
378 let participant = room_participant::Entity::find()
379 .filter(room_participant::Column::UserId.eq(user_id))
380 .one(&tx)
381 .await?;
382 Ok(participant.is_some())
383 })
384 .await
385 }
386
387 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
388 self.transact(|tx| async move {
389 let (id_a, id_b) = if user_id_1 < user_id_2 {
390 (user_id_1, user_id_2)
391 } else {
392 (user_id_2, user_id_1)
393 };
394
395 Ok(contact::Entity::find()
396 .filter(
397 contact::Column::UserIdA
398 .eq(id_a)
399 .and(contact::Column::UserIdB.eq(id_b))
400 .and(contact::Column::Accepted.eq(true)),
401 )
402 .one(&tx)
403 .await?
404 .is_some())
405 })
406 .await
407 }
408
409 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
410 self.transact(|tx| async move {
411 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
412 (sender_id, receiver_id, true)
413 } else {
414 (receiver_id, sender_id, false)
415 };
416
417 let rows_affected = contact::Entity::insert(contact::ActiveModel {
418 user_id_a: ActiveValue::set(id_a),
419 user_id_b: ActiveValue::set(id_b),
420 a_to_b: ActiveValue::set(a_to_b),
421 accepted: ActiveValue::set(false),
422 should_notify: ActiveValue::set(true),
423 ..Default::default()
424 })
425 .on_conflict(
426 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
427 .values([
428 (contact::Column::Accepted, true.into()),
429 (contact::Column::ShouldNotify, false.into()),
430 ])
431 .action_and_where(
432 contact::Column::Accepted.eq(false).and(
433 contact::Column::AToB
434 .eq(a_to_b)
435 .and(contact::Column::UserIdA.eq(id_b))
436 .or(contact::Column::AToB
437 .ne(a_to_b)
438 .and(contact::Column::UserIdA.eq(id_a))),
439 ),
440 )
441 .to_owned(),
442 )
443 .exec_without_returning(&tx)
444 .await?;
445
446 if rows_affected == 1 {
447 tx.commit().await?;
448 Ok(())
449 } else {
450 Err(anyhow!("contact already requested"))?
451 }
452 })
453 .await
454 }
455
456 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
457 self.transact(|tx| async move {
458 let (id_a, id_b) = if responder_id < requester_id {
459 (responder_id, requester_id)
460 } else {
461 (requester_id, responder_id)
462 };
463
464 let result = contact::Entity::delete_many()
465 .filter(
466 contact::Column::UserIdA
467 .eq(id_a)
468 .and(contact::Column::UserIdB.eq(id_b)),
469 )
470 .exec(&tx)
471 .await?;
472
473 if result.rows_affected == 1 {
474 tx.commit().await?;
475 Ok(())
476 } else {
477 Err(anyhow!("no such contact"))?
478 }
479 })
480 .await
481 }
482
483 pub async fn dismiss_contact_notification(
484 &self,
485 user_id: UserId,
486 contact_user_id: UserId,
487 ) -> Result<()> {
488 self.transact(|tx| async move {
489 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
490 (user_id, contact_user_id, true)
491 } else {
492 (contact_user_id, user_id, false)
493 };
494
495 let result = contact::Entity::update_many()
496 .set(contact::ActiveModel {
497 should_notify: ActiveValue::set(false),
498 ..Default::default()
499 })
500 .filter(
501 contact::Column::UserIdA
502 .eq(id_a)
503 .and(contact::Column::UserIdB.eq(id_b))
504 .and(
505 contact::Column::AToB
506 .eq(a_to_b)
507 .and(contact::Column::Accepted.eq(true))
508 .or(contact::Column::AToB
509 .ne(a_to_b)
510 .and(contact::Column::Accepted.eq(false))),
511 ),
512 )
513 .exec(&tx)
514 .await?;
515 if result.rows_affected == 0 {
516 Err(anyhow!("no such contact request"))?
517 } else {
518 tx.commit().await?;
519 Ok(())
520 }
521 })
522 .await
523 }
524
525 pub async fn respond_to_contact_request(
526 &self,
527 responder_id: UserId,
528 requester_id: UserId,
529 accept: bool,
530 ) -> Result<()> {
531 self.transact(|tx| async move {
532 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
533 (responder_id, requester_id, false)
534 } else {
535 (requester_id, responder_id, true)
536 };
537 let rows_affected = if accept {
538 let result = contact::Entity::update_many()
539 .set(contact::ActiveModel {
540 accepted: ActiveValue::set(true),
541 should_notify: ActiveValue::set(true),
542 ..Default::default()
543 })
544 .filter(
545 contact::Column::UserIdA
546 .eq(id_a)
547 .and(contact::Column::UserIdB.eq(id_b))
548 .and(contact::Column::AToB.eq(a_to_b)),
549 )
550 .exec(&tx)
551 .await?;
552 result.rows_affected
553 } else {
554 let result = contact::Entity::delete_many()
555 .filter(
556 contact::Column::UserIdA
557 .eq(id_a)
558 .and(contact::Column::UserIdB.eq(id_b))
559 .and(contact::Column::AToB.eq(a_to_b))
560 .and(contact::Column::Accepted.eq(false)),
561 )
562 .exec(&tx)
563 .await?;
564
565 result.rows_affected
566 };
567
568 if rows_affected == 1 {
569 tx.commit().await?;
570 Ok(())
571 } else {
572 Err(anyhow!("no such contact request"))?
573 }
574 })
575 .await
576 }
577
/// Builds a SQL `LIKE` pattern that matches the alphanumeric characters of
/// `string` in order, with anything in between.
///
/// Every alphanumeric character is emitted preceded by `%`, every other
/// character is dropped, and a final `%` is appended. For example `"a-b"`
/// becomes `"%a%b%"` and the empty string becomes `"%"`.
pub fn fuzzy_like_string(string: &str) -> String {
    let mut pattern = String::with_capacity(string.len() * 2 + 1);
    string
        .chars()
        .filter(|ch| ch.is_alphanumeric())
        .for_each(|ch| {
            pattern.push('%');
            pattern.push(ch);
        });
    pattern.push('%');
    pattern
}
589
590 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
591 self.transact(|tx| async {
592 let tx = tx;
593 let like_string = Self::fuzzy_like_string(name_query);
594 let query = "
595 SELECT users.*
596 FROM users
597 WHERE github_login ILIKE $1
598 ORDER BY github_login <-> $2
599 LIMIT $3
600 ";
601
602 Ok(user::Entity::find()
603 .from_raw_sql(Statement::from_sql_and_values(
604 self.pool.get_database_backend(),
605 query.into(),
606 vec![like_string.into(), name_query.into(), limit.into()],
607 ))
608 .all(&tx)
609 .await?)
610 })
611 .await
612 }
613
614 // signups
615
616 pub async fn create_signup(&self, signup: NewSignup) -> Result<()> {
617 self.transact(|tx| async {
618 signup::ActiveModel {
619 email_address: ActiveValue::set(signup.email_address.clone()),
620 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
621 email_confirmation_sent: ActiveValue::set(false),
622 platform_mac: ActiveValue::set(signup.platform_mac),
623 platform_windows: ActiveValue::set(signup.platform_windows),
624 platform_linux: ActiveValue::set(signup.platform_linux),
625 platform_unknown: ActiveValue::set(false),
626 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
627 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
628 device_id: ActiveValue::set(signup.device_id.clone()),
629 ..Default::default()
630 }
631 .insert(&tx)
632 .await?;
633 tx.commit().await?;
634 Ok(())
635 })
636 .await
637 }
638
639 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
640 self.transact(|tx| async move {
641 let query = "
642 SELECT
643 COUNT(*) as count,
644 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
645 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
646 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
647 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
648 FROM (
649 SELECT *
650 FROM signups
651 WHERE
652 NOT email_confirmation_sent
653 ) AS unsent
654 ";
655 Ok(
656 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
657 self.pool.get_database_backend(),
658 query.into(),
659 vec![],
660 ))
661 .one(&tx)
662 .await?
663 .ok_or_else(|| anyhow!("invalid result"))?,
664 )
665 })
666 .await
667 }
668
669 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
670 let emails = invites
671 .iter()
672 .map(|s| s.email_address.as_str())
673 .collect::<Vec<_>>();
674 self.transact(|tx| async {
675 signup::Entity::update_many()
676 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
677 .col_expr(signup::Column::EmailConfirmationSent, true.into())
678 .exec(&tx)
679 .await?;
680 tx.commit().await?;
681 Ok(())
682 })
683 .await
684 }
685
686 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
687 self.transact(|tx| async move {
688 Ok(signup::Entity::find()
689 .select_only()
690 .column(signup::Column::EmailAddress)
691 .column(signup::Column::EmailConfirmationCode)
692 .filter(
693 signup::Column::EmailConfirmationSent.eq(false).and(
694 signup::Column::PlatformMac
695 .eq(true)
696 .or(signup::Column::PlatformUnknown.eq(true)),
697 ),
698 )
699 .limit(count as u64)
700 .into_model()
701 .all(&tx)
702 .await?)
703 })
704 .await
705 }
706
707 // invite codes
708
709 pub async fn create_invite_from_code(
710 &self,
711 code: &str,
712 email_address: &str,
713 device_id: Option<&str>,
714 ) -> Result<Invite> {
715 self.transact(|tx| async move {
716 let existing_user = user::Entity::find()
717 .filter(user::Column::EmailAddress.eq(email_address))
718 .one(&tx)
719 .await?;
720
721 if existing_user.is_some() {
722 Err(anyhow!("email address is already in use"))?;
723 }
724
725 let inviter = match user::Entity::find()
726 .filter(user::Column::InviteCode.eq(code))
727 .one(&tx)
728 .await?
729 {
730 Some(inviter) => inviter,
731 None => {
732 return Err(Error::Http(
733 StatusCode::NOT_FOUND,
734 "invite code not found".to_string(),
735 ))?
736 }
737 };
738
739 if inviter.invite_count == 0 {
740 Err(Error::Http(
741 StatusCode::UNAUTHORIZED,
742 "no invites remaining".to_string(),
743 ))?;
744 }
745
746 let signup = signup::Entity::insert(signup::ActiveModel {
747 email_address: ActiveValue::set(email_address.into()),
748 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
749 email_confirmation_sent: ActiveValue::set(false),
750 inviting_user_id: ActiveValue::set(Some(inviter.id)),
751 platform_linux: ActiveValue::set(false),
752 platform_mac: ActiveValue::set(false),
753 platform_windows: ActiveValue::set(false),
754 platform_unknown: ActiveValue::set(true),
755 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
756 ..Default::default()
757 })
758 .on_conflict(
759 OnConflict::column(signup::Column::EmailAddress)
760 .update_column(signup::Column::InvitingUserId)
761 .to_owned(),
762 )
763 .exec_with_returning(&tx)
764 .await?;
765 tx.commit().await?;
766
767 Ok(Invite {
768 email_address: signup.email_address,
769 email_confirmation_code: signup.email_confirmation_code,
770 })
771 })
772 .await
773 }
774
775 pub async fn create_user_from_invite(
776 &self,
777 invite: &Invite,
778 user: NewUserParams,
779 ) -> Result<Option<NewUserResult>> {
780 self.transact(|tx| async {
781 let tx = tx;
782 let signup = signup::Entity::find()
783 .filter(
784 signup::Column::EmailAddress
785 .eq(invite.email_address.as_str())
786 .and(
787 signup::Column::EmailConfirmationCode
788 .eq(invite.email_confirmation_code.as_str()),
789 ),
790 )
791 .one(&tx)
792 .await?
793 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
794
795 if signup.user_id.is_some() {
796 return Ok(None);
797 }
798
799 let user = user::Entity::insert(user::ActiveModel {
800 email_address: ActiveValue::set(Some(invite.email_address.clone())),
801 github_login: ActiveValue::set(user.github_login.clone()),
802 github_user_id: ActiveValue::set(Some(user.github_user_id)),
803 admin: ActiveValue::set(false),
804 invite_count: ActiveValue::set(user.invite_count),
805 invite_code: ActiveValue::set(Some(random_invite_code())),
806 metrics_id: ActiveValue::set(Uuid::new_v4()),
807 ..Default::default()
808 })
809 .on_conflict(
810 OnConflict::column(user::Column::GithubLogin)
811 .update_columns([
812 user::Column::EmailAddress,
813 user::Column::GithubUserId,
814 user::Column::Admin,
815 ])
816 .to_owned(),
817 )
818 .exec_with_returning(&tx)
819 .await?;
820
821 let mut signup = signup.into_active_model();
822 signup.user_id = ActiveValue::set(Some(user.id));
823 let signup = signup.update(&tx).await?;
824
825 if let Some(inviting_user_id) = signup.inviting_user_id {
826 let result = user::Entity::update_many()
827 .filter(
828 user::Column::Id
829 .eq(inviting_user_id)
830 .and(user::Column::InviteCount.gt(0)),
831 )
832 .col_expr(
833 user::Column::InviteCount,
834 Expr::col(user::Column::InviteCount).sub(1),
835 )
836 .exec(&tx)
837 .await?;
838
839 if result.rows_affected == 0 {
840 Err(Error::Http(
841 StatusCode::UNAUTHORIZED,
842 "no invites remaining".to_string(),
843 ))?;
844 }
845
846 contact::Entity::insert(contact::ActiveModel {
847 user_id_a: ActiveValue::set(inviting_user_id),
848 user_id_b: ActiveValue::set(user.id),
849 a_to_b: ActiveValue::set(true),
850 should_notify: ActiveValue::set(true),
851 accepted: ActiveValue::set(true),
852 ..Default::default()
853 })
854 .on_conflict(OnConflict::new().do_nothing().to_owned())
855 .exec_without_returning(&tx)
856 .await?;
857 }
858
859 tx.commit().await?;
860 Ok(Some(NewUserResult {
861 user_id: user.id,
862 metrics_id: user.metrics_id.to_string(),
863 inviting_user_id: signup.inviting_user_id,
864 signup_device_id: signup.device_id,
865 }))
866 })
867 .await
868 }
869
870 pub async fn set_invite_count_for_user(&self, id: UserId, count: u32) -> Result<()> {
871 self.transact(|tx| async move {
872 if count > 0 {
873 user::Entity::update_many()
874 .filter(
875 user::Column::Id
876 .eq(id)
877 .and(user::Column::InviteCode.is_null()),
878 )
879 .col_expr(user::Column::InviteCode, random_invite_code().into())
880 .exec(&tx)
881 .await?;
882 }
883
884 user::Entity::update_many()
885 .filter(user::Column::Id.eq(id))
886 .col_expr(user::Column::InviteCount, count.into())
887 .exec(&tx)
888 .await?;
889 tx.commit().await?;
890 Ok(())
891 })
892 .await
893 }
894
895 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, u32)>> {
896 self.transact(|tx| async move {
897 match user::Entity::find_by_id(id).one(&tx).await? {
898 Some(user) if user.invite_code.is_some() => {
899 Ok(Some((user.invite_code.unwrap(), user.invite_count as u32)))
900 }
901 _ => Ok(None),
902 }
903 })
904 .await
905 }
906
907 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
908 self.transact(|tx| async move {
909 user::Entity::find()
910 .filter(user::Column::InviteCode.eq(code))
911 .one(&tx)
912 .await?
913 .ok_or_else(|| {
914 Error::Http(
915 StatusCode::NOT_FOUND,
916 "that invite code does not exist".to_string(),
917 )
918 })
919 })
920 .await
921 }
922
923 // rooms
924
925 pub async fn incoming_call_for_user(
926 &self,
927 user_id: UserId,
928 ) -> Result<Option<proto::IncomingCall>> {
929 self.transact(|tx| async move {
930 let pending_participant = room_participant::Entity::find()
931 .filter(
932 room_participant::Column::UserId
933 .eq(user_id)
934 .and(room_participant::Column::AnsweringConnectionId.is_null()),
935 )
936 .one(&tx)
937 .await?;
938
939 if let Some(pending_participant) = pending_participant {
940 let room = self.get_room(pending_participant.room_id, &tx).await?;
941 Ok(Self::build_incoming_call(&room, user_id))
942 } else {
943 Ok(None)
944 }
945 })
946 .await
947 }
948
949 pub async fn create_room(
950 &self,
951 user_id: UserId,
952 connection_id: ConnectionId,
953 live_kit_room: &str,
954 ) -> Result<RoomGuard<proto::Room>> {
955 self.transact(|tx| async move {
956 let room = room::ActiveModel {
957 live_kit_room: ActiveValue::set(live_kit_room.into()),
958 ..Default::default()
959 }
960 .insert(&tx)
961 .await?;
962 let room_id = room.id;
963
964 room_participant::ActiveModel {
965 room_id: ActiveValue::set(room_id),
966 user_id: ActiveValue::set(user_id),
967 answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
968 calling_user_id: ActiveValue::set(user_id),
969 calling_connection_id: ActiveValue::set(connection_id.0 as i32),
970 ..Default::default()
971 }
972 .insert(&tx)
973 .await?;
974
975 let room = self.get_room(room_id, &tx).await?;
976 self.commit_room_transaction(room_id, tx, room).await
977 })
978 .await
979 }
980
981 pub async fn call(
982 &self,
983 room_id: RoomId,
984 calling_user_id: UserId,
985 calling_connection_id: ConnectionId,
986 called_user_id: UserId,
987 initial_project_id: Option<ProjectId>,
988 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
989 self.transact(|tx| async move {
990 room_participant::ActiveModel {
991 room_id: ActiveValue::set(room_id),
992 user_id: ActiveValue::set(called_user_id),
993 calling_user_id: ActiveValue::set(calling_user_id),
994 calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
995 initial_project_id: ActiveValue::set(initial_project_id),
996 ..Default::default()
997 }
998 .insert(&tx)
999 .await?;
1000
1001 let room = self.get_room(room_id, &tx).await?;
1002 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1003 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1004 self.commit_room_transaction(room_id, tx, (room, incoming_call))
1005 .await
1006 })
1007 .await
1008 }
1009
1010 pub async fn call_failed(
1011 &self,
1012 room_id: RoomId,
1013 called_user_id: UserId,
1014 ) -> Result<RoomGuard<proto::Room>> {
1015 self.transact(|tx| async move {
1016 room_participant::Entity::delete_many()
1017 .filter(
1018 room_participant::Column::RoomId
1019 .eq(room_id)
1020 .and(room_participant::Column::UserId.eq(called_user_id)),
1021 )
1022 .exec(&tx)
1023 .await?;
1024 let room = self.get_room(room_id, &tx).await?;
1025 self.commit_room_transaction(room_id, tx, room).await
1026 })
1027 .await
1028 }
1029
1030 pub async fn decline_call(
1031 &self,
1032 expected_room_id: Option<RoomId>,
1033 user_id: UserId,
1034 ) -> Result<RoomGuard<proto::Room>> {
1035 self.transact(|tx| async move {
1036 let participant = room_participant::Entity::find()
1037 .filter(
1038 room_participant::Column::UserId
1039 .eq(user_id)
1040 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1041 )
1042 .one(&tx)
1043 .await?
1044 .ok_or_else(|| anyhow!("could not decline call"))?;
1045 let room_id = participant.room_id;
1046
1047 if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1048 return Err(anyhow!("declining call on unexpected room"))?;
1049 }
1050
1051 room_participant::Entity::delete(participant.into_active_model())
1052 .exec(&tx)
1053 .await?;
1054
1055 let room = self.get_room(room_id, &tx).await?;
1056 self.commit_room_transaction(room_id, tx, room).await
1057 })
1058 .await
1059 }
1060
1061 pub async fn cancel_call(
1062 &self,
1063 expected_room_id: Option<RoomId>,
1064 calling_connection_id: ConnectionId,
1065 called_user_id: UserId,
1066 ) -> Result<RoomGuard<proto::Room>> {
1067 self.transact(|tx| async move {
1068 let participant = room_participant::Entity::find()
1069 .filter(
1070 room_participant::Column::UserId
1071 .eq(called_user_id)
1072 .and(
1073 room_participant::Column::CallingConnectionId
1074 .eq(calling_connection_id.0 as i32),
1075 )
1076 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1077 )
1078 .one(&tx)
1079 .await?
1080 .ok_or_else(|| anyhow!("could not cancel call"))?;
1081 let room_id = participant.room_id;
1082 if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1083 return Err(anyhow!("canceling call on unexpected room"))?;
1084 }
1085
1086 room_participant::Entity::delete(participant.into_active_model())
1087 .exec(&tx)
1088 .await?;
1089
1090 let room = self.get_room(room_id, &tx).await?;
1091 self.commit_room_transaction(room_id, tx, room).await
1092 })
1093 .await
1094 }
1095
1096 pub async fn join_room(
1097 &self,
1098 room_id: RoomId,
1099 user_id: UserId,
1100 connection_id: ConnectionId,
1101 ) -> Result<RoomGuard<proto::Room>> {
1102 self.transact(|tx| async move {
1103 let result = room_participant::Entity::update_many()
1104 .filter(
1105 room_participant::Column::RoomId
1106 .eq(room_id)
1107 .and(room_participant::Column::UserId.eq(user_id))
1108 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1109 )
1110 .col_expr(
1111 room_participant::Column::AnsweringConnectionId,
1112 connection_id.0.into(),
1113 )
1114 .exec(&tx)
1115 .await?;
1116 if result.rows_affected == 0 {
1117 Err(anyhow!("room does not exist or was already joined"))?
1118 } else {
1119 let room = self.get_room(room_id, &tx).await?;
1120 self.commit_room_transaction(room_id, tx, room).await
1121 }
1122 })
1123 .await
1124 }
1125
1126 pub async fn leave_room(
1127 &self,
1128 connection_id: ConnectionId,
1129 ) -> Result<Option<RoomGuard<LeftRoom>>> {
1130 self.transact(|tx| async move {
1131 let leaving_participant = room_participant::Entity::find()
1132 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
1133 .one(&tx)
1134 .await?;
1135
1136 if let Some(leaving_participant) = leaving_participant {
1137 // Leave room.
1138 let room_id = leaving_participant.room_id;
1139 room_participant::Entity::delete_by_id(leaving_participant.id)
1140 .exec(&tx)
1141 .await?;
1142
1143 // Cancel pending calls initiated by the leaving user.
1144 let called_participants = room_participant::Entity::find()
1145 .filter(
1146 room_participant::Column::CallingConnectionId
1147 .eq(connection_id.0)
1148 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1149 )
1150 .all(&tx)
1151 .await?;
1152 room_participant::Entity::delete_many()
1153 .filter(
1154 room_participant::Column::Id
1155 .is_in(called_participants.iter().map(|participant| participant.id)),
1156 )
1157 .exec(&tx)
1158 .await?;
1159 let canceled_calls_to_user_ids = called_participants
1160 .into_iter()
1161 .map(|participant| participant.user_id)
1162 .collect();
1163
1164 // Detect left projects.
1165 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1166 enum QueryProjectIds {
1167 ProjectId,
1168 }
1169 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1170 .select_only()
1171 .column_as(
1172 project_collaborator::Column::ProjectId,
1173 QueryProjectIds::ProjectId,
1174 )
1175 .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
1176 .into_values::<_, QueryProjectIds>()
1177 .all(&tx)
1178 .await?;
1179 let mut left_projects = HashMap::default();
1180 let mut collaborators = project_collaborator::Entity::find()
1181 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1182 .stream(&tx)
1183 .await?;
1184 while let Some(collaborator) = collaborators.next().await {
1185 let collaborator = collaborator?;
1186 let left_project =
1187 left_projects
1188 .entry(collaborator.project_id)
1189 .or_insert(LeftProject {
1190 id: collaborator.project_id,
1191 host_user_id: Default::default(),
1192 connection_ids: Default::default(),
1193 host_connection_id: Default::default(),
1194 });
1195
1196 let collaborator_connection_id =
1197 ConnectionId(collaborator.connection_id as u32);
1198 if collaborator_connection_id != connection_id {
1199 left_project.connection_ids.push(collaborator_connection_id);
1200 }
1201
1202 if collaborator.is_host {
1203 left_project.host_user_id = collaborator.user_id;
1204 left_project.host_connection_id =
1205 ConnectionId(collaborator.connection_id as u32);
1206 }
1207 }
1208 drop(collaborators);
1209
1210 // Leave projects.
1211 project_collaborator::Entity::delete_many()
1212 .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
1213 .exec(&tx)
1214 .await?;
1215
1216 // Unshare projects.
1217 project::Entity::delete_many()
1218 .filter(
1219 project::Column::RoomId
1220 .eq(room_id)
1221 .and(project::Column::HostConnectionId.eq(connection_id.0)),
1222 )
1223 .exec(&tx)
1224 .await?;
1225
1226 let room = self.get_room(room_id, &tx).await?;
1227 Ok(Some(
1228 self.commit_room_transaction(
1229 room_id,
1230 tx,
1231 LeftRoom {
1232 room,
1233 left_projects,
1234 canceled_calls_to_user_ids,
1235 },
1236 )
1237 .await?,
1238 ))
1239 } else {
1240 Ok(None)
1241 }
1242 })
1243 .await
1244 }
1245
1246 pub async fn update_room_participant_location(
1247 &self,
1248 room_id: RoomId,
1249 connection_id: ConnectionId,
1250 location: proto::ParticipantLocation,
1251 ) -> Result<RoomGuard<proto::Room>> {
1252 self.transact(|tx| async {
1253 let mut tx = tx;
1254 let location_kind;
1255 let location_project_id;
1256 match location
1257 .variant
1258 .as_ref()
1259 .ok_or_else(|| anyhow!("invalid location"))?
1260 {
1261 proto::participant_location::Variant::SharedProject(project) => {
1262 location_kind = 0;
1263 location_project_id = Some(ProjectId::from_proto(project.id));
1264 }
1265 proto::participant_location::Variant::UnsharedProject(_) => {
1266 location_kind = 1;
1267 location_project_id = None;
1268 }
1269 proto::participant_location::Variant::External(_) => {
1270 location_kind = 2;
1271 location_project_id = None;
1272 }
1273 }
1274
1275 let result = room_participant::Entity::update_many()
1276 .filter(
1277 room_participant::Column::RoomId
1278 .eq(room_id)
1279 .and(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)),
1280 )
1281 .set(room_participant::ActiveModel {
1282 location_kind: ActiveValue::set(Some(location_kind)),
1283 location_project_id: ActiveValue::set(location_project_id),
1284 ..Default::default()
1285 })
1286 .exec(&tx)
1287 .await?;
1288
1289 if result.rows_affected == 1 {
1290 let room = self.get_room(room_id, &mut tx).await?;
1291 self.commit_room_transaction(room_id, tx, room).await
1292 } else {
1293 Err(anyhow!("could not update room participant location"))?
1294 }
1295 })
1296 .await
1297 }
1298
1299 async fn get_guest_connection_ids(
1300 &self,
1301 project_id: ProjectId,
1302 tx: &DatabaseTransaction,
1303 ) -> Result<Vec<ConnectionId>> {
1304 todo!()
1305 // let mut guest_connection_ids = Vec::new();
1306 // let mut db_guest_connection_ids = sqlx::query_scalar::<_, i32>(
1307 // "
1308 // SELECT connection_id
1309 // FROM project_collaborators
1310 // WHERE project_id = $1 AND is_host = FALSE
1311 // ",
1312 // )
1313 // .bind(project_id)
1314 // .fetch(tx);
1315 // while let Some(connection_id) = db_guest_connection_ids.next().await {
1316 // guest_connection_ids.push(ConnectionId(connection_id? as u32));
1317 // }
1318 // Ok(guest_connection_ids)
1319 }
1320
1321 fn build_incoming_call(
1322 room: &proto::Room,
1323 called_user_id: UserId,
1324 ) -> Option<proto::IncomingCall> {
1325 let pending_participant = room
1326 .pending_participants
1327 .iter()
1328 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1329
1330 Some(proto::IncomingCall {
1331 room_id: room.id,
1332 calling_user_id: pending_participant.calling_user_id,
1333 participant_user_ids: room
1334 .participants
1335 .iter()
1336 .map(|participant| participant.user_id)
1337 .collect(),
1338 initial_project: room.participants.iter().find_map(|participant| {
1339 let initial_project_id = pending_participant.initial_project_id?;
1340 participant
1341 .projects
1342 .iter()
1343 .find(|project| project.id == initial_project_id)
1344 .cloned()
1345 }),
1346 })
1347 }
1348
1349 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1350 let db_room = room::Entity::find_by_id(room_id)
1351 .one(tx)
1352 .await?
1353 .ok_or_else(|| anyhow!("could not find room"))?;
1354
1355 let mut db_participants = db_room
1356 .find_related(room_participant::Entity)
1357 .stream(tx)
1358 .await?;
1359 let mut participants = HashMap::default();
1360 let mut pending_participants = Vec::new();
1361 while let Some(db_participant) = db_participants.next().await {
1362 let db_participant = db_participant?;
1363 if let Some(answering_connection_id) = db_participant.answering_connection_id {
1364 let location = match (
1365 db_participant.location_kind,
1366 db_participant.location_project_id,
1367 ) {
1368 (Some(0), Some(project_id)) => {
1369 Some(proto::participant_location::Variant::SharedProject(
1370 proto::participant_location::SharedProject {
1371 id: project_id.to_proto(),
1372 },
1373 ))
1374 }
1375 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1376 Default::default(),
1377 )),
1378 _ => Some(proto::participant_location::Variant::External(
1379 Default::default(),
1380 )),
1381 };
1382 participants.insert(
1383 answering_connection_id,
1384 proto::Participant {
1385 user_id: db_participant.user_id.to_proto(),
1386 peer_id: answering_connection_id as u32,
1387 projects: Default::default(),
1388 location: Some(proto::ParticipantLocation { variant: location }),
1389 },
1390 );
1391 } else {
1392 pending_participants.push(proto::PendingParticipant {
1393 user_id: db_participant.user_id.to_proto(),
1394 calling_user_id: db_participant.calling_user_id.to_proto(),
1395 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1396 });
1397 }
1398 }
1399 drop(db_participants);
1400
1401 let mut db_projects = db_room
1402 .find_related(project::Entity)
1403 .find_with_related(worktree::Entity)
1404 .stream(tx)
1405 .await?;
1406
1407 while let Some(row) = db_projects.next().await {
1408 let (db_project, db_worktree) = row?;
1409 if let Some(participant) = participants.get_mut(&db_project.host_connection_id) {
1410 let project = if let Some(project) = participant
1411 .projects
1412 .iter_mut()
1413 .find(|project| project.id == db_project.id.to_proto())
1414 {
1415 project
1416 } else {
1417 participant.projects.push(proto::ParticipantProject {
1418 id: db_project.id.to_proto(),
1419 worktree_root_names: Default::default(),
1420 });
1421 participant.projects.last_mut().unwrap()
1422 };
1423
1424 if let Some(db_worktree) = db_worktree {
1425 project.worktree_root_names.push(db_worktree.root_name);
1426 }
1427 }
1428 }
1429
1430 Ok(proto::Room {
1431 id: db_room.id.to_proto(),
1432 live_kit_room: db_room.live_kit_room,
1433 participants: participants.into_values().collect(),
1434 pending_participants,
1435 })
1436 }
1437
1438 async fn commit_room_transaction<T>(
1439 &self,
1440 room_id: RoomId,
1441 tx: DatabaseTransaction,
1442 data: T,
1443 ) -> Result<RoomGuard<T>> {
1444 let lock = self.rooms.entry(room_id).or_default().clone();
1445 let _guard = lock.lock_owned().await;
1446 tx.commit().await?;
1447 Ok(RoomGuard {
1448 data,
1449 _guard,
1450 _not_send: PhantomData,
1451 })
1452 }
1453
1454 // projects
1455
1456 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1457 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1458 enum QueryAs {
1459 Count,
1460 }
1461
1462 self.transact(|tx| async move {
1463 Ok(project::Entity::find()
1464 .select_only()
1465 .column_as(project::Column::Id.count(), QueryAs::Count)
1466 .inner_join(user::Entity)
1467 .filter(user::Column::Admin.eq(false))
1468 .into_values::<_, QueryAs>()
1469 .one(&tx)
1470 .await?
1471 .unwrap_or(0) as usize)
1472 })
1473 .await
1474 }
1475
1476 pub async fn share_project(
1477 &self,
1478 room_id: RoomId,
1479 connection_id: ConnectionId,
1480 worktrees: &[proto::WorktreeMetadata],
1481 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1482 self.transact(|tx| async move {
1483 let participant = room_participant::Entity::find()
1484 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
1485 .one(&tx)
1486 .await?
1487 .ok_or_else(|| anyhow!("could not find participant"))?;
1488 if participant.room_id != room_id {
1489 return Err(anyhow!("shared project on unexpected room"))?;
1490 }
1491
1492 let project = project::ActiveModel {
1493 room_id: ActiveValue::set(participant.room_id),
1494 host_user_id: ActiveValue::set(participant.user_id),
1495 host_connection_id: ActiveValue::set(connection_id.0 as i32),
1496 ..Default::default()
1497 }
1498 .insert(&tx)
1499 .await?;
1500
1501 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
1502 id: ActiveValue::set(worktree.id as i32),
1503 project_id: ActiveValue::set(project.id),
1504 abs_path: ActiveValue::set(worktree.abs_path.clone()),
1505 root_name: ActiveValue::set(worktree.root_name.clone()),
1506 visible: ActiveValue::set(worktree.visible),
1507 scan_id: ActiveValue::set(0),
1508 is_complete: ActiveValue::set(false),
1509 }))
1510 .exec(&tx)
1511 .await?;
1512
1513 project_collaborator::ActiveModel {
1514 project_id: ActiveValue::set(project.id),
1515 connection_id: ActiveValue::set(connection_id.0 as i32),
1516 user_id: ActiveValue::set(participant.user_id),
1517 replica_id: ActiveValue::set(ReplicaId(0)),
1518 is_host: ActiveValue::set(true),
1519 ..Default::default()
1520 }
1521 .insert(&tx)
1522 .await?;
1523
1524 let room = self.get_room(room_id, &tx).await?;
1525 self.commit_room_transaction(room_id, tx, (project.id, room))
1526 .await
1527 })
1528 .await
1529 }
1530
    /// Intended to stop sharing `project_id` on behalf of its host `connection_id`.
    ///
    /// NOTE: not implemented yet — the transaction body is `todo!()`, so calling
    /// this method panics. The commented-out code below is the earlier
    /// sqlx-based implementation (presumably kept as the reference for the
    /// port): it collected the guest connection ids, deleted the project row
    /// owned by the host connection, and returned the refreshed room together
    /// with those guest connection ids.
    pub async fn unshare_project(
        &self,
        project_id: ProjectId,
        connection_id: ConnectionId,
    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
        self.transact(|tx| async move {
            todo!()
            // let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
            // let room_id: RoomId = sqlx::query_scalar(
            //     "
            //     DELETE FROM projects
            //     WHERE id = $1 AND host_connection_id = $2
            //     RETURNING room_id
            //     ",
            // )
            // .bind(project_id)
            // .bind(connection_id.0 as i32)
            // .fetch_one(&mut tx)
            // .await?;
            // let room = self.get_room(room_id, &mut tx).await?;
            // self.commit_room_transaction(room_id, tx, (room, guest_connection_ids))
            //     .await
        })
        .await
    }
1556
    /// Intended to replace the worktree list of `project_id` with `worktrees`,
    /// on behalf of its host `connection_id`.
    ///
    /// NOTE: not implemented yet — the transaction body is `todo!()`, so calling
    /// this method panics. The commented-out code below is the earlier
    /// sqlx-based implementation: it looked up the room of the project hosted by
    /// the connection, upserted the given worktrees (updating `root_name` on
    /// conflict), deleted worktrees no longer listed, and returned the refreshed
    /// room together with the guest connection ids.
    pub async fn update_project(
        &self,
        project_id: ProjectId,
        connection_id: ConnectionId,
        worktrees: &[proto::WorktreeMetadata],
    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
        self.transact(|tx| async move {
            todo!()
            // let room_id: RoomId = sqlx::query_scalar(
            //     "
            //     SELECT room_id
            //     FROM projects
            //     WHERE id = $1 AND host_connection_id = $2
            //     ",
            // )
            // .bind(project_id)
            // .bind(connection_id.0 as i32)
            // .fetch_one(&mut tx)
            // .await?;

            // if !worktrees.is_empty() {
            //     let mut params = "(?, ?, ?, ?, ?, ?, ?),".repeat(worktrees.len());
            //     params.pop();
            //     let query = format!(
            //         "
            //         INSERT INTO worktrees (
            //             project_id,
            //             id,
            //             root_name,
            //             abs_path,
            //             visible,
            //             scan_id,
            //             is_complete
            //         )
            //         VALUES {params}
            //         ON CONFLICT (project_id, id) DO UPDATE SET root_name = excluded.root_name
            //         "
            //     );

            //     let mut query = sqlx::query(&query);
            //     for worktree in worktrees {
            //         query = query
            //             .bind(project_id)
            //             .bind(worktree.id as i32)
            //             .bind(&worktree.root_name)
            //             .bind(&worktree.abs_path)
            //             .bind(worktree.visible)
            //             .bind(0)
            //             .bind(false)
            //     }
            //     query.execute(&mut tx).await?;
            // }

            // let mut params = "?,".repeat(worktrees.len());
            // if !worktrees.is_empty() {
            //     params.pop();
            // }
            // let query = format!(
            //     "
            //     DELETE FROM worktrees
            //     WHERE project_id = ? AND id NOT IN ({params})
            //     ",
            // );

            // let mut query = sqlx::query(&query).bind(project_id);
            // for worktree in worktrees {
            //     query = query.bind(WorktreeId(worktree.id as i32));
            // }
            // query.execute(&mut tx).await?;

            // let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
            // let room = self.get_room(room_id, &mut tx).await?;
            // self.commit_room_transaction(room_id, tx, (room, guest_connection_ids))
            //     .await
        })
        .await
    }
1634
    /// Intended to apply a worktree `update` sent by the project host
    /// `connection_id`.
    ///
    /// NOTE: not implemented yet — the transaction body is `todo!()`, so calling
    /// this method panics. The commented-out code below is the earlier
    /// sqlx-based implementation: it verified that the update came from the
    /// host, updated the worktree metadata, upserted `updated_entries`, deleted
    /// `removed_entries`, and returned the guest connection ids.
    pub async fn update_worktree(
        &self,
        update: &proto::UpdateWorktree,
        connection_id: ConnectionId,
    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
        self.transact(|tx| async move {
            todo!()
            // let project_id = ProjectId::from_proto(update.project_id);
            // let worktree_id = WorktreeId::from_proto(update.worktree_id);

            // // Ensure the update comes from the host.
            // let room_id: RoomId = sqlx::query_scalar(
            //     "
            //     SELECT room_id
            //     FROM projects
            //     WHERE id = $1 AND host_connection_id = $2
            //     ",
            // )
            // .bind(project_id)
            // .bind(connection_id.0 as i32)
            // .fetch_one(&mut tx)
            // .await?;

            // // Update metadata.
            // sqlx::query(
            //     "
            //     UPDATE worktrees
            //     SET
            //         root_name = $1,
            //         scan_id = $2,
            //         is_complete = $3,
            //         abs_path = $4
            //     WHERE project_id = $5 AND id = $6
            //     RETURNING 1
            //     ",
            // )
            // .bind(&update.root_name)
            // .bind(update.scan_id as i64)
            // .bind(update.is_last_update)
            // .bind(&update.abs_path)
            // .bind(project_id)
            // .bind(worktree_id)
            // .fetch_one(&mut tx)
            // .await?;

            // if !update.updated_entries.is_empty() {
            //     let mut params =
            //         "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?),".repeat(update.updated_entries.len());
            //     params.pop();

            //     let query = format!(
            //         "
            //         INSERT INTO worktree_entries (
            //             project_id,
            //             worktree_id,
            //             id,
            //             is_dir,
            //             path,
            //             inode,
            //             mtime_seconds,
            //             mtime_nanos,
            //             is_symlink,
            //             is_ignored
            //         )
            //         VALUES {params}
            //         ON CONFLICT (project_id, worktree_id, id) DO UPDATE SET
            //             is_dir = excluded.is_dir,
            //             path = excluded.path,
            //             inode = excluded.inode,
            //             mtime_seconds = excluded.mtime_seconds,
            //             mtime_nanos = excluded.mtime_nanos,
            //             is_symlink = excluded.is_symlink,
            //             is_ignored = excluded.is_ignored
            //         "
            //     );
            //     let mut query = sqlx::query(&query);
            //     for entry in &update.updated_entries {
            //         let mtime = entry.mtime.clone().unwrap_or_default();
            //         query = query
            //             .bind(project_id)
            //             .bind(worktree_id)
            //             .bind(entry.id as i64)
            //             .bind(entry.is_dir)
            //             .bind(&entry.path)
            //             .bind(entry.inode as i64)
            //             .bind(mtime.seconds as i64)
            //             .bind(mtime.nanos as i32)
            //             .bind(entry.is_symlink)
            //             .bind(entry.is_ignored);
            //     }
            //     query.execute(&mut tx).await?;
            // }

            // if !update.removed_entries.is_empty() {
            //     let mut params = "?,".repeat(update.removed_entries.len());
            //     params.pop();
            //     let query = format!(
            //         "
            //         DELETE FROM worktree_entries
            //         WHERE project_id = ? AND worktree_id = ? AND id IN ({params})
            //         "
            //     );

            //     let mut query = sqlx::query(&query).bind(project_id).bind(worktree_id);
            //     for entry_id in &update.removed_entries {
            //         query = query.bind(*entry_id as i64);
            //     }
            //     query.execute(&mut tx).await?;
            // }

            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
            // self.commit_room_transaction(room_id, tx, connection_ids)
            //     .await
        })
        .await
    }
1751
    /// Intended to store a diagnostic summary `update` sent by the project host
    /// `connection_id`.
    ///
    /// NOTE: not implemented yet — the transaction body is `todo!()`, so calling
    /// this method panics. The commented-out code below is the earlier
    /// sqlx-based implementation: it verified that the update came from the
    /// host, upserted the summary keyed by `(project_id, worktree_id, path)`,
    /// and returned the guest connection ids.
    pub async fn update_diagnostic_summary(
        &self,
        update: &proto::UpdateDiagnosticSummary,
        connection_id: ConnectionId,
    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
        self.transact(|tx| async {
            todo!()
            // let project_id = ProjectId::from_proto(update.project_id);
            // let worktree_id = WorktreeId::from_proto(update.worktree_id);
            // let summary = update
            //     .summary
            //     .as_ref()
            //     .ok_or_else(|| anyhow!("invalid summary"))?;

            // // Ensure the update comes from the host.
            // let room_id: RoomId = sqlx::query_scalar(
            //     "
            //     SELECT room_id
            //     FROM projects
            //     WHERE id = $1 AND host_connection_id = $2
            //     ",
            // )
            // .bind(project_id)
            // .bind(connection_id.0 as i32)
            // .fetch_one(&mut tx)
            // .await?;

            // // Update summary.
            // sqlx::query(
            //     "
            //     INSERT INTO worktree_diagnostic_summaries (
            //         project_id,
            //         worktree_id,
            //         path,
            //         language_server_id,
            //         error_count,
            //         warning_count
            //     )
            //     VALUES ($1, $2, $3, $4, $5, $6)
            //     ON CONFLICT (project_id, worktree_id, path) DO UPDATE SET
            //         language_server_id = excluded.language_server_id,
            //         error_count = excluded.error_count,
            //         warning_count = excluded.warning_count
            //     ",
            // )
            // .bind(project_id)
            // .bind(worktree_id)
            // .bind(&summary.path)
            // .bind(summary.language_server_id as i64)
            // .bind(summary.error_count as i32)
            // .bind(summary.warning_count as i32)
            // .execute(&mut tx)
            // .await?;

            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
            // self.commit_room_transaction(room_id, tx, connection_ids)
            //     .await
        })
        .await
    }
1812
    /// Intended to record a language server started by the project host
    /// `connection_id`.
    ///
    /// NOTE: not implemented yet — the transaction body is `todo!()`, so calling
    /// this method panics. The commented-out code below is the earlier
    /// sqlx-based implementation: it verified that the update came from the
    /// host, upserted the language server keyed by `(project_id, id)`, and
    /// returned the guest connection ids.
    pub async fn start_language_server(
        &self,
        update: &proto::StartLanguageServer,
        connection_id: ConnectionId,
    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
        self.transact(|tx| async {
            todo!()
            // let project_id = ProjectId::from_proto(update.project_id);
            // let server = update
            //     .server
            //     .as_ref()
            //     .ok_or_else(|| anyhow!("invalid language server"))?;

            // // Ensure the update comes from the host.
            // let room_id: RoomId = sqlx::query_scalar(
            //     "
            //     SELECT room_id
            //     FROM projects
            //     WHERE id = $1 AND host_connection_id = $2
            //     ",
            // )
            // .bind(project_id)
            // .bind(connection_id.0 as i32)
            // .fetch_one(&mut tx)
            // .await?;

            // // Add the newly-started language server.
            // sqlx::query(
            //     "
            //     INSERT INTO language_servers (project_id, id, name)
            //     VALUES ($1, $2, $3)
            //     ON CONFLICT (project_id, id) DO UPDATE SET
            //         name = excluded.name
            //     ",
            // )
            // .bind(project_id)
            // .bind(server.id as i64)
            // .bind(&server.name)
            // .execute(&mut tx)
            // .await?;

            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
            // self.commit_room_transaction(room_id, tx, connection_ids)
            //     .await
        })
        .await
    }
1860
    /// Intended to add `connection_id` as a guest collaborator of `project_id`
    /// and return the project's state along with the replica id assigned to it.
    ///
    /// NOTE: not implemented yet — the transaction body is `todo!()`, so calling
    /// this method panics. The commented-out code below is the earlier
    /// sqlx-based implementation: it resolved the caller's room and user,
    /// checked that the project was shared in that room, picked the lowest
    /// unused replica id starting at 1, inserted the collaborator row, and
    /// loaded the project's worktrees, entries, diagnostic summaries and
    /// language servers.
    pub async fn join_project(
        &self,
        project_id: ProjectId,
        connection_id: ConnectionId,
    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
        self.transact(|tx| async move {
            todo!()
            // let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>(
            //     "
            //     SELECT room_id, user_id
            //     FROM room_participants
            //     WHERE answering_connection_id = $1
            //     ",
            // )
            // .bind(connection_id.0 as i32)
            // .fetch_one(&mut tx)
            // .await?;

            // // Ensure project id was shared on this room.
            // sqlx::query(
            //     "
            //     SELECT 1
            //     FROM projects
            //     WHERE id = $1 AND room_id = $2
            //     ",
            // )
            // .bind(project_id)
            // .bind(room_id)
            // .fetch_one(&mut tx)
            // .await?;

            // let mut collaborators = sqlx::query_as::<_, ProjectCollaborator>(
            //     "
            //     SELECT *
            //     FROM project_collaborators
            //     WHERE project_id = $1
            //     ",
            // )
            // .bind(project_id)
            // .fetch_all(&mut tx)
            // .await?;
            // let replica_ids = collaborators
            //     .iter()
            //     .map(|c| c.replica_id)
            //     .collect::<HashSet<_>>();
            // let mut replica_id = ReplicaId(1);
            // while replica_ids.contains(&replica_id) {
            //     replica_id.0 += 1;
            // }
            // let new_collaborator = ProjectCollaborator {
            //     project_id,
            //     connection_id: connection_id.0 as i32,
            //     user_id,
            //     replica_id,
            //     is_host: false,
            // };

            // sqlx::query(
            //     "
            //     INSERT INTO project_collaborators (
            //         project_id,
            //         connection_id,
            //         user_id,
            //         replica_id,
            //         is_host
            //     )
            //     VALUES ($1, $2, $3, $4, $5)
            //     ",
            // )
            // .bind(new_collaborator.project_id)
            // .bind(new_collaborator.connection_id)
            // .bind(new_collaborator.user_id)
            // .bind(new_collaborator.replica_id)
            // .bind(new_collaborator.is_host)
            // .execute(&mut tx)
            // .await?;
            // collaborators.push(new_collaborator);

            // let worktree_rows = sqlx::query_as::<_, WorktreeRow>(
            //     "
            //     SELECT *
            //     FROM worktrees
            //     WHERE project_id = $1
            //     ",
            // )
            // .bind(project_id)
            // .fetch_all(&mut tx)
            // .await?;
            // let mut worktrees = worktree_rows
            //     .into_iter()
            //     .map(|worktree_row| {
            //         (
            //             worktree_row.id,
            //             Worktree {
            //                 id: worktree_row.id,
            //                 abs_path: worktree_row.abs_path,
            //                 root_name: worktree_row.root_name,
            //                 visible: worktree_row.visible,
            //                 entries: Default::default(),
            //                 diagnostic_summaries: Default::default(),
            //                 scan_id: worktree_row.scan_id as u64,
            //                 is_complete: worktree_row.is_complete,
            //             },
            //         )
            //     })
            //     .collect::<BTreeMap<_, _>>();

            // // Populate worktree entries.
            // {
            //     let mut entries = sqlx::query_as::<_, WorktreeEntry>(
            //         "
            //         SELECT *
            //         FROM worktree_entries
            //         WHERE project_id = $1
            //         ",
            //     )
            //     .bind(project_id)
            //     .fetch(&mut tx);
            //     while let Some(entry) = entries.next().await {
            //         let entry = entry?;
            //         if let Some(worktree) = worktrees.get_mut(&entry.worktree_id) {
            //             worktree.entries.push(proto::Entry {
            //                 id: entry.id as u64,
            //                 is_dir: entry.is_dir,
            //                 path: entry.path,
            //                 inode: entry.inode as u64,
            //                 mtime: Some(proto::Timestamp {
            //                     seconds: entry.mtime_seconds as u64,
            //                     nanos: entry.mtime_nanos as u32,
            //                 }),
            //                 is_symlink: entry.is_symlink,
            //                 is_ignored: entry.is_ignored,
            //             });
            //         }
            //     }
            // }

            // // Populate worktree diagnostic summaries.
            // {
            //     let mut summaries = sqlx::query_as::<_, WorktreeDiagnosticSummary>(
            //         "
            //         SELECT *
            //         FROM worktree_diagnostic_summaries
            //         WHERE project_id = $1
            //         ",
            //     )
            //     .bind(project_id)
            //     .fetch(&mut tx);
            //     while let Some(summary) = summaries.next().await {
            //         let summary = summary?;
            //         if let Some(worktree) = worktrees.get_mut(&summary.worktree_id) {
            //             worktree
            //                 .diagnostic_summaries
            //                 .push(proto::DiagnosticSummary {
            //                     path: summary.path,
            //                     language_server_id: summary.language_server_id as u64,
            //                     error_count: summary.error_count as u32,
            //                     warning_count: summary.warning_count as u32,
            //                 });
            //         }
            //     }
            // }

            // // Populate language servers.
            // let language_servers = sqlx::query_as::<_, LanguageServer>(
            //     "
            //     SELECT *
            //     FROM language_servers
            //     WHERE project_id = $1
            //     ",
            // )
            // .bind(project_id)
            // .fetch_all(&mut tx)
            // .await?;

            // self.commit_room_transaction(
            //     room_id,
            //     tx,
            //     (
            //         Project {
            //             collaborators,
            //             worktrees,
            //             language_servers: language_servers
            //                 .into_iter()
            //                 .map(|language_server| proto::LanguageServer {
            //                     id: language_server.id.to_proto(),
            //                     name: language_server.name,
            //                 })
            //                 .collect(),
            //         },
            //         replica_id as ReplicaId,
            //     ),
            // )
            // .await
        })
        .await
    }
2058
2059 pub async fn leave_project(
2060 &self,
2061 project_id: ProjectId,
2062 connection_id: ConnectionId,
2063 ) -> Result<RoomGuard<LeftProject>> {
2064 self.transact(|tx| async move {
2065 todo!()
2066 // let result = sqlx::query(
2067 // "
2068 // DELETE FROM project_collaborators
2069 // WHERE project_id = $1 AND connection_id = $2
2070 // ",
2071 // )
2072 // .bind(project_id)
2073 // .bind(connection_id.0 as i32)
2074 // .execute(&mut tx)
2075 // .await?;
2076
2077 // if result.rows_affected() == 0 {
2078 // Err(anyhow!("not a collaborator on this project"))?;
2079 // }
2080
2081 // let connection_ids = sqlx::query_scalar::<_, i32>(
2082 // "
2083 // SELECT connection_id
2084 // FROM project_collaborators
2085 // WHERE project_id = $1
2086 // ",
2087 // )
2088 // .bind(project_id)
2089 // .fetch_all(&mut tx)
2090 // .await?
2091 // .into_iter()
2092 // .map(|id| ConnectionId(id as u32))
2093 // .collect();
2094
2095 // let (room_id, host_user_id, host_connection_id) =
2096 // sqlx::query_as::<_, (RoomId, i32, i32)>(
2097 // "
2098 // SELECT room_id, host_user_id, host_connection_id
2099 // FROM projects
2100 // WHERE id = $1
2101 // ",
2102 // )
2103 // .bind(project_id)
2104 // .fetch_one(&mut tx)
2105 // .await?;
2106
2107 // self.commit_room_transaction(
2108 // room_id,
2109 // tx,
2110 // LeftProject {
2111 // id: project_id,
2112 // host_user_id: UserId(host_user_id),
2113 // host_connection_id: ConnectionId(host_connection_id as u32),
2114 // connection_ids,
2115 // },
2116 // )
2117 // .await
2118 })
2119 .await
2120 }
2121
2122 pub async fn project_collaborators(
2123 &self,
2124 project_id: ProjectId,
2125 connection_id: ConnectionId,
2126 ) -> Result<Vec<project_collaborator::Model>> {
2127 self.transact(|tx| async move {
2128 todo!()
2129 // let collaborators = sqlx::query_as::<_, ProjectCollaborator>(
2130 // "
2131 // SELECT *
2132 // FROM project_collaborators
2133 // WHERE project_id = $1
2134 // ",
2135 // )
2136 // .bind(project_id)
2137 // .fetch_all(&mut tx)
2138 // .await?;
2139
2140 // if collaborators
2141 // .iter()
2142 // .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
2143 // {
2144 // Ok(collaborators)
2145 // } else {
2146 // Err(anyhow!("no such project"))?
2147 // }
2148 })
2149 .await
2150 }
2151
2152 pub async fn project_connection_ids(
2153 &self,
2154 project_id: ProjectId,
2155 connection_id: ConnectionId,
2156 ) -> Result<HashSet<ConnectionId>> {
2157 self.transact(|tx| async move {
2158 todo!()
2159 // let connection_ids = sqlx::query_scalar::<_, i32>(
2160 // "
2161 // SELECT connection_id
2162 // FROM project_collaborators
2163 // WHERE project_id = $1
2164 // ",
2165 // )
2166 // .bind(project_id)
2167 // .fetch_all(&mut tx)
2168 // .await?;
2169
2170 // if connection_ids.contains(&(connection_id.0 as i32)) {
2171 // Ok(connection_ids
2172 // .into_iter()
2173 // .map(|connection_id| ConnectionId(connection_id as u32))
2174 // .collect())
2175 // } else {
2176 // Err(anyhow!("no such project"))?
2177 // }
2178 })
2179 .await
2180 }
2181
2182 // access tokens
2183
2184 pub async fn create_access_token_hash(
2185 &self,
2186 user_id: UserId,
2187 access_token_hash: &str,
2188 max_access_token_count: usize,
2189 ) -> Result<()> {
2190 self.transact(|tx| async {
2191 let tx = tx;
2192
2193 access_token::ActiveModel {
2194 user_id: ActiveValue::set(user_id),
2195 hash: ActiveValue::set(access_token_hash.into()),
2196 ..Default::default()
2197 }
2198 .insert(&tx)
2199 .await?;
2200
2201 access_token::Entity::delete_many()
2202 .filter(
2203 access_token::Column::Id.in_subquery(
2204 Query::select()
2205 .column(access_token::Column::Id)
2206 .from(access_token::Entity)
2207 .and_where(access_token::Column::UserId.eq(user_id))
2208 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2209 .limit(10000)
2210 .offset(max_access_token_count as u64)
2211 .to_owned(),
2212 ),
2213 )
2214 .exec(&tx)
2215 .await?;
2216 tx.commit().await?;
2217 Ok(())
2218 })
2219 .await
2220 }
2221
2222 pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2223 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2224 enum QueryAs {
2225 Hash,
2226 }
2227
2228 self.transact(|tx| async move {
2229 Ok(access_token::Entity::find()
2230 .select_only()
2231 .column(access_token::Column::Hash)
2232 .filter(access_token::Column::UserId.eq(user_id))
2233 .order_by_desc(access_token::Column::Id)
2234 .into_values::<_, QueryAs>()
2235 .all(&tx)
2236 .await?)
2237 })
2238 .await
2239 }
2240
2241 async fn transact<F, Fut, T>(&self, f: F) -> Result<T>
2242 where
2243 F: Send + Fn(DatabaseTransaction) -> Fut,
2244 Fut: Send + Future<Output = Result<T>>,
2245 {
2246 let body = async {
2247 loop {
2248 let tx = self.pool.begin().await?;
2249
2250 // In Postgres, serializable transactions are opt-in
2251 if let DatabaseBackend::Postgres = self.pool.get_database_backend() {
2252 tx.execute(Statement::from_string(
2253 DatabaseBackend::Postgres,
2254 "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".into(),
2255 ))
2256 .await?;
2257 }
2258
2259 match f(tx).await {
2260 Ok(result) => return Ok(result),
2261 Err(error) => match error {
2262 Error::Database2(
2263 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2264 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2265 ) if error
2266 .as_database_error()
2267 .and_then(|error| error.code())
2268 .as_deref()
2269 == Some("40001") =>
2270 {
2271 // Retry (don't break the loop)
2272 }
2273 error @ _ => return Err(error),
2274 },
2275 }
2276 }
2277 };
2278
2279 #[cfg(test)]
2280 {
2281 if let Some(background) = self.background.as_ref() {
2282 background.simulate_random_delay().await;
2283 }
2284
2285 self.runtime.as_ref().unwrap().block_on(body)
2286 }
2287
2288 #[cfg(not(test))]
2289 {
2290 body.await
2291 }
2292 }
2293}
2294
/// A value paired with an owned lock guard.
///
/// In this file it is built by `Database::commit_room_transaction`, which
/// stores the guard of the room's mutex in `_guard`, so the lock stays held
/// for as long as this value is alive. The wrapped data is reachable through
/// `Deref` and `DerefMut`.
pub struct RoomGuard<T> {
    // The payload handed back to the caller.
    data: T,
    // Owned mutex guard; the lock is released when this struct is dropped.
    _guard: OwnedMutexGuard<()>,
    // `Rc` is not `Send`, so this marker makes `RoomGuard` itself not `Send`.
    _not_send: PhantomData<Rc<()>>,
}
2300
2301impl<T> Deref for RoomGuard<T> {
2302 type Target = T;
2303
2304 fn deref(&self) -> &T {
2305 &self.data
2306 }
2307}
2308
2309impl<T> DerefMut for RoomGuard<T> {
2310 fn deref_mut(&mut self) -> &mut T {
2311 &mut self.data
2312 }
2313}
2314
/// Serializable input describing a user to be created.
// NOTE(review): the field meanings below are inferred from their names — confirm
// against the code that consumes this struct.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    // Presumably the user's GitHub login name.
    pub github_login: String,
    // Presumably the user's numeric GitHub id.
    pub github_user_id: i32,
    // Presumably the number of invites the new user starts with.
    pub invite_count: i32,
}
2321
/// Result data describing a newly created user.
///
/// NOTE(review): the producer of this struct is outside this view; the field
/// descriptions below are inferred from names and should be confirmed.
#[derive(Debug)]
pub struct NewUserResult {
    /// Id of the user.
    pub user_id: UserId,
    /// Metrics identifier associated with the user.
    pub metrics_id: String,
    /// Id of the user who invited this one, if any.
    pub inviting_user_id: Option<UserId>,
    /// Presumably the device id recorded at signup, if any — confirm against
    /// the producer.
    pub signup_device_id: Option<String>,
}
2329
2330fn random_invite_code() -> String {
2331 nanoid::nanoid!(16)
2332}
2333
2334fn random_email_confirmation_code() -> String {
2335 nanoid::nanoid!(64)
2336}
2337
// Declares a strongly typed database id: a public newtype `$name(pub i32)`
// together with the conversions needed to use it with serde, sqlx, sea-query
// and sea-orm, plus helpers for converting to and from protobuf `u64` ids.
//
// Usage: `id_type!(UserId);`
//
// The expansion refers to `Value` and `DbErr` unqualified, so both must be in
// scope wherever the macro is invoked.
macro_rules! id_type {
    ($name:ident) => {
        // `transparent` on both sqlx and serde makes the newtype encode exactly
        // like the inner `i32`.
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            sqlx::Type,
            Serialize,
            Deserialize,
        )]
        #[sqlx(transparent)]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            // Largest representable id (`i32::MAX`).
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            // Converts a protobuf `u64` id. The `as` cast keeps only the low
            // 32 bits, so values above `i32::MAX` wrap instead of failing.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            // Converts to a protobuf `u64` id. The `as` cast sign-extends, so a
            // negative inner value becomes a very large `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        // Formats the id exactly like its inner integer.
        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        // Lets the id be used directly as a query parameter (a non-null `Int`).
        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        // Reads the id out of a query result column by delegating to `i32`.
        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer variant whose value fits in an
            // `i32`. Null payloads (`None`), out-of-range integers and every
            // non-integer variant are rejected with `ValueTypeErr`.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            // Reports the Rust type name of the id (e.g. "UserId").
            fn type_name() -> String {
                stringify!($name).into()
            }

            // Arrays of this id are arrays of `Int`.
            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            // The id is stored in an integer column with no explicit length.
            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        // Builds the id from a `u64`, failing with `DbErr::ConvertFromU64` when
        // the value does not fit in an `i32`.
        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(concat!(
                        "error converting ",
                        stringify!($name),
                        " to u64"
                    ))
                })?))
            }
        }

        // The SQL NULL for this id type is a null `Int`.
        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
2458
// One distinct `i32`-backed id newtype per kind of record, so that ids of
// different kinds cannot be mixed up at compile time.
id_type!(AccessTokenId);
id_type!(ContactId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(SignupId);
id_type!(UserId);
id_type!(WorktreeId);
2469
/// Data describing the outcome of leaving a room.
///
/// NOTE(review): the producer of this struct is outside this view; the field
/// descriptions are inferred from names and types and should be confirmed.
pub struct LeftRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Projects that were left, keyed by project id.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    /// Presumably the users whose pending calls were canceled as a result —
    /// confirm against the producer.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
2475
/// Aggregated view of a project: its collaborators, worktrees and language
/// servers.
///
/// NOTE(review): how this struct is populated is outside this view.
pub struct Project {
    /// Collaborator rows (`project_collaborator::Model`) for the project.
    pub collaborators: Vec<project_collaborator::Model>,
    /// Worktrees keyed by id; the `BTreeMap` keeps them ordered by `WorktreeId`.
    pub worktrees: BTreeMap<WorktreeId, Worktree>,
    /// Protobuf descriptions of the project's language servers.
    pub language_servers: Vec<proto::LanguageServer>,
}
2481
/// Data describing a project that was left.
///
/// NOTE(review): the producer of this struct is outside this view; the field
/// descriptions are inferred from names and types and should be confirmed.
pub struct LeftProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Id of the user hosting the project.
    pub host_user_id: UserId,
    /// Connection id of the project's host.
    pub host_connection_id: ConnectionId,
    /// Presumably the connections of the project's other participants —
    /// confirm against the producer.
    pub connection_ids: Vec<ConnectionId>,
}
2488
/// In-memory description of a worktree, including its entries and diagnostic
/// summaries.
///
/// NOTE(review): how this struct is populated is outside this view; the field
/// descriptions are inferred from names and types and should be confirmed.
pub struct Worktree {
    /// Id of the worktree.
    pub id: WorktreeId,
    /// Absolute path of the worktree, as a string.
    pub abs_path: String,
    /// Name of the worktree's root.
    pub root_name: String,
    /// Whether the worktree is marked visible.
    pub visible: bool,
    /// Protobuf entries belonging to the worktree.
    pub entries: Vec<proto::Entry>,
    /// Protobuf diagnostic summaries belonging to the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Presumably the id of the most recent scan — confirm against the producer.
    pub scan_id: u64,
    /// Presumably whether the scan identified by `scan_id` has finished —
    /// confirm against the producer.
    pub is_complete: bool,
}
2499
2500#[cfg(test)]
2501pub use test::*;
2502
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use lazy_static::lazy_static;
    use parking_lot::Mutex;
    use rand::prelude::*;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// Owns a [`Database`] prepared for tests and cleans it up on drop.
    ///
    /// Both fields are `Option`s; the constructors in this module always set
    /// `db` to `Some` and `connection` to `None`.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    /// Builds the current-thread Tokio runtime (I/O and time enabled) that each
    /// test database uses to drive its async work.
    fn new_test_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build()
            .unwrap()
    }

    impl TestDb {
        /// Attaches the test-only executor and runtime to `db` and wraps it.
        fn assemble(
            mut db: Database,
            background: Arc<Background>,
            runtime: tokio::runtime::Runtime,
        ) -> Self {
            db.background = Some(background);
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a test database backed by in-memory SQLite and loads the
        /// bundled test schema into it.
        ///
        /// # Panics
        ///
        /// Panics if the runtime cannot be built, the connection fails, or the
        /// schema cannot be executed.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = String::from("sqlite::memory:");
            let runtime = new_test_runtime();

            let db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options).await.unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            Self::assemble(db, background, runtime)
        }

        /// Creates a uniquely named Postgres database on `localhost`, connects
        /// to it and runs the crate's migrations.
        ///
        /// A process-wide lock is held for the duration of this call, so
        /// concurrent tests create their databases one at a time.
        ///
        /// # Panics
        ///
        /// Panics if the runtime cannot be built, the database cannot be
        /// created or connected to, or the migrations fail.
        pub fn postgres(background: Arc<Background>) -> Self {
            lazy_static! {
                static ref LOCK: Mutex<()> = Mutex::new(());
            }

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = new_test_runtime();

            let db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options).await.unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            Self::assemble(db, background, runtime)
        }

        /// Returns the wrapped database.
        ///
        /// # Panics
        ///
        /// Panics if the `db` field has been emptied.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        /// For Postgres-backed databases, terminates every other backend
        /// connected to the current database and then drops the database.
        /// Failures are handed to `log_err` instead of being propagated.
        ///
        /// Does nothing when `db` or its runtime is missing: panicking inside
        /// `drop` would abort the process if a test is already unwinding.
        fn drop(&mut self) {
            let db = match self.db.take() {
                Some(db) => db,
                None => return,
            };
            if !matches!(db.pool.get_database_backend(), DatabaseBackend::Postgres) {
                return;
            }
            let runtime = match db.runtime.as_ref() {
                Some(runtime) => runtime,
                None => return,
            };

            runtime.block_on(async {
                use util::ResultExt;
                let query = "
                    SELECT pg_terminate_backend(pg_stat_activity.pid)
                    FROM pg_stat_activity
                    WHERE
                        pg_stat_activity.datname = current_database() AND
                        pid <> pg_backend_pid();
                ";
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        query.into(),
                    ))
                    .await
                    .log_err();
                sqlx::Postgres::drop_database(db.options.get_url())
                    .await
                    .log_err();
            });
        }
    }
}