1mod access_token;
2mod contact;
3mod follower;
4mod language_server;
5mod project;
6mod project_collaborator;
7mod room;
8mod room_participant;
9mod server;
10mod signup;
11#[cfg(test)]
12mod tests;
13mod user;
14mod worktree;
15mod worktree_diagnostic_summary;
16mod worktree_entry;
17mod worktree_repository;
18mod worktree_repository_statuses;
19mod worktree_settings_file;
20
21use crate::executor::Executor;
22use crate::{Error, Result};
23use anyhow::anyhow;
24use collections::{BTreeMap, HashMap, HashSet};
25pub use contact::Contact;
26use dashmap::DashMap;
27use futures::StreamExt;
28use hyper::StatusCode;
29use rand::prelude::StdRng;
30use rand::{Rng, SeedableRng};
31use rpc::{proto, ConnectionId};
32use sea_orm::Condition;
33pub use sea_orm::ConnectOptions;
34use sea_orm::{
35 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
36 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
37 Statement, TransactionTrait,
38};
39use sea_query::{Alias, Expr, OnConflict, Query};
40use serde::{Deserialize, Serialize};
41pub use signup::{Invite, NewSignup, WaitlistSummary};
42use sqlx::migrate::{Migrate, Migration, MigrationSource};
43use sqlx::Connection;
44use std::ops::{Deref, DerefMut};
45use std::path::Path;
46use std::time::Duration;
47use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
48use tokio::sync::{Mutex, OwnedMutexGuard};
49pub use user::Model as User;
50
51pub struct Database {
52 options: ConnectOptions,
53 pool: DatabaseConnection,
54 rooms: DashMap<RoomId, Arc<Mutex<()>>>,
55 rng: Mutex<StdRng>,
56 executor: Executor,
57 #[cfg(test)]
58 runtime: Option<tokio::runtime::Runtime>,
59}
60
61impl Database {
62 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
63 Ok(Self {
64 options: options.clone(),
65 pool: sea_orm::Database::connect(options).await?,
66 rooms: DashMap::with_capacity(16384),
67 rng: Mutex::new(StdRng::seed_from_u64(0)),
68 executor,
69 #[cfg(test)]
70 runtime: None,
71 })
72 }
73
74 #[cfg(test)]
75 pub fn reset(&self) {
76 self.rooms.clear();
77 }
78
79 pub async fn migrate(
80 &self,
81 migrations_path: &Path,
82 ignore_checksum_mismatch: bool,
83 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
84 let migrations = MigrationSource::resolve(migrations_path)
85 .await
86 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
87
88 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
89
90 connection.ensure_migrations_table().await?;
91 let applied_migrations: HashMap<_, _> = connection
92 .list_applied_migrations()
93 .await?
94 .into_iter()
95 .map(|m| (m.version, m))
96 .collect();
97
98 let mut new_migrations = Vec::new();
99 for migration in migrations {
100 match applied_migrations.get(&migration.version) {
101 Some(applied_migration) => {
102 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
103 {
104 Err(anyhow!(
105 "checksum mismatch for applied migration {}",
106 migration.description
107 ))?;
108 }
109 }
110 None => {
111 let elapsed = connection.apply(&migration).await?;
112 new_migrations.push((migration, elapsed));
113 }
114 }
115 }
116
117 Ok(new_migrations)
118 }
119
120 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
121 self.transaction(|tx| async move {
122 let server = server::ActiveModel {
123 environment: ActiveValue::set(environment.into()),
124 ..Default::default()
125 }
126 .insert(&*tx)
127 .await?;
128 Ok(server.id)
129 })
130 .await
131 }
132
133 pub async fn stale_room_ids(
134 &self,
135 environment: &str,
136 new_server_id: ServerId,
137 ) -> Result<Vec<RoomId>> {
138 self.transaction(|tx| async move {
139 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
140 enum QueryAs {
141 RoomId,
142 }
143
144 let stale_server_epochs = self
145 .stale_server_ids(environment, new_server_id, &tx)
146 .await?;
147 Ok(room_participant::Entity::find()
148 .select_only()
149 .column(room_participant::Column::RoomId)
150 .distinct()
151 .filter(
152 room_participant::Column::AnsweringConnectionServerId
153 .is_in(stale_server_epochs),
154 )
155 .into_values::<_, QueryAs>()
156 .all(&*tx)
157 .await?)
158 })
159 .await
160 }
161
162 pub async fn refresh_room(
163 &self,
164 room_id: RoomId,
165 new_server_id: ServerId,
166 ) -> Result<RoomGuard<RefreshedRoom>> {
167 self.room_transaction(room_id, |tx| async move {
168 let stale_participant_filter = Condition::all()
169 .add(room_participant::Column::RoomId.eq(room_id))
170 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
171 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
172
173 let stale_participant_user_ids = room_participant::Entity::find()
174 .filter(stale_participant_filter.clone())
175 .all(&*tx)
176 .await?
177 .into_iter()
178 .map(|participant| participant.user_id)
179 .collect::<Vec<_>>();
180
181 // Delete participants who failed to reconnect and cancel their calls.
182 let mut canceled_calls_to_user_ids = Vec::new();
183 room_participant::Entity::delete_many()
184 .filter(stale_participant_filter)
185 .exec(&*tx)
186 .await?;
187 let called_participants = room_participant::Entity::find()
188 .filter(
189 Condition::all()
190 .add(
191 room_participant::Column::CallingUserId
192 .is_in(stale_participant_user_ids.iter().copied()),
193 )
194 .add(room_participant::Column::AnsweringConnectionId.is_null()),
195 )
196 .all(&*tx)
197 .await?;
198 room_participant::Entity::delete_many()
199 .filter(
200 room_participant::Column::Id
201 .is_in(called_participants.iter().map(|participant| participant.id)),
202 )
203 .exec(&*tx)
204 .await?;
205 canceled_calls_to_user_ids.extend(
206 called_participants
207 .into_iter()
208 .map(|participant| participant.user_id),
209 );
210
211 let room = self.get_room(room_id, &tx).await?;
212 // Delete the room if it becomes empty.
213 if room.participants.is_empty() {
214 project::Entity::delete_many()
215 .filter(project::Column::RoomId.eq(room_id))
216 .exec(&*tx)
217 .await?;
218 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
219 }
220
221 Ok(RefreshedRoom {
222 room,
223 stale_participant_user_ids,
224 canceled_calls_to_user_ids,
225 })
226 })
227 .await
228 }
229
230 pub async fn delete_stale_servers(
231 &self,
232 environment: &str,
233 new_server_id: ServerId,
234 ) -> Result<()> {
235 self.transaction(|tx| async move {
236 server::Entity::delete_many()
237 .filter(
238 Condition::all()
239 .add(server::Column::Environment.eq(environment))
240 .add(server::Column::Id.ne(new_server_id)),
241 )
242 .exec(&*tx)
243 .await?;
244 Ok(())
245 })
246 .await
247 }
248
249 async fn stale_server_ids(
250 &self,
251 environment: &str,
252 new_server_id: ServerId,
253 tx: &DatabaseTransaction,
254 ) -> Result<Vec<ServerId>> {
255 let stale_servers = server::Entity::find()
256 .filter(
257 Condition::all()
258 .add(server::Column::Environment.eq(environment))
259 .add(server::Column::Id.ne(new_server_id)),
260 )
261 .all(&*tx)
262 .await?;
263 Ok(stale_servers.into_iter().map(|server| server.id).collect())
264 }
265
266 // users
267
268 pub async fn create_user(
269 &self,
270 email_address: &str,
271 admin: bool,
272 params: NewUserParams,
273 ) -> Result<NewUserResult> {
274 self.transaction(|tx| async {
275 let tx = tx;
276 let user = user::Entity::insert(user::ActiveModel {
277 email_address: ActiveValue::set(Some(email_address.into())),
278 github_login: ActiveValue::set(params.github_login.clone()),
279 github_user_id: ActiveValue::set(Some(params.github_user_id)),
280 admin: ActiveValue::set(admin),
281 metrics_id: ActiveValue::set(Uuid::new_v4()),
282 ..Default::default()
283 })
284 .on_conflict(
285 OnConflict::column(user::Column::GithubLogin)
286 .update_column(user::Column::GithubLogin)
287 .to_owned(),
288 )
289 .exec_with_returning(&*tx)
290 .await?;
291
292 Ok(NewUserResult {
293 user_id: user.id,
294 metrics_id: user.metrics_id.to_string(),
295 signup_device_id: None,
296 inviting_user_id: None,
297 })
298 })
299 .await
300 }
301
302 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
303 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
304 .await
305 }
306
307 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
308 self.transaction(|tx| async {
309 let tx = tx;
310 Ok(user::Entity::find()
311 .filter(user::Column::Id.is_in(ids.iter().copied()))
312 .all(&*tx)
313 .await?)
314 })
315 .await
316 }
317
318 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
319 self.transaction(|tx| async move {
320 Ok(user::Entity::find()
321 .filter(user::Column::GithubLogin.eq(github_login))
322 .one(&*tx)
323 .await?)
324 })
325 .await
326 }
327
328 pub async fn get_or_create_user_by_github_account(
329 &self,
330 github_login: &str,
331 github_user_id: Option<i32>,
332 github_email: Option<&str>,
333 ) -> Result<Option<User>> {
334 self.transaction(|tx| async move {
335 let tx = &*tx;
336 if let Some(github_user_id) = github_user_id {
337 if let Some(user_by_github_user_id) = user::Entity::find()
338 .filter(user::Column::GithubUserId.eq(github_user_id))
339 .one(tx)
340 .await?
341 {
342 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
343 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
344 Ok(Some(user_by_github_user_id.update(tx).await?))
345 } else if let Some(user_by_github_login) = user::Entity::find()
346 .filter(user::Column::GithubLogin.eq(github_login))
347 .one(tx)
348 .await?
349 {
350 let mut user_by_github_login = user_by_github_login.into_active_model();
351 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
352 Ok(Some(user_by_github_login.update(tx).await?))
353 } else {
354 let user = user::Entity::insert(user::ActiveModel {
355 email_address: ActiveValue::set(github_email.map(|email| email.into())),
356 github_login: ActiveValue::set(github_login.into()),
357 github_user_id: ActiveValue::set(Some(github_user_id)),
358 admin: ActiveValue::set(false),
359 invite_count: ActiveValue::set(0),
360 invite_code: ActiveValue::set(None),
361 metrics_id: ActiveValue::set(Uuid::new_v4()),
362 ..Default::default()
363 })
364 .exec_with_returning(&*tx)
365 .await?;
366 Ok(Some(user))
367 }
368 } else {
369 Ok(user::Entity::find()
370 .filter(user::Column::GithubLogin.eq(github_login))
371 .one(tx)
372 .await?)
373 }
374 })
375 .await
376 }
377
378 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
379 self.transaction(|tx| async move {
380 Ok(user::Entity::find()
381 .order_by_asc(user::Column::GithubLogin)
382 .limit(limit as u64)
383 .offset(page as u64 * limit as u64)
384 .all(&*tx)
385 .await?)
386 })
387 .await
388 }
389
390 pub async fn get_users_with_no_invites(
391 &self,
392 invited_by_another_user: bool,
393 ) -> Result<Vec<User>> {
394 self.transaction(|tx| async move {
395 Ok(user::Entity::find()
396 .filter(
397 user::Column::InviteCount
398 .eq(0)
399 .and(if invited_by_another_user {
400 user::Column::InviterId.is_not_null()
401 } else {
402 user::Column::InviterId.is_null()
403 }),
404 )
405 .all(&*tx)
406 .await?)
407 })
408 .await
409 }
410
411 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
412 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
413 enum QueryAs {
414 MetricsId,
415 }
416
417 self.transaction(|tx| async move {
418 let metrics_id: Uuid = user::Entity::find_by_id(id)
419 .select_only()
420 .column(user::Column::MetricsId)
421 .into_values::<_, QueryAs>()
422 .one(&*tx)
423 .await?
424 .ok_or_else(|| anyhow!("could not find user"))?;
425 Ok(metrics_id.to_string())
426 })
427 .await
428 }
429
430 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
431 self.transaction(|tx| async move {
432 user::Entity::update_many()
433 .filter(user::Column::Id.eq(id))
434 .set(user::ActiveModel {
435 admin: ActiveValue::set(is_admin),
436 ..Default::default()
437 })
438 .exec(&*tx)
439 .await?;
440 Ok(())
441 })
442 .await
443 }
444
445 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
446 self.transaction(|tx| async move {
447 user::Entity::update_many()
448 .filter(user::Column::Id.eq(id))
449 .set(user::ActiveModel {
450 connected_once: ActiveValue::set(connected_once),
451 ..Default::default()
452 })
453 .exec(&*tx)
454 .await?;
455 Ok(())
456 })
457 .await
458 }
459
460 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
461 self.transaction(|tx| async move {
462 access_token::Entity::delete_many()
463 .filter(access_token::Column::UserId.eq(id))
464 .exec(&*tx)
465 .await?;
466 user::Entity::delete_by_id(id).exec(&*tx).await?;
467 Ok(())
468 })
469 .await
470 }
471
472 // contacts
473
474 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
475 #[derive(Debug, FromQueryResult)]
476 struct ContactWithUserBusyStatuses {
477 user_id_a: UserId,
478 user_id_b: UserId,
479 a_to_b: bool,
480 accepted: bool,
481 should_notify: bool,
482 user_a_busy: bool,
483 user_b_busy: bool,
484 }
485
486 self.transaction(|tx| async move {
487 let user_a_participant = Alias::new("user_a_participant");
488 let user_b_participant = Alias::new("user_b_participant");
489 let mut db_contacts = contact::Entity::find()
490 .column_as(
491 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
492 .is_not_null(),
493 "user_a_busy",
494 )
495 .column_as(
496 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
497 .is_not_null(),
498 "user_b_busy",
499 )
500 .filter(
501 contact::Column::UserIdA
502 .eq(user_id)
503 .or(contact::Column::UserIdB.eq(user_id)),
504 )
505 .join_as(
506 JoinType::LeftJoin,
507 contact::Relation::UserARoomParticipant.def(),
508 user_a_participant,
509 )
510 .join_as(
511 JoinType::LeftJoin,
512 contact::Relation::UserBRoomParticipant.def(),
513 user_b_participant,
514 )
515 .into_model::<ContactWithUserBusyStatuses>()
516 .stream(&*tx)
517 .await?;
518
519 let mut contacts = Vec::new();
520 while let Some(db_contact) = db_contacts.next().await {
521 let db_contact = db_contact?;
522 if db_contact.user_id_a == user_id {
523 if db_contact.accepted {
524 contacts.push(Contact::Accepted {
525 user_id: db_contact.user_id_b,
526 should_notify: db_contact.should_notify && db_contact.a_to_b,
527 busy: db_contact.user_b_busy,
528 });
529 } else if db_contact.a_to_b {
530 contacts.push(Contact::Outgoing {
531 user_id: db_contact.user_id_b,
532 })
533 } else {
534 contacts.push(Contact::Incoming {
535 user_id: db_contact.user_id_b,
536 should_notify: db_contact.should_notify,
537 });
538 }
539 } else if db_contact.accepted {
540 contacts.push(Contact::Accepted {
541 user_id: db_contact.user_id_a,
542 should_notify: db_contact.should_notify && !db_contact.a_to_b,
543 busy: db_contact.user_a_busy,
544 });
545 } else if db_contact.a_to_b {
546 contacts.push(Contact::Incoming {
547 user_id: db_contact.user_id_a,
548 should_notify: db_contact.should_notify,
549 });
550 } else {
551 contacts.push(Contact::Outgoing {
552 user_id: db_contact.user_id_a,
553 });
554 }
555 }
556
557 contacts.sort_unstable_by_key(|contact| contact.user_id());
558
559 Ok(contacts)
560 })
561 .await
562 }
563
564 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
565 self.transaction(|tx| async move {
566 let participant = room_participant::Entity::find()
567 .filter(room_participant::Column::UserId.eq(user_id))
568 .one(&*tx)
569 .await?;
570 Ok(participant.is_some())
571 })
572 .await
573 }
574
575 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
576 self.transaction(|tx| async move {
577 let (id_a, id_b) = if user_id_1 < user_id_2 {
578 (user_id_1, user_id_2)
579 } else {
580 (user_id_2, user_id_1)
581 };
582
583 Ok(contact::Entity::find()
584 .filter(
585 contact::Column::UserIdA
586 .eq(id_a)
587 .and(contact::Column::UserIdB.eq(id_b))
588 .and(contact::Column::Accepted.eq(true)),
589 )
590 .one(&*tx)
591 .await?
592 .is_some())
593 })
594 .await
595 }
596
597 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
598 self.transaction(|tx| async move {
599 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
600 (sender_id, receiver_id, true)
601 } else {
602 (receiver_id, sender_id, false)
603 };
604
605 let rows_affected = contact::Entity::insert(contact::ActiveModel {
606 user_id_a: ActiveValue::set(id_a),
607 user_id_b: ActiveValue::set(id_b),
608 a_to_b: ActiveValue::set(a_to_b),
609 accepted: ActiveValue::set(false),
610 should_notify: ActiveValue::set(true),
611 ..Default::default()
612 })
613 .on_conflict(
614 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
615 .values([
616 (contact::Column::Accepted, true.into()),
617 (contact::Column::ShouldNotify, false.into()),
618 ])
619 .action_and_where(
620 contact::Column::Accepted.eq(false).and(
621 contact::Column::AToB
622 .eq(a_to_b)
623 .and(contact::Column::UserIdA.eq(id_b))
624 .or(contact::Column::AToB
625 .ne(a_to_b)
626 .and(contact::Column::UserIdA.eq(id_a))),
627 ),
628 )
629 .to_owned(),
630 )
631 .exec_without_returning(&*tx)
632 .await?;
633
634 if rows_affected == 1 {
635 Ok(())
636 } else {
637 Err(anyhow!("contact already requested"))?
638 }
639 })
640 .await
641 }
642
643 /// Returns a bool indicating whether the removed contact had originally accepted or not
644 ///
645 /// Deletes the contact identified by the requester and responder ids, and then returns
646 /// whether the deleted contact had originally accepted or was a pending contact request.
647 ///
648 /// # Arguments
649 ///
650 /// * `requester_id` - The user that initiates this request
651 /// * `responder_id` - The user that will be removed
652 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
653 self.transaction(|tx| async move {
654 let (id_a, id_b) = if responder_id < requester_id {
655 (responder_id, requester_id)
656 } else {
657 (requester_id, responder_id)
658 };
659
660 let contact = contact::Entity::find()
661 .filter(
662 contact::Column::UserIdA
663 .eq(id_a)
664 .and(contact::Column::UserIdB.eq(id_b)),
665 )
666 .one(&*tx)
667 .await?
668 .ok_or_else(|| anyhow!("no such contact"))?;
669
670 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
671 Ok(contact.accepted)
672 })
673 .await
674 }
675
676 pub async fn dismiss_contact_notification(
677 &self,
678 user_id: UserId,
679 contact_user_id: UserId,
680 ) -> Result<()> {
681 self.transaction(|tx| async move {
682 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
683 (user_id, contact_user_id, true)
684 } else {
685 (contact_user_id, user_id, false)
686 };
687
688 let result = contact::Entity::update_many()
689 .set(contact::ActiveModel {
690 should_notify: ActiveValue::set(false),
691 ..Default::default()
692 })
693 .filter(
694 contact::Column::UserIdA
695 .eq(id_a)
696 .and(contact::Column::UserIdB.eq(id_b))
697 .and(
698 contact::Column::AToB
699 .eq(a_to_b)
700 .and(contact::Column::Accepted.eq(true))
701 .or(contact::Column::AToB
702 .ne(a_to_b)
703 .and(contact::Column::Accepted.eq(false))),
704 ),
705 )
706 .exec(&*tx)
707 .await?;
708 if result.rows_affected == 0 {
709 Err(anyhow!("no such contact request"))?
710 } else {
711 Ok(())
712 }
713 })
714 .await
715 }
716
717 pub async fn respond_to_contact_request(
718 &self,
719 responder_id: UserId,
720 requester_id: UserId,
721 accept: bool,
722 ) -> Result<()> {
723 self.transaction(|tx| async move {
724 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
725 (responder_id, requester_id, false)
726 } else {
727 (requester_id, responder_id, true)
728 };
729 let rows_affected = if accept {
730 let result = contact::Entity::update_many()
731 .set(contact::ActiveModel {
732 accepted: ActiveValue::set(true),
733 should_notify: ActiveValue::set(true),
734 ..Default::default()
735 })
736 .filter(
737 contact::Column::UserIdA
738 .eq(id_a)
739 .and(contact::Column::UserIdB.eq(id_b))
740 .and(contact::Column::AToB.eq(a_to_b)),
741 )
742 .exec(&*tx)
743 .await?;
744 result.rows_affected
745 } else {
746 let result = contact::Entity::delete_many()
747 .filter(
748 contact::Column::UserIdA
749 .eq(id_a)
750 .and(contact::Column::UserIdB.eq(id_b))
751 .and(contact::Column::AToB.eq(a_to_b))
752 .and(contact::Column::Accepted.eq(false)),
753 )
754 .exec(&*tx)
755 .await?;
756
757 result.rows_affected
758 };
759
760 if rows_affected == 1 {
761 Ok(())
762 } else {
763 Err(anyhow!("no such contact request"))?
764 }
765 })
766 .await
767 }
768
769 pub fn fuzzy_like_string(string: &str) -> String {
770 let mut result = String::with_capacity(string.len() * 2 + 1);
771 for c in string.chars() {
772 if c.is_alphanumeric() {
773 result.push('%');
774 result.push(c);
775 }
776 }
777 result.push('%');
778 result
779 }
780
781 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
782 self.transaction(|tx| async {
783 let tx = tx;
784 let like_string = Self::fuzzy_like_string(name_query);
785 let query = "
786 SELECT users.*
787 FROM users
788 WHERE github_login ILIKE $1
789 ORDER BY github_login <-> $2
790 LIMIT $3
791 ";
792
793 Ok(user::Entity::find()
794 .from_raw_sql(Statement::from_sql_and_values(
795 self.pool.get_database_backend(),
796 query.into(),
797 vec![like_string.into(), name_query.into(), limit.into()],
798 ))
799 .all(&*tx)
800 .await?)
801 })
802 .await
803 }
804
805 // signups
806
807 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
808 self.transaction(|tx| async move {
809 signup::Entity::insert(signup::ActiveModel {
810 email_address: ActiveValue::set(signup.email_address.clone()),
811 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
812 email_confirmation_sent: ActiveValue::set(false),
813 platform_mac: ActiveValue::set(signup.platform_mac),
814 platform_windows: ActiveValue::set(signup.platform_windows),
815 platform_linux: ActiveValue::set(signup.platform_linux),
816 platform_unknown: ActiveValue::set(false),
817 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
818 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
819 device_id: ActiveValue::set(signup.device_id.clone()),
820 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
821 ..Default::default()
822 })
823 .on_conflict(
824 OnConflict::column(signup::Column::EmailAddress)
825 .update_columns([
826 signup::Column::PlatformMac,
827 signup::Column::PlatformWindows,
828 signup::Column::PlatformLinux,
829 signup::Column::EditorFeatures,
830 signup::Column::ProgrammingLanguages,
831 signup::Column::DeviceId,
832 signup::Column::AddedToMailingList,
833 ])
834 .to_owned(),
835 )
836 .exec(&*tx)
837 .await?;
838 Ok(())
839 })
840 .await
841 }
842
843 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
844 self.transaction(|tx| async move {
845 let signup = signup::Entity::find()
846 .filter(signup::Column::EmailAddress.eq(email_address))
847 .one(&*tx)
848 .await?
849 .ok_or_else(|| {
850 anyhow!("signup with email address {} doesn't exist", email_address)
851 })?;
852
853 Ok(signup)
854 })
855 .await
856 }
857
858 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
859 self.transaction(|tx| async move {
860 let query = "
861 SELECT
862 COUNT(*) as count,
863 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
864 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
865 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
866 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
867 FROM (
868 SELECT *
869 FROM signups
870 WHERE
871 NOT email_confirmation_sent
872 ) AS unsent
873 ";
874 Ok(
875 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
876 self.pool.get_database_backend(),
877 query.into(),
878 vec![],
879 ))
880 .one(&*tx)
881 .await?
882 .ok_or_else(|| anyhow!("invalid result"))?,
883 )
884 })
885 .await
886 }
887
888 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
889 let emails = invites
890 .iter()
891 .map(|s| s.email_address.as_str())
892 .collect::<Vec<_>>();
893 self.transaction(|tx| async {
894 let tx = tx;
895 signup::Entity::update_many()
896 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
897 .set(signup::ActiveModel {
898 email_confirmation_sent: ActiveValue::set(true),
899 ..Default::default()
900 })
901 .exec(&*tx)
902 .await?;
903 Ok(())
904 })
905 .await
906 }
907
908 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
909 self.transaction(|tx| async move {
910 Ok(signup::Entity::find()
911 .select_only()
912 .column(signup::Column::EmailAddress)
913 .column(signup::Column::EmailConfirmationCode)
914 .filter(
915 signup::Column::EmailConfirmationSent.eq(false).and(
916 signup::Column::PlatformMac
917 .eq(true)
918 .or(signup::Column::PlatformUnknown.eq(true)),
919 ),
920 )
921 .order_by_asc(signup::Column::CreatedAt)
922 .limit(count as u64)
923 .into_model()
924 .all(&*tx)
925 .await?)
926 })
927 .await
928 }
929
930 // invite codes
931
932 pub async fn create_invite_from_code(
933 &self,
934 code: &str,
935 email_address: &str,
936 device_id: Option<&str>,
937 added_to_mailing_list: bool,
938 ) -> Result<Invite> {
939 self.transaction(|tx| async move {
940 let existing_user = user::Entity::find()
941 .filter(user::Column::EmailAddress.eq(email_address))
942 .one(&*tx)
943 .await?;
944
945 if existing_user.is_some() {
946 Err(anyhow!("email address is already in use"))?;
947 }
948
949 let inviting_user_with_invites = match user::Entity::find()
950 .filter(
951 user::Column::InviteCode
952 .eq(code)
953 .and(user::Column::InviteCount.gt(0)),
954 )
955 .one(&*tx)
956 .await?
957 {
958 Some(inviting_user) => inviting_user,
959 None => {
960 return Err(Error::Http(
961 StatusCode::UNAUTHORIZED,
962 "unable to find an invite code with invites remaining".to_string(),
963 ))?
964 }
965 };
966 user::Entity::update_many()
967 .filter(
968 user::Column::Id
969 .eq(inviting_user_with_invites.id)
970 .and(user::Column::InviteCount.gt(0)),
971 )
972 .col_expr(
973 user::Column::InviteCount,
974 Expr::col(user::Column::InviteCount).sub(1),
975 )
976 .exec(&*tx)
977 .await?;
978
979 let signup = signup::Entity::insert(signup::ActiveModel {
980 email_address: ActiveValue::set(email_address.into()),
981 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
982 email_confirmation_sent: ActiveValue::set(false),
983 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
984 platform_linux: ActiveValue::set(false),
985 platform_mac: ActiveValue::set(false),
986 platform_windows: ActiveValue::set(false),
987 platform_unknown: ActiveValue::set(true),
988 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
989 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
990 ..Default::default()
991 })
992 .on_conflict(
993 OnConflict::column(signup::Column::EmailAddress)
994 .update_column(signup::Column::InvitingUserId)
995 .to_owned(),
996 )
997 .exec_with_returning(&*tx)
998 .await?;
999
1000 Ok(Invite {
1001 email_address: signup.email_address,
1002 email_confirmation_code: signup.email_confirmation_code,
1003 })
1004 })
1005 .await
1006 }
1007
1008 pub async fn create_user_from_invite(
1009 &self,
1010 invite: &Invite,
1011 user: NewUserParams,
1012 ) -> Result<Option<NewUserResult>> {
1013 self.transaction(|tx| async {
1014 let tx = tx;
1015 let signup = signup::Entity::find()
1016 .filter(
1017 signup::Column::EmailAddress
1018 .eq(invite.email_address.as_str())
1019 .and(
1020 signup::Column::EmailConfirmationCode
1021 .eq(invite.email_confirmation_code.as_str()),
1022 ),
1023 )
1024 .one(&*tx)
1025 .await?
1026 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1027
1028 if signup.user_id.is_some() {
1029 return Ok(None);
1030 }
1031
1032 let user = user::Entity::insert(user::ActiveModel {
1033 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1034 github_login: ActiveValue::set(user.github_login.clone()),
1035 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1036 admin: ActiveValue::set(false),
1037 invite_count: ActiveValue::set(user.invite_count),
1038 invite_code: ActiveValue::set(Some(random_invite_code())),
1039 metrics_id: ActiveValue::set(Uuid::new_v4()),
1040 ..Default::default()
1041 })
1042 .on_conflict(
1043 OnConflict::column(user::Column::GithubLogin)
1044 .update_columns([
1045 user::Column::EmailAddress,
1046 user::Column::GithubUserId,
1047 user::Column::Admin,
1048 ])
1049 .to_owned(),
1050 )
1051 .exec_with_returning(&*tx)
1052 .await?;
1053
1054 let mut signup = signup.into_active_model();
1055 signup.user_id = ActiveValue::set(Some(user.id));
1056 let signup = signup.update(&*tx).await?;
1057
1058 if let Some(inviting_user_id) = signup.inviting_user_id {
1059 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1060 (inviting_user_id, user.id, true)
1061 } else {
1062 (user.id, inviting_user_id, false)
1063 };
1064
1065 contact::Entity::insert(contact::ActiveModel {
1066 user_id_a: ActiveValue::set(user_id_a),
1067 user_id_b: ActiveValue::set(user_id_b),
1068 a_to_b: ActiveValue::set(a_to_b),
1069 should_notify: ActiveValue::set(true),
1070 accepted: ActiveValue::set(true),
1071 ..Default::default()
1072 })
1073 .on_conflict(OnConflict::new().do_nothing().to_owned())
1074 .exec_without_returning(&*tx)
1075 .await?;
1076 }
1077
1078 Ok(Some(NewUserResult {
1079 user_id: user.id,
1080 metrics_id: user.metrics_id.to_string(),
1081 inviting_user_id: signup.inviting_user_id,
1082 signup_device_id: signup.device_id,
1083 }))
1084 })
1085 .await
1086 }
1087
1088 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1089 self.transaction(|tx| async move {
1090 if count > 0 {
1091 user::Entity::update_many()
1092 .filter(
1093 user::Column::Id
1094 .eq(id)
1095 .and(user::Column::InviteCode.is_null()),
1096 )
1097 .set(user::ActiveModel {
1098 invite_code: ActiveValue::set(Some(random_invite_code())),
1099 ..Default::default()
1100 })
1101 .exec(&*tx)
1102 .await?;
1103 }
1104
1105 user::Entity::update_many()
1106 .filter(user::Column::Id.eq(id))
1107 .set(user::ActiveModel {
1108 invite_count: ActiveValue::set(count),
1109 ..Default::default()
1110 })
1111 .exec(&*tx)
1112 .await?;
1113 Ok(())
1114 })
1115 .await
1116 }
1117
1118 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1119 self.transaction(|tx| async move {
1120 match user::Entity::find_by_id(id).one(&*tx).await? {
1121 Some(user) if user.invite_code.is_some() => {
1122 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1123 }
1124 _ => Ok(None),
1125 }
1126 })
1127 .await
1128 }
1129
1130 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1131 self.transaction(|tx| async move {
1132 user::Entity::find()
1133 .filter(user::Column::InviteCode.eq(code))
1134 .one(&*tx)
1135 .await?
1136 .ok_or_else(|| {
1137 Error::Http(
1138 StatusCode::NOT_FOUND,
1139 "that invite code does not exist".to_string(),
1140 )
1141 })
1142 })
1143 .await
1144 }
1145
1146 // rooms
1147
1148 pub async fn incoming_call_for_user(
1149 &self,
1150 user_id: UserId,
1151 ) -> Result<Option<proto::IncomingCall>> {
1152 self.transaction(|tx| async move {
1153 let pending_participant = room_participant::Entity::find()
1154 .filter(
1155 room_participant::Column::UserId
1156 .eq(user_id)
1157 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1158 )
1159 .one(&*tx)
1160 .await?;
1161
1162 if let Some(pending_participant) = pending_participant {
1163 let room = self.get_room(pending_participant.room_id, &tx).await?;
1164 Ok(Self::build_incoming_call(&room, user_id))
1165 } else {
1166 Ok(None)
1167 }
1168 })
1169 .await
1170 }
1171
1172 pub async fn create_room(
1173 &self,
1174 user_id: UserId,
1175 connection: ConnectionId,
1176 live_kit_room: &str,
1177 ) -> Result<proto::Room> {
1178 self.transaction(|tx| async move {
1179 let room = room::ActiveModel {
1180 live_kit_room: ActiveValue::set(live_kit_room.into()),
1181 ..Default::default()
1182 }
1183 .insert(&*tx)
1184 .await?;
1185 room_participant::ActiveModel {
1186 room_id: ActiveValue::set(room.id),
1187 user_id: ActiveValue::set(user_id),
1188 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1189 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1190 connection.owner_id as i32,
1191 ))),
1192 answering_connection_lost: ActiveValue::set(false),
1193 calling_user_id: ActiveValue::set(user_id),
1194 calling_connection_id: ActiveValue::set(connection.id as i32),
1195 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1196 connection.owner_id as i32,
1197 ))),
1198 ..Default::default()
1199 }
1200 .insert(&*tx)
1201 .await?;
1202
1203 let room = self.get_room(room.id, &tx).await?;
1204 Ok(room)
1205 })
1206 .await
1207 }
1208
1209 pub async fn call(
1210 &self,
1211 room_id: RoomId,
1212 calling_user_id: UserId,
1213 calling_connection: ConnectionId,
1214 called_user_id: UserId,
1215 initial_project_id: Option<ProjectId>,
1216 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1217 self.room_transaction(room_id, |tx| async move {
1218 room_participant::ActiveModel {
1219 room_id: ActiveValue::set(room_id),
1220 user_id: ActiveValue::set(called_user_id),
1221 answering_connection_lost: ActiveValue::set(false),
1222 calling_user_id: ActiveValue::set(calling_user_id),
1223 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1224 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1225 calling_connection.owner_id as i32,
1226 ))),
1227 initial_project_id: ActiveValue::set(initial_project_id),
1228 ..Default::default()
1229 }
1230 .insert(&*tx)
1231 .await?;
1232
1233 let room = self.get_room(room_id, &tx).await?;
1234 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1235 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1236 Ok((room, incoming_call))
1237 })
1238 .await
1239 }
1240
1241 pub async fn call_failed(
1242 &self,
1243 room_id: RoomId,
1244 called_user_id: UserId,
1245 ) -> Result<RoomGuard<proto::Room>> {
1246 self.room_transaction(room_id, |tx| async move {
1247 room_participant::Entity::delete_many()
1248 .filter(
1249 room_participant::Column::RoomId
1250 .eq(room_id)
1251 .and(room_participant::Column::UserId.eq(called_user_id)),
1252 )
1253 .exec(&*tx)
1254 .await?;
1255 let room = self.get_room(room_id, &tx).await?;
1256 Ok(room)
1257 })
1258 .await
1259 }
1260
1261 pub async fn decline_call(
1262 &self,
1263 expected_room_id: Option<RoomId>,
1264 user_id: UserId,
1265 ) -> Result<Option<RoomGuard<proto::Room>>> {
1266 self.optional_room_transaction(|tx| async move {
1267 let mut filter = Condition::all()
1268 .add(room_participant::Column::UserId.eq(user_id))
1269 .add(room_participant::Column::AnsweringConnectionId.is_null());
1270 if let Some(room_id) = expected_room_id {
1271 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1272 }
1273 let participant = room_participant::Entity::find()
1274 .filter(filter)
1275 .one(&*tx)
1276 .await?;
1277
1278 let participant = if let Some(participant) = participant {
1279 participant
1280 } else if expected_room_id.is_some() {
1281 return Err(anyhow!("could not find call to decline"))?;
1282 } else {
1283 return Ok(None);
1284 };
1285
1286 let room_id = participant.room_id;
1287 room_participant::Entity::delete(participant.into_active_model())
1288 .exec(&*tx)
1289 .await?;
1290
1291 let room = self.get_room(room_id, &tx).await?;
1292 Ok(Some((room_id, room)))
1293 })
1294 .await
1295 }
1296
1297 pub async fn cancel_call(
1298 &self,
1299 room_id: RoomId,
1300 calling_connection: ConnectionId,
1301 called_user_id: UserId,
1302 ) -> Result<RoomGuard<proto::Room>> {
1303 self.room_transaction(room_id, |tx| async move {
1304 let participant = room_participant::Entity::find()
1305 .filter(
1306 Condition::all()
1307 .add(room_participant::Column::UserId.eq(called_user_id))
1308 .add(room_participant::Column::RoomId.eq(room_id))
1309 .add(
1310 room_participant::Column::CallingConnectionId
1311 .eq(calling_connection.id as i32),
1312 )
1313 .add(
1314 room_participant::Column::CallingConnectionServerId
1315 .eq(calling_connection.owner_id as i32),
1316 )
1317 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1318 )
1319 .one(&*tx)
1320 .await?
1321 .ok_or_else(|| anyhow!("no call to cancel"))?;
1322
1323 room_participant::Entity::delete(participant.into_active_model())
1324 .exec(&*tx)
1325 .await?;
1326
1327 let room = self.get_room(room_id, &tx).await?;
1328 Ok(room)
1329 })
1330 .await
1331 }
1332
1333 pub async fn join_room(
1334 &self,
1335 room_id: RoomId,
1336 user_id: UserId,
1337 connection: ConnectionId,
1338 ) -> Result<RoomGuard<proto::Room>> {
1339 self.room_transaction(room_id, |tx| async move {
1340 let result = room_participant::Entity::update_many()
1341 .filter(
1342 Condition::all()
1343 .add(room_participant::Column::RoomId.eq(room_id))
1344 .add(room_participant::Column::UserId.eq(user_id))
1345 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1346 )
1347 .set(room_participant::ActiveModel {
1348 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1349 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1350 connection.owner_id as i32,
1351 ))),
1352 answering_connection_lost: ActiveValue::set(false),
1353 ..Default::default()
1354 })
1355 .exec(&*tx)
1356 .await?;
1357 if result.rows_affected == 0 {
1358 Err(anyhow!("room does not exist or was already joined"))?
1359 } else {
1360 let room = self.get_room(room_id, &tx).await?;
1361 Ok(room)
1362 }
1363 })
1364 .await
1365 }
1366
1367 pub async fn rejoin_room(
1368 &self,
1369 rejoin_room: proto::RejoinRoom,
1370 user_id: UserId,
1371 connection: ConnectionId,
1372 ) -> Result<RoomGuard<RejoinedRoom>> {
1373 let room_id = RoomId::from_proto(rejoin_room.id);
1374 self.room_transaction(room_id, |tx| async {
1375 let tx = tx;
1376 let participant_update = room_participant::Entity::update_many()
1377 .filter(
1378 Condition::all()
1379 .add(room_participant::Column::RoomId.eq(room_id))
1380 .add(room_participant::Column::UserId.eq(user_id))
1381 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1382 .add(
1383 Condition::any()
1384 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1385 .add(
1386 room_participant::Column::AnsweringConnectionServerId
1387 .ne(connection.owner_id as i32),
1388 ),
1389 ),
1390 )
1391 .set(room_participant::ActiveModel {
1392 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1393 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1394 connection.owner_id as i32,
1395 ))),
1396 answering_connection_lost: ActiveValue::set(false),
1397 ..Default::default()
1398 })
1399 .exec(&*tx)
1400 .await?;
1401 if participant_update.rows_affected == 0 {
1402 return Err(anyhow!("room does not exist or was already joined"))?;
1403 }
1404
1405 let mut reshared_projects = Vec::new();
1406 for reshared_project in &rejoin_room.reshared_projects {
1407 let project_id = ProjectId::from_proto(reshared_project.project_id);
1408 let project = project::Entity::find_by_id(project_id)
1409 .one(&*tx)
1410 .await?
1411 .ok_or_else(|| anyhow!("project does not exist"))?;
1412 if project.host_user_id != user_id {
1413 return Err(anyhow!("no such project"))?;
1414 }
1415
1416 let mut collaborators = project
1417 .find_related(project_collaborator::Entity)
1418 .all(&*tx)
1419 .await?;
1420 let host_ix = collaborators
1421 .iter()
1422 .position(|collaborator| {
1423 collaborator.user_id == user_id && collaborator.is_host
1424 })
1425 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1426 let host = collaborators.swap_remove(host_ix);
1427 let old_connection_id = host.connection();
1428
1429 project::Entity::update(project::ActiveModel {
1430 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1431 host_connection_server_id: ActiveValue::set(Some(ServerId(
1432 connection.owner_id as i32,
1433 ))),
1434 ..project.into_active_model()
1435 })
1436 .exec(&*tx)
1437 .await?;
1438 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1439 connection_id: ActiveValue::set(connection.id as i32),
1440 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1441 ..host.into_active_model()
1442 })
1443 .exec(&*tx)
1444 .await?;
1445
1446 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1447 .await?;
1448
1449 reshared_projects.push(ResharedProject {
1450 id: project_id,
1451 old_connection_id,
1452 collaborators: collaborators
1453 .iter()
1454 .map(|collaborator| ProjectCollaborator {
1455 connection_id: collaborator.connection(),
1456 user_id: collaborator.user_id,
1457 replica_id: collaborator.replica_id,
1458 is_host: collaborator.is_host,
1459 })
1460 .collect(),
1461 worktrees: reshared_project.worktrees.clone(),
1462 });
1463 }
1464
1465 project::Entity::delete_many()
1466 .filter(
1467 Condition::all()
1468 .add(project::Column::RoomId.eq(room_id))
1469 .add(project::Column::HostUserId.eq(user_id))
1470 .add(
1471 project::Column::Id
1472 .is_not_in(reshared_projects.iter().map(|project| project.id)),
1473 ),
1474 )
1475 .exec(&*tx)
1476 .await?;
1477
1478 let mut rejoined_projects = Vec::new();
1479 for rejoined_project in &rejoin_room.rejoined_projects {
1480 let project_id = ProjectId::from_proto(rejoined_project.id);
1481 let Some(project) = project::Entity::find_by_id(project_id)
1482 .one(&*tx)
1483 .await? else { continue };
1484
1485 let mut worktrees = Vec::new();
1486 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1487 for db_worktree in db_worktrees {
1488 let mut worktree = RejoinedWorktree {
1489 id: db_worktree.id as u64,
1490 abs_path: db_worktree.abs_path,
1491 root_name: db_worktree.root_name,
1492 visible: db_worktree.visible,
1493 updated_entries: Default::default(),
1494 removed_entries: Default::default(),
1495 updated_repositories: Default::default(),
1496 removed_repositories: Default::default(),
1497 diagnostic_summaries: Default::default(),
1498 settings_files: Default::default(),
1499 scan_id: db_worktree.scan_id as u64,
1500 completed_scan_id: db_worktree.completed_scan_id as u64,
1501 };
1502
1503 let rejoined_worktree = rejoined_project
1504 .worktrees
1505 .iter()
1506 .find(|worktree| worktree.id == db_worktree.id as u64);
1507
1508 // File entries
1509 {
1510 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1511 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1512 } else {
1513 worktree_entry::Column::IsDeleted.eq(false)
1514 };
1515
1516 let mut db_entries = worktree_entry::Entity::find()
1517 .filter(
1518 Condition::all()
1519 .add(worktree_entry::Column::ProjectId.eq(project.id))
1520 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1521 .add(entry_filter),
1522 )
1523 .stream(&*tx)
1524 .await?;
1525
1526 while let Some(db_entry) = db_entries.next().await {
1527 let db_entry = db_entry?;
1528 if db_entry.is_deleted {
1529 worktree.removed_entries.push(db_entry.id as u64);
1530 } else {
1531 worktree.updated_entries.push(proto::Entry {
1532 id: db_entry.id as u64,
1533 is_dir: db_entry.is_dir,
1534 path: db_entry.path,
1535 inode: db_entry.inode as u64,
1536 mtime: Some(proto::Timestamp {
1537 seconds: db_entry.mtime_seconds as u64,
1538 nanos: db_entry.mtime_nanos as u32,
1539 }),
1540 is_symlink: db_entry.is_symlink,
1541 is_ignored: db_entry.is_ignored,
1542 });
1543 }
1544 }
1545 }
1546
1547 // Repository Entries
1548 {
1549 let repository_entry_filter =
1550 if let Some(rejoined_worktree) = rejoined_worktree {
1551 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1552 } else {
1553 worktree_repository::Column::IsDeleted.eq(false)
1554 };
1555
1556 let mut db_repositories = worktree_repository::Entity::find()
1557 .filter(
1558 Condition::all()
1559 .add(worktree_repository::Column::ProjectId.eq(project.id))
1560 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1561 .add(repository_entry_filter),
1562 )
1563 .stream(&*tx)
1564 .await?;
1565
1566 while let Some(db_repository) = db_repositories.next().await {
1567 let db_repository = db_repository?;
1568 if db_repository.is_deleted {
1569 worktree
1570 .removed_repositories
1571 .push(db_repository.work_directory_id as u64);
1572 } else {
1573 worktree.updated_repositories.push(proto::RepositoryEntry {
1574 work_directory_id: db_repository.work_directory_id as u64,
1575 branch: db_repository.branch,
1576 removed_repo_paths: Default::default(),
1577 updated_statuses: Default::default(),
1578 });
1579 }
1580 }
1581 }
1582
1583 // Repository Status Entries
1584 for repository in worktree.updated_repositories.iter_mut() {
1585 let repository_status_entry_filter =
1586 if let Some(rejoined_worktree) = rejoined_worktree {
1587 worktree_repository_statuses::Column::ScanId
1588 .gt(rejoined_worktree.scan_id)
1589 } else {
1590 worktree_repository_statuses::Column::IsDeleted.eq(false)
1591 };
1592
1593 let mut db_repository_statuses =
1594 worktree_repository_statuses::Entity::find()
1595 .filter(
1596 Condition::all()
1597 .add(
1598 worktree_repository_statuses::Column::ProjectId
1599 .eq(project.id),
1600 )
1601 .add(
1602 worktree_repository_statuses::Column::WorktreeId
1603 .eq(worktree.id),
1604 )
1605 .add(
1606 worktree_repository_statuses::Column::WorkDirectoryId
1607 .eq(repository.work_directory_id),
1608 )
1609 .add(repository_status_entry_filter),
1610 )
1611 .stream(&*tx)
1612 .await?;
1613
1614 while let Some(db_status_entry) = db_repository_statuses.next().await {
1615 let db_status_entry = db_status_entry?;
1616 if db_status_entry.is_deleted {
1617 repository
1618 .removed_repo_paths
1619 .push(db_status_entry.repo_path);
1620 } else {
1621 repository.updated_statuses.push(proto::StatusEntry {
1622 repo_path: db_status_entry.repo_path,
1623 status: db_status_entry.status as i32,
1624 });
1625 }
1626 }
1627 }
1628
1629 worktrees.push(worktree);
1630 }
1631
1632 let language_servers = project
1633 .find_related(language_server::Entity)
1634 .all(&*tx)
1635 .await?
1636 .into_iter()
1637 .map(|language_server| proto::LanguageServer {
1638 id: language_server.id as u64,
1639 name: language_server.name,
1640 })
1641 .collect::<Vec<_>>();
1642
1643 {
1644 let mut db_settings_files = worktree_settings_file::Entity::find()
1645 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1646 .stream(&*tx)
1647 .await?;
1648 while let Some(db_settings_file) = db_settings_files.next().await {
1649 let db_settings_file = db_settings_file?;
1650 if let Some(worktree) = worktrees
1651 .iter_mut()
1652 .find(|w| w.id == db_settings_file.worktree_id as u64)
1653 {
1654 worktree.settings_files.push(WorktreeSettingsFile {
1655 path: db_settings_file.path,
1656 content: db_settings_file.content,
1657 });
1658 }
1659 }
1660 }
1661
1662 let mut collaborators = project
1663 .find_related(project_collaborator::Entity)
1664 .all(&*tx)
1665 .await?;
1666 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1667 .iter()
1668 .position(|collaborator| collaborator.user_id == user_id)
1669 {
1670 collaborators.swap_remove(self_collaborator_ix)
1671 } else {
1672 continue;
1673 };
1674 let old_connection_id = self_collaborator.connection();
1675 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1676 connection_id: ActiveValue::set(connection.id as i32),
1677 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1678 ..self_collaborator.into_active_model()
1679 })
1680 .exec(&*tx)
1681 .await?;
1682
1683 let collaborators = collaborators
1684 .into_iter()
1685 .map(|collaborator| ProjectCollaborator {
1686 connection_id: collaborator.connection(),
1687 user_id: collaborator.user_id,
1688 replica_id: collaborator.replica_id,
1689 is_host: collaborator.is_host,
1690 })
1691 .collect::<Vec<_>>();
1692
1693 rejoined_projects.push(RejoinedProject {
1694 id: project_id,
1695 old_connection_id,
1696 collaborators,
1697 worktrees,
1698 language_servers,
1699 });
1700 }
1701
1702 let room = self.get_room(room_id, &tx).await?;
1703 Ok(RejoinedRoom {
1704 room,
1705 rejoined_projects,
1706 reshared_projects,
1707 })
1708 })
1709 .await
1710 }
1711
1712 pub async fn leave_room(
1713 &self,
1714 connection: ConnectionId,
1715 ) -> Result<Option<RoomGuard<LeftRoom>>> {
1716 self.optional_room_transaction(|tx| async move {
1717 let leaving_participant = room_participant::Entity::find()
1718 .filter(
1719 Condition::all()
1720 .add(
1721 room_participant::Column::AnsweringConnectionId
1722 .eq(connection.id as i32),
1723 )
1724 .add(
1725 room_participant::Column::AnsweringConnectionServerId
1726 .eq(connection.owner_id as i32),
1727 ),
1728 )
1729 .one(&*tx)
1730 .await?;
1731
1732 if let Some(leaving_participant) = leaving_participant {
1733 // Leave room.
1734 let room_id = leaving_participant.room_id;
1735 room_participant::Entity::delete_by_id(leaving_participant.id)
1736 .exec(&*tx)
1737 .await?;
1738
1739 // Cancel pending calls initiated by the leaving user.
1740 let called_participants = room_participant::Entity::find()
1741 .filter(
1742 Condition::all()
1743 .add(
1744 room_participant::Column::CallingUserId
1745 .eq(leaving_participant.user_id),
1746 )
1747 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1748 )
1749 .all(&*tx)
1750 .await?;
1751 room_participant::Entity::delete_many()
1752 .filter(
1753 room_participant::Column::Id
1754 .is_in(called_participants.iter().map(|participant| participant.id)),
1755 )
1756 .exec(&*tx)
1757 .await?;
1758 let canceled_calls_to_user_ids = called_participants
1759 .into_iter()
1760 .map(|participant| participant.user_id)
1761 .collect();
1762
1763 // Detect left projects.
1764 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1765 enum QueryProjectIds {
1766 ProjectId,
1767 }
1768 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1769 .select_only()
1770 .column_as(
1771 project_collaborator::Column::ProjectId,
1772 QueryProjectIds::ProjectId,
1773 )
1774 .filter(
1775 Condition::all()
1776 .add(
1777 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1778 )
1779 .add(
1780 project_collaborator::Column::ConnectionServerId
1781 .eq(connection.owner_id as i32),
1782 ),
1783 )
1784 .into_values::<_, QueryProjectIds>()
1785 .all(&*tx)
1786 .await?;
1787 let mut left_projects = HashMap::default();
1788 let mut collaborators = project_collaborator::Entity::find()
1789 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1790 .stream(&*tx)
1791 .await?;
1792 while let Some(collaborator) = collaborators.next().await {
1793 let collaborator = collaborator?;
1794 let left_project =
1795 left_projects
1796 .entry(collaborator.project_id)
1797 .or_insert(LeftProject {
1798 id: collaborator.project_id,
1799 host_user_id: Default::default(),
1800 connection_ids: Default::default(),
1801 host_connection_id: Default::default(),
1802 });
1803
1804 let collaborator_connection_id = collaborator.connection();
1805 if collaborator_connection_id != connection {
1806 left_project.connection_ids.push(collaborator_connection_id);
1807 }
1808
1809 if collaborator.is_host {
1810 left_project.host_user_id = collaborator.user_id;
1811 left_project.host_connection_id = collaborator_connection_id;
1812 }
1813 }
1814 drop(collaborators);
1815
1816 // Leave projects.
1817 project_collaborator::Entity::delete_many()
1818 .filter(
1819 Condition::all()
1820 .add(
1821 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1822 )
1823 .add(
1824 project_collaborator::Column::ConnectionServerId
1825 .eq(connection.owner_id as i32),
1826 ),
1827 )
1828 .exec(&*tx)
1829 .await?;
1830
1831 // Unshare projects.
1832 project::Entity::delete_many()
1833 .filter(
1834 Condition::all()
1835 .add(project::Column::RoomId.eq(room_id))
1836 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1837 .add(
1838 project::Column::HostConnectionServerId
1839 .eq(connection.owner_id as i32),
1840 ),
1841 )
1842 .exec(&*tx)
1843 .await?;
1844
1845 let room = self.get_room(room_id, &tx).await?;
1846 if room.participants.is_empty() {
1847 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1848 }
1849
1850 let left_room = LeftRoom {
1851 room,
1852 left_projects,
1853 canceled_calls_to_user_ids,
1854 };
1855
1856 if left_room.room.participants.is_empty() {
1857 self.rooms.remove(&room_id);
1858 }
1859
1860 Ok(Some((room_id, left_room)))
1861 } else {
1862 Ok(None)
1863 }
1864 })
1865 .await
1866 }
1867
1868 pub async fn follow(
1869 &self,
1870 project_id: ProjectId,
1871 leader_connection: ConnectionId,
1872 follower_connection: ConnectionId,
1873 ) -> Result<RoomGuard<proto::Room>> {
1874 let room_id = self.room_id_for_project(project_id).await?;
1875 self.room_transaction(room_id, |tx| async move {
1876 follower::ActiveModel {
1877 room_id: ActiveValue::set(room_id),
1878 project_id: ActiveValue::set(project_id),
1879 leader_connection_server_id: ActiveValue::set(ServerId(
1880 leader_connection.owner_id as i32,
1881 )),
1882 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1883 follower_connection_server_id: ActiveValue::set(ServerId(
1884 follower_connection.owner_id as i32,
1885 )),
1886 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1887 ..Default::default()
1888 }
1889 .insert(&*tx)
1890 .await?;
1891
1892 let room = self.get_room(room_id, &*tx).await?;
1893 Ok(room)
1894 })
1895 .await
1896 }
1897
1898 pub async fn unfollow(
1899 &self,
1900 project_id: ProjectId,
1901 leader_connection: ConnectionId,
1902 follower_connection: ConnectionId,
1903 ) -> Result<RoomGuard<proto::Room>> {
1904 let room_id = self.room_id_for_project(project_id).await?;
1905 self.room_transaction(room_id, |tx| async move {
1906 follower::Entity::delete_many()
1907 .filter(
1908 Condition::all()
1909 .add(follower::Column::ProjectId.eq(project_id))
1910 .add(
1911 follower::Column::LeaderConnectionServerId
1912 .eq(leader_connection.owner_id),
1913 )
1914 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1915 .add(
1916 follower::Column::FollowerConnectionServerId
1917 .eq(follower_connection.owner_id),
1918 )
1919 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1920 )
1921 .exec(&*tx)
1922 .await?;
1923
1924 let room = self.get_room(room_id, &*tx).await?;
1925 Ok(room)
1926 })
1927 .await
1928 }
1929
1930 pub async fn update_room_participant_location(
1931 &self,
1932 room_id: RoomId,
1933 connection: ConnectionId,
1934 location: proto::ParticipantLocation,
1935 ) -> Result<RoomGuard<proto::Room>> {
1936 self.room_transaction(room_id, |tx| async {
1937 let tx = tx;
1938 let location_kind;
1939 let location_project_id;
1940 match location
1941 .variant
1942 .as_ref()
1943 .ok_or_else(|| anyhow!("invalid location"))?
1944 {
1945 proto::participant_location::Variant::SharedProject(project) => {
1946 location_kind = 0;
1947 location_project_id = Some(ProjectId::from_proto(project.id));
1948 }
1949 proto::participant_location::Variant::UnsharedProject(_) => {
1950 location_kind = 1;
1951 location_project_id = None;
1952 }
1953 proto::participant_location::Variant::External(_) => {
1954 location_kind = 2;
1955 location_project_id = None;
1956 }
1957 }
1958
1959 let result = room_participant::Entity::update_many()
1960 .filter(
1961 Condition::all()
1962 .add(room_participant::Column::RoomId.eq(room_id))
1963 .add(
1964 room_participant::Column::AnsweringConnectionId
1965 .eq(connection.id as i32),
1966 )
1967 .add(
1968 room_participant::Column::AnsweringConnectionServerId
1969 .eq(connection.owner_id as i32),
1970 ),
1971 )
1972 .set(room_participant::ActiveModel {
1973 location_kind: ActiveValue::set(Some(location_kind)),
1974 location_project_id: ActiveValue::set(location_project_id),
1975 ..Default::default()
1976 })
1977 .exec(&*tx)
1978 .await?;
1979
1980 if result.rows_affected == 1 {
1981 let room = self.get_room(room_id, &tx).await?;
1982 Ok(room)
1983 } else {
1984 Err(anyhow!("could not update room participant location"))?
1985 }
1986 })
1987 .await
1988 }
1989
1990 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1991 self.transaction(|tx| async move {
1992 let participant = room_participant::Entity::find()
1993 .filter(
1994 Condition::all()
1995 .add(
1996 room_participant::Column::AnsweringConnectionId
1997 .eq(connection.id as i32),
1998 )
1999 .add(
2000 room_participant::Column::AnsweringConnectionServerId
2001 .eq(connection.owner_id as i32),
2002 ),
2003 )
2004 .one(&*tx)
2005 .await?
2006 .ok_or_else(|| anyhow!("not a participant in any room"))?;
2007
2008 room_participant::Entity::update(room_participant::ActiveModel {
2009 answering_connection_lost: ActiveValue::set(true),
2010 ..participant.into_active_model()
2011 })
2012 .exec(&*tx)
2013 .await?;
2014
2015 Ok(())
2016 })
2017 .await
2018 }
2019
2020 fn build_incoming_call(
2021 room: &proto::Room,
2022 called_user_id: UserId,
2023 ) -> Option<proto::IncomingCall> {
2024 let pending_participant = room
2025 .pending_participants
2026 .iter()
2027 .find(|participant| participant.user_id == called_user_id.to_proto())?;
2028
2029 Some(proto::IncomingCall {
2030 room_id: room.id,
2031 calling_user_id: pending_participant.calling_user_id,
2032 participant_user_ids: room
2033 .participants
2034 .iter()
2035 .map(|participant| participant.user_id)
2036 .collect(),
2037 initial_project: room.participants.iter().find_map(|participant| {
2038 let initial_project_id = pending_participant.initial_project_id?;
2039 participant
2040 .projects
2041 .iter()
2042 .find(|project| project.id == initial_project_id)
2043 .cloned()
2044 }),
2045 })
2046 }
2047
2048 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2049 let db_room = room::Entity::find_by_id(room_id)
2050 .one(tx)
2051 .await?
2052 .ok_or_else(|| anyhow!("could not find room"))?;
2053
2054 let mut db_participants = db_room
2055 .find_related(room_participant::Entity)
2056 .stream(tx)
2057 .await?;
2058 let mut participants = HashMap::default();
2059 let mut pending_participants = Vec::new();
2060 while let Some(db_participant) = db_participants.next().await {
2061 let db_participant = db_participant?;
2062 if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2063 .answering_connection_id
2064 .zip(db_participant.answering_connection_server_id)
2065 {
2066 let location = match (
2067 db_participant.location_kind,
2068 db_participant.location_project_id,
2069 ) {
2070 (Some(0), Some(project_id)) => {
2071 Some(proto::participant_location::Variant::SharedProject(
2072 proto::participant_location::SharedProject {
2073 id: project_id.to_proto(),
2074 },
2075 ))
2076 }
2077 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2078 Default::default(),
2079 )),
2080 _ => Some(proto::participant_location::Variant::External(
2081 Default::default(),
2082 )),
2083 };
2084
2085 let answering_connection = ConnectionId {
2086 owner_id: answering_connection_server_id.0 as u32,
2087 id: answering_connection_id as u32,
2088 };
2089 participants.insert(
2090 answering_connection,
2091 proto::Participant {
2092 user_id: db_participant.user_id.to_proto(),
2093 peer_id: Some(answering_connection.into()),
2094 projects: Default::default(),
2095 location: Some(proto::ParticipantLocation { variant: location }),
2096 },
2097 );
2098 } else {
2099 pending_participants.push(proto::PendingParticipant {
2100 user_id: db_participant.user_id.to_proto(),
2101 calling_user_id: db_participant.calling_user_id.to_proto(),
2102 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2103 });
2104 }
2105 }
2106 drop(db_participants);
2107
2108 let mut db_projects = db_room
2109 .find_related(project::Entity)
2110 .find_with_related(worktree::Entity)
2111 .stream(tx)
2112 .await?;
2113
2114 while let Some(row) = db_projects.next().await {
2115 let (db_project, db_worktree) = row?;
2116 let host_connection = db_project.host_connection()?;
2117 if let Some(participant) = participants.get_mut(&host_connection) {
2118 let project = if let Some(project) = participant
2119 .projects
2120 .iter_mut()
2121 .find(|project| project.id == db_project.id.to_proto())
2122 {
2123 project
2124 } else {
2125 participant.projects.push(proto::ParticipantProject {
2126 id: db_project.id.to_proto(),
2127 worktree_root_names: Default::default(),
2128 });
2129 participant.projects.last_mut().unwrap()
2130 };
2131
2132 if let Some(db_worktree) = db_worktree {
2133 if db_worktree.visible {
2134 project.worktree_root_names.push(db_worktree.root_name);
2135 }
2136 }
2137 }
2138 }
2139 drop(db_projects);
2140
2141 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2142 let mut followers = Vec::new();
2143 while let Some(db_follower) = db_followers.next().await {
2144 let db_follower = db_follower?;
2145 followers.push(proto::Follower {
2146 leader_id: Some(db_follower.leader_connection().into()),
2147 follower_id: Some(db_follower.follower_connection().into()),
2148 project_id: db_follower.project_id.to_proto(),
2149 });
2150 }
2151
2152 Ok(proto::Room {
2153 id: db_room.id.to_proto(),
2154 live_kit_room: db_room.live_kit_room,
2155 participants: participants.into_values().collect(),
2156 pending_participants,
2157 followers,
2158 })
2159 }
2160
2161 // projects
2162
2163 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2164 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2165 enum QueryAs {
2166 Count,
2167 }
2168
2169 self.transaction(|tx| async move {
2170 Ok(project::Entity::find()
2171 .select_only()
2172 .column_as(project::Column::Id.count(), QueryAs::Count)
2173 .inner_join(user::Entity)
2174 .filter(user::Column::Admin.eq(false))
2175 .into_values::<_, QueryAs>()
2176 .one(&*tx)
2177 .await?
2178 .unwrap_or(0i64) as usize)
2179 })
2180 .await
2181 }
2182
2183 pub async fn share_project(
2184 &self,
2185 room_id: RoomId,
2186 connection: ConnectionId,
2187 worktrees: &[proto::WorktreeMetadata],
2188 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2189 self.room_transaction(room_id, |tx| async move {
2190 let participant = room_participant::Entity::find()
2191 .filter(
2192 Condition::all()
2193 .add(
2194 room_participant::Column::AnsweringConnectionId
2195 .eq(connection.id as i32),
2196 )
2197 .add(
2198 room_participant::Column::AnsweringConnectionServerId
2199 .eq(connection.owner_id as i32),
2200 ),
2201 )
2202 .one(&*tx)
2203 .await?
2204 .ok_or_else(|| anyhow!("could not find participant"))?;
2205 if participant.room_id != room_id {
2206 return Err(anyhow!("shared project on unexpected room"))?;
2207 }
2208
2209 let project = project::ActiveModel {
2210 room_id: ActiveValue::set(participant.room_id),
2211 host_user_id: ActiveValue::set(participant.user_id),
2212 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2213 host_connection_server_id: ActiveValue::set(Some(ServerId(
2214 connection.owner_id as i32,
2215 ))),
2216 ..Default::default()
2217 }
2218 .insert(&*tx)
2219 .await?;
2220
2221 if !worktrees.is_empty() {
2222 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2223 worktree::ActiveModel {
2224 id: ActiveValue::set(worktree.id as i64),
2225 project_id: ActiveValue::set(project.id),
2226 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2227 root_name: ActiveValue::set(worktree.root_name.clone()),
2228 visible: ActiveValue::set(worktree.visible),
2229 scan_id: ActiveValue::set(0),
2230 completed_scan_id: ActiveValue::set(0),
2231 }
2232 }))
2233 .exec(&*tx)
2234 .await?;
2235 }
2236
2237 project_collaborator::ActiveModel {
2238 project_id: ActiveValue::set(project.id),
2239 connection_id: ActiveValue::set(connection.id as i32),
2240 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2241 user_id: ActiveValue::set(participant.user_id),
2242 replica_id: ActiveValue::set(ReplicaId(0)),
2243 is_host: ActiveValue::set(true),
2244 ..Default::default()
2245 }
2246 .insert(&*tx)
2247 .await?;
2248
2249 let room = self.get_room(room_id, &tx).await?;
2250 Ok((project.id, room))
2251 })
2252 .await
2253 }
2254
2255 pub async fn unshare_project(
2256 &self,
2257 project_id: ProjectId,
2258 connection: ConnectionId,
2259 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2260 let room_id = self.room_id_for_project(project_id).await?;
2261 self.room_transaction(room_id, |tx| async move {
2262 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2263
2264 let project = project::Entity::find_by_id(project_id)
2265 .one(&*tx)
2266 .await?
2267 .ok_or_else(|| anyhow!("project not found"))?;
2268 if project.host_connection()? == connection {
2269 project::Entity::delete(project.into_active_model())
2270 .exec(&*tx)
2271 .await?;
2272 let room = self.get_room(room_id, &tx).await?;
2273 Ok((room, guest_connection_ids))
2274 } else {
2275 Err(anyhow!("cannot unshare a project hosted by another user"))?
2276 }
2277 })
2278 .await
2279 }
2280
2281 pub async fn update_project(
2282 &self,
2283 project_id: ProjectId,
2284 connection: ConnectionId,
2285 worktrees: &[proto::WorktreeMetadata],
2286 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2287 let room_id = self.room_id_for_project(project_id).await?;
2288 self.room_transaction(room_id, |tx| async move {
2289 let project = project::Entity::find_by_id(project_id)
2290 .filter(
2291 Condition::all()
2292 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2293 .add(
2294 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2295 ),
2296 )
2297 .one(&*tx)
2298 .await?
2299 .ok_or_else(|| anyhow!("no such project"))?;
2300
2301 self.update_project_worktrees(project.id, worktrees, &tx)
2302 .await?;
2303
2304 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2305 let room = self.get_room(project.room_id, &tx).await?;
2306 Ok((room, guest_connection_ids))
2307 })
2308 .await
2309 }
2310
2311 async fn update_project_worktrees(
2312 &self,
2313 project_id: ProjectId,
2314 worktrees: &[proto::WorktreeMetadata],
2315 tx: &DatabaseTransaction,
2316 ) -> Result<()> {
2317 if !worktrees.is_empty() {
2318 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2319 id: ActiveValue::set(worktree.id as i64),
2320 project_id: ActiveValue::set(project_id),
2321 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2322 root_name: ActiveValue::set(worktree.root_name.clone()),
2323 visible: ActiveValue::set(worktree.visible),
2324 scan_id: ActiveValue::set(0),
2325 completed_scan_id: ActiveValue::set(0),
2326 }))
2327 .on_conflict(
2328 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2329 .update_column(worktree::Column::RootName)
2330 .to_owned(),
2331 )
2332 .exec(&*tx)
2333 .await?;
2334 }
2335
2336 worktree::Entity::delete_many()
2337 .filter(worktree::Column::ProjectId.eq(project_id).and(
2338 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2339 ))
2340 .exec(&*tx)
2341 .await?;
2342
2343 Ok(())
2344 }
2345
2346 pub async fn update_worktree(
2347 &self,
2348 update: &proto::UpdateWorktree,
2349 connection: ConnectionId,
2350 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2351 let project_id = ProjectId::from_proto(update.project_id);
2352 let worktree_id = update.worktree_id as i64;
2353 let room_id = self.room_id_for_project(project_id).await?;
2354 self.room_transaction(room_id, |tx| async move {
2355 // Ensure the update comes from the host.
2356 let _project = project::Entity::find_by_id(project_id)
2357 .filter(
2358 Condition::all()
2359 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2360 .add(
2361 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2362 ),
2363 )
2364 .one(&*tx)
2365 .await?
2366 .ok_or_else(|| anyhow!("no such project"))?;
2367
2368 // Update metadata.
2369 worktree::Entity::update(worktree::ActiveModel {
2370 id: ActiveValue::set(worktree_id),
2371 project_id: ActiveValue::set(project_id),
2372 root_name: ActiveValue::set(update.root_name.clone()),
2373 scan_id: ActiveValue::set(update.scan_id as i64),
2374 completed_scan_id: if update.is_last_update {
2375 ActiveValue::set(update.scan_id as i64)
2376 } else {
2377 ActiveValue::default()
2378 },
2379 abs_path: ActiveValue::set(update.abs_path.clone()),
2380 ..Default::default()
2381 })
2382 .exec(&*tx)
2383 .await?;
2384
2385 if !update.updated_entries.is_empty() {
2386 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2387 let mtime = entry.mtime.clone().unwrap_or_default();
2388 worktree_entry::ActiveModel {
2389 project_id: ActiveValue::set(project_id),
2390 worktree_id: ActiveValue::set(worktree_id),
2391 id: ActiveValue::set(entry.id as i64),
2392 is_dir: ActiveValue::set(entry.is_dir),
2393 path: ActiveValue::set(entry.path.clone()),
2394 inode: ActiveValue::set(entry.inode as i64),
2395 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2396 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2397 is_symlink: ActiveValue::set(entry.is_symlink),
2398 is_ignored: ActiveValue::set(entry.is_ignored),
2399 is_deleted: ActiveValue::set(false),
2400 scan_id: ActiveValue::set(update.scan_id as i64),
2401 }
2402 }))
2403 .on_conflict(
2404 OnConflict::columns([
2405 worktree_entry::Column::ProjectId,
2406 worktree_entry::Column::WorktreeId,
2407 worktree_entry::Column::Id,
2408 ])
2409 .update_columns([
2410 worktree_entry::Column::IsDir,
2411 worktree_entry::Column::Path,
2412 worktree_entry::Column::Inode,
2413 worktree_entry::Column::MtimeSeconds,
2414 worktree_entry::Column::MtimeNanos,
2415 worktree_entry::Column::IsSymlink,
2416 worktree_entry::Column::IsIgnored,
2417 worktree_entry::Column::ScanId,
2418 ])
2419 .to_owned(),
2420 )
2421 .exec(&*tx)
2422 .await?;
2423 }
2424
2425 if !update.removed_entries.is_empty() {
2426 worktree_entry::Entity::update_many()
2427 .filter(
2428 worktree_entry::Column::ProjectId
2429 .eq(project_id)
2430 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2431 .and(
2432 worktree_entry::Column::Id
2433 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2434 ),
2435 )
2436 .set(worktree_entry::ActiveModel {
2437 is_deleted: ActiveValue::Set(true),
2438 scan_id: ActiveValue::Set(update.scan_id as i64),
2439 ..Default::default()
2440 })
2441 .exec(&*tx)
2442 .await?;
2443 }
2444
2445 if !update.updated_repositories.is_empty() {
2446 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2447 |repository| worktree_repository::ActiveModel {
2448 project_id: ActiveValue::set(project_id),
2449 worktree_id: ActiveValue::set(worktree_id),
2450 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2451 scan_id: ActiveValue::set(update.scan_id as i64),
2452 branch: ActiveValue::set(repository.branch.clone()),
2453 is_deleted: ActiveValue::set(false),
2454 },
2455 ))
2456 .on_conflict(
2457 OnConflict::columns([
2458 worktree_repository::Column::ProjectId,
2459 worktree_repository::Column::WorktreeId,
2460 worktree_repository::Column::WorkDirectoryId,
2461 ])
2462 .update_columns([
2463 worktree_repository::Column::ScanId,
2464 worktree_repository::Column::Branch,
2465 ])
2466 .to_owned(),
2467 )
2468 .exec(&*tx)
2469 .await?;
2470
2471 for repository in update.updated_repositories.iter() {
2472 if !repository.updated_statuses.is_empty() {
2473 worktree_repository_statuses::Entity::insert_many(
2474 repository.updated_statuses.iter().map(|status_entry| {
2475 worktree_repository_statuses::ActiveModel {
2476 project_id: ActiveValue::set(project_id),
2477 worktree_id: ActiveValue::set(worktree_id),
2478 work_directory_id: ActiveValue::set(
2479 repository.work_directory_id as i64,
2480 ),
2481 repo_path: ActiveValue::set(status_entry.repo_path.clone()),
2482 status: ActiveValue::set(status_entry.status as i64),
2483 scan_id: ActiveValue::set(update.scan_id as i64),
2484 is_deleted: ActiveValue::set(false),
2485 }
2486 }),
2487 )
2488 .on_conflict(
2489 OnConflict::columns([
2490 worktree_repository_statuses::Column::ProjectId,
2491 worktree_repository_statuses::Column::WorktreeId,
2492 worktree_repository_statuses::Column::WorkDirectoryId,
2493 worktree_repository_statuses::Column::RepoPath,
2494 ])
2495 .update_columns([
2496 worktree_repository_statuses::Column::ScanId,
2497 worktree_repository_statuses::Column::Status,
2498 worktree_repository_statuses::Column::IsDeleted,
2499 ])
2500 .to_owned(),
2501 )
2502 .exec(&*tx)
2503 .await?;
2504 }
2505
2506 if !repository.removed_repo_paths.is_empty() {
2507 worktree_repository_statuses::Entity::update_many()
2508 .filter(
2509 worktree_repository_statuses::Column::ProjectId
2510 .eq(project_id)
2511 .and(
2512 worktree_repository_statuses::Column::WorktreeId
2513 .eq(worktree_id),
2514 )
2515 .and(
2516 worktree_repository_statuses::Column::WorkDirectoryId
2517 .eq(repository.work_directory_id as i64),
2518 )
2519 .and(worktree_repository_statuses::Column::RepoPath.is_in(
2520 repository.removed_repo_paths.iter().map(String::as_str),
2521 )),
2522 )
2523 .set(worktree_repository_statuses::ActiveModel {
2524 is_deleted: ActiveValue::Set(true),
2525 scan_id: ActiveValue::Set(update.scan_id as i64),
2526 ..Default::default()
2527 })
2528 .exec(&*tx)
2529 .await?;
2530 }
2531 }
2532 }
2533
2534 if !update.removed_repositories.is_empty() {
2535 worktree_repository::Entity::update_many()
2536 .filter(
2537 worktree_repository::Column::ProjectId
2538 .eq(project_id)
2539 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2540 .and(
2541 worktree_repository::Column::WorkDirectoryId
2542 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2543 ),
2544 )
2545 .set(worktree_repository::ActiveModel {
2546 is_deleted: ActiveValue::Set(true),
2547 scan_id: ActiveValue::Set(update.scan_id as i64),
2548 ..Default::default()
2549 })
2550 .exec(&*tx)
2551 .await?;
2552 }
2553
2554 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2555 Ok(connection_ids)
2556 })
2557 .await
2558 }
2559
2560 pub async fn update_diagnostic_summary(
2561 &self,
2562 update: &proto::UpdateDiagnosticSummary,
2563 connection: ConnectionId,
2564 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2565 let project_id = ProjectId::from_proto(update.project_id);
2566 let worktree_id = update.worktree_id as i64;
2567 let room_id = self.room_id_for_project(project_id).await?;
2568 self.room_transaction(room_id, |tx| async move {
2569 let summary = update
2570 .summary
2571 .as_ref()
2572 .ok_or_else(|| anyhow!("invalid summary"))?;
2573
2574 // Ensure the update comes from the host.
2575 let project = project::Entity::find_by_id(project_id)
2576 .one(&*tx)
2577 .await?
2578 .ok_or_else(|| anyhow!("no such project"))?;
2579 if project.host_connection()? != connection {
2580 return Err(anyhow!("can't update a project hosted by someone else"))?;
2581 }
2582
2583 // Update summary.
2584 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2585 project_id: ActiveValue::set(project_id),
2586 worktree_id: ActiveValue::set(worktree_id),
2587 path: ActiveValue::set(summary.path.clone()),
2588 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2589 error_count: ActiveValue::set(summary.error_count as i32),
2590 warning_count: ActiveValue::set(summary.warning_count as i32),
2591 ..Default::default()
2592 })
2593 .on_conflict(
2594 OnConflict::columns([
2595 worktree_diagnostic_summary::Column::ProjectId,
2596 worktree_diagnostic_summary::Column::WorktreeId,
2597 worktree_diagnostic_summary::Column::Path,
2598 ])
2599 .update_columns([
2600 worktree_diagnostic_summary::Column::LanguageServerId,
2601 worktree_diagnostic_summary::Column::ErrorCount,
2602 worktree_diagnostic_summary::Column::WarningCount,
2603 ])
2604 .to_owned(),
2605 )
2606 .exec(&*tx)
2607 .await?;
2608
2609 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2610 Ok(connection_ids)
2611 })
2612 .await
2613 }
2614
2615 pub async fn start_language_server(
2616 &self,
2617 update: &proto::StartLanguageServer,
2618 connection: ConnectionId,
2619 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2620 let project_id = ProjectId::from_proto(update.project_id);
2621 let room_id = self.room_id_for_project(project_id).await?;
2622 self.room_transaction(room_id, |tx| async move {
2623 let server = update
2624 .server
2625 .as_ref()
2626 .ok_or_else(|| anyhow!("invalid language server"))?;
2627
2628 // Ensure the update comes from the host.
2629 let project = project::Entity::find_by_id(project_id)
2630 .one(&*tx)
2631 .await?
2632 .ok_or_else(|| anyhow!("no such project"))?;
2633 if project.host_connection()? != connection {
2634 return Err(anyhow!("can't update a project hosted by someone else"))?;
2635 }
2636
2637 // Add the newly-started language server.
2638 language_server::Entity::insert(language_server::ActiveModel {
2639 project_id: ActiveValue::set(project_id),
2640 id: ActiveValue::set(server.id as i64),
2641 name: ActiveValue::set(server.name.clone()),
2642 ..Default::default()
2643 })
2644 .on_conflict(
2645 OnConflict::columns([
2646 language_server::Column::ProjectId,
2647 language_server::Column::Id,
2648 ])
2649 .update_column(language_server::Column::Name)
2650 .to_owned(),
2651 )
2652 .exec(&*tx)
2653 .await?;
2654
2655 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2656 Ok(connection_ids)
2657 })
2658 .await
2659 }
2660
2661 pub async fn update_worktree_settings(
2662 &self,
2663 update: &proto::UpdateWorktreeSettings,
2664 connection: ConnectionId,
2665 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2666 let project_id = ProjectId::from_proto(update.project_id);
2667 let room_id = self.room_id_for_project(project_id).await?;
2668 self.room_transaction(room_id, |tx| async move {
2669 // Ensure the update comes from the host.
2670 let project = project::Entity::find_by_id(project_id)
2671 .one(&*tx)
2672 .await?
2673 .ok_or_else(|| anyhow!("no such project"))?;
2674 if project.host_connection()? != connection {
2675 return Err(anyhow!("can't update a project hosted by someone else"))?;
2676 }
2677
2678 if let Some(content) = &update.content {
2679 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2680 project_id: ActiveValue::Set(project_id),
2681 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2682 path: ActiveValue::Set(update.path.clone()),
2683 content: ActiveValue::Set(content.clone()),
2684 })
2685 .on_conflict(
2686 OnConflict::columns([
2687 worktree_settings_file::Column::ProjectId,
2688 worktree_settings_file::Column::WorktreeId,
2689 worktree_settings_file::Column::Path,
2690 ])
2691 .update_column(worktree_settings_file::Column::Content)
2692 .to_owned(),
2693 )
2694 .exec(&*tx)
2695 .await?;
2696 } else {
2697 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2698 project_id: ActiveValue::Set(project_id),
2699 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2700 path: ActiveValue::Set(update.path.clone()),
2701 ..Default::default()
2702 })
2703 .exec(&*tx)
2704 .await?;
2705 }
2706
2707 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2708 Ok(connection_ids)
2709 })
2710 .await
2711 }
2712
2713 pub async fn join_project(
2714 &self,
2715 project_id: ProjectId,
2716 connection: ConnectionId,
2717 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2718 let room_id = self.room_id_for_project(project_id).await?;
2719 self.room_transaction(room_id, |tx| async move {
2720 let participant = room_participant::Entity::find()
2721 .filter(
2722 Condition::all()
2723 .add(
2724 room_participant::Column::AnsweringConnectionId
2725 .eq(connection.id as i32),
2726 )
2727 .add(
2728 room_participant::Column::AnsweringConnectionServerId
2729 .eq(connection.owner_id as i32),
2730 ),
2731 )
2732 .one(&*tx)
2733 .await?
2734 .ok_or_else(|| anyhow!("must join a room first"))?;
2735
2736 let project = project::Entity::find_by_id(project_id)
2737 .one(&*tx)
2738 .await?
2739 .ok_or_else(|| anyhow!("no such project"))?;
2740 if project.room_id != participant.room_id {
2741 return Err(anyhow!("no such project"))?;
2742 }
2743
2744 let mut collaborators = project
2745 .find_related(project_collaborator::Entity)
2746 .all(&*tx)
2747 .await?;
2748 let replica_ids = collaborators
2749 .iter()
2750 .map(|c| c.replica_id)
2751 .collect::<HashSet<_>>();
2752 let mut replica_id = ReplicaId(1);
2753 while replica_ids.contains(&replica_id) {
2754 replica_id.0 += 1;
2755 }
2756 let new_collaborator = project_collaborator::ActiveModel {
2757 project_id: ActiveValue::set(project_id),
2758 connection_id: ActiveValue::set(connection.id as i32),
2759 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2760 user_id: ActiveValue::set(participant.user_id),
2761 replica_id: ActiveValue::set(replica_id),
2762 is_host: ActiveValue::set(false),
2763 ..Default::default()
2764 }
2765 .insert(&*tx)
2766 .await?;
2767 collaborators.push(new_collaborator);
2768
2769 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2770 let mut worktrees = db_worktrees
2771 .into_iter()
2772 .map(|db_worktree| {
2773 (
2774 db_worktree.id as u64,
2775 Worktree {
2776 id: db_worktree.id as u64,
2777 abs_path: db_worktree.abs_path,
2778 root_name: db_worktree.root_name,
2779 visible: db_worktree.visible,
2780 entries: Default::default(),
2781 repository_entries: Default::default(),
2782 diagnostic_summaries: Default::default(),
2783 settings_files: Default::default(),
2784 scan_id: db_worktree.scan_id as u64,
2785 completed_scan_id: db_worktree.completed_scan_id as u64,
2786 },
2787 )
2788 })
2789 .collect::<BTreeMap<_, _>>();
2790
2791 // Populate worktree entries.
2792 {
2793 let mut db_entries = worktree_entry::Entity::find()
2794 .filter(
2795 Condition::all()
2796 .add(worktree_entry::Column::ProjectId.eq(project_id))
2797 .add(worktree_entry::Column::IsDeleted.eq(false)),
2798 )
2799 .stream(&*tx)
2800 .await?;
2801 while let Some(db_entry) = db_entries.next().await {
2802 let db_entry = db_entry?;
2803 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2804 worktree.entries.push(proto::Entry {
2805 id: db_entry.id as u64,
2806 is_dir: db_entry.is_dir,
2807 path: db_entry.path,
2808 inode: db_entry.inode as u64,
2809 mtime: Some(proto::Timestamp {
2810 seconds: db_entry.mtime_seconds as u64,
2811 nanos: db_entry.mtime_nanos as u32,
2812 }),
2813 is_symlink: db_entry.is_symlink,
2814 is_ignored: db_entry.is_ignored,
2815 });
2816 }
2817 }
2818 }
2819
2820 // Populate repository entries.
2821 {
2822 let mut db_repository_entries = worktree_repository::Entity::find()
2823 .filter(
2824 Condition::all()
2825 .add(worktree_repository::Column::ProjectId.eq(project_id))
2826 .add(worktree_repository::Column::IsDeleted.eq(false)),
2827 )
2828 .stream(&*tx)
2829 .await?;
2830 while let Some(db_repository_entry) = db_repository_entries.next().await {
2831 let db_repository_entry = db_repository_entry?;
2832 if let Some(worktree) =
2833 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2834 {
2835 worktree.repository_entries.insert(
2836 db_repository_entry.work_directory_id as u64,
2837 proto::RepositoryEntry {
2838 work_directory_id: db_repository_entry.work_directory_id as u64,
2839 branch: db_repository_entry.branch,
2840 removed_repo_paths: Default::default(),
2841 updated_statuses: Default::default(),
2842 },
2843 );
2844 }
2845 }
2846 }
2847
2848 {
2849 let mut db_status_entries = worktree_repository_statuses::Entity::find()
2850 .filter(
2851 Condition::all()
2852 .add(worktree_repository_statuses::Column::ProjectId.eq(project_id))
2853 .add(worktree_repository_statuses::Column::IsDeleted.eq(false)),
2854 )
2855 .stream(&*tx)
2856 .await?;
2857
2858 while let Some(db_status_entry) = db_status_entries.next().await {
2859 let db_status_entry = db_status_entry?;
2860 if let Some(worktree) = worktrees.get_mut(&(db_status_entry.worktree_id as u64))
2861 {
2862 if let Some(repository_entry) = worktree
2863 .repository_entries
2864 .get_mut(&(db_status_entry.work_directory_id as u64))
2865 {
2866 repository_entry.updated_statuses.push(proto::StatusEntry {
2867 repo_path: db_status_entry.repo_path,
2868 status: db_status_entry.status as i32,
2869 });
2870 }
2871 }
2872 }
2873 }
2874
2875 // Populate worktree diagnostic summaries.
2876 {
2877 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2878 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2879 .stream(&*tx)
2880 .await?;
2881 while let Some(db_summary) = db_summaries.next().await {
2882 let db_summary = db_summary?;
2883 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2884 worktree
2885 .diagnostic_summaries
2886 .push(proto::DiagnosticSummary {
2887 path: db_summary.path,
2888 language_server_id: db_summary.language_server_id as u64,
2889 error_count: db_summary.error_count as u32,
2890 warning_count: db_summary.warning_count as u32,
2891 });
2892 }
2893 }
2894 }
2895
2896 // Populate worktree settings files
2897 {
2898 let mut db_settings_files = worktree_settings_file::Entity::find()
2899 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2900 .stream(&*tx)
2901 .await?;
2902 while let Some(db_settings_file) = db_settings_files.next().await {
2903 let db_settings_file = db_settings_file?;
2904 if let Some(worktree) =
2905 worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2906 {
2907 worktree.settings_files.push(WorktreeSettingsFile {
2908 path: db_settings_file.path,
2909 content: db_settings_file.content,
2910 });
2911 }
2912 }
2913 }
2914
2915 // Populate language servers.
2916 let language_servers = project
2917 .find_related(language_server::Entity)
2918 .all(&*tx)
2919 .await?;
2920
2921 let project = Project {
2922 collaborators: collaborators
2923 .into_iter()
2924 .map(|collaborator| ProjectCollaborator {
2925 connection_id: collaborator.connection(),
2926 user_id: collaborator.user_id,
2927 replica_id: collaborator.replica_id,
2928 is_host: collaborator.is_host,
2929 })
2930 .collect(),
2931 worktrees,
2932 language_servers: language_servers
2933 .into_iter()
2934 .map(|language_server| proto::LanguageServer {
2935 id: language_server.id as u64,
2936 name: language_server.name,
2937 })
2938 .collect(),
2939 };
2940 Ok((project, replica_id as ReplicaId))
2941 })
2942 .await
2943 }
2944
2945 pub async fn leave_project(
2946 &self,
2947 project_id: ProjectId,
2948 connection: ConnectionId,
2949 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2950 let room_id = self.room_id_for_project(project_id).await?;
2951 self.room_transaction(room_id, |tx| async move {
2952 let result = project_collaborator::Entity::delete_many()
2953 .filter(
2954 Condition::all()
2955 .add(project_collaborator::Column::ProjectId.eq(project_id))
2956 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2957 .add(
2958 project_collaborator::Column::ConnectionServerId
2959 .eq(connection.owner_id as i32),
2960 ),
2961 )
2962 .exec(&*tx)
2963 .await?;
2964 if result.rows_affected == 0 {
2965 Err(anyhow!("not a collaborator on this project"))?;
2966 }
2967
2968 let project = project::Entity::find_by_id(project_id)
2969 .one(&*tx)
2970 .await?
2971 .ok_or_else(|| anyhow!("no such project"))?;
2972 let collaborators = project
2973 .find_related(project_collaborator::Entity)
2974 .all(&*tx)
2975 .await?;
2976 let connection_ids = collaborators
2977 .into_iter()
2978 .map(|collaborator| collaborator.connection())
2979 .collect();
2980
2981 follower::Entity::delete_many()
2982 .filter(
2983 Condition::any()
2984 .add(
2985 Condition::all()
2986 .add(follower::Column::ProjectId.eq(project_id))
2987 .add(
2988 follower::Column::LeaderConnectionServerId
2989 .eq(connection.owner_id),
2990 )
2991 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2992 )
2993 .add(
2994 Condition::all()
2995 .add(follower::Column::ProjectId.eq(project_id))
2996 .add(
2997 follower::Column::FollowerConnectionServerId
2998 .eq(connection.owner_id),
2999 )
3000 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
3001 ),
3002 )
3003 .exec(&*tx)
3004 .await?;
3005
3006 let room = self.get_room(project.room_id, &tx).await?;
3007 let left_project = LeftProject {
3008 id: project_id,
3009 host_user_id: project.host_user_id,
3010 host_connection_id: project.host_connection()?,
3011 connection_ids,
3012 };
3013 Ok((room, left_project))
3014 })
3015 .await
3016 }
3017
3018 pub async fn project_collaborators(
3019 &self,
3020 project_id: ProjectId,
3021 connection_id: ConnectionId,
3022 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
3023 let room_id = self.room_id_for_project(project_id).await?;
3024 self.room_transaction(room_id, |tx| async move {
3025 let collaborators = project_collaborator::Entity::find()
3026 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3027 .all(&*tx)
3028 .await?
3029 .into_iter()
3030 .map(|collaborator| ProjectCollaborator {
3031 connection_id: collaborator.connection(),
3032 user_id: collaborator.user_id,
3033 replica_id: collaborator.replica_id,
3034 is_host: collaborator.is_host,
3035 })
3036 .collect::<Vec<_>>();
3037
3038 if collaborators
3039 .iter()
3040 .any(|collaborator| collaborator.connection_id == connection_id)
3041 {
3042 Ok(collaborators)
3043 } else {
3044 Err(anyhow!("no such project"))?
3045 }
3046 })
3047 .await
3048 }
3049
3050 pub async fn project_connection_ids(
3051 &self,
3052 project_id: ProjectId,
3053 connection_id: ConnectionId,
3054 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3055 let room_id = self.room_id_for_project(project_id).await?;
3056 self.room_transaction(room_id, |tx| async move {
3057 let mut collaborators = project_collaborator::Entity::find()
3058 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3059 .stream(&*tx)
3060 .await?;
3061
3062 let mut connection_ids = HashSet::default();
3063 while let Some(collaborator) = collaborators.next().await {
3064 let collaborator = collaborator?;
3065 connection_ids.insert(collaborator.connection());
3066 }
3067
3068 if connection_ids.contains(&connection_id) {
3069 Ok(connection_ids)
3070 } else {
3071 Err(anyhow!("no such project"))?
3072 }
3073 })
3074 .await
3075 }
3076
3077 async fn project_guest_connection_ids(
3078 &self,
3079 project_id: ProjectId,
3080 tx: &DatabaseTransaction,
3081 ) -> Result<Vec<ConnectionId>> {
3082 let mut collaborators = project_collaborator::Entity::find()
3083 .filter(
3084 project_collaborator::Column::ProjectId
3085 .eq(project_id)
3086 .and(project_collaborator::Column::IsHost.eq(false)),
3087 )
3088 .stream(tx)
3089 .await?;
3090
3091 let mut guest_connection_ids = Vec::new();
3092 while let Some(collaborator) = collaborators.next().await {
3093 let collaborator = collaborator?;
3094 guest_connection_ids.push(collaborator.connection());
3095 }
3096 Ok(guest_connection_ids)
3097 }
3098
3099 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3100 self.transaction(|tx| async move {
3101 let project = project::Entity::find_by_id(project_id)
3102 .one(&*tx)
3103 .await?
3104 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3105 Ok(project.room_id)
3106 })
3107 .await
3108 }
3109
3110 // access tokens
3111
3112 pub async fn create_access_token(
3113 &self,
3114 user_id: UserId,
3115 access_token_hash: &str,
3116 max_access_token_count: usize,
3117 ) -> Result<AccessTokenId> {
3118 self.transaction(|tx| async {
3119 let tx = tx;
3120
3121 let token = access_token::ActiveModel {
3122 user_id: ActiveValue::set(user_id),
3123 hash: ActiveValue::set(access_token_hash.into()),
3124 ..Default::default()
3125 }
3126 .insert(&*tx)
3127 .await?;
3128
3129 access_token::Entity::delete_many()
3130 .filter(
3131 access_token::Column::Id.in_subquery(
3132 Query::select()
3133 .column(access_token::Column::Id)
3134 .from(access_token::Entity)
3135 .and_where(access_token::Column::UserId.eq(user_id))
3136 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3137 .limit(10000)
3138 .offset(max_access_token_count as u64)
3139 .to_owned(),
3140 ),
3141 )
3142 .exec(&*tx)
3143 .await?;
3144 Ok(token.id)
3145 })
3146 .await
3147 }
3148
3149 pub async fn get_access_token(
3150 &self,
3151 access_token_id: AccessTokenId,
3152 ) -> Result<access_token::Model> {
3153 self.transaction(|tx| async move {
3154 Ok(access_token::Entity::find_by_id(access_token_id)
3155 .one(&*tx)
3156 .await?
3157 .ok_or_else(|| anyhow!("no such access token"))?)
3158 })
3159 .await
3160 }
3161
3162 async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3163 where
3164 F: Send + Fn(TransactionHandle) -> Fut,
3165 Fut: Send + Future<Output = Result<T>>,
3166 {
3167 let body = async {
3168 let mut i = 0;
3169 loop {
3170 let (tx, result) = self.with_transaction(&f).await?;
3171 match result {
3172 Ok(result) => match tx.commit().await.map_err(Into::into) {
3173 Ok(()) => return Ok(result),
3174 Err(error) => {
3175 if !self.retry_on_serialization_error(&error, i).await {
3176 return Err(error);
3177 }
3178 }
3179 },
3180 Err(error) => {
3181 tx.rollback().await?;
3182 if !self.retry_on_serialization_error(&error, i).await {
3183 return Err(error);
3184 }
3185 }
3186 }
3187 i += 1;
3188 }
3189 };
3190
3191 self.run(body).await
3192 }
3193
3194 async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
3195 where
3196 F: Send + Fn(TransactionHandle) -> Fut,
3197 Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
3198 {
3199 let body = async {
3200 let mut i = 0;
3201 loop {
3202 let (tx, result) = self.with_transaction(&f).await?;
3203 match result {
3204 Ok(Some((room_id, data))) => {
3205 let lock = self.rooms.entry(room_id).or_default().clone();
3206 let _guard = lock.lock_owned().await;
3207 match tx.commit().await.map_err(Into::into) {
3208 Ok(()) => {
3209 return Ok(Some(RoomGuard {
3210 data,
3211 _guard,
3212 _not_send: PhantomData,
3213 }));
3214 }
3215 Err(error) => {
3216 if !self.retry_on_serialization_error(&error, i).await {
3217 return Err(error);
3218 }
3219 }
3220 }
3221 }
3222 Ok(None) => match tx.commit().await.map_err(Into::into) {
3223 Ok(()) => return Ok(None),
3224 Err(error) => {
3225 if !self.retry_on_serialization_error(&error, i).await {
3226 return Err(error);
3227 }
3228 }
3229 },
3230 Err(error) => {
3231 tx.rollback().await?;
3232 if !self.retry_on_serialization_error(&error, i).await {
3233 return Err(error);
3234 }
3235 }
3236 }
3237 i += 1;
3238 }
3239 };
3240
3241 self.run(body).await
3242 }
3243
3244 async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3245 where
3246 F: Send + Fn(TransactionHandle) -> Fut,
3247 Fut: Send + Future<Output = Result<T>>,
3248 {
3249 let body = async {
3250 let mut i = 0;
3251 loop {
3252 let lock = self.rooms.entry(room_id).or_default().clone();
3253 let _guard = lock.lock_owned().await;
3254 let (tx, result) = self.with_transaction(&f).await?;
3255 match result {
3256 Ok(data) => match tx.commit().await.map_err(Into::into) {
3257 Ok(()) => {
3258 return Ok(RoomGuard {
3259 data,
3260 _guard,
3261 _not_send: PhantomData,
3262 });
3263 }
3264 Err(error) => {
3265 if !self.retry_on_serialization_error(&error, i).await {
3266 return Err(error);
3267 }
3268 }
3269 },
3270 Err(error) => {
3271 tx.rollback().await?;
3272 if !self.retry_on_serialization_error(&error, i).await {
3273 return Err(error);
3274 }
3275 }
3276 }
3277 i += 1;
3278 }
3279 };
3280
3281 self.run(body).await
3282 }
3283
3284 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3285 where
3286 F: Send + Fn(TransactionHandle) -> Fut,
3287 Fut: Send + Future<Output = Result<T>>,
3288 {
3289 let tx = self
3290 .pool
3291 .begin_with_config(Some(IsolationLevel::Serializable), None)
3292 .await?;
3293
3294 let mut tx = Arc::new(Some(tx));
3295 let result = f(TransactionHandle(tx.clone())).await;
3296 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3297 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3298 };
3299
3300 Ok((tx, result))
3301 }
3302
3303 async fn run<F, T>(&self, future: F) -> Result<T>
3304 where
3305 F: Future<Output = Result<T>>,
3306 {
3307 #[cfg(test)]
3308 {
3309 if let Executor::Deterministic(executor) = &self.executor {
3310 executor.simulate_random_delay().await;
3311 }
3312
3313 self.runtime.as_ref().unwrap().block_on(future)
3314 }
3315
3316 #[cfg(not(test))]
3317 {
3318 future.await
3319 }
3320 }
3321
3322 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3323 // If the error is due to a failure to serialize concurrent transactions, then retry
3324 // this transaction after a delay. With each subsequent retry, double the delay duration.
3325 // Also vary the delay randomly in order to ensure different database connections retry
3326 // at different times.
3327 if is_serialization_error(error) {
3328 let base_delay = 4_u64 << prev_attempt_count.min(16);
3329 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3330 log::info!(
3331 "retrying transaction after serialization error. delay: {} ms.",
3332 randomized_delay
3333 );
3334 self.executor
3335 .sleep(Duration::from_millis(randomized_delay as u64))
3336 .await;
3337 true
3338 } else {
3339 false
3340 }
3341 }
3342}
3343
3344fn is_serialization_error(error: &Error) -> bool {
3345 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3346 match error {
3347 Error::Database(
3348 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3349 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3350 ) if error
3351 .as_database_error()
3352 .and_then(|error| error.code())
3353 .as_deref()
3354 == Some(SERIALIZATION_FAILURE_CODE) =>
3355 {
3356 true
3357 }
3358 _ => false,
3359 }
3360}
3361
3362struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3363
3364impl Deref for TransactionHandle {
3365 type Target = DatabaseTransaction;
3366
3367 fn deref(&self) -> &Self::Target {
3368 self.0.as_ref().as_ref().unwrap()
3369 }
3370}
3371
3372pub struct RoomGuard<T> {
3373 data: T,
3374 _guard: OwnedMutexGuard<()>,
3375 _not_send: PhantomData<Rc<()>>,
3376}
3377
3378impl<T> Deref for RoomGuard<T> {
3379 type Target = T;
3380
3381 fn deref(&self) -> &T {
3382 &self.data
3383 }
3384}
3385
3386impl<T> DerefMut for RoomGuard<T> {
3387 fn deref_mut(&mut self) -> &mut T {
3388 &mut self.data
3389 }
3390}
3391
3392#[derive(Debug, Serialize, Deserialize)]
3393pub struct NewUserParams {
3394 pub github_login: String,
3395 pub github_user_id: i32,
3396 pub invite_count: i32,
3397}
3398
3399#[derive(Debug)]
3400pub struct NewUserResult {
3401 pub user_id: UserId,
3402 pub metrics_id: String,
3403 pub inviting_user_id: Option<UserId>,
3404 pub signup_device_id: Option<String>,
3405}
3406
3407fn random_invite_code() -> String {
3408 nanoid::nanoid!(16)
3409}
3410
3411fn random_email_confirmation_code() -> String {
3412 nanoid::nanoid!(64)
3413}
3414
3415macro_rules! id_type {
3416 ($name:ident) => {
3417 #[derive(
3418 Clone,
3419 Copy,
3420 Debug,
3421 Default,
3422 PartialEq,
3423 Eq,
3424 PartialOrd,
3425 Ord,
3426 Hash,
3427 Serialize,
3428 Deserialize,
3429 )]
3430 #[serde(transparent)]
3431 pub struct $name(pub i32);
3432
3433 impl $name {
3434 #[allow(unused)]
3435 pub const MAX: Self = Self(i32::MAX);
3436
3437 #[allow(unused)]
3438 pub fn from_proto(value: u64) -> Self {
3439 Self(value as i32)
3440 }
3441
3442 #[allow(unused)]
3443 pub fn to_proto(self) -> u64 {
3444 self.0 as u64
3445 }
3446 }
3447
3448 impl std::fmt::Display for $name {
3449 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3450 self.0.fmt(f)
3451 }
3452 }
3453
3454 impl From<$name> for sea_query::Value {
3455 fn from(value: $name) -> Self {
3456 sea_query::Value::Int(Some(value.0))
3457 }
3458 }
3459
3460 impl sea_orm::TryGetable for $name {
3461 fn try_get(
3462 res: &sea_orm::QueryResult,
3463 pre: &str,
3464 col: &str,
3465 ) -> Result<Self, sea_orm::TryGetError> {
3466 Ok(Self(i32::try_get(res, pre, col)?))
3467 }
3468 }
3469
3470 impl sea_query::ValueType for $name {
3471 fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3472 match v {
3473 Value::TinyInt(Some(int)) => {
3474 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3475 }
3476 Value::SmallInt(Some(int)) => {
3477 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3478 }
3479 Value::Int(Some(int)) => {
3480 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3481 }
3482 Value::BigInt(Some(int)) => {
3483 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3484 }
3485 Value::TinyUnsigned(Some(int)) => {
3486 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3487 }
3488 Value::SmallUnsigned(Some(int)) => {
3489 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3490 }
3491 Value::Unsigned(Some(int)) => {
3492 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3493 }
3494 Value::BigUnsigned(Some(int)) => {
3495 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3496 }
3497 _ => Err(sea_query::ValueTypeErr),
3498 }
3499 }
3500
3501 fn type_name() -> String {
3502 stringify!($name).into()
3503 }
3504
3505 fn array_type() -> sea_query::ArrayType {
3506 sea_query::ArrayType::Int
3507 }
3508
3509 fn column_type() -> sea_query::ColumnType {
3510 sea_query::ColumnType::Integer(None)
3511 }
3512 }
3513
3514 impl sea_orm::TryFromU64 for $name {
3515 fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3516 Ok(Self(n.try_into().map_err(|_| {
3517 DbErr::ConvertFromU64(concat!(
3518 "error converting ",
3519 stringify!($name),
3520 " to u64"
3521 ))
3522 })?))
3523 }
3524 }
3525
3526 impl sea_query::Nullable for $name {
3527 fn null() -> Value {
3528 Value::Int(None)
3529 }
3530 }
3531 };
3532}
3533
3534id_type!(AccessTokenId);
3535id_type!(ContactId);
3536id_type!(FollowerId);
3537id_type!(RoomId);
3538id_type!(RoomParticipantId);
3539id_type!(ProjectId);
3540id_type!(ProjectCollaboratorId);
3541id_type!(ReplicaId);
3542id_type!(ServerId);
3543id_type!(SignupId);
3544id_type!(UserId);
3545
3546pub struct RejoinedRoom {
3547 pub room: proto::Room,
3548 pub rejoined_projects: Vec<RejoinedProject>,
3549 pub reshared_projects: Vec<ResharedProject>,
3550}
3551
3552pub struct ResharedProject {
3553 pub id: ProjectId,
3554 pub old_connection_id: ConnectionId,
3555 pub collaborators: Vec<ProjectCollaborator>,
3556 pub worktrees: Vec<proto::WorktreeMetadata>,
3557}
3558
3559pub struct RejoinedProject {
3560 pub id: ProjectId,
3561 pub old_connection_id: ConnectionId,
3562 pub collaborators: Vec<ProjectCollaborator>,
3563 pub worktrees: Vec<RejoinedWorktree>,
3564 pub language_servers: Vec<proto::LanguageServer>,
3565}
3566
3567#[derive(Debug)]
3568pub struct RejoinedWorktree {
3569 pub id: u64,
3570 pub abs_path: String,
3571 pub root_name: String,
3572 pub visible: bool,
3573 pub updated_entries: Vec<proto::Entry>,
3574 pub removed_entries: Vec<u64>,
3575 pub updated_repositories: Vec<proto::RepositoryEntry>,
3576 pub removed_repositories: Vec<u64>,
3577 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3578 pub settings_files: Vec<WorktreeSettingsFile>,
3579 pub scan_id: u64,
3580 pub completed_scan_id: u64,
3581}
3582
3583pub struct LeftRoom {
3584 pub room: proto::Room,
3585 pub left_projects: HashMap<ProjectId, LeftProject>,
3586 pub canceled_calls_to_user_ids: Vec<UserId>,
3587}
3588
3589pub struct RefreshedRoom {
3590 pub room: proto::Room,
3591 pub stale_participant_user_ids: Vec<UserId>,
3592 pub canceled_calls_to_user_ids: Vec<UserId>,
3593}
3594
3595pub struct Project {
3596 pub collaborators: Vec<ProjectCollaborator>,
3597 pub worktrees: BTreeMap<u64, Worktree>,
3598 pub language_servers: Vec<proto::LanguageServer>,
3599}
3600
3601pub struct ProjectCollaborator {
3602 pub connection_id: ConnectionId,
3603 pub user_id: UserId,
3604 pub replica_id: ReplicaId,
3605 pub is_host: bool,
3606}
3607
3608impl ProjectCollaborator {
3609 pub fn to_proto(&self) -> proto::Collaborator {
3610 proto::Collaborator {
3611 peer_id: Some(self.connection_id.into()),
3612 replica_id: self.replica_id.0 as u32,
3613 user_id: self.user_id.to_proto(),
3614 }
3615 }
3616}
3617
3618#[derive(Debug)]
3619pub struct LeftProject {
3620 pub id: ProjectId,
3621 pub host_user_id: UserId,
3622 pub host_connection_id: ConnectionId,
3623 pub connection_ids: Vec<ConnectionId>,
3624}
3625
3626pub struct Worktree {
3627 pub id: u64,
3628 pub abs_path: String,
3629 pub root_name: String,
3630 pub visible: bool,
3631 pub entries: Vec<proto::Entry>,
3632 pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
3633 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3634 pub settings_files: Vec<WorktreeSettingsFile>,
3635 pub scan_id: u64,
3636 pub completed_scan_id: u64,
3637}
3638
3639#[derive(Debug)]
3640pub struct WorktreeSettingsFile {
3641 pub path: String,
3642 pub content: String,
3643}
3644
3645#[cfg(test)]
3646pub use test::*;
3647
3648#[cfg(test)]
3649mod test {
3650 use super::*;
3651 use gpui::executor::Background;
3652 use lazy_static::lazy_static;
3653 use parking_lot::Mutex;
3654 use sea_orm::ConnectionTrait;
3655 use sqlx::migrate::MigrateDatabase;
3656 use std::sync::Arc;
3657
3658 pub struct TestDb {
3659 pub db: Option<Arc<Database>>,
3660 pub connection: Option<sqlx::AnyConnection>,
3661 }
3662
3663 impl TestDb {
3664 pub fn sqlite(background: Arc<Background>) -> Self {
3665 let url = format!("sqlite::memory:");
3666 let runtime = tokio::runtime::Builder::new_current_thread()
3667 .enable_io()
3668 .enable_time()
3669 .build()
3670 .unwrap();
3671
3672 let mut db = runtime.block_on(async {
3673 let mut options = ConnectOptions::new(url);
3674 options.max_connections(5);
3675 let db = Database::new(options, Executor::Deterministic(background))
3676 .await
3677 .unwrap();
3678 let sql = include_str!(concat!(
3679 env!("CARGO_MANIFEST_DIR"),
3680 "/migrations.sqlite/20221109000000_test_schema.sql"
3681 ));
3682 db.pool
3683 .execute(sea_orm::Statement::from_string(
3684 db.pool.get_database_backend(),
3685 sql.into(),
3686 ))
3687 .await
3688 .unwrap();
3689 db
3690 });
3691
3692 db.runtime = Some(runtime);
3693
3694 Self {
3695 db: Some(Arc::new(db)),
3696 connection: None,
3697 }
3698 }
3699
3700 pub fn postgres(background: Arc<Background>) -> Self {
3701 lazy_static! {
3702 static ref LOCK: Mutex<()> = Mutex::new(());
3703 }
3704
3705 let _guard = LOCK.lock();
3706 let mut rng = StdRng::from_entropy();
3707 let url = format!(
3708 "postgres://postgres@localhost/zed-test-{}",
3709 rng.gen::<u128>()
3710 );
3711 let runtime = tokio::runtime::Builder::new_current_thread()
3712 .enable_io()
3713 .enable_time()
3714 .build()
3715 .unwrap();
3716
3717 let mut db = runtime.block_on(async {
3718 sqlx::Postgres::create_database(&url)
3719 .await
3720 .expect("failed to create test db");
3721 let mut options = ConnectOptions::new(url);
3722 options
3723 .max_connections(5)
3724 .idle_timeout(Duration::from_secs(0));
3725 let db = Database::new(options, Executor::Deterministic(background))
3726 .await
3727 .unwrap();
3728 let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3729 db.migrate(Path::new(migrations_path), false).await.unwrap();
3730 db
3731 });
3732
3733 db.runtime = Some(runtime);
3734
3735 Self {
3736 db: Some(Arc::new(db)),
3737 connection: None,
3738 }
3739 }
3740
3741 pub fn db(&self) -> &Arc<Database> {
3742 self.db.as_ref().unwrap()
3743 }
3744 }
3745
3746 impl Drop for TestDb {
3747 fn drop(&mut self) {
3748 let db = self.db.take().unwrap();
3749 if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3750 db.runtime.as_ref().unwrap().block_on(async {
3751 use util::ResultExt;
3752 let query = "
3753 SELECT pg_terminate_backend(pg_stat_activity.pid)
3754 FROM pg_stat_activity
3755 WHERE
3756 pg_stat_activity.datname = current_database() AND
3757 pid <> pg_backend_pid();
3758 ";
3759 db.pool
3760 .execute(sea_orm::Statement::from_string(
3761 db.pool.get_database_backend(),
3762 query.into(),
3763 ))
3764 .await
3765 .log_err();
3766 sqlx::Postgres::drop_database(db.options.get_url())
3767 .await
3768 .log_err();
3769 })
3770 }
3771 }
3772 }
3773}