1mod access_token;
2mod contact;
3mod follower;
4mod language_server;
5mod project;
6mod project_collaborator;
7mod room;
8mod room_participant;
9mod server;
10mod signup;
11#[cfg(test)]
12mod tests;
13mod user;
14mod worktree;
15mod worktree_diagnostic_summary;
16mod worktree_entry;
17mod worktree_repository;
18
19use crate::executor::Executor;
20use crate::{Error, Result};
21use anyhow::anyhow;
22use collections::{BTreeMap, HashMap, HashSet};
23pub use contact::Contact;
24use dashmap::DashMap;
25use futures::StreamExt;
26use hyper::StatusCode;
27use rand::prelude::StdRng;
28use rand::{Rng, SeedableRng};
29use rpc::{proto, ConnectionId};
30use sea_orm::Condition;
31pub use sea_orm::ConnectOptions;
32use sea_orm::{
33 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
34 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
35 Statement, TransactionTrait,
36};
37use sea_query::{Alias, Expr, OnConflict, Query};
38use serde::{Deserialize, Serialize};
39pub use signup::{Invite, NewSignup, WaitlistSummary};
40use sqlx::migrate::{Migrate, Migration, MigrationSource};
41use sqlx::Connection;
42use std::ops::{Deref, DerefMut};
43use std::path::Path;
44use std::time::Duration;
45use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
46use tokio::sync::{Mutex, OwnedMutexGuard};
47pub use user::Model as User;
48
49pub struct Database {
50 options: ConnectOptions,
51 pool: DatabaseConnection,
52 rooms: DashMap<RoomId, Arc<Mutex<()>>>,
53 rng: Mutex<StdRng>,
54 executor: Executor,
55 #[cfg(test)]
56 runtime: Option<tokio::runtime::Runtime>,
57}
58
59impl Database {
60 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
61 Ok(Self {
62 options: options.clone(),
63 pool: sea_orm::Database::connect(options).await?,
64 rooms: DashMap::with_capacity(16384),
65 rng: Mutex::new(StdRng::seed_from_u64(0)),
66 executor,
67 #[cfg(test)]
68 runtime: None,
69 })
70 }
71
72 #[cfg(test)]
73 pub fn reset(&self) {
74 self.rooms.clear();
75 }
76
77 pub async fn migrate(
78 &self,
79 migrations_path: &Path,
80 ignore_checksum_mismatch: bool,
81 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
82 let migrations = MigrationSource::resolve(migrations_path)
83 .await
84 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
85
86 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
87
88 connection.ensure_migrations_table().await?;
89 let applied_migrations: HashMap<_, _> = connection
90 .list_applied_migrations()
91 .await?
92 .into_iter()
93 .map(|m| (m.version, m))
94 .collect();
95
96 let mut new_migrations = Vec::new();
97 for migration in migrations {
98 match applied_migrations.get(&migration.version) {
99 Some(applied_migration) => {
100 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
101 {
102 Err(anyhow!(
103 "checksum mismatch for applied migration {}",
104 migration.description
105 ))?;
106 }
107 }
108 None => {
109 let elapsed = connection.apply(&migration).await?;
110 new_migrations.push((migration, elapsed));
111 }
112 }
113 }
114
115 Ok(new_migrations)
116 }
117
118 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
119 self.transaction(|tx| async move {
120 let server = server::ActiveModel {
121 environment: ActiveValue::set(environment.into()),
122 ..Default::default()
123 }
124 .insert(&*tx)
125 .await?;
126 Ok(server.id)
127 })
128 .await
129 }
130
131 pub async fn stale_room_ids(
132 &self,
133 environment: &str,
134 new_server_id: ServerId,
135 ) -> Result<Vec<RoomId>> {
136 self.transaction(|tx| async move {
137 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
138 enum QueryAs {
139 RoomId,
140 }
141
142 let stale_server_epochs = self
143 .stale_server_ids(environment, new_server_id, &tx)
144 .await?;
145 Ok(room_participant::Entity::find()
146 .select_only()
147 .column(room_participant::Column::RoomId)
148 .distinct()
149 .filter(
150 room_participant::Column::AnsweringConnectionServerId
151 .is_in(stale_server_epochs),
152 )
153 .into_values::<_, QueryAs>()
154 .all(&*tx)
155 .await?)
156 })
157 .await
158 }
159
160 pub async fn refresh_room(
161 &self,
162 room_id: RoomId,
163 new_server_id: ServerId,
164 ) -> Result<RoomGuard<RefreshedRoom>> {
165 self.room_transaction(room_id, |tx| async move {
166 let stale_participant_filter = Condition::all()
167 .add(room_participant::Column::RoomId.eq(room_id))
168 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
169 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
170
171 let stale_participant_user_ids = room_participant::Entity::find()
172 .filter(stale_participant_filter.clone())
173 .all(&*tx)
174 .await?
175 .into_iter()
176 .map(|participant| participant.user_id)
177 .collect::<Vec<_>>();
178
179 // Delete participants who failed to reconnect and cancel their calls.
180 let mut canceled_calls_to_user_ids = Vec::new();
181 room_participant::Entity::delete_many()
182 .filter(stale_participant_filter)
183 .exec(&*tx)
184 .await?;
185 let called_participants = room_participant::Entity::find()
186 .filter(
187 Condition::all()
188 .add(
189 room_participant::Column::CallingUserId
190 .is_in(stale_participant_user_ids.iter().copied()),
191 )
192 .add(room_participant::Column::AnsweringConnectionId.is_null()),
193 )
194 .all(&*tx)
195 .await?;
196 room_participant::Entity::delete_many()
197 .filter(
198 room_participant::Column::Id
199 .is_in(called_participants.iter().map(|participant| participant.id)),
200 )
201 .exec(&*tx)
202 .await?;
203 canceled_calls_to_user_ids.extend(
204 called_participants
205 .into_iter()
206 .map(|participant| participant.user_id),
207 );
208
209 let room = self.get_room(room_id, &tx).await?;
210 // Delete the room if it becomes empty.
211 if room.participants.is_empty() {
212 project::Entity::delete_many()
213 .filter(project::Column::RoomId.eq(room_id))
214 .exec(&*tx)
215 .await?;
216 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
217 }
218
219 Ok(RefreshedRoom {
220 room,
221 stale_participant_user_ids,
222 canceled_calls_to_user_ids,
223 })
224 })
225 .await
226 }
227
228 pub async fn delete_stale_servers(
229 &self,
230 environment: &str,
231 new_server_id: ServerId,
232 ) -> Result<()> {
233 self.transaction(|tx| async move {
234 server::Entity::delete_many()
235 .filter(
236 Condition::all()
237 .add(server::Column::Environment.eq(environment))
238 .add(server::Column::Id.ne(new_server_id)),
239 )
240 .exec(&*tx)
241 .await?;
242 Ok(())
243 })
244 .await
245 }
246
247 async fn stale_server_ids(
248 &self,
249 environment: &str,
250 new_server_id: ServerId,
251 tx: &DatabaseTransaction,
252 ) -> Result<Vec<ServerId>> {
253 let stale_servers = server::Entity::find()
254 .filter(
255 Condition::all()
256 .add(server::Column::Environment.eq(environment))
257 .add(server::Column::Id.ne(new_server_id)),
258 )
259 .all(&*tx)
260 .await?;
261 Ok(stale_servers.into_iter().map(|server| server.id).collect())
262 }
263
264 // users
265
266 pub async fn create_user(
267 &self,
268 email_address: &str,
269 admin: bool,
270 params: NewUserParams,
271 ) -> Result<NewUserResult> {
272 self.transaction(|tx| async {
273 let tx = tx;
274 let user = user::Entity::insert(user::ActiveModel {
275 email_address: ActiveValue::set(Some(email_address.into())),
276 github_login: ActiveValue::set(params.github_login.clone()),
277 github_user_id: ActiveValue::set(Some(params.github_user_id)),
278 admin: ActiveValue::set(admin),
279 metrics_id: ActiveValue::set(Uuid::new_v4()),
280 ..Default::default()
281 })
282 .on_conflict(
283 OnConflict::column(user::Column::GithubLogin)
284 .update_column(user::Column::GithubLogin)
285 .to_owned(),
286 )
287 .exec_with_returning(&*tx)
288 .await?;
289
290 Ok(NewUserResult {
291 user_id: user.id,
292 metrics_id: user.metrics_id.to_string(),
293 signup_device_id: None,
294 inviting_user_id: None,
295 })
296 })
297 .await
298 }
299
300 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
301 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
302 .await
303 }
304
305 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
306 self.transaction(|tx| async {
307 let tx = tx;
308 Ok(user::Entity::find()
309 .filter(user::Column::Id.is_in(ids.iter().copied()))
310 .all(&*tx)
311 .await?)
312 })
313 .await
314 }
315
316 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
317 self.transaction(|tx| async move {
318 Ok(user::Entity::find()
319 .filter(user::Column::GithubLogin.eq(github_login))
320 .one(&*tx)
321 .await?)
322 })
323 .await
324 }
325
326 pub async fn get_or_create_user_by_github_account(
327 &self,
328 github_login: &str,
329 github_user_id: Option<i32>,
330 github_email: Option<&str>,
331 ) -> Result<Option<User>> {
332 self.transaction(|tx| async move {
333 let tx = &*tx;
334 if let Some(github_user_id) = github_user_id {
335 if let Some(user_by_github_user_id) = user::Entity::find()
336 .filter(user::Column::GithubUserId.eq(github_user_id))
337 .one(tx)
338 .await?
339 {
340 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
341 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
342 Ok(Some(user_by_github_user_id.update(tx).await?))
343 } else if let Some(user_by_github_login) = user::Entity::find()
344 .filter(user::Column::GithubLogin.eq(github_login))
345 .one(tx)
346 .await?
347 {
348 let mut user_by_github_login = user_by_github_login.into_active_model();
349 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
350 Ok(Some(user_by_github_login.update(tx).await?))
351 } else {
352 let user = user::Entity::insert(user::ActiveModel {
353 email_address: ActiveValue::set(github_email.map(|email| email.into())),
354 github_login: ActiveValue::set(github_login.into()),
355 github_user_id: ActiveValue::set(Some(github_user_id)),
356 admin: ActiveValue::set(false),
357 invite_count: ActiveValue::set(0),
358 invite_code: ActiveValue::set(None),
359 metrics_id: ActiveValue::set(Uuid::new_v4()),
360 ..Default::default()
361 })
362 .exec_with_returning(&*tx)
363 .await?;
364 Ok(Some(user))
365 }
366 } else {
367 Ok(user::Entity::find()
368 .filter(user::Column::GithubLogin.eq(github_login))
369 .one(tx)
370 .await?)
371 }
372 })
373 .await
374 }
375
376 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
377 self.transaction(|tx| async move {
378 Ok(user::Entity::find()
379 .order_by_asc(user::Column::GithubLogin)
380 .limit(limit as u64)
381 .offset(page as u64 * limit as u64)
382 .all(&*tx)
383 .await?)
384 })
385 .await
386 }
387
388 pub async fn get_users_with_no_invites(
389 &self,
390 invited_by_another_user: bool,
391 ) -> Result<Vec<User>> {
392 self.transaction(|tx| async move {
393 Ok(user::Entity::find()
394 .filter(
395 user::Column::InviteCount
396 .eq(0)
397 .and(if invited_by_another_user {
398 user::Column::InviterId.is_not_null()
399 } else {
400 user::Column::InviterId.is_null()
401 }),
402 )
403 .all(&*tx)
404 .await?)
405 })
406 .await
407 }
408
409 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
410 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
411 enum QueryAs {
412 MetricsId,
413 }
414
415 self.transaction(|tx| async move {
416 let metrics_id: Uuid = user::Entity::find_by_id(id)
417 .select_only()
418 .column(user::Column::MetricsId)
419 .into_values::<_, QueryAs>()
420 .one(&*tx)
421 .await?
422 .ok_or_else(|| anyhow!("could not find user"))?;
423 Ok(metrics_id.to_string())
424 })
425 .await
426 }
427
428 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
429 self.transaction(|tx| async move {
430 user::Entity::update_many()
431 .filter(user::Column::Id.eq(id))
432 .set(user::ActiveModel {
433 admin: ActiveValue::set(is_admin),
434 ..Default::default()
435 })
436 .exec(&*tx)
437 .await?;
438 Ok(())
439 })
440 .await
441 }
442
443 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
444 self.transaction(|tx| async move {
445 user::Entity::update_many()
446 .filter(user::Column::Id.eq(id))
447 .set(user::ActiveModel {
448 connected_once: ActiveValue::set(connected_once),
449 ..Default::default()
450 })
451 .exec(&*tx)
452 .await?;
453 Ok(())
454 })
455 .await
456 }
457
458 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
459 self.transaction(|tx| async move {
460 access_token::Entity::delete_many()
461 .filter(access_token::Column::UserId.eq(id))
462 .exec(&*tx)
463 .await?;
464 user::Entity::delete_by_id(id).exec(&*tx).await?;
465 Ok(())
466 })
467 .await
468 }
469
470 // contacts
471
472 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
473 #[derive(Debug, FromQueryResult)]
474 struct ContactWithUserBusyStatuses {
475 user_id_a: UserId,
476 user_id_b: UserId,
477 a_to_b: bool,
478 accepted: bool,
479 should_notify: bool,
480 user_a_busy: bool,
481 user_b_busy: bool,
482 }
483
484 self.transaction(|tx| async move {
485 let user_a_participant = Alias::new("user_a_participant");
486 let user_b_participant = Alias::new("user_b_participant");
487 let mut db_contacts = contact::Entity::find()
488 .column_as(
489 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
490 .is_not_null(),
491 "user_a_busy",
492 )
493 .column_as(
494 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
495 .is_not_null(),
496 "user_b_busy",
497 )
498 .filter(
499 contact::Column::UserIdA
500 .eq(user_id)
501 .or(contact::Column::UserIdB.eq(user_id)),
502 )
503 .join_as(
504 JoinType::LeftJoin,
505 contact::Relation::UserARoomParticipant.def(),
506 user_a_participant,
507 )
508 .join_as(
509 JoinType::LeftJoin,
510 contact::Relation::UserBRoomParticipant.def(),
511 user_b_participant,
512 )
513 .into_model::<ContactWithUserBusyStatuses>()
514 .stream(&*tx)
515 .await?;
516
517 let mut contacts = Vec::new();
518 while let Some(db_contact) = db_contacts.next().await {
519 let db_contact = db_contact?;
520 if db_contact.user_id_a == user_id {
521 if db_contact.accepted {
522 contacts.push(Contact::Accepted {
523 user_id: db_contact.user_id_b,
524 should_notify: db_contact.should_notify && db_contact.a_to_b,
525 busy: db_contact.user_b_busy,
526 });
527 } else if db_contact.a_to_b {
528 contacts.push(Contact::Outgoing {
529 user_id: db_contact.user_id_b,
530 })
531 } else {
532 contacts.push(Contact::Incoming {
533 user_id: db_contact.user_id_b,
534 should_notify: db_contact.should_notify,
535 });
536 }
537 } else if db_contact.accepted {
538 contacts.push(Contact::Accepted {
539 user_id: db_contact.user_id_a,
540 should_notify: db_contact.should_notify && !db_contact.a_to_b,
541 busy: db_contact.user_a_busy,
542 });
543 } else if db_contact.a_to_b {
544 contacts.push(Contact::Incoming {
545 user_id: db_contact.user_id_a,
546 should_notify: db_contact.should_notify,
547 });
548 } else {
549 contacts.push(Contact::Outgoing {
550 user_id: db_contact.user_id_a,
551 });
552 }
553 }
554
555 contacts.sort_unstable_by_key(|contact| contact.user_id());
556
557 Ok(contacts)
558 })
559 .await
560 }
561
562 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
563 self.transaction(|tx| async move {
564 let participant = room_participant::Entity::find()
565 .filter(room_participant::Column::UserId.eq(user_id))
566 .one(&*tx)
567 .await?;
568 Ok(participant.is_some())
569 })
570 .await
571 }
572
573 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
574 self.transaction(|tx| async move {
575 let (id_a, id_b) = if user_id_1 < user_id_2 {
576 (user_id_1, user_id_2)
577 } else {
578 (user_id_2, user_id_1)
579 };
580
581 Ok(contact::Entity::find()
582 .filter(
583 contact::Column::UserIdA
584 .eq(id_a)
585 .and(contact::Column::UserIdB.eq(id_b))
586 .and(contact::Column::Accepted.eq(true)),
587 )
588 .one(&*tx)
589 .await?
590 .is_some())
591 })
592 .await
593 }
594
595 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
596 self.transaction(|tx| async move {
597 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
598 (sender_id, receiver_id, true)
599 } else {
600 (receiver_id, sender_id, false)
601 };
602
603 let rows_affected = contact::Entity::insert(contact::ActiveModel {
604 user_id_a: ActiveValue::set(id_a),
605 user_id_b: ActiveValue::set(id_b),
606 a_to_b: ActiveValue::set(a_to_b),
607 accepted: ActiveValue::set(false),
608 should_notify: ActiveValue::set(true),
609 ..Default::default()
610 })
611 .on_conflict(
612 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
613 .values([
614 (contact::Column::Accepted, true.into()),
615 (contact::Column::ShouldNotify, false.into()),
616 ])
617 .action_and_where(
618 contact::Column::Accepted.eq(false).and(
619 contact::Column::AToB
620 .eq(a_to_b)
621 .and(contact::Column::UserIdA.eq(id_b))
622 .or(contact::Column::AToB
623 .ne(a_to_b)
624 .and(contact::Column::UserIdA.eq(id_a))),
625 ),
626 )
627 .to_owned(),
628 )
629 .exec_without_returning(&*tx)
630 .await?;
631
632 if rows_affected == 1 {
633 Ok(())
634 } else {
635 Err(anyhow!("contact already requested"))?
636 }
637 })
638 .await
639 }
640
641 /// Returns a bool indicating whether the removed contact had originally accepted or not
642 ///
643 /// Deletes the contact identified by the requester and responder ids, and then returns
644 /// whether the deleted contact had originally accepted or was a pending contact request.
645 ///
646 /// # Arguments
647 ///
648 /// * `requester_id` - The user that initiates this request
649 /// * `responder_id` - The user that will be removed
650 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
651 self.transaction(|tx| async move {
652 let (id_a, id_b) = if responder_id < requester_id {
653 (responder_id, requester_id)
654 } else {
655 (requester_id, responder_id)
656 };
657
658 let contact = contact::Entity::find()
659 .filter(
660 contact::Column::UserIdA
661 .eq(id_a)
662 .and(contact::Column::UserIdB.eq(id_b)),
663 )
664 .one(&*tx)
665 .await?
666 .ok_or_else(|| anyhow!("no such contact"))?;
667
668 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
669 Ok(contact.accepted)
670 })
671 .await
672 }
673
674 pub async fn dismiss_contact_notification(
675 &self,
676 user_id: UserId,
677 contact_user_id: UserId,
678 ) -> Result<()> {
679 self.transaction(|tx| async move {
680 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
681 (user_id, contact_user_id, true)
682 } else {
683 (contact_user_id, user_id, false)
684 };
685
686 let result = contact::Entity::update_many()
687 .set(contact::ActiveModel {
688 should_notify: ActiveValue::set(false),
689 ..Default::default()
690 })
691 .filter(
692 contact::Column::UserIdA
693 .eq(id_a)
694 .and(contact::Column::UserIdB.eq(id_b))
695 .and(
696 contact::Column::AToB
697 .eq(a_to_b)
698 .and(contact::Column::Accepted.eq(true))
699 .or(contact::Column::AToB
700 .ne(a_to_b)
701 .and(contact::Column::Accepted.eq(false))),
702 ),
703 )
704 .exec(&*tx)
705 .await?;
706 if result.rows_affected == 0 {
707 Err(anyhow!("no such contact request"))?
708 } else {
709 Ok(())
710 }
711 })
712 .await
713 }
714
715 pub async fn respond_to_contact_request(
716 &self,
717 responder_id: UserId,
718 requester_id: UserId,
719 accept: bool,
720 ) -> Result<()> {
721 self.transaction(|tx| async move {
722 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
723 (responder_id, requester_id, false)
724 } else {
725 (requester_id, responder_id, true)
726 };
727 let rows_affected = if accept {
728 let result = contact::Entity::update_many()
729 .set(contact::ActiveModel {
730 accepted: ActiveValue::set(true),
731 should_notify: ActiveValue::set(true),
732 ..Default::default()
733 })
734 .filter(
735 contact::Column::UserIdA
736 .eq(id_a)
737 .and(contact::Column::UserIdB.eq(id_b))
738 .and(contact::Column::AToB.eq(a_to_b)),
739 )
740 .exec(&*tx)
741 .await?;
742 result.rows_affected
743 } else {
744 let result = contact::Entity::delete_many()
745 .filter(
746 contact::Column::UserIdA
747 .eq(id_a)
748 .and(contact::Column::UserIdB.eq(id_b))
749 .and(contact::Column::AToB.eq(a_to_b))
750 .and(contact::Column::Accepted.eq(false)),
751 )
752 .exec(&*tx)
753 .await?;
754
755 result.rows_affected
756 };
757
758 if rows_affected == 1 {
759 Ok(())
760 } else {
761 Err(anyhow!("no such contact request"))?
762 }
763 })
764 .await
765 }
766
767 pub fn fuzzy_like_string(string: &str) -> String {
768 let mut result = String::with_capacity(string.len() * 2 + 1);
769 for c in string.chars() {
770 if c.is_alphanumeric() {
771 result.push('%');
772 result.push(c);
773 }
774 }
775 result.push('%');
776 result
777 }
778
779 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
780 self.transaction(|tx| async {
781 let tx = tx;
782 let like_string = Self::fuzzy_like_string(name_query);
783 let query = "
784 SELECT users.*
785 FROM users
786 WHERE github_login ILIKE $1
787 ORDER BY github_login <-> $2
788 LIMIT $3
789 ";
790
791 Ok(user::Entity::find()
792 .from_raw_sql(Statement::from_sql_and_values(
793 self.pool.get_database_backend(),
794 query.into(),
795 vec![like_string.into(), name_query.into(), limit.into()],
796 ))
797 .all(&*tx)
798 .await?)
799 })
800 .await
801 }
802
803 // signups
804
805 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
806 self.transaction(|tx| async move {
807 signup::Entity::insert(signup::ActiveModel {
808 email_address: ActiveValue::set(signup.email_address.clone()),
809 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
810 email_confirmation_sent: ActiveValue::set(false),
811 platform_mac: ActiveValue::set(signup.platform_mac),
812 platform_windows: ActiveValue::set(signup.platform_windows),
813 platform_linux: ActiveValue::set(signup.platform_linux),
814 platform_unknown: ActiveValue::set(false),
815 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
816 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
817 device_id: ActiveValue::set(signup.device_id.clone()),
818 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
819 ..Default::default()
820 })
821 .on_conflict(
822 OnConflict::column(signup::Column::EmailAddress)
823 .update_columns([
824 signup::Column::PlatformMac,
825 signup::Column::PlatformWindows,
826 signup::Column::PlatformLinux,
827 signup::Column::EditorFeatures,
828 signup::Column::ProgrammingLanguages,
829 signup::Column::DeviceId,
830 signup::Column::AddedToMailingList,
831 ])
832 .to_owned(),
833 )
834 .exec(&*tx)
835 .await?;
836 Ok(())
837 })
838 .await
839 }
840
841 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
842 self.transaction(|tx| async move {
843 let signup = signup::Entity::find()
844 .filter(signup::Column::EmailAddress.eq(email_address))
845 .one(&*tx)
846 .await?
847 .ok_or_else(|| {
848 anyhow!("signup with email address {} doesn't exist", email_address)
849 })?;
850
851 Ok(signup)
852 })
853 .await
854 }
855
856 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
857 self.transaction(|tx| async move {
858 let query = "
859 SELECT
860 COUNT(*) as count,
861 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
862 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
863 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
864 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
865 FROM (
866 SELECT *
867 FROM signups
868 WHERE
869 NOT email_confirmation_sent
870 ) AS unsent
871 ";
872 Ok(
873 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
874 self.pool.get_database_backend(),
875 query.into(),
876 vec![],
877 ))
878 .one(&*tx)
879 .await?
880 .ok_or_else(|| anyhow!("invalid result"))?,
881 )
882 })
883 .await
884 }
885
886 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
887 let emails = invites
888 .iter()
889 .map(|s| s.email_address.as_str())
890 .collect::<Vec<_>>();
891 self.transaction(|tx| async {
892 let tx = tx;
893 signup::Entity::update_many()
894 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
895 .set(signup::ActiveModel {
896 email_confirmation_sent: ActiveValue::set(true),
897 ..Default::default()
898 })
899 .exec(&*tx)
900 .await?;
901 Ok(())
902 })
903 .await
904 }
905
906 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
907 self.transaction(|tx| async move {
908 Ok(signup::Entity::find()
909 .select_only()
910 .column(signup::Column::EmailAddress)
911 .column(signup::Column::EmailConfirmationCode)
912 .filter(
913 signup::Column::EmailConfirmationSent.eq(false).and(
914 signup::Column::PlatformMac
915 .eq(true)
916 .or(signup::Column::PlatformUnknown.eq(true)),
917 ),
918 )
919 .order_by_asc(signup::Column::CreatedAt)
920 .limit(count as u64)
921 .into_model()
922 .all(&*tx)
923 .await?)
924 })
925 .await
926 }
927
928 // invite codes
929
930 pub async fn create_invite_from_code(
931 &self,
932 code: &str,
933 email_address: &str,
934 device_id: Option<&str>,
935 added_to_mailing_list: bool,
936 ) -> Result<Invite> {
937 self.transaction(|tx| async move {
938 let existing_user = user::Entity::find()
939 .filter(user::Column::EmailAddress.eq(email_address))
940 .one(&*tx)
941 .await?;
942
943 if existing_user.is_some() {
944 Err(anyhow!("email address is already in use"))?;
945 }
946
947 let inviting_user_with_invites = match user::Entity::find()
948 .filter(
949 user::Column::InviteCode
950 .eq(code)
951 .and(user::Column::InviteCount.gt(0)),
952 )
953 .one(&*tx)
954 .await?
955 {
956 Some(inviting_user) => inviting_user,
957 None => {
958 return Err(Error::Http(
959 StatusCode::UNAUTHORIZED,
960 "unable to find an invite code with invites remaining".to_string(),
961 ))?
962 }
963 };
964 user::Entity::update_many()
965 .filter(
966 user::Column::Id
967 .eq(inviting_user_with_invites.id)
968 .and(user::Column::InviteCount.gt(0)),
969 )
970 .col_expr(
971 user::Column::InviteCount,
972 Expr::col(user::Column::InviteCount).sub(1),
973 )
974 .exec(&*tx)
975 .await?;
976
977 let signup = signup::Entity::insert(signup::ActiveModel {
978 email_address: ActiveValue::set(email_address.into()),
979 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
980 email_confirmation_sent: ActiveValue::set(false),
981 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
982 platform_linux: ActiveValue::set(false),
983 platform_mac: ActiveValue::set(false),
984 platform_windows: ActiveValue::set(false),
985 platform_unknown: ActiveValue::set(true),
986 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
987 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
988 ..Default::default()
989 })
990 .on_conflict(
991 OnConflict::column(signup::Column::EmailAddress)
992 .update_column(signup::Column::InvitingUserId)
993 .to_owned(),
994 )
995 .exec_with_returning(&*tx)
996 .await?;
997
998 Ok(Invite {
999 email_address: signup.email_address,
1000 email_confirmation_code: signup.email_confirmation_code,
1001 })
1002 })
1003 .await
1004 }
1005
1006 pub async fn create_user_from_invite(
1007 &self,
1008 invite: &Invite,
1009 user: NewUserParams,
1010 ) -> Result<Option<NewUserResult>> {
1011 self.transaction(|tx| async {
1012 let tx = tx;
1013 let signup = signup::Entity::find()
1014 .filter(
1015 signup::Column::EmailAddress
1016 .eq(invite.email_address.as_str())
1017 .and(
1018 signup::Column::EmailConfirmationCode
1019 .eq(invite.email_confirmation_code.as_str()),
1020 ),
1021 )
1022 .one(&*tx)
1023 .await?
1024 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1025
1026 if signup.user_id.is_some() {
1027 return Ok(None);
1028 }
1029
1030 let user = user::Entity::insert(user::ActiveModel {
1031 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1032 github_login: ActiveValue::set(user.github_login.clone()),
1033 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1034 admin: ActiveValue::set(false),
1035 invite_count: ActiveValue::set(user.invite_count),
1036 invite_code: ActiveValue::set(Some(random_invite_code())),
1037 metrics_id: ActiveValue::set(Uuid::new_v4()),
1038 ..Default::default()
1039 })
1040 .on_conflict(
1041 OnConflict::column(user::Column::GithubLogin)
1042 .update_columns([
1043 user::Column::EmailAddress,
1044 user::Column::GithubUserId,
1045 user::Column::Admin,
1046 ])
1047 .to_owned(),
1048 )
1049 .exec_with_returning(&*tx)
1050 .await?;
1051
1052 let mut signup = signup.into_active_model();
1053 signup.user_id = ActiveValue::set(Some(user.id));
1054 let signup = signup.update(&*tx).await?;
1055
1056 if let Some(inviting_user_id) = signup.inviting_user_id {
1057 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1058 (inviting_user_id, user.id, true)
1059 } else {
1060 (user.id, inviting_user_id, false)
1061 };
1062
1063 contact::Entity::insert(contact::ActiveModel {
1064 user_id_a: ActiveValue::set(user_id_a),
1065 user_id_b: ActiveValue::set(user_id_b),
1066 a_to_b: ActiveValue::set(a_to_b),
1067 should_notify: ActiveValue::set(true),
1068 accepted: ActiveValue::set(true),
1069 ..Default::default()
1070 })
1071 .on_conflict(OnConflict::new().do_nothing().to_owned())
1072 .exec_without_returning(&*tx)
1073 .await?;
1074 }
1075
1076 Ok(Some(NewUserResult {
1077 user_id: user.id,
1078 metrics_id: user.metrics_id.to_string(),
1079 inviting_user_id: signup.inviting_user_id,
1080 signup_device_id: signup.device_id,
1081 }))
1082 })
1083 .await
1084 }
1085
1086 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1087 self.transaction(|tx| async move {
1088 if count > 0 {
1089 user::Entity::update_many()
1090 .filter(
1091 user::Column::Id
1092 .eq(id)
1093 .and(user::Column::InviteCode.is_null()),
1094 )
1095 .set(user::ActiveModel {
1096 invite_code: ActiveValue::set(Some(random_invite_code())),
1097 ..Default::default()
1098 })
1099 .exec(&*tx)
1100 .await?;
1101 }
1102
1103 user::Entity::update_many()
1104 .filter(user::Column::Id.eq(id))
1105 .set(user::ActiveModel {
1106 invite_count: ActiveValue::set(count),
1107 ..Default::default()
1108 })
1109 .exec(&*tx)
1110 .await?;
1111 Ok(())
1112 })
1113 .await
1114 }
1115
1116 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1117 self.transaction(|tx| async move {
1118 match user::Entity::find_by_id(id).one(&*tx).await? {
1119 Some(user) if user.invite_code.is_some() => {
1120 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1121 }
1122 _ => Ok(None),
1123 }
1124 })
1125 .await
1126 }
1127
1128 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1129 self.transaction(|tx| async move {
1130 user::Entity::find()
1131 .filter(user::Column::InviteCode.eq(code))
1132 .one(&*tx)
1133 .await?
1134 .ok_or_else(|| {
1135 Error::Http(
1136 StatusCode::NOT_FOUND,
1137 "that invite code does not exist".to_string(),
1138 )
1139 })
1140 })
1141 .await
1142 }
1143
1144 // rooms
1145
1146 pub async fn incoming_call_for_user(
1147 &self,
1148 user_id: UserId,
1149 ) -> Result<Option<proto::IncomingCall>> {
1150 self.transaction(|tx| async move {
1151 let pending_participant = room_participant::Entity::find()
1152 .filter(
1153 room_participant::Column::UserId
1154 .eq(user_id)
1155 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1156 )
1157 .one(&*tx)
1158 .await?;
1159
1160 if let Some(pending_participant) = pending_participant {
1161 let room = self.get_room(pending_participant.room_id, &tx).await?;
1162 Ok(Self::build_incoming_call(&room, user_id))
1163 } else {
1164 Ok(None)
1165 }
1166 })
1167 .await
1168 }
1169
1170 pub async fn create_room(
1171 &self,
1172 user_id: UserId,
1173 connection: ConnectionId,
1174 live_kit_room: &str,
1175 ) -> Result<proto::Room> {
1176 self.transaction(|tx| async move {
1177 let room = room::ActiveModel {
1178 live_kit_room: ActiveValue::set(live_kit_room.into()),
1179 ..Default::default()
1180 }
1181 .insert(&*tx)
1182 .await?;
1183 room_participant::ActiveModel {
1184 room_id: ActiveValue::set(room.id),
1185 user_id: ActiveValue::set(user_id),
1186 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1187 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1188 connection.owner_id as i32,
1189 ))),
1190 answering_connection_lost: ActiveValue::set(false),
1191 calling_user_id: ActiveValue::set(user_id),
1192 calling_connection_id: ActiveValue::set(connection.id as i32),
1193 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1194 connection.owner_id as i32,
1195 ))),
1196 ..Default::default()
1197 }
1198 .insert(&*tx)
1199 .await?;
1200
1201 let room = self.get_room(room.id, &tx).await?;
1202 Ok(room)
1203 })
1204 .await
1205 }
1206
1207 pub async fn call(
1208 &self,
1209 room_id: RoomId,
1210 calling_user_id: UserId,
1211 calling_connection: ConnectionId,
1212 called_user_id: UserId,
1213 initial_project_id: Option<ProjectId>,
1214 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1215 self.room_transaction(room_id, |tx| async move {
1216 room_participant::ActiveModel {
1217 room_id: ActiveValue::set(room_id),
1218 user_id: ActiveValue::set(called_user_id),
1219 answering_connection_lost: ActiveValue::set(false),
1220 calling_user_id: ActiveValue::set(calling_user_id),
1221 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1222 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1223 calling_connection.owner_id as i32,
1224 ))),
1225 initial_project_id: ActiveValue::set(initial_project_id),
1226 ..Default::default()
1227 }
1228 .insert(&*tx)
1229 .await?;
1230
1231 let room = self.get_room(room_id, &tx).await?;
1232 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1233 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1234 Ok((room, incoming_call))
1235 })
1236 .await
1237 }
1238
1239 pub async fn call_failed(
1240 &self,
1241 room_id: RoomId,
1242 called_user_id: UserId,
1243 ) -> Result<RoomGuard<proto::Room>> {
1244 self.room_transaction(room_id, |tx| async move {
1245 room_participant::Entity::delete_many()
1246 .filter(
1247 room_participant::Column::RoomId
1248 .eq(room_id)
1249 .and(room_participant::Column::UserId.eq(called_user_id)),
1250 )
1251 .exec(&*tx)
1252 .await?;
1253 let room = self.get_room(room_id, &tx).await?;
1254 Ok(room)
1255 })
1256 .await
1257 }
1258
1259 pub async fn decline_call(
1260 &self,
1261 expected_room_id: Option<RoomId>,
1262 user_id: UserId,
1263 ) -> Result<Option<RoomGuard<proto::Room>>> {
1264 self.optional_room_transaction(|tx| async move {
1265 let mut filter = Condition::all()
1266 .add(room_participant::Column::UserId.eq(user_id))
1267 .add(room_participant::Column::AnsweringConnectionId.is_null());
1268 if let Some(room_id) = expected_room_id {
1269 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1270 }
1271 let participant = room_participant::Entity::find()
1272 .filter(filter)
1273 .one(&*tx)
1274 .await?;
1275
1276 let participant = if let Some(participant) = participant {
1277 participant
1278 } else if expected_room_id.is_some() {
1279 return Err(anyhow!("could not find call to decline"))?;
1280 } else {
1281 return Ok(None);
1282 };
1283
1284 let room_id = participant.room_id;
1285 room_participant::Entity::delete(participant.into_active_model())
1286 .exec(&*tx)
1287 .await?;
1288
1289 let room = self.get_room(room_id, &tx).await?;
1290 Ok(Some((room_id, room)))
1291 })
1292 .await
1293 }
1294
1295 pub async fn cancel_call(
1296 &self,
1297 room_id: RoomId,
1298 calling_connection: ConnectionId,
1299 called_user_id: UserId,
1300 ) -> Result<RoomGuard<proto::Room>> {
1301 self.room_transaction(room_id, |tx| async move {
1302 let participant = room_participant::Entity::find()
1303 .filter(
1304 Condition::all()
1305 .add(room_participant::Column::UserId.eq(called_user_id))
1306 .add(room_participant::Column::RoomId.eq(room_id))
1307 .add(
1308 room_participant::Column::CallingConnectionId
1309 .eq(calling_connection.id as i32),
1310 )
1311 .add(
1312 room_participant::Column::CallingConnectionServerId
1313 .eq(calling_connection.owner_id as i32),
1314 )
1315 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1316 )
1317 .one(&*tx)
1318 .await?
1319 .ok_or_else(|| anyhow!("no call to cancel"))?;
1320
1321 room_participant::Entity::delete(participant.into_active_model())
1322 .exec(&*tx)
1323 .await?;
1324
1325 let room = self.get_room(room_id, &tx).await?;
1326 Ok(room)
1327 })
1328 .await
1329 }
1330
1331 pub async fn join_room(
1332 &self,
1333 room_id: RoomId,
1334 user_id: UserId,
1335 connection: ConnectionId,
1336 ) -> Result<RoomGuard<proto::Room>> {
1337 self.room_transaction(room_id, |tx| async move {
1338 let result = room_participant::Entity::update_many()
1339 .filter(
1340 Condition::all()
1341 .add(room_participant::Column::RoomId.eq(room_id))
1342 .add(room_participant::Column::UserId.eq(user_id))
1343 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1344 )
1345 .set(room_participant::ActiveModel {
1346 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1347 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1348 connection.owner_id as i32,
1349 ))),
1350 answering_connection_lost: ActiveValue::set(false),
1351 ..Default::default()
1352 })
1353 .exec(&*tx)
1354 .await?;
1355 if result.rows_affected == 0 {
1356 Err(anyhow!("room does not exist or was already joined"))?
1357 } else {
1358 let room = self.get_room(room_id, &tx).await?;
1359 Ok(room)
1360 }
1361 })
1362 .await
1363 }
1364
1365 pub async fn rejoin_room(
1366 &self,
1367 rejoin_room: proto::RejoinRoom,
1368 user_id: UserId,
1369 connection: ConnectionId,
1370 ) -> Result<RoomGuard<RejoinedRoom>> {
1371 let room_id = RoomId::from_proto(rejoin_room.id);
1372 self.room_transaction(room_id, |tx| async {
1373 let tx = tx;
1374 let participant_update = room_participant::Entity::update_many()
1375 .filter(
1376 Condition::all()
1377 .add(room_participant::Column::RoomId.eq(room_id))
1378 .add(room_participant::Column::UserId.eq(user_id))
1379 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1380 .add(
1381 Condition::any()
1382 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1383 .add(
1384 room_participant::Column::AnsweringConnectionServerId
1385 .ne(connection.owner_id as i32),
1386 ),
1387 ),
1388 )
1389 .set(room_participant::ActiveModel {
1390 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1391 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1392 connection.owner_id as i32,
1393 ))),
1394 answering_connection_lost: ActiveValue::set(false),
1395 ..Default::default()
1396 })
1397 .exec(&*tx)
1398 .await?;
1399 if participant_update.rows_affected == 0 {
1400 return Err(anyhow!("room does not exist or was already joined"))?;
1401 }
1402
1403 let mut reshared_projects = Vec::new();
1404 for reshared_project in &rejoin_room.reshared_projects {
1405 let project_id = ProjectId::from_proto(reshared_project.project_id);
1406 let project = project::Entity::find_by_id(project_id)
1407 .one(&*tx)
1408 .await?
1409 .ok_or_else(|| anyhow!("project does not exist"))?;
1410 if project.host_user_id != user_id {
1411 return Err(anyhow!("no such project"))?;
1412 }
1413
1414 let mut collaborators = project
1415 .find_related(project_collaborator::Entity)
1416 .all(&*tx)
1417 .await?;
1418 let host_ix = collaborators
1419 .iter()
1420 .position(|collaborator| {
1421 collaborator.user_id == user_id && collaborator.is_host
1422 })
1423 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1424 let host = collaborators.swap_remove(host_ix);
1425 let old_connection_id = host.connection();
1426
1427 project::Entity::update(project::ActiveModel {
1428 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1429 host_connection_server_id: ActiveValue::set(Some(ServerId(
1430 connection.owner_id as i32,
1431 ))),
1432 ..project.into_active_model()
1433 })
1434 .exec(&*tx)
1435 .await?;
1436 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1437 connection_id: ActiveValue::set(connection.id as i32),
1438 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1439 ..host.into_active_model()
1440 })
1441 .exec(&*tx)
1442 .await?;
1443
1444 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1445 .await?;
1446
1447 reshared_projects.push(ResharedProject {
1448 id: project_id,
1449 old_connection_id,
1450 collaborators: collaborators
1451 .iter()
1452 .map(|collaborator| ProjectCollaborator {
1453 connection_id: collaborator.connection(),
1454 user_id: collaborator.user_id,
1455 replica_id: collaborator.replica_id,
1456 is_host: collaborator.is_host,
1457 })
1458 .collect(),
1459 worktrees: reshared_project.worktrees.clone(),
1460 });
1461 }
1462
1463 project::Entity::delete_many()
1464 .filter(
1465 Condition::all()
1466 .add(project::Column::RoomId.eq(room_id))
1467 .add(project::Column::HostUserId.eq(user_id))
1468 .add(
1469 project::Column::Id
1470 .is_not_in(reshared_projects.iter().map(|project| project.id)),
1471 ),
1472 )
1473 .exec(&*tx)
1474 .await?;
1475
1476 let mut rejoined_projects = Vec::new();
1477 for rejoined_project in &rejoin_room.rejoined_projects {
1478 let project_id = ProjectId::from_proto(rejoined_project.id);
1479 let Some(project) = project::Entity::find_by_id(project_id)
1480 .one(&*tx)
1481 .await? else { continue };
1482
1483 let mut worktrees = Vec::new();
1484 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1485 for db_worktree in db_worktrees {
1486 let mut worktree = RejoinedWorktree {
1487 id: db_worktree.id as u64,
1488 abs_path: db_worktree.abs_path,
1489 root_name: db_worktree.root_name,
1490 visible: db_worktree.visible,
1491 updated_entries: Default::default(),
1492 removed_entries: Default::default(),
1493 updated_repositories: Default::default(),
1494 removed_repositories: Default::default(),
1495 diagnostic_summaries: Default::default(),
1496 scan_id: db_worktree.scan_id as u64,
1497 completed_scan_id: db_worktree.completed_scan_id as u64,
1498 };
1499
1500 let rejoined_worktree = rejoined_project
1501 .worktrees
1502 .iter()
1503 .find(|worktree| worktree.id == db_worktree.id as u64);
1504
1505 // File entries
1506 {
1507 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1508 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1509 } else {
1510 worktree_entry::Column::IsDeleted.eq(false)
1511 };
1512
1513 let mut db_entries = worktree_entry::Entity::find()
1514 .filter(
1515 Condition::all()
1516 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1517 .add(entry_filter),
1518 )
1519 .stream(&*tx)
1520 .await?;
1521
1522 while let Some(db_entry) = db_entries.next().await {
1523 let db_entry = db_entry?;
1524 if db_entry.is_deleted {
1525 worktree.removed_entries.push(db_entry.id as u64);
1526 } else {
1527 worktree.updated_entries.push(proto::Entry {
1528 id: db_entry.id as u64,
1529 is_dir: db_entry.is_dir,
1530 path: db_entry.path,
1531 inode: db_entry.inode as u64,
1532 mtime: Some(proto::Timestamp {
1533 seconds: db_entry.mtime_seconds as u64,
1534 nanos: db_entry.mtime_nanos as u32,
1535 }),
1536 is_symlink: db_entry.is_symlink,
1537 is_ignored: db_entry.is_ignored,
1538 });
1539 }
1540 }
1541 }
1542
1543 // Repository Entries
1544 {
1545 let repository_entry_filter =
1546 if let Some(rejoined_worktree) = rejoined_worktree {
1547 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1548 } else {
1549 worktree_repository::Column::IsDeleted.eq(false)
1550 };
1551
1552 let mut db_repositories = worktree_repository::Entity::find()
1553 .filter(
1554 Condition::all()
1555 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1556 .add(repository_entry_filter),
1557 )
1558 .stream(&*tx)
1559 .await?;
1560
1561 while let Some(db_repository) = db_repositories.next().await {
1562 let db_repository = db_repository?;
1563 if db_repository.is_deleted {
1564 worktree
1565 .removed_repositories
1566 .push(db_repository.dot_git_entry_id as u64);
1567 } else {
1568 worktree.updated_repositories.push(proto::RepositoryEntry {
1569 dot_git_entry_id: db_repository.dot_git_entry_id as u64,
1570 scan_id: db_repository.scan_id as u64,
1571 work_directory: db_repository.work_directory_path,
1572 branch: db_repository.branch,
1573 });
1574 }
1575 }
1576 }
1577
1578 worktrees.push(worktree);
1579 }
1580
1581 let language_servers = project
1582 .find_related(language_server::Entity)
1583 .all(&*tx)
1584 .await?
1585 .into_iter()
1586 .map(|language_server| proto::LanguageServer {
1587 id: language_server.id as u64,
1588 name: language_server.name,
1589 })
1590 .collect::<Vec<_>>();
1591
1592 let mut collaborators = project
1593 .find_related(project_collaborator::Entity)
1594 .all(&*tx)
1595 .await?;
1596 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1597 .iter()
1598 .position(|collaborator| collaborator.user_id == user_id)
1599 {
1600 collaborators.swap_remove(self_collaborator_ix)
1601 } else {
1602 continue;
1603 };
1604 let old_connection_id = self_collaborator.connection();
1605 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1606 connection_id: ActiveValue::set(connection.id as i32),
1607 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1608 ..self_collaborator.into_active_model()
1609 })
1610 .exec(&*tx)
1611 .await?;
1612
1613 let collaborators = collaborators
1614 .into_iter()
1615 .map(|collaborator| ProjectCollaborator {
1616 connection_id: collaborator.connection(),
1617 user_id: collaborator.user_id,
1618 replica_id: collaborator.replica_id,
1619 is_host: collaborator.is_host,
1620 })
1621 .collect::<Vec<_>>();
1622
1623 rejoined_projects.push(RejoinedProject {
1624 id: project_id,
1625 old_connection_id,
1626 collaborators,
1627 worktrees,
1628 language_servers,
1629 });
1630 }
1631
1632 let room = self.get_room(room_id, &tx).await?;
1633 Ok(RejoinedRoom {
1634 room,
1635 rejoined_projects,
1636 reshared_projects,
1637 })
1638 })
1639 .await
1640 }
1641
1642 pub async fn leave_room(
1643 &self,
1644 connection: ConnectionId,
1645 ) -> Result<Option<RoomGuard<LeftRoom>>> {
1646 self.optional_room_transaction(|tx| async move {
1647 let leaving_participant = room_participant::Entity::find()
1648 .filter(
1649 Condition::all()
1650 .add(
1651 room_participant::Column::AnsweringConnectionId
1652 .eq(connection.id as i32),
1653 )
1654 .add(
1655 room_participant::Column::AnsweringConnectionServerId
1656 .eq(connection.owner_id as i32),
1657 ),
1658 )
1659 .one(&*tx)
1660 .await?;
1661
1662 if let Some(leaving_participant) = leaving_participant {
1663 // Leave room.
1664 let room_id = leaving_participant.room_id;
1665 room_participant::Entity::delete_by_id(leaving_participant.id)
1666 .exec(&*tx)
1667 .await?;
1668
1669 // Cancel pending calls initiated by the leaving user.
1670 let called_participants = room_participant::Entity::find()
1671 .filter(
1672 Condition::all()
1673 .add(
1674 room_participant::Column::CallingUserId
1675 .eq(leaving_participant.user_id),
1676 )
1677 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1678 )
1679 .all(&*tx)
1680 .await?;
1681 room_participant::Entity::delete_many()
1682 .filter(
1683 room_participant::Column::Id
1684 .is_in(called_participants.iter().map(|participant| participant.id)),
1685 )
1686 .exec(&*tx)
1687 .await?;
1688 let canceled_calls_to_user_ids = called_participants
1689 .into_iter()
1690 .map(|participant| participant.user_id)
1691 .collect();
1692
1693 // Detect left projects.
1694 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1695 enum QueryProjectIds {
1696 ProjectId,
1697 }
1698 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1699 .select_only()
1700 .column_as(
1701 project_collaborator::Column::ProjectId,
1702 QueryProjectIds::ProjectId,
1703 )
1704 .filter(
1705 Condition::all()
1706 .add(
1707 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1708 )
1709 .add(
1710 project_collaborator::Column::ConnectionServerId
1711 .eq(connection.owner_id as i32),
1712 ),
1713 )
1714 .into_values::<_, QueryProjectIds>()
1715 .all(&*tx)
1716 .await?;
1717 let mut left_projects = HashMap::default();
1718 let mut collaborators = project_collaborator::Entity::find()
1719 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1720 .stream(&*tx)
1721 .await?;
1722 while let Some(collaborator) = collaborators.next().await {
1723 let collaborator = collaborator?;
1724 let left_project =
1725 left_projects
1726 .entry(collaborator.project_id)
1727 .or_insert(LeftProject {
1728 id: collaborator.project_id,
1729 host_user_id: Default::default(),
1730 connection_ids: Default::default(),
1731 host_connection_id: Default::default(),
1732 });
1733
1734 let collaborator_connection_id = collaborator.connection();
1735 if collaborator_connection_id != connection {
1736 left_project.connection_ids.push(collaborator_connection_id);
1737 }
1738
1739 if collaborator.is_host {
1740 left_project.host_user_id = collaborator.user_id;
1741 left_project.host_connection_id = collaborator_connection_id;
1742 }
1743 }
1744 drop(collaborators);
1745
1746 // Leave projects.
1747 project_collaborator::Entity::delete_many()
1748 .filter(
1749 Condition::all()
1750 .add(
1751 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1752 )
1753 .add(
1754 project_collaborator::Column::ConnectionServerId
1755 .eq(connection.owner_id as i32),
1756 ),
1757 )
1758 .exec(&*tx)
1759 .await?;
1760
1761 // Unshare projects.
1762 project::Entity::delete_many()
1763 .filter(
1764 Condition::all()
1765 .add(project::Column::RoomId.eq(room_id))
1766 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1767 .add(
1768 project::Column::HostConnectionServerId
1769 .eq(connection.owner_id as i32),
1770 ),
1771 )
1772 .exec(&*tx)
1773 .await?;
1774
1775 let room = self.get_room(room_id, &tx).await?;
1776 if room.participants.is_empty() {
1777 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1778 }
1779
1780 let left_room = LeftRoom {
1781 room,
1782 left_projects,
1783 canceled_calls_to_user_ids,
1784 };
1785
1786 if left_room.room.participants.is_empty() {
1787 self.rooms.remove(&room_id);
1788 }
1789
1790 Ok(Some((room_id, left_room)))
1791 } else {
1792 Ok(None)
1793 }
1794 })
1795 .await
1796 }
1797
1798 pub async fn follow(
1799 &self,
1800 project_id: ProjectId,
1801 leader_connection: ConnectionId,
1802 follower_connection: ConnectionId,
1803 ) -> Result<RoomGuard<proto::Room>> {
1804 let room_id = self.room_id_for_project(project_id).await?;
1805 self.room_transaction(room_id, |tx| async move {
1806 follower::ActiveModel {
1807 room_id: ActiveValue::set(room_id),
1808 project_id: ActiveValue::set(project_id),
1809 leader_connection_server_id: ActiveValue::set(ServerId(
1810 leader_connection.owner_id as i32,
1811 )),
1812 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1813 follower_connection_server_id: ActiveValue::set(ServerId(
1814 follower_connection.owner_id as i32,
1815 )),
1816 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1817 ..Default::default()
1818 }
1819 .insert(&*tx)
1820 .await?;
1821
1822 let room = self.get_room(room_id, &*tx).await?;
1823 Ok(room)
1824 })
1825 .await
1826 }
1827
1828 pub async fn unfollow(
1829 &self,
1830 project_id: ProjectId,
1831 leader_connection: ConnectionId,
1832 follower_connection: ConnectionId,
1833 ) -> Result<RoomGuard<proto::Room>> {
1834 let room_id = self.room_id_for_project(project_id).await?;
1835 self.room_transaction(room_id, |tx| async move {
1836 follower::Entity::delete_many()
1837 .filter(
1838 Condition::all()
1839 .add(follower::Column::ProjectId.eq(project_id))
1840 .add(
1841 follower::Column::LeaderConnectionServerId
1842 .eq(leader_connection.owner_id),
1843 )
1844 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1845 .add(
1846 follower::Column::FollowerConnectionServerId
1847 .eq(follower_connection.owner_id),
1848 )
1849 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1850 )
1851 .exec(&*tx)
1852 .await?;
1853
1854 let room = self.get_room(room_id, &*tx).await?;
1855 Ok(room)
1856 })
1857 .await
1858 }
1859
1860 pub async fn update_room_participant_location(
1861 &self,
1862 room_id: RoomId,
1863 connection: ConnectionId,
1864 location: proto::ParticipantLocation,
1865 ) -> Result<RoomGuard<proto::Room>> {
1866 self.room_transaction(room_id, |tx| async {
1867 let tx = tx;
1868 let location_kind;
1869 let location_project_id;
1870 match location
1871 .variant
1872 .as_ref()
1873 .ok_or_else(|| anyhow!("invalid location"))?
1874 {
1875 proto::participant_location::Variant::SharedProject(project) => {
1876 location_kind = 0;
1877 location_project_id = Some(ProjectId::from_proto(project.id));
1878 }
1879 proto::participant_location::Variant::UnsharedProject(_) => {
1880 location_kind = 1;
1881 location_project_id = None;
1882 }
1883 proto::participant_location::Variant::External(_) => {
1884 location_kind = 2;
1885 location_project_id = None;
1886 }
1887 }
1888
1889 let result = room_participant::Entity::update_many()
1890 .filter(
1891 Condition::all()
1892 .add(room_participant::Column::RoomId.eq(room_id))
1893 .add(
1894 room_participant::Column::AnsweringConnectionId
1895 .eq(connection.id as i32),
1896 )
1897 .add(
1898 room_participant::Column::AnsweringConnectionServerId
1899 .eq(connection.owner_id as i32),
1900 ),
1901 )
1902 .set(room_participant::ActiveModel {
1903 location_kind: ActiveValue::set(Some(location_kind)),
1904 location_project_id: ActiveValue::set(location_project_id),
1905 ..Default::default()
1906 })
1907 .exec(&*tx)
1908 .await?;
1909
1910 if result.rows_affected == 1 {
1911 let room = self.get_room(room_id, &tx).await?;
1912 Ok(room)
1913 } else {
1914 Err(anyhow!("could not update room participant location"))?
1915 }
1916 })
1917 .await
1918 }
1919
1920 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1921 self.transaction(|tx| async move {
1922 let participant = room_participant::Entity::find()
1923 .filter(
1924 Condition::all()
1925 .add(
1926 room_participant::Column::AnsweringConnectionId
1927 .eq(connection.id as i32),
1928 )
1929 .add(
1930 room_participant::Column::AnsweringConnectionServerId
1931 .eq(connection.owner_id as i32),
1932 ),
1933 )
1934 .one(&*tx)
1935 .await?
1936 .ok_or_else(|| anyhow!("not a participant in any room"))?;
1937
1938 room_participant::Entity::update(room_participant::ActiveModel {
1939 answering_connection_lost: ActiveValue::set(true),
1940 ..participant.into_active_model()
1941 })
1942 .exec(&*tx)
1943 .await?;
1944
1945 Ok(())
1946 })
1947 .await
1948 }
1949
1950 fn build_incoming_call(
1951 room: &proto::Room,
1952 called_user_id: UserId,
1953 ) -> Option<proto::IncomingCall> {
1954 let pending_participant = room
1955 .pending_participants
1956 .iter()
1957 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1958
1959 Some(proto::IncomingCall {
1960 room_id: room.id,
1961 calling_user_id: pending_participant.calling_user_id,
1962 participant_user_ids: room
1963 .participants
1964 .iter()
1965 .map(|participant| participant.user_id)
1966 .collect(),
1967 initial_project: room.participants.iter().find_map(|participant| {
1968 let initial_project_id = pending_participant.initial_project_id?;
1969 participant
1970 .projects
1971 .iter()
1972 .find(|project| project.id == initial_project_id)
1973 .cloned()
1974 }),
1975 })
1976 }
1977
1978 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1979 let db_room = room::Entity::find_by_id(room_id)
1980 .one(tx)
1981 .await?
1982 .ok_or_else(|| anyhow!("could not find room"))?;
1983
1984 let mut db_participants = db_room
1985 .find_related(room_participant::Entity)
1986 .stream(tx)
1987 .await?;
1988 let mut participants = HashMap::default();
1989 let mut pending_participants = Vec::new();
1990 while let Some(db_participant) = db_participants.next().await {
1991 let db_participant = db_participant?;
1992 if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1993 .answering_connection_id
1994 .zip(db_participant.answering_connection_server_id)
1995 {
1996 let location = match (
1997 db_participant.location_kind,
1998 db_participant.location_project_id,
1999 ) {
2000 (Some(0), Some(project_id)) => {
2001 Some(proto::participant_location::Variant::SharedProject(
2002 proto::participant_location::SharedProject {
2003 id: project_id.to_proto(),
2004 },
2005 ))
2006 }
2007 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2008 Default::default(),
2009 )),
2010 _ => Some(proto::participant_location::Variant::External(
2011 Default::default(),
2012 )),
2013 };
2014
2015 let answering_connection = ConnectionId {
2016 owner_id: answering_connection_server_id.0 as u32,
2017 id: answering_connection_id as u32,
2018 };
2019 participants.insert(
2020 answering_connection,
2021 proto::Participant {
2022 user_id: db_participant.user_id.to_proto(),
2023 peer_id: Some(answering_connection.into()),
2024 projects: Default::default(),
2025 location: Some(proto::ParticipantLocation { variant: location }),
2026 },
2027 );
2028 } else {
2029 pending_participants.push(proto::PendingParticipant {
2030 user_id: db_participant.user_id.to_proto(),
2031 calling_user_id: db_participant.calling_user_id.to_proto(),
2032 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2033 });
2034 }
2035 }
2036 drop(db_participants);
2037
2038 let mut db_projects = db_room
2039 .find_related(project::Entity)
2040 .find_with_related(worktree::Entity)
2041 .stream(tx)
2042 .await?;
2043
2044 while let Some(row) = db_projects.next().await {
2045 let (db_project, db_worktree) = row?;
2046 let host_connection = db_project.host_connection()?;
2047 if let Some(participant) = participants.get_mut(&host_connection) {
2048 let project = if let Some(project) = participant
2049 .projects
2050 .iter_mut()
2051 .find(|project| project.id == db_project.id.to_proto())
2052 {
2053 project
2054 } else {
2055 participant.projects.push(proto::ParticipantProject {
2056 id: db_project.id.to_proto(),
2057 worktree_root_names: Default::default(),
2058 });
2059 participant.projects.last_mut().unwrap()
2060 };
2061
2062 if let Some(db_worktree) = db_worktree {
2063 if db_worktree.visible {
2064 project.worktree_root_names.push(db_worktree.root_name);
2065 }
2066 }
2067 }
2068 }
2069 drop(db_projects);
2070
2071 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2072 let mut followers = Vec::new();
2073 while let Some(db_follower) = db_followers.next().await {
2074 let db_follower = db_follower?;
2075 followers.push(proto::Follower {
2076 leader_id: Some(db_follower.leader_connection().into()),
2077 follower_id: Some(db_follower.follower_connection().into()),
2078 project_id: db_follower.project_id.to_proto(),
2079 });
2080 }
2081
2082 Ok(proto::Room {
2083 id: db_room.id.to_proto(),
2084 live_kit_room: db_room.live_kit_room,
2085 participants: participants.into_values().collect(),
2086 pending_participants,
2087 followers,
2088 })
2089 }
2090
2091 // projects
2092
2093 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2094 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2095 enum QueryAs {
2096 Count,
2097 }
2098
2099 self.transaction(|tx| async move {
2100 Ok(project::Entity::find()
2101 .select_only()
2102 .column_as(project::Column::Id.count(), QueryAs::Count)
2103 .inner_join(user::Entity)
2104 .filter(user::Column::Admin.eq(false))
2105 .into_values::<_, QueryAs>()
2106 .one(&*tx)
2107 .await?
2108 .unwrap_or(0i64) as usize)
2109 })
2110 .await
2111 }
2112
2113 pub async fn share_project(
2114 &self,
2115 room_id: RoomId,
2116 connection: ConnectionId,
2117 worktrees: &[proto::WorktreeMetadata],
2118 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2119 self.room_transaction(room_id, |tx| async move {
2120 let participant = room_participant::Entity::find()
2121 .filter(
2122 Condition::all()
2123 .add(
2124 room_participant::Column::AnsweringConnectionId
2125 .eq(connection.id as i32),
2126 )
2127 .add(
2128 room_participant::Column::AnsweringConnectionServerId
2129 .eq(connection.owner_id as i32),
2130 ),
2131 )
2132 .one(&*tx)
2133 .await?
2134 .ok_or_else(|| anyhow!("could not find participant"))?;
2135 if participant.room_id != room_id {
2136 return Err(anyhow!("shared project on unexpected room"))?;
2137 }
2138
2139 let project = project::ActiveModel {
2140 room_id: ActiveValue::set(participant.room_id),
2141 host_user_id: ActiveValue::set(participant.user_id),
2142 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2143 host_connection_server_id: ActiveValue::set(Some(ServerId(
2144 connection.owner_id as i32,
2145 ))),
2146 ..Default::default()
2147 }
2148 .insert(&*tx)
2149 .await?;
2150
2151 if !worktrees.is_empty() {
2152 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2153 worktree::ActiveModel {
2154 id: ActiveValue::set(worktree.id as i64),
2155 project_id: ActiveValue::set(project.id),
2156 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2157 root_name: ActiveValue::set(worktree.root_name.clone()),
2158 visible: ActiveValue::set(worktree.visible),
2159 scan_id: ActiveValue::set(0),
2160 completed_scan_id: ActiveValue::set(0),
2161 }
2162 }))
2163 .exec(&*tx)
2164 .await?;
2165 }
2166
2167 project_collaborator::ActiveModel {
2168 project_id: ActiveValue::set(project.id),
2169 connection_id: ActiveValue::set(connection.id as i32),
2170 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2171 user_id: ActiveValue::set(participant.user_id),
2172 replica_id: ActiveValue::set(ReplicaId(0)),
2173 is_host: ActiveValue::set(true),
2174 ..Default::default()
2175 }
2176 .insert(&*tx)
2177 .await?;
2178
2179 let room = self.get_room(room_id, &tx).await?;
2180 Ok((project.id, room))
2181 })
2182 .await
2183 }
2184
2185 pub async fn unshare_project(
2186 &self,
2187 project_id: ProjectId,
2188 connection: ConnectionId,
2189 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2190 let room_id = self.room_id_for_project(project_id).await?;
2191 self.room_transaction(room_id, |tx| async move {
2192 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2193
2194 let project = project::Entity::find_by_id(project_id)
2195 .one(&*tx)
2196 .await?
2197 .ok_or_else(|| anyhow!("project not found"))?;
2198 if project.host_connection()? == connection {
2199 project::Entity::delete(project.into_active_model())
2200 .exec(&*tx)
2201 .await?;
2202 let room = self.get_room(room_id, &tx).await?;
2203 Ok((room, guest_connection_ids))
2204 } else {
2205 Err(anyhow!("cannot unshare a project hosted by another user"))?
2206 }
2207 })
2208 .await
2209 }
2210
2211 pub async fn update_project(
2212 &self,
2213 project_id: ProjectId,
2214 connection: ConnectionId,
2215 worktrees: &[proto::WorktreeMetadata],
2216 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2217 let room_id = self.room_id_for_project(project_id).await?;
2218 self.room_transaction(room_id, |tx| async move {
2219 let project = project::Entity::find_by_id(project_id)
2220 .filter(
2221 Condition::all()
2222 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2223 .add(
2224 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2225 ),
2226 )
2227 .one(&*tx)
2228 .await?
2229 .ok_or_else(|| anyhow!("no such project"))?;
2230
2231 self.update_project_worktrees(project.id, worktrees, &tx)
2232 .await?;
2233
2234 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2235 let room = self.get_room(project.room_id, &tx).await?;
2236 Ok((room, guest_connection_ids))
2237 })
2238 .await
2239 }
2240
2241 async fn update_project_worktrees(
2242 &self,
2243 project_id: ProjectId,
2244 worktrees: &[proto::WorktreeMetadata],
2245 tx: &DatabaseTransaction,
2246 ) -> Result<()> {
2247 if !worktrees.is_empty() {
2248 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2249 id: ActiveValue::set(worktree.id as i64),
2250 project_id: ActiveValue::set(project_id),
2251 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2252 root_name: ActiveValue::set(worktree.root_name.clone()),
2253 visible: ActiveValue::set(worktree.visible),
2254 scan_id: ActiveValue::set(0),
2255 completed_scan_id: ActiveValue::set(0),
2256 }))
2257 .on_conflict(
2258 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2259 .update_column(worktree::Column::RootName)
2260 .to_owned(),
2261 )
2262 .exec(&*tx)
2263 .await?;
2264 }
2265
2266 worktree::Entity::delete_many()
2267 .filter(worktree::Column::ProjectId.eq(project_id).and(
2268 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2269 ))
2270 .exec(&*tx)
2271 .await?;
2272
2273 Ok(())
2274 }
2275
2276 pub async fn update_worktree(
2277 &self,
2278 update: &proto::UpdateWorktree,
2279 connection: ConnectionId,
2280 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2281 let project_id = ProjectId::from_proto(update.project_id);
2282 let worktree_id = update.worktree_id as i64;
2283 let room_id = self.room_id_for_project(project_id).await?;
2284 self.room_transaction(room_id, |tx| async move {
2285 // Ensure the update comes from the host.
2286 let _project = project::Entity::find_by_id(project_id)
2287 .filter(
2288 Condition::all()
2289 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2290 .add(
2291 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2292 ),
2293 )
2294 .one(&*tx)
2295 .await?
2296 .ok_or_else(|| anyhow!("no such project"))?;
2297
2298 // Update metadata.
2299 worktree::Entity::update(worktree::ActiveModel {
2300 id: ActiveValue::set(worktree_id),
2301 project_id: ActiveValue::set(project_id),
2302 root_name: ActiveValue::set(update.root_name.clone()),
2303 scan_id: ActiveValue::set(update.scan_id as i64),
2304 completed_scan_id: if update.is_last_update {
2305 ActiveValue::set(update.scan_id as i64)
2306 } else {
2307 ActiveValue::default()
2308 },
2309 abs_path: ActiveValue::set(update.abs_path.clone()),
2310 ..Default::default()
2311 })
2312 .exec(&*tx)
2313 .await?;
2314
2315 if !update.updated_entries.is_empty() {
2316 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2317 let mtime = entry.mtime.clone().unwrap_or_default();
2318 worktree_entry::ActiveModel {
2319 project_id: ActiveValue::set(project_id),
2320 worktree_id: ActiveValue::set(worktree_id),
2321 id: ActiveValue::set(entry.id as i64),
2322 is_dir: ActiveValue::set(entry.is_dir),
2323 path: ActiveValue::set(entry.path.clone()),
2324 inode: ActiveValue::set(entry.inode as i64),
2325 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2326 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2327 is_symlink: ActiveValue::set(entry.is_symlink),
2328 is_ignored: ActiveValue::set(entry.is_ignored),
2329 is_deleted: ActiveValue::set(false),
2330 scan_id: ActiveValue::set(update.scan_id as i64),
2331 }
2332 }))
2333 .on_conflict(
2334 OnConflict::columns([
2335 worktree_entry::Column::ProjectId,
2336 worktree_entry::Column::WorktreeId,
2337 worktree_entry::Column::Id,
2338 ])
2339 .update_columns([
2340 worktree_entry::Column::IsDir,
2341 worktree_entry::Column::Path,
2342 worktree_entry::Column::Inode,
2343 worktree_entry::Column::MtimeSeconds,
2344 worktree_entry::Column::MtimeNanos,
2345 worktree_entry::Column::IsSymlink,
2346 worktree_entry::Column::IsIgnored,
2347 worktree_entry::Column::ScanId,
2348 ])
2349 .to_owned(),
2350 )
2351 .exec(&*tx)
2352 .await?;
2353 }
2354
2355 if !update.removed_entries.is_empty() {
2356 worktree_entry::Entity::update_many()
2357 .filter(
2358 worktree_entry::Column::ProjectId
2359 .eq(project_id)
2360 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2361 .and(
2362 worktree_entry::Column::Id
2363 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2364 ),
2365 )
2366 .set(worktree_entry::ActiveModel {
2367 is_deleted: ActiveValue::Set(true),
2368 scan_id: ActiveValue::Set(update.scan_id as i64),
2369 ..Default::default()
2370 })
2371 .exec(&*tx)
2372 .await?;
2373 }
2374
2375 if !update.updated_repositories.is_empty() {
2376 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2377 |repository| worktree_repository::ActiveModel {
2378 project_id: ActiveValue::set(project_id),
2379 worktree_id: ActiveValue::set(worktree_id),
2380 dot_git_entry_id: ActiveValue::set(repository.dot_git_entry_id as i64),
2381 work_directory_path: ActiveValue::set(repository.work_directory.clone()),
2382 scan_id: ActiveValue::set(update.scan_id as i64),
2383 branch: ActiveValue::set(repository.branch.clone()),
2384 is_deleted: ActiveValue::set(false),
2385 },
2386 ))
2387 .on_conflict(
2388 OnConflict::columns([
2389 worktree_repository::Column::ProjectId,
2390 worktree_repository::Column::WorktreeId,
2391 worktree_repository::Column::DotGitEntryId,
2392 ])
2393 .update_columns([
2394 worktree_repository::Column::ScanId,
2395 worktree_repository::Column::WorkDirectoryPath,
2396 worktree_repository::Column::Branch,
2397 ])
2398 .to_owned(),
2399 )
2400 .exec(&*tx)
2401 .await?;
2402 }
2403
2404 if !update.removed_repositories.is_empty() {
2405 worktree_repository::Entity::update_many()
2406 .filter(
2407 worktree_repository::Column::ProjectId
2408 .eq(project_id)
2409 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2410 .and(
2411 worktree_repository::Column::DotGitEntryId
2412 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2413 ),
2414 )
2415 .set(worktree_repository::ActiveModel {
2416 is_deleted: ActiveValue::Set(true),
2417 scan_id: ActiveValue::Set(update.scan_id as i64),
2418 ..Default::default()
2419 })
2420 .exec(&*tx)
2421 .await?;
2422 }
2423
2424 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2425 Ok(connection_ids)
2426 })
2427 .await
2428 }
2429
2430 pub async fn update_diagnostic_summary(
2431 &self,
2432 update: &proto::UpdateDiagnosticSummary,
2433 connection: ConnectionId,
2434 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2435 let project_id = ProjectId::from_proto(update.project_id);
2436 let worktree_id = update.worktree_id as i64;
2437 let room_id = self.room_id_for_project(project_id).await?;
2438 self.room_transaction(room_id, |tx| async move {
2439 let summary = update
2440 .summary
2441 .as_ref()
2442 .ok_or_else(|| anyhow!("invalid summary"))?;
2443
2444 // Ensure the update comes from the host.
2445 let project = project::Entity::find_by_id(project_id)
2446 .one(&*tx)
2447 .await?
2448 .ok_or_else(|| anyhow!("no such project"))?;
2449 if project.host_connection()? != connection {
2450 return Err(anyhow!("can't update a project hosted by someone else"))?;
2451 }
2452
2453 // Update summary.
2454 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2455 project_id: ActiveValue::set(project_id),
2456 worktree_id: ActiveValue::set(worktree_id),
2457 path: ActiveValue::set(summary.path.clone()),
2458 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2459 error_count: ActiveValue::set(summary.error_count as i32),
2460 warning_count: ActiveValue::set(summary.warning_count as i32),
2461 ..Default::default()
2462 })
2463 .on_conflict(
2464 OnConflict::columns([
2465 worktree_diagnostic_summary::Column::ProjectId,
2466 worktree_diagnostic_summary::Column::WorktreeId,
2467 worktree_diagnostic_summary::Column::Path,
2468 ])
2469 .update_columns([
2470 worktree_diagnostic_summary::Column::LanguageServerId,
2471 worktree_diagnostic_summary::Column::ErrorCount,
2472 worktree_diagnostic_summary::Column::WarningCount,
2473 ])
2474 .to_owned(),
2475 )
2476 .exec(&*tx)
2477 .await?;
2478
2479 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2480 Ok(connection_ids)
2481 })
2482 .await
2483 }
2484
2485 pub async fn start_language_server(
2486 &self,
2487 update: &proto::StartLanguageServer,
2488 connection: ConnectionId,
2489 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2490 let project_id = ProjectId::from_proto(update.project_id);
2491 let room_id = self.room_id_for_project(project_id).await?;
2492 self.room_transaction(room_id, |tx| async move {
2493 let server = update
2494 .server
2495 .as_ref()
2496 .ok_or_else(|| anyhow!("invalid language server"))?;
2497
2498 // Ensure the update comes from the host.
2499 let project = project::Entity::find_by_id(project_id)
2500 .one(&*tx)
2501 .await?
2502 .ok_or_else(|| anyhow!("no such project"))?;
2503 if project.host_connection()? != connection {
2504 return Err(anyhow!("can't update a project hosted by someone else"))?;
2505 }
2506
2507 // Add the newly-started language server.
2508 language_server::Entity::insert(language_server::ActiveModel {
2509 project_id: ActiveValue::set(project_id),
2510 id: ActiveValue::set(server.id as i64),
2511 name: ActiveValue::set(server.name.clone()),
2512 ..Default::default()
2513 })
2514 .on_conflict(
2515 OnConflict::columns([
2516 language_server::Column::ProjectId,
2517 language_server::Column::Id,
2518 ])
2519 .update_column(language_server::Column::Name)
2520 .to_owned(),
2521 )
2522 .exec(&*tx)
2523 .await?;
2524
2525 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2526 Ok(connection_ids)
2527 })
2528 .await
2529 }
2530
2531 pub async fn join_project(
2532 &self,
2533 project_id: ProjectId,
2534 connection: ConnectionId,
2535 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2536 let room_id = self.room_id_for_project(project_id).await?;
2537 self.room_transaction(room_id, |tx| async move {
2538 let participant = room_participant::Entity::find()
2539 .filter(
2540 Condition::all()
2541 .add(
2542 room_participant::Column::AnsweringConnectionId
2543 .eq(connection.id as i32),
2544 )
2545 .add(
2546 room_participant::Column::AnsweringConnectionServerId
2547 .eq(connection.owner_id as i32),
2548 ),
2549 )
2550 .one(&*tx)
2551 .await?
2552 .ok_or_else(|| anyhow!("must join a room first"))?;
2553
2554 let project = project::Entity::find_by_id(project_id)
2555 .one(&*tx)
2556 .await?
2557 .ok_or_else(|| anyhow!("no such project"))?;
2558 if project.room_id != participant.room_id {
2559 return Err(anyhow!("no such project"))?;
2560 }
2561
2562 let mut collaborators = project
2563 .find_related(project_collaborator::Entity)
2564 .all(&*tx)
2565 .await?;
2566 let replica_ids = collaborators
2567 .iter()
2568 .map(|c| c.replica_id)
2569 .collect::<HashSet<_>>();
2570 let mut replica_id = ReplicaId(1);
2571 while replica_ids.contains(&replica_id) {
2572 replica_id.0 += 1;
2573 }
2574 let new_collaborator = project_collaborator::ActiveModel {
2575 project_id: ActiveValue::set(project_id),
2576 connection_id: ActiveValue::set(connection.id as i32),
2577 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2578 user_id: ActiveValue::set(participant.user_id),
2579 replica_id: ActiveValue::set(replica_id),
2580 is_host: ActiveValue::set(false),
2581 ..Default::default()
2582 }
2583 .insert(&*tx)
2584 .await?;
2585 collaborators.push(new_collaborator);
2586
2587 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2588 let mut worktrees = db_worktrees
2589 .into_iter()
2590 .map(|db_worktree| {
2591 (
2592 db_worktree.id as u64,
2593 Worktree {
2594 id: db_worktree.id as u64,
2595 abs_path: db_worktree.abs_path,
2596 root_name: db_worktree.root_name,
2597 visible: db_worktree.visible,
2598 entries: Default::default(),
2599 repository_entries: Default::default(),
2600 diagnostic_summaries: Default::default(),
2601 scan_id: db_worktree.scan_id as u64,
2602 completed_scan_id: db_worktree.completed_scan_id as u64,
2603 },
2604 )
2605 })
2606 .collect::<BTreeMap<_, _>>();
2607
2608 // Populate worktree entries.
2609 {
2610 let mut db_entries = worktree_entry::Entity::find()
2611 .filter(
2612 Condition::all()
2613 .add(worktree_entry::Column::ProjectId.eq(project_id))
2614 .add(worktree_entry::Column::IsDeleted.eq(false)),
2615 )
2616 .stream(&*tx)
2617 .await?;
2618 while let Some(db_entry) = db_entries.next().await {
2619 let db_entry = db_entry?;
2620 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2621 worktree.entries.push(proto::Entry {
2622 id: db_entry.id as u64,
2623 is_dir: db_entry.is_dir,
2624 path: db_entry.path,
2625 inode: db_entry.inode as u64,
2626 mtime: Some(proto::Timestamp {
2627 seconds: db_entry.mtime_seconds as u64,
2628 nanos: db_entry.mtime_nanos as u32,
2629 }),
2630 is_symlink: db_entry.is_symlink,
2631 is_ignored: db_entry.is_ignored,
2632 });
2633 }
2634 }
2635 }
2636
2637 // Populate repository entries.
2638 {
2639 let mut db_repository_entries = worktree_repository::Entity::find()
2640 .filter(
2641 Condition::all()
2642 .add(worktree_repository::Column::ProjectId.eq(project_id))
2643 .add(worktree_repository::Column::IsDeleted.eq(false)),
2644 )
2645 .stream(&*tx)
2646 .await?;
2647 while let Some(db_repository_entry) = db_repository_entries.next().await {
2648 let db_repository_entry = db_repository_entry?;
2649 if let Some(worktree) =
2650 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2651 {
2652 worktree.repository_entries.push(proto::RepositoryEntry {
2653 dot_git_entry_id: db_repository_entry.dot_git_entry_id as u64,
2654 scan_id: db_repository_entry.scan_id as u64,
2655 work_directory: db_repository_entry.work_directory_path,
2656 branch: db_repository_entry.branch,
2657 });
2658 }
2659 }
2660 }
2661
2662 // Populate worktree diagnostic summaries.
2663 {
2664 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2665 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2666 .stream(&*tx)
2667 .await?;
2668 while let Some(db_summary) = db_summaries.next().await {
2669 let db_summary = db_summary?;
2670 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2671 worktree
2672 .diagnostic_summaries
2673 .push(proto::DiagnosticSummary {
2674 path: db_summary.path,
2675 language_server_id: db_summary.language_server_id as u64,
2676 error_count: db_summary.error_count as u32,
2677 warning_count: db_summary.warning_count as u32,
2678 });
2679 }
2680 }
2681 }
2682
2683 // Populate language servers.
2684 let language_servers = project
2685 .find_related(language_server::Entity)
2686 .all(&*tx)
2687 .await?;
2688
2689 let project = Project {
2690 collaborators: collaborators
2691 .into_iter()
2692 .map(|collaborator| ProjectCollaborator {
2693 connection_id: collaborator.connection(),
2694 user_id: collaborator.user_id,
2695 replica_id: collaborator.replica_id,
2696 is_host: collaborator.is_host,
2697 })
2698 .collect(),
2699 worktrees,
2700 language_servers: language_servers
2701 .into_iter()
2702 .map(|language_server| proto::LanguageServer {
2703 id: language_server.id as u64,
2704 name: language_server.name,
2705 })
2706 .collect(),
2707 };
2708 Ok((project, replica_id as ReplicaId))
2709 })
2710 .await
2711 }
2712
2713 pub async fn leave_project(
2714 &self,
2715 project_id: ProjectId,
2716 connection: ConnectionId,
2717 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2718 let room_id = self.room_id_for_project(project_id).await?;
2719 self.room_transaction(room_id, |tx| async move {
2720 let result = project_collaborator::Entity::delete_many()
2721 .filter(
2722 Condition::all()
2723 .add(project_collaborator::Column::ProjectId.eq(project_id))
2724 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2725 .add(
2726 project_collaborator::Column::ConnectionServerId
2727 .eq(connection.owner_id as i32),
2728 ),
2729 )
2730 .exec(&*tx)
2731 .await?;
2732 if result.rows_affected == 0 {
2733 Err(anyhow!("not a collaborator on this project"))?;
2734 }
2735
2736 let project = project::Entity::find_by_id(project_id)
2737 .one(&*tx)
2738 .await?
2739 .ok_or_else(|| anyhow!("no such project"))?;
2740 let collaborators = project
2741 .find_related(project_collaborator::Entity)
2742 .all(&*tx)
2743 .await?;
2744 let connection_ids = collaborators
2745 .into_iter()
2746 .map(|collaborator| collaborator.connection())
2747 .collect();
2748
2749 follower::Entity::delete_many()
2750 .filter(
2751 Condition::any()
2752 .add(
2753 Condition::all()
2754 .add(follower::Column::ProjectId.eq(project_id))
2755 .add(
2756 follower::Column::LeaderConnectionServerId
2757 .eq(connection.owner_id),
2758 )
2759 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2760 )
2761 .add(
2762 Condition::all()
2763 .add(follower::Column::ProjectId.eq(project_id))
2764 .add(
2765 follower::Column::FollowerConnectionServerId
2766 .eq(connection.owner_id),
2767 )
2768 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2769 ),
2770 )
2771 .exec(&*tx)
2772 .await?;
2773
2774 let room = self.get_room(project.room_id, &tx).await?;
2775 let left_project = LeftProject {
2776 id: project_id,
2777 host_user_id: project.host_user_id,
2778 host_connection_id: project.host_connection()?,
2779 connection_ids,
2780 };
2781 Ok((room, left_project))
2782 })
2783 .await
2784 }
2785
2786 pub async fn project_collaborators(
2787 &self,
2788 project_id: ProjectId,
2789 connection_id: ConnectionId,
2790 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2791 let room_id = self.room_id_for_project(project_id).await?;
2792 self.room_transaction(room_id, |tx| async move {
2793 let collaborators = project_collaborator::Entity::find()
2794 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2795 .all(&*tx)
2796 .await?
2797 .into_iter()
2798 .map(|collaborator| ProjectCollaborator {
2799 connection_id: collaborator.connection(),
2800 user_id: collaborator.user_id,
2801 replica_id: collaborator.replica_id,
2802 is_host: collaborator.is_host,
2803 })
2804 .collect::<Vec<_>>();
2805
2806 if collaborators
2807 .iter()
2808 .any(|collaborator| collaborator.connection_id == connection_id)
2809 {
2810 Ok(collaborators)
2811 } else {
2812 Err(anyhow!("no such project"))?
2813 }
2814 })
2815 .await
2816 }
2817
2818 pub async fn project_connection_ids(
2819 &self,
2820 project_id: ProjectId,
2821 connection_id: ConnectionId,
2822 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2823 let room_id = self.room_id_for_project(project_id).await?;
2824 self.room_transaction(room_id, |tx| async move {
2825 let mut collaborators = project_collaborator::Entity::find()
2826 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2827 .stream(&*tx)
2828 .await?;
2829
2830 let mut connection_ids = HashSet::default();
2831 while let Some(collaborator) = collaborators.next().await {
2832 let collaborator = collaborator?;
2833 connection_ids.insert(collaborator.connection());
2834 }
2835
2836 if connection_ids.contains(&connection_id) {
2837 Ok(connection_ids)
2838 } else {
2839 Err(anyhow!("no such project"))?
2840 }
2841 })
2842 .await
2843 }
2844
2845 async fn project_guest_connection_ids(
2846 &self,
2847 project_id: ProjectId,
2848 tx: &DatabaseTransaction,
2849 ) -> Result<Vec<ConnectionId>> {
2850 let mut collaborators = project_collaborator::Entity::find()
2851 .filter(
2852 project_collaborator::Column::ProjectId
2853 .eq(project_id)
2854 .and(project_collaborator::Column::IsHost.eq(false)),
2855 )
2856 .stream(tx)
2857 .await?;
2858
2859 let mut guest_connection_ids = Vec::new();
2860 while let Some(collaborator) = collaborators.next().await {
2861 let collaborator = collaborator?;
2862 guest_connection_ids.push(collaborator.connection());
2863 }
2864 Ok(guest_connection_ids)
2865 }
2866
2867 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2868 self.transaction(|tx| async move {
2869 let project = project::Entity::find_by_id(project_id)
2870 .one(&*tx)
2871 .await?
2872 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2873 Ok(project.room_id)
2874 })
2875 .await
2876 }
2877
2878 // access tokens
2879
2880 pub async fn create_access_token(
2881 &self,
2882 user_id: UserId,
2883 access_token_hash: &str,
2884 max_access_token_count: usize,
2885 ) -> Result<AccessTokenId> {
2886 self.transaction(|tx| async {
2887 let tx = tx;
2888
2889 let token = access_token::ActiveModel {
2890 user_id: ActiveValue::set(user_id),
2891 hash: ActiveValue::set(access_token_hash.into()),
2892 ..Default::default()
2893 }
2894 .insert(&*tx)
2895 .await?;
2896
2897 access_token::Entity::delete_many()
2898 .filter(
2899 access_token::Column::Id.in_subquery(
2900 Query::select()
2901 .column(access_token::Column::Id)
2902 .from(access_token::Entity)
2903 .and_where(access_token::Column::UserId.eq(user_id))
2904 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2905 .limit(10000)
2906 .offset(max_access_token_count as u64)
2907 .to_owned(),
2908 ),
2909 )
2910 .exec(&*tx)
2911 .await?;
2912 Ok(token.id)
2913 })
2914 .await
2915 }
2916
2917 pub async fn get_access_token(
2918 &self,
2919 access_token_id: AccessTokenId,
2920 ) -> Result<access_token::Model> {
2921 self.transaction(|tx| async move {
2922 Ok(access_token::Entity::find_by_id(access_token_id)
2923 .one(&*tx)
2924 .await?
2925 .ok_or_else(|| anyhow!("no such access token"))?)
2926 })
2927 .await
2928 }
2929
2930 async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2931 where
2932 F: Send + Fn(TransactionHandle) -> Fut,
2933 Fut: Send + Future<Output = Result<T>>,
2934 {
2935 let body = async {
2936 let mut i = 0;
2937 loop {
2938 let (tx, result) = self.with_transaction(&f).await?;
2939 match result {
2940 Ok(result) => match tx.commit().await.map_err(Into::into) {
2941 Ok(()) => return Ok(result),
2942 Err(error) => {
2943 if !self.retry_on_serialization_error(&error, i).await {
2944 return Err(error);
2945 }
2946 }
2947 },
2948 Err(error) => {
2949 tx.rollback().await?;
2950 if !self.retry_on_serialization_error(&error, i).await {
2951 return Err(error);
2952 }
2953 }
2954 }
2955 i += 1;
2956 }
2957 };
2958
2959 self.run(body).await
2960 }
2961
2962 async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2963 where
2964 F: Send + Fn(TransactionHandle) -> Fut,
2965 Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2966 {
2967 let body = async {
2968 let mut i = 0;
2969 loop {
2970 let (tx, result) = self.with_transaction(&f).await?;
2971 match result {
2972 Ok(Some((room_id, data))) => {
2973 let lock = self.rooms.entry(room_id).or_default().clone();
2974 let _guard = lock.lock_owned().await;
2975 match tx.commit().await.map_err(Into::into) {
2976 Ok(()) => {
2977 return Ok(Some(RoomGuard {
2978 data,
2979 _guard,
2980 _not_send: PhantomData,
2981 }));
2982 }
2983 Err(error) => {
2984 if !self.retry_on_serialization_error(&error, i).await {
2985 return Err(error);
2986 }
2987 }
2988 }
2989 }
2990 Ok(None) => match tx.commit().await.map_err(Into::into) {
2991 Ok(()) => return Ok(None),
2992 Err(error) => {
2993 if !self.retry_on_serialization_error(&error, i).await {
2994 return Err(error);
2995 }
2996 }
2997 },
2998 Err(error) => {
2999 tx.rollback().await?;
3000 if !self.retry_on_serialization_error(&error, i).await {
3001 return Err(error);
3002 }
3003 }
3004 }
3005 i += 1;
3006 }
3007 };
3008
3009 self.run(body).await
3010 }
3011
3012 async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3013 where
3014 F: Send + Fn(TransactionHandle) -> Fut,
3015 Fut: Send + Future<Output = Result<T>>,
3016 {
3017 let body = async {
3018 let mut i = 0;
3019 loop {
3020 let lock = self.rooms.entry(room_id).or_default().clone();
3021 let _guard = lock.lock_owned().await;
3022 let (tx, result) = self.with_transaction(&f).await?;
3023 match result {
3024 Ok(data) => match tx.commit().await.map_err(Into::into) {
3025 Ok(()) => {
3026 return Ok(RoomGuard {
3027 data,
3028 _guard,
3029 _not_send: PhantomData,
3030 });
3031 }
3032 Err(error) => {
3033 if !self.retry_on_serialization_error(&error, i).await {
3034 return Err(error);
3035 }
3036 }
3037 },
3038 Err(error) => {
3039 tx.rollback().await?;
3040 if !self.retry_on_serialization_error(&error, i).await {
3041 return Err(error);
3042 }
3043 }
3044 }
3045 i += 1;
3046 }
3047 };
3048
3049 self.run(body).await
3050 }
3051
3052 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3053 where
3054 F: Send + Fn(TransactionHandle) -> Fut,
3055 Fut: Send + Future<Output = Result<T>>,
3056 {
3057 let tx = self
3058 .pool
3059 .begin_with_config(Some(IsolationLevel::Serializable), None)
3060 .await?;
3061
3062 let mut tx = Arc::new(Some(tx));
3063 let result = f(TransactionHandle(tx.clone())).await;
3064 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3065 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3066 };
3067
3068 Ok((tx, result))
3069 }
3070
3071 async fn run<F, T>(&self, future: F) -> Result<T>
3072 where
3073 F: Future<Output = Result<T>>,
3074 {
3075 #[cfg(test)]
3076 {
3077 if let Executor::Deterministic(executor) = &self.executor {
3078 executor.simulate_random_delay().await;
3079 }
3080
3081 self.runtime.as_ref().unwrap().block_on(future)
3082 }
3083
3084 #[cfg(not(test))]
3085 {
3086 future.await
3087 }
3088 }
3089
3090 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3091 // If the error is due to a failure to serialize concurrent transactions, then retry
3092 // this transaction after a delay. With each subsequent retry, double the delay duration.
3093 // Also vary the delay randomly in order to ensure different database connections retry
3094 // at different times.
3095 if is_serialization_error(error) {
3096 let base_delay = 4_u64 << prev_attempt_count.min(16);
3097 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3098 log::info!(
3099 "retrying transaction after serialization error. delay: {} ms.",
3100 randomized_delay
3101 );
3102 self.executor
3103 .sleep(Duration::from_millis(randomized_delay as u64))
3104 .await;
3105 true
3106 } else {
3107 false
3108 }
3109 }
3110}
3111
3112fn is_serialization_error(error: &Error) -> bool {
3113 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3114 match error {
3115 Error::Database(
3116 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3117 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3118 ) if error
3119 .as_database_error()
3120 .and_then(|error| error.code())
3121 .as_deref()
3122 == Some(SERIALIZATION_FAILURE_CODE) =>
3123 {
3124 true
3125 }
3126 _ => false,
3127 }
3128}
3129
3130struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3131
3132impl Deref for TransactionHandle {
3133 type Target = DatabaseTransaction;
3134
3135 fn deref(&self) -> &Self::Target {
3136 self.0.as_ref().as_ref().unwrap()
3137 }
3138}
3139
3140pub struct RoomGuard<T> {
3141 data: T,
3142 _guard: OwnedMutexGuard<()>,
3143 _not_send: PhantomData<Rc<()>>,
3144}
3145
3146impl<T> Deref for RoomGuard<T> {
3147 type Target = T;
3148
3149 fn deref(&self) -> &T {
3150 &self.data
3151 }
3152}
3153
3154impl<T> DerefMut for RoomGuard<T> {
3155 fn deref_mut(&mut self) -> &mut T {
3156 &mut self.data
3157 }
3158}
3159
3160#[derive(Debug, Serialize, Deserialize)]
3161pub struct NewUserParams {
3162 pub github_login: String,
3163 pub github_user_id: i32,
3164 pub invite_count: i32,
3165}
3166
3167#[derive(Debug)]
3168pub struct NewUserResult {
3169 pub user_id: UserId,
3170 pub metrics_id: String,
3171 pub inviting_user_id: Option<UserId>,
3172 pub signup_device_id: Option<String>,
3173}
3174
3175fn random_invite_code() -> String {
3176 nanoid::nanoid!(16)
3177}
3178
3179fn random_email_confirmation_code() -> String {
3180 nanoid::nanoid!(64)
3181}
3182
3183macro_rules! id_type {
3184 ($name:ident) => {
3185 #[derive(
3186 Clone,
3187 Copy,
3188 Debug,
3189 Default,
3190 PartialEq,
3191 Eq,
3192 PartialOrd,
3193 Ord,
3194 Hash,
3195 Serialize,
3196 Deserialize,
3197 )]
3198 #[serde(transparent)]
3199 pub struct $name(pub i32);
3200
3201 impl $name {
3202 #[allow(unused)]
3203 pub const MAX: Self = Self(i32::MAX);
3204
3205 #[allow(unused)]
3206 pub fn from_proto(value: u64) -> Self {
3207 Self(value as i32)
3208 }
3209
3210 #[allow(unused)]
3211 pub fn to_proto(self) -> u64 {
3212 self.0 as u64
3213 }
3214 }
3215
3216 impl std::fmt::Display for $name {
3217 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3218 self.0.fmt(f)
3219 }
3220 }
3221
3222 impl From<$name> for sea_query::Value {
3223 fn from(value: $name) -> Self {
3224 sea_query::Value::Int(Some(value.0))
3225 }
3226 }
3227
3228 impl sea_orm::TryGetable for $name {
3229 fn try_get(
3230 res: &sea_orm::QueryResult,
3231 pre: &str,
3232 col: &str,
3233 ) -> Result<Self, sea_orm::TryGetError> {
3234 Ok(Self(i32::try_get(res, pre, col)?))
3235 }
3236 }
3237
3238 impl sea_query::ValueType for $name {
3239 fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3240 match v {
3241 Value::TinyInt(Some(int)) => {
3242 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3243 }
3244 Value::SmallInt(Some(int)) => {
3245 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3246 }
3247 Value::Int(Some(int)) => {
3248 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3249 }
3250 Value::BigInt(Some(int)) => {
3251 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3252 }
3253 Value::TinyUnsigned(Some(int)) => {
3254 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3255 }
3256 Value::SmallUnsigned(Some(int)) => {
3257 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3258 }
3259 Value::Unsigned(Some(int)) => {
3260 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3261 }
3262 Value::BigUnsigned(Some(int)) => {
3263 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3264 }
3265 _ => Err(sea_query::ValueTypeErr),
3266 }
3267 }
3268
3269 fn type_name() -> String {
3270 stringify!($name).into()
3271 }
3272
3273 fn array_type() -> sea_query::ArrayType {
3274 sea_query::ArrayType::Int
3275 }
3276
3277 fn column_type() -> sea_query::ColumnType {
3278 sea_query::ColumnType::Integer(None)
3279 }
3280 }
3281
3282 impl sea_orm::TryFromU64 for $name {
3283 fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3284 Ok(Self(n.try_into().map_err(|_| {
3285 DbErr::ConvertFromU64(concat!(
3286 "error converting ",
3287 stringify!($name),
3288 " to u64"
3289 ))
3290 })?))
3291 }
3292 }
3293
3294 impl sea_query::Nullable for $name {
3295 fn null() -> Value {
3296 Value::Int(None)
3297 }
3298 }
3299 };
3300}
3301
3302id_type!(AccessTokenId);
3303id_type!(ContactId);
3304id_type!(FollowerId);
3305id_type!(RoomId);
3306id_type!(RoomParticipantId);
3307id_type!(ProjectId);
3308id_type!(ProjectCollaboratorId);
3309id_type!(ReplicaId);
3310id_type!(ServerId);
3311id_type!(SignupId);
3312id_type!(UserId);
3313
3314pub struct RejoinedRoom {
3315 pub room: proto::Room,
3316 pub rejoined_projects: Vec<RejoinedProject>,
3317 pub reshared_projects: Vec<ResharedProject>,
3318}
3319
3320pub struct ResharedProject {
3321 pub id: ProjectId,
3322 pub old_connection_id: ConnectionId,
3323 pub collaborators: Vec<ProjectCollaborator>,
3324 pub worktrees: Vec<proto::WorktreeMetadata>,
3325}
3326
3327pub struct RejoinedProject {
3328 pub id: ProjectId,
3329 pub old_connection_id: ConnectionId,
3330 pub collaborators: Vec<ProjectCollaborator>,
3331 pub worktrees: Vec<RejoinedWorktree>,
3332 pub language_servers: Vec<proto::LanguageServer>,
3333}
3334
3335#[derive(Debug)]
3336pub struct RejoinedWorktree {
3337 pub id: u64,
3338 pub abs_path: String,
3339 pub root_name: String,
3340 pub visible: bool,
3341 pub updated_entries: Vec<proto::Entry>,
3342 pub removed_entries: Vec<u64>,
3343 pub updated_repositories: Vec<proto::RepositoryEntry>,
3344 pub removed_repositories: Vec<u64>,
3345 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3346 pub scan_id: u64,
3347 pub completed_scan_id: u64,
3348}
3349
3350pub struct LeftRoom {
3351 pub room: proto::Room,
3352 pub left_projects: HashMap<ProjectId, LeftProject>,
3353 pub canceled_calls_to_user_ids: Vec<UserId>,
3354}
3355
3356pub struct RefreshedRoom {
3357 pub room: proto::Room,
3358 pub stale_participant_user_ids: Vec<UserId>,
3359 pub canceled_calls_to_user_ids: Vec<UserId>,
3360}
3361
3362pub struct Project {
3363 pub collaborators: Vec<ProjectCollaborator>,
3364 pub worktrees: BTreeMap<u64, Worktree>,
3365 pub language_servers: Vec<proto::LanguageServer>,
3366}
3367
3368pub struct ProjectCollaborator {
3369 pub connection_id: ConnectionId,
3370 pub user_id: UserId,
3371 pub replica_id: ReplicaId,
3372 pub is_host: bool,
3373}
3374
3375impl ProjectCollaborator {
3376 pub fn to_proto(&self) -> proto::Collaborator {
3377 proto::Collaborator {
3378 peer_id: Some(self.connection_id.into()),
3379 replica_id: self.replica_id.0 as u32,
3380 user_id: self.user_id.to_proto(),
3381 }
3382 }
3383}
3384
3385#[derive(Debug)]
3386pub struct LeftProject {
3387 pub id: ProjectId,
3388 pub host_user_id: UserId,
3389 pub host_connection_id: ConnectionId,
3390 pub connection_ids: Vec<ConnectionId>,
3391}
3392
3393pub struct Worktree {
3394 pub id: u64,
3395 pub abs_path: String,
3396 pub root_name: String,
3397 pub visible: bool,
3398 pub entries: Vec<proto::Entry>,
3399 pub repository_entries: Vec<proto::RepositoryEntry>,
3400 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3401 pub scan_id: u64,
3402 pub completed_scan_id: u64,
3403}
3404
3405#[cfg(test)]
3406pub use test::*;
3407
3408#[cfg(test)]
3409mod test {
3410 use super::*;
3411 use gpui::executor::Background;
3412 use lazy_static::lazy_static;
3413 use parking_lot::Mutex;
3414 use sea_orm::ConnectionTrait;
3415 use sqlx::migrate::MigrateDatabase;
3416 use std::sync::Arc;
3417
3418 pub struct TestDb {
3419 pub db: Option<Arc<Database>>,
3420 pub connection: Option<sqlx::AnyConnection>,
3421 }
3422
3423 impl TestDb {
3424 pub fn sqlite(background: Arc<Background>) -> Self {
3425 let url = format!("sqlite::memory:");
3426 let runtime = tokio::runtime::Builder::new_current_thread()
3427 .enable_io()
3428 .enable_time()
3429 .build()
3430 .unwrap();
3431
3432 let mut db = runtime.block_on(async {
3433 let mut options = ConnectOptions::new(url);
3434 options.max_connections(5);
3435 let db = Database::new(options, Executor::Deterministic(background))
3436 .await
3437 .unwrap();
3438 let sql = include_str!(concat!(
3439 env!("CARGO_MANIFEST_DIR"),
3440 "/migrations.sqlite/20221109000000_test_schema.sql"
3441 ));
3442 db.pool
3443 .execute(sea_orm::Statement::from_string(
3444 db.pool.get_database_backend(),
3445 sql.into(),
3446 ))
3447 .await
3448 .unwrap();
3449 db
3450 });
3451
3452 db.runtime = Some(runtime);
3453
3454 Self {
3455 db: Some(Arc::new(db)),
3456 connection: None,
3457 }
3458 }
3459
3460 pub fn postgres(background: Arc<Background>) -> Self {
3461 lazy_static! {
3462 static ref LOCK: Mutex<()> = Mutex::new(());
3463 }
3464
3465 let _guard = LOCK.lock();
3466 let mut rng = StdRng::from_entropy();
3467 let url = format!(
3468 "postgres://postgres@localhost/zed-test-{}",
3469 rng.gen::<u128>()
3470 );
3471 let runtime = tokio::runtime::Builder::new_current_thread()
3472 .enable_io()
3473 .enable_time()
3474 .build()
3475 .unwrap();
3476
3477 let mut db = runtime.block_on(async {
3478 sqlx::Postgres::create_database(&url)
3479 .await
3480 .expect("failed to create test db");
3481 let mut options = ConnectOptions::new(url);
3482 options
3483 .max_connections(5)
3484 .idle_timeout(Duration::from_secs(0));
3485 let db = Database::new(options, Executor::Deterministic(background))
3486 .await
3487 .unwrap();
3488 let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3489 db.migrate(Path::new(migrations_path), false).await.unwrap();
3490 db
3491 });
3492
3493 db.runtime = Some(runtime);
3494
3495 Self {
3496 db: Some(Arc::new(db)),
3497 connection: None,
3498 }
3499 }
3500
3501 pub fn db(&self) -> &Arc<Database> {
3502 self.db.as_ref().unwrap()
3503 }
3504 }
3505
3506 impl Drop for TestDb {
3507 fn drop(&mut self) {
3508 let db = self.db.take().unwrap();
3509 if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3510 db.runtime.as_ref().unwrap().block_on(async {
3511 use util::ResultExt;
3512 let query = "
3513 SELECT pg_terminate_backend(pg_stat_activity.pid)
3514 FROM pg_stat_activity
3515 WHERE
3516 pg_stat_activity.datname = current_database() AND
3517 pid <> pg_backend_pid();
3518 ";
3519 db.pool
3520 .execute(sea_orm::Statement::from_string(
3521 db.pool.get_database_backend(),
3522 query.into(),
3523 ))
3524 .await
3525 .log_err();
3526 sqlx::Postgres::drop_database(db.options.get_url())
3527 .await
3528 .log_err();
3529 })
3530 }
3531 }
3532 }
3533}