1mod access_token;
2mod contact;
3mod follower;
4mod language_server;
5mod project;
6mod project_collaborator;
7mod room;
8mod room_participant;
9mod server;
10mod signup;
11#[cfg(test)]
12mod tests;
13mod user;
14mod worktree;
15mod worktree_diagnostic_summary;
16mod worktree_entry;
17
18use crate::executor::Executor;
19use crate::{Error, Result};
20use anyhow::anyhow;
21use collections::{BTreeMap, HashMap, HashSet};
22pub use contact::Contact;
23use dashmap::DashMap;
24use futures::StreamExt;
25use hyper::StatusCode;
26use rand::prelude::StdRng;
27use rand::{Rng, SeedableRng};
28use rpc::{proto, ConnectionId};
29use sea_orm::Condition;
30pub use sea_orm::ConnectOptions;
31use sea_orm::{
32 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
33 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
34 Statement, TransactionTrait,
35};
36use sea_query::{Alias, Expr, OnConflict, Query};
37use serde::{Deserialize, Serialize};
38pub use signup::{Invite, NewSignup, WaitlistSummary};
39use sqlx::migrate::{Migrate, Migration, MigrationSource};
40use sqlx::Connection;
41use std::ops::{Deref, DerefMut};
42use std::path::Path;
43use std::time::Duration;
44use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
45use tokio::sync::{Mutex, OwnedMutexGuard};
46pub use user::Model as User;
47
48pub struct Database {
49 options: ConnectOptions,
50 pool: DatabaseConnection,
51 rooms: DashMap<RoomId, Arc<Mutex<()>>>,
52 rng: Mutex<StdRng>,
53 executor: Executor,
54 #[cfg(test)]
55 runtime: Option<tokio::runtime::Runtime>,
56}
57
58impl Database {
59 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
60 Ok(Self {
61 options: options.clone(),
62 pool: sea_orm::Database::connect(options).await?,
63 rooms: DashMap::with_capacity(16384),
64 rng: Mutex::new(StdRng::seed_from_u64(0)),
65 executor,
66 #[cfg(test)]
67 runtime: None,
68 })
69 }
70
71 #[cfg(test)]
72 pub fn reset(&self) {
73 self.rooms.clear();
74 }
75
76 pub async fn migrate(
77 &self,
78 migrations_path: &Path,
79 ignore_checksum_mismatch: bool,
80 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
81 let migrations = MigrationSource::resolve(migrations_path)
82 .await
83 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
84
85 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
86
87 connection.ensure_migrations_table().await?;
88 let applied_migrations: HashMap<_, _> = connection
89 .list_applied_migrations()
90 .await?
91 .into_iter()
92 .map(|m| (m.version, m))
93 .collect();
94
95 let mut new_migrations = Vec::new();
96 for migration in migrations {
97 match applied_migrations.get(&migration.version) {
98 Some(applied_migration) => {
99 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
100 {
101 Err(anyhow!(
102 "checksum mismatch for applied migration {}",
103 migration.description
104 ))?;
105 }
106 }
107 None => {
108 let elapsed = connection.apply(&migration).await?;
109 new_migrations.push((migration, elapsed));
110 }
111 }
112 }
113
114 Ok(new_migrations)
115 }
116
117 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
118 self.transaction(|tx| async move {
119 let server = server::ActiveModel {
120 environment: ActiveValue::set(environment.into()),
121 ..Default::default()
122 }
123 .insert(&*tx)
124 .await?;
125 Ok(server.id)
126 })
127 .await
128 }
129
130 pub async fn stale_room_ids(
131 &self,
132 environment: &str,
133 new_server_id: ServerId,
134 ) -> Result<Vec<RoomId>> {
135 self.transaction(|tx| async move {
136 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
137 enum QueryAs {
138 RoomId,
139 }
140
141 let stale_server_epochs = self
142 .stale_server_ids(environment, new_server_id, &tx)
143 .await?;
144 Ok(room_participant::Entity::find()
145 .select_only()
146 .column(room_participant::Column::RoomId)
147 .distinct()
148 .filter(
149 room_participant::Column::AnsweringConnectionServerId
150 .is_in(stale_server_epochs),
151 )
152 .into_values::<_, QueryAs>()
153 .all(&*tx)
154 .await?)
155 })
156 .await
157 }
158
159 pub async fn refresh_room(
160 &self,
161 room_id: RoomId,
162 new_server_id: ServerId,
163 ) -> Result<RoomGuard<RefreshedRoom>> {
164 self.room_transaction(room_id, |tx| async move {
165 let stale_participant_filter = Condition::all()
166 .add(room_participant::Column::RoomId.eq(room_id))
167 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
168 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
169
170 let stale_participant_user_ids = room_participant::Entity::find()
171 .filter(stale_participant_filter.clone())
172 .all(&*tx)
173 .await?
174 .into_iter()
175 .map(|participant| participant.user_id)
176 .collect::<Vec<_>>();
177
178 // Delete participants who failed to reconnect and cancel their calls.
179 let mut canceled_calls_to_user_ids = Vec::new();
180 room_participant::Entity::delete_many()
181 .filter(stale_participant_filter)
182 .exec(&*tx)
183 .await?;
184 let called_participants = room_participant::Entity::find()
185 .filter(
186 Condition::all()
187 .add(
188 room_participant::Column::CallingUserId
189 .is_in(stale_participant_user_ids.iter().copied()),
190 )
191 .add(room_participant::Column::AnsweringConnectionId.is_null()),
192 )
193 .all(&*tx)
194 .await?;
195 room_participant::Entity::delete_many()
196 .filter(
197 room_participant::Column::Id
198 .is_in(called_participants.iter().map(|participant| participant.id)),
199 )
200 .exec(&*tx)
201 .await?;
202 canceled_calls_to_user_ids.extend(
203 called_participants
204 .into_iter()
205 .map(|participant| participant.user_id),
206 );
207
208 let room = self.get_room(room_id, &tx).await?;
209 // Delete the room if it becomes empty.
210 if room.participants.is_empty() {
211 project::Entity::delete_many()
212 .filter(project::Column::RoomId.eq(room_id))
213 .exec(&*tx)
214 .await?;
215 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
216 }
217
218 Ok(RefreshedRoom {
219 room,
220 stale_participant_user_ids,
221 canceled_calls_to_user_ids,
222 })
223 })
224 .await
225 }
226
227 pub async fn delete_stale_servers(
228 &self,
229 environment: &str,
230 new_server_id: ServerId,
231 ) -> Result<()> {
232 self.transaction(|tx| async move {
233 server::Entity::delete_many()
234 .filter(
235 Condition::all()
236 .add(server::Column::Environment.eq(environment))
237 .add(server::Column::Id.ne(new_server_id)),
238 )
239 .exec(&*tx)
240 .await?;
241 Ok(())
242 })
243 .await
244 }
245
246 async fn stale_server_ids(
247 &self,
248 environment: &str,
249 new_server_id: ServerId,
250 tx: &DatabaseTransaction,
251 ) -> Result<Vec<ServerId>> {
252 let stale_servers = server::Entity::find()
253 .filter(
254 Condition::all()
255 .add(server::Column::Environment.eq(environment))
256 .add(server::Column::Id.ne(new_server_id)),
257 )
258 .all(&*tx)
259 .await?;
260 Ok(stale_servers.into_iter().map(|server| server.id).collect())
261 }
262
263 // users
264
265 pub async fn create_user(
266 &self,
267 email_address: &str,
268 admin: bool,
269 params: NewUserParams,
270 ) -> Result<NewUserResult> {
271 self.transaction(|tx| async {
272 let tx = tx;
273 let user = user::Entity::insert(user::ActiveModel {
274 email_address: ActiveValue::set(Some(email_address.into())),
275 github_login: ActiveValue::set(params.github_login.clone()),
276 github_user_id: ActiveValue::set(Some(params.github_user_id)),
277 admin: ActiveValue::set(admin),
278 metrics_id: ActiveValue::set(Uuid::new_v4()),
279 ..Default::default()
280 })
281 .on_conflict(
282 OnConflict::column(user::Column::GithubLogin)
283 .update_column(user::Column::GithubLogin)
284 .to_owned(),
285 )
286 .exec_with_returning(&*tx)
287 .await?;
288
289 Ok(NewUserResult {
290 user_id: user.id,
291 metrics_id: user.metrics_id.to_string(),
292 signup_device_id: None,
293 inviting_user_id: None,
294 })
295 })
296 .await
297 }
298
299 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
300 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
301 .await
302 }
303
304 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
305 self.transaction(|tx| async {
306 let tx = tx;
307 Ok(user::Entity::find()
308 .filter(user::Column::Id.is_in(ids.iter().copied()))
309 .all(&*tx)
310 .await?)
311 })
312 .await
313 }
314
315 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
316 self.transaction(|tx| async move {
317 Ok(user::Entity::find()
318 .filter(user::Column::GithubLogin.eq(github_login))
319 .one(&*tx)
320 .await?)
321 })
322 .await
323 }
324
325 pub async fn get_or_create_user_by_github_account(
326 &self,
327 github_login: &str,
328 github_user_id: Option<i32>,
329 github_email: Option<&str>,
330 ) -> Result<Option<User>> {
331 self.transaction(|tx| async move {
332 let tx = &*tx;
333 if let Some(github_user_id) = github_user_id {
334 if let Some(user_by_github_user_id) = user::Entity::find()
335 .filter(user::Column::GithubUserId.eq(github_user_id))
336 .one(tx)
337 .await?
338 {
339 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
340 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
341 Ok(Some(user_by_github_user_id.update(tx).await?))
342 } else if let Some(user_by_github_login) = user::Entity::find()
343 .filter(user::Column::GithubLogin.eq(github_login))
344 .one(tx)
345 .await?
346 {
347 let mut user_by_github_login = user_by_github_login.into_active_model();
348 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
349 Ok(Some(user_by_github_login.update(tx).await?))
350 } else {
351 let user = user::Entity::insert(user::ActiveModel {
352 email_address: ActiveValue::set(github_email.map(|email| email.into())),
353 github_login: ActiveValue::set(github_login.into()),
354 github_user_id: ActiveValue::set(Some(github_user_id)),
355 admin: ActiveValue::set(false),
356 invite_count: ActiveValue::set(0),
357 invite_code: ActiveValue::set(None),
358 metrics_id: ActiveValue::set(Uuid::new_v4()),
359 ..Default::default()
360 })
361 .exec_with_returning(&*tx)
362 .await?;
363 Ok(Some(user))
364 }
365 } else {
366 Ok(user::Entity::find()
367 .filter(user::Column::GithubLogin.eq(github_login))
368 .one(tx)
369 .await?)
370 }
371 })
372 .await
373 }
374
375 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
376 self.transaction(|tx| async move {
377 Ok(user::Entity::find()
378 .order_by_asc(user::Column::GithubLogin)
379 .limit(limit as u64)
380 .offset(page as u64 * limit as u64)
381 .all(&*tx)
382 .await?)
383 })
384 .await
385 }
386
387 pub async fn get_users_with_no_invites(
388 &self,
389 invited_by_another_user: bool,
390 ) -> Result<Vec<User>> {
391 self.transaction(|tx| async move {
392 Ok(user::Entity::find()
393 .filter(
394 user::Column::InviteCount
395 .eq(0)
396 .and(if invited_by_another_user {
397 user::Column::InviterId.is_not_null()
398 } else {
399 user::Column::InviterId.is_null()
400 }),
401 )
402 .all(&*tx)
403 .await?)
404 })
405 .await
406 }
407
408 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
409 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
410 enum QueryAs {
411 MetricsId,
412 }
413
414 self.transaction(|tx| async move {
415 let metrics_id: Uuid = user::Entity::find_by_id(id)
416 .select_only()
417 .column(user::Column::MetricsId)
418 .into_values::<_, QueryAs>()
419 .one(&*tx)
420 .await?
421 .ok_or_else(|| anyhow!("could not find user"))?;
422 Ok(metrics_id.to_string())
423 })
424 .await
425 }
426
427 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
428 self.transaction(|tx| async move {
429 user::Entity::update_many()
430 .filter(user::Column::Id.eq(id))
431 .set(user::ActiveModel {
432 admin: ActiveValue::set(is_admin),
433 ..Default::default()
434 })
435 .exec(&*tx)
436 .await?;
437 Ok(())
438 })
439 .await
440 }
441
442 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
443 self.transaction(|tx| async move {
444 user::Entity::update_many()
445 .filter(user::Column::Id.eq(id))
446 .set(user::ActiveModel {
447 connected_once: ActiveValue::set(connected_once),
448 ..Default::default()
449 })
450 .exec(&*tx)
451 .await?;
452 Ok(())
453 })
454 .await
455 }
456
457 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
458 self.transaction(|tx| async move {
459 access_token::Entity::delete_many()
460 .filter(access_token::Column::UserId.eq(id))
461 .exec(&*tx)
462 .await?;
463 user::Entity::delete_by_id(id).exec(&*tx).await?;
464 Ok(())
465 })
466 .await
467 }
468
469 // contacts
470
471 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
472 #[derive(Debug, FromQueryResult)]
473 struct ContactWithUserBusyStatuses {
474 user_id_a: UserId,
475 user_id_b: UserId,
476 a_to_b: bool,
477 accepted: bool,
478 should_notify: bool,
479 user_a_busy: bool,
480 user_b_busy: bool,
481 }
482
483 self.transaction(|tx| async move {
484 let user_a_participant = Alias::new("user_a_participant");
485 let user_b_participant = Alias::new("user_b_participant");
486 let mut db_contacts = contact::Entity::find()
487 .column_as(
488 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
489 .is_not_null(),
490 "user_a_busy",
491 )
492 .column_as(
493 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
494 .is_not_null(),
495 "user_b_busy",
496 )
497 .filter(
498 contact::Column::UserIdA
499 .eq(user_id)
500 .or(contact::Column::UserIdB.eq(user_id)),
501 )
502 .join_as(
503 JoinType::LeftJoin,
504 contact::Relation::UserARoomParticipant.def(),
505 user_a_participant,
506 )
507 .join_as(
508 JoinType::LeftJoin,
509 contact::Relation::UserBRoomParticipant.def(),
510 user_b_participant,
511 )
512 .into_model::<ContactWithUserBusyStatuses>()
513 .stream(&*tx)
514 .await?;
515
516 let mut contacts = Vec::new();
517 while let Some(db_contact) = db_contacts.next().await {
518 let db_contact = db_contact?;
519 if db_contact.user_id_a == user_id {
520 if db_contact.accepted {
521 contacts.push(Contact::Accepted {
522 user_id: db_contact.user_id_b,
523 should_notify: db_contact.should_notify && db_contact.a_to_b,
524 busy: db_contact.user_b_busy,
525 });
526 } else if db_contact.a_to_b {
527 contacts.push(Contact::Outgoing {
528 user_id: db_contact.user_id_b,
529 })
530 } else {
531 contacts.push(Contact::Incoming {
532 user_id: db_contact.user_id_b,
533 should_notify: db_contact.should_notify,
534 });
535 }
536 } else if db_contact.accepted {
537 contacts.push(Contact::Accepted {
538 user_id: db_contact.user_id_a,
539 should_notify: db_contact.should_notify && !db_contact.a_to_b,
540 busy: db_contact.user_a_busy,
541 });
542 } else if db_contact.a_to_b {
543 contacts.push(Contact::Incoming {
544 user_id: db_contact.user_id_a,
545 should_notify: db_contact.should_notify,
546 });
547 } else {
548 contacts.push(Contact::Outgoing {
549 user_id: db_contact.user_id_a,
550 });
551 }
552 }
553
554 contacts.sort_unstable_by_key(|contact| contact.user_id());
555
556 Ok(contacts)
557 })
558 .await
559 }
560
561 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
562 self.transaction(|tx| async move {
563 let participant = room_participant::Entity::find()
564 .filter(room_participant::Column::UserId.eq(user_id))
565 .one(&*tx)
566 .await?;
567 Ok(participant.is_some())
568 })
569 .await
570 }
571
572 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
573 self.transaction(|tx| async move {
574 let (id_a, id_b) = if user_id_1 < user_id_2 {
575 (user_id_1, user_id_2)
576 } else {
577 (user_id_2, user_id_1)
578 };
579
580 Ok(contact::Entity::find()
581 .filter(
582 contact::Column::UserIdA
583 .eq(id_a)
584 .and(contact::Column::UserIdB.eq(id_b))
585 .and(contact::Column::Accepted.eq(true)),
586 )
587 .one(&*tx)
588 .await?
589 .is_some())
590 })
591 .await
592 }
593
594 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
595 self.transaction(|tx| async move {
596 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
597 (sender_id, receiver_id, true)
598 } else {
599 (receiver_id, sender_id, false)
600 };
601
602 let rows_affected = contact::Entity::insert(contact::ActiveModel {
603 user_id_a: ActiveValue::set(id_a),
604 user_id_b: ActiveValue::set(id_b),
605 a_to_b: ActiveValue::set(a_to_b),
606 accepted: ActiveValue::set(false),
607 should_notify: ActiveValue::set(true),
608 ..Default::default()
609 })
610 .on_conflict(
611 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
612 .values([
613 (contact::Column::Accepted, true.into()),
614 (contact::Column::ShouldNotify, false.into()),
615 ])
616 .action_and_where(
617 contact::Column::Accepted.eq(false).and(
618 contact::Column::AToB
619 .eq(a_to_b)
620 .and(contact::Column::UserIdA.eq(id_b))
621 .or(contact::Column::AToB
622 .ne(a_to_b)
623 .and(contact::Column::UserIdA.eq(id_a))),
624 ),
625 )
626 .to_owned(),
627 )
628 .exec_without_returning(&*tx)
629 .await?;
630
631 if rows_affected == 1 {
632 Ok(())
633 } else {
634 Err(anyhow!("contact already requested"))?
635 }
636 })
637 .await
638 }
639
640 /// Returns a bool indicating whether the removed contact had originally accepted or not
641 ///
642 /// Deletes the contact identified by the requester and responder ids, and then returns
643 /// whether the deleted contact had originally accepted or was a pending contact request.
644 ///
645 /// # Arguments
646 ///
647 /// * `requester_id` - The user that initiates this request
648 /// * `responder_id` - The user that will be removed
649 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
650 self.transaction(|tx| async move {
651 let (id_a, id_b) = if responder_id < requester_id {
652 (responder_id, requester_id)
653 } else {
654 (requester_id, responder_id)
655 };
656
657 let contact = contact::Entity::find()
658 .filter(
659 contact::Column::UserIdA
660 .eq(id_a)
661 .and(contact::Column::UserIdB.eq(id_b)),
662 )
663 .one(&*tx)
664 .await?
665 .ok_or_else(|| anyhow!("no such contact"))?;
666
667 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
668 Ok(contact.accepted)
669 })
670 .await
671 }
672
673 pub async fn dismiss_contact_notification(
674 &self,
675 user_id: UserId,
676 contact_user_id: UserId,
677 ) -> Result<()> {
678 self.transaction(|tx| async move {
679 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
680 (user_id, contact_user_id, true)
681 } else {
682 (contact_user_id, user_id, false)
683 };
684
685 let result = contact::Entity::update_many()
686 .set(contact::ActiveModel {
687 should_notify: ActiveValue::set(false),
688 ..Default::default()
689 })
690 .filter(
691 contact::Column::UserIdA
692 .eq(id_a)
693 .and(contact::Column::UserIdB.eq(id_b))
694 .and(
695 contact::Column::AToB
696 .eq(a_to_b)
697 .and(contact::Column::Accepted.eq(true))
698 .or(contact::Column::AToB
699 .ne(a_to_b)
700 .and(contact::Column::Accepted.eq(false))),
701 ),
702 )
703 .exec(&*tx)
704 .await?;
705 if result.rows_affected == 0 {
706 Err(anyhow!("no such contact request"))?
707 } else {
708 Ok(())
709 }
710 })
711 .await
712 }
713
714 pub async fn respond_to_contact_request(
715 &self,
716 responder_id: UserId,
717 requester_id: UserId,
718 accept: bool,
719 ) -> Result<()> {
720 self.transaction(|tx| async move {
721 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
722 (responder_id, requester_id, false)
723 } else {
724 (requester_id, responder_id, true)
725 };
726 let rows_affected = if accept {
727 let result = contact::Entity::update_many()
728 .set(contact::ActiveModel {
729 accepted: ActiveValue::set(true),
730 should_notify: ActiveValue::set(true),
731 ..Default::default()
732 })
733 .filter(
734 contact::Column::UserIdA
735 .eq(id_a)
736 .and(contact::Column::UserIdB.eq(id_b))
737 .and(contact::Column::AToB.eq(a_to_b)),
738 )
739 .exec(&*tx)
740 .await?;
741 result.rows_affected
742 } else {
743 let result = contact::Entity::delete_many()
744 .filter(
745 contact::Column::UserIdA
746 .eq(id_a)
747 .and(contact::Column::UserIdB.eq(id_b))
748 .and(contact::Column::AToB.eq(a_to_b))
749 .and(contact::Column::Accepted.eq(false)),
750 )
751 .exec(&*tx)
752 .await?;
753
754 result.rows_affected
755 };
756
757 if rows_affected == 1 {
758 Ok(())
759 } else {
760 Err(anyhow!("no such contact request"))?
761 }
762 })
763 .await
764 }
765
766 pub fn fuzzy_like_string(string: &str) -> String {
767 let mut result = String::with_capacity(string.len() * 2 + 1);
768 for c in string.chars() {
769 if c.is_alphanumeric() {
770 result.push('%');
771 result.push(c);
772 }
773 }
774 result.push('%');
775 result
776 }
777
778 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
779 self.transaction(|tx| async {
780 let tx = tx;
781 let like_string = Self::fuzzy_like_string(name_query);
782 let query = "
783 SELECT users.*
784 FROM users
785 WHERE github_login ILIKE $1
786 ORDER BY github_login <-> $2
787 LIMIT $3
788 ";
789
790 Ok(user::Entity::find()
791 .from_raw_sql(Statement::from_sql_and_values(
792 self.pool.get_database_backend(),
793 query.into(),
794 vec![like_string.into(), name_query.into(), limit.into()],
795 ))
796 .all(&*tx)
797 .await?)
798 })
799 .await
800 }
801
802 // signups
803
804 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
805 self.transaction(|tx| async move {
806 signup::Entity::insert(signup::ActiveModel {
807 email_address: ActiveValue::set(signup.email_address.clone()),
808 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
809 email_confirmation_sent: ActiveValue::set(false),
810 platform_mac: ActiveValue::set(signup.platform_mac),
811 platform_windows: ActiveValue::set(signup.platform_windows),
812 platform_linux: ActiveValue::set(signup.platform_linux),
813 platform_unknown: ActiveValue::set(false),
814 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
815 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
816 device_id: ActiveValue::set(signup.device_id.clone()),
817 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
818 ..Default::default()
819 })
820 .on_conflict(
821 OnConflict::column(signup::Column::EmailAddress)
822 .update_columns([
823 signup::Column::PlatformMac,
824 signup::Column::PlatformWindows,
825 signup::Column::PlatformLinux,
826 signup::Column::EditorFeatures,
827 signup::Column::ProgrammingLanguages,
828 signup::Column::DeviceId,
829 signup::Column::AddedToMailingList,
830 ])
831 .to_owned(),
832 )
833 .exec(&*tx)
834 .await?;
835 Ok(())
836 })
837 .await
838 }
839
840 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
841 self.transaction(|tx| async move {
842 let signup = signup::Entity::find()
843 .filter(signup::Column::EmailAddress.eq(email_address))
844 .one(&*tx)
845 .await?
846 .ok_or_else(|| {
847 anyhow!("signup with email address {} doesn't exist", email_address)
848 })?;
849
850 Ok(signup)
851 })
852 .await
853 }
854
855 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
856 self.transaction(|tx| async move {
857 let query = "
858 SELECT
859 COUNT(*) as count,
860 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
861 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
862 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
863 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
864 FROM (
865 SELECT *
866 FROM signups
867 WHERE
868 NOT email_confirmation_sent
869 ) AS unsent
870 ";
871 Ok(
872 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
873 self.pool.get_database_backend(),
874 query.into(),
875 vec![],
876 ))
877 .one(&*tx)
878 .await?
879 .ok_or_else(|| anyhow!("invalid result"))?,
880 )
881 })
882 .await
883 }
884
885 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
886 let emails = invites
887 .iter()
888 .map(|s| s.email_address.as_str())
889 .collect::<Vec<_>>();
890 self.transaction(|tx| async {
891 let tx = tx;
892 signup::Entity::update_many()
893 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
894 .set(signup::ActiveModel {
895 email_confirmation_sent: ActiveValue::set(true),
896 ..Default::default()
897 })
898 .exec(&*tx)
899 .await?;
900 Ok(())
901 })
902 .await
903 }
904
905 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
906 self.transaction(|tx| async move {
907 Ok(signup::Entity::find()
908 .select_only()
909 .column(signup::Column::EmailAddress)
910 .column(signup::Column::EmailConfirmationCode)
911 .filter(
912 signup::Column::EmailConfirmationSent.eq(false).and(
913 signup::Column::PlatformMac
914 .eq(true)
915 .or(signup::Column::PlatformUnknown.eq(true)),
916 ),
917 )
918 .order_by_asc(signup::Column::CreatedAt)
919 .limit(count as u64)
920 .into_model()
921 .all(&*tx)
922 .await?)
923 })
924 .await
925 }
926
927 // invite codes
928
929 pub async fn create_invite_from_code(
930 &self,
931 code: &str,
932 email_address: &str,
933 device_id: Option<&str>,
934 added_to_mailing_list: bool,
935 ) -> Result<Invite> {
936 self.transaction(|tx| async move {
937 let existing_user = user::Entity::find()
938 .filter(user::Column::EmailAddress.eq(email_address))
939 .one(&*tx)
940 .await?;
941
942 if existing_user.is_some() {
943 Err(anyhow!("email address is already in use"))?;
944 }
945
946 let inviting_user_with_invites = match user::Entity::find()
947 .filter(
948 user::Column::InviteCode
949 .eq(code)
950 .and(user::Column::InviteCount.gt(0)),
951 )
952 .one(&*tx)
953 .await?
954 {
955 Some(inviting_user) => inviting_user,
956 None => {
957 return Err(Error::Http(
958 StatusCode::UNAUTHORIZED,
959 "unable to find an invite code with invites remaining".to_string(),
960 ))?
961 }
962 };
963 user::Entity::update_many()
964 .filter(
965 user::Column::Id
966 .eq(inviting_user_with_invites.id)
967 .and(user::Column::InviteCount.gt(0)),
968 )
969 .col_expr(
970 user::Column::InviteCount,
971 Expr::col(user::Column::InviteCount).sub(1),
972 )
973 .exec(&*tx)
974 .await?;
975
976 let signup = signup::Entity::insert(signup::ActiveModel {
977 email_address: ActiveValue::set(email_address.into()),
978 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
979 email_confirmation_sent: ActiveValue::set(false),
980 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
981 platform_linux: ActiveValue::set(false),
982 platform_mac: ActiveValue::set(false),
983 platform_windows: ActiveValue::set(false),
984 platform_unknown: ActiveValue::set(true),
985 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
986 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
987 ..Default::default()
988 })
989 .on_conflict(
990 OnConflict::column(signup::Column::EmailAddress)
991 .update_column(signup::Column::InvitingUserId)
992 .to_owned(),
993 )
994 .exec_with_returning(&*tx)
995 .await?;
996
997 Ok(Invite {
998 email_address: signup.email_address,
999 email_confirmation_code: signup.email_confirmation_code,
1000 })
1001 })
1002 .await
1003 }
1004
1005 pub async fn create_user_from_invite(
1006 &self,
1007 invite: &Invite,
1008 user: NewUserParams,
1009 ) -> Result<Option<NewUserResult>> {
1010 self.transaction(|tx| async {
1011 let tx = tx;
1012 let signup = signup::Entity::find()
1013 .filter(
1014 signup::Column::EmailAddress
1015 .eq(invite.email_address.as_str())
1016 .and(
1017 signup::Column::EmailConfirmationCode
1018 .eq(invite.email_confirmation_code.as_str()),
1019 ),
1020 )
1021 .one(&*tx)
1022 .await?
1023 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1024
1025 if signup.user_id.is_some() {
1026 return Ok(None);
1027 }
1028
1029 let user = user::Entity::insert(user::ActiveModel {
1030 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1031 github_login: ActiveValue::set(user.github_login.clone()),
1032 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1033 admin: ActiveValue::set(false),
1034 invite_count: ActiveValue::set(user.invite_count),
1035 invite_code: ActiveValue::set(Some(random_invite_code())),
1036 metrics_id: ActiveValue::set(Uuid::new_v4()),
1037 ..Default::default()
1038 })
1039 .on_conflict(
1040 OnConflict::column(user::Column::GithubLogin)
1041 .update_columns([
1042 user::Column::EmailAddress,
1043 user::Column::GithubUserId,
1044 user::Column::Admin,
1045 ])
1046 .to_owned(),
1047 )
1048 .exec_with_returning(&*tx)
1049 .await?;
1050
1051 let mut signup = signup.into_active_model();
1052 signup.user_id = ActiveValue::set(Some(user.id));
1053 let signup = signup.update(&*tx).await?;
1054
1055 if let Some(inviting_user_id) = signup.inviting_user_id {
1056 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1057 (inviting_user_id, user.id, true)
1058 } else {
1059 (user.id, inviting_user_id, false)
1060 };
1061
1062 contact::Entity::insert(contact::ActiveModel {
1063 user_id_a: ActiveValue::set(user_id_a),
1064 user_id_b: ActiveValue::set(user_id_b),
1065 a_to_b: ActiveValue::set(a_to_b),
1066 should_notify: ActiveValue::set(true),
1067 accepted: ActiveValue::set(true),
1068 ..Default::default()
1069 })
1070 .on_conflict(OnConflict::new().do_nothing().to_owned())
1071 .exec_without_returning(&*tx)
1072 .await?;
1073 }
1074
1075 Ok(Some(NewUserResult {
1076 user_id: user.id,
1077 metrics_id: user.metrics_id.to_string(),
1078 inviting_user_id: signup.inviting_user_id,
1079 signup_device_id: signup.device_id,
1080 }))
1081 })
1082 .await
1083 }
1084
1085 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1086 self.transaction(|tx| async move {
1087 if count > 0 {
1088 user::Entity::update_many()
1089 .filter(
1090 user::Column::Id
1091 .eq(id)
1092 .and(user::Column::InviteCode.is_null()),
1093 )
1094 .set(user::ActiveModel {
1095 invite_code: ActiveValue::set(Some(random_invite_code())),
1096 ..Default::default()
1097 })
1098 .exec(&*tx)
1099 .await?;
1100 }
1101
1102 user::Entity::update_many()
1103 .filter(user::Column::Id.eq(id))
1104 .set(user::ActiveModel {
1105 invite_count: ActiveValue::set(count),
1106 ..Default::default()
1107 })
1108 .exec(&*tx)
1109 .await?;
1110 Ok(())
1111 })
1112 .await
1113 }
1114
1115 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1116 self.transaction(|tx| async move {
1117 match user::Entity::find_by_id(id).one(&*tx).await? {
1118 Some(user) if user.invite_code.is_some() => {
1119 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1120 }
1121 _ => Ok(None),
1122 }
1123 })
1124 .await
1125 }
1126
1127 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1128 self.transaction(|tx| async move {
1129 user::Entity::find()
1130 .filter(user::Column::InviteCode.eq(code))
1131 .one(&*tx)
1132 .await?
1133 .ok_or_else(|| {
1134 Error::Http(
1135 StatusCode::NOT_FOUND,
1136 "that invite code does not exist".to_string(),
1137 )
1138 })
1139 })
1140 .await
1141 }
1142
1143 // rooms
1144
1145 pub async fn incoming_call_for_user(
1146 &self,
1147 user_id: UserId,
1148 ) -> Result<Option<proto::IncomingCall>> {
1149 self.transaction(|tx| async move {
1150 let pending_participant = room_participant::Entity::find()
1151 .filter(
1152 room_participant::Column::UserId
1153 .eq(user_id)
1154 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1155 )
1156 .one(&*tx)
1157 .await?;
1158
1159 if let Some(pending_participant) = pending_participant {
1160 let room = self.get_room(pending_participant.room_id, &tx).await?;
1161 Ok(Self::build_incoming_call(&room, user_id))
1162 } else {
1163 Ok(None)
1164 }
1165 })
1166 .await
1167 }
1168
1169 pub async fn create_room(
1170 &self,
1171 user_id: UserId,
1172 connection: ConnectionId,
1173 live_kit_room: &str,
1174 ) -> Result<proto::Room> {
1175 self.transaction(|tx| async move {
1176 let room = room::ActiveModel {
1177 live_kit_room: ActiveValue::set(live_kit_room.into()),
1178 ..Default::default()
1179 }
1180 .insert(&*tx)
1181 .await?;
1182 room_participant::ActiveModel {
1183 room_id: ActiveValue::set(room.id),
1184 user_id: ActiveValue::set(user_id),
1185 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1186 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1187 connection.owner_id as i32,
1188 ))),
1189 answering_connection_lost: ActiveValue::set(false),
1190 calling_user_id: ActiveValue::set(user_id),
1191 calling_connection_id: ActiveValue::set(connection.id as i32),
1192 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1193 connection.owner_id as i32,
1194 ))),
1195 ..Default::default()
1196 }
1197 .insert(&*tx)
1198 .await?;
1199
1200 let room = self.get_room(room.id, &tx).await?;
1201 Ok(room)
1202 })
1203 .await
1204 }
1205
1206 pub async fn call(
1207 &self,
1208 room_id: RoomId,
1209 calling_user_id: UserId,
1210 calling_connection: ConnectionId,
1211 called_user_id: UserId,
1212 initial_project_id: Option<ProjectId>,
1213 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1214 self.room_transaction(room_id, |tx| async move {
1215 room_participant::ActiveModel {
1216 room_id: ActiveValue::set(room_id),
1217 user_id: ActiveValue::set(called_user_id),
1218 answering_connection_lost: ActiveValue::set(false),
1219 calling_user_id: ActiveValue::set(calling_user_id),
1220 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1221 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1222 calling_connection.owner_id as i32,
1223 ))),
1224 initial_project_id: ActiveValue::set(initial_project_id),
1225 ..Default::default()
1226 }
1227 .insert(&*tx)
1228 .await?;
1229
1230 let room = self.get_room(room_id, &tx).await?;
1231 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1232 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1233 Ok((room, incoming_call))
1234 })
1235 .await
1236 }
1237
1238 pub async fn call_failed(
1239 &self,
1240 room_id: RoomId,
1241 called_user_id: UserId,
1242 ) -> Result<RoomGuard<proto::Room>> {
1243 self.room_transaction(room_id, |tx| async move {
1244 room_participant::Entity::delete_many()
1245 .filter(
1246 room_participant::Column::RoomId
1247 .eq(room_id)
1248 .and(room_participant::Column::UserId.eq(called_user_id)),
1249 )
1250 .exec(&*tx)
1251 .await?;
1252 let room = self.get_room(room_id, &tx).await?;
1253 Ok(room)
1254 })
1255 .await
1256 }
1257
1258 pub async fn decline_call(
1259 &self,
1260 expected_room_id: Option<RoomId>,
1261 user_id: UserId,
1262 ) -> Result<Option<RoomGuard<proto::Room>>> {
1263 self.optional_room_transaction(|tx| async move {
1264 let mut filter = Condition::all()
1265 .add(room_participant::Column::UserId.eq(user_id))
1266 .add(room_participant::Column::AnsweringConnectionId.is_null());
1267 if let Some(room_id) = expected_room_id {
1268 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1269 }
1270 let participant = room_participant::Entity::find()
1271 .filter(filter)
1272 .one(&*tx)
1273 .await?;
1274
1275 let participant = if let Some(participant) = participant {
1276 participant
1277 } else if expected_room_id.is_some() {
1278 return Err(anyhow!("could not find call to decline"))?;
1279 } else {
1280 return Ok(None);
1281 };
1282
1283 let room_id = participant.room_id;
1284 room_participant::Entity::delete(participant.into_active_model())
1285 .exec(&*tx)
1286 .await?;
1287
1288 let room = self.get_room(room_id, &tx).await?;
1289 Ok(Some((room_id, room)))
1290 })
1291 .await
1292 }
1293
1294 pub async fn cancel_call(
1295 &self,
1296 room_id: RoomId,
1297 calling_connection: ConnectionId,
1298 called_user_id: UserId,
1299 ) -> Result<RoomGuard<proto::Room>> {
1300 self.room_transaction(room_id, |tx| async move {
1301 let participant = room_participant::Entity::find()
1302 .filter(
1303 Condition::all()
1304 .add(room_participant::Column::UserId.eq(called_user_id))
1305 .add(room_participant::Column::RoomId.eq(room_id))
1306 .add(
1307 room_participant::Column::CallingConnectionId
1308 .eq(calling_connection.id as i32),
1309 )
1310 .add(
1311 room_participant::Column::CallingConnectionServerId
1312 .eq(calling_connection.owner_id as i32),
1313 )
1314 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1315 )
1316 .one(&*tx)
1317 .await?
1318 .ok_or_else(|| anyhow!("no call to cancel"))?;
1319
1320 room_participant::Entity::delete(participant.into_active_model())
1321 .exec(&*tx)
1322 .await?;
1323
1324 let room = self.get_room(room_id, &tx).await?;
1325 Ok(room)
1326 })
1327 .await
1328 }
1329
1330 pub async fn join_room(
1331 &self,
1332 room_id: RoomId,
1333 user_id: UserId,
1334 connection: ConnectionId,
1335 ) -> Result<RoomGuard<proto::Room>> {
1336 self.room_transaction(room_id, |tx| async move {
1337 let result = room_participant::Entity::update_many()
1338 .filter(
1339 Condition::all()
1340 .add(room_participant::Column::RoomId.eq(room_id))
1341 .add(room_participant::Column::UserId.eq(user_id))
1342 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1343 )
1344 .set(room_participant::ActiveModel {
1345 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1346 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1347 connection.owner_id as i32,
1348 ))),
1349 answering_connection_lost: ActiveValue::set(false),
1350 ..Default::default()
1351 })
1352 .exec(&*tx)
1353 .await?;
1354 if result.rows_affected == 0 {
1355 Err(anyhow!("room does not exist or was already joined"))?
1356 } else {
1357 let room = self.get_room(room_id, &tx).await?;
1358 Ok(room)
1359 }
1360 })
1361 .await
1362 }
1363
1364 pub async fn rejoin_room(
1365 &self,
1366 rejoin_room: proto::RejoinRoom,
1367 user_id: UserId,
1368 connection: ConnectionId,
1369 ) -> Result<RoomGuard<RejoinedRoom>> {
1370 let room_id = RoomId::from_proto(rejoin_room.id);
1371 self.room_transaction(room_id, |tx| async {
1372 let tx = tx;
1373 let participant_update = room_participant::Entity::update_many()
1374 .filter(
1375 Condition::all()
1376 .add(room_participant::Column::RoomId.eq(room_id))
1377 .add(room_participant::Column::UserId.eq(user_id))
1378 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1379 .add(
1380 Condition::any()
1381 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1382 .add(
1383 room_participant::Column::AnsweringConnectionServerId
1384 .ne(connection.owner_id as i32),
1385 ),
1386 ),
1387 )
1388 .set(room_participant::ActiveModel {
1389 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1390 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1391 connection.owner_id as i32,
1392 ))),
1393 answering_connection_lost: ActiveValue::set(false),
1394 ..Default::default()
1395 })
1396 .exec(&*tx)
1397 .await?;
1398 if participant_update.rows_affected == 0 {
1399 return Err(anyhow!("room does not exist or was already joined"))?;
1400 }
1401
1402 let mut reshared_projects = Vec::new();
1403 for reshared_project in &rejoin_room.reshared_projects {
1404 let project_id = ProjectId::from_proto(reshared_project.project_id);
1405 let project = project::Entity::find_by_id(project_id)
1406 .one(&*tx)
1407 .await?
1408 .ok_or_else(|| anyhow!("project does not exist"))?;
1409 if project.host_user_id != user_id {
1410 return Err(anyhow!("no such project"))?;
1411 }
1412
1413 let mut collaborators = project
1414 .find_related(project_collaborator::Entity)
1415 .all(&*tx)
1416 .await?;
1417 let host_ix = collaborators
1418 .iter()
1419 .position(|collaborator| {
1420 collaborator.user_id == user_id && collaborator.is_host
1421 })
1422 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1423 let host = collaborators.swap_remove(host_ix);
1424 let old_connection_id = host.connection();
1425
1426 project::Entity::update(project::ActiveModel {
1427 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1428 host_connection_server_id: ActiveValue::set(Some(ServerId(
1429 connection.owner_id as i32,
1430 ))),
1431 ..project.into_active_model()
1432 })
1433 .exec(&*tx)
1434 .await?;
1435 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1436 connection_id: ActiveValue::set(connection.id as i32),
1437 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1438 ..host.into_active_model()
1439 })
1440 .exec(&*tx)
1441 .await?;
1442
1443 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1444 .await?;
1445
1446 reshared_projects.push(ResharedProject {
1447 id: project_id,
1448 old_connection_id,
1449 collaborators: collaborators
1450 .iter()
1451 .map(|collaborator| ProjectCollaborator {
1452 connection_id: collaborator.connection(),
1453 user_id: collaborator.user_id,
1454 replica_id: collaborator.replica_id,
1455 is_host: collaborator.is_host,
1456 })
1457 .collect(),
1458 worktrees: reshared_project.worktrees.clone(),
1459 });
1460 }
1461
1462 project::Entity::delete_many()
1463 .filter(
1464 Condition::all()
1465 .add(project::Column::RoomId.eq(room_id))
1466 .add(project::Column::HostUserId.eq(user_id))
1467 .add(
1468 project::Column::Id
1469 .is_not_in(reshared_projects.iter().map(|project| project.id)),
1470 ),
1471 )
1472 .exec(&*tx)
1473 .await?;
1474
1475 let mut rejoined_projects = Vec::new();
1476 for rejoined_project in &rejoin_room.rejoined_projects {
1477 let project_id = ProjectId::from_proto(rejoined_project.id);
1478 let Some(project) = project::Entity::find_by_id(project_id)
1479 .one(&*tx)
1480 .await? else { continue };
1481
1482 let mut worktrees = Vec::new();
1483 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1484 for db_worktree in db_worktrees {
1485 let mut worktree = RejoinedWorktree {
1486 id: db_worktree.id as u64,
1487 abs_path: db_worktree.abs_path,
1488 root_name: db_worktree.root_name,
1489 visible: db_worktree.visible,
1490 updated_entries: Default::default(),
1491 removed_entries: Default::default(),
1492 diagnostic_summaries: Default::default(),
1493 scan_id: db_worktree.scan_id as u64,
1494 completed_scan_id: db_worktree.completed_scan_id as u64,
1495 };
1496
1497 let rejoined_worktree = rejoined_project
1498 .worktrees
1499 .iter()
1500 .find(|worktree| worktree.id == db_worktree.id as u64);
1501 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1502 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1503 } else {
1504 worktree_entry::Column::IsDeleted.eq(false)
1505 };
1506
1507 let mut db_entries = worktree_entry::Entity::find()
1508 .filter(
1509 Condition::all()
1510 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1511 .add(entry_filter),
1512 )
1513 .stream(&*tx)
1514 .await?;
1515
1516 while let Some(db_entry) = db_entries.next().await {
1517 let db_entry = db_entry?;
1518 if db_entry.is_deleted {
1519 worktree.removed_entries.push(db_entry.id as u64);
1520 } else {
1521 worktree.updated_entries.push(proto::Entry {
1522 id: db_entry.id as u64,
1523 is_dir: db_entry.is_dir,
1524 path: db_entry.path,
1525 inode: db_entry.inode as u64,
1526 mtime: Some(proto::Timestamp {
1527 seconds: db_entry.mtime_seconds as u64,
1528 nanos: db_entry.mtime_nanos as u32,
1529 }),
1530 is_symlink: db_entry.is_symlink,
1531 is_ignored: db_entry.is_ignored,
1532 });
1533 }
1534 }
1535
1536 worktrees.push(worktree);
1537 }
1538
1539 let language_servers = project
1540 .find_related(language_server::Entity)
1541 .all(&*tx)
1542 .await?
1543 .into_iter()
1544 .map(|language_server| proto::LanguageServer {
1545 id: language_server.id as u64,
1546 name: language_server.name,
1547 })
1548 .collect::<Vec<_>>();
1549
1550 let mut collaborators = project
1551 .find_related(project_collaborator::Entity)
1552 .all(&*tx)
1553 .await?;
1554 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1555 .iter()
1556 .position(|collaborator| collaborator.user_id == user_id)
1557 {
1558 collaborators.swap_remove(self_collaborator_ix)
1559 } else {
1560 continue;
1561 };
1562 let old_connection_id = self_collaborator.connection();
1563 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1564 connection_id: ActiveValue::set(connection.id as i32),
1565 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1566 ..self_collaborator.into_active_model()
1567 })
1568 .exec(&*tx)
1569 .await?;
1570
1571 let collaborators = collaborators
1572 .into_iter()
1573 .map(|collaborator| ProjectCollaborator {
1574 connection_id: collaborator.connection(),
1575 user_id: collaborator.user_id,
1576 replica_id: collaborator.replica_id,
1577 is_host: collaborator.is_host,
1578 })
1579 .collect::<Vec<_>>();
1580
1581 rejoined_projects.push(RejoinedProject {
1582 id: project_id,
1583 old_connection_id,
1584 collaborators,
1585 worktrees,
1586 language_servers,
1587 });
1588 }
1589
1590 let room = self.get_room(room_id, &tx).await?;
1591 Ok(RejoinedRoom {
1592 room,
1593 rejoined_projects,
1594 reshared_projects,
1595 })
1596 })
1597 .await
1598 }
1599
1600 pub async fn leave_room(
1601 &self,
1602 connection: ConnectionId,
1603 ) -> Result<Option<RoomGuard<LeftRoom>>> {
1604 self.optional_room_transaction(|tx| async move {
1605 let leaving_participant = room_participant::Entity::find()
1606 .filter(
1607 Condition::all()
1608 .add(
1609 room_participant::Column::AnsweringConnectionId
1610 .eq(connection.id as i32),
1611 )
1612 .add(
1613 room_participant::Column::AnsweringConnectionServerId
1614 .eq(connection.owner_id as i32),
1615 ),
1616 )
1617 .one(&*tx)
1618 .await?;
1619
1620 if let Some(leaving_participant) = leaving_participant {
1621 // Leave room.
1622 let room_id = leaving_participant.room_id;
1623 room_participant::Entity::delete_by_id(leaving_participant.id)
1624 .exec(&*tx)
1625 .await?;
1626
1627 // Cancel pending calls initiated by the leaving user.
1628 let called_participants = room_participant::Entity::find()
1629 .filter(
1630 Condition::all()
1631 .add(
1632 room_participant::Column::CallingUserId
1633 .eq(leaving_participant.user_id),
1634 )
1635 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1636 )
1637 .all(&*tx)
1638 .await?;
1639 room_participant::Entity::delete_many()
1640 .filter(
1641 room_participant::Column::Id
1642 .is_in(called_participants.iter().map(|participant| participant.id)),
1643 )
1644 .exec(&*tx)
1645 .await?;
1646 let canceled_calls_to_user_ids = called_participants
1647 .into_iter()
1648 .map(|participant| participant.user_id)
1649 .collect();
1650
1651 // Detect left projects.
1652 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1653 enum QueryProjectIds {
1654 ProjectId,
1655 }
1656 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1657 .select_only()
1658 .column_as(
1659 project_collaborator::Column::ProjectId,
1660 QueryProjectIds::ProjectId,
1661 )
1662 .filter(
1663 Condition::all()
1664 .add(
1665 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1666 )
1667 .add(
1668 project_collaborator::Column::ConnectionServerId
1669 .eq(connection.owner_id as i32),
1670 ),
1671 )
1672 .into_values::<_, QueryProjectIds>()
1673 .all(&*tx)
1674 .await?;
1675 let mut left_projects = HashMap::default();
1676 let mut collaborators = project_collaborator::Entity::find()
1677 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1678 .stream(&*tx)
1679 .await?;
1680 while let Some(collaborator) = collaborators.next().await {
1681 let collaborator = collaborator?;
1682 let left_project =
1683 left_projects
1684 .entry(collaborator.project_id)
1685 .or_insert(LeftProject {
1686 id: collaborator.project_id,
1687 host_user_id: Default::default(),
1688 connection_ids: Default::default(),
1689 host_connection_id: Default::default(),
1690 });
1691
1692 let collaborator_connection_id = collaborator.connection();
1693 if collaborator_connection_id != connection {
1694 left_project.connection_ids.push(collaborator_connection_id);
1695 }
1696
1697 if collaborator.is_host {
1698 left_project.host_user_id = collaborator.user_id;
1699 left_project.host_connection_id = collaborator_connection_id;
1700 }
1701 }
1702 drop(collaborators);
1703
1704 // Leave projects.
1705 project_collaborator::Entity::delete_many()
1706 .filter(
1707 Condition::all()
1708 .add(
1709 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1710 )
1711 .add(
1712 project_collaborator::Column::ConnectionServerId
1713 .eq(connection.owner_id as i32),
1714 ),
1715 )
1716 .exec(&*tx)
1717 .await?;
1718
1719 // Unshare projects.
1720 project::Entity::delete_many()
1721 .filter(
1722 Condition::all()
1723 .add(project::Column::RoomId.eq(room_id))
1724 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1725 .add(
1726 project::Column::HostConnectionServerId
1727 .eq(connection.owner_id as i32),
1728 ),
1729 )
1730 .exec(&*tx)
1731 .await?;
1732
1733 let room = self.get_room(room_id, &tx).await?;
1734 if room.participants.is_empty() {
1735 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1736 }
1737
1738 let left_room = LeftRoom {
1739 room,
1740 left_projects,
1741 canceled_calls_to_user_ids,
1742 };
1743
1744 if left_room.room.participants.is_empty() {
1745 self.rooms.remove(&room_id);
1746 }
1747
1748 Ok(Some((room_id, left_room)))
1749 } else {
1750 Ok(None)
1751 }
1752 })
1753 .await
1754 }
1755
1756 pub async fn follow(
1757 &self,
1758 project_id: ProjectId,
1759 leader_connection: ConnectionId,
1760 follower_connection: ConnectionId,
1761 ) -> Result<RoomGuard<proto::Room>> {
1762 let room_id = self.room_id_for_project(project_id).await?;
1763 self.room_transaction(room_id, |tx| async move {
1764 follower::ActiveModel {
1765 room_id: ActiveValue::set(room_id),
1766 project_id: ActiveValue::set(project_id),
1767 leader_connection_server_id: ActiveValue::set(ServerId(
1768 leader_connection.owner_id as i32,
1769 )),
1770 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1771 follower_connection_server_id: ActiveValue::set(ServerId(
1772 follower_connection.owner_id as i32,
1773 )),
1774 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1775 ..Default::default()
1776 }
1777 .insert(&*tx)
1778 .await?;
1779
1780 let room = self.get_room(room_id, &*tx).await?;
1781 Ok(room)
1782 })
1783 .await
1784 }
1785
1786 pub async fn unfollow(
1787 &self,
1788 project_id: ProjectId,
1789 leader_connection: ConnectionId,
1790 follower_connection: ConnectionId,
1791 ) -> Result<RoomGuard<proto::Room>> {
1792 let room_id = self.room_id_for_project(project_id).await?;
1793 self.room_transaction(room_id, |tx| async move {
1794 follower::Entity::delete_many()
1795 .filter(
1796 Condition::all()
1797 .add(follower::Column::ProjectId.eq(project_id))
1798 .add(
1799 follower::Column::LeaderConnectionServerId
1800 .eq(leader_connection.owner_id),
1801 )
1802 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1803 .add(
1804 follower::Column::FollowerConnectionServerId
1805 .eq(follower_connection.owner_id),
1806 )
1807 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1808 )
1809 .exec(&*tx)
1810 .await?;
1811
1812 let room = self.get_room(room_id, &*tx).await?;
1813 Ok(room)
1814 })
1815 .await
1816 }
1817
1818 pub async fn update_room_participant_location(
1819 &self,
1820 room_id: RoomId,
1821 connection: ConnectionId,
1822 location: proto::ParticipantLocation,
1823 ) -> Result<RoomGuard<proto::Room>> {
1824 self.room_transaction(room_id, |tx| async {
1825 let tx = tx;
1826 let location_kind;
1827 let location_project_id;
1828 match location
1829 .variant
1830 .as_ref()
1831 .ok_or_else(|| anyhow!("invalid location"))?
1832 {
1833 proto::participant_location::Variant::SharedProject(project) => {
1834 location_kind = 0;
1835 location_project_id = Some(ProjectId::from_proto(project.id));
1836 }
1837 proto::participant_location::Variant::UnsharedProject(_) => {
1838 location_kind = 1;
1839 location_project_id = None;
1840 }
1841 proto::participant_location::Variant::External(_) => {
1842 location_kind = 2;
1843 location_project_id = None;
1844 }
1845 }
1846
1847 let result = room_participant::Entity::update_many()
1848 .filter(
1849 Condition::all()
1850 .add(room_participant::Column::RoomId.eq(room_id))
1851 .add(
1852 room_participant::Column::AnsweringConnectionId
1853 .eq(connection.id as i32),
1854 )
1855 .add(
1856 room_participant::Column::AnsweringConnectionServerId
1857 .eq(connection.owner_id as i32),
1858 ),
1859 )
1860 .set(room_participant::ActiveModel {
1861 location_kind: ActiveValue::set(Some(location_kind)),
1862 location_project_id: ActiveValue::set(location_project_id),
1863 ..Default::default()
1864 })
1865 .exec(&*tx)
1866 .await?;
1867
1868 if result.rows_affected == 1 {
1869 let room = self.get_room(room_id, &tx).await?;
1870 Ok(room)
1871 } else {
1872 Err(anyhow!("could not update room participant location"))?
1873 }
1874 })
1875 .await
1876 }
1877
1878 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1879 self.transaction(|tx| async move {
1880 let participant = room_participant::Entity::find()
1881 .filter(
1882 Condition::all()
1883 .add(
1884 room_participant::Column::AnsweringConnectionId
1885 .eq(connection.id as i32),
1886 )
1887 .add(
1888 room_participant::Column::AnsweringConnectionServerId
1889 .eq(connection.owner_id as i32),
1890 ),
1891 )
1892 .one(&*tx)
1893 .await?
1894 .ok_or_else(|| anyhow!("not a participant in any room"))?;
1895
1896 room_participant::Entity::update(room_participant::ActiveModel {
1897 answering_connection_lost: ActiveValue::set(true),
1898 ..participant.into_active_model()
1899 })
1900 .exec(&*tx)
1901 .await?;
1902
1903 Ok(())
1904 })
1905 .await
1906 }
1907
1908 fn build_incoming_call(
1909 room: &proto::Room,
1910 called_user_id: UserId,
1911 ) -> Option<proto::IncomingCall> {
1912 let pending_participant = room
1913 .pending_participants
1914 .iter()
1915 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1916
1917 Some(proto::IncomingCall {
1918 room_id: room.id,
1919 calling_user_id: pending_participant.calling_user_id,
1920 participant_user_ids: room
1921 .participants
1922 .iter()
1923 .map(|participant| participant.user_id)
1924 .collect(),
1925 initial_project: room.participants.iter().find_map(|participant| {
1926 let initial_project_id = pending_participant.initial_project_id?;
1927 participant
1928 .projects
1929 .iter()
1930 .find(|project| project.id == initial_project_id)
1931 .cloned()
1932 }),
1933 })
1934 }
1935
1936 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1937 let db_room = room::Entity::find_by_id(room_id)
1938 .one(tx)
1939 .await?
1940 .ok_or_else(|| anyhow!("could not find room"))?;
1941
1942 let mut db_participants = db_room
1943 .find_related(room_participant::Entity)
1944 .stream(tx)
1945 .await?;
1946 let mut participants = HashMap::default();
1947 let mut pending_participants = Vec::new();
1948 while let Some(db_participant) = db_participants.next().await {
1949 let db_participant = db_participant?;
1950 if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1951 .answering_connection_id
1952 .zip(db_participant.answering_connection_server_id)
1953 {
1954 let location = match (
1955 db_participant.location_kind,
1956 db_participant.location_project_id,
1957 ) {
1958 (Some(0), Some(project_id)) => {
1959 Some(proto::participant_location::Variant::SharedProject(
1960 proto::participant_location::SharedProject {
1961 id: project_id.to_proto(),
1962 },
1963 ))
1964 }
1965 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1966 Default::default(),
1967 )),
1968 _ => Some(proto::participant_location::Variant::External(
1969 Default::default(),
1970 )),
1971 };
1972
1973 let answering_connection = ConnectionId {
1974 owner_id: answering_connection_server_id.0 as u32,
1975 id: answering_connection_id as u32,
1976 };
1977 participants.insert(
1978 answering_connection,
1979 proto::Participant {
1980 user_id: db_participant.user_id.to_proto(),
1981 peer_id: Some(answering_connection.into()),
1982 projects: Default::default(),
1983 location: Some(proto::ParticipantLocation { variant: location }),
1984 },
1985 );
1986 } else {
1987 pending_participants.push(proto::PendingParticipant {
1988 user_id: db_participant.user_id.to_proto(),
1989 calling_user_id: db_participant.calling_user_id.to_proto(),
1990 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1991 });
1992 }
1993 }
1994 drop(db_participants);
1995
1996 let mut db_projects = db_room
1997 .find_related(project::Entity)
1998 .find_with_related(worktree::Entity)
1999 .stream(tx)
2000 .await?;
2001
2002 while let Some(row) = db_projects.next().await {
2003 let (db_project, db_worktree) = row?;
2004 let host_connection = db_project.host_connection()?;
2005 if let Some(participant) = participants.get_mut(&host_connection) {
2006 let project = if let Some(project) = participant
2007 .projects
2008 .iter_mut()
2009 .find(|project| project.id == db_project.id.to_proto())
2010 {
2011 project
2012 } else {
2013 participant.projects.push(proto::ParticipantProject {
2014 id: db_project.id.to_proto(),
2015 worktree_root_names: Default::default(),
2016 });
2017 participant.projects.last_mut().unwrap()
2018 };
2019
2020 if let Some(db_worktree) = db_worktree {
2021 if db_worktree.visible {
2022 project.worktree_root_names.push(db_worktree.root_name);
2023 }
2024 }
2025 }
2026 }
2027 drop(db_projects);
2028
2029 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2030 let mut followers = Vec::new();
2031 while let Some(db_follower) = db_followers.next().await {
2032 let db_follower = db_follower?;
2033 followers.push(proto::Follower {
2034 leader_id: Some(db_follower.leader_connection().into()),
2035 follower_id: Some(db_follower.follower_connection().into()),
2036 project_id: db_follower.project_id.to_proto(),
2037 });
2038 }
2039
2040 Ok(proto::Room {
2041 id: db_room.id.to_proto(),
2042 live_kit_room: db_room.live_kit_room,
2043 participants: participants.into_values().collect(),
2044 pending_participants,
2045 followers,
2046 })
2047 }
2048
2049 // projects
2050
2051 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2052 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2053 enum QueryAs {
2054 Count,
2055 }
2056
2057 self.transaction(|tx| async move {
2058 Ok(project::Entity::find()
2059 .select_only()
2060 .column_as(project::Column::Id.count(), QueryAs::Count)
2061 .inner_join(user::Entity)
2062 .filter(user::Column::Admin.eq(false))
2063 .into_values::<_, QueryAs>()
2064 .one(&*tx)
2065 .await?
2066 .unwrap_or(0i64) as usize)
2067 })
2068 .await
2069 }
2070
2071 pub async fn share_project(
2072 &self,
2073 room_id: RoomId,
2074 connection: ConnectionId,
2075 worktrees: &[proto::WorktreeMetadata],
2076 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2077 self.room_transaction(room_id, |tx| async move {
2078 let participant = room_participant::Entity::find()
2079 .filter(
2080 Condition::all()
2081 .add(
2082 room_participant::Column::AnsweringConnectionId
2083 .eq(connection.id as i32),
2084 )
2085 .add(
2086 room_participant::Column::AnsweringConnectionServerId
2087 .eq(connection.owner_id as i32),
2088 ),
2089 )
2090 .one(&*tx)
2091 .await?
2092 .ok_or_else(|| anyhow!("could not find participant"))?;
2093 if participant.room_id != room_id {
2094 return Err(anyhow!("shared project on unexpected room"))?;
2095 }
2096
2097 let project = project::ActiveModel {
2098 room_id: ActiveValue::set(participant.room_id),
2099 host_user_id: ActiveValue::set(participant.user_id),
2100 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2101 host_connection_server_id: ActiveValue::set(Some(ServerId(
2102 connection.owner_id as i32,
2103 ))),
2104 ..Default::default()
2105 }
2106 .insert(&*tx)
2107 .await?;
2108
2109 if !worktrees.is_empty() {
2110 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2111 worktree::ActiveModel {
2112 id: ActiveValue::set(worktree.id as i64),
2113 project_id: ActiveValue::set(project.id),
2114 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2115 root_name: ActiveValue::set(worktree.root_name.clone()),
2116 visible: ActiveValue::set(worktree.visible),
2117 scan_id: ActiveValue::set(0),
2118 completed_scan_id: ActiveValue::set(0),
2119 }
2120 }))
2121 .exec(&*tx)
2122 .await?;
2123 }
2124
2125 project_collaborator::ActiveModel {
2126 project_id: ActiveValue::set(project.id),
2127 connection_id: ActiveValue::set(connection.id as i32),
2128 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2129 user_id: ActiveValue::set(participant.user_id),
2130 replica_id: ActiveValue::set(ReplicaId(0)),
2131 is_host: ActiveValue::set(true),
2132 ..Default::default()
2133 }
2134 .insert(&*tx)
2135 .await?;
2136
2137 let room = self.get_room(room_id, &tx).await?;
2138 Ok((project.id, room))
2139 })
2140 .await
2141 }
2142
2143 pub async fn unshare_project(
2144 &self,
2145 project_id: ProjectId,
2146 connection: ConnectionId,
2147 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2148 let room_id = self.room_id_for_project(project_id).await?;
2149 self.room_transaction(room_id, |tx| async move {
2150 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2151
2152 let project = project::Entity::find_by_id(project_id)
2153 .one(&*tx)
2154 .await?
2155 .ok_or_else(|| anyhow!("project not found"))?;
2156 if project.host_connection()? == connection {
2157 project::Entity::delete(project.into_active_model())
2158 .exec(&*tx)
2159 .await?;
2160 let room = self.get_room(room_id, &tx).await?;
2161 Ok((room, guest_connection_ids))
2162 } else {
2163 Err(anyhow!("cannot unshare a project hosted by another user"))?
2164 }
2165 })
2166 .await
2167 }
2168
2169 pub async fn update_project(
2170 &self,
2171 project_id: ProjectId,
2172 connection: ConnectionId,
2173 worktrees: &[proto::WorktreeMetadata],
2174 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2175 let room_id = self.room_id_for_project(project_id).await?;
2176 self.room_transaction(room_id, |tx| async move {
2177 let project = project::Entity::find_by_id(project_id)
2178 .filter(
2179 Condition::all()
2180 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2181 .add(
2182 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2183 ),
2184 )
2185 .one(&*tx)
2186 .await?
2187 .ok_or_else(|| anyhow!("no such project"))?;
2188
2189 self.update_project_worktrees(project.id, worktrees, &tx)
2190 .await?;
2191
2192 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2193 let room = self.get_room(project.room_id, &tx).await?;
2194 Ok((room, guest_connection_ids))
2195 })
2196 .await
2197 }
2198
2199 async fn update_project_worktrees(
2200 &self,
2201 project_id: ProjectId,
2202 worktrees: &[proto::WorktreeMetadata],
2203 tx: &DatabaseTransaction,
2204 ) -> Result<()> {
2205 if !worktrees.is_empty() {
2206 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2207 id: ActiveValue::set(worktree.id as i64),
2208 project_id: ActiveValue::set(project_id),
2209 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2210 root_name: ActiveValue::set(worktree.root_name.clone()),
2211 visible: ActiveValue::set(worktree.visible),
2212 scan_id: ActiveValue::set(0),
2213 completed_scan_id: ActiveValue::set(0),
2214 }))
2215 .on_conflict(
2216 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2217 .update_column(worktree::Column::RootName)
2218 .to_owned(),
2219 )
2220 .exec(&*tx)
2221 .await?;
2222 }
2223
2224 worktree::Entity::delete_many()
2225 .filter(worktree::Column::ProjectId.eq(project_id).and(
2226 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2227 ))
2228 .exec(&*tx)
2229 .await?;
2230
2231 Ok(())
2232 }
2233
2234 pub async fn update_worktree(
2235 &self,
2236 update: &proto::UpdateWorktree,
2237 connection: ConnectionId,
2238 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2239 let project_id = ProjectId::from_proto(update.project_id);
2240 let worktree_id = update.worktree_id as i64;
2241 let room_id = self.room_id_for_project(project_id).await?;
2242 self.room_transaction(room_id, |tx| async move {
2243 // Ensure the update comes from the host.
2244 let _project = project::Entity::find_by_id(project_id)
2245 .filter(
2246 Condition::all()
2247 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2248 .add(
2249 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2250 ),
2251 )
2252 .one(&*tx)
2253 .await?
2254 .ok_or_else(|| anyhow!("no such project"))?;
2255
2256 // Update metadata.
2257 worktree::Entity::update(worktree::ActiveModel {
2258 id: ActiveValue::set(worktree_id),
2259 project_id: ActiveValue::set(project_id),
2260 root_name: ActiveValue::set(update.root_name.clone()),
2261 scan_id: ActiveValue::set(update.scan_id as i64),
2262 completed_scan_id: if update.is_last_update {
2263 ActiveValue::set(update.scan_id as i64)
2264 } else {
2265 ActiveValue::default()
2266 },
2267 abs_path: ActiveValue::set(update.abs_path.clone()),
2268 ..Default::default()
2269 })
2270 .exec(&*tx)
2271 .await?;
2272
2273 if !update.updated_entries.is_empty() {
2274 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2275 let mtime = entry.mtime.clone().unwrap_or_default();
2276 worktree_entry::ActiveModel {
2277 project_id: ActiveValue::set(project_id),
2278 worktree_id: ActiveValue::set(worktree_id),
2279 id: ActiveValue::set(entry.id as i64),
2280 is_dir: ActiveValue::set(entry.is_dir),
2281 path: ActiveValue::set(entry.path.clone()),
2282 inode: ActiveValue::set(entry.inode as i64),
2283 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2284 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2285 is_symlink: ActiveValue::set(entry.is_symlink),
2286 is_ignored: ActiveValue::set(entry.is_ignored),
2287 is_deleted: ActiveValue::set(false),
2288 scan_id: ActiveValue::set(update.scan_id as i64),
2289 }
2290 }))
2291 .on_conflict(
2292 OnConflict::columns([
2293 worktree_entry::Column::ProjectId,
2294 worktree_entry::Column::WorktreeId,
2295 worktree_entry::Column::Id,
2296 ])
2297 .update_columns([
2298 worktree_entry::Column::IsDir,
2299 worktree_entry::Column::Path,
2300 worktree_entry::Column::Inode,
2301 worktree_entry::Column::MtimeSeconds,
2302 worktree_entry::Column::MtimeNanos,
2303 worktree_entry::Column::IsSymlink,
2304 worktree_entry::Column::IsIgnored,
2305 worktree_entry::Column::ScanId,
2306 ])
2307 .to_owned(),
2308 )
2309 .exec(&*tx)
2310 .await?;
2311 }
2312
2313 if !update.removed_entries.is_empty() {
2314 worktree_entry::Entity::update_many()
2315 .filter(
2316 worktree_entry::Column::ProjectId
2317 .eq(project_id)
2318 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2319 .and(
2320 worktree_entry::Column::Id
2321 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2322 ),
2323 )
2324 .set(worktree_entry::ActiveModel {
2325 is_deleted: ActiveValue::Set(true),
2326 scan_id: ActiveValue::Set(update.scan_id as i64),
2327 ..Default::default()
2328 })
2329 .exec(&*tx)
2330 .await?;
2331 }
2332
2333 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2334 Ok(connection_ids)
2335 })
2336 .await
2337 }
2338
2339 pub async fn update_diagnostic_summary(
2340 &self,
2341 update: &proto::UpdateDiagnosticSummary,
2342 connection: ConnectionId,
2343 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2344 let project_id = ProjectId::from_proto(update.project_id);
2345 let worktree_id = update.worktree_id as i64;
2346 let room_id = self.room_id_for_project(project_id).await?;
2347 self.room_transaction(room_id, |tx| async move {
2348 let summary = update
2349 .summary
2350 .as_ref()
2351 .ok_or_else(|| anyhow!("invalid summary"))?;
2352
2353 // Ensure the update comes from the host.
2354 let project = project::Entity::find_by_id(project_id)
2355 .one(&*tx)
2356 .await?
2357 .ok_or_else(|| anyhow!("no such project"))?;
2358 if project.host_connection()? != connection {
2359 return Err(anyhow!("can't update a project hosted by someone else"))?;
2360 }
2361
2362 // Update summary.
2363 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2364 project_id: ActiveValue::set(project_id),
2365 worktree_id: ActiveValue::set(worktree_id),
2366 path: ActiveValue::set(summary.path.clone()),
2367 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2368 error_count: ActiveValue::set(summary.error_count as i32),
2369 warning_count: ActiveValue::set(summary.warning_count as i32),
2370 ..Default::default()
2371 })
2372 .on_conflict(
2373 OnConflict::columns([
2374 worktree_diagnostic_summary::Column::ProjectId,
2375 worktree_diagnostic_summary::Column::WorktreeId,
2376 worktree_diagnostic_summary::Column::Path,
2377 ])
2378 .update_columns([
2379 worktree_diagnostic_summary::Column::LanguageServerId,
2380 worktree_diagnostic_summary::Column::ErrorCount,
2381 worktree_diagnostic_summary::Column::WarningCount,
2382 ])
2383 .to_owned(),
2384 )
2385 .exec(&*tx)
2386 .await?;
2387
2388 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2389 Ok(connection_ids)
2390 })
2391 .await
2392 }
2393
2394 pub async fn start_language_server(
2395 &self,
2396 update: &proto::StartLanguageServer,
2397 connection: ConnectionId,
2398 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2399 let project_id = ProjectId::from_proto(update.project_id);
2400 let room_id = self.room_id_for_project(project_id).await?;
2401 self.room_transaction(room_id, |tx| async move {
2402 let server = update
2403 .server
2404 .as_ref()
2405 .ok_or_else(|| anyhow!("invalid language server"))?;
2406
2407 // Ensure the update comes from the host.
2408 let project = project::Entity::find_by_id(project_id)
2409 .one(&*tx)
2410 .await?
2411 .ok_or_else(|| anyhow!("no such project"))?;
2412 if project.host_connection()? != connection {
2413 return Err(anyhow!("can't update a project hosted by someone else"))?;
2414 }
2415
2416 // Add the newly-started language server.
2417 language_server::Entity::insert(language_server::ActiveModel {
2418 project_id: ActiveValue::set(project_id),
2419 id: ActiveValue::set(server.id as i64),
2420 name: ActiveValue::set(server.name.clone()),
2421 ..Default::default()
2422 })
2423 .on_conflict(
2424 OnConflict::columns([
2425 language_server::Column::ProjectId,
2426 language_server::Column::Id,
2427 ])
2428 .update_column(language_server::Column::Name)
2429 .to_owned(),
2430 )
2431 .exec(&*tx)
2432 .await?;
2433
2434 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2435 Ok(connection_ids)
2436 })
2437 .await
2438 }
2439
2440 pub async fn join_project(
2441 &self,
2442 project_id: ProjectId,
2443 connection: ConnectionId,
2444 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2445 let room_id = self.room_id_for_project(project_id).await?;
2446 self.room_transaction(room_id, |tx| async move {
2447 let participant = room_participant::Entity::find()
2448 .filter(
2449 Condition::all()
2450 .add(
2451 room_participant::Column::AnsweringConnectionId
2452 .eq(connection.id as i32),
2453 )
2454 .add(
2455 room_participant::Column::AnsweringConnectionServerId
2456 .eq(connection.owner_id as i32),
2457 ),
2458 )
2459 .one(&*tx)
2460 .await?
2461 .ok_or_else(|| anyhow!("must join a room first"))?;
2462
2463 let project = project::Entity::find_by_id(project_id)
2464 .one(&*tx)
2465 .await?
2466 .ok_or_else(|| anyhow!("no such project"))?;
2467 if project.room_id != participant.room_id {
2468 return Err(anyhow!("no such project"))?;
2469 }
2470
2471 let mut collaborators = project
2472 .find_related(project_collaborator::Entity)
2473 .all(&*tx)
2474 .await?;
2475 let replica_ids = collaborators
2476 .iter()
2477 .map(|c| c.replica_id)
2478 .collect::<HashSet<_>>();
2479 let mut replica_id = ReplicaId(1);
2480 while replica_ids.contains(&replica_id) {
2481 replica_id.0 += 1;
2482 }
2483 let new_collaborator = project_collaborator::ActiveModel {
2484 project_id: ActiveValue::set(project_id),
2485 connection_id: ActiveValue::set(connection.id as i32),
2486 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2487 user_id: ActiveValue::set(participant.user_id),
2488 replica_id: ActiveValue::set(replica_id),
2489 is_host: ActiveValue::set(false),
2490 ..Default::default()
2491 }
2492 .insert(&*tx)
2493 .await?;
2494 collaborators.push(new_collaborator);
2495
2496 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2497 let mut worktrees = db_worktrees
2498 .into_iter()
2499 .map(|db_worktree| {
2500 (
2501 db_worktree.id as u64,
2502 Worktree {
2503 id: db_worktree.id as u64,
2504 abs_path: db_worktree.abs_path,
2505 root_name: db_worktree.root_name,
2506 visible: db_worktree.visible,
2507 entries: Default::default(),
2508 diagnostic_summaries: Default::default(),
2509 scan_id: db_worktree.scan_id as u64,
2510 completed_scan_id: db_worktree.completed_scan_id as u64,
2511 },
2512 )
2513 })
2514 .collect::<BTreeMap<_, _>>();
2515
2516 // Populate worktree entries.
2517 {
2518 let mut db_entries = worktree_entry::Entity::find()
2519 .filter(
2520 Condition::all()
2521 .add(worktree_entry::Column::ProjectId.eq(project_id))
2522 .add(worktree_entry::Column::IsDeleted.eq(false)),
2523 )
2524 .stream(&*tx)
2525 .await?;
2526 while let Some(db_entry) = db_entries.next().await {
2527 let db_entry = db_entry?;
2528 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2529 worktree.entries.push(proto::Entry {
2530 id: db_entry.id as u64,
2531 is_dir: db_entry.is_dir,
2532 path: db_entry.path,
2533 inode: db_entry.inode as u64,
2534 mtime: Some(proto::Timestamp {
2535 seconds: db_entry.mtime_seconds as u64,
2536 nanos: db_entry.mtime_nanos as u32,
2537 }),
2538 is_symlink: db_entry.is_symlink,
2539 is_ignored: db_entry.is_ignored,
2540 });
2541 }
2542 }
2543 }
2544
2545 // Populate worktree diagnostic summaries.
2546 {
2547 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2548 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2549 .stream(&*tx)
2550 .await?;
2551 while let Some(db_summary) = db_summaries.next().await {
2552 let db_summary = db_summary?;
2553 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2554 worktree
2555 .diagnostic_summaries
2556 .push(proto::DiagnosticSummary {
2557 path: db_summary.path,
2558 language_server_id: db_summary.language_server_id as u64,
2559 error_count: db_summary.error_count as u32,
2560 warning_count: db_summary.warning_count as u32,
2561 });
2562 }
2563 }
2564 }
2565
2566 // Populate language servers.
2567 let language_servers = project
2568 .find_related(language_server::Entity)
2569 .all(&*tx)
2570 .await?;
2571
2572 let project = Project {
2573 collaborators: collaborators
2574 .into_iter()
2575 .map(|collaborator| ProjectCollaborator {
2576 connection_id: collaborator.connection(),
2577 user_id: collaborator.user_id,
2578 replica_id: collaborator.replica_id,
2579 is_host: collaborator.is_host,
2580 })
2581 .collect(),
2582 worktrees,
2583 language_servers: language_servers
2584 .into_iter()
2585 .map(|language_server| proto::LanguageServer {
2586 id: language_server.id as u64,
2587 name: language_server.name,
2588 })
2589 .collect(),
2590 };
2591 Ok((project, replica_id as ReplicaId))
2592 })
2593 .await
2594 }
2595
2596 pub async fn leave_project(
2597 &self,
2598 project_id: ProjectId,
2599 connection: ConnectionId,
2600 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2601 let room_id = self.room_id_for_project(project_id).await?;
2602 self.room_transaction(room_id, |tx| async move {
2603 let result = project_collaborator::Entity::delete_many()
2604 .filter(
2605 Condition::all()
2606 .add(project_collaborator::Column::ProjectId.eq(project_id))
2607 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2608 .add(
2609 project_collaborator::Column::ConnectionServerId
2610 .eq(connection.owner_id as i32),
2611 ),
2612 )
2613 .exec(&*tx)
2614 .await?;
2615 if result.rows_affected == 0 {
2616 Err(anyhow!("not a collaborator on this project"))?;
2617 }
2618
2619 let project = project::Entity::find_by_id(project_id)
2620 .one(&*tx)
2621 .await?
2622 .ok_or_else(|| anyhow!("no such project"))?;
2623 let collaborators = project
2624 .find_related(project_collaborator::Entity)
2625 .all(&*tx)
2626 .await?;
2627 let connection_ids = collaborators
2628 .into_iter()
2629 .map(|collaborator| collaborator.connection())
2630 .collect();
2631
2632 follower::Entity::delete_many()
2633 .filter(
2634 Condition::any()
2635 .add(
2636 Condition::all()
2637 .add(follower::Column::ProjectId.eq(project_id))
2638 .add(
2639 follower::Column::LeaderConnectionServerId
2640 .eq(connection.owner_id),
2641 )
2642 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2643 )
2644 .add(
2645 Condition::all()
2646 .add(follower::Column::ProjectId.eq(project_id))
2647 .add(
2648 follower::Column::FollowerConnectionServerId
2649 .eq(connection.owner_id),
2650 )
2651 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2652 ),
2653 )
2654 .exec(&*tx)
2655 .await?;
2656
2657 let room = self.get_room(project.room_id, &tx).await?;
2658 let left_project = LeftProject {
2659 id: project_id,
2660 host_user_id: project.host_user_id,
2661 host_connection_id: project.host_connection()?,
2662 connection_ids,
2663 };
2664 Ok((room, left_project))
2665 })
2666 .await
2667 }
2668
2669 pub async fn project_collaborators(
2670 &self,
2671 project_id: ProjectId,
2672 connection_id: ConnectionId,
2673 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2674 let room_id = self.room_id_for_project(project_id).await?;
2675 self.room_transaction(room_id, |tx| async move {
2676 let collaborators = project_collaborator::Entity::find()
2677 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2678 .all(&*tx)
2679 .await?
2680 .into_iter()
2681 .map(|collaborator| ProjectCollaborator {
2682 connection_id: collaborator.connection(),
2683 user_id: collaborator.user_id,
2684 replica_id: collaborator.replica_id,
2685 is_host: collaborator.is_host,
2686 })
2687 .collect::<Vec<_>>();
2688
2689 if collaborators
2690 .iter()
2691 .any(|collaborator| collaborator.connection_id == connection_id)
2692 {
2693 Ok(collaborators)
2694 } else {
2695 Err(anyhow!("no such project"))?
2696 }
2697 })
2698 .await
2699 }
2700
2701 pub async fn project_connection_ids(
2702 &self,
2703 project_id: ProjectId,
2704 connection_id: ConnectionId,
2705 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2706 let room_id = self.room_id_for_project(project_id).await?;
2707 self.room_transaction(room_id, |tx| async move {
2708 let mut collaborators = project_collaborator::Entity::find()
2709 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2710 .stream(&*tx)
2711 .await?;
2712
2713 let mut connection_ids = HashSet::default();
2714 while let Some(collaborator) = collaborators.next().await {
2715 let collaborator = collaborator?;
2716 connection_ids.insert(collaborator.connection());
2717 }
2718
2719 if connection_ids.contains(&connection_id) {
2720 Ok(connection_ids)
2721 } else {
2722 Err(anyhow!("no such project"))?
2723 }
2724 })
2725 .await
2726 }
2727
2728 async fn project_guest_connection_ids(
2729 &self,
2730 project_id: ProjectId,
2731 tx: &DatabaseTransaction,
2732 ) -> Result<Vec<ConnectionId>> {
2733 let mut collaborators = project_collaborator::Entity::find()
2734 .filter(
2735 project_collaborator::Column::ProjectId
2736 .eq(project_id)
2737 .and(project_collaborator::Column::IsHost.eq(false)),
2738 )
2739 .stream(tx)
2740 .await?;
2741
2742 let mut guest_connection_ids = Vec::new();
2743 while let Some(collaborator) = collaborators.next().await {
2744 let collaborator = collaborator?;
2745 guest_connection_ids.push(collaborator.connection());
2746 }
2747 Ok(guest_connection_ids)
2748 }
2749
2750 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2751 self.transaction(|tx| async move {
2752 let project = project::Entity::find_by_id(project_id)
2753 .one(&*tx)
2754 .await?
2755 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2756 Ok(project.room_id)
2757 })
2758 .await
2759 }
2760
2761 // access tokens
2762
2763 pub async fn create_access_token(
2764 &self,
2765 user_id: UserId,
2766 access_token_hash: &str,
2767 max_access_token_count: usize,
2768 ) -> Result<AccessTokenId> {
2769 self.transaction(|tx| async {
2770 let tx = tx;
2771
2772 let token = access_token::ActiveModel {
2773 user_id: ActiveValue::set(user_id),
2774 hash: ActiveValue::set(access_token_hash.into()),
2775 ..Default::default()
2776 }
2777 .insert(&*tx)
2778 .await?;
2779
2780 access_token::Entity::delete_many()
2781 .filter(
2782 access_token::Column::Id.in_subquery(
2783 Query::select()
2784 .column(access_token::Column::Id)
2785 .from(access_token::Entity)
2786 .and_where(access_token::Column::UserId.eq(user_id))
2787 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2788 .limit(10000)
2789 .offset(max_access_token_count as u64)
2790 .to_owned(),
2791 ),
2792 )
2793 .exec(&*tx)
2794 .await?;
2795 Ok(token.id)
2796 })
2797 .await
2798 }
2799
2800 pub async fn get_access_token(
2801 &self,
2802 access_token_id: AccessTokenId,
2803 ) -> Result<access_token::Model> {
2804 self.transaction(|tx| async move {
2805 Ok(access_token::Entity::find_by_id(access_token_id)
2806 .one(&*tx)
2807 .await?
2808 .ok_or_else(|| anyhow!("no such access token"))?)
2809 })
2810 .await
2811 }
2812
2813 async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2814 where
2815 F: Send + Fn(TransactionHandle) -> Fut,
2816 Fut: Send + Future<Output = Result<T>>,
2817 {
2818 let body = async {
2819 let mut i = 0;
2820 loop {
2821 let (tx, result) = self.with_transaction(&f).await?;
2822 match result {
2823 Ok(result) => match tx.commit().await.map_err(Into::into) {
2824 Ok(()) => return Ok(result),
2825 Err(error) => {
2826 if !self.retry_on_serialization_error(&error, i).await {
2827 return Err(error);
2828 }
2829 }
2830 },
2831 Err(error) => {
2832 tx.rollback().await?;
2833 if !self.retry_on_serialization_error(&error, i).await {
2834 return Err(error);
2835 }
2836 }
2837 }
2838 i += 1;
2839 }
2840 };
2841
2842 self.run(body).await
2843 }
2844
2845 async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2846 where
2847 F: Send + Fn(TransactionHandle) -> Fut,
2848 Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2849 {
2850 let body = async {
2851 let mut i = 0;
2852 loop {
2853 let (tx, result) = self.with_transaction(&f).await?;
2854 match result {
2855 Ok(Some((room_id, data))) => {
2856 let lock = self.rooms.entry(room_id).or_default().clone();
2857 let _guard = lock.lock_owned().await;
2858 match tx.commit().await.map_err(Into::into) {
2859 Ok(()) => {
2860 return Ok(Some(RoomGuard {
2861 data,
2862 _guard,
2863 _not_send: PhantomData,
2864 }));
2865 }
2866 Err(error) => {
2867 if !self.retry_on_serialization_error(&error, i).await {
2868 return Err(error);
2869 }
2870 }
2871 }
2872 }
2873 Ok(None) => match tx.commit().await.map_err(Into::into) {
2874 Ok(()) => return Ok(None),
2875 Err(error) => {
2876 if !self.retry_on_serialization_error(&error, i).await {
2877 return Err(error);
2878 }
2879 }
2880 },
2881 Err(error) => {
2882 tx.rollback().await?;
2883 if !self.retry_on_serialization_error(&error, i).await {
2884 return Err(error);
2885 }
2886 }
2887 }
2888 i += 1;
2889 }
2890 };
2891
2892 self.run(body).await
2893 }
2894
2895 async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
2896 where
2897 F: Send + Fn(TransactionHandle) -> Fut,
2898 Fut: Send + Future<Output = Result<T>>,
2899 {
2900 let body = async {
2901 let mut i = 0;
2902 loop {
2903 let lock = self.rooms.entry(room_id).or_default().clone();
2904 let _guard = lock.lock_owned().await;
2905 let (tx, result) = self.with_transaction(&f).await?;
2906 match result {
2907 Ok(data) => match tx.commit().await.map_err(Into::into) {
2908 Ok(()) => {
2909 return Ok(RoomGuard {
2910 data,
2911 _guard,
2912 _not_send: PhantomData,
2913 });
2914 }
2915 Err(error) => {
2916 if !self.retry_on_serialization_error(&error, i).await {
2917 return Err(error);
2918 }
2919 }
2920 },
2921 Err(error) => {
2922 tx.rollback().await?;
2923 if !self.retry_on_serialization_error(&error, i).await {
2924 return Err(error);
2925 }
2926 }
2927 }
2928 i += 1;
2929 }
2930 };
2931
2932 self.run(body).await
2933 }
2934
2935 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2936 where
2937 F: Send + Fn(TransactionHandle) -> Fut,
2938 Fut: Send + Future<Output = Result<T>>,
2939 {
2940 let tx = self
2941 .pool
2942 .begin_with_config(Some(IsolationLevel::Serializable), None)
2943 .await?;
2944
2945 let mut tx = Arc::new(Some(tx));
2946 let result = f(TransactionHandle(tx.clone())).await;
2947 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2948 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2949 };
2950
2951 Ok((tx, result))
2952 }
2953
2954 async fn run<F, T>(&self, future: F) -> Result<T>
2955 where
2956 F: Future<Output = Result<T>>,
2957 {
2958 #[cfg(test)]
2959 {
2960 if let Executor::Deterministic(executor) = &self.executor {
2961 executor.simulate_random_delay().await;
2962 }
2963
2964 self.runtime.as_ref().unwrap().block_on(future)
2965 }
2966
2967 #[cfg(not(test))]
2968 {
2969 future.await
2970 }
2971 }
2972
2973 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
2974 // If the error is due to a failure to serialize concurrent transactions, then retry
2975 // this transaction after a delay. With each subsequent retry, double the delay duration.
2976 // Also vary the delay randomly in order to ensure different database connections retry
2977 // at different times.
2978 if is_serialization_error(error) {
2979 let base_delay = 4_u64 << prev_attempt_count.min(16);
2980 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
2981 log::info!(
2982 "retrying transaction after serialization error. delay: {} ms.",
2983 randomized_delay
2984 );
2985 self.executor
2986 .sleep(Duration::from_millis(randomized_delay as u64))
2987 .await;
2988 true
2989 } else {
2990 false
2991 }
2992 }
2993}
2994
2995fn is_serialization_error(error: &Error) -> bool {
2996 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2997 match error {
2998 Error::Database(
2999 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3000 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3001 ) if error
3002 .as_database_error()
3003 .and_then(|error| error.code())
3004 .as_deref()
3005 == Some(SERIALIZATION_FAILURE_CODE) =>
3006 {
3007 true
3008 }
3009 _ => false,
3010 }
3011}
3012
3013struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3014
3015impl Deref for TransactionHandle {
3016 type Target = DatabaseTransaction;
3017
3018 fn deref(&self) -> &Self::Target {
3019 self.0.as_ref().as_ref().unwrap()
3020 }
3021}
3022
3023pub struct RoomGuard<T> {
3024 data: T,
3025 _guard: OwnedMutexGuard<()>,
3026 _not_send: PhantomData<Rc<()>>,
3027}
3028
3029impl<T> Deref for RoomGuard<T> {
3030 type Target = T;
3031
3032 fn deref(&self) -> &T {
3033 &self.data
3034 }
3035}
3036
3037impl<T> DerefMut for RoomGuard<T> {
3038 fn deref_mut(&mut self) -> &mut T {
3039 &mut self.data
3040 }
3041}
3042
3043#[derive(Debug, Serialize, Deserialize)]
3044pub struct NewUserParams {
3045 pub github_login: String,
3046 pub github_user_id: i32,
3047 pub invite_count: i32,
3048}
3049
3050#[derive(Debug)]
3051pub struct NewUserResult {
3052 pub user_id: UserId,
3053 pub metrics_id: String,
3054 pub inviting_user_id: Option<UserId>,
3055 pub signup_device_id: Option<String>,
3056}
3057
3058fn random_invite_code() -> String {
3059 nanoid::nanoid!(16)
3060}
3061
3062fn random_email_confirmation_code() -> String {
3063 nanoid::nanoid!(64)
3064}
3065
3066macro_rules! id_type {
3067 ($name:ident) => {
3068 #[derive(
3069 Clone,
3070 Copy,
3071 Debug,
3072 Default,
3073 PartialEq,
3074 Eq,
3075 PartialOrd,
3076 Ord,
3077 Hash,
3078 Serialize,
3079 Deserialize,
3080 )]
3081 #[serde(transparent)]
3082 pub struct $name(pub i32);
3083
3084 impl $name {
3085 #[allow(unused)]
3086 pub const MAX: Self = Self(i32::MAX);
3087
3088 #[allow(unused)]
3089 pub fn from_proto(value: u64) -> Self {
3090 Self(value as i32)
3091 }
3092
3093 #[allow(unused)]
3094 pub fn to_proto(self) -> u64 {
3095 self.0 as u64
3096 }
3097 }
3098
3099 impl std::fmt::Display for $name {
3100 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3101 self.0.fmt(f)
3102 }
3103 }
3104
3105 impl From<$name> for sea_query::Value {
3106 fn from(value: $name) -> Self {
3107 sea_query::Value::Int(Some(value.0))
3108 }
3109 }
3110
3111 impl sea_orm::TryGetable for $name {
3112 fn try_get(
3113 res: &sea_orm::QueryResult,
3114 pre: &str,
3115 col: &str,
3116 ) -> Result<Self, sea_orm::TryGetError> {
3117 Ok(Self(i32::try_get(res, pre, col)?))
3118 }
3119 }
3120
3121 impl sea_query::ValueType for $name {
3122 fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3123 match v {
3124 Value::TinyInt(Some(int)) => {
3125 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3126 }
3127 Value::SmallInt(Some(int)) => {
3128 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3129 }
3130 Value::Int(Some(int)) => {
3131 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3132 }
3133 Value::BigInt(Some(int)) => {
3134 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3135 }
3136 Value::TinyUnsigned(Some(int)) => {
3137 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3138 }
3139 Value::SmallUnsigned(Some(int)) => {
3140 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3141 }
3142 Value::Unsigned(Some(int)) => {
3143 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3144 }
3145 Value::BigUnsigned(Some(int)) => {
3146 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3147 }
3148 _ => Err(sea_query::ValueTypeErr),
3149 }
3150 }
3151
3152 fn type_name() -> String {
3153 stringify!($name).into()
3154 }
3155
3156 fn array_type() -> sea_query::ArrayType {
3157 sea_query::ArrayType::Int
3158 }
3159
3160 fn column_type() -> sea_query::ColumnType {
3161 sea_query::ColumnType::Integer(None)
3162 }
3163 }
3164
3165 impl sea_orm::TryFromU64 for $name {
3166 fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3167 Ok(Self(n.try_into().map_err(|_| {
3168 DbErr::ConvertFromU64(concat!(
3169 "error converting ",
3170 stringify!($name),
3171 " to u64"
3172 ))
3173 })?))
3174 }
3175 }
3176
3177 impl sea_query::Nullable for $name {
3178 fn null() -> Value {
3179 Value::Int(None)
3180 }
3181 }
3182 };
3183}
3184
3185id_type!(AccessTokenId);
3186id_type!(ContactId);
3187id_type!(FollowerId);
3188id_type!(RoomId);
3189id_type!(RoomParticipantId);
3190id_type!(ProjectId);
3191id_type!(ProjectCollaboratorId);
3192id_type!(ReplicaId);
3193id_type!(ServerId);
3194id_type!(SignupId);
3195id_type!(UserId);
3196
3197pub struct RejoinedRoom {
3198 pub room: proto::Room,
3199 pub rejoined_projects: Vec<RejoinedProject>,
3200 pub reshared_projects: Vec<ResharedProject>,
3201}
3202
3203pub struct ResharedProject {
3204 pub id: ProjectId,
3205 pub old_connection_id: ConnectionId,
3206 pub collaborators: Vec<ProjectCollaborator>,
3207 pub worktrees: Vec<proto::WorktreeMetadata>,
3208}
3209
3210pub struct RejoinedProject {
3211 pub id: ProjectId,
3212 pub old_connection_id: ConnectionId,
3213 pub collaborators: Vec<ProjectCollaborator>,
3214 pub worktrees: Vec<RejoinedWorktree>,
3215 pub language_servers: Vec<proto::LanguageServer>,
3216}
3217
3218#[derive(Debug)]
3219pub struct RejoinedWorktree {
3220 pub id: u64,
3221 pub abs_path: String,
3222 pub root_name: String,
3223 pub visible: bool,
3224 pub updated_entries: Vec<proto::Entry>,
3225 pub removed_entries: Vec<u64>,
3226 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3227 pub scan_id: u64,
3228 pub completed_scan_id: u64,
3229}
3230
3231pub struct LeftRoom {
3232 pub room: proto::Room,
3233 pub left_projects: HashMap<ProjectId, LeftProject>,
3234 pub canceled_calls_to_user_ids: Vec<UserId>,
3235}
3236
3237pub struct RefreshedRoom {
3238 pub room: proto::Room,
3239 pub stale_participant_user_ids: Vec<UserId>,
3240 pub canceled_calls_to_user_ids: Vec<UserId>,
3241}
3242
3243pub struct Project {
3244 pub collaborators: Vec<ProjectCollaborator>,
3245 pub worktrees: BTreeMap<u64, Worktree>,
3246 pub language_servers: Vec<proto::LanguageServer>,
3247}
3248
3249pub struct ProjectCollaborator {
3250 pub connection_id: ConnectionId,
3251 pub user_id: UserId,
3252 pub replica_id: ReplicaId,
3253 pub is_host: bool,
3254}
3255
3256impl ProjectCollaborator {
3257 pub fn to_proto(&self) -> proto::Collaborator {
3258 proto::Collaborator {
3259 peer_id: Some(self.connection_id.into()),
3260 replica_id: self.replica_id.0 as u32,
3261 user_id: self.user_id.to_proto(),
3262 }
3263 }
3264}
3265
3266#[derive(Debug)]
3267pub struct LeftProject {
3268 pub id: ProjectId,
3269 pub host_user_id: UserId,
3270 pub host_connection_id: ConnectionId,
3271 pub connection_ids: Vec<ConnectionId>,
3272}
3273
3274pub struct Worktree {
3275 pub id: u64,
3276 pub abs_path: String,
3277 pub root_name: String,
3278 pub visible: bool,
3279 pub entries: Vec<proto::Entry>,
3280 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3281 pub scan_id: u64,
3282 pub completed_scan_id: u64,
3283}
3284
3285#[cfg(test)]
3286pub use test::*;
3287
3288#[cfg(test)]
3289mod test {
3290 use super::*;
3291 use gpui::executor::Background;
3292 use lazy_static::lazy_static;
3293 use parking_lot::Mutex;
3294 use sea_orm::ConnectionTrait;
3295 use sqlx::migrate::MigrateDatabase;
3296 use std::sync::Arc;
3297
3298 pub struct TestDb {
3299 pub db: Option<Arc<Database>>,
3300 pub connection: Option<sqlx::AnyConnection>,
3301 }
3302
3303 impl TestDb {
3304 pub fn sqlite(background: Arc<Background>) -> Self {
3305 let url = format!("sqlite::memory:");
3306 let runtime = tokio::runtime::Builder::new_current_thread()
3307 .enable_io()
3308 .enable_time()
3309 .build()
3310 .unwrap();
3311
3312 let mut db = runtime.block_on(async {
3313 let mut options = ConnectOptions::new(url);
3314 options.max_connections(5);
3315 let db = Database::new(options, Executor::Deterministic(background))
3316 .await
3317 .unwrap();
3318 let sql = include_str!(concat!(
3319 env!("CARGO_MANIFEST_DIR"),
3320 "/migrations.sqlite/20221109000000_test_schema.sql"
3321 ));
3322 db.pool
3323 .execute(sea_orm::Statement::from_string(
3324 db.pool.get_database_backend(),
3325 sql.into(),
3326 ))
3327 .await
3328 .unwrap();
3329 db
3330 });
3331
3332 db.runtime = Some(runtime);
3333
3334 Self {
3335 db: Some(Arc::new(db)),
3336 connection: None,
3337 }
3338 }
3339
3340 pub fn postgres(background: Arc<Background>) -> Self {
3341 lazy_static! {
3342 static ref LOCK: Mutex<()> = Mutex::new(());
3343 }
3344
3345 let _guard = LOCK.lock();
3346 let mut rng = StdRng::from_entropy();
3347 let url = format!(
3348 "postgres://postgres@localhost/zed-test-{}",
3349 rng.gen::<u128>()
3350 );
3351 let runtime = tokio::runtime::Builder::new_current_thread()
3352 .enable_io()
3353 .enable_time()
3354 .build()
3355 .unwrap();
3356
3357 let mut db = runtime.block_on(async {
3358 sqlx::Postgres::create_database(&url)
3359 .await
3360 .expect("failed to create test db");
3361 let mut options = ConnectOptions::new(url);
3362 options
3363 .max_connections(5)
3364 .idle_timeout(Duration::from_secs(0));
3365 let db = Database::new(options, Executor::Deterministic(background))
3366 .await
3367 .unwrap();
3368 let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3369 db.migrate(Path::new(migrations_path), false).await.unwrap();
3370 db
3371 });
3372
3373 db.runtime = Some(runtime);
3374
3375 Self {
3376 db: Some(Arc::new(db)),
3377 connection: None,
3378 }
3379 }
3380
3381 pub fn db(&self) -> &Arc<Database> {
3382 self.db.as_ref().unwrap()
3383 }
3384 }
3385
3386 impl Drop for TestDb {
3387 fn drop(&mut self) {
3388 let db = self.db.take().unwrap();
3389 if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3390 db.runtime.as_ref().unwrap().block_on(async {
3391 use util::ResultExt;
3392 let query = "
3393 SELECT pg_terminate_backend(pg_stat_activity.pid)
3394 FROM pg_stat_activity
3395 WHERE
3396 pg_stat_activity.datname = current_database() AND
3397 pid <> pg_backend_pid();
3398 ";
3399 db.pool
3400 .execute(sea_orm::Statement::from_string(
3401 db.pool.get_database_backend(),
3402 query.into(),
3403 ))
3404 .await
3405 .log_err();
3406 sqlx::Postgres::drop_database(db.options.get_url())
3407 .await
3408 .log_err();
3409 })
3410 }
3411 }
3412 }
3413}