1mod access_token;
2mod contact;
3mod follower;
4mod language_server;
5mod project;
6mod project_collaborator;
7mod room;
8mod room_participant;
9mod server;
10mod signup;
11#[cfg(test)]
12mod tests;
13mod user;
14mod worktree;
15mod worktree_diagnostic_summary;
16mod worktree_entry;
17
18use crate::executor::Executor;
19use crate::{Error, Result};
20use anyhow::anyhow;
21use collections::{BTreeMap, HashMap, HashSet};
22pub use contact::Contact;
23use dashmap::DashMap;
24use futures::StreamExt;
25use hyper::StatusCode;
26use rand::prelude::StdRng;
27use rand::{Rng, SeedableRng};
28use rpc::{proto, ConnectionId};
29use sea_orm::Condition;
30pub use sea_orm::ConnectOptions;
31use sea_orm::{
32 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
33 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
34 Statement, TransactionTrait,
35};
36use sea_query::{Alias, Expr, OnConflict, Query};
37use serde::{Deserialize, Serialize};
38pub use signup::{Invite, NewSignup, WaitlistSummary};
39use sqlx::migrate::{Migrate, Migration, MigrationSource};
40use sqlx::Connection;
41use std::ops::{Deref, DerefMut};
42use std::path::Path;
43use std::time::Duration;
44use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
45use tokio::sync::{Mutex, OwnedMutexGuard};
46pub use user::Model as User;
47
48pub struct Database {
49 options: ConnectOptions,
50 pool: DatabaseConnection,
51 rooms: DashMap<RoomId, Arc<Mutex<()>>>,
52 rng: Mutex<StdRng>,
53 executor: Executor,
54 #[cfg(test)]
55 runtime: Option<tokio::runtime::Runtime>,
56}
57
58impl Database {
59 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
60 Ok(Self {
61 options: options.clone(),
62 pool: sea_orm::Database::connect(options).await?,
63 rooms: DashMap::with_capacity(16384),
64 rng: Mutex::new(StdRng::seed_from_u64(0)),
65 executor,
66 #[cfg(test)]
67 runtime: None,
68 })
69 }
70
71 #[cfg(test)]
72 pub fn reset(&self) {
73 self.rooms.clear();
74 }
75
76 pub async fn migrate(
77 &self,
78 migrations_path: &Path,
79 ignore_checksum_mismatch: bool,
80 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
81 let migrations = MigrationSource::resolve(migrations_path)
82 .await
83 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
84
85 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
86
87 connection.ensure_migrations_table().await?;
88 let applied_migrations: HashMap<_, _> = connection
89 .list_applied_migrations()
90 .await?
91 .into_iter()
92 .map(|m| (m.version, m))
93 .collect();
94
95 let mut new_migrations = Vec::new();
96 for migration in migrations {
97 match applied_migrations.get(&migration.version) {
98 Some(applied_migration) => {
99 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
100 {
101 Err(anyhow!(
102 "checksum mismatch for applied migration {}",
103 migration.description
104 ))?;
105 }
106 }
107 None => {
108 let elapsed = connection.apply(&migration).await?;
109 new_migrations.push((migration, elapsed));
110 }
111 }
112 }
113
114 Ok(new_migrations)
115 }
116
117 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
118 self.transaction(|tx| async move {
119 let server = server::ActiveModel {
120 environment: ActiveValue::set(environment.into()),
121 ..Default::default()
122 }
123 .insert(&*tx)
124 .await?;
125 Ok(server.id)
126 })
127 .await
128 }
129
130 pub async fn stale_room_ids(
131 &self,
132 environment: &str,
133 new_server_id: ServerId,
134 ) -> Result<Vec<RoomId>> {
135 self.transaction(|tx| async move {
136 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
137 enum QueryAs {
138 RoomId,
139 }
140
141 let stale_server_epochs = self
142 .stale_server_ids(environment, new_server_id, &tx)
143 .await?;
144 Ok(room_participant::Entity::find()
145 .select_only()
146 .column(room_participant::Column::RoomId)
147 .distinct()
148 .filter(
149 room_participant::Column::AnsweringConnectionServerId
150 .is_in(stale_server_epochs),
151 )
152 .into_values::<_, QueryAs>()
153 .all(&*tx)
154 .await?)
155 })
156 .await
157 }
158
159 pub async fn refresh_room(
160 &self,
161 room_id: RoomId,
162 new_server_id: ServerId,
163 ) -> Result<RoomGuard<RefreshedRoom>> {
164 self.room_transaction(room_id, |tx| async move {
165 let stale_participant_filter = Condition::all()
166 .add(room_participant::Column::RoomId.eq(room_id))
167 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
168 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
169
170 let stale_participant_user_ids = room_participant::Entity::find()
171 .filter(stale_participant_filter.clone())
172 .all(&*tx)
173 .await?
174 .into_iter()
175 .map(|participant| participant.user_id)
176 .collect::<Vec<_>>();
177
178 // Delete participants who failed to reconnect.
179 room_participant::Entity::delete_many()
180 .filter(stale_participant_filter)
181 .exec(&*tx)
182 .await?;
183
184 let room = self.get_room(room_id, &tx).await?;
185 let mut canceled_calls_to_user_ids = Vec::new();
186 // Delete the room if it becomes empty and cancel pending calls.
187 if room.participants.is_empty() {
188 canceled_calls_to_user_ids.extend(
189 room.pending_participants
190 .iter()
191 .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
192 );
193 room_participant::Entity::delete_many()
194 .filter(room_participant::Column::RoomId.eq(room_id))
195 .exec(&*tx)
196 .await?;
197 project::Entity::delete_many()
198 .filter(project::Column::RoomId.eq(room_id))
199 .exec(&*tx)
200 .await?;
201 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
202 }
203
204 Ok(RefreshedRoom {
205 room,
206 stale_participant_user_ids,
207 canceled_calls_to_user_ids,
208 })
209 })
210 .await
211 }
212
213 pub async fn delete_stale_servers(
214 &self,
215 environment: &str,
216 new_server_id: ServerId,
217 ) -> Result<()> {
218 self.transaction(|tx| async move {
219 server::Entity::delete_many()
220 .filter(
221 Condition::all()
222 .add(server::Column::Environment.eq(environment))
223 .add(server::Column::Id.ne(new_server_id)),
224 )
225 .exec(&*tx)
226 .await?;
227 Ok(())
228 })
229 .await
230 }
231
232 async fn stale_server_ids(
233 &self,
234 environment: &str,
235 new_server_id: ServerId,
236 tx: &DatabaseTransaction,
237 ) -> Result<Vec<ServerId>> {
238 let stale_servers = server::Entity::find()
239 .filter(
240 Condition::all()
241 .add(server::Column::Environment.eq(environment))
242 .add(server::Column::Id.ne(new_server_id)),
243 )
244 .all(&*tx)
245 .await?;
246 Ok(stale_servers.into_iter().map(|server| server.id).collect())
247 }
248
249 // users
250
251 pub async fn create_user(
252 &self,
253 email_address: &str,
254 admin: bool,
255 params: NewUserParams,
256 ) -> Result<NewUserResult> {
257 self.transaction(|tx| async {
258 let tx = tx;
259 let user = user::Entity::insert(user::ActiveModel {
260 email_address: ActiveValue::set(Some(email_address.into())),
261 github_login: ActiveValue::set(params.github_login.clone()),
262 github_user_id: ActiveValue::set(Some(params.github_user_id)),
263 admin: ActiveValue::set(admin),
264 metrics_id: ActiveValue::set(Uuid::new_v4()),
265 ..Default::default()
266 })
267 .on_conflict(
268 OnConflict::column(user::Column::GithubLogin)
269 .update_column(user::Column::GithubLogin)
270 .to_owned(),
271 )
272 .exec_with_returning(&*tx)
273 .await?;
274
275 Ok(NewUserResult {
276 user_id: user.id,
277 metrics_id: user.metrics_id.to_string(),
278 signup_device_id: None,
279 inviting_user_id: None,
280 })
281 })
282 .await
283 }
284
285 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
286 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
287 .await
288 }
289
290 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
291 self.transaction(|tx| async {
292 let tx = tx;
293 Ok(user::Entity::find()
294 .filter(user::Column::Id.is_in(ids.iter().copied()))
295 .all(&*tx)
296 .await?)
297 })
298 .await
299 }
300
301 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
302 self.transaction(|tx| async move {
303 Ok(user::Entity::find()
304 .filter(user::Column::GithubLogin.eq(github_login))
305 .one(&*tx)
306 .await?)
307 })
308 .await
309 }
310
311 pub async fn get_or_create_user_by_github_account(
312 &self,
313 github_login: &str,
314 github_user_id: Option<i32>,
315 github_email: Option<&str>,
316 ) -> Result<Option<User>> {
317 self.transaction(|tx| async move {
318 let tx = &*tx;
319 if let Some(github_user_id) = github_user_id {
320 if let Some(user_by_github_user_id) = user::Entity::find()
321 .filter(user::Column::GithubUserId.eq(github_user_id))
322 .one(tx)
323 .await?
324 {
325 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
326 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
327 Ok(Some(user_by_github_user_id.update(tx).await?))
328 } else if let Some(user_by_github_login) = user::Entity::find()
329 .filter(user::Column::GithubLogin.eq(github_login))
330 .one(tx)
331 .await?
332 {
333 let mut user_by_github_login = user_by_github_login.into_active_model();
334 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
335 Ok(Some(user_by_github_login.update(tx).await?))
336 } else {
337 let user = user::Entity::insert(user::ActiveModel {
338 email_address: ActiveValue::set(github_email.map(|email| email.into())),
339 github_login: ActiveValue::set(github_login.into()),
340 github_user_id: ActiveValue::set(Some(github_user_id)),
341 admin: ActiveValue::set(false),
342 invite_count: ActiveValue::set(0),
343 invite_code: ActiveValue::set(None),
344 metrics_id: ActiveValue::set(Uuid::new_v4()),
345 ..Default::default()
346 })
347 .exec_with_returning(&*tx)
348 .await?;
349 Ok(Some(user))
350 }
351 } else {
352 Ok(user::Entity::find()
353 .filter(user::Column::GithubLogin.eq(github_login))
354 .one(tx)
355 .await?)
356 }
357 })
358 .await
359 }
360
361 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
362 self.transaction(|tx| async move {
363 Ok(user::Entity::find()
364 .order_by_asc(user::Column::GithubLogin)
365 .limit(limit as u64)
366 .offset(page as u64 * limit as u64)
367 .all(&*tx)
368 .await?)
369 })
370 .await
371 }
372
373 pub async fn get_users_with_no_invites(
374 &self,
375 invited_by_another_user: bool,
376 ) -> Result<Vec<User>> {
377 self.transaction(|tx| async move {
378 Ok(user::Entity::find()
379 .filter(
380 user::Column::InviteCount
381 .eq(0)
382 .and(if invited_by_another_user {
383 user::Column::InviterId.is_not_null()
384 } else {
385 user::Column::InviterId.is_null()
386 }),
387 )
388 .all(&*tx)
389 .await?)
390 })
391 .await
392 }
393
394 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
395 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
396 enum QueryAs {
397 MetricsId,
398 }
399
400 self.transaction(|tx| async move {
401 let metrics_id: Uuid = user::Entity::find_by_id(id)
402 .select_only()
403 .column(user::Column::MetricsId)
404 .into_values::<_, QueryAs>()
405 .one(&*tx)
406 .await?
407 .ok_or_else(|| anyhow!("could not find user"))?;
408 Ok(metrics_id.to_string())
409 })
410 .await
411 }
412
413 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
414 self.transaction(|tx| async move {
415 user::Entity::update_many()
416 .filter(user::Column::Id.eq(id))
417 .set(user::ActiveModel {
418 admin: ActiveValue::set(is_admin),
419 ..Default::default()
420 })
421 .exec(&*tx)
422 .await?;
423 Ok(())
424 })
425 .await
426 }
427
428 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
429 self.transaction(|tx| async move {
430 user::Entity::update_many()
431 .filter(user::Column::Id.eq(id))
432 .set(user::ActiveModel {
433 connected_once: ActiveValue::set(connected_once),
434 ..Default::default()
435 })
436 .exec(&*tx)
437 .await?;
438 Ok(())
439 })
440 .await
441 }
442
443 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
444 self.transaction(|tx| async move {
445 access_token::Entity::delete_many()
446 .filter(access_token::Column::UserId.eq(id))
447 .exec(&*tx)
448 .await?;
449 user::Entity::delete_by_id(id).exec(&*tx).await?;
450 Ok(())
451 })
452 .await
453 }
454
455 // contacts
456
457 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
458 #[derive(Debug, FromQueryResult)]
459 struct ContactWithUserBusyStatuses {
460 user_id_a: UserId,
461 user_id_b: UserId,
462 a_to_b: bool,
463 accepted: bool,
464 should_notify: bool,
465 user_a_busy: bool,
466 user_b_busy: bool,
467 }
468
469 self.transaction(|tx| async move {
470 let user_a_participant = Alias::new("user_a_participant");
471 let user_b_participant = Alias::new("user_b_participant");
472 let mut db_contacts = contact::Entity::find()
473 .column_as(
474 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
475 .is_not_null(),
476 "user_a_busy",
477 )
478 .column_as(
479 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
480 .is_not_null(),
481 "user_b_busy",
482 )
483 .filter(
484 contact::Column::UserIdA
485 .eq(user_id)
486 .or(contact::Column::UserIdB.eq(user_id)),
487 )
488 .join_as(
489 JoinType::LeftJoin,
490 contact::Relation::UserARoomParticipant.def(),
491 user_a_participant,
492 )
493 .join_as(
494 JoinType::LeftJoin,
495 contact::Relation::UserBRoomParticipant.def(),
496 user_b_participant,
497 )
498 .into_model::<ContactWithUserBusyStatuses>()
499 .stream(&*tx)
500 .await?;
501
502 let mut contacts = Vec::new();
503 while let Some(db_contact) = db_contacts.next().await {
504 let db_contact = db_contact?;
505 if db_contact.user_id_a == user_id {
506 if db_contact.accepted {
507 contacts.push(Contact::Accepted {
508 user_id: db_contact.user_id_b,
509 should_notify: db_contact.should_notify && db_contact.a_to_b,
510 busy: db_contact.user_b_busy,
511 });
512 } else if db_contact.a_to_b {
513 contacts.push(Contact::Outgoing {
514 user_id: db_contact.user_id_b,
515 })
516 } else {
517 contacts.push(Contact::Incoming {
518 user_id: db_contact.user_id_b,
519 should_notify: db_contact.should_notify,
520 });
521 }
522 } else if db_contact.accepted {
523 contacts.push(Contact::Accepted {
524 user_id: db_contact.user_id_a,
525 should_notify: db_contact.should_notify && !db_contact.a_to_b,
526 busy: db_contact.user_a_busy,
527 });
528 } else if db_contact.a_to_b {
529 contacts.push(Contact::Incoming {
530 user_id: db_contact.user_id_a,
531 should_notify: db_contact.should_notify,
532 });
533 } else {
534 contacts.push(Contact::Outgoing {
535 user_id: db_contact.user_id_a,
536 });
537 }
538 }
539
540 contacts.sort_unstable_by_key(|contact| contact.user_id());
541
542 Ok(contacts)
543 })
544 .await
545 }
546
547 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
548 self.transaction(|tx| async move {
549 let participant = room_participant::Entity::find()
550 .filter(room_participant::Column::UserId.eq(user_id))
551 .one(&*tx)
552 .await?;
553 Ok(participant.is_some())
554 })
555 .await
556 }
557
558 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
559 self.transaction(|tx| async move {
560 let (id_a, id_b) = if user_id_1 < user_id_2 {
561 (user_id_1, user_id_2)
562 } else {
563 (user_id_2, user_id_1)
564 };
565
566 Ok(contact::Entity::find()
567 .filter(
568 contact::Column::UserIdA
569 .eq(id_a)
570 .and(contact::Column::UserIdB.eq(id_b))
571 .and(contact::Column::Accepted.eq(true)),
572 )
573 .one(&*tx)
574 .await?
575 .is_some())
576 })
577 .await
578 }
579
580 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
581 self.transaction(|tx| async move {
582 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
583 (sender_id, receiver_id, true)
584 } else {
585 (receiver_id, sender_id, false)
586 };
587
588 let rows_affected = contact::Entity::insert(contact::ActiveModel {
589 user_id_a: ActiveValue::set(id_a),
590 user_id_b: ActiveValue::set(id_b),
591 a_to_b: ActiveValue::set(a_to_b),
592 accepted: ActiveValue::set(false),
593 should_notify: ActiveValue::set(true),
594 ..Default::default()
595 })
596 .on_conflict(
597 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
598 .values([
599 (contact::Column::Accepted, true.into()),
600 (contact::Column::ShouldNotify, false.into()),
601 ])
602 .action_and_where(
603 contact::Column::Accepted.eq(false).and(
604 contact::Column::AToB
605 .eq(a_to_b)
606 .and(contact::Column::UserIdA.eq(id_b))
607 .or(contact::Column::AToB
608 .ne(a_to_b)
609 .and(contact::Column::UserIdA.eq(id_a))),
610 ),
611 )
612 .to_owned(),
613 )
614 .exec_without_returning(&*tx)
615 .await?;
616
617 if rows_affected == 1 {
618 Ok(())
619 } else {
620 Err(anyhow!("contact already requested"))?
621 }
622 })
623 .await
624 }
625
626 /// Returns a bool indicating whether the removed contact had originally accepted or not
627 ///
628 /// Deletes the contact identified by the requester and responder ids, and then returns
629 /// whether the deleted contact had originally accepted or was a pending contact request.
630 ///
631 /// # Arguments
632 ///
633 /// * `requester_id` - The user that initiates this request
634 /// * `responder_id` - The user that will be removed
635 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
636 self.transaction(|tx| async move {
637 let (id_a, id_b) = if responder_id < requester_id {
638 (responder_id, requester_id)
639 } else {
640 (requester_id, responder_id)
641 };
642
643 let contact = contact::Entity::find()
644 .filter(
645 contact::Column::UserIdA
646 .eq(id_a)
647 .and(contact::Column::UserIdB.eq(id_b)),
648 )
649 .one(&*tx)
650 .await?
651 .ok_or_else(|| anyhow!("no such contact"))?;
652
653 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
654 Ok(contact.accepted)
655 })
656 .await
657 }
658
659 pub async fn dismiss_contact_notification(
660 &self,
661 user_id: UserId,
662 contact_user_id: UserId,
663 ) -> Result<()> {
664 self.transaction(|tx| async move {
665 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
666 (user_id, contact_user_id, true)
667 } else {
668 (contact_user_id, user_id, false)
669 };
670
671 let result = contact::Entity::update_many()
672 .set(contact::ActiveModel {
673 should_notify: ActiveValue::set(false),
674 ..Default::default()
675 })
676 .filter(
677 contact::Column::UserIdA
678 .eq(id_a)
679 .and(contact::Column::UserIdB.eq(id_b))
680 .and(
681 contact::Column::AToB
682 .eq(a_to_b)
683 .and(contact::Column::Accepted.eq(true))
684 .or(contact::Column::AToB
685 .ne(a_to_b)
686 .and(contact::Column::Accepted.eq(false))),
687 ),
688 )
689 .exec(&*tx)
690 .await?;
691 if result.rows_affected == 0 {
692 Err(anyhow!("no such contact request"))?
693 } else {
694 Ok(())
695 }
696 })
697 .await
698 }
699
700 pub async fn respond_to_contact_request(
701 &self,
702 responder_id: UserId,
703 requester_id: UserId,
704 accept: bool,
705 ) -> Result<()> {
706 self.transaction(|tx| async move {
707 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
708 (responder_id, requester_id, false)
709 } else {
710 (requester_id, responder_id, true)
711 };
712 let rows_affected = if accept {
713 let result = contact::Entity::update_many()
714 .set(contact::ActiveModel {
715 accepted: ActiveValue::set(true),
716 should_notify: ActiveValue::set(true),
717 ..Default::default()
718 })
719 .filter(
720 contact::Column::UserIdA
721 .eq(id_a)
722 .and(contact::Column::UserIdB.eq(id_b))
723 .and(contact::Column::AToB.eq(a_to_b)),
724 )
725 .exec(&*tx)
726 .await?;
727 result.rows_affected
728 } else {
729 let result = contact::Entity::delete_many()
730 .filter(
731 contact::Column::UserIdA
732 .eq(id_a)
733 .and(contact::Column::UserIdB.eq(id_b))
734 .and(contact::Column::AToB.eq(a_to_b))
735 .and(contact::Column::Accepted.eq(false)),
736 )
737 .exec(&*tx)
738 .await?;
739
740 result.rows_affected
741 };
742
743 if rows_affected == 1 {
744 Ok(())
745 } else {
746 Err(anyhow!("no such contact request"))?
747 }
748 })
749 .await
750 }
751
752 pub fn fuzzy_like_string(string: &str) -> String {
753 let mut result = String::with_capacity(string.len() * 2 + 1);
754 for c in string.chars() {
755 if c.is_alphanumeric() {
756 result.push('%');
757 result.push(c);
758 }
759 }
760 result.push('%');
761 result
762 }
763
764 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
765 self.transaction(|tx| async {
766 let tx = tx;
767 let like_string = Self::fuzzy_like_string(name_query);
768 let query = "
769 SELECT users.*
770 FROM users
771 WHERE github_login ILIKE $1
772 ORDER BY github_login <-> $2
773 LIMIT $3
774 ";
775
776 Ok(user::Entity::find()
777 .from_raw_sql(Statement::from_sql_and_values(
778 self.pool.get_database_backend(),
779 query.into(),
780 vec![like_string.into(), name_query.into(), limit.into()],
781 ))
782 .all(&*tx)
783 .await?)
784 })
785 .await
786 }
787
788 // signups
789
790 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
791 self.transaction(|tx| async move {
792 signup::Entity::insert(signup::ActiveModel {
793 email_address: ActiveValue::set(signup.email_address.clone()),
794 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
795 email_confirmation_sent: ActiveValue::set(false),
796 platform_mac: ActiveValue::set(signup.platform_mac),
797 platform_windows: ActiveValue::set(signup.platform_windows),
798 platform_linux: ActiveValue::set(signup.platform_linux),
799 platform_unknown: ActiveValue::set(false),
800 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
801 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
802 device_id: ActiveValue::set(signup.device_id.clone()),
803 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
804 ..Default::default()
805 })
806 .on_conflict(
807 OnConflict::column(signup::Column::EmailAddress)
808 .update_columns([
809 signup::Column::PlatformMac,
810 signup::Column::PlatformWindows,
811 signup::Column::PlatformLinux,
812 signup::Column::EditorFeatures,
813 signup::Column::ProgrammingLanguages,
814 signup::Column::DeviceId,
815 signup::Column::AddedToMailingList,
816 ])
817 .to_owned(),
818 )
819 .exec(&*tx)
820 .await?;
821 Ok(())
822 })
823 .await
824 }
825
826 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
827 self.transaction(|tx| async move {
828 let signup = signup::Entity::find()
829 .filter(signup::Column::EmailAddress.eq(email_address))
830 .one(&*tx)
831 .await?
832 .ok_or_else(|| {
833 anyhow!("signup with email address {} doesn't exist", email_address)
834 })?;
835
836 Ok(signup)
837 })
838 .await
839 }
840
841 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
842 self.transaction(|tx| async move {
843 let query = "
844 SELECT
845 COUNT(*) as count,
846 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
847 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
848 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
849 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
850 FROM (
851 SELECT *
852 FROM signups
853 WHERE
854 NOT email_confirmation_sent
855 ) AS unsent
856 ";
857 Ok(
858 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
859 self.pool.get_database_backend(),
860 query.into(),
861 vec![],
862 ))
863 .one(&*tx)
864 .await?
865 .ok_or_else(|| anyhow!("invalid result"))?,
866 )
867 })
868 .await
869 }
870
871 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
872 let emails = invites
873 .iter()
874 .map(|s| s.email_address.as_str())
875 .collect::<Vec<_>>();
876 self.transaction(|tx| async {
877 let tx = tx;
878 signup::Entity::update_many()
879 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
880 .set(signup::ActiveModel {
881 email_confirmation_sent: ActiveValue::set(true),
882 ..Default::default()
883 })
884 .exec(&*tx)
885 .await?;
886 Ok(())
887 })
888 .await
889 }
890
891 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
892 self.transaction(|tx| async move {
893 Ok(signup::Entity::find()
894 .select_only()
895 .column(signup::Column::EmailAddress)
896 .column(signup::Column::EmailConfirmationCode)
897 .filter(
898 signup::Column::EmailConfirmationSent.eq(false).and(
899 signup::Column::PlatformMac
900 .eq(true)
901 .or(signup::Column::PlatformUnknown.eq(true)),
902 ),
903 )
904 .order_by_asc(signup::Column::CreatedAt)
905 .limit(count as u64)
906 .into_model()
907 .all(&*tx)
908 .await?)
909 })
910 .await
911 }
912
913 // invite codes
914
915 pub async fn create_invite_from_code(
916 &self,
917 code: &str,
918 email_address: &str,
919 device_id: Option<&str>,
920 added_to_mailing_list: bool,
921 ) -> Result<Invite> {
922 self.transaction(|tx| async move {
923 let existing_user = user::Entity::find()
924 .filter(user::Column::EmailAddress.eq(email_address))
925 .one(&*tx)
926 .await?;
927
928 if existing_user.is_some() {
929 Err(anyhow!("email address is already in use"))?;
930 }
931
932 let inviting_user_with_invites = match user::Entity::find()
933 .filter(
934 user::Column::InviteCode
935 .eq(code)
936 .and(user::Column::InviteCount.gt(0)),
937 )
938 .one(&*tx)
939 .await?
940 {
941 Some(inviting_user) => inviting_user,
942 None => {
943 return Err(Error::Http(
944 StatusCode::UNAUTHORIZED,
945 "unable to find an invite code with invites remaining".to_string(),
946 ))?
947 }
948 };
949 user::Entity::update_many()
950 .filter(
951 user::Column::Id
952 .eq(inviting_user_with_invites.id)
953 .and(user::Column::InviteCount.gt(0)),
954 )
955 .col_expr(
956 user::Column::InviteCount,
957 Expr::col(user::Column::InviteCount).sub(1),
958 )
959 .exec(&*tx)
960 .await?;
961
962 let signup = signup::Entity::insert(signup::ActiveModel {
963 email_address: ActiveValue::set(email_address.into()),
964 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
965 email_confirmation_sent: ActiveValue::set(false),
966 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
967 platform_linux: ActiveValue::set(false),
968 platform_mac: ActiveValue::set(false),
969 platform_windows: ActiveValue::set(false),
970 platform_unknown: ActiveValue::set(true),
971 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
972 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
973 ..Default::default()
974 })
975 .on_conflict(
976 OnConflict::column(signup::Column::EmailAddress)
977 .update_column(signup::Column::InvitingUserId)
978 .to_owned(),
979 )
980 .exec_with_returning(&*tx)
981 .await?;
982
983 Ok(Invite {
984 email_address: signup.email_address,
985 email_confirmation_code: signup.email_confirmation_code,
986 })
987 })
988 .await
989 }
990
991 pub async fn create_user_from_invite(
992 &self,
993 invite: &Invite,
994 user: NewUserParams,
995 ) -> Result<Option<NewUserResult>> {
996 self.transaction(|tx| async {
997 let tx = tx;
998 let signup = signup::Entity::find()
999 .filter(
1000 signup::Column::EmailAddress
1001 .eq(invite.email_address.as_str())
1002 .and(
1003 signup::Column::EmailConfirmationCode
1004 .eq(invite.email_confirmation_code.as_str()),
1005 ),
1006 )
1007 .one(&*tx)
1008 .await?
1009 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1010
1011 if signup.user_id.is_some() {
1012 return Ok(None);
1013 }
1014
1015 let user = user::Entity::insert(user::ActiveModel {
1016 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1017 github_login: ActiveValue::set(user.github_login.clone()),
1018 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1019 admin: ActiveValue::set(false),
1020 invite_count: ActiveValue::set(user.invite_count),
1021 invite_code: ActiveValue::set(Some(random_invite_code())),
1022 metrics_id: ActiveValue::set(Uuid::new_v4()),
1023 ..Default::default()
1024 })
1025 .on_conflict(
1026 OnConflict::column(user::Column::GithubLogin)
1027 .update_columns([
1028 user::Column::EmailAddress,
1029 user::Column::GithubUserId,
1030 user::Column::Admin,
1031 ])
1032 .to_owned(),
1033 )
1034 .exec_with_returning(&*tx)
1035 .await?;
1036
1037 let mut signup = signup.into_active_model();
1038 signup.user_id = ActiveValue::set(Some(user.id));
1039 let signup = signup.update(&*tx).await?;
1040
1041 if let Some(inviting_user_id) = signup.inviting_user_id {
1042 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1043 (inviting_user_id, user.id, true)
1044 } else {
1045 (user.id, inviting_user_id, false)
1046 };
1047
1048 contact::Entity::insert(contact::ActiveModel {
1049 user_id_a: ActiveValue::set(user_id_a),
1050 user_id_b: ActiveValue::set(user_id_b),
1051 a_to_b: ActiveValue::set(a_to_b),
1052 should_notify: ActiveValue::set(true),
1053 accepted: ActiveValue::set(true),
1054 ..Default::default()
1055 })
1056 .on_conflict(OnConflict::new().do_nothing().to_owned())
1057 .exec_without_returning(&*tx)
1058 .await?;
1059 }
1060
1061 Ok(Some(NewUserResult {
1062 user_id: user.id,
1063 metrics_id: user.metrics_id.to_string(),
1064 inviting_user_id: signup.inviting_user_id,
1065 signup_device_id: signup.device_id,
1066 }))
1067 })
1068 .await
1069 }
1070
1071 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1072 self.transaction(|tx| async move {
1073 if count > 0 {
1074 user::Entity::update_many()
1075 .filter(
1076 user::Column::Id
1077 .eq(id)
1078 .and(user::Column::InviteCode.is_null()),
1079 )
1080 .set(user::ActiveModel {
1081 invite_code: ActiveValue::set(Some(random_invite_code())),
1082 ..Default::default()
1083 })
1084 .exec(&*tx)
1085 .await?;
1086 }
1087
1088 user::Entity::update_many()
1089 .filter(user::Column::Id.eq(id))
1090 .set(user::ActiveModel {
1091 invite_count: ActiveValue::set(count),
1092 ..Default::default()
1093 })
1094 .exec(&*tx)
1095 .await?;
1096 Ok(())
1097 })
1098 .await
1099 }
1100
1101 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1102 self.transaction(|tx| async move {
1103 match user::Entity::find_by_id(id).one(&*tx).await? {
1104 Some(user) if user.invite_code.is_some() => {
1105 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1106 }
1107 _ => Ok(None),
1108 }
1109 })
1110 .await
1111 }
1112
1113 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1114 self.transaction(|tx| async move {
1115 user::Entity::find()
1116 .filter(user::Column::InviteCode.eq(code))
1117 .one(&*tx)
1118 .await?
1119 .ok_or_else(|| {
1120 Error::Http(
1121 StatusCode::NOT_FOUND,
1122 "that invite code does not exist".to_string(),
1123 )
1124 })
1125 })
1126 .await
1127 }
1128
1129 // rooms
1130
1131 pub async fn incoming_call_for_user(
1132 &self,
1133 user_id: UserId,
1134 ) -> Result<Option<proto::IncomingCall>> {
1135 self.transaction(|tx| async move {
1136 let pending_participant = room_participant::Entity::find()
1137 .filter(
1138 room_participant::Column::UserId
1139 .eq(user_id)
1140 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1141 )
1142 .one(&*tx)
1143 .await?;
1144
1145 if let Some(pending_participant) = pending_participant {
1146 let room = self.get_room(pending_participant.room_id, &tx).await?;
1147 Ok(Self::build_incoming_call(&room, user_id))
1148 } else {
1149 Ok(None)
1150 }
1151 })
1152 .await
1153 }
1154
1155 pub async fn create_room(
1156 &self,
1157 user_id: UserId,
1158 connection: ConnectionId,
1159 live_kit_room: &str,
1160 ) -> Result<proto::Room> {
1161 self.transaction(|tx| async move {
1162 let room = room::ActiveModel {
1163 live_kit_room: ActiveValue::set(live_kit_room.into()),
1164 ..Default::default()
1165 }
1166 .insert(&*tx)
1167 .await?;
1168 room_participant::ActiveModel {
1169 room_id: ActiveValue::set(room.id),
1170 user_id: ActiveValue::set(user_id),
1171 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1172 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1173 connection.owner_id as i32,
1174 ))),
1175 answering_connection_lost: ActiveValue::set(false),
1176 calling_user_id: ActiveValue::set(user_id),
1177 calling_connection_id: ActiveValue::set(connection.id as i32),
1178 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1179 connection.owner_id as i32,
1180 ))),
1181 ..Default::default()
1182 }
1183 .insert(&*tx)
1184 .await?;
1185
1186 let room = self.get_room(room.id, &tx).await?;
1187 Ok(room)
1188 })
1189 .await
1190 }
1191
1192 pub async fn call(
1193 &self,
1194 room_id: RoomId,
1195 calling_user_id: UserId,
1196 calling_connection: ConnectionId,
1197 called_user_id: UserId,
1198 initial_project_id: Option<ProjectId>,
1199 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1200 self.room_transaction(room_id, |tx| async move {
1201 room_participant::ActiveModel {
1202 room_id: ActiveValue::set(room_id),
1203 user_id: ActiveValue::set(called_user_id),
1204 answering_connection_lost: ActiveValue::set(false),
1205 calling_user_id: ActiveValue::set(calling_user_id),
1206 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1207 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1208 calling_connection.owner_id as i32,
1209 ))),
1210 initial_project_id: ActiveValue::set(initial_project_id),
1211 ..Default::default()
1212 }
1213 .insert(&*tx)
1214 .await?;
1215
1216 let room = self.get_room(room_id, &tx).await?;
1217 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1218 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1219 Ok((room, incoming_call))
1220 })
1221 .await
1222 }
1223
1224 pub async fn call_failed(
1225 &self,
1226 room_id: RoomId,
1227 called_user_id: UserId,
1228 ) -> Result<RoomGuard<proto::Room>> {
1229 self.room_transaction(room_id, |tx| async move {
1230 room_participant::Entity::delete_many()
1231 .filter(
1232 room_participant::Column::RoomId
1233 .eq(room_id)
1234 .and(room_participant::Column::UserId.eq(called_user_id)),
1235 )
1236 .exec(&*tx)
1237 .await?;
1238 let room = self.get_room(room_id, &tx).await?;
1239 Ok(room)
1240 })
1241 .await
1242 }
1243
1244 pub async fn decline_call(
1245 &self,
1246 expected_room_id: Option<RoomId>,
1247 user_id: UserId,
1248 ) -> Result<Option<RoomGuard<proto::Room>>> {
1249 self.optional_room_transaction(|tx| async move {
1250 let mut filter = Condition::all()
1251 .add(room_participant::Column::UserId.eq(user_id))
1252 .add(room_participant::Column::AnsweringConnectionId.is_null());
1253 if let Some(room_id) = expected_room_id {
1254 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1255 }
1256 let participant = room_participant::Entity::find()
1257 .filter(filter)
1258 .one(&*tx)
1259 .await?;
1260
1261 let participant = if let Some(participant) = participant {
1262 participant
1263 } else if expected_room_id.is_some() {
1264 return Err(anyhow!("could not find call to decline"))?;
1265 } else {
1266 return Ok(None);
1267 };
1268
1269 let room_id = participant.room_id;
1270 room_participant::Entity::delete(participant.into_active_model())
1271 .exec(&*tx)
1272 .await?;
1273
1274 let room = self.get_room(room_id, &tx).await?;
1275 Ok(Some((room_id, room)))
1276 })
1277 .await
1278 }
1279
1280 pub async fn cancel_call(
1281 &self,
1282 room_id: RoomId,
1283 calling_connection: ConnectionId,
1284 called_user_id: UserId,
1285 ) -> Result<RoomGuard<proto::Room>> {
1286 self.room_transaction(room_id, |tx| async move {
1287 let participant = room_participant::Entity::find()
1288 .filter(
1289 Condition::all()
1290 .add(room_participant::Column::UserId.eq(called_user_id))
1291 .add(room_participant::Column::RoomId.eq(room_id))
1292 .add(
1293 room_participant::Column::CallingConnectionId
1294 .eq(calling_connection.id as i32),
1295 )
1296 .add(
1297 room_participant::Column::CallingConnectionServerId
1298 .eq(calling_connection.owner_id as i32),
1299 )
1300 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1301 )
1302 .one(&*tx)
1303 .await?
1304 .ok_or_else(|| anyhow!("no call to cancel"))?;
1305
1306 room_participant::Entity::delete(participant.into_active_model())
1307 .exec(&*tx)
1308 .await?;
1309
1310 let room = self.get_room(room_id, &tx).await?;
1311 Ok(room)
1312 })
1313 .await
1314 }
1315
1316 pub async fn join_room(
1317 &self,
1318 room_id: RoomId,
1319 user_id: UserId,
1320 connection: ConnectionId,
1321 ) -> Result<RoomGuard<proto::Room>> {
1322 self.room_transaction(room_id, |tx| async move {
1323 let result = room_participant::Entity::update_many()
1324 .filter(
1325 Condition::all()
1326 .add(room_participant::Column::RoomId.eq(room_id))
1327 .add(room_participant::Column::UserId.eq(user_id))
1328 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1329 )
1330 .set(room_participant::ActiveModel {
1331 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1332 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1333 connection.owner_id as i32,
1334 ))),
1335 answering_connection_lost: ActiveValue::set(false),
1336 ..Default::default()
1337 })
1338 .exec(&*tx)
1339 .await?;
1340 if result.rows_affected == 0 {
1341 Err(anyhow!("room does not exist or was already joined"))?
1342 } else {
1343 let room = self.get_room(room_id, &tx).await?;
1344 Ok(room)
1345 }
1346 })
1347 .await
1348 }
1349
1350 pub async fn rejoin_room(
1351 &self,
1352 rejoin_room: proto::RejoinRoom,
1353 user_id: UserId,
1354 connection: ConnectionId,
1355 ) -> Result<RoomGuard<RejoinedRoom>> {
1356 let room_id = RoomId::from_proto(rejoin_room.id);
1357 self.room_transaction(room_id, |tx| async {
1358 let tx = tx;
1359 let participant_update = room_participant::Entity::update_many()
1360 .filter(
1361 Condition::all()
1362 .add(room_participant::Column::RoomId.eq(room_id))
1363 .add(room_participant::Column::UserId.eq(user_id))
1364 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1365 .add(
1366 Condition::any()
1367 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1368 .add(
1369 room_participant::Column::AnsweringConnectionServerId
1370 .ne(connection.owner_id as i32),
1371 ),
1372 ),
1373 )
1374 .set(room_participant::ActiveModel {
1375 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1376 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1377 connection.owner_id as i32,
1378 ))),
1379 answering_connection_lost: ActiveValue::set(false),
1380 ..Default::default()
1381 })
1382 .exec(&*tx)
1383 .await?;
1384 if participant_update.rows_affected == 0 {
1385 return Err(anyhow!("room does not exist or was already joined"))?;
1386 }
1387
1388 let mut reshared_projects = Vec::new();
1389 for reshared_project in &rejoin_room.reshared_projects {
1390 let project_id = ProjectId::from_proto(reshared_project.project_id);
1391 let project = project::Entity::find_by_id(project_id)
1392 .one(&*tx)
1393 .await?
1394 .ok_or_else(|| anyhow!("project does not exist"))?;
1395 if project.host_user_id != user_id {
1396 return Err(anyhow!("no such project"))?;
1397 }
1398
1399 let mut collaborators = project
1400 .find_related(project_collaborator::Entity)
1401 .all(&*tx)
1402 .await?;
1403 let host_ix = collaborators
1404 .iter()
1405 .position(|collaborator| {
1406 collaborator.user_id == user_id && collaborator.is_host
1407 })
1408 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1409 let host = collaborators.swap_remove(host_ix);
1410 let old_connection_id = host.connection();
1411
1412 project::Entity::update(project::ActiveModel {
1413 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1414 host_connection_server_id: ActiveValue::set(Some(ServerId(
1415 connection.owner_id as i32,
1416 ))),
1417 ..project.into_active_model()
1418 })
1419 .exec(&*tx)
1420 .await?;
1421 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1422 connection_id: ActiveValue::set(connection.id as i32),
1423 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1424 ..host.into_active_model()
1425 })
1426 .exec(&*tx)
1427 .await?;
1428
1429 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1430 .await?;
1431
1432 reshared_projects.push(ResharedProject {
1433 id: project_id,
1434 old_connection_id,
1435 collaborators: collaborators
1436 .iter()
1437 .map(|collaborator| ProjectCollaborator {
1438 connection_id: collaborator.connection(),
1439 user_id: collaborator.user_id,
1440 replica_id: collaborator.replica_id,
1441 is_host: collaborator.is_host,
1442 })
1443 .collect(),
1444 worktrees: reshared_project.worktrees.clone(),
1445 });
1446 }
1447
1448 project::Entity::delete_many()
1449 .filter(
1450 Condition::all()
1451 .add(project::Column::RoomId.eq(room_id))
1452 .add(project::Column::HostUserId.eq(user_id))
1453 .add(
1454 project::Column::Id
1455 .is_not_in(reshared_projects.iter().map(|project| project.id)),
1456 ),
1457 )
1458 .exec(&*tx)
1459 .await?;
1460
1461 let mut rejoined_projects = Vec::new();
1462 for rejoined_project in &rejoin_room.rejoined_projects {
1463 let project_id = ProjectId::from_proto(rejoined_project.id);
1464 let Some(project) = project::Entity::find_by_id(project_id)
1465 .one(&*tx)
1466 .await? else { continue };
1467
1468 let mut worktrees = Vec::new();
1469 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1470 for db_worktree in db_worktrees {
1471 let mut worktree = RejoinedWorktree {
1472 id: db_worktree.id as u64,
1473 abs_path: db_worktree.abs_path,
1474 root_name: db_worktree.root_name,
1475 visible: db_worktree.visible,
1476 updated_entries: Default::default(),
1477 removed_entries: Default::default(),
1478 diagnostic_summaries: Default::default(),
1479 scan_id: db_worktree.scan_id as u64,
1480 completed_scan_id: db_worktree.completed_scan_id as u64,
1481 };
1482
1483 let rejoined_worktree = rejoined_project
1484 .worktrees
1485 .iter()
1486 .find(|worktree| worktree.id == db_worktree.id as u64);
1487 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1488 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1489 } else {
1490 worktree_entry::Column::IsDeleted.eq(false)
1491 };
1492
1493 let mut db_entries = worktree_entry::Entity::find()
1494 .filter(
1495 Condition::all()
1496 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1497 .add(entry_filter),
1498 )
1499 .stream(&*tx)
1500 .await?;
1501
1502 while let Some(db_entry) = db_entries.next().await {
1503 let db_entry = db_entry?;
1504 if db_entry.is_deleted {
1505 worktree.removed_entries.push(db_entry.id as u64);
1506 } else {
1507 worktree.updated_entries.push(proto::Entry {
1508 id: db_entry.id as u64,
1509 is_dir: db_entry.is_dir,
1510 path: db_entry.path,
1511 inode: db_entry.inode as u64,
1512 mtime: Some(proto::Timestamp {
1513 seconds: db_entry.mtime_seconds as u64,
1514 nanos: db_entry.mtime_nanos as u32,
1515 }),
1516 is_symlink: db_entry.is_symlink,
1517 is_ignored: db_entry.is_ignored,
1518 });
1519 }
1520 }
1521
1522 worktrees.push(worktree);
1523 }
1524
1525 let language_servers = project
1526 .find_related(language_server::Entity)
1527 .all(&*tx)
1528 .await?
1529 .into_iter()
1530 .map(|language_server| proto::LanguageServer {
1531 id: language_server.id as u64,
1532 name: language_server.name,
1533 })
1534 .collect::<Vec<_>>();
1535
1536 let mut collaborators = project
1537 .find_related(project_collaborator::Entity)
1538 .all(&*tx)
1539 .await?;
1540 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1541 .iter()
1542 .position(|collaborator| collaborator.user_id == user_id)
1543 {
1544 collaborators.swap_remove(self_collaborator_ix)
1545 } else {
1546 continue;
1547 };
1548 let old_connection_id = self_collaborator.connection();
1549 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1550 connection_id: ActiveValue::set(connection.id as i32),
1551 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1552 ..self_collaborator.into_active_model()
1553 })
1554 .exec(&*tx)
1555 .await?;
1556
1557 let collaborators = collaborators
1558 .into_iter()
1559 .map(|collaborator| ProjectCollaborator {
1560 connection_id: collaborator.connection(),
1561 user_id: collaborator.user_id,
1562 replica_id: collaborator.replica_id,
1563 is_host: collaborator.is_host,
1564 })
1565 .collect::<Vec<_>>();
1566
1567 rejoined_projects.push(RejoinedProject {
1568 id: project_id,
1569 old_connection_id,
1570 collaborators,
1571 worktrees,
1572 language_servers,
1573 });
1574 }
1575
1576 let room = self.get_room(room_id, &tx).await?;
1577 Ok(RejoinedRoom {
1578 room,
1579 rejoined_projects,
1580 reshared_projects,
1581 })
1582 })
1583 .await
1584 }
1585
1586 pub async fn leave_room(
1587 &self,
1588 connection: ConnectionId,
1589 ) -> Result<Option<RoomGuard<LeftRoom>>> {
1590 self.optional_room_transaction(|tx| async move {
1591 let leaving_participant = room_participant::Entity::find()
1592 .filter(
1593 Condition::all()
1594 .add(
1595 room_participant::Column::AnsweringConnectionId
1596 .eq(connection.id as i32),
1597 )
1598 .add(
1599 room_participant::Column::AnsweringConnectionServerId
1600 .eq(connection.owner_id as i32),
1601 ),
1602 )
1603 .one(&*tx)
1604 .await?;
1605
1606 if let Some(leaving_participant) = leaving_participant {
1607 // Leave room.
1608 let room_id = leaving_participant.room_id;
1609 room_participant::Entity::delete_by_id(leaving_participant.id)
1610 .exec(&*tx)
1611 .await?;
1612
1613 // Cancel pending calls initiated by the leaving user.
1614 let called_participants = room_participant::Entity::find()
1615 .filter(
1616 Condition::all()
1617 .add(
1618 room_participant::Column::CallingUserId
1619 .eq(leaving_participant.user_id),
1620 )
1621 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1622 )
1623 .all(&*tx)
1624 .await?;
1625 room_participant::Entity::delete_many()
1626 .filter(
1627 room_participant::Column::Id
1628 .is_in(called_participants.iter().map(|participant| participant.id)),
1629 )
1630 .exec(&*tx)
1631 .await?;
1632 let canceled_calls_to_user_ids = called_participants
1633 .into_iter()
1634 .map(|participant| participant.user_id)
1635 .collect();
1636
1637 // Detect left projects.
1638 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1639 enum QueryProjectIds {
1640 ProjectId,
1641 }
1642 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1643 .select_only()
1644 .column_as(
1645 project_collaborator::Column::ProjectId,
1646 QueryProjectIds::ProjectId,
1647 )
1648 .filter(
1649 Condition::all()
1650 .add(
1651 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1652 )
1653 .add(
1654 project_collaborator::Column::ConnectionServerId
1655 .eq(connection.owner_id as i32),
1656 ),
1657 )
1658 .into_values::<_, QueryProjectIds>()
1659 .all(&*tx)
1660 .await?;
1661 let mut left_projects = HashMap::default();
1662 let mut collaborators = project_collaborator::Entity::find()
1663 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1664 .stream(&*tx)
1665 .await?;
1666 while let Some(collaborator) = collaborators.next().await {
1667 let collaborator = collaborator?;
1668 let left_project =
1669 left_projects
1670 .entry(collaborator.project_id)
1671 .or_insert(LeftProject {
1672 id: collaborator.project_id,
1673 host_user_id: Default::default(),
1674 connection_ids: Default::default(),
1675 host_connection_id: Default::default(),
1676 });
1677
1678 let collaborator_connection_id = collaborator.connection();
1679 if collaborator_connection_id != connection {
1680 left_project.connection_ids.push(collaborator_connection_id);
1681 }
1682
1683 if collaborator.is_host {
1684 left_project.host_user_id = collaborator.user_id;
1685 left_project.host_connection_id = collaborator_connection_id;
1686 }
1687 }
1688 drop(collaborators);
1689
1690 // Leave projects.
1691 project_collaborator::Entity::delete_many()
1692 .filter(
1693 Condition::all()
1694 .add(
1695 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1696 )
1697 .add(
1698 project_collaborator::Column::ConnectionServerId
1699 .eq(connection.owner_id as i32),
1700 ),
1701 )
1702 .exec(&*tx)
1703 .await?;
1704
1705 // Unshare projects.
1706 project::Entity::delete_many()
1707 .filter(
1708 Condition::all()
1709 .add(project::Column::RoomId.eq(room_id))
1710 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1711 .add(
1712 project::Column::HostConnectionServerId
1713 .eq(connection.owner_id as i32),
1714 ),
1715 )
1716 .exec(&*tx)
1717 .await?;
1718
1719 let room = self.get_room(room_id, &tx).await?;
1720 if room.participants.is_empty() {
1721 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1722 }
1723
1724 let left_room = LeftRoom {
1725 room,
1726 left_projects,
1727 canceled_calls_to_user_ids,
1728 };
1729
1730 if left_room.room.participants.is_empty() {
1731 self.rooms.remove(&room_id);
1732 }
1733
1734 Ok(Some((room_id, left_room)))
1735 } else {
1736 Ok(None)
1737 }
1738 })
1739 .await
1740 }
1741
1742 pub async fn follow(
1743 &self,
1744 project_id: ProjectId,
1745 leader_connection: ConnectionId,
1746 follower_connection: ConnectionId,
1747 ) -> Result<RoomGuard<proto::Room>> {
1748 let room_id = self.room_id_for_project(project_id).await?;
1749 self.room_transaction(room_id, |tx| async move {
1750 follower::ActiveModel {
1751 room_id: ActiveValue::set(room_id),
1752 project_id: ActiveValue::set(project_id),
1753 leader_connection_server_id: ActiveValue::set(ServerId(
1754 leader_connection.owner_id as i32,
1755 )),
1756 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1757 follower_connection_server_id: ActiveValue::set(ServerId(
1758 follower_connection.owner_id as i32,
1759 )),
1760 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1761 ..Default::default()
1762 }
1763 .insert(&*tx)
1764 .await?;
1765
1766 let room = self.get_room(room_id, &*tx).await?;
1767 Ok(room)
1768 })
1769 .await
1770 }
1771
1772 pub async fn unfollow(
1773 &self,
1774 project_id: ProjectId,
1775 leader_connection: ConnectionId,
1776 follower_connection: ConnectionId,
1777 ) -> Result<RoomGuard<proto::Room>> {
1778 let room_id = self.room_id_for_project(project_id).await?;
1779 self.room_transaction(room_id, |tx| async move {
1780 follower::Entity::delete_many()
1781 .filter(
1782 Condition::all()
1783 .add(follower::Column::ProjectId.eq(project_id))
1784 .add(
1785 follower::Column::LeaderConnectionServerId
1786 .eq(leader_connection.owner_id),
1787 )
1788 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1789 .add(
1790 follower::Column::FollowerConnectionServerId
1791 .eq(follower_connection.owner_id),
1792 )
1793 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1794 )
1795 .exec(&*tx)
1796 .await?;
1797
1798 let room = self.get_room(room_id, &*tx).await?;
1799 Ok(room)
1800 })
1801 .await
1802 }
1803
1804 pub async fn update_room_participant_location(
1805 &self,
1806 room_id: RoomId,
1807 connection: ConnectionId,
1808 location: proto::ParticipantLocation,
1809 ) -> Result<RoomGuard<proto::Room>> {
1810 self.room_transaction(room_id, |tx| async {
1811 let tx = tx;
1812 let location_kind;
1813 let location_project_id;
1814 match location
1815 .variant
1816 .as_ref()
1817 .ok_or_else(|| anyhow!("invalid location"))?
1818 {
1819 proto::participant_location::Variant::SharedProject(project) => {
1820 location_kind = 0;
1821 location_project_id = Some(ProjectId::from_proto(project.id));
1822 }
1823 proto::participant_location::Variant::UnsharedProject(_) => {
1824 location_kind = 1;
1825 location_project_id = None;
1826 }
1827 proto::participant_location::Variant::External(_) => {
1828 location_kind = 2;
1829 location_project_id = None;
1830 }
1831 }
1832
1833 let result = room_participant::Entity::update_many()
1834 .filter(
1835 Condition::all()
1836 .add(room_participant::Column::RoomId.eq(room_id))
1837 .add(
1838 room_participant::Column::AnsweringConnectionId
1839 .eq(connection.id as i32),
1840 )
1841 .add(
1842 room_participant::Column::AnsweringConnectionServerId
1843 .eq(connection.owner_id as i32),
1844 ),
1845 )
1846 .set(room_participant::ActiveModel {
1847 location_kind: ActiveValue::set(Some(location_kind)),
1848 location_project_id: ActiveValue::set(location_project_id),
1849 ..Default::default()
1850 })
1851 .exec(&*tx)
1852 .await?;
1853
1854 if result.rows_affected == 1 {
1855 let room = self.get_room(room_id, &tx).await?;
1856 Ok(room)
1857 } else {
1858 Err(anyhow!("could not update room participant location"))?
1859 }
1860 })
1861 .await
1862 }
1863
1864 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1865 self.transaction(|tx| async move {
1866 let participant = room_participant::Entity::find()
1867 .filter(
1868 Condition::all()
1869 .add(
1870 room_participant::Column::AnsweringConnectionId
1871 .eq(connection.id as i32),
1872 )
1873 .add(
1874 room_participant::Column::AnsweringConnectionServerId
1875 .eq(connection.owner_id as i32),
1876 ),
1877 )
1878 .one(&*tx)
1879 .await?
1880 .ok_or_else(|| anyhow!("not a participant in any room"))?;
1881
1882 room_participant::Entity::update(room_participant::ActiveModel {
1883 answering_connection_lost: ActiveValue::set(true),
1884 ..participant.into_active_model()
1885 })
1886 .exec(&*tx)
1887 .await?;
1888
1889 Ok(())
1890 })
1891 .await
1892 }
1893
1894 fn build_incoming_call(
1895 room: &proto::Room,
1896 called_user_id: UserId,
1897 ) -> Option<proto::IncomingCall> {
1898 let pending_participant = room
1899 .pending_participants
1900 .iter()
1901 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1902
1903 Some(proto::IncomingCall {
1904 room_id: room.id,
1905 calling_user_id: pending_participant.calling_user_id,
1906 participant_user_ids: room
1907 .participants
1908 .iter()
1909 .map(|participant| participant.user_id)
1910 .collect(),
1911 initial_project: room.participants.iter().find_map(|participant| {
1912 let initial_project_id = pending_participant.initial_project_id?;
1913 participant
1914 .projects
1915 .iter()
1916 .find(|project| project.id == initial_project_id)
1917 .cloned()
1918 }),
1919 })
1920 }
1921
1922 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1923 let db_room = room::Entity::find_by_id(room_id)
1924 .one(tx)
1925 .await?
1926 .ok_or_else(|| anyhow!("could not find room"))?;
1927
1928 let mut db_participants = db_room
1929 .find_related(room_participant::Entity)
1930 .stream(tx)
1931 .await?;
1932 let mut participants = HashMap::default();
1933 let mut pending_participants = Vec::new();
1934 while let Some(db_participant) = db_participants.next().await {
1935 let db_participant = db_participant?;
1936 if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1937 .answering_connection_id
1938 .zip(db_participant.answering_connection_server_id)
1939 {
1940 let location = match (
1941 db_participant.location_kind,
1942 db_participant.location_project_id,
1943 ) {
1944 (Some(0), Some(project_id)) => {
1945 Some(proto::participant_location::Variant::SharedProject(
1946 proto::participant_location::SharedProject {
1947 id: project_id.to_proto(),
1948 },
1949 ))
1950 }
1951 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1952 Default::default(),
1953 )),
1954 _ => Some(proto::participant_location::Variant::External(
1955 Default::default(),
1956 )),
1957 };
1958
1959 let answering_connection = ConnectionId {
1960 owner_id: answering_connection_server_id.0 as u32,
1961 id: answering_connection_id as u32,
1962 };
1963 participants.insert(
1964 answering_connection,
1965 proto::Participant {
1966 user_id: db_participant.user_id.to_proto(),
1967 peer_id: Some(answering_connection.into()),
1968 projects: Default::default(),
1969 location: Some(proto::ParticipantLocation { variant: location }),
1970 },
1971 );
1972 } else {
1973 pending_participants.push(proto::PendingParticipant {
1974 user_id: db_participant.user_id.to_proto(),
1975 calling_user_id: db_participant.calling_user_id.to_proto(),
1976 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1977 });
1978 }
1979 }
1980 drop(db_participants);
1981
1982 let mut db_projects = db_room
1983 .find_related(project::Entity)
1984 .find_with_related(worktree::Entity)
1985 .stream(tx)
1986 .await?;
1987
1988 while let Some(row) = db_projects.next().await {
1989 let (db_project, db_worktree) = row?;
1990 let host_connection = db_project.host_connection()?;
1991 if let Some(participant) = participants.get_mut(&host_connection) {
1992 let project = if let Some(project) = participant
1993 .projects
1994 .iter_mut()
1995 .find(|project| project.id == db_project.id.to_proto())
1996 {
1997 project
1998 } else {
1999 participant.projects.push(proto::ParticipantProject {
2000 id: db_project.id.to_proto(),
2001 worktree_root_names: Default::default(),
2002 });
2003 participant.projects.last_mut().unwrap()
2004 };
2005
2006 if let Some(db_worktree) = db_worktree {
2007 if db_worktree.visible {
2008 project.worktree_root_names.push(db_worktree.root_name);
2009 }
2010 }
2011 }
2012 }
2013 drop(db_projects);
2014
2015 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2016 let mut followers = Vec::new();
2017 while let Some(db_follower) = db_followers.next().await {
2018 let db_follower = db_follower?;
2019 followers.push(proto::Follower {
2020 leader_id: Some(db_follower.leader_connection().into()),
2021 follower_id: Some(db_follower.follower_connection().into()),
2022 project_id: db_follower.project_id.to_proto(),
2023 });
2024 }
2025
2026 Ok(proto::Room {
2027 id: db_room.id.to_proto(),
2028 live_kit_room: db_room.live_kit_room,
2029 participants: participants.into_values().collect(),
2030 pending_participants,
2031 followers,
2032 })
2033 }
2034
2035 // projects
2036
2037 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2038 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2039 enum QueryAs {
2040 Count,
2041 }
2042
2043 self.transaction(|tx| async move {
2044 Ok(project::Entity::find()
2045 .select_only()
2046 .column_as(project::Column::Id.count(), QueryAs::Count)
2047 .inner_join(user::Entity)
2048 .filter(user::Column::Admin.eq(false))
2049 .into_values::<_, QueryAs>()
2050 .one(&*tx)
2051 .await?
2052 .unwrap_or(0i64) as usize)
2053 })
2054 .await
2055 }
2056
2057 pub async fn share_project(
2058 &self,
2059 room_id: RoomId,
2060 connection: ConnectionId,
2061 worktrees: &[proto::WorktreeMetadata],
2062 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2063 self.room_transaction(room_id, |tx| async move {
2064 let participant = room_participant::Entity::find()
2065 .filter(
2066 Condition::all()
2067 .add(
2068 room_participant::Column::AnsweringConnectionId
2069 .eq(connection.id as i32),
2070 )
2071 .add(
2072 room_participant::Column::AnsweringConnectionServerId
2073 .eq(connection.owner_id as i32),
2074 ),
2075 )
2076 .one(&*tx)
2077 .await?
2078 .ok_or_else(|| anyhow!("could not find participant"))?;
2079 if participant.room_id != room_id {
2080 return Err(anyhow!("shared project on unexpected room"))?;
2081 }
2082
2083 let project = project::ActiveModel {
2084 room_id: ActiveValue::set(participant.room_id),
2085 host_user_id: ActiveValue::set(participant.user_id),
2086 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2087 host_connection_server_id: ActiveValue::set(Some(ServerId(
2088 connection.owner_id as i32,
2089 ))),
2090 ..Default::default()
2091 }
2092 .insert(&*tx)
2093 .await?;
2094
2095 if !worktrees.is_empty() {
2096 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2097 worktree::ActiveModel {
2098 id: ActiveValue::set(worktree.id as i64),
2099 project_id: ActiveValue::set(project.id),
2100 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2101 root_name: ActiveValue::set(worktree.root_name.clone()),
2102 visible: ActiveValue::set(worktree.visible),
2103 scan_id: ActiveValue::set(0),
2104 completed_scan_id: ActiveValue::set(0),
2105 }
2106 }))
2107 .exec(&*tx)
2108 .await?;
2109 }
2110
2111 project_collaborator::ActiveModel {
2112 project_id: ActiveValue::set(project.id),
2113 connection_id: ActiveValue::set(connection.id as i32),
2114 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2115 user_id: ActiveValue::set(participant.user_id),
2116 replica_id: ActiveValue::set(ReplicaId(0)),
2117 is_host: ActiveValue::set(true),
2118 ..Default::default()
2119 }
2120 .insert(&*tx)
2121 .await?;
2122
2123 let room = self.get_room(room_id, &tx).await?;
2124 Ok((project.id, room))
2125 })
2126 .await
2127 }
2128
2129 pub async fn unshare_project(
2130 &self,
2131 project_id: ProjectId,
2132 connection: ConnectionId,
2133 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2134 let room_id = self.room_id_for_project(project_id).await?;
2135 self.room_transaction(room_id, |tx| async move {
2136 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2137
2138 let project = project::Entity::find_by_id(project_id)
2139 .one(&*tx)
2140 .await?
2141 .ok_or_else(|| anyhow!("project not found"))?;
2142 if project.host_connection()? == connection {
2143 project::Entity::delete(project.into_active_model())
2144 .exec(&*tx)
2145 .await?;
2146 let room = self.get_room(room_id, &tx).await?;
2147 Ok((room, guest_connection_ids))
2148 } else {
2149 Err(anyhow!("cannot unshare a project hosted by another user"))?
2150 }
2151 })
2152 .await
2153 }
2154
2155 pub async fn update_project(
2156 &self,
2157 project_id: ProjectId,
2158 connection: ConnectionId,
2159 worktrees: &[proto::WorktreeMetadata],
2160 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2161 let room_id = self.room_id_for_project(project_id).await?;
2162 self.room_transaction(room_id, |tx| async move {
2163 let project = project::Entity::find_by_id(project_id)
2164 .filter(
2165 Condition::all()
2166 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2167 .add(
2168 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2169 ),
2170 )
2171 .one(&*tx)
2172 .await?
2173 .ok_or_else(|| anyhow!("no such project"))?;
2174
2175 self.update_project_worktrees(project.id, worktrees, &tx)
2176 .await?;
2177
2178 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2179 let room = self.get_room(project.room_id, &tx).await?;
2180 Ok((room, guest_connection_ids))
2181 })
2182 .await
2183 }
2184
2185 async fn update_project_worktrees(
2186 &self,
2187 project_id: ProjectId,
2188 worktrees: &[proto::WorktreeMetadata],
2189 tx: &DatabaseTransaction,
2190 ) -> Result<()> {
2191 if !worktrees.is_empty() {
2192 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2193 id: ActiveValue::set(worktree.id as i64),
2194 project_id: ActiveValue::set(project_id),
2195 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2196 root_name: ActiveValue::set(worktree.root_name.clone()),
2197 visible: ActiveValue::set(worktree.visible),
2198 scan_id: ActiveValue::set(0),
2199 completed_scan_id: ActiveValue::set(0),
2200 }))
2201 .on_conflict(
2202 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2203 .update_column(worktree::Column::RootName)
2204 .to_owned(),
2205 )
2206 .exec(&*tx)
2207 .await?;
2208 }
2209
2210 worktree::Entity::delete_many()
2211 .filter(worktree::Column::ProjectId.eq(project_id).and(
2212 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2213 ))
2214 .exec(&*tx)
2215 .await?;
2216
2217 Ok(())
2218 }
2219
2220 pub async fn update_worktree(
2221 &self,
2222 update: &proto::UpdateWorktree,
2223 connection: ConnectionId,
2224 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2225 let project_id = ProjectId::from_proto(update.project_id);
2226 let worktree_id = update.worktree_id as i64;
2227 let room_id = self.room_id_for_project(project_id).await?;
2228 self.room_transaction(room_id, |tx| async move {
2229 // Ensure the update comes from the host.
2230 let _project = project::Entity::find_by_id(project_id)
2231 .filter(
2232 Condition::all()
2233 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2234 .add(
2235 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2236 ),
2237 )
2238 .one(&*tx)
2239 .await?
2240 .ok_or_else(|| anyhow!("no such project"))?;
2241
2242 // Update metadata.
2243 worktree::Entity::update(worktree::ActiveModel {
2244 id: ActiveValue::set(worktree_id),
2245 project_id: ActiveValue::set(project_id),
2246 root_name: ActiveValue::set(update.root_name.clone()),
2247 scan_id: ActiveValue::set(update.scan_id as i64),
2248 completed_scan_id: if update.is_last_update {
2249 ActiveValue::set(update.scan_id as i64)
2250 } else {
2251 ActiveValue::default()
2252 },
2253 abs_path: ActiveValue::set(update.abs_path.clone()),
2254 ..Default::default()
2255 })
2256 .exec(&*tx)
2257 .await?;
2258
2259 if !update.updated_entries.is_empty() {
2260 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2261 let mtime = entry.mtime.clone().unwrap_or_default();
2262 worktree_entry::ActiveModel {
2263 project_id: ActiveValue::set(project_id),
2264 worktree_id: ActiveValue::set(worktree_id),
2265 id: ActiveValue::set(entry.id as i64),
2266 is_dir: ActiveValue::set(entry.is_dir),
2267 path: ActiveValue::set(entry.path.clone()),
2268 inode: ActiveValue::set(entry.inode as i64),
2269 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2270 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2271 is_symlink: ActiveValue::set(entry.is_symlink),
2272 is_ignored: ActiveValue::set(entry.is_ignored),
2273 is_deleted: ActiveValue::set(false),
2274 scan_id: ActiveValue::set(update.scan_id as i64),
2275 }
2276 }))
2277 .on_conflict(
2278 OnConflict::columns([
2279 worktree_entry::Column::ProjectId,
2280 worktree_entry::Column::WorktreeId,
2281 worktree_entry::Column::Id,
2282 ])
2283 .update_columns([
2284 worktree_entry::Column::IsDir,
2285 worktree_entry::Column::Path,
2286 worktree_entry::Column::Inode,
2287 worktree_entry::Column::MtimeSeconds,
2288 worktree_entry::Column::MtimeNanos,
2289 worktree_entry::Column::IsSymlink,
2290 worktree_entry::Column::IsIgnored,
2291 worktree_entry::Column::ScanId,
2292 ])
2293 .to_owned(),
2294 )
2295 .exec(&*tx)
2296 .await?;
2297 }
2298
2299 if !update.removed_entries.is_empty() {
2300 worktree_entry::Entity::update_many()
2301 .filter(
2302 worktree_entry::Column::ProjectId
2303 .eq(project_id)
2304 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2305 .and(
2306 worktree_entry::Column::Id
2307 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2308 ),
2309 )
2310 .set(worktree_entry::ActiveModel {
2311 is_deleted: ActiveValue::Set(true),
2312 scan_id: ActiveValue::Set(update.scan_id as i64),
2313 ..Default::default()
2314 })
2315 .exec(&*tx)
2316 .await?;
2317 }
2318
2319 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2320 Ok(connection_ids)
2321 })
2322 .await
2323 }
2324
2325 pub async fn update_diagnostic_summary(
2326 &self,
2327 update: &proto::UpdateDiagnosticSummary,
2328 connection: ConnectionId,
2329 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2330 let project_id = ProjectId::from_proto(update.project_id);
2331 let worktree_id = update.worktree_id as i64;
2332 let room_id = self.room_id_for_project(project_id).await?;
2333 self.room_transaction(room_id, |tx| async move {
2334 let summary = update
2335 .summary
2336 .as_ref()
2337 .ok_or_else(|| anyhow!("invalid summary"))?;
2338
2339 // Ensure the update comes from the host.
2340 let project = project::Entity::find_by_id(project_id)
2341 .one(&*tx)
2342 .await?
2343 .ok_or_else(|| anyhow!("no such project"))?;
2344 if project.host_connection()? != connection {
2345 return Err(anyhow!("can't update a project hosted by someone else"))?;
2346 }
2347
2348 // Update summary.
2349 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2350 project_id: ActiveValue::set(project_id),
2351 worktree_id: ActiveValue::set(worktree_id),
2352 path: ActiveValue::set(summary.path.clone()),
2353 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2354 error_count: ActiveValue::set(summary.error_count as i32),
2355 warning_count: ActiveValue::set(summary.warning_count as i32),
2356 ..Default::default()
2357 })
2358 .on_conflict(
2359 OnConflict::columns([
2360 worktree_diagnostic_summary::Column::ProjectId,
2361 worktree_diagnostic_summary::Column::WorktreeId,
2362 worktree_diagnostic_summary::Column::Path,
2363 ])
2364 .update_columns([
2365 worktree_diagnostic_summary::Column::LanguageServerId,
2366 worktree_diagnostic_summary::Column::ErrorCount,
2367 worktree_diagnostic_summary::Column::WarningCount,
2368 ])
2369 .to_owned(),
2370 )
2371 .exec(&*tx)
2372 .await?;
2373
2374 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2375 Ok(connection_ids)
2376 })
2377 .await
2378 }
2379
2380 pub async fn start_language_server(
2381 &self,
2382 update: &proto::StartLanguageServer,
2383 connection: ConnectionId,
2384 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2385 let project_id = ProjectId::from_proto(update.project_id);
2386 let room_id = self.room_id_for_project(project_id).await?;
2387 self.room_transaction(room_id, |tx| async move {
2388 let server = update
2389 .server
2390 .as_ref()
2391 .ok_or_else(|| anyhow!("invalid language server"))?;
2392
2393 // Ensure the update comes from the host.
2394 let project = project::Entity::find_by_id(project_id)
2395 .one(&*tx)
2396 .await?
2397 .ok_or_else(|| anyhow!("no such project"))?;
2398 if project.host_connection()? != connection {
2399 return Err(anyhow!("can't update a project hosted by someone else"))?;
2400 }
2401
2402 // Add the newly-started language server.
2403 language_server::Entity::insert(language_server::ActiveModel {
2404 project_id: ActiveValue::set(project_id),
2405 id: ActiveValue::set(server.id as i64),
2406 name: ActiveValue::set(server.name.clone()),
2407 ..Default::default()
2408 })
2409 .on_conflict(
2410 OnConflict::columns([
2411 language_server::Column::ProjectId,
2412 language_server::Column::Id,
2413 ])
2414 .update_column(language_server::Column::Name)
2415 .to_owned(),
2416 )
2417 .exec(&*tx)
2418 .await?;
2419
2420 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2421 Ok(connection_ids)
2422 })
2423 .await
2424 }
2425
2426 pub async fn join_project(
2427 &self,
2428 project_id: ProjectId,
2429 connection: ConnectionId,
2430 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2431 let room_id = self.room_id_for_project(project_id).await?;
2432 self.room_transaction(room_id, |tx| async move {
2433 let participant = room_participant::Entity::find()
2434 .filter(
2435 Condition::all()
2436 .add(
2437 room_participant::Column::AnsweringConnectionId
2438 .eq(connection.id as i32),
2439 )
2440 .add(
2441 room_participant::Column::AnsweringConnectionServerId
2442 .eq(connection.owner_id as i32),
2443 ),
2444 )
2445 .one(&*tx)
2446 .await?
2447 .ok_or_else(|| anyhow!("must join a room first"))?;
2448
2449 let project = project::Entity::find_by_id(project_id)
2450 .one(&*tx)
2451 .await?
2452 .ok_or_else(|| anyhow!("no such project"))?;
2453 if project.room_id != participant.room_id {
2454 return Err(anyhow!("no such project"))?;
2455 }
2456
2457 let mut collaborators = project
2458 .find_related(project_collaborator::Entity)
2459 .all(&*tx)
2460 .await?;
2461 let replica_ids = collaborators
2462 .iter()
2463 .map(|c| c.replica_id)
2464 .collect::<HashSet<_>>();
2465 let mut replica_id = ReplicaId(1);
2466 while replica_ids.contains(&replica_id) {
2467 replica_id.0 += 1;
2468 }
2469 let new_collaborator = project_collaborator::ActiveModel {
2470 project_id: ActiveValue::set(project_id),
2471 connection_id: ActiveValue::set(connection.id as i32),
2472 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2473 user_id: ActiveValue::set(participant.user_id),
2474 replica_id: ActiveValue::set(replica_id),
2475 is_host: ActiveValue::set(false),
2476 ..Default::default()
2477 }
2478 .insert(&*tx)
2479 .await?;
2480 collaborators.push(new_collaborator);
2481
2482 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2483 let mut worktrees = db_worktrees
2484 .into_iter()
2485 .map(|db_worktree| {
2486 (
2487 db_worktree.id as u64,
2488 Worktree {
2489 id: db_worktree.id as u64,
2490 abs_path: db_worktree.abs_path,
2491 root_name: db_worktree.root_name,
2492 visible: db_worktree.visible,
2493 entries: Default::default(),
2494 diagnostic_summaries: Default::default(),
2495 scan_id: db_worktree.scan_id as u64,
2496 completed_scan_id: db_worktree.completed_scan_id as u64,
2497 },
2498 )
2499 })
2500 .collect::<BTreeMap<_, _>>();
2501
2502 // Populate worktree entries.
2503 {
2504 let mut db_entries = worktree_entry::Entity::find()
2505 .filter(
2506 Condition::all()
2507 .add(worktree_entry::Column::ProjectId.eq(project_id))
2508 .add(worktree_entry::Column::IsDeleted.eq(false)),
2509 )
2510 .stream(&*tx)
2511 .await?;
2512 while let Some(db_entry) = db_entries.next().await {
2513 let db_entry = db_entry?;
2514 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2515 worktree.entries.push(proto::Entry {
2516 id: db_entry.id as u64,
2517 is_dir: db_entry.is_dir,
2518 path: db_entry.path,
2519 inode: db_entry.inode as u64,
2520 mtime: Some(proto::Timestamp {
2521 seconds: db_entry.mtime_seconds as u64,
2522 nanos: db_entry.mtime_nanos as u32,
2523 }),
2524 is_symlink: db_entry.is_symlink,
2525 is_ignored: db_entry.is_ignored,
2526 });
2527 }
2528 }
2529 }
2530
2531 // Populate worktree diagnostic summaries.
2532 {
2533 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2534 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2535 .stream(&*tx)
2536 .await?;
2537 while let Some(db_summary) = db_summaries.next().await {
2538 let db_summary = db_summary?;
2539 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2540 worktree
2541 .diagnostic_summaries
2542 .push(proto::DiagnosticSummary {
2543 path: db_summary.path,
2544 language_server_id: db_summary.language_server_id as u64,
2545 error_count: db_summary.error_count as u32,
2546 warning_count: db_summary.warning_count as u32,
2547 });
2548 }
2549 }
2550 }
2551
2552 // Populate language servers.
2553 let language_servers = project
2554 .find_related(language_server::Entity)
2555 .all(&*tx)
2556 .await?;
2557
2558 let project = Project {
2559 collaborators: collaborators
2560 .into_iter()
2561 .map(|collaborator| ProjectCollaborator {
2562 connection_id: collaborator.connection(),
2563 user_id: collaborator.user_id,
2564 replica_id: collaborator.replica_id,
2565 is_host: collaborator.is_host,
2566 })
2567 .collect(),
2568 worktrees,
2569 language_servers: language_servers
2570 .into_iter()
2571 .map(|language_server| proto::LanguageServer {
2572 id: language_server.id as u64,
2573 name: language_server.name,
2574 })
2575 .collect(),
2576 };
2577 Ok((project, replica_id as ReplicaId))
2578 })
2579 .await
2580 }
2581
2582 pub async fn leave_project(
2583 &self,
2584 project_id: ProjectId,
2585 connection: ConnectionId,
2586 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2587 let room_id = self.room_id_for_project(project_id).await?;
2588 self.room_transaction(room_id, |tx| async move {
2589 let result = project_collaborator::Entity::delete_many()
2590 .filter(
2591 Condition::all()
2592 .add(project_collaborator::Column::ProjectId.eq(project_id))
2593 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2594 .add(
2595 project_collaborator::Column::ConnectionServerId
2596 .eq(connection.owner_id as i32),
2597 ),
2598 )
2599 .exec(&*tx)
2600 .await?;
2601 if result.rows_affected == 0 {
2602 Err(anyhow!("not a collaborator on this project"))?;
2603 }
2604
2605 let project = project::Entity::find_by_id(project_id)
2606 .one(&*tx)
2607 .await?
2608 .ok_or_else(|| anyhow!("no such project"))?;
2609 let collaborators = project
2610 .find_related(project_collaborator::Entity)
2611 .all(&*tx)
2612 .await?;
2613 let connection_ids = collaborators
2614 .into_iter()
2615 .map(|collaborator| collaborator.connection())
2616 .collect();
2617
2618 follower::Entity::delete_many()
2619 .filter(
2620 Condition::any()
2621 .add(
2622 Condition::all()
2623 .add(follower::Column::ProjectId.eq(project_id))
2624 .add(
2625 follower::Column::LeaderConnectionServerId
2626 .eq(connection.owner_id),
2627 )
2628 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2629 )
2630 .add(
2631 Condition::all()
2632 .add(follower::Column::ProjectId.eq(project_id))
2633 .add(
2634 follower::Column::FollowerConnectionServerId
2635 .eq(connection.owner_id),
2636 )
2637 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2638 ),
2639 )
2640 .exec(&*tx)
2641 .await?;
2642
2643 let room = self.get_room(project.room_id, &tx).await?;
2644 let left_project = LeftProject {
2645 id: project_id,
2646 host_user_id: project.host_user_id,
2647 host_connection_id: project.host_connection()?,
2648 connection_ids,
2649 };
2650 Ok((room, left_project))
2651 })
2652 .await
2653 }
2654
2655 pub async fn project_collaborators(
2656 &self,
2657 project_id: ProjectId,
2658 connection_id: ConnectionId,
2659 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2660 let room_id = self.room_id_for_project(project_id).await?;
2661 self.room_transaction(room_id, |tx| async move {
2662 let collaborators = project_collaborator::Entity::find()
2663 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2664 .all(&*tx)
2665 .await?
2666 .into_iter()
2667 .map(|collaborator| ProjectCollaborator {
2668 connection_id: collaborator.connection(),
2669 user_id: collaborator.user_id,
2670 replica_id: collaborator.replica_id,
2671 is_host: collaborator.is_host,
2672 })
2673 .collect::<Vec<_>>();
2674
2675 if collaborators
2676 .iter()
2677 .any(|collaborator| collaborator.connection_id == connection_id)
2678 {
2679 Ok(collaborators)
2680 } else {
2681 Err(anyhow!("no such project"))?
2682 }
2683 })
2684 .await
2685 }
2686
2687 pub async fn project_connection_ids(
2688 &self,
2689 project_id: ProjectId,
2690 connection_id: ConnectionId,
2691 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2692 let room_id = self.room_id_for_project(project_id).await?;
2693 self.room_transaction(room_id, |tx| async move {
2694 let mut collaborators = project_collaborator::Entity::find()
2695 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2696 .stream(&*tx)
2697 .await?;
2698
2699 let mut connection_ids = HashSet::default();
2700 while let Some(collaborator) = collaborators.next().await {
2701 let collaborator = collaborator?;
2702 connection_ids.insert(collaborator.connection());
2703 }
2704
2705 if connection_ids.contains(&connection_id) {
2706 Ok(connection_ids)
2707 } else {
2708 Err(anyhow!("no such project"))?
2709 }
2710 })
2711 .await
2712 }
2713
2714 async fn project_guest_connection_ids(
2715 &self,
2716 project_id: ProjectId,
2717 tx: &DatabaseTransaction,
2718 ) -> Result<Vec<ConnectionId>> {
2719 let mut collaborators = project_collaborator::Entity::find()
2720 .filter(
2721 project_collaborator::Column::ProjectId
2722 .eq(project_id)
2723 .and(project_collaborator::Column::IsHost.eq(false)),
2724 )
2725 .stream(tx)
2726 .await?;
2727
2728 let mut guest_connection_ids = Vec::new();
2729 while let Some(collaborator) = collaborators.next().await {
2730 let collaborator = collaborator?;
2731 guest_connection_ids.push(collaborator.connection());
2732 }
2733 Ok(guest_connection_ids)
2734 }
2735
2736 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2737 self.transaction(|tx| async move {
2738 let project = project::Entity::find_by_id(project_id)
2739 .one(&*tx)
2740 .await?
2741 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2742 Ok(project.room_id)
2743 })
2744 .await
2745 }
2746
2747 // access tokens
2748
2749 pub async fn create_access_token(
2750 &self,
2751 user_id: UserId,
2752 access_token_hash: &str,
2753 max_access_token_count: usize,
2754 ) -> Result<AccessTokenId> {
2755 self.transaction(|tx| async {
2756 let tx = tx;
2757
2758 let token = access_token::ActiveModel {
2759 user_id: ActiveValue::set(user_id),
2760 hash: ActiveValue::set(access_token_hash.into()),
2761 ..Default::default()
2762 }
2763 .insert(&*tx)
2764 .await?;
2765
2766 access_token::Entity::delete_many()
2767 .filter(
2768 access_token::Column::Id.in_subquery(
2769 Query::select()
2770 .column(access_token::Column::Id)
2771 .from(access_token::Entity)
2772 .and_where(access_token::Column::UserId.eq(user_id))
2773 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2774 .limit(10000)
2775 .offset(max_access_token_count as u64)
2776 .to_owned(),
2777 ),
2778 )
2779 .exec(&*tx)
2780 .await?;
2781 Ok(token.id)
2782 })
2783 .await
2784 }
2785
2786 pub async fn get_access_token(
2787 &self,
2788 access_token_id: AccessTokenId,
2789 ) -> Result<access_token::Model> {
2790 self.transaction(|tx| async move {
2791 Ok(access_token::Entity::find_by_id(access_token_id)
2792 .one(&*tx)
2793 .await?
2794 .ok_or_else(|| anyhow!("no such access token"))?)
2795 })
2796 .await
2797 }
2798
2799 async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2800 where
2801 F: Send + Fn(TransactionHandle) -> Fut,
2802 Fut: Send + Future<Output = Result<T>>,
2803 {
2804 let body = async {
2805 let mut i = 0;
2806 loop {
2807 let (tx, result) = self.with_transaction(&f).await?;
2808 match result {
2809 Ok(result) => match tx.commit().await.map_err(Into::into) {
2810 Ok(()) => return Ok(result),
2811 Err(error) => {
2812 if !self.retry_on_serialization_error(&error, i).await {
2813 return Err(error);
2814 }
2815 }
2816 },
2817 Err(error) => {
2818 tx.rollback().await?;
2819 if !self.retry_on_serialization_error(&error, i).await {
2820 return Err(error);
2821 }
2822 }
2823 }
2824 i += 1;
2825 }
2826 };
2827
2828 self.run(body).await
2829 }
2830
2831 async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2832 where
2833 F: Send + Fn(TransactionHandle) -> Fut,
2834 Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2835 {
2836 let body = async {
2837 let mut i = 0;
2838 loop {
2839 let (tx, result) = self.with_transaction(&f).await?;
2840 match result {
2841 Ok(Some((room_id, data))) => {
2842 let lock = self.rooms.entry(room_id).or_default().clone();
2843 let _guard = lock.lock_owned().await;
2844 match tx.commit().await.map_err(Into::into) {
2845 Ok(()) => {
2846 return Ok(Some(RoomGuard {
2847 data,
2848 _guard,
2849 _not_send: PhantomData,
2850 }));
2851 }
2852 Err(error) => {
2853 if !self.retry_on_serialization_error(&error, i).await {
2854 return Err(error);
2855 }
2856 }
2857 }
2858 }
2859 Ok(None) => match tx.commit().await.map_err(Into::into) {
2860 Ok(()) => return Ok(None),
2861 Err(error) => {
2862 if !self.retry_on_serialization_error(&error, i).await {
2863 return Err(error);
2864 }
2865 }
2866 },
2867 Err(error) => {
2868 tx.rollback().await?;
2869 if !self.retry_on_serialization_error(&error, i).await {
2870 return Err(error);
2871 }
2872 }
2873 }
2874 i += 1;
2875 }
2876 };
2877
2878 self.run(body).await
2879 }
2880
2881 async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
2882 where
2883 F: Send + Fn(TransactionHandle) -> Fut,
2884 Fut: Send + Future<Output = Result<T>>,
2885 {
2886 let body = async {
2887 let mut i = 0;
2888 loop {
2889 let lock = self.rooms.entry(room_id).or_default().clone();
2890 let _guard = lock.lock_owned().await;
2891 let (tx, result) = self.with_transaction(&f).await?;
2892 match result {
2893 Ok(data) => match tx.commit().await.map_err(Into::into) {
2894 Ok(()) => {
2895 return Ok(RoomGuard {
2896 data,
2897 _guard,
2898 _not_send: PhantomData,
2899 });
2900 }
2901 Err(error) => {
2902 if !self.retry_on_serialization_error(&error, i).await {
2903 return Err(error);
2904 }
2905 }
2906 },
2907 Err(error) => {
2908 tx.rollback().await?;
2909 if !self.retry_on_serialization_error(&error, i).await {
2910 return Err(error);
2911 }
2912 }
2913 }
2914 i += 1;
2915 }
2916 };
2917
2918 self.run(body).await
2919 }
2920
2921 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2922 where
2923 F: Send + Fn(TransactionHandle) -> Fut,
2924 Fut: Send + Future<Output = Result<T>>,
2925 {
2926 let tx = self
2927 .pool
2928 .begin_with_config(Some(IsolationLevel::Serializable), None)
2929 .await?;
2930
2931 let mut tx = Arc::new(Some(tx));
2932 let result = f(TransactionHandle(tx.clone())).await;
2933 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2934 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2935 };
2936
2937 Ok((tx, result))
2938 }
2939
2940 async fn run<F, T>(&self, future: F) -> Result<T>
2941 where
2942 F: Future<Output = Result<T>>,
2943 {
2944 #[cfg(test)]
2945 {
2946 if let Executor::Deterministic(executor) = &self.executor {
2947 executor.simulate_random_delay().await;
2948 }
2949
2950 self.runtime.as_ref().unwrap().block_on(future)
2951 }
2952
2953 #[cfg(not(test))]
2954 {
2955 future.await
2956 }
2957 }
2958
2959 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
2960 // If the error is due to a failure to serialize concurrent transactions, then retry
2961 // this transaction after a delay. With each subsequent retry, double the delay duration.
2962 // Also vary the delay randomly in order to ensure different database connections retry
2963 // at different times.
2964 if is_serialization_error(error) {
2965 let base_delay = 4_u64 << prev_attempt_count.min(16);
2966 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
2967 log::info!(
2968 "retrying transaction after serialization error. delay: {} ms.",
2969 randomized_delay
2970 );
2971 self.executor
2972 .sleep(Duration::from_millis(randomized_delay as u64))
2973 .await;
2974 true
2975 } else {
2976 false
2977 }
2978 }
2979}
2980
2981fn is_serialization_error(error: &Error) -> bool {
2982 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2983 match error {
2984 Error::Database(
2985 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2986 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2987 ) if error
2988 .as_database_error()
2989 .and_then(|error| error.code())
2990 .as_deref()
2991 == Some(SERIALIZATION_FAILURE_CODE) =>
2992 {
2993 true
2994 }
2995 _ => false,
2996 }
2997}
2998
2999struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3000
3001impl Deref for TransactionHandle {
3002 type Target = DatabaseTransaction;
3003
3004 fn deref(&self) -> &Self::Target {
3005 self.0.as_ref().as_ref().unwrap()
3006 }
3007}
3008
3009pub struct RoomGuard<T> {
3010 data: T,
3011 _guard: OwnedMutexGuard<()>,
3012 _not_send: PhantomData<Rc<()>>,
3013}
3014
3015impl<T> Deref for RoomGuard<T> {
3016 type Target = T;
3017
3018 fn deref(&self) -> &T {
3019 &self.data
3020 }
3021}
3022
3023impl<T> DerefMut for RoomGuard<T> {
3024 fn deref_mut(&mut self) -> &mut T {
3025 &mut self.data
3026 }
3027}
3028
3029#[derive(Debug, Serialize, Deserialize)]
3030pub struct NewUserParams {
3031 pub github_login: String,
3032 pub github_user_id: i32,
3033 pub invite_count: i32,
3034}
3035
3036#[derive(Debug)]
3037pub struct NewUserResult {
3038 pub user_id: UserId,
3039 pub metrics_id: String,
3040 pub inviting_user_id: Option<UserId>,
3041 pub signup_device_id: Option<String>,
3042}
3043
3044fn random_invite_code() -> String {
3045 nanoid::nanoid!(16)
3046}
3047
3048fn random_email_confirmation_code() -> String {
3049 nanoid::nanoid!(64)
3050}
3051
3052macro_rules! id_type {
3053 ($name:ident) => {
3054 #[derive(
3055 Clone,
3056 Copy,
3057 Debug,
3058 Default,
3059 PartialEq,
3060 Eq,
3061 PartialOrd,
3062 Ord,
3063 Hash,
3064 Serialize,
3065 Deserialize,
3066 )]
3067 #[serde(transparent)]
3068 pub struct $name(pub i32);
3069
3070 impl $name {
3071 #[allow(unused)]
3072 pub const MAX: Self = Self(i32::MAX);
3073
3074 #[allow(unused)]
3075 pub fn from_proto(value: u64) -> Self {
3076 Self(value as i32)
3077 }
3078
3079 #[allow(unused)]
3080 pub fn to_proto(self) -> u64 {
3081 self.0 as u64
3082 }
3083 }
3084
3085 impl std::fmt::Display for $name {
3086 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3087 self.0.fmt(f)
3088 }
3089 }
3090
3091 impl From<$name> for sea_query::Value {
3092 fn from(value: $name) -> Self {
3093 sea_query::Value::Int(Some(value.0))
3094 }
3095 }
3096
3097 impl sea_orm::TryGetable for $name {
3098 fn try_get(
3099 res: &sea_orm::QueryResult,
3100 pre: &str,
3101 col: &str,
3102 ) -> Result<Self, sea_orm::TryGetError> {
3103 Ok(Self(i32::try_get(res, pre, col)?))
3104 }
3105 }
3106
3107 impl sea_query::ValueType for $name {
3108 fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3109 match v {
3110 Value::TinyInt(Some(int)) => {
3111 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3112 }
3113 Value::SmallInt(Some(int)) => {
3114 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3115 }
3116 Value::Int(Some(int)) => {
3117 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3118 }
3119 Value::BigInt(Some(int)) => {
3120 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3121 }
3122 Value::TinyUnsigned(Some(int)) => {
3123 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3124 }
3125 Value::SmallUnsigned(Some(int)) => {
3126 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3127 }
3128 Value::Unsigned(Some(int)) => {
3129 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3130 }
3131 Value::BigUnsigned(Some(int)) => {
3132 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3133 }
3134 _ => Err(sea_query::ValueTypeErr),
3135 }
3136 }
3137
3138 fn type_name() -> String {
3139 stringify!($name).into()
3140 }
3141
3142 fn array_type() -> sea_query::ArrayType {
3143 sea_query::ArrayType::Int
3144 }
3145
3146 fn column_type() -> sea_query::ColumnType {
3147 sea_query::ColumnType::Integer(None)
3148 }
3149 }
3150
3151 impl sea_orm::TryFromU64 for $name {
3152 fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3153 Ok(Self(n.try_into().map_err(|_| {
3154 DbErr::ConvertFromU64(concat!(
3155 "error converting ",
3156 stringify!($name),
3157 " to u64"
3158 ))
3159 })?))
3160 }
3161 }
3162
3163 impl sea_query::Nullable for $name {
3164 fn null() -> Value {
3165 Value::Int(None)
3166 }
3167 }
3168 };
3169}
3170
3171id_type!(AccessTokenId);
3172id_type!(ContactId);
3173id_type!(FollowerId);
3174id_type!(RoomId);
3175id_type!(RoomParticipantId);
3176id_type!(ProjectId);
3177id_type!(ProjectCollaboratorId);
3178id_type!(ReplicaId);
3179id_type!(ServerId);
3180id_type!(SignupId);
3181id_type!(UserId);
3182
3183pub struct RejoinedRoom {
3184 pub room: proto::Room,
3185 pub rejoined_projects: Vec<RejoinedProject>,
3186 pub reshared_projects: Vec<ResharedProject>,
3187}
3188
3189pub struct ResharedProject {
3190 pub id: ProjectId,
3191 pub old_connection_id: ConnectionId,
3192 pub collaborators: Vec<ProjectCollaborator>,
3193 pub worktrees: Vec<proto::WorktreeMetadata>,
3194}
3195
3196pub struct RejoinedProject {
3197 pub id: ProjectId,
3198 pub old_connection_id: ConnectionId,
3199 pub collaborators: Vec<ProjectCollaborator>,
3200 pub worktrees: Vec<RejoinedWorktree>,
3201 pub language_servers: Vec<proto::LanguageServer>,
3202}
3203
3204#[derive(Debug)]
3205pub struct RejoinedWorktree {
3206 pub id: u64,
3207 pub abs_path: String,
3208 pub root_name: String,
3209 pub visible: bool,
3210 pub updated_entries: Vec<proto::Entry>,
3211 pub removed_entries: Vec<u64>,
3212 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3213 pub scan_id: u64,
3214 pub completed_scan_id: u64,
3215}
3216
3217pub struct LeftRoom {
3218 pub room: proto::Room,
3219 pub left_projects: HashMap<ProjectId, LeftProject>,
3220 pub canceled_calls_to_user_ids: Vec<UserId>,
3221}
3222
3223pub struct RefreshedRoom {
3224 pub room: proto::Room,
3225 pub stale_participant_user_ids: Vec<UserId>,
3226 pub canceled_calls_to_user_ids: Vec<UserId>,
3227}
3228
3229pub struct Project {
3230 pub collaborators: Vec<ProjectCollaborator>,
3231 pub worktrees: BTreeMap<u64, Worktree>,
3232 pub language_servers: Vec<proto::LanguageServer>,
3233}
3234
3235pub struct ProjectCollaborator {
3236 pub connection_id: ConnectionId,
3237 pub user_id: UserId,
3238 pub replica_id: ReplicaId,
3239 pub is_host: bool,
3240}
3241
3242impl ProjectCollaborator {
3243 pub fn to_proto(&self) -> proto::Collaborator {
3244 proto::Collaborator {
3245 peer_id: Some(self.connection_id.into()),
3246 replica_id: self.replica_id.0 as u32,
3247 user_id: self.user_id.to_proto(),
3248 }
3249 }
3250}
3251
3252#[derive(Debug)]
3253pub struct LeftProject {
3254 pub id: ProjectId,
3255 pub host_user_id: UserId,
3256 pub host_connection_id: ConnectionId,
3257 pub connection_ids: Vec<ConnectionId>,
3258}
3259
3260pub struct Worktree {
3261 pub id: u64,
3262 pub abs_path: String,
3263 pub root_name: String,
3264 pub visible: bool,
3265 pub entries: Vec<proto::Entry>,
3266 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3267 pub scan_id: u64,
3268 pub completed_scan_id: u64,
3269}
3270
3271#[cfg(test)]
3272pub use test::*;
3273
3274#[cfg(test)]
3275mod test {
3276 use super::*;
3277 use gpui::executor::Background;
3278 use lazy_static::lazy_static;
3279 use parking_lot::Mutex;
3280 use sea_orm::ConnectionTrait;
3281 use sqlx::migrate::MigrateDatabase;
3282 use std::sync::Arc;
3283
3284 pub struct TestDb {
3285 pub db: Option<Arc<Database>>,
3286 pub connection: Option<sqlx::AnyConnection>,
3287 }
3288
3289 impl TestDb {
3290 pub fn sqlite(background: Arc<Background>) -> Self {
3291 let url = format!("sqlite::memory:");
3292 let runtime = tokio::runtime::Builder::new_current_thread()
3293 .enable_io()
3294 .enable_time()
3295 .build()
3296 .unwrap();
3297
3298 let mut db = runtime.block_on(async {
3299 let mut options = ConnectOptions::new(url);
3300 options.max_connections(5);
3301 let db = Database::new(options, Executor::Deterministic(background))
3302 .await
3303 .unwrap();
3304 let sql = include_str!(concat!(
3305 env!("CARGO_MANIFEST_DIR"),
3306 "/migrations.sqlite/20221109000000_test_schema.sql"
3307 ));
3308 db.pool
3309 .execute(sea_orm::Statement::from_string(
3310 db.pool.get_database_backend(),
3311 sql.into(),
3312 ))
3313 .await
3314 .unwrap();
3315 db
3316 });
3317
3318 db.runtime = Some(runtime);
3319
3320 Self {
3321 db: Some(Arc::new(db)),
3322 connection: None,
3323 }
3324 }
3325
3326 pub fn postgres(background: Arc<Background>) -> Self {
3327 lazy_static! {
3328 static ref LOCK: Mutex<()> = Mutex::new(());
3329 }
3330
3331 let _guard = LOCK.lock();
3332 let mut rng = StdRng::from_entropy();
3333 let url = format!(
3334 "postgres://postgres@localhost/zed-test-{}",
3335 rng.gen::<u128>()
3336 );
3337 let runtime = tokio::runtime::Builder::new_current_thread()
3338 .enable_io()
3339 .enable_time()
3340 .build()
3341 .unwrap();
3342
3343 let mut db = runtime.block_on(async {
3344 sqlx::Postgres::create_database(&url)
3345 .await
3346 .expect("failed to create test db");
3347 let mut options = ConnectOptions::new(url);
3348 options
3349 .max_connections(5)
3350 .idle_timeout(Duration::from_secs(0));
3351 let db = Database::new(options, Executor::Deterministic(background))
3352 .await
3353 .unwrap();
3354 let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3355 db.migrate(Path::new(migrations_path), false).await.unwrap();
3356 db
3357 });
3358
3359 db.runtime = Some(runtime);
3360
3361 Self {
3362 db: Some(Arc::new(db)),
3363 connection: None,
3364 }
3365 }
3366
3367 pub fn db(&self) -> &Arc<Database> {
3368 self.db.as_ref().unwrap()
3369 }
3370 }
3371
3372 impl Drop for TestDb {
3373 fn drop(&mut self) {
3374 let db = self.db.take().unwrap();
3375 if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3376 db.runtime.as_ref().unwrap().block_on(async {
3377 use util::ResultExt;
3378 let query = "
3379 SELECT pg_terminate_backend(pg_stat_activity.pid)
3380 FROM pg_stat_activity
3381 WHERE
3382 pg_stat_activity.datname = current_database() AND
3383 pid <> pg_backend_pid();
3384 ";
3385 db.pool
3386 .execute(sea_orm::Statement::from_string(
3387 db.pool.get_database_backend(),
3388 query.into(),
3389 ))
3390 .await
3391 .log_err();
3392 sqlx::Postgres::drop_database(db.options.get_url())
3393 .await
3394 .log_err();
3395 })
3396 }
3397 }
3398 }
3399}