1mod access_token;
2mod channel;
3mod channel_member;
4mod channel_parent;
5mod contact;
6mod follower;
7mod language_server;
8mod project;
9mod project_collaborator;
10mod room;
11mod room_participant;
12mod server;
13mod signup;
14#[cfg(test)]
15mod tests;
16mod user;
17mod worktree;
18mod worktree_diagnostic_summary;
19mod worktree_entry;
20mod worktree_repository;
21mod worktree_repository_statuses;
22mod worktree_settings_file;
23
24use crate::executor::Executor;
25use crate::{Error, Result};
26use anyhow::anyhow;
27use collections::{BTreeMap, HashMap, HashSet};
28pub use contact::Contact;
29use dashmap::DashMap;
30use futures::StreamExt;
31use hyper::StatusCode;
32use rand::prelude::StdRng;
33use rand::{Rng, SeedableRng};
34use rpc::{proto, ConnectionId};
35use sea_orm::Condition;
36pub use sea_orm::ConnectOptions;
37use sea_orm::{
38 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
39 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
40 Statement, TransactionTrait,
41};
42use sea_query::{
43 Alias, ColumnRef, CommonTableExpression, Expr, OnConflict, Order, Query, QueryStatementWriter,
44 SelectStatement, UnionType, WithClause,
45};
46use serde::{Deserialize, Serialize};
47pub use signup::{Invite, NewSignup, WaitlistSummary};
48use sqlx::migrate::{Migrate, Migration, MigrationSource};
49use sqlx::Connection;
50use std::ops::{Deref, DerefMut};
51use std::path::Path;
52use std::time::Duration;
53use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
54use tokio::sync::{Mutex, OwnedMutexGuard};
55pub use user::Model as User;
56
57pub struct Database {
58 options: ConnectOptions,
59 pool: DatabaseConnection,
60 rooms: DashMap<RoomId, Arc<Mutex<()>>>,
61 rng: Mutex<StdRng>,
62 executor: Executor,
63 #[cfg(test)]
64 runtime: Option<tokio::runtime::Runtime>,
65}
66
67impl Database {
68 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
69 Ok(Self {
70 options: options.clone(),
71 pool: sea_orm::Database::connect(options).await?,
72 rooms: DashMap::with_capacity(16384),
73 rng: Mutex::new(StdRng::seed_from_u64(0)),
74 executor,
75 #[cfg(test)]
76 runtime: None,
77 })
78 }
79
/// Test-only helper: forgets every per-room mutex held in `self.rooms`.
#[cfg(test)]
pub fn reset(&self) {
    self.rooms.clear();
}
84
85 pub async fn migrate(
86 &self,
87 migrations_path: &Path,
88 ignore_checksum_mismatch: bool,
89 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
90 let migrations = MigrationSource::resolve(migrations_path)
91 .await
92 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
93
94 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
95
96 connection.ensure_migrations_table().await?;
97 let applied_migrations: HashMap<_, _> = connection
98 .list_applied_migrations()
99 .await?
100 .into_iter()
101 .map(|m| (m.version, m))
102 .collect();
103
104 let mut new_migrations = Vec::new();
105 for migration in migrations {
106 match applied_migrations.get(&migration.version) {
107 Some(applied_migration) => {
108 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
109 {
110 Err(anyhow!(
111 "checksum mismatch for applied migration {}",
112 migration.description
113 ))?;
114 }
115 }
116 None => {
117 let elapsed = connection.apply(&migration).await?;
118 new_migrations.push((migration, elapsed));
119 }
120 }
121 }
122
123 Ok(new_migrations)
124 }
125
126 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
127 self.transaction(|tx| async move {
128 let server = server::ActiveModel {
129 environment: ActiveValue::set(environment.into()),
130 ..Default::default()
131 }
132 .insert(&*tx)
133 .await?;
134 Ok(server.id)
135 })
136 .await
137 }
138
139 pub async fn stale_room_ids(
140 &self,
141 environment: &str,
142 new_server_id: ServerId,
143 ) -> Result<Vec<RoomId>> {
144 self.transaction(|tx| async move {
145 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
146 enum QueryAs {
147 RoomId,
148 }
149
150 let stale_server_epochs = self
151 .stale_server_ids(environment, new_server_id, &tx)
152 .await?;
153 Ok(room_participant::Entity::find()
154 .select_only()
155 .column(room_participant::Column::RoomId)
156 .distinct()
157 .filter(
158 room_participant::Column::AnsweringConnectionServerId
159 .is_in(stale_server_epochs),
160 )
161 .into_values::<_, QueryAs>()
162 .all(&*tx)
163 .await?)
164 })
165 .await
166 }
167
168 pub async fn refresh_room(
169 &self,
170 room_id: RoomId,
171 new_server_id: ServerId,
172 ) -> Result<RoomGuard<RefreshedRoom>> {
173 self.room_transaction(room_id, |tx| async move {
174 let stale_participant_filter = Condition::all()
175 .add(room_participant::Column::RoomId.eq(room_id))
176 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
177 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
178
179 let stale_participant_user_ids = room_participant::Entity::find()
180 .filter(stale_participant_filter.clone())
181 .all(&*tx)
182 .await?
183 .into_iter()
184 .map(|participant| participant.user_id)
185 .collect::<Vec<_>>();
186
187 // Delete participants who failed to reconnect and cancel their calls.
188 let mut canceled_calls_to_user_ids = Vec::new();
189 room_participant::Entity::delete_many()
190 .filter(stale_participant_filter)
191 .exec(&*tx)
192 .await?;
193 let called_participants = room_participant::Entity::find()
194 .filter(
195 Condition::all()
196 .add(
197 room_participant::Column::CallingUserId
198 .is_in(stale_participant_user_ids.iter().copied()),
199 )
200 .add(room_participant::Column::AnsweringConnectionId.is_null()),
201 )
202 .all(&*tx)
203 .await?;
204 room_participant::Entity::delete_many()
205 .filter(
206 room_participant::Column::Id
207 .is_in(called_participants.iter().map(|participant| participant.id)),
208 )
209 .exec(&*tx)
210 .await?;
211 canceled_calls_to_user_ids.extend(
212 called_participants
213 .into_iter()
214 .map(|participant| participant.user_id),
215 );
216
217 let room = self.get_room(room_id, &tx).await?;
218 // Delete the room if it becomes empty.
219 if room.participants.is_empty() {
220 project::Entity::delete_many()
221 .filter(project::Column::RoomId.eq(room_id))
222 .exec(&*tx)
223 .await?;
224 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
225 }
226
227 Ok(RefreshedRoom {
228 room,
229 stale_participant_user_ids,
230 canceled_calls_to_user_ids,
231 })
232 })
233 .await
234 }
235
236 pub async fn delete_stale_servers(
237 &self,
238 environment: &str,
239 new_server_id: ServerId,
240 ) -> Result<()> {
241 self.transaction(|tx| async move {
242 server::Entity::delete_many()
243 .filter(
244 Condition::all()
245 .add(server::Column::Environment.eq(environment))
246 .add(server::Column::Id.ne(new_server_id)),
247 )
248 .exec(&*tx)
249 .await?;
250 Ok(())
251 })
252 .await
253 }
254
255 async fn stale_server_ids(
256 &self,
257 environment: &str,
258 new_server_id: ServerId,
259 tx: &DatabaseTransaction,
260 ) -> Result<Vec<ServerId>> {
261 let stale_servers = server::Entity::find()
262 .filter(
263 Condition::all()
264 .add(server::Column::Environment.eq(environment))
265 .add(server::Column::Id.ne(new_server_id)),
266 )
267 .all(&*tx)
268 .await?;
269 Ok(stale_servers.into_iter().map(|server| server.id).collect())
270 }
271
272 // users
273
274 pub async fn create_user(
275 &self,
276 email_address: &str,
277 admin: bool,
278 params: NewUserParams,
279 ) -> Result<NewUserResult> {
280 self.transaction(|tx| async {
281 let tx = tx;
282 let user = user::Entity::insert(user::ActiveModel {
283 email_address: ActiveValue::set(Some(email_address.into())),
284 github_login: ActiveValue::set(params.github_login.clone()),
285 github_user_id: ActiveValue::set(Some(params.github_user_id)),
286 admin: ActiveValue::set(admin),
287 metrics_id: ActiveValue::set(Uuid::new_v4()),
288 ..Default::default()
289 })
290 .on_conflict(
291 OnConflict::column(user::Column::GithubLogin)
292 .update_column(user::Column::GithubLogin)
293 .to_owned(),
294 )
295 .exec_with_returning(&*tx)
296 .await?;
297
298 Ok(NewUserResult {
299 user_id: user.id,
300 metrics_id: user.metrics_id.to_string(),
301 signup_device_id: None,
302 inviting_user_id: None,
303 })
304 })
305 .await
306 }
307
308 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
309 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
310 .await
311 }
312
313 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
314 self.transaction(|tx| async {
315 let tx = tx;
316 Ok(user::Entity::find()
317 .filter(user::Column::Id.is_in(ids.iter().copied()))
318 .all(&*tx)
319 .await?)
320 })
321 .await
322 }
323
324 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
325 self.transaction(|tx| async move {
326 Ok(user::Entity::find()
327 .filter(user::Column::GithubLogin.eq(github_login))
328 .one(&*tx)
329 .await?)
330 })
331 .await
332 }
333
334 pub async fn get_or_create_user_by_github_account(
335 &self,
336 github_login: &str,
337 github_user_id: Option<i32>,
338 github_email: Option<&str>,
339 ) -> Result<Option<User>> {
340 self.transaction(|tx| async move {
341 let tx = &*tx;
342 if let Some(github_user_id) = github_user_id {
343 if let Some(user_by_github_user_id) = user::Entity::find()
344 .filter(user::Column::GithubUserId.eq(github_user_id))
345 .one(tx)
346 .await?
347 {
348 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
349 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
350 Ok(Some(user_by_github_user_id.update(tx).await?))
351 } else if let Some(user_by_github_login) = user::Entity::find()
352 .filter(user::Column::GithubLogin.eq(github_login))
353 .one(tx)
354 .await?
355 {
356 let mut user_by_github_login = user_by_github_login.into_active_model();
357 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
358 Ok(Some(user_by_github_login.update(tx).await?))
359 } else {
360 let user = user::Entity::insert(user::ActiveModel {
361 email_address: ActiveValue::set(github_email.map(|email| email.into())),
362 github_login: ActiveValue::set(github_login.into()),
363 github_user_id: ActiveValue::set(Some(github_user_id)),
364 admin: ActiveValue::set(false),
365 invite_count: ActiveValue::set(0),
366 invite_code: ActiveValue::set(None),
367 metrics_id: ActiveValue::set(Uuid::new_v4()),
368 ..Default::default()
369 })
370 .exec_with_returning(&*tx)
371 .await?;
372 Ok(Some(user))
373 }
374 } else {
375 Ok(user::Entity::find()
376 .filter(user::Column::GithubLogin.eq(github_login))
377 .one(tx)
378 .await?)
379 }
380 })
381 .await
382 }
383
384 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
385 self.transaction(|tx| async move {
386 Ok(user::Entity::find()
387 .order_by_asc(user::Column::GithubLogin)
388 .limit(limit as u64)
389 .offset(page as u64 * limit as u64)
390 .all(&*tx)
391 .await?)
392 })
393 .await
394 }
395
396 pub async fn get_users_with_no_invites(
397 &self,
398 invited_by_another_user: bool,
399 ) -> Result<Vec<User>> {
400 self.transaction(|tx| async move {
401 Ok(user::Entity::find()
402 .filter(
403 user::Column::InviteCount
404 .eq(0)
405 .and(if invited_by_another_user {
406 user::Column::InviterId.is_not_null()
407 } else {
408 user::Column::InviterId.is_null()
409 }),
410 )
411 .all(&*tx)
412 .await?)
413 })
414 .await
415 }
416
417 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
418 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
419 enum QueryAs {
420 MetricsId,
421 }
422
423 self.transaction(|tx| async move {
424 let metrics_id: Uuid = user::Entity::find_by_id(id)
425 .select_only()
426 .column(user::Column::MetricsId)
427 .into_values::<_, QueryAs>()
428 .one(&*tx)
429 .await?
430 .ok_or_else(|| anyhow!("could not find user"))?;
431 Ok(metrics_id.to_string())
432 })
433 .await
434 }
435
436 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
437 self.transaction(|tx| async move {
438 user::Entity::update_many()
439 .filter(user::Column::Id.eq(id))
440 .set(user::ActiveModel {
441 admin: ActiveValue::set(is_admin),
442 ..Default::default()
443 })
444 .exec(&*tx)
445 .await?;
446 Ok(())
447 })
448 .await
449 }
450
451 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
452 self.transaction(|tx| async move {
453 user::Entity::update_many()
454 .filter(user::Column::Id.eq(id))
455 .set(user::ActiveModel {
456 connected_once: ActiveValue::set(connected_once),
457 ..Default::default()
458 })
459 .exec(&*tx)
460 .await?;
461 Ok(())
462 })
463 .await
464 }
465
466 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
467 self.transaction(|tx| async move {
468 access_token::Entity::delete_many()
469 .filter(access_token::Column::UserId.eq(id))
470 .exec(&*tx)
471 .await?;
472 user::Entity::delete_by_id(id).exec(&*tx).await?;
473 Ok(())
474 })
475 .await
476 }
477
478 // contacts
479
480 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
481 #[derive(Debug, FromQueryResult)]
482 struct ContactWithUserBusyStatuses {
483 user_id_a: UserId,
484 user_id_b: UserId,
485 a_to_b: bool,
486 accepted: bool,
487 should_notify: bool,
488 user_a_busy: bool,
489 user_b_busy: bool,
490 }
491
492 self.transaction(|tx| async move {
493 let user_a_participant = Alias::new("user_a_participant");
494 let user_b_participant = Alias::new("user_b_participant");
495 let mut db_contacts = contact::Entity::find()
496 .column_as(
497 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
498 .is_not_null(),
499 "user_a_busy",
500 )
501 .column_as(
502 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
503 .is_not_null(),
504 "user_b_busy",
505 )
506 .filter(
507 contact::Column::UserIdA
508 .eq(user_id)
509 .or(contact::Column::UserIdB.eq(user_id)),
510 )
511 .join_as(
512 JoinType::LeftJoin,
513 contact::Relation::UserARoomParticipant.def(),
514 user_a_participant,
515 )
516 .join_as(
517 JoinType::LeftJoin,
518 contact::Relation::UserBRoomParticipant.def(),
519 user_b_participant,
520 )
521 .into_model::<ContactWithUserBusyStatuses>()
522 .stream(&*tx)
523 .await?;
524
525 let mut contacts = Vec::new();
526 while let Some(db_contact) = db_contacts.next().await {
527 let db_contact = db_contact?;
528 if db_contact.user_id_a == user_id {
529 if db_contact.accepted {
530 contacts.push(Contact::Accepted {
531 user_id: db_contact.user_id_b,
532 should_notify: db_contact.should_notify && db_contact.a_to_b,
533 busy: db_contact.user_b_busy,
534 });
535 } else if db_contact.a_to_b {
536 contacts.push(Contact::Outgoing {
537 user_id: db_contact.user_id_b,
538 })
539 } else {
540 contacts.push(Contact::Incoming {
541 user_id: db_contact.user_id_b,
542 should_notify: db_contact.should_notify,
543 });
544 }
545 } else if db_contact.accepted {
546 contacts.push(Contact::Accepted {
547 user_id: db_contact.user_id_a,
548 should_notify: db_contact.should_notify && !db_contact.a_to_b,
549 busy: db_contact.user_a_busy,
550 });
551 } else if db_contact.a_to_b {
552 contacts.push(Contact::Incoming {
553 user_id: db_contact.user_id_a,
554 should_notify: db_contact.should_notify,
555 });
556 } else {
557 contacts.push(Contact::Outgoing {
558 user_id: db_contact.user_id_a,
559 });
560 }
561 }
562
563 contacts.sort_unstable_by_key(|contact| contact.user_id());
564
565 Ok(contacts)
566 })
567 .await
568 }
569
570 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
571 self.transaction(|tx| async move {
572 let participant = room_participant::Entity::find()
573 .filter(room_participant::Column::UserId.eq(user_id))
574 .one(&*tx)
575 .await?;
576 Ok(participant.is_some())
577 })
578 .await
579 }
580
581 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
582 self.transaction(|tx| async move {
583 let (id_a, id_b) = if user_id_1 < user_id_2 {
584 (user_id_1, user_id_2)
585 } else {
586 (user_id_2, user_id_1)
587 };
588
589 Ok(contact::Entity::find()
590 .filter(
591 contact::Column::UserIdA
592 .eq(id_a)
593 .and(contact::Column::UserIdB.eq(id_b))
594 .and(contact::Column::Accepted.eq(true)),
595 )
596 .one(&*tx)
597 .await?
598 .is_some())
599 })
600 .await
601 }
602
603 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
604 self.transaction(|tx| async move {
605 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
606 (sender_id, receiver_id, true)
607 } else {
608 (receiver_id, sender_id, false)
609 };
610
611 let rows_affected = contact::Entity::insert(contact::ActiveModel {
612 user_id_a: ActiveValue::set(id_a),
613 user_id_b: ActiveValue::set(id_b),
614 a_to_b: ActiveValue::set(a_to_b),
615 accepted: ActiveValue::set(false),
616 should_notify: ActiveValue::set(true),
617 ..Default::default()
618 })
619 .on_conflict(
620 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
621 .values([
622 (contact::Column::Accepted, true.into()),
623 (contact::Column::ShouldNotify, false.into()),
624 ])
625 .action_and_where(
626 contact::Column::Accepted.eq(false).and(
627 contact::Column::AToB
628 .eq(a_to_b)
629 .and(contact::Column::UserIdA.eq(id_b))
630 .or(contact::Column::AToB
631 .ne(a_to_b)
632 .and(contact::Column::UserIdA.eq(id_a))),
633 ),
634 )
635 .to_owned(),
636 )
637 .exec_without_returning(&*tx)
638 .await?;
639
640 if rows_affected == 1 {
641 Ok(())
642 } else {
643 Err(anyhow!("contact already requested"))?
644 }
645 })
646 .await
647 }
648
649 /// Returns a bool indicating whether the removed contact had originally accepted or not
650 ///
651 /// Deletes the contact identified by the requester and responder ids, and then returns
652 /// whether the deleted contact had originally accepted or was a pending contact request.
653 ///
654 /// # Arguments
655 ///
656 /// * `requester_id` - The user that initiates this request
657 /// * `responder_id` - The user that will be removed
658 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
659 self.transaction(|tx| async move {
660 let (id_a, id_b) = if responder_id < requester_id {
661 (responder_id, requester_id)
662 } else {
663 (requester_id, responder_id)
664 };
665
666 let contact = contact::Entity::find()
667 .filter(
668 contact::Column::UserIdA
669 .eq(id_a)
670 .and(contact::Column::UserIdB.eq(id_b)),
671 )
672 .one(&*tx)
673 .await?
674 .ok_or_else(|| anyhow!("no such contact"))?;
675
676 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
677 Ok(contact.accepted)
678 })
679 .await
680 }
681
682 pub async fn dismiss_contact_notification(
683 &self,
684 user_id: UserId,
685 contact_user_id: UserId,
686 ) -> Result<()> {
687 self.transaction(|tx| async move {
688 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
689 (user_id, contact_user_id, true)
690 } else {
691 (contact_user_id, user_id, false)
692 };
693
694 let result = contact::Entity::update_many()
695 .set(contact::ActiveModel {
696 should_notify: ActiveValue::set(false),
697 ..Default::default()
698 })
699 .filter(
700 contact::Column::UserIdA
701 .eq(id_a)
702 .and(contact::Column::UserIdB.eq(id_b))
703 .and(
704 contact::Column::AToB
705 .eq(a_to_b)
706 .and(contact::Column::Accepted.eq(true))
707 .or(contact::Column::AToB
708 .ne(a_to_b)
709 .and(contact::Column::Accepted.eq(false))),
710 ),
711 )
712 .exec(&*tx)
713 .await?;
714 if result.rows_affected == 0 {
715 Err(anyhow!("no such contact request"))?
716 } else {
717 Ok(())
718 }
719 })
720 .await
721 }
722
723 pub async fn respond_to_contact_request(
724 &self,
725 responder_id: UserId,
726 requester_id: UserId,
727 accept: bool,
728 ) -> Result<()> {
729 self.transaction(|tx| async move {
730 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
731 (responder_id, requester_id, false)
732 } else {
733 (requester_id, responder_id, true)
734 };
735 let rows_affected = if accept {
736 let result = contact::Entity::update_many()
737 .set(contact::ActiveModel {
738 accepted: ActiveValue::set(true),
739 should_notify: ActiveValue::set(true),
740 ..Default::default()
741 })
742 .filter(
743 contact::Column::UserIdA
744 .eq(id_a)
745 .and(contact::Column::UserIdB.eq(id_b))
746 .and(contact::Column::AToB.eq(a_to_b)),
747 )
748 .exec(&*tx)
749 .await?;
750 result.rows_affected
751 } else {
752 let result = contact::Entity::delete_many()
753 .filter(
754 contact::Column::UserIdA
755 .eq(id_a)
756 .and(contact::Column::UserIdB.eq(id_b))
757 .and(contact::Column::AToB.eq(a_to_b))
758 .and(contact::Column::Accepted.eq(false)),
759 )
760 .exec(&*tx)
761 .await?;
762
763 result.rows_affected
764 };
765
766 if rows_affected == 1 {
767 Ok(())
768 } else {
769 Err(anyhow!("no such contact request"))?
770 }
771 })
772 .await
773 }
774
/// Builds a `LIKE` pattern that matches the alphanumeric characters of
/// `string` in order, with anything in between.
///
/// Every alphanumeric character is prefixed with `%`, every other character
/// is dropped, and a trailing `%` is appended; `"a-b"` becomes `"%a%b%"` and
/// the empty string becomes `"%"`.
pub fn fuzzy_like_string(string: &str) -> String {
    let mut pattern = String::with_capacity(string.len() * 2 + 1);
    string
        .chars()
        .filter(|ch| ch.is_alphanumeric())
        .for_each(|ch| {
            pattern.push('%');
            pattern.push(ch);
        });
    pattern.push('%');
    pattern
}
786
787 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
788 self.transaction(|tx| async {
789 let tx = tx;
790 let like_string = Self::fuzzy_like_string(name_query);
791 let query = "
792 SELECT users.*
793 FROM users
794 WHERE github_login ILIKE $1
795 ORDER BY github_login <-> $2
796 LIMIT $3
797 ";
798
799 Ok(user::Entity::find()
800 .from_raw_sql(Statement::from_sql_and_values(
801 self.pool.get_database_backend(),
802 query.into(),
803 vec![like_string.into(), name_query.into(), limit.into()],
804 ))
805 .all(&*tx)
806 .await?)
807 })
808 .await
809 }
810
811 // signups
812
813 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
814 self.transaction(|tx| async move {
815 signup::Entity::insert(signup::ActiveModel {
816 email_address: ActiveValue::set(signup.email_address.clone()),
817 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
818 email_confirmation_sent: ActiveValue::set(false),
819 platform_mac: ActiveValue::set(signup.platform_mac),
820 platform_windows: ActiveValue::set(signup.platform_windows),
821 platform_linux: ActiveValue::set(signup.platform_linux),
822 platform_unknown: ActiveValue::set(false),
823 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
824 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
825 device_id: ActiveValue::set(signup.device_id.clone()),
826 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
827 ..Default::default()
828 })
829 .on_conflict(
830 OnConflict::column(signup::Column::EmailAddress)
831 .update_columns([
832 signup::Column::PlatformMac,
833 signup::Column::PlatformWindows,
834 signup::Column::PlatformLinux,
835 signup::Column::EditorFeatures,
836 signup::Column::ProgrammingLanguages,
837 signup::Column::DeviceId,
838 signup::Column::AddedToMailingList,
839 ])
840 .to_owned(),
841 )
842 .exec(&*tx)
843 .await?;
844 Ok(())
845 })
846 .await
847 }
848
849 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
850 self.transaction(|tx| async move {
851 let signup = signup::Entity::find()
852 .filter(signup::Column::EmailAddress.eq(email_address))
853 .one(&*tx)
854 .await?
855 .ok_or_else(|| {
856 anyhow!("signup with email address {} doesn't exist", email_address)
857 })?;
858
859 Ok(signup)
860 })
861 .await
862 }
863
864 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
865 self.transaction(|tx| async move {
866 let query = "
867 SELECT
868 COUNT(*) as count,
869 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
870 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
871 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
872 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
873 FROM (
874 SELECT *
875 FROM signups
876 WHERE
877 NOT email_confirmation_sent
878 ) AS unsent
879 ";
880 Ok(
881 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
882 self.pool.get_database_backend(),
883 query.into(),
884 vec![],
885 ))
886 .one(&*tx)
887 .await?
888 .ok_or_else(|| anyhow!("invalid result"))?,
889 )
890 })
891 .await
892 }
893
894 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
895 let emails = invites
896 .iter()
897 .map(|s| s.email_address.as_str())
898 .collect::<Vec<_>>();
899 self.transaction(|tx| async {
900 let tx = tx;
901 signup::Entity::update_many()
902 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
903 .set(signup::ActiveModel {
904 email_confirmation_sent: ActiveValue::set(true),
905 ..Default::default()
906 })
907 .exec(&*tx)
908 .await?;
909 Ok(())
910 })
911 .await
912 }
913
914 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
915 self.transaction(|tx| async move {
916 Ok(signup::Entity::find()
917 .select_only()
918 .column(signup::Column::EmailAddress)
919 .column(signup::Column::EmailConfirmationCode)
920 .filter(
921 signup::Column::EmailConfirmationSent.eq(false).and(
922 signup::Column::PlatformMac
923 .eq(true)
924 .or(signup::Column::PlatformUnknown.eq(true)),
925 ),
926 )
927 .order_by_asc(signup::Column::CreatedAt)
928 .limit(count as u64)
929 .into_model()
930 .all(&*tx)
931 .await?)
932 })
933 .await
934 }
935
936 // invite codes
937
938 pub async fn create_invite_from_code(
939 &self,
940 code: &str,
941 email_address: &str,
942 device_id: Option<&str>,
943 added_to_mailing_list: bool,
944 ) -> Result<Invite> {
945 self.transaction(|tx| async move {
946 let existing_user = user::Entity::find()
947 .filter(user::Column::EmailAddress.eq(email_address))
948 .one(&*tx)
949 .await?;
950
951 if existing_user.is_some() {
952 Err(anyhow!("email address is already in use"))?;
953 }
954
955 let inviting_user_with_invites = match user::Entity::find()
956 .filter(
957 user::Column::InviteCode
958 .eq(code)
959 .and(user::Column::InviteCount.gt(0)),
960 )
961 .one(&*tx)
962 .await?
963 {
964 Some(inviting_user) => inviting_user,
965 None => {
966 return Err(Error::Http(
967 StatusCode::UNAUTHORIZED,
968 "unable to find an invite code with invites remaining".to_string(),
969 ))?
970 }
971 };
972 user::Entity::update_many()
973 .filter(
974 user::Column::Id
975 .eq(inviting_user_with_invites.id)
976 .and(user::Column::InviteCount.gt(0)),
977 )
978 .col_expr(
979 user::Column::InviteCount,
980 Expr::col(user::Column::InviteCount).sub(1),
981 )
982 .exec(&*tx)
983 .await?;
984
985 let signup = signup::Entity::insert(signup::ActiveModel {
986 email_address: ActiveValue::set(email_address.into()),
987 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
988 email_confirmation_sent: ActiveValue::set(false),
989 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
990 platform_linux: ActiveValue::set(false),
991 platform_mac: ActiveValue::set(false),
992 platform_windows: ActiveValue::set(false),
993 platform_unknown: ActiveValue::set(true),
994 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
995 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
996 ..Default::default()
997 })
998 .on_conflict(
999 OnConflict::column(signup::Column::EmailAddress)
1000 .update_column(signup::Column::InvitingUserId)
1001 .to_owned(),
1002 )
1003 .exec_with_returning(&*tx)
1004 .await?;
1005
1006 Ok(Invite {
1007 email_address: signup.email_address,
1008 email_confirmation_code: signup.email_confirmation_code,
1009 })
1010 })
1011 .await
1012 }
1013
1014 pub async fn create_user_from_invite(
1015 &self,
1016 invite: &Invite,
1017 user: NewUserParams,
1018 ) -> Result<Option<NewUserResult>> {
1019 self.transaction(|tx| async {
1020 let tx = tx;
1021 let signup = signup::Entity::find()
1022 .filter(
1023 signup::Column::EmailAddress
1024 .eq(invite.email_address.as_str())
1025 .and(
1026 signup::Column::EmailConfirmationCode
1027 .eq(invite.email_confirmation_code.as_str()),
1028 ),
1029 )
1030 .one(&*tx)
1031 .await?
1032 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1033
1034 if signup.user_id.is_some() {
1035 return Ok(None);
1036 }
1037
1038 let user = user::Entity::insert(user::ActiveModel {
1039 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1040 github_login: ActiveValue::set(user.github_login.clone()),
1041 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1042 admin: ActiveValue::set(false),
1043 invite_count: ActiveValue::set(user.invite_count),
1044 invite_code: ActiveValue::set(Some(random_invite_code())),
1045 metrics_id: ActiveValue::set(Uuid::new_v4()),
1046 ..Default::default()
1047 })
1048 .on_conflict(
1049 OnConflict::column(user::Column::GithubLogin)
1050 .update_columns([
1051 user::Column::EmailAddress,
1052 user::Column::GithubUserId,
1053 user::Column::Admin,
1054 ])
1055 .to_owned(),
1056 )
1057 .exec_with_returning(&*tx)
1058 .await?;
1059
1060 let mut signup = signup.into_active_model();
1061 signup.user_id = ActiveValue::set(Some(user.id));
1062 let signup = signup.update(&*tx).await?;
1063
1064 if let Some(inviting_user_id) = signup.inviting_user_id {
1065 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1066 (inviting_user_id, user.id, true)
1067 } else {
1068 (user.id, inviting_user_id, false)
1069 };
1070
1071 contact::Entity::insert(contact::ActiveModel {
1072 user_id_a: ActiveValue::set(user_id_a),
1073 user_id_b: ActiveValue::set(user_id_b),
1074 a_to_b: ActiveValue::set(a_to_b),
1075 should_notify: ActiveValue::set(true),
1076 accepted: ActiveValue::set(true),
1077 ..Default::default()
1078 })
1079 .on_conflict(OnConflict::new().do_nothing().to_owned())
1080 .exec_without_returning(&*tx)
1081 .await?;
1082 }
1083
1084 Ok(Some(NewUserResult {
1085 user_id: user.id,
1086 metrics_id: user.metrics_id.to_string(),
1087 inviting_user_id: signup.inviting_user_id,
1088 signup_device_id: signup.device_id,
1089 }))
1090 })
1091 .await
1092 }
1093
1094 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1095 self.transaction(|tx| async move {
1096 if count > 0 {
1097 user::Entity::update_many()
1098 .filter(
1099 user::Column::Id
1100 .eq(id)
1101 .and(user::Column::InviteCode.is_null()),
1102 )
1103 .set(user::ActiveModel {
1104 invite_code: ActiveValue::set(Some(random_invite_code())),
1105 ..Default::default()
1106 })
1107 .exec(&*tx)
1108 .await?;
1109 }
1110
1111 user::Entity::update_many()
1112 .filter(user::Column::Id.eq(id))
1113 .set(user::ActiveModel {
1114 invite_count: ActiveValue::set(count),
1115 ..Default::default()
1116 })
1117 .exec(&*tx)
1118 .await?;
1119 Ok(())
1120 })
1121 .await
1122 }
1123
1124 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1125 self.transaction(|tx| async move {
1126 match user::Entity::find_by_id(id).one(&*tx).await? {
1127 Some(user) if user.invite_code.is_some() => {
1128 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1129 }
1130 _ => Ok(None),
1131 }
1132 })
1133 .await
1134 }
1135
1136 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1137 self.transaction(|tx| async move {
1138 user::Entity::find()
1139 .filter(user::Column::InviteCode.eq(code))
1140 .one(&*tx)
1141 .await?
1142 .ok_or_else(|| {
1143 Error::Http(
1144 StatusCode::NOT_FOUND,
1145 "that invite code does not exist".to_string(),
1146 )
1147 })
1148 })
1149 .await
1150 }
1151
1152 // rooms
1153
1154 pub async fn incoming_call_for_user(
1155 &self,
1156 user_id: UserId,
1157 ) -> Result<Option<proto::IncomingCall>> {
1158 self.transaction(|tx| async move {
1159 let pending_participant = room_participant::Entity::find()
1160 .filter(
1161 room_participant::Column::UserId
1162 .eq(user_id)
1163 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1164 )
1165 .one(&*tx)
1166 .await?;
1167
1168 if let Some(pending_participant) = pending_participant {
1169 let room = self.get_room(pending_participant.room_id, &tx).await?;
1170 Ok(Self::build_incoming_call(&room, user_id))
1171 } else {
1172 Ok(None)
1173 }
1174 })
1175 .await
1176 }
1177
1178 pub async fn create_room(
1179 &self,
1180 user_id: UserId,
1181 connection: ConnectionId,
1182 live_kit_room: &str,
1183 ) -> Result<proto::Room> {
1184 self.transaction(|tx| async move {
1185 let room = room::ActiveModel {
1186 live_kit_room: ActiveValue::set(live_kit_room.into()),
1187 ..Default::default()
1188 }
1189 .insert(&*tx)
1190 .await?;
1191 room_participant::ActiveModel {
1192 room_id: ActiveValue::set(room.id),
1193 user_id: ActiveValue::set(user_id),
1194 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1195 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1196 connection.owner_id as i32,
1197 ))),
1198 answering_connection_lost: ActiveValue::set(false),
1199 calling_user_id: ActiveValue::set(user_id),
1200 calling_connection_id: ActiveValue::set(connection.id as i32),
1201 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1202 connection.owner_id as i32,
1203 ))),
1204 ..Default::default()
1205 }
1206 .insert(&*tx)
1207 .await?;
1208
1209 let room = self.get_room(room.id, &tx).await?;
1210 Ok(room)
1211 })
1212 .await
1213 }
1214
1215 pub async fn call(
1216 &self,
1217 room_id: RoomId,
1218 calling_user_id: UserId,
1219 calling_connection: ConnectionId,
1220 called_user_id: UserId,
1221 initial_project_id: Option<ProjectId>,
1222 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1223 self.room_transaction(room_id, |tx| async move {
1224 room_participant::ActiveModel {
1225 room_id: ActiveValue::set(room_id),
1226 user_id: ActiveValue::set(called_user_id),
1227 answering_connection_lost: ActiveValue::set(false),
1228 calling_user_id: ActiveValue::set(calling_user_id),
1229 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1230 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1231 calling_connection.owner_id as i32,
1232 ))),
1233 initial_project_id: ActiveValue::set(initial_project_id),
1234 ..Default::default()
1235 }
1236 .insert(&*tx)
1237 .await?;
1238
1239 let room = self.get_room(room_id, &tx).await?;
1240 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1241 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1242 Ok((room, incoming_call))
1243 })
1244 .await
1245 }
1246
1247 pub async fn call_failed(
1248 &self,
1249 room_id: RoomId,
1250 called_user_id: UserId,
1251 ) -> Result<RoomGuard<proto::Room>> {
1252 self.room_transaction(room_id, |tx| async move {
1253 room_participant::Entity::delete_many()
1254 .filter(
1255 room_participant::Column::RoomId
1256 .eq(room_id)
1257 .and(room_participant::Column::UserId.eq(called_user_id)),
1258 )
1259 .exec(&*tx)
1260 .await?;
1261 let room = self.get_room(room_id, &tx).await?;
1262 Ok(room)
1263 })
1264 .await
1265 }
1266
1267 pub async fn decline_call(
1268 &self,
1269 expected_room_id: Option<RoomId>,
1270 user_id: UserId,
1271 ) -> Result<Option<RoomGuard<proto::Room>>> {
1272 self.optional_room_transaction(|tx| async move {
1273 let mut filter = Condition::all()
1274 .add(room_participant::Column::UserId.eq(user_id))
1275 .add(room_participant::Column::AnsweringConnectionId.is_null());
1276 if let Some(room_id) = expected_room_id {
1277 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1278 }
1279 let participant = room_participant::Entity::find()
1280 .filter(filter)
1281 .one(&*tx)
1282 .await?;
1283
1284 let participant = if let Some(participant) = participant {
1285 participant
1286 } else if expected_room_id.is_some() {
1287 return Err(anyhow!("could not find call to decline"))?;
1288 } else {
1289 return Ok(None);
1290 };
1291
1292 let room_id = participant.room_id;
1293 room_participant::Entity::delete(participant.into_active_model())
1294 .exec(&*tx)
1295 .await?;
1296
1297 let room = self.get_room(room_id, &tx).await?;
1298 Ok(Some((room_id, room)))
1299 })
1300 .await
1301 }
1302
1303 pub async fn cancel_call(
1304 &self,
1305 room_id: RoomId,
1306 calling_connection: ConnectionId,
1307 called_user_id: UserId,
1308 ) -> Result<RoomGuard<proto::Room>> {
1309 self.room_transaction(room_id, |tx| async move {
1310 let participant = room_participant::Entity::find()
1311 .filter(
1312 Condition::all()
1313 .add(room_participant::Column::UserId.eq(called_user_id))
1314 .add(room_participant::Column::RoomId.eq(room_id))
1315 .add(
1316 room_participant::Column::CallingConnectionId
1317 .eq(calling_connection.id as i32),
1318 )
1319 .add(
1320 room_participant::Column::CallingConnectionServerId
1321 .eq(calling_connection.owner_id as i32),
1322 )
1323 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1324 )
1325 .one(&*tx)
1326 .await?
1327 .ok_or_else(|| anyhow!("no call to cancel"))?;
1328
1329 room_participant::Entity::delete(participant.into_active_model())
1330 .exec(&*tx)
1331 .await?;
1332
1333 let room = self.get_room(room_id, &tx).await?;
1334 Ok(room)
1335 })
1336 .await
1337 }
1338
1339 pub async fn join_room(
1340 &self,
1341 room_id: RoomId,
1342 user_id: UserId,
1343 connection: ConnectionId,
1344 ) -> Result<RoomGuard<proto::Room>> {
1345 self.room_transaction(room_id, |tx| async move {
1346 let result = room_participant::Entity::update_many()
1347 .filter(
1348 Condition::all()
1349 .add(room_participant::Column::RoomId.eq(room_id))
1350 .add(room_participant::Column::UserId.eq(user_id))
1351 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1352 )
1353 .set(room_participant::ActiveModel {
1354 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1355 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1356 connection.owner_id as i32,
1357 ))),
1358 answering_connection_lost: ActiveValue::set(false),
1359 ..Default::default()
1360 })
1361 .exec(&*tx)
1362 .await?;
1363 if result.rows_affected == 0 {
1364 Err(anyhow!("room does not exist or was already joined"))?
1365 } else {
1366 let room = self.get_room(room_id, &tx).await?;
1367 Ok(room)
1368 }
1369 })
1370 .await
1371 }
1372
    /// Re-attaches `user_id` to the room named in `rejoin_room` on a new
    /// `connection`, and reconciles the projects listed in the request.
    ///
    /// Everything runs inside a single `room_transaction`:
    /// 1. The participant row is pointed at the new connection. This only
    ///    matches a row that was already answered and whose connection was
    ///    either marked lost or belongs to a different server id.
    /// 2. Each entry of `reshared_projects` (projects this user hosts) gets its
    ///    host connection and host collaborator row moved to `connection`, and
    ///    its worktrees updated.
    /// 3. Projects in the room hosted by this user that were NOT reshared are
    ///    deleted.
    /// 4. Each entry of `rejoined_projects` is loaded with its worktrees,
    ///    entries, repositories, language servers and settings files, and the
    ///    user's collaborator row is moved to `connection`. Projects that no
    ///    longer exist, or in which the user is no longer a collaborator, are
    ///    silently skipped.
    ///
    /// Returns the refreshed room plus the reshared and rejoined project data.
    /// Fails if step 1 matches no row, if a reshared project does not exist or
    /// is hosted by someone else, or if its host collaborator row is missing.
    pub async fn rejoin_room(
        &self,
        rejoin_room: proto::RejoinRoom,
        user_id: UserId,
        connection: ConnectionId,
    ) -> Result<RoomGuard<RejoinedRoom>> {
        let room_id = RoomId::from_proto(rejoin_room.id);
        // Non-`move` async block: `rejoin_room` is only borrowed here, while `tx`
        // is moved in explicitly on the next line.
        self.room_transaction(room_id, |tx| async {
            let tx = tx;
            // Step 1: take over the participant row with the new connection.
            let participant_update = room_participant::Entity::update_many()
                .filter(
                    Condition::all()
                        .add(room_participant::Column::RoomId.eq(room_id))
                        .add(room_participant::Column::UserId.eq(user_id))
                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                        .add(
                            Condition::any()
                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                                .add(
                                    room_participant::Column::AnsweringConnectionServerId
                                        .ne(connection.owner_id as i32),
                                ),
                        ),
                )
                .set(room_participant::ActiveModel {
                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    answering_connection_lost: ActiveValue::set(false),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
            if participant_update.rows_affected == 0 {
                return Err(anyhow!("room does not exist or was already joined"))?;
            }

            // Step 2: projects hosted by this user that the client is sharing again.
            let mut reshared_projects = Vec::new();
            for reshared_project in &rejoin_room.reshared_projects {
                let project_id = ProjectId::from_proto(reshared_project.project_id);
                let project = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await?
                    .ok_or_else(|| anyhow!("project does not exist"))?;
                // Only the host may reshare a project.
                if project.host_user_id != user_id {
                    return Err(anyhow!("no such project"))?;
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                // Pull the host's own row out of the list; what remains are the guests.
                let host_ix = collaborators
                    .iter()
                    .position(|collaborator| {
                        collaborator.user_id == user_id && collaborator.is_host
                    })
                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
                let host = collaborators.swap_remove(host_ix);
                // Captured before the row is rewritten below.
                let old_connection_id = host.connection();

                project::Entity::update(project::ActiveModel {
                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    host_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    ..project.into_active_model()
                })
                .exec(&*tx)
                .await?;
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..host.into_active_model()
                })
                .exec(&*tx)
                .await?;

                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                    .await?;

                reshared_projects.push(ResharedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators: collaborators
                        .iter()
                        .map(|collaborator| ProjectCollaborator {
                            connection_id: collaborator.connection(),
                            user_id: collaborator.user_id,
                            replica_id: collaborator.replica_id,
                            is_host: collaborator.is_host,
                        })
                        .collect(),
                    worktrees: reshared_project.worktrees.clone(),
                });
            }

            // Step 3: drop this user's hosted projects in the room that were not reshared.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostUserId.eq(user_id))
                        .add(
                            project::Column::Id
                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Step 4: projects the user was a collaborator in and wants to rejoin.
            let mut rejoined_projects = Vec::new();
            for rejoined_project in &rejoin_room.rejoined_projects {
                let project_id = ProjectId::from_proto(rejoined_project.id);
                // A project that no longer exists is skipped rather than treated as an error.
                let Some(project) = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await? else { continue };

                let mut worktrees = Vec::new();
                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
                for db_worktree in db_worktrees {
                    let mut worktree = RejoinedWorktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        updated_entries: Default::default(),
                        removed_entries: Default::default(),
                        updated_repositories: Default::default(),
                        removed_repositories: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    };

                    // The client's own record of this worktree, if it still has one.
                    let rejoined_worktree = rejoined_project
                        .worktrees
                        .iter()
                        .find(|worktree| worktree.id == db_worktree.id as u64);

                    // File entries
                    {
                        // Known worktree: fetch only rows with a scan id newer than the
                        // client's. Unknown worktree: fetch every non-deleted row.
                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_entry::Column::IsDeleted.eq(false)
                        };

                        let mut db_entries = worktree_entry::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                    .add(entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_entry) = db_entries.next().await {
                            let db_entry = db_entry?;
                            if db_entry.is_deleted {
                                worktree.removed_entries.push(db_entry.id as u64);
                            } else {
                                worktree.updated_entries.push(proto::Entry {
                                    id: db_entry.id as u64,
                                    is_dir: db_entry.is_dir,
                                    path: db_entry.path,
                                    inode: db_entry.inode as u64,
                                    mtime: Some(proto::Timestamp {
                                        seconds: db_entry.mtime_seconds as u64,
                                        nanos: db_entry.mtime_nanos as u32,
                                    }),
                                    is_symlink: db_entry.is_symlink,
                                    is_ignored: db_entry.is_ignored,
                                    is_external: db_entry.is_external,
                                    git_status: db_entry.git_status.map(|status| status as i32),
                                });
                            }
                        }
                    }

                    // Repository Entries
                    {
                        // Same incremental-vs-full rule as for file entries above.
                        let repository_entry_filter =
                            if let Some(rejoined_worktree) = rejoined_worktree {
                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                            } else {
                                worktree_repository::Column::IsDeleted.eq(false)
                            };

                        let mut db_repositories = worktree_repository::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                    .add(repository_entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_repository) = db_repositories.next().await {
                            let db_repository = db_repository?;
                            if db_repository.is_deleted {
                                worktree
                                    .removed_repositories
                                    .push(db_repository.work_directory_id as u64);
                            } else {
                                worktree.updated_repositories.push(proto::RepositoryEntry {
                                    work_directory_id: db_repository.work_directory_id as u64,
                                    branch: db_repository.branch,
                                });
                            }
                        }
                    }

                    worktrees.push(worktree);
                }

                let language_servers = project
                    .find_related(language_server::Entity)
                    .all(&*tx)
                    .await?
                    .into_iter()
                    .map(|language_server| proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                    })
                    .collect::<Vec<_>>();

                // Settings files are queried per project and attached to the matching
                // worktree; rows whose worktree is not in `worktrees` are ignored.
                {
                    let mut db_settings_files = worktree_settings_file::Entity::find()
                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                        .stream(&*tx)
                        .await?;
                    while let Some(db_settings_file) = db_settings_files.next().await {
                        let db_settings_file = db_settings_file?;
                        if let Some(worktree) = worktrees
                            .iter_mut()
                            .find(|w| w.id == db_settings_file.worktree_id as u64)
                        {
                            worktree.settings_files.push(WorktreeSettingsFile {
                                path: db_settings_file.path,
                                content: db_settings_file.content,
                            });
                        }
                    }
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                // Remove the user's own row from the list; if there is none, the user is
                // not a collaborator in this project and it is skipped.
                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                    .iter()
                    .position(|collaborator| collaborator.user_id == user_id)
                {
                    collaborators.swap_remove(self_collaborator_ix)
                } else {
                    continue;
                };
                // Captured before the row is rewritten below.
                let old_connection_id = self_collaborator.connection();
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..self_collaborator.into_active_model()
                })
                .exec(&*tx)
                .await?;

                let collaborators = collaborators
                    .into_iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect::<Vec<_>>();

                rejoined_projects.push(RejoinedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators,
                    worktrees,
                    language_servers,
                });
            }

            // Read the room last so it reflects every change made above.
            let room = self.get_room(room_id, &tx).await?;
            Ok(RejoinedRoom {
                room,
                rejoined_projects,
                reshared_projects,
            })
        })
        .await
    }
1671
    /// Removes the participant answering on `connection` from their room and
    /// cleans up everything tied to that connection.
    ///
    /// Returns `Ok(None)` when `connection` is not the answering connection of
    /// any participant. Otherwise, in order:
    /// - deletes the participant row;
    /// - deletes unanswered participant rows whose caller is the leaving user,
    ///   reporting the affected user ids in `canceled_calls_to_user_ids`;
    /// - records, for every project this connection collaborated in, the other
    ///   collaborators' connections and the host (`left_projects`);
    /// - deletes this connection's collaborator rows;
    /// - deletes projects in the room hosted by this connection;
    /// - deletes the room itself, and its entry in `self.rooms`, when no
    ///   answered participants remain.
    pub async fn leave_room(
        &self,
        connection: ConnectionId,
    ) -> Result<Option<RoomGuard<LeftRoom>>> {
        self.optional_room_transaction(|tx| async move {
            let leaving_participant = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::AnsweringConnectionId
                                .eq(connection.id as i32),
                        )
                        .add(
                            room_participant::Column::AnsweringConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?;

            if let Some(leaving_participant) = leaving_participant {
                // Leave room.
                let room_id = leaving_participant.room_id;
                room_participant::Entity::delete_by_id(leaving_participant.id)
                    .exec(&*tx)
                    .await?;

                // Cancel pending calls initiated by the leaving user.
                // NOTE(review): this filter does not restrict by room id — confirm that a
                // user can only have pending outgoing calls in the room they are leaving.
                let called_participants = room_participant::Entity::find()
                    .filter(
                        Condition::all()
                            .add(
                                room_participant::Column::CallingUserId
                                    .eq(leaving_participant.user_id),
                            )
                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
                    )
                    .all(&*tx)
                    .await?;
                room_participant::Entity::delete_many()
                    .filter(
                        room_participant::Column::Id
                            .is_in(called_participants.iter().map(|participant| participant.id)),
                    )
                    .exec(&*tx)
                    .await?;
                let canceled_calls_to_user_ids = called_participants
                    .into_iter()
                    .map(|participant| participant.user_id)
                    .collect();

                // Detect left projects.
                // Helper enum naming the single column selected by the query below.
                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
                enum QueryProjectIds {
                    ProjectId,
                }
                // Ids of every project this connection is a collaborator in.
                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                    .select_only()
                    .column_as(
                        project_collaborator::Column::ProjectId,
                        QueryProjectIds::ProjectId,
                    )
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .into_values::<_, QueryProjectIds>()
                    .all(&*tx)
                    .await?;
                // Snapshot all collaborators of those projects BEFORE this connection's
                // rows are deleted, so the host and remaining peers can be reported.
                let mut left_projects = HashMap::default();
                let mut collaborators = project_collaborator::Entity::find()
                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                    .stream(&*tx)
                    .await?;
                while let Some(collaborator) = collaborators.next().await {
                    let collaborator = collaborator?;
                    let left_project =
                        left_projects
                            .entry(collaborator.project_id)
                            .or_insert(LeftProject {
                                id: collaborator.project_id,
                                host_user_id: Default::default(),
                                connection_ids: Default::default(),
                                host_connection_id: Default::default(),
                            });

                    // `connection_ids` lists everyone except the leaving connection.
                    let collaborator_connection_id = collaborator.connection();
                    if collaborator_connection_id != connection {
                        left_project.connection_ids.push(collaborator_connection_id);
                    }

                    // Host fields start as defaults and are filled in when the host row is seen.
                    if collaborator.is_host {
                        left_project.host_user_id = collaborator.user_id;
                        left_project.host_connection_id = collaborator_connection_id;
                    }
                }
                // Presumably the stream must be released before `tx` is used again — confirm.
                drop(collaborators);

                // Leave projects.
                project_collaborator::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // Unshare projects.
                project::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(project::Column::RoomId.eq(room_id))
                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
                            .add(
                                project::Column::HostConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // The room is read after all deletions; `participants` only contains
                // answered participants, so an empty list means nobody is left in it.
                let room = self.get_room(room_id, &tx).await?;
                if room.participants.is_empty() {
                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
                }

                let left_room = LeftRoom {
                    room,
                    left_projects,
                    canceled_calls_to_user_ids,
                };

                // Also forget the in-memory entry kept for this room in `self.rooms`.
                if left_room.room.participants.is_empty() {
                    self.rooms.remove(&room_id);
                }

                Ok(Some((room_id, left_room)))
            } else {
                Ok(None)
            }
        })
        .await
    }
1827
1828 pub async fn follow(
1829 &self,
1830 project_id: ProjectId,
1831 leader_connection: ConnectionId,
1832 follower_connection: ConnectionId,
1833 ) -> Result<RoomGuard<proto::Room>> {
1834 let room_id = self.room_id_for_project(project_id).await?;
1835 self.room_transaction(room_id, |tx| async move {
1836 follower::ActiveModel {
1837 room_id: ActiveValue::set(room_id),
1838 project_id: ActiveValue::set(project_id),
1839 leader_connection_server_id: ActiveValue::set(ServerId(
1840 leader_connection.owner_id as i32,
1841 )),
1842 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1843 follower_connection_server_id: ActiveValue::set(ServerId(
1844 follower_connection.owner_id as i32,
1845 )),
1846 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1847 ..Default::default()
1848 }
1849 .insert(&*tx)
1850 .await?;
1851
1852 let room = self.get_room(room_id, &*tx).await?;
1853 Ok(room)
1854 })
1855 .await
1856 }
1857
1858 pub async fn unfollow(
1859 &self,
1860 project_id: ProjectId,
1861 leader_connection: ConnectionId,
1862 follower_connection: ConnectionId,
1863 ) -> Result<RoomGuard<proto::Room>> {
1864 let room_id = self.room_id_for_project(project_id).await?;
1865 self.room_transaction(room_id, |tx| async move {
1866 follower::Entity::delete_many()
1867 .filter(
1868 Condition::all()
1869 .add(follower::Column::ProjectId.eq(project_id))
1870 .add(
1871 follower::Column::LeaderConnectionServerId
1872 .eq(leader_connection.owner_id),
1873 )
1874 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1875 .add(
1876 follower::Column::FollowerConnectionServerId
1877 .eq(follower_connection.owner_id),
1878 )
1879 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1880 )
1881 .exec(&*tx)
1882 .await?;
1883
1884 let room = self.get_room(room_id, &*tx).await?;
1885 Ok(room)
1886 })
1887 .await
1888 }
1889
1890 pub async fn update_room_participant_location(
1891 &self,
1892 room_id: RoomId,
1893 connection: ConnectionId,
1894 location: proto::ParticipantLocation,
1895 ) -> Result<RoomGuard<proto::Room>> {
1896 self.room_transaction(room_id, |tx| async {
1897 let tx = tx;
1898 let location_kind;
1899 let location_project_id;
1900 match location
1901 .variant
1902 .as_ref()
1903 .ok_or_else(|| anyhow!("invalid location"))?
1904 {
1905 proto::participant_location::Variant::SharedProject(project) => {
1906 location_kind = 0;
1907 location_project_id = Some(ProjectId::from_proto(project.id));
1908 }
1909 proto::participant_location::Variant::UnsharedProject(_) => {
1910 location_kind = 1;
1911 location_project_id = None;
1912 }
1913 proto::participant_location::Variant::External(_) => {
1914 location_kind = 2;
1915 location_project_id = None;
1916 }
1917 }
1918
1919 let result = room_participant::Entity::update_many()
1920 .filter(
1921 Condition::all()
1922 .add(room_participant::Column::RoomId.eq(room_id))
1923 .add(
1924 room_participant::Column::AnsweringConnectionId
1925 .eq(connection.id as i32),
1926 )
1927 .add(
1928 room_participant::Column::AnsweringConnectionServerId
1929 .eq(connection.owner_id as i32),
1930 ),
1931 )
1932 .set(room_participant::ActiveModel {
1933 location_kind: ActiveValue::set(Some(location_kind)),
1934 location_project_id: ActiveValue::set(location_project_id),
1935 ..Default::default()
1936 })
1937 .exec(&*tx)
1938 .await?;
1939
1940 if result.rows_affected == 1 {
1941 let room = self.get_room(room_id, &tx).await?;
1942 Ok(room)
1943 } else {
1944 Err(anyhow!("could not update room participant location"))?
1945 }
1946 })
1947 .await
1948 }
1949
1950 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1951 self.transaction(|tx| async move {
1952 let participant = room_participant::Entity::find()
1953 .filter(
1954 Condition::all()
1955 .add(
1956 room_participant::Column::AnsweringConnectionId
1957 .eq(connection.id as i32),
1958 )
1959 .add(
1960 room_participant::Column::AnsweringConnectionServerId
1961 .eq(connection.owner_id as i32),
1962 ),
1963 )
1964 .one(&*tx)
1965 .await?
1966 .ok_or_else(|| anyhow!("not a participant in any room"))?;
1967
1968 room_participant::Entity::update(room_participant::ActiveModel {
1969 answering_connection_lost: ActiveValue::set(true),
1970 ..participant.into_active_model()
1971 })
1972 .exec(&*tx)
1973 .await?;
1974
1975 Ok(())
1976 })
1977 .await
1978 }
1979
1980 fn build_incoming_call(
1981 room: &proto::Room,
1982 called_user_id: UserId,
1983 ) -> Option<proto::IncomingCall> {
1984 let pending_participant = room
1985 .pending_participants
1986 .iter()
1987 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1988
1989 Some(proto::IncomingCall {
1990 room_id: room.id,
1991 calling_user_id: pending_participant.calling_user_id,
1992 participant_user_ids: room
1993 .participants
1994 .iter()
1995 .map(|participant| participant.user_id)
1996 .collect(),
1997 initial_project: room.participants.iter().find_map(|participant| {
1998 let initial_project_id = pending_participant.initial_project_id?;
1999 participant
2000 .projects
2001 .iter()
2002 .find(|project| project.id == initial_project_id)
2003 .cloned()
2004 }),
2005 })
2006 }
2007
    /// Loads `room_id` and assembles its `proto::Room` representation using `tx`.
    ///
    /// The result contains:
    /// - `participants`: rows that have both an answering connection id and
    ///   server id, each with its location and the projects it hosts;
    /// - `pending_participants`: all remaining rows (calls not yet answered);
    /// - `followers`: every follower row related to the room.
    ///
    /// Fails with "could not find room" when the room row does not exist.
    async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
        let db_room = room::Entity::find_by_id(room_id)
            .one(tx)
            .await?
            .ok_or_else(|| anyhow!("could not find room"))?;

        let mut db_participants = db_room
            .find_related(room_participant::Entity)
            .stream(tx)
            .await?;
        // Answered participants are keyed by connection so projects can be attached
        // to their host below.
        let mut participants = HashMap::default();
        let mut pending_participants = Vec::new();
        while let Some(db_participant) = db_participants.next().await {
            let db_participant = db_participant?;
            // `zip` yields a value only when BOTH the connection id and server id are set.
            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
                .answering_connection_id
                .zip(db_participant.answering_connection_server_id)
            {
                // Decode the stored kind: 0 with a project id is a shared project,
                // 1 is an unshared project, and everything else (including kind 0
                // without a project id) is reported as external.
                let location = match (
                    db_participant.location_kind,
                    db_participant.location_project_id,
                ) {
                    (Some(0), Some(project_id)) => {
                        Some(proto::participant_location::Variant::SharedProject(
                            proto::participant_location::SharedProject {
                                id: project_id.to_proto(),
                            },
                        ))
                    }
                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
                        Default::default(),
                    )),
                    _ => Some(proto::participant_location::Variant::External(
                        Default::default(),
                    )),
                };

                let answering_connection = ConnectionId {
                    owner_id: answering_connection_server_id.0 as u32,
                    id: answering_connection_id as u32,
                };
                participants.insert(
                    answering_connection,
                    proto::Participant {
                        user_id: db_participant.user_id.to_proto(),
                        peer_id: Some(answering_connection.into()),
                        projects: Default::default(),
                        location: Some(proto::ParticipantLocation { variant: location }),
                    },
                );
            } else {
                pending_participants.push(proto::PendingParticipant {
                    user_id: db_participant.user_id.to_proto(),
                    calling_user_id: db_participant.calling_user_id.to_proto(),
                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
                });
            }
        }
        // Presumably the stream must be released before `tx` is queried again — confirm.
        drop(db_participants);

        let mut db_projects = db_room
            .find_related(project::Entity)
            .find_with_related(worktree::Entity)
            .stream(tx)
            .await?;

        // Each row pairs a project with at most one worktree, so the same project can
        // appear on several rows; it is added once and its root names accumulated.
        while let Some(row) = db_projects.next().await {
            let (db_project, db_worktree) = row?;
            let host_connection = db_project.host_connection()?;
            // Projects whose host connection is not an answered participant are omitted.
            if let Some(participant) = participants.get_mut(&host_connection) {
                let project = if let Some(project) = participant
                    .projects
                    .iter_mut()
                    .find(|project| project.id == db_project.id.to_proto())
                {
                    project
                } else {
                    participant.projects.push(proto::ParticipantProject {
                        id: db_project.id.to_proto(),
                        worktree_root_names: Default::default(),
                    });
                    // Cannot fail: an element was pushed on the previous statement.
                    participant.projects.last_mut().unwrap()
                };

                // Only visible worktrees contribute a root name.
                if let Some(db_worktree) = db_worktree {
                    if db_worktree.visible {
                        project.worktree_root_names.push(db_worktree.root_name);
                    }
                }
            }
        }
        drop(db_projects);

        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
        let mut followers = Vec::new();
        while let Some(db_follower) = db_followers.next().await {
            let db_follower = db_follower?;
            followers.push(proto::Follower {
                leader_id: Some(db_follower.leader_connection().into()),
                follower_id: Some(db_follower.follower_connection().into()),
                project_id: db_follower.project_id.to_proto(),
            });
        }

        Ok(proto::Room {
            id: db_room.id.to_proto(),
            live_kit_room: db_room.live_kit_room,
            participants: participants.into_values().collect(),
            pending_participants,
            followers,
        })
    }
2120
2121 // projects
2122
2123 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2124 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2125 enum QueryAs {
2126 Count,
2127 }
2128
2129 self.transaction(|tx| async move {
2130 Ok(project::Entity::find()
2131 .select_only()
2132 .column_as(project::Column::Id.count(), QueryAs::Count)
2133 .inner_join(user::Entity)
2134 .filter(user::Column::Admin.eq(false))
2135 .into_values::<_, QueryAs>()
2136 .one(&*tx)
2137 .await?
2138 .unwrap_or(0i64) as usize)
2139 })
2140 .await
2141 }
2142
2143 pub async fn share_project(
2144 &self,
2145 room_id: RoomId,
2146 connection: ConnectionId,
2147 worktrees: &[proto::WorktreeMetadata],
2148 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2149 self.room_transaction(room_id, |tx| async move {
2150 let participant = room_participant::Entity::find()
2151 .filter(
2152 Condition::all()
2153 .add(
2154 room_participant::Column::AnsweringConnectionId
2155 .eq(connection.id as i32),
2156 )
2157 .add(
2158 room_participant::Column::AnsweringConnectionServerId
2159 .eq(connection.owner_id as i32),
2160 ),
2161 )
2162 .one(&*tx)
2163 .await?
2164 .ok_or_else(|| anyhow!("could not find participant"))?;
2165 if participant.room_id != room_id {
2166 return Err(anyhow!("shared project on unexpected room"))?;
2167 }
2168
2169 let project = project::ActiveModel {
2170 room_id: ActiveValue::set(participant.room_id),
2171 host_user_id: ActiveValue::set(participant.user_id),
2172 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2173 host_connection_server_id: ActiveValue::set(Some(ServerId(
2174 connection.owner_id as i32,
2175 ))),
2176 ..Default::default()
2177 }
2178 .insert(&*tx)
2179 .await?;
2180
2181 if !worktrees.is_empty() {
2182 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2183 worktree::ActiveModel {
2184 id: ActiveValue::set(worktree.id as i64),
2185 project_id: ActiveValue::set(project.id),
2186 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2187 root_name: ActiveValue::set(worktree.root_name.clone()),
2188 visible: ActiveValue::set(worktree.visible),
2189 scan_id: ActiveValue::set(0),
2190 completed_scan_id: ActiveValue::set(0),
2191 }
2192 }))
2193 .exec(&*tx)
2194 .await?;
2195 }
2196
2197 project_collaborator::ActiveModel {
2198 project_id: ActiveValue::set(project.id),
2199 connection_id: ActiveValue::set(connection.id as i32),
2200 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2201 user_id: ActiveValue::set(participant.user_id),
2202 replica_id: ActiveValue::set(ReplicaId(0)),
2203 is_host: ActiveValue::set(true),
2204 ..Default::default()
2205 }
2206 .insert(&*tx)
2207 .await?;
2208
2209 let room = self.get_room(room_id, &tx).await?;
2210 Ok((project.id, room))
2211 })
2212 .await
2213 }
2214
2215 pub async fn unshare_project(
2216 &self,
2217 project_id: ProjectId,
2218 connection: ConnectionId,
2219 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2220 let room_id = self.room_id_for_project(project_id).await?;
2221 self.room_transaction(room_id, |tx| async move {
2222 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2223
2224 let project = project::Entity::find_by_id(project_id)
2225 .one(&*tx)
2226 .await?
2227 .ok_or_else(|| anyhow!("project not found"))?;
2228 if project.host_connection()? == connection {
2229 project::Entity::delete(project.into_active_model())
2230 .exec(&*tx)
2231 .await?;
2232 let room = self.get_room(room_id, &tx).await?;
2233 Ok((room, guest_connection_ids))
2234 } else {
2235 Err(anyhow!("cannot unshare a project hosted by another user"))?
2236 }
2237 })
2238 .await
2239 }
2240
2241 pub async fn update_project(
2242 &self,
2243 project_id: ProjectId,
2244 connection: ConnectionId,
2245 worktrees: &[proto::WorktreeMetadata],
2246 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2247 let room_id = self.room_id_for_project(project_id).await?;
2248 self.room_transaction(room_id, |tx| async move {
2249 let project = project::Entity::find_by_id(project_id)
2250 .filter(
2251 Condition::all()
2252 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2253 .add(
2254 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2255 ),
2256 )
2257 .one(&*tx)
2258 .await?
2259 .ok_or_else(|| anyhow!("no such project"))?;
2260
2261 self.update_project_worktrees(project.id, worktrees, &tx)
2262 .await?;
2263
2264 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2265 let room = self.get_room(project.room_id, &tx).await?;
2266 Ok((room, guest_connection_ids))
2267 })
2268 .await
2269 }
2270
2271 async fn update_project_worktrees(
2272 &self,
2273 project_id: ProjectId,
2274 worktrees: &[proto::WorktreeMetadata],
2275 tx: &DatabaseTransaction,
2276 ) -> Result<()> {
2277 if !worktrees.is_empty() {
2278 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2279 id: ActiveValue::set(worktree.id as i64),
2280 project_id: ActiveValue::set(project_id),
2281 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2282 root_name: ActiveValue::set(worktree.root_name.clone()),
2283 visible: ActiveValue::set(worktree.visible),
2284 scan_id: ActiveValue::set(0),
2285 completed_scan_id: ActiveValue::set(0),
2286 }))
2287 .on_conflict(
2288 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2289 .update_column(worktree::Column::RootName)
2290 .to_owned(),
2291 )
2292 .exec(&*tx)
2293 .await?;
2294 }
2295
2296 worktree::Entity::delete_many()
2297 .filter(worktree::Column::ProjectId.eq(project_id).and(
2298 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2299 ))
2300 .exec(&*tx)
2301 .await?;
2302
2303 Ok(())
2304 }
2305
2306 pub async fn update_worktree(
2307 &self,
2308 update: &proto::UpdateWorktree,
2309 connection: ConnectionId,
2310 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2311 let project_id = ProjectId::from_proto(update.project_id);
2312 let worktree_id = update.worktree_id as i64;
2313 let room_id = self.room_id_for_project(project_id).await?;
2314 self.room_transaction(room_id, |tx| async move {
2315 // Ensure the update comes from the host.
2316 let _project = project::Entity::find_by_id(project_id)
2317 .filter(
2318 Condition::all()
2319 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2320 .add(
2321 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2322 ),
2323 )
2324 .one(&*tx)
2325 .await?
2326 .ok_or_else(|| anyhow!("no such project"))?;
2327
2328 // Update metadata.
2329 worktree::Entity::update(worktree::ActiveModel {
2330 id: ActiveValue::set(worktree_id),
2331 project_id: ActiveValue::set(project_id),
2332 root_name: ActiveValue::set(update.root_name.clone()),
2333 scan_id: ActiveValue::set(update.scan_id as i64),
2334 completed_scan_id: if update.is_last_update {
2335 ActiveValue::set(update.scan_id as i64)
2336 } else {
2337 ActiveValue::default()
2338 },
2339 abs_path: ActiveValue::set(update.abs_path.clone()),
2340 ..Default::default()
2341 })
2342 .exec(&*tx)
2343 .await?;
2344
2345 if !update.updated_entries.is_empty() {
2346 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2347 let mtime = entry.mtime.clone().unwrap_or_default();
2348 worktree_entry::ActiveModel {
2349 project_id: ActiveValue::set(project_id),
2350 worktree_id: ActiveValue::set(worktree_id),
2351 id: ActiveValue::set(entry.id as i64),
2352 is_dir: ActiveValue::set(entry.is_dir),
2353 path: ActiveValue::set(entry.path.clone()),
2354 inode: ActiveValue::set(entry.inode as i64),
2355 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2356 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2357 is_symlink: ActiveValue::set(entry.is_symlink),
2358 is_ignored: ActiveValue::set(entry.is_ignored),
2359 is_external: ActiveValue::set(entry.is_external),
2360 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2361 is_deleted: ActiveValue::set(false),
2362 scan_id: ActiveValue::set(update.scan_id as i64),
2363 }
2364 }))
2365 .on_conflict(
2366 OnConflict::columns([
2367 worktree_entry::Column::ProjectId,
2368 worktree_entry::Column::WorktreeId,
2369 worktree_entry::Column::Id,
2370 ])
2371 .update_columns([
2372 worktree_entry::Column::IsDir,
2373 worktree_entry::Column::Path,
2374 worktree_entry::Column::Inode,
2375 worktree_entry::Column::MtimeSeconds,
2376 worktree_entry::Column::MtimeNanos,
2377 worktree_entry::Column::IsSymlink,
2378 worktree_entry::Column::IsIgnored,
2379 worktree_entry::Column::GitStatus,
2380 worktree_entry::Column::ScanId,
2381 ])
2382 .to_owned(),
2383 )
2384 .exec(&*tx)
2385 .await?;
2386 }
2387
2388 if !update.removed_entries.is_empty() {
2389 worktree_entry::Entity::update_many()
2390 .filter(
2391 worktree_entry::Column::ProjectId
2392 .eq(project_id)
2393 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2394 .and(
2395 worktree_entry::Column::Id
2396 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2397 ),
2398 )
2399 .set(worktree_entry::ActiveModel {
2400 is_deleted: ActiveValue::Set(true),
2401 scan_id: ActiveValue::Set(update.scan_id as i64),
2402 ..Default::default()
2403 })
2404 .exec(&*tx)
2405 .await?;
2406 }
2407
2408 if !update.updated_repositories.is_empty() {
2409 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2410 |repository| worktree_repository::ActiveModel {
2411 project_id: ActiveValue::set(project_id),
2412 worktree_id: ActiveValue::set(worktree_id),
2413 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2414 scan_id: ActiveValue::set(update.scan_id as i64),
2415 branch: ActiveValue::set(repository.branch.clone()),
2416 is_deleted: ActiveValue::set(false),
2417 },
2418 ))
2419 .on_conflict(
2420 OnConflict::columns([
2421 worktree_repository::Column::ProjectId,
2422 worktree_repository::Column::WorktreeId,
2423 worktree_repository::Column::WorkDirectoryId,
2424 ])
2425 .update_columns([
2426 worktree_repository::Column::ScanId,
2427 worktree_repository::Column::Branch,
2428 ])
2429 .to_owned(),
2430 )
2431 .exec(&*tx)
2432 .await?;
2433 }
2434
2435 if !update.removed_repositories.is_empty() {
2436 worktree_repository::Entity::update_many()
2437 .filter(
2438 worktree_repository::Column::ProjectId
2439 .eq(project_id)
2440 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2441 .and(
2442 worktree_repository::Column::WorkDirectoryId
2443 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2444 ),
2445 )
2446 .set(worktree_repository::ActiveModel {
2447 is_deleted: ActiveValue::Set(true),
2448 scan_id: ActiveValue::Set(update.scan_id as i64),
2449 ..Default::default()
2450 })
2451 .exec(&*tx)
2452 .await?;
2453 }
2454
2455 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2456 Ok(connection_ids)
2457 })
2458 .await
2459 }
2460
2461 pub async fn update_diagnostic_summary(
2462 &self,
2463 update: &proto::UpdateDiagnosticSummary,
2464 connection: ConnectionId,
2465 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2466 let project_id = ProjectId::from_proto(update.project_id);
2467 let worktree_id = update.worktree_id as i64;
2468 let room_id = self.room_id_for_project(project_id).await?;
2469 self.room_transaction(room_id, |tx| async move {
2470 let summary = update
2471 .summary
2472 .as_ref()
2473 .ok_or_else(|| anyhow!("invalid summary"))?;
2474
2475 // Ensure the update comes from the host.
2476 let project = project::Entity::find_by_id(project_id)
2477 .one(&*tx)
2478 .await?
2479 .ok_or_else(|| anyhow!("no such project"))?;
2480 if project.host_connection()? != connection {
2481 return Err(anyhow!("can't update a project hosted by someone else"))?;
2482 }
2483
2484 // Update summary.
2485 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2486 project_id: ActiveValue::set(project_id),
2487 worktree_id: ActiveValue::set(worktree_id),
2488 path: ActiveValue::set(summary.path.clone()),
2489 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2490 error_count: ActiveValue::set(summary.error_count as i32),
2491 warning_count: ActiveValue::set(summary.warning_count as i32),
2492 ..Default::default()
2493 })
2494 .on_conflict(
2495 OnConflict::columns([
2496 worktree_diagnostic_summary::Column::ProjectId,
2497 worktree_diagnostic_summary::Column::WorktreeId,
2498 worktree_diagnostic_summary::Column::Path,
2499 ])
2500 .update_columns([
2501 worktree_diagnostic_summary::Column::LanguageServerId,
2502 worktree_diagnostic_summary::Column::ErrorCount,
2503 worktree_diagnostic_summary::Column::WarningCount,
2504 ])
2505 .to_owned(),
2506 )
2507 .exec(&*tx)
2508 .await?;
2509
2510 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2511 Ok(connection_ids)
2512 })
2513 .await
2514 }
2515
2516 pub async fn start_language_server(
2517 &self,
2518 update: &proto::StartLanguageServer,
2519 connection: ConnectionId,
2520 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2521 let project_id = ProjectId::from_proto(update.project_id);
2522 let room_id = self.room_id_for_project(project_id).await?;
2523 self.room_transaction(room_id, |tx| async move {
2524 let server = update
2525 .server
2526 .as_ref()
2527 .ok_or_else(|| anyhow!("invalid language server"))?;
2528
2529 // Ensure the update comes from the host.
2530 let project = project::Entity::find_by_id(project_id)
2531 .one(&*tx)
2532 .await?
2533 .ok_or_else(|| anyhow!("no such project"))?;
2534 if project.host_connection()? != connection {
2535 return Err(anyhow!("can't update a project hosted by someone else"))?;
2536 }
2537
2538 // Add the newly-started language server.
2539 language_server::Entity::insert(language_server::ActiveModel {
2540 project_id: ActiveValue::set(project_id),
2541 id: ActiveValue::set(server.id as i64),
2542 name: ActiveValue::set(server.name.clone()),
2543 ..Default::default()
2544 })
2545 .on_conflict(
2546 OnConflict::columns([
2547 language_server::Column::ProjectId,
2548 language_server::Column::Id,
2549 ])
2550 .update_column(language_server::Column::Name)
2551 .to_owned(),
2552 )
2553 .exec(&*tx)
2554 .await?;
2555
2556 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2557 Ok(connection_ids)
2558 })
2559 .await
2560 }
2561
2562 pub async fn update_worktree_settings(
2563 &self,
2564 update: &proto::UpdateWorktreeSettings,
2565 connection: ConnectionId,
2566 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2567 let project_id = ProjectId::from_proto(update.project_id);
2568 let room_id = self.room_id_for_project(project_id).await?;
2569 self.room_transaction(room_id, |tx| async move {
2570 // Ensure the update comes from the host.
2571 let project = project::Entity::find_by_id(project_id)
2572 .one(&*tx)
2573 .await?
2574 .ok_or_else(|| anyhow!("no such project"))?;
2575 if project.host_connection()? != connection {
2576 return Err(anyhow!("can't update a project hosted by someone else"))?;
2577 }
2578
2579 if let Some(content) = &update.content {
2580 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2581 project_id: ActiveValue::Set(project_id),
2582 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2583 path: ActiveValue::Set(update.path.clone()),
2584 content: ActiveValue::Set(content.clone()),
2585 })
2586 .on_conflict(
2587 OnConflict::columns([
2588 worktree_settings_file::Column::ProjectId,
2589 worktree_settings_file::Column::WorktreeId,
2590 worktree_settings_file::Column::Path,
2591 ])
2592 .update_column(worktree_settings_file::Column::Content)
2593 .to_owned(),
2594 )
2595 .exec(&*tx)
2596 .await?;
2597 } else {
2598 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2599 project_id: ActiveValue::Set(project_id),
2600 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2601 path: ActiveValue::Set(update.path.clone()),
2602 ..Default::default()
2603 })
2604 .exec(&*tx)
2605 .await?;
2606 }
2607
2608 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2609 Ok(connection_ids)
2610 })
2611 .await
2612 }
2613
2614 pub async fn join_project(
2615 &self,
2616 project_id: ProjectId,
2617 connection: ConnectionId,
2618 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2619 let room_id = self.room_id_for_project(project_id).await?;
2620 self.room_transaction(room_id, |tx| async move {
2621 let participant = room_participant::Entity::find()
2622 .filter(
2623 Condition::all()
2624 .add(
2625 room_participant::Column::AnsweringConnectionId
2626 .eq(connection.id as i32),
2627 )
2628 .add(
2629 room_participant::Column::AnsweringConnectionServerId
2630 .eq(connection.owner_id as i32),
2631 ),
2632 )
2633 .one(&*tx)
2634 .await?
2635 .ok_or_else(|| anyhow!("must join a room first"))?;
2636
2637 let project = project::Entity::find_by_id(project_id)
2638 .one(&*tx)
2639 .await?
2640 .ok_or_else(|| anyhow!("no such project"))?;
2641 if project.room_id != participant.room_id {
2642 return Err(anyhow!("no such project"))?;
2643 }
2644
2645 let mut collaborators = project
2646 .find_related(project_collaborator::Entity)
2647 .all(&*tx)
2648 .await?;
2649 let replica_ids = collaborators
2650 .iter()
2651 .map(|c| c.replica_id)
2652 .collect::<HashSet<_>>();
2653 let mut replica_id = ReplicaId(1);
2654 while replica_ids.contains(&replica_id) {
2655 replica_id.0 += 1;
2656 }
2657 let new_collaborator = project_collaborator::ActiveModel {
2658 project_id: ActiveValue::set(project_id),
2659 connection_id: ActiveValue::set(connection.id as i32),
2660 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2661 user_id: ActiveValue::set(participant.user_id),
2662 replica_id: ActiveValue::set(replica_id),
2663 is_host: ActiveValue::set(false),
2664 ..Default::default()
2665 }
2666 .insert(&*tx)
2667 .await?;
2668 collaborators.push(new_collaborator);
2669
2670 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2671 let mut worktrees = db_worktrees
2672 .into_iter()
2673 .map(|db_worktree| {
2674 (
2675 db_worktree.id as u64,
2676 Worktree {
2677 id: db_worktree.id as u64,
2678 abs_path: db_worktree.abs_path,
2679 root_name: db_worktree.root_name,
2680 visible: db_worktree.visible,
2681 entries: Default::default(),
2682 repository_entries: Default::default(),
2683 diagnostic_summaries: Default::default(),
2684 settings_files: Default::default(),
2685 scan_id: db_worktree.scan_id as u64,
2686 completed_scan_id: db_worktree.completed_scan_id as u64,
2687 },
2688 )
2689 })
2690 .collect::<BTreeMap<_, _>>();
2691
2692 // Populate worktree entries.
2693 {
2694 let mut db_entries = worktree_entry::Entity::find()
2695 .filter(
2696 Condition::all()
2697 .add(worktree_entry::Column::ProjectId.eq(project_id))
2698 .add(worktree_entry::Column::IsDeleted.eq(false)),
2699 )
2700 .stream(&*tx)
2701 .await?;
2702 while let Some(db_entry) = db_entries.next().await {
2703 let db_entry = db_entry?;
2704 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2705 worktree.entries.push(proto::Entry {
2706 id: db_entry.id as u64,
2707 is_dir: db_entry.is_dir,
2708 path: db_entry.path,
2709 inode: db_entry.inode as u64,
2710 mtime: Some(proto::Timestamp {
2711 seconds: db_entry.mtime_seconds as u64,
2712 nanos: db_entry.mtime_nanos as u32,
2713 }),
2714 is_symlink: db_entry.is_symlink,
2715 is_ignored: db_entry.is_ignored,
2716 is_external: db_entry.is_external,
2717 git_status: db_entry.git_status.map(|status| status as i32),
2718 });
2719 }
2720 }
2721 }
2722
2723 // Populate repository entries.
2724 {
2725 let mut db_repository_entries = worktree_repository::Entity::find()
2726 .filter(
2727 Condition::all()
2728 .add(worktree_repository::Column::ProjectId.eq(project_id))
2729 .add(worktree_repository::Column::IsDeleted.eq(false)),
2730 )
2731 .stream(&*tx)
2732 .await?;
2733 while let Some(db_repository_entry) = db_repository_entries.next().await {
2734 let db_repository_entry = db_repository_entry?;
2735 if let Some(worktree) =
2736 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2737 {
2738 worktree.repository_entries.insert(
2739 db_repository_entry.work_directory_id as u64,
2740 proto::RepositoryEntry {
2741 work_directory_id: db_repository_entry.work_directory_id as u64,
2742 branch: db_repository_entry.branch,
2743 },
2744 );
2745 }
2746 }
2747 }
2748
2749 // Populate worktree diagnostic summaries.
2750 {
2751 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2752 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2753 .stream(&*tx)
2754 .await?;
2755 while let Some(db_summary) = db_summaries.next().await {
2756 let db_summary = db_summary?;
2757 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2758 worktree
2759 .diagnostic_summaries
2760 .push(proto::DiagnosticSummary {
2761 path: db_summary.path,
2762 language_server_id: db_summary.language_server_id as u64,
2763 error_count: db_summary.error_count as u32,
2764 warning_count: db_summary.warning_count as u32,
2765 });
2766 }
2767 }
2768 }
2769
2770 // Populate worktree settings files
2771 {
2772 let mut db_settings_files = worktree_settings_file::Entity::find()
2773 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2774 .stream(&*tx)
2775 .await?;
2776 while let Some(db_settings_file) = db_settings_files.next().await {
2777 let db_settings_file = db_settings_file?;
2778 if let Some(worktree) =
2779 worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2780 {
2781 worktree.settings_files.push(WorktreeSettingsFile {
2782 path: db_settings_file.path,
2783 content: db_settings_file.content,
2784 });
2785 }
2786 }
2787 }
2788
2789 // Populate language servers.
2790 let language_servers = project
2791 .find_related(language_server::Entity)
2792 .all(&*tx)
2793 .await?;
2794
2795 let project = Project {
2796 collaborators: collaborators
2797 .into_iter()
2798 .map(|collaborator| ProjectCollaborator {
2799 connection_id: collaborator.connection(),
2800 user_id: collaborator.user_id,
2801 replica_id: collaborator.replica_id,
2802 is_host: collaborator.is_host,
2803 })
2804 .collect(),
2805 worktrees,
2806 language_servers: language_servers
2807 .into_iter()
2808 .map(|language_server| proto::LanguageServer {
2809 id: language_server.id as u64,
2810 name: language_server.name,
2811 })
2812 .collect(),
2813 };
2814 Ok((project, replica_id as ReplicaId))
2815 })
2816 .await
2817 }
2818
2819 pub async fn leave_project(
2820 &self,
2821 project_id: ProjectId,
2822 connection: ConnectionId,
2823 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2824 let room_id = self.room_id_for_project(project_id).await?;
2825 self.room_transaction(room_id, |tx| async move {
2826 let result = project_collaborator::Entity::delete_many()
2827 .filter(
2828 Condition::all()
2829 .add(project_collaborator::Column::ProjectId.eq(project_id))
2830 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2831 .add(
2832 project_collaborator::Column::ConnectionServerId
2833 .eq(connection.owner_id as i32),
2834 ),
2835 )
2836 .exec(&*tx)
2837 .await?;
2838 if result.rows_affected == 0 {
2839 Err(anyhow!("not a collaborator on this project"))?;
2840 }
2841
2842 let project = project::Entity::find_by_id(project_id)
2843 .one(&*tx)
2844 .await?
2845 .ok_or_else(|| anyhow!("no such project"))?;
2846 let collaborators = project
2847 .find_related(project_collaborator::Entity)
2848 .all(&*tx)
2849 .await?;
2850 let connection_ids = collaborators
2851 .into_iter()
2852 .map(|collaborator| collaborator.connection())
2853 .collect();
2854
2855 follower::Entity::delete_many()
2856 .filter(
2857 Condition::any()
2858 .add(
2859 Condition::all()
2860 .add(follower::Column::ProjectId.eq(project_id))
2861 .add(
2862 follower::Column::LeaderConnectionServerId
2863 .eq(connection.owner_id),
2864 )
2865 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2866 )
2867 .add(
2868 Condition::all()
2869 .add(follower::Column::ProjectId.eq(project_id))
2870 .add(
2871 follower::Column::FollowerConnectionServerId
2872 .eq(connection.owner_id),
2873 )
2874 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2875 ),
2876 )
2877 .exec(&*tx)
2878 .await?;
2879
2880 let room = self.get_room(project.room_id, &tx).await?;
2881 let left_project = LeftProject {
2882 id: project_id,
2883 host_user_id: project.host_user_id,
2884 host_connection_id: project.host_connection()?,
2885 connection_ids,
2886 };
2887 Ok((room, left_project))
2888 })
2889 .await
2890 }
2891
2892 pub async fn project_collaborators(
2893 &self,
2894 project_id: ProjectId,
2895 connection_id: ConnectionId,
2896 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2897 let room_id = self.room_id_for_project(project_id).await?;
2898 self.room_transaction(room_id, |tx| async move {
2899 let collaborators = project_collaborator::Entity::find()
2900 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2901 .all(&*tx)
2902 .await?
2903 .into_iter()
2904 .map(|collaborator| ProjectCollaborator {
2905 connection_id: collaborator.connection(),
2906 user_id: collaborator.user_id,
2907 replica_id: collaborator.replica_id,
2908 is_host: collaborator.is_host,
2909 })
2910 .collect::<Vec<_>>();
2911
2912 if collaborators
2913 .iter()
2914 .any(|collaborator| collaborator.connection_id == connection_id)
2915 {
2916 Ok(collaborators)
2917 } else {
2918 Err(anyhow!("no such project"))?
2919 }
2920 })
2921 .await
2922 }
2923
2924 pub async fn project_connection_ids(
2925 &self,
2926 project_id: ProjectId,
2927 connection_id: ConnectionId,
2928 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2929 let room_id = self.room_id_for_project(project_id).await?;
2930 self.room_transaction(room_id, |tx| async move {
2931 let mut collaborators = project_collaborator::Entity::find()
2932 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2933 .stream(&*tx)
2934 .await?;
2935
2936 let mut connection_ids = HashSet::default();
2937 while let Some(collaborator) = collaborators.next().await {
2938 let collaborator = collaborator?;
2939 connection_ids.insert(collaborator.connection());
2940 }
2941
2942 if connection_ids.contains(&connection_id) {
2943 Ok(connection_ids)
2944 } else {
2945 Err(anyhow!("no such project"))?
2946 }
2947 })
2948 .await
2949 }
2950
2951 async fn project_guest_connection_ids(
2952 &self,
2953 project_id: ProjectId,
2954 tx: &DatabaseTransaction,
2955 ) -> Result<Vec<ConnectionId>> {
2956 let mut collaborators = project_collaborator::Entity::find()
2957 .filter(
2958 project_collaborator::Column::ProjectId
2959 .eq(project_id)
2960 .and(project_collaborator::Column::IsHost.eq(false)),
2961 )
2962 .stream(tx)
2963 .await?;
2964
2965 let mut guest_connection_ids = Vec::new();
2966 while let Some(collaborator) = collaborators.next().await {
2967 let collaborator = collaborator?;
2968 guest_connection_ids.push(collaborator.connection());
2969 }
2970 Ok(guest_connection_ids)
2971 }
2972
2973 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2974 self.transaction(|tx| async move {
2975 let project = project::Entity::find_by_id(project_id)
2976 .one(&*tx)
2977 .await?
2978 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2979 Ok(project.room_id)
2980 })
2981 .await
2982 }
2983
2984 // access tokens
2985
2986 pub async fn create_access_token(
2987 &self,
2988 user_id: UserId,
2989 access_token_hash: &str,
2990 max_access_token_count: usize,
2991 ) -> Result<AccessTokenId> {
2992 self.transaction(|tx| async {
2993 let tx = tx;
2994
2995 let token = access_token::ActiveModel {
2996 user_id: ActiveValue::set(user_id),
2997 hash: ActiveValue::set(access_token_hash.into()),
2998 ..Default::default()
2999 }
3000 .insert(&*tx)
3001 .await?;
3002
3003 access_token::Entity::delete_many()
3004 .filter(
3005 access_token::Column::Id.in_subquery(
3006 Query::select()
3007 .column(access_token::Column::Id)
3008 .from(access_token::Entity)
3009 .and_where(access_token::Column::UserId.eq(user_id))
3010 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3011 .limit(10000)
3012 .offset(max_access_token_count as u64)
3013 .to_owned(),
3014 ),
3015 )
3016 .exec(&*tx)
3017 .await?;
3018 Ok(token.id)
3019 })
3020 .await
3021 }
3022
3023 pub async fn get_access_token(
3024 &self,
3025 access_token_id: AccessTokenId,
3026 ) -> Result<access_token::Model> {
3027 self.transaction(|tx| async move {
3028 Ok(access_token::Entity::find_by_id(access_token_id)
3029 .one(&*tx)
3030 .await?
3031 .ok_or_else(|| anyhow!("no such access token"))?)
3032 })
3033 .await
3034 }
3035
3036 // channels
3037
3038 pub async fn create_root_channel(&self, name: &str) -> Result<ChannelId> {
3039 self.create_channel(name, None).await
3040 }
3041
3042 pub async fn create_channel(&self, name: &str, parent: Option<ChannelId>) -> Result<ChannelId> {
3043 self.transaction(move |tx| async move {
3044 let tx = tx;
3045
3046 let channel = channel::ActiveModel {
3047 name: ActiveValue::Set(name.to_string()),
3048 ..Default::default()
3049 };
3050
3051 let channel = channel.insert(&*tx).await?;
3052
3053 if let Some(parent) = parent {
3054 channel_parent::ActiveModel {
3055 child_id: ActiveValue::Set(channel.id),
3056 parent_id: ActiveValue::Set(parent),
3057 }
3058 .insert(&*tx)
3059 .await?;
3060 }
3061
3062 Ok(channel.id)
3063 })
3064 .await
3065 }
3066
3067 // Property: Members are only
3068 pub async fn add_channel_member(&self, channel_id: ChannelId, user_id: UserId) -> Result<()> {
3069 self.transaction(move |tx| async move {
3070 let tx = tx;
3071
3072 let channel_membership = channel_member::ActiveModel {
3073 channel_id: ActiveValue::Set(channel_id),
3074 user_id: ActiveValue::Set(user_id),
3075 ..Default::default()
3076 };
3077
3078 channel_membership.insert(&*tx).await?;
3079
3080 Ok(())
3081 })
3082 .await
3083 }
3084
/// Returns the channels visible to `user_id`: every channel the user is a member
/// of, plus all channels reachable below those through `channel_parents`.
///
/// The rows are produced by a recursive common table expression and ordered by
/// their depth below a membership channel (membership channels have depth 0 and a
/// NULL parent). Each row is mapped into a `Channel`.
pub async fn get_channels(&self, user_id: UserId) -> Result<Vec<Channel>> {
    self.transaction(|tx| async move {
        let tx = tx;

        // The statement is written as raw SQL rather than assembled with the
        // sea_query CTE builder.
        // NOTE(review): the statement binds `user_id` through a `?` placeholder;
        // confirm that every database backend this runs against accepts that
        // placeholder style.
        let sql = r#"
            WITH RECURSIVE channel_tree(child_id, parent_id, depth) AS (
                SELECT channel_id as child_id, NULL as parent_id, 0
                FROM channel_members
                WHERE user_id = ?
                UNION ALL
                SELECT channel_parents.child_id, channel_parents.parent_id, channel_tree.depth + 1
                FROM channel_parents, channel_tree
                WHERE channel_parents.parent_id = channel_tree.child_id
            )
            SELECT channel_tree.child_id as id, channels.name, channel_tree.parent_id
            FROM channel_tree
            JOIN channels ON channels.id = channel_tree.child_id
            ORDER BY channel_tree.depth;
        "#;

        let stmt = Statement::from_sql_and_values(
            self.pool.get_database_backend(),
            sql,
            vec![user_id.into()],
        );

        // The entity only serves as the entry point for running the raw statement;
        // the result rows are decoded as `Channel`.
        Ok(channel_parent::Entity::find()
            .from_raw_sql(stmt)
            .into_model::<Channel>()
            .all(&*tx)
            .await?)
    })
    .await
}
3184
/// Runs `f` inside a database transaction, retrying in a loop.
///
/// Every iteration opens a fresh transaction through `with_transaction` and calls
/// `f` with a handle to it. When `f` succeeds the transaction is committed and the
/// value returned; when `f` fails the transaction is rolled back. After a failed
/// commit or a failed callback, `retry_on_serialization_error` is asked (with the
/// error and the number of attempts so far) whether to go around again; if it
/// answers `false` the error is returned to the caller.
///
/// `f` may therefore be invoked several times and must be safe to repeat.
async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
where
    F: Send + Fn(TransactionHandle) -> Fut,
    Fut: Send + Future<Output = Result<T>>,
{
    let body = async {
        // Number of completed attempts, passed to the retry decision.
        let mut i = 0;
        loop {
            let (tx, result) = self.with_transaction(&f).await?;
            match result {
                Ok(result) => match tx.commit().await.map_err(Into::into) {
                    Ok(()) => return Ok(result),
                    Err(error) => {
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                },
                Err(error) => {
                    // A failing rollback aborts immediately with the rollback error.
                    tx.rollback().await?;
                    if !self.retry_on_serialization_error(&error, i).await {
                        return Err(error);
                    }
                }
            }
            i += 1;
        }
    };

    self.run(body).await
}
3216
/// Like `transaction`, but for callbacks that may or may not end up touching a room.
///
/// `f` returns `Ok(Some((room_id, data)))` when a room is involved and `Ok(None)`
/// otherwise. In the `Some` case the per-room mutex from `self.rooms` is locked
/// *after* `f` has run (the room id is only known from its result) and *before*
/// the transaction is committed; the guard is handed back inside the `RoomGuard`.
/// In the `None` case the transaction is simply committed and `None` returned.
///
/// Failed commits and failed callbacks go through `retry_on_serialization_error`,
/// exactly as in `transaction`, so `f` may be invoked several times.
async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
where
    F: Send + Fn(TransactionHandle) -> Fut,
    Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
{
    let body = async {
        // Number of completed attempts, passed to the retry decision.
        let mut i = 0;
        loop {
            let (tx, result) = self.with_transaction(&f).await?;
            match result {
                Ok(Some((room_id, data))) => {
                    // Create the room's mutex on first use, then hold it across the commit.
                    let lock = self.rooms.entry(room_id).or_default().clone();
                    let _guard = lock.lock_owned().await;
                    match tx.commit().await.map_err(Into::into) {
                        Ok(()) => {
                            return Ok(Some(RoomGuard {
                                data,
                                _guard,
                                _not_send: PhantomData,
                            }));
                        }
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    }
                }
                Ok(None) => match tx.commit().await.map_err(Into::into) {
                    Ok(()) => return Ok(None),
                    Err(error) => {
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                },
                Err(error) => {
                    // A failing rollback aborts immediately with the rollback error.
                    tx.rollback().await?;
                    if !self.retry_on_serialization_error(&error, i).await {
                        return Err(error);
                    }
                }
            }
            i += 1;
        }
    };

    self.run(body).await
}
3266
3267 async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3268 where
3269 F: Send + Fn(TransactionHandle) -> Fut,
3270 Fut: Send + Future<Output = Result<T>>,
3271 {
3272 let body = async {
3273 let mut i = 0;
3274 loop {
3275 let lock = self.rooms.entry(room_id).or_default().clone();
3276 let _guard = lock.lock_owned().await;
3277 let (tx, result) = self.with_transaction(&f).await?;
3278 match result {
3279 Ok(data) => match tx.commit().await.map_err(Into::into) {
3280 Ok(()) => {
3281 return Ok(RoomGuard {
3282 data,
3283 _guard,
3284 _not_send: PhantomData,
3285 });
3286 }
3287 Err(error) => {
3288 if !self.retry_on_serialization_error(&error, i).await {
3289 return Err(error);
3290 }
3291 }
3292 },
3293 Err(error) => {
3294 tx.rollback().await?;
3295 if !self.retry_on_serialization_error(&error, i).await {
3296 return Err(error);
3297 }
3298 }
3299 }
3300 i += 1;
3301 }
3302 };
3303
3304 self.run(body).await
3305 }
3306
3307 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3308 where
3309 F: Send + Fn(TransactionHandle) -> Fut,
3310 Fut: Send + Future<Output = Result<T>>,
3311 {
3312 let tx = self
3313 .pool
3314 .begin_with_config(Some(IsolationLevel::Serializable), None)
3315 .await?;
3316
3317 let mut tx = Arc::new(Some(tx));
3318 let result = f(TransactionHandle(tx.clone())).await;
3319 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3320 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3321 };
3322
3323 Ok((tx, result))
3324 }
3325
    /// Drives `future` to completion and returns its output.
    ///
    /// In non-test builds this simply awaits the future. In test builds it first awaits a
    /// simulated random delay when the executor is `Executor::Deterministic`, and then blocks on
    /// the future using the runtime stored in `self.runtime` (panicking if that is `None`).
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        #[cfg(test)]
        {
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            // Tests block on the stored runtime instead of awaiting the future directly.
            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
3344
3345 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3346 // If the error is due to a failure to serialize concurrent transactions, then retry
3347 // this transaction after a delay. With each subsequent retry, double the delay duration.
3348 // Also vary the delay randomly in order to ensure different database connections retry
3349 // at different times.
3350 if is_serialization_error(error) {
3351 let base_delay = 4_u64 << prev_attempt_count.min(16);
3352 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3353 log::info!(
3354 "retrying transaction after serialization error. delay: {} ms.",
3355 randomized_delay
3356 );
3357 self.executor
3358 .sleep(Duration::from_millis(randomized_delay as u64))
3359 .await;
3360 true
3361 } else {
3362 false
3363 }
3364 }
3365}
3366
3367fn is_serialization_error(error: &Error) -> bool {
3368 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3369 match error {
3370 Error::Database(
3371 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3372 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3373 ) if error
3374 .as_database_error()
3375 .and_then(|error| error.code())
3376 .as_deref()
3377 == Some(SERIALIZATION_FAILURE_CODE) =>
3378 {
3379 true
3380 }
3381 _ => false,
3382 }
3383}
3384
3385struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3386
3387impl Deref for TransactionHandle {
3388 type Target = DatabaseTransaction;
3389
3390 fn deref(&self) -> &Self::Target {
3391 self.0.as_ref().as_ref().unwrap()
3392 }
3393}
3394
3395pub struct RoomGuard<T> {
3396 data: T,
3397 _guard: OwnedMutexGuard<()>,
3398 _not_send: PhantomData<Rc<()>>,
3399}
3400
3401impl<T> Deref for RoomGuard<T> {
3402 type Target = T;
3403
3404 fn deref(&self) -> &T {
3405 &self.data
3406 }
3407}
3408
3409impl<T> DerefMut for RoomGuard<T> {
3410 fn deref_mut(&mut self) -> &mut T {
3411 &mut self.data
3412 }
3413}
3414
/// Serializable bundle of a GitHub login, a GitHub user id and an invite count.
///
/// NOTE(review): presumably the input for creating a user; the code that consumes it is
/// outside this part of the file.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    pub github_login: String,
    pub github_user_id: i32,
    pub invite_count: i32,
}
3421
/// Identifiers associated with a user record.
///
/// NOTE(review): presumably what user creation returns; the producing code is outside this part
/// of the file.
#[derive(Debug)]
pub struct NewUserResult {
    pub user_id: UserId,
    pub metrics_id: String,
    // Optional: not every result carries an inviting user or a signup device id.
    pub inviting_user_id: Option<UserId>,
    pub signup_device_id: Option<String>,
}
3429
/// A channel row as read from a query result (via `FromQueryResult`).
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    // `None` when the row has no parent id.
    pub parent_id: Option<ChannelId>,
}
3436
3437fn random_invite_code() -> String {
3438 nanoid::nanoid!(16)
3439}
3440
3441fn random_email_confirmation_code() -> String {
3442 nanoid::nanoid!(64)
3443}
3444
/// Defines a public newtype around `i32` for use as a typed database identifier.
///
/// The generated type derives the common value traits and transparent serde
/// (de)serialization, implements `Display`, and implements the `sea_query` / `sea_orm`
/// conversion traits below so it can be bound as a query value and read back from results.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Converts a protobuf `u64` id using an `as` cast.
            ///
            /// Values that do not fit in an `i32` are truncated, not rejected.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts the id to a protobuf `u64` using an `as` cast.
            ///
            /// A negative id is sign-extended and therefore becomes a very large `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer variant whose value fits in an `i32`; null values,
            // out-of-range values and non-integer variants are reported as `ValueTypeErr`.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `ConvertFromU64` carries the name of the type that could not be built from
                // the `u64`, so pass the type name rather than a full sentence.
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(stringify!($name))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
3563
// One `i32`-backed id newtype per kind of identifier used in this module.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
3577
/// A room together with the projects that were rejoined and reshared in it.
///
/// NOTE(review): presumably the result of a room-rejoin operation; the producer is outside this
/// part of the file.
pub struct RejoinedRoom {
    pub room: proto::Room,
    pub rejoined_projects: Vec<RejoinedProject>,
    pub reshared_projects: Vec<ResharedProject>,
}
3583
/// One entry of `RejoinedRoom::reshared_projects`: a project id with its previous connection
/// id, its collaborators and its worktree metadata.
pub struct ResharedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
3590
/// One entry of `RejoinedRoom::rejoined_projects`: a project id with its previous connection
/// id, its collaborators, per-worktree data and language servers.
pub struct RejoinedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<RejoinedWorktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
3598
/// Per-worktree data carried in `RejoinedProject::worktrees`.
///
/// Holds the worktree's identity and paths plus lists of updated and removed entries and
/// repositories, diagnostic summaries, settings files and two scan ids.
/// NOTE(review): the updated/removed lists look like a delta against what the rejoining client
/// already has; confirm against the code that builds this value.
#[derive(Debug)]
pub struct RejoinedWorktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub updated_entries: Vec<proto::Entry>,
    pub removed_entries: Vec<u64>,
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    pub removed_repositories: Vec<u64>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
3614
/// A room together with the projects that were left (keyed by project id) and the ids of users
/// whose calls were canceled.
///
/// NOTE(review): presumably the result of leaving a room; the producer is outside this part of
/// the file.
pub struct LeftRoom {
    pub room: proto::Room,
    pub left_projects: HashMap<ProjectId, LeftProject>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3620
/// A room together with the user ids of stale participants and of users whose calls were
/// canceled.
///
/// NOTE(review): presumably the result of refreshing a room; the producer is outside this part
/// of the file.
pub struct RefreshedRoom {
    pub room: proto::Room,
    pub stale_participant_user_ids: Vec<UserId>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3626
/// A project's collaborators, its worktrees (ordered map keyed by a `u64`, presumably the
/// worktree id) and its language servers.
pub struct Project {
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: BTreeMap<u64, Worktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
3632
3633pub struct ProjectCollaborator {
3634 pub connection_id: ConnectionId,
3635 pub user_id: UserId,
3636 pub replica_id: ReplicaId,
3637 pub is_host: bool,
3638}
3639
3640impl ProjectCollaborator {
3641 pub fn to_proto(&self) -> proto::Collaborator {
3642 proto::Collaborator {
3643 peer_id: Some(self.connection_id.into()),
3644 replica_id: self.replica_id.0 as u32,
3645 user_id: self.user_id.to_proto(),
3646 }
3647 }
3648}
3649
/// Value type of `LeftRoom::left_projects`: a project id, its host's user and connection ids,
/// and a list of connection ids.
///
/// NOTE(review): `connection_ids` presumably lists the connections still attached to the
/// project; confirm against the code that fills it.
#[derive(Debug)]
pub struct LeftProject {
    pub id: ProjectId,
    pub host_user_id: UserId,
    pub host_connection_id: ConnectionId,
    pub connection_ids: Vec<ConnectionId>,
}
3657
/// Value type of `Project::worktrees`: a worktree's identity and paths, its entries,
/// repository entries (ordered map keyed by a `u64`), diagnostic summaries, settings files and
/// two scan ids.
pub struct Worktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
3670
/// A settings file attached to a worktree: its path and its content, both as strings.
///
/// Used by `Worktree::settings_files` and `RejoinedWorktree::settings_files`.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    pub path: String,
    pub content: String,
}
3676
3677#[cfg(test)]
3678pub use test::*;
3679
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A database set up for a single test.
    ///
    /// Dropping a Postgres-backed `TestDb` terminates the other connections to its database
    /// and then drops the database. Dropping an SQLite-backed one does no extra cleanup.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        // Both constructors below set this to `None`.
        pub connection: Option<sqlx::AnyConnection>,
    }

    /// Builds the single-threaded tokio runtime (I/O and time enabled) that a test database
    /// uses to run its queries.
    fn new_test_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build()
            .unwrap()
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and loads the test schema into it.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let runtime = new_test_runtime();

            let mut db = runtime.block_on(async {
                let mut options = ConnectOptions::new("sqlite::memory:".to_string());
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            // The database keeps the runtime so `Database::run` can block on it in tests.
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a randomly named Postgres database on localhost and runs the migrations.
        ///
        /// A process-wide lock is held for the whole call, so only one test database is set
        /// up at a time.
        pub fn postgres(background: Arc<Background>) -> Self {
            static LOCK: Mutex<()> = Mutex::new(());

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = new_test_runtime();

            let mut db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            // The database keeps the runtime so `Database::run` can block on it in tests.
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Returns the wrapped database; panics if it has already been taken by `drop`.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    // Terminate every other backend connected to this database so that it
                    // can be dropped. Failures are logged, not propagated.
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}