1mod access_token;
2mod contact;
3mod follower;
4mod language_server;
5mod project;
6mod project_collaborator;
7mod room;
8mod room_participant;
9mod server;
10mod signup;
11#[cfg(test)]
12mod tests;
13mod user;
14mod worktree;
15mod worktree_diagnostic_summary;
16mod worktree_entry;
17mod worktree_repository;
18
19use crate::executor::Executor;
20use crate::{Error, Result};
21use anyhow::anyhow;
22use collections::{BTreeMap, HashMap, HashSet};
23pub use contact::Contact;
24use dashmap::DashMap;
25use futures::StreamExt;
26use hyper::StatusCode;
27use rand::prelude::StdRng;
28use rand::{Rng, SeedableRng};
29use rpc::{proto, ConnectionId};
30use sea_orm::Condition;
31pub use sea_orm::ConnectOptions;
32use sea_orm::{
33 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
34 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
35 Statement, TransactionTrait,
36};
37use sea_query::{Alias, Expr, OnConflict, Query};
38use serde::{Deserialize, Serialize};
39pub use signup::{Invite, NewSignup, WaitlistSummary};
40use sqlx::migrate::{Migrate, Migration, MigrationSource};
41use sqlx::Connection;
42use std::ops::{Deref, DerefMut};
43use std::path::Path;
44use std::time::Duration;
45use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
46use tokio::sync::{Mutex, OwnedMutexGuard};
47pub use user::Model as User;
48
/// Handle to the application database together with the per-instance state
/// used by the query methods implemented on it.
pub struct Database {
    /// Options the connection was opened with; `migrate` reuses their URL to
    /// open a separate `sqlx` connection.
    options: ConnectOptions,
    /// SeaORM database connection established in `new`.
    pool: DatabaseConnection,
    /// One async mutex per room id.
    // NOTE(review): presumably taken by `room_transaction` to serialize work on
    // a single room — confirm against its definition.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Random number generator; `new` seeds it with the fixed value 0.
    rng: Mutex<StdRng>,
    /// Executor handed to `new` by the caller.
    executor: Executor,
    /// Tokio runtime slot that only exists in test builds; `new` leaves it empty.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
58
59impl Database {
60 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
61 Ok(Self {
62 options: options.clone(),
63 pool: sea_orm::Database::connect(options).await?,
64 rooms: DashMap::with_capacity(16384),
65 rng: Mutex::new(StdRng::seed_from_u64(0)),
66 executor,
67 #[cfg(test)]
68 runtime: None,
69 })
70 }
71
    /// Removes every entry from the in-memory `rooms` map. Only compiled in
    /// test builds.
    #[cfg(test)]
    pub fn reset(&self) {
        self.rooms.clear();
    }
76
77 pub async fn migrate(
78 &self,
79 migrations_path: &Path,
80 ignore_checksum_mismatch: bool,
81 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
82 let migrations = MigrationSource::resolve(migrations_path)
83 .await
84 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
85
86 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
87
88 connection.ensure_migrations_table().await?;
89 let applied_migrations: HashMap<_, _> = connection
90 .list_applied_migrations()
91 .await?
92 .into_iter()
93 .map(|m| (m.version, m))
94 .collect();
95
96 let mut new_migrations = Vec::new();
97 for migration in migrations {
98 match applied_migrations.get(&migration.version) {
99 Some(applied_migration) => {
100 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
101 {
102 Err(anyhow!(
103 "checksum mismatch for applied migration {}",
104 migration.description
105 ))?;
106 }
107 }
108 None => {
109 let elapsed = connection.apply(&migration).await?;
110 new_migrations.push((migration, elapsed));
111 }
112 }
113 }
114
115 Ok(new_migrations)
116 }
117
118 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
119 self.transaction(|tx| async move {
120 let server = server::ActiveModel {
121 environment: ActiveValue::set(environment.into()),
122 ..Default::default()
123 }
124 .insert(&*tx)
125 .await?;
126 Ok(server.id)
127 })
128 .await
129 }
130
131 pub async fn stale_room_ids(
132 &self,
133 environment: &str,
134 new_server_id: ServerId,
135 ) -> Result<Vec<RoomId>> {
136 self.transaction(|tx| async move {
137 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
138 enum QueryAs {
139 RoomId,
140 }
141
142 let stale_server_epochs = self
143 .stale_server_ids(environment, new_server_id, &tx)
144 .await?;
145 Ok(room_participant::Entity::find()
146 .select_only()
147 .column(room_participant::Column::RoomId)
148 .distinct()
149 .filter(
150 room_participant::Column::AnsweringConnectionServerId
151 .is_in(stale_server_epochs),
152 )
153 .into_values::<_, QueryAs>()
154 .all(&*tx)
155 .await?)
156 })
157 .await
158 }
159
160 pub async fn refresh_room(
161 &self,
162 room_id: RoomId,
163 new_server_id: ServerId,
164 ) -> Result<RoomGuard<RefreshedRoom>> {
165 self.room_transaction(room_id, |tx| async move {
166 let stale_participant_filter = Condition::all()
167 .add(room_participant::Column::RoomId.eq(room_id))
168 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
169 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
170
171 let stale_participant_user_ids = room_participant::Entity::find()
172 .filter(stale_participant_filter.clone())
173 .all(&*tx)
174 .await?
175 .into_iter()
176 .map(|participant| participant.user_id)
177 .collect::<Vec<_>>();
178
179 // Delete participants who failed to reconnect and cancel their calls.
180 let mut canceled_calls_to_user_ids = Vec::new();
181 room_participant::Entity::delete_many()
182 .filter(stale_participant_filter)
183 .exec(&*tx)
184 .await?;
185 let called_participants = room_participant::Entity::find()
186 .filter(
187 Condition::all()
188 .add(
189 room_participant::Column::CallingUserId
190 .is_in(stale_participant_user_ids.iter().copied()),
191 )
192 .add(room_participant::Column::AnsweringConnectionId.is_null()),
193 )
194 .all(&*tx)
195 .await?;
196 room_participant::Entity::delete_many()
197 .filter(
198 room_participant::Column::Id
199 .is_in(called_participants.iter().map(|participant| participant.id)),
200 )
201 .exec(&*tx)
202 .await?;
203 canceled_calls_to_user_ids.extend(
204 called_participants
205 .into_iter()
206 .map(|participant| participant.user_id),
207 );
208
209 let room = self.get_room(room_id, &tx).await?;
210 // Delete the room if it becomes empty.
211 if room.participants.is_empty() {
212 project::Entity::delete_many()
213 .filter(project::Column::RoomId.eq(room_id))
214 .exec(&*tx)
215 .await?;
216 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
217 }
218
219 Ok(RefreshedRoom {
220 room,
221 stale_participant_user_ids,
222 canceled_calls_to_user_ids,
223 })
224 })
225 .await
226 }
227
228 pub async fn delete_stale_servers(
229 &self,
230 environment: &str,
231 new_server_id: ServerId,
232 ) -> Result<()> {
233 self.transaction(|tx| async move {
234 server::Entity::delete_many()
235 .filter(
236 Condition::all()
237 .add(server::Column::Environment.eq(environment))
238 .add(server::Column::Id.ne(new_server_id)),
239 )
240 .exec(&*tx)
241 .await?;
242 Ok(())
243 })
244 .await
245 }
246
247 async fn stale_server_ids(
248 &self,
249 environment: &str,
250 new_server_id: ServerId,
251 tx: &DatabaseTransaction,
252 ) -> Result<Vec<ServerId>> {
253 let stale_servers = server::Entity::find()
254 .filter(
255 Condition::all()
256 .add(server::Column::Environment.eq(environment))
257 .add(server::Column::Id.ne(new_server_id)),
258 )
259 .all(&*tx)
260 .await?;
261 Ok(stale_servers.into_iter().map(|server| server.id).collect())
262 }
263
264 // users
265
266 pub async fn create_user(
267 &self,
268 email_address: &str,
269 admin: bool,
270 params: NewUserParams,
271 ) -> Result<NewUserResult> {
272 self.transaction(|tx| async {
273 let tx = tx;
274 let user = user::Entity::insert(user::ActiveModel {
275 email_address: ActiveValue::set(Some(email_address.into())),
276 github_login: ActiveValue::set(params.github_login.clone()),
277 github_user_id: ActiveValue::set(Some(params.github_user_id)),
278 admin: ActiveValue::set(admin),
279 metrics_id: ActiveValue::set(Uuid::new_v4()),
280 ..Default::default()
281 })
282 .on_conflict(
283 OnConflict::column(user::Column::GithubLogin)
284 .update_column(user::Column::GithubLogin)
285 .to_owned(),
286 )
287 .exec_with_returning(&*tx)
288 .await?;
289
290 Ok(NewUserResult {
291 user_id: user.id,
292 metrics_id: user.metrics_id.to_string(),
293 signup_device_id: None,
294 inviting_user_id: None,
295 })
296 })
297 .await
298 }
299
300 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
301 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
302 .await
303 }
304
305 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
306 self.transaction(|tx| async {
307 let tx = tx;
308 Ok(user::Entity::find()
309 .filter(user::Column::Id.is_in(ids.iter().copied()))
310 .all(&*tx)
311 .await?)
312 })
313 .await
314 }
315
316 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
317 self.transaction(|tx| async move {
318 Ok(user::Entity::find()
319 .filter(user::Column::GithubLogin.eq(github_login))
320 .one(&*tx)
321 .await?)
322 })
323 .await
324 }
325
326 pub async fn get_or_create_user_by_github_account(
327 &self,
328 github_login: &str,
329 github_user_id: Option<i32>,
330 github_email: Option<&str>,
331 ) -> Result<Option<User>> {
332 self.transaction(|tx| async move {
333 let tx = &*tx;
334 if let Some(github_user_id) = github_user_id {
335 if let Some(user_by_github_user_id) = user::Entity::find()
336 .filter(user::Column::GithubUserId.eq(github_user_id))
337 .one(tx)
338 .await?
339 {
340 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
341 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
342 Ok(Some(user_by_github_user_id.update(tx).await?))
343 } else if let Some(user_by_github_login) = user::Entity::find()
344 .filter(user::Column::GithubLogin.eq(github_login))
345 .one(tx)
346 .await?
347 {
348 let mut user_by_github_login = user_by_github_login.into_active_model();
349 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
350 Ok(Some(user_by_github_login.update(tx).await?))
351 } else {
352 let user = user::Entity::insert(user::ActiveModel {
353 email_address: ActiveValue::set(github_email.map(|email| email.into())),
354 github_login: ActiveValue::set(github_login.into()),
355 github_user_id: ActiveValue::set(Some(github_user_id)),
356 admin: ActiveValue::set(false),
357 invite_count: ActiveValue::set(0),
358 invite_code: ActiveValue::set(None),
359 metrics_id: ActiveValue::set(Uuid::new_v4()),
360 ..Default::default()
361 })
362 .exec_with_returning(&*tx)
363 .await?;
364 Ok(Some(user))
365 }
366 } else {
367 Ok(user::Entity::find()
368 .filter(user::Column::GithubLogin.eq(github_login))
369 .one(tx)
370 .await?)
371 }
372 })
373 .await
374 }
375
376 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
377 self.transaction(|tx| async move {
378 Ok(user::Entity::find()
379 .order_by_asc(user::Column::GithubLogin)
380 .limit(limit as u64)
381 .offset(page as u64 * limit as u64)
382 .all(&*tx)
383 .await?)
384 })
385 .await
386 }
387
388 pub async fn get_users_with_no_invites(
389 &self,
390 invited_by_another_user: bool,
391 ) -> Result<Vec<User>> {
392 self.transaction(|tx| async move {
393 Ok(user::Entity::find()
394 .filter(
395 user::Column::InviteCount
396 .eq(0)
397 .and(if invited_by_another_user {
398 user::Column::InviterId.is_not_null()
399 } else {
400 user::Column::InviterId.is_null()
401 }),
402 )
403 .all(&*tx)
404 .await?)
405 })
406 .await
407 }
408
409 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
410 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
411 enum QueryAs {
412 MetricsId,
413 }
414
415 self.transaction(|tx| async move {
416 let metrics_id: Uuid = user::Entity::find_by_id(id)
417 .select_only()
418 .column(user::Column::MetricsId)
419 .into_values::<_, QueryAs>()
420 .one(&*tx)
421 .await?
422 .ok_or_else(|| anyhow!("could not find user"))?;
423 Ok(metrics_id.to_string())
424 })
425 .await
426 }
427
428 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
429 self.transaction(|tx| async move {
430 user::Entity::update_many()
431 .filter(user::Column::Id.eq(id))
432 .set(user::ActiveModel {
433 admin: ActiveValue::set(is_admin),
434 ..Default::default()
435 })
436 .exec(&*tx)
437 .await?;
438 Ok(())
439 })
440 .await
441 }
442
443 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
444 self.transaction(|tx| async move {
445 user::Entity::update_many()
446 .filter(user::Column::Id.eq(id))
447 .set(user::ActiveModel {
448 connected_once: ActiveValue::set(connected_once),
449 ..Default::default()
450 })
451 .exec(&*tx)
452 .await?;
453 Ok(())
454 })
455 .await
456 }
457
458 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
459 self.transaction(|tx| async move {
460 access_token::Entity::delete_many()
461 .filter(access_token::Column::UserId.eq(id))
462 .exec(&*tx)
463 .await?;
464 user::Entity::delete_by_id(id).exec(&*tx).await?;
465 Ok(())
466 })
467 .await
468 }
469
470 // contacts
471
472 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
473 #[derive(Debug, FromQueryResult)]
474 struct ContactWithUserBusyStatuses {
475 user_id_a: UserId,
476 user_id_b: UserId,
477 a_to_b: bool,
478 accepted: bool,
479 should_notify: bool,
480 user_a_busy: bool,
481 user_b_busy: bool,
482 }
483
484 self.transaction(|tx| async move {
485 let user_a_participant = Alias::new("user_a_participant");
486 let user_b_participant = Alias::new("user_b_participant");
487 let mut db_contacts = contact::Entity::find()
488 .column_as(
489 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
490 .is_not_null(),
491 "user_a_busy",
492 )
493 .column_as(
494 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
495 .is_not_null(),
496 "user_b_busy",
497 )
498 .filter(
499 contact::Column::UserIdA
500 .eq(user_id)
501 .or(contact::Column::UserIdB.eq(user_id)),
502 )
503 .join_as(
504 JoinType::LeftJoin,
505 contact::Relation::UserARoomParticipant.def(),
506 user_a_participant,
507 )
508 .join_as(
509 JoinType::LeftJoin,
510 contact::Relation::UserBRoomParticipant.def(),
511 user_b_participant,
512 )
513 .into_model::<ContactWithUserBusyStatuses>()
514 .stream(&*tx)
515 .await?;
516
517 let mut contacts = Vec::new();
518 while let Some(db_contact) = db_contacts.next().await {
519 let db_contact = db_contact?;
520 if db_contact.user_id_a == user_id {
521 if db_contact.accepted {
522 contacts.push(Contact::Accepted {
523 user_id: db_contact.user_id_b,
524 should_notify: db_contact.should_notify && db_contact.a_to_b,
525 busy: db_contact.user_b_busy,
526 });
527 } else if db_contact.a_to_b {
528 contacts.push(Contact::Outgoing {
529 user_id: db_contact.user_id_b,
530 })
531 } else {
532 contacts.push(Contact::Incoming {
533 user_id: db_contact.user_id_b,
534 should_notify: db_contact.should_notify,
535 });
536 }
537 } else if db_contact.accepted {
538 contacts.push(Contact::Accepted {
539 user_id: db_contact.user_id_a,
540 should_notify: db_contact.should_notify && !db_contact.a_to_b,
541 busy: db_contact.user_a_busy,
542 });
543 } else if db_contact.a_to_b {
544 contacts.push(Contact::Incoming {
545 user_id: db_contact.user_id_a,
546 should_notify: db_contact.should_notify,
547 });
548 } else {
549 contacts.push(Contact::Outgoing {
550 user_id: db_contact.user_id_a,
551 });
552 }
553 }
554
555 contacts.sort_unstable_by_key(|contact| contact.user_id());
556
557 Ok(contacts)
558 })
559 .await
560 }
561
562 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
563 self.transaction(|tx| async move {
564 let participant = room_participant::Entity::find()
565 .filter(room_participant::Column::UserId.eq(user_id))
566 .one(&*tx)
567 .await?;
568 Ok(participant.is_some())
569 })
570 .await
571 }
572
573 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
574 self.transaction(|tx| async move {
575 let (id_a, id_b) = if user_id_1 < user_id_2 {
576 (user_id_1, user_id_2)
577 } else {
578 (user_id_2, user_id_1)
579 };
580
581 Ok(contact::Entity::find()
582 .filter(
583 contact::Column::UserIdA
584 .eq(id_a)
585 .and(contact::Column::UserIdB.eq(id_b))
586 .and(contact::Column::Accepted.eq(true)),
587 )
588 .one(&*tx)
589 .await?
590 .is_some())
591 })
592 .await
593 }
594
595 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
596 self.transaction(|tx| async move {
597 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
598 (sender_id, receiver_id, true)
599 } else {
600 (receiver_id, sender_id, false)
601 };
602
603 let rows_affected = contact::Entity::insert(contact::ActiveModel {
604 user_id_a: ActiveValue::set(id_a),
605 user_id_b: ActiveValue::set(id_b),
606 a_to_b: ActiveValue::set(a_to_b),
607 accepted: ActiveValue::set(false),
608 should_notify: ActiveValue::set(true),
609 ..Default::default()
610 })
611 .on_conflict(
612 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
613 .values([
614 (contact::Column::Accepted, true.into()),
615 (contact::Column::ShouldNotify, false.into()),
616 ])
617 .action_and_where(
618 contact::Column::Accepted.eq(false).and(
619 contact::Column::AToB
620 .eq(a_to_b)
621 .and(contact::Column::UserIdA.eq(id_b))
622 .or(contact::Column::AToB
623 .ne(a_to_b)
624 .and(contact::Column::UserIdA.eq(id_a))),
625 ),
626 )
627 .to_owned(),
628 )
629 .exec_without_returning(&*tx)
630 .await?;
631
632 if rows_affected == 1 {
633 Ok(())
634 } else {
635 Err(anyhow!("contact already requested"))?
636 }
637 })
638 .await
639 }
640
641 /// Returns a bool indicating whether the removed contact had originally accepted or not
642 ///
643 /// Deletes the contact identified by the requester and responder ids, and then returns
644 /// whether the deleted contact had originally accepted or was a pending contact request.
645 ///
646 /// # Arguments
647 ///
648 /// * `requester_id` - The user that initiates this request
649 /// * `responder_id` - The user that will be removed
650 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
651 self.transaction(|tx| async move {
652 let (id_a, id_b) = if responder_id < requester_id {
653 (responder_id, requester_id)
654 } else {
655 (requester_id, responder_id)
656 };
657
658 let contact = contact::Entity::find()
659 .filter(
660 contact::Column::UserIdA
661 .eq(id_a)
662 .and(contact::Column::UserIdB.eq(id_b)),
663 )
664 .one(&*tx)
665 .await?
666 .ok_or_else(|| anyhow!("no such contact"))?;
667
668 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
669 Ok(contact.accepted)
670 })
671 .await
672 }
673
674 pub async fn dismiss_contact_notification(
675 &self,
676 user_id: UserId,
677 contact_user_id: UserId,
678 ) -> Result<()> {
679 self.transaction(|tx| async move {
680 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
681 (user_id, contact_user_id, true)
682 } else {
683 (contact_user_id, user_id, false)
684 };
685
686 let result = contact::Entity::update_many()
687 .set(contact::ActiveModel {
688 should_notify: ActiveValue::set(false),
689 ..Default::default()
690 })
691 .filter(
692 contact::Column::UserIdA
693 .eq(id_a)
694 .and(contact::Column::UserIdB.eq(id_b))
695 .and(
696 contact::Column::AToB
697 .eq(a_to_b)
698 .and(contact::Column::Accepted.eq(true))
699 .or(contact::Column::AToB
700 .ne(a_to_b)
701 .and(contact::Column::Accepted.eq(false))),
702 ),
703 )
704 .exec(&*tx)
705 .await?;
706 if result.rows_affected == 0 {
707 Err(anyhow!("no such contact request"))?
708 } else {
709 Ok(())
710 }
711 })
712 .await
713 }
714
715 pub async fn respond_to_contact_request(
716 &self,
717 responder_id: UserId,
718 requester_id: UserId,
719 accept: bool,
720 ) -> Result<()> {
721 self.transaction(|tx| async move {
722 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
723 (responder_id, requester_id, false)
724 } else {
725 (requester_id, responder_id, true)
726 };
727 let rows_affected = if accept {
728 let result = contact::Entity::update_many()
729 .set(contact::ActiveModel {
730 accepted: ActiveValue::set(true),
731 should_notify: ActiveValue::set(true),
732 ..Default::default()
733 })
734 .filter(
735 contact::Column::UserIdA
736 .eq(id_a)
737 .and(contact::Column::UserIdB.eq(id_b))
738 .and(contact::Column::AToB.eq(a_to_b)),
739 )
740 .exec(&*tx)
741 .await?;
742 result.rows_affected
743 } else {
744 let result = contact::Entity::delete_many()
745 .filter(
746 contact::Column::UserIdA
747 .eq(id_a)
748 .and(contact::Column::UserIdB.eq(id_b))
749 .and(contact::Column::AToB.eq(a_to_b))
750 .and(contact::Column::Accepted.eq(false)),
751 )
752 .exec(&*tx)
753 .await?;
754
755 result.rows_affected
756 };
757
758 if rows_affected == 1 {
759 Ok(())
760 } else {
761 Err(anyhow!("no such contact request"))?
762 }
763 })
764 .await
765 }
766
767 pub fn fuzzy_like_string(string: &str) -> String {
768 let mut result = String::with_capacity(string.len() * 2 + 1);
769 for c in string.chars() {
770 if c.is_alphanumeric() {
771 result.push('%');
772 result.push(c);
773 }
774 }
775 result.push('%');
776 result
777 }
778
779 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
780 self.transaction(|tx| async {
781 let tx = tx;
782 let like_string = Self::fuzzy_like_string(name_query);
783 let query = "
784 SELECT users.*
785 FROM users
786 WHERE github_login ILIKE $1
787 ORDER BY github_login <-> $2
788 LIMIT $3
789 ";
790
791 Ok(user::Entity::find()
792 .from_raw_sql(Statement::from_sql_and_values(
793 self.pool.get_database_backend(),
794 query.into(),
795 vec![like_string.into(), name_query.into(), limit.into()],
796 ))
797 .all(&*tx)
798 .await?)
799 })
800 .await
801 }
802
803 // signups
804
805 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
806 self.transaction(|tx| async move {
807 signup::Entity::insert(signup::ActiveModel {
808 email_address: ActiveValue::set(signup.email_address.clone()),
809 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
810 email_confirmation_sent: ActiveValue::set(false),
811 platform_mac: ActiveValue::set(signup.platform_mac),
812 platform_windows: ActiveValue::set(signup.platform_windows),
813 platform_linux: ActiveValue::set(signup.platform_linux),
814 platform_unknown: ActiveValue::set(false),
815 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
816 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
817 device_id: ActiveValue::set(signup.device_id.clone()),
818 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
819 ..Default::default()
820 })
821 .on_conflict(
822 OnConflict::column(signup::Column::EmailAddress)
823 .update_columns([
824 signup::Column::PlatformMac,
825 signup::Column::PlatformWindows,
826 signup::Column::PlatformLinux,
827 signup::Column::EditorFeatures,
828 signup::Column::ProgrammingLanguages,
829 signup::Column::DeviceId,
830 signup::Column::AddedToMailingList,
831 ])
832 .to_owned(),
833 )
834 .exec(&*tx)
835 .await?;
836 Ok(())
837 })
838 .await
839 }
840
841 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
842 self.transaction(|tx| async move {
843 let signup = signup::Entity::find()
844 .filter(signup::Column::EmailAddress.eq(email_address))
845 .one(&*tx)
846 .await?
847 .ok_or_else(|| {
848 anyhow!("signup with email address {} doesn't exist", email_address)
849 })?;
850
851 Ok(signup)
852 })
853 .await
854 }
855
856 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
857 self.transaction(|tx| async move {
858 let query = "
859 SELECT
860 COUNT(*) as count,
861 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
862 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
863 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
864 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
865 FROM (
866 SELECT *
867 FROM signups
868 WHERE
869 NOT email_confirmation_sent
870 ) AS unsent
871 ";
872 Ok(
873 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
874 self.pool.get_database_backend(),
875 query.into(),
876 vec![],
877 ))
878 .one(&*tx)
879 .await?
880 .ok_or_else(|| anyhow!("invalid result"))?,
881 )
882 })
883 .await
884 }
885
886 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
887 let emails = invites
888 .iter()
889 .map(|s| s.email_address.as_str())
890 .collect::<Vec<_>>();
891 self.transaction(|tx| async {
892 let tx = tx;
893 signup::Entity::update_many()
894 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
895 .set(signup::ActiveModel {
896 email_confirmation_sent: ActiveValue::set(true),
897 ..Default::default()
898 })
899 .exec(&*tx)
900 .await?;
901 Ok(())
902 })
903 .await
904 }
905
906 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
907 self.transaction(|tx| async move {
908 Ok(signup::Entity::find()
909 .select_only()
910 .column(signup::Column::EmailAddress)
911 .column(signup::Column::EmailConfirmationCode)
912 .filter(
913 signup::Column::EmailConfirmationSent.eq(false).and(
914 signup::Column::PlatformMac
915 .eq(true)
916 .or(signup::Column::PlatformUnknown.eq(true)),
917 ),
918 )
919 .order_by_asc(signup::Column::CreatedAt)
920 .limit(count as u64)
921 .into_model()
922 .all(&*tx)
923 .await?)
924 })
925 .await
926 }
927
928 // invite codes
929
930 pub async fn create_invite_from_code(
931 &self,
932 code: &str,
933 email_address: &str,
934 device_id: Option<&str>,
935 added_to_mailing_list: bool,
936 ) -> Result<Invite> {
937 self.transaction(|tx| async move {
938 let existing_user = user::Entity::find()
939 .filter(user::Column::EmailAddress.eq(email_address))
940 .one(&*tx)
941 .await?;
942
943 if existing_user.is_some() {
944 Err(anyhow!("email address is already in use"))?;
945 }
946
947 let inviting_user_with_invites = match user::Entity::find()
948 .filter(
949 user::Column::InviteCode
950 .eq(code)
951 .and(user::Column::InviteCount.gt(0)),
952 )
953 .one(&*tx)
954 .await?
955 {
956 Some(inviting_user) => inviting_user,
957 None => {
958 return Err(Error::Http(
959 StatusCode::UNAUTHORIZED,
960 "unable to find an invite code with invites remaining".to_string(),
961 ))?
962 }
963 };
964 user::Entity::update_many()
965 .filter(
966 user::Column::Id
967 .eq(inviting_user_with_invites.id)
968 .and(user::Column::InviteCount.gt(0)),
969 )
970 .col_expr(
971 user::Column::InviteCount,
972 Expr::col(user::Column::InviteCount).sub(1),
973 )
974 .exec(&*tx)
975 .await?;
976
977 let signup = signup::Entity::insert(signup::ActiveModel {
978 email_address: ActiveValue::set(email_address.into()),
979 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
980 email_confirmation_sent: ActiveValue::set(false),
981 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
982 platform_linux: ActiveValue::set(false),
983 platform_mac: ActiveValue::set(false),
984 platform_windows: ActiveValue::set(false),
985 platform_unknown: ActiveValue::set(true),
986 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
987 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
988 ..Default::default()
989 })
990 .on_conflict(
991 OnConflict::column(signup::Column::EmailAddress)
992 .update_column(signup::Column::InvitingUserId)
993 .to_owned(),
994 )
995 .exec_with_returning(&*tx)
996 .await?;
997
998 Ok(Invite {
999 email_address: signup.email_address,
1000 email_confirmation_code: signup.email_confirmation_code,
1001 })
1002 })
1003 .await
1004 }
1005
1006 pub async fn create_user_from_invite(
1007 &self,
1008 invite: &Invite,
1009 user: NewUserParams,
1010 ) -> Result<Option<NewUserResult>> {
1011 self.transaction(|tx| async {
1012 let tx = tx;
1013 let signup = signup::Entity::find()
1014 .filter(
1015 signup::Column::EmailAddress
1016 .eq(invite.email_address.as_str())
1017 .and(
1018 signup::Column::EmailConfirmationCode
1019 .eq(invite.email_confirmation_code.as_str()),
1020 ),
1021 )
1022 .one(&*tx)
1023 .await?
1024 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1025
1026 if signup.user_id.is_some() {
1027 return Ok(None);
1028 }
1029
1030 let user = user::Entity::insert(user::ActiveModel {
1031 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1032 github_login: ActiveValue::set(user.github_login.clone()),
1033 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1034 admin: ActiveValue::set(false),
1035 invite_count: ActiveValue::set(user.invite_count),
1036 invite_code: ActiveValue::set(Some(random_invite_code())),
1037 metrics_id: ActiveValue::set(Uuid::new_v4()),
1038 ..Default::default()
1039 })
1040 .on_conflict(
1041 OnConflict::column(user::Column::GithubLogin)
1042 .update_columns([
1043 user::Column::EmailAddress,
1044 user::Column::GithubUserId,
1045 user::Column::Admin,
1046 ])
1047 .to_owned(),
1048 )
1049 .exec_with_returning(&*tx)
1050 .await?;
1051
1052 let mut signup = signup.into_active_model();
1053 signup.user_id = ActiveValue::set(Some(user.id));
1054 let signup = signup.update(&*tx).await?;
1055
1056 if let Some(inviting_user_id) = signup.inviting_user_id {
1057 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1058 (inviting_user_id, user.id, true)
1059 } else {
1060 (user.id, inviting_user_id, false)
1061 };
1062
1063 contact::Entity::insert(contact::ActiveModel {
1064 user_id_a: ActiveValue::set(user_id_a),
1065 user_id_b: ActiveValue::set(user_id_b),
1066 a_to_b: ActiveValue::set(a_to_b),
1067 should_notify: ActiveValue::set(true),
1068 accepted: ActiveValue::set(true),
1069 ..Default::default()
1070 })
1071 .on_conflict(OnConflict::new().do_nothing().to_owned())
1072 .exec_without_returning(&*tx)
1073 .await?;
1074 }
1075
1076 Ok(Some(NewUserResult {
1077 user_id: user.id,
1078 metrics_id: user.metrics_id.to_string(),
1079 inviting_user_id: signup.inviting_user_id,
1080 signup_device_id: signup.device_id,
1081 }))
1082 })
1083 .await
1084 }
1085
1086 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1087 self.transaction(|tx| async move {
1088 if count > 0 {
1089 user::Entity::update_many()
1090 .filter(
1091 user::Column::Id
1092 .eq(id)
1093 .and(user::Column::InviteCode.is_null()),
1094 )
1095 .set(user::ActiveModel {
1096 invite_code: ActiveValue::set(Some(random_invite_code())),
1097 ..Default::default()
1098 })
1099 .exec(&*tx)
1100 .await?;
1101 }
1102
1103 user::Entity::update_many()
1104 .filter(user::Column::Id.eq(id))
1105 .set(user::ActiveModel {
1106 invite_count: ActiveValue::set(count),
1107 ..Default::default()
1108 })
1109 .exec(&*tx)
1110 .await?;
1111 Ok(())
1112 })
1113 .await
1114 }
1115
1116 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1117 self.transaction(|tx| async move {
1118 match user::Entity::find_by_id(id).one(&*tx).await? {
1119 Some(user) if user.invite_code.is_some() => {
1120 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1121 }
1122 _ => Ok(None),
1123 }
1124 })
1125 .await
1126 }
1127
1128 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1129 self.transaction(|tx| async move {
1130 user::Entity::find()
1131 .filter(user::Column::InviteCode.eq(code))
1132 .one(&*tx)
1133 .await?
1134 .ok_or_else(|| {
1135 Error::Http(
1136 StatusCode::NOT_FOUND,
1137 "that invite code does not exist".to_string(),
1138 )
1139 })
1140 })
1141 .await
1142 }
1143
1144 // rooms
1145
1146 pub async fn incoming_call_for_user(
1147 &self,
1148 user_id: UserId,
1149 ) -> Result<Option<proto::IncomingCall>> {
1150 self.transaction(|tx| async move {
1151 let pending_participant = room_participant::Entity::find()
1152 .filter(
1153 room_participant::Column::UserId
1154 .eq(user_id)
1155 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1156 )
1157 .one(&*tx)
1158 .await?;
1159
1160 if let Some(pending_participant) = pending_participant {
1161 let room = self.get_room(pending_participant.room_id, &tx).await?;
1162 Ok(Self::build_incoming_call(&room, user_id))
1163 } else {
1164 Ok(None)
1165 }
1166 })
1167 .await
1168 }
1169
1170 pub async fn create_room(
1171 &self,
1172 user_id: UserId,
1173 connection: ConnectionId,
1174 live_kit_room: &str,
1175 ) -> Result<proto::Room> {
1176 self.transaction(|tx| async move {
1177 let room = room::ActiveModel {
1178 live_kit_room: ActiveValue::set(live_kit_room.into()),
1179 ..Default::default()
1180 }
1181 .insert(&*tx)
1182 .await?;
1183 room_participant::ActiveModel {
1184 room_id: ActiveValue::set(room.id),
1185 user_id: ActiveValue::set(user_id),
1186 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1187 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1188 connection.owner_id as i32,
1189 ))),
1190 answering_connection_lost: ActiveValue::set(false),
1191 calling_user_id: ActiveValue::set(user_id),
1192 calling_connection_id: ActiveValue::set(connection.id as i32),
1193 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1194 connection.owner_id as i32,
1195 ))),
1196 ..Default::default()
1197 }
1198 .insert(&*tx)
1199 .await?;
1200
1201 let room = self.get_room(room.id, &tx).await?;
1202 Ok(room)
1203 })
1204 .await
1205 }
1206
1207 pub async fn call(
1208 &self,
1209 room_id: RoomId,
1210 calling_user_id: UserId,
1211 calling_connection: ConnectionId,
1212 called_user_id: UserId,
1213 initial_project_id: Option<ProjectId>,
1214 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1215 self.room_transaction(room_id, |tx| async move {
1216 room_participant::ActiveModel {
1217 room_id: ActiveValue::set(room_id),
1218 user_id: ActiveValue::set(called_user_id),
1219 answering_connection_lost: ActiveValue::set(false),
1220 calling_user_id: ActiveValue::set(calling_user_id),
1221 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1222 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1223 calling_connection.owner_id as i32,
1224 ))),
1225 initial_project_id: ActiveValue::set(initial_project_id),
1226 ..Default::default()
1227 }
1228 .insert(&*tx)
1229 .await?;
1230
1231 let room = self.get_room(room_id, &tx).await?;
1232 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1233 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1234 Ok((room, incoming_call))
1235 })
1236 .await
1237 }
1238
1239 pub async fn call_failed(
1240 &self,
1241 room_id: RoomId,
1242 called_user_id: UserId,
1243 ) -> Result<RoomGuard<proto::Room>> {
1244 self.room_transaction(room_id, |tx| async move {
1245 room_participant::Entity::delete_many()
1246 .filter(
1247 room_participant::Column::RoomId
1248 .eq(room_id)
1249 .and(room_participant::Column::UserId.eq(called_user_id)),
1250 )
1251 .exec(&*tx)
1252 .await?;
1253 let room = self.get_room(room_id, &tx).await?;
1254 Ok(room)
1255 })
1256 .await
1257 }
1258
1259 pub async fn decline_call(
1260 &self,
1261 expected_room_id: Option<RoomId>,
1262 user_id: UserId,
1263 ) -> Result<Option<RoomGuard<proto::Room>>> {
1264 self.optional_room_transaction(|tx| async move {
1265 let mut filter = Condition::all()
1266 .add(room_participant::Column::UserId.eq(user_id))
1267 .add(room_participant::Column::AnsweringConnectionId.is_null());
1268 if let Some(room_id) = expected_room_id {
1269 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1270 }
1271 let participant = room_participant::Entity::find()
1272 .filter(filter)
1273 .one(&*tx)
1274 .await?;
1275
1276 let participant = if let Some(participant) = participant {
1277 participant
1278 } else if expected_room_id.is_some() {
1279 return Err(anyhow!("could not find call to decline"))?;
1280 } else {
1281 return Ok(None);
1282 };
1283
1284 let room_id = participant.room_id;
1285 room_participant::Entity::delete(participant.into_active_model())
1286 .exec(&*tx)
1287 .await?;
1288
1289 let room = self.get_room(room_id, &tx).await?;
1290 Ok(Some((room_id, room)))
1291 })
1292 .await
1293 }
1294
1295 pub async fn cancel_call(
1296 &self,
1297 room_id: RoomId,
1298 calling_connection: ConnectionId,
1299 called_user_id: UserId,
1300 ) -> Result<RoomGuard<proto::Room>> {
1301 self.room_transaction(room_id, |tx| async move {
1302 let participant = room_participant::Entity::find()
1303 .filter(
1304 Condition::all()
1305 .add(room_participant::Column::UserId.eq(called_user_id))
1306 .add(room_participant::Column::RoomId.eq(room_id))
1307 .add(
1308 room_participant::Column::CallingConnectionId
1309 .eq(calling_connection.id as i32),
1310 )
1311 .add(
1312 room_participant::Column::CallingConnectionServerId
1313 .eq(calling_connection.owner_id as i32),
1314 )
1315 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1316 )
1317 .one(&*tx)
1318 .await?
1319 .ok_or_else(|| anyhow!("no call to cancel"))?;
1320
1321 room_participant::Entity::delete(participant.into_active_model())
1322 .exec(&*tx)
1323 .await?;
1324
1325 let room = self.get_room(room_id, &tx).await?;
1326 Ok(room)
1327 })
1328 .await
1329 }
1330
1331 pub async fn join_room(
1332 &self,
1333 room_id: RoomId,
1334 user_id: UserId,
1335 connection: ConnectionId,
1336 ) -> Result<RoomGuard<proto::Room>> {
1337 self.room_transaction(room_id, |tx| async move {
1338 let result = room_participant::Entity::update_many()
1339 .filter(
1340 Condition::all()
1341 .add(room_participant::Column::RoomId.eq(room_id))
1342 .add(room_participant::Column::UserId.eq(user_id))
1343 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1344 )
1345 .set(room_participant::ActiveModel {
1346 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1347 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1348 connection.owner_id as i32,
1349 ))),
1350 answering_connection_lost: ActiveValue::set(false),
1351 ..Default::default()
1352 })
1353 .exec(&*tx)
1354 .await?;
1355 if result.rows_affected == 0 {
1356 Err(anyhow!("room does not exist or was already joined"))?
1357 } else {
1358 let room = self.get_room(room_id, &tx).await?;
1359 Ok(room)
1360 }
1361 })
1362 .await
1363 }
1364
1365 pub async fn rejoin_room(
1366 &self,
1367 rejoin_room: proto::RejoinRoom,
1368 user_id: UserId,
1369 connection: ConnectionId,
1370 ) -> Result<RoomGuard<RejoinedRoom>> {
1371 let room_id = RoomId::from_proto(rejoin_room.id);
1372 self.room_transaction(room_id, |tx| async {
1373 let tx = tx;
1374 let participant_update = room_participant::Entity::update_many()
1375 .filter(
1376 Condition::all()
1377 .add(room_participant::Column::RoomId.eq(room_id))
1378 .add(room_participant::Column::UserId.eq(user_id))
1379 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1380 .add(
1381 Condition::any()
1382 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1383 .add(
1384 room_participant::Column::AnsweringConnectionServerId
1385 .ne(connection.owner_id as i32),
1386 ),
1387 ),
1388 )
1389 .set(room_participant::ActiveModel {
1390 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1391 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1392 connection.owner_id as i32,
1393 ))),
1394 answering_connection_lost: ActiveValue::set(false),
1395 ..Default::default()
1396 })
1397 .exec(&*tx)
1398 .await?;
1399 if participant_update.rows_affected == 0 {
1400 return Err(anyhow!("room does not exist or was already joined"))?;
1401 }
1402
1403 let mut reshared_projects = Vec::new();
1404 for reshared_project in &rejoin_room.reshared_projects {
1405 let project_id = ProjectId::from_proto(reshared_project.project_id);
1406 let project = project::Entity::find_by_id(project_id)
1407 .one(&*tx)
1408 .await?
1409 .ok_or_else(|| anyhow!("project does not exist"))?;
1410 if project.host_user_id != user_id {
1411 return Err(anyhow!("no such project"))?;
1412 }
1413
1414 let mut collaborators = project
1415 .find_related(project_collaborator::Entity)
1416 .all(&*tx)
1417 .await?;
1418 let host_ix = collaborators
1419 .iter()
1420 .position(|collaborator| {
1421 collaborator.user_id == user_id && collaborator.is_host
1422 })
1423 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1424 let host = collaborators.swap_remove(host_ix);
1425 let old_connection_id = host.connection();
1426
1427 project::Entity::update(project::ActiveModel {
1428 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1429 host_connection_server_id: ActiveValue::set(Some(ServerId(
1430 connection.owner_id as i32,
1431 ))),
1432 ..project.into_active_model()
1433 })
1434 .exec(&*tx)
1435 .await?;
1436 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1437 connection_id: ActiveValue::set(connection.id as i32),
1438 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1439 ..host.into_active_model()
1440 })
1441 .exec(&*tx)
1442 .await?;
1443
1444 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1445 .await?;
1446
1447 reshared_projects.push(ResharedProject {
1448 id: project_id,
1449 old_connection_id,
1450 collaborators: collaborators
1451 .iter()
1452 .map(|collaborator| ProjectCollaborator {
1453 connection_id: collaborator.connection(),
1454 user_id: collaborator.user_id,
1455 replica_id: collaborator.replica_id,
1456 is_host: collaborator.is_host,
1457 })
1458 .collect(),
1459 worktrees: reshared_project.worktrees.clone(),
1460 });
1461 }
1462
1463 project::Entity::delete_many()
1464 .filter(
1465 Condition::all()
1466 .add(project::Column::RoomId.eq(room_id))
1467 .add(project::Column::HostUserId.eq(user_id))
1468 .add(
1469 project::Column::Id
1470 .is_not_in(reshared_projects.iter().map(|project| project.id)),
1471 ),
1472 )
1473 .exec(&*tx)
1474 .await?;
1475
1476 let mut rejoined_projects = Vec::new();
1477 for rejoined_project in &rejoin_room.rejoined_projects {
1478 let project_id = ProjectId::from_proto(rejoined_project.id);
1479 let Some(project) = project::Entity::find_by_id(project_id)
1480 .one(&*tx)
1481 .await? else { continue };
1482
1483 let mut worktrees = Vec::new();
1484 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1485 for db_worktree in db_worktrees {
1486 let mut worktree = RejoinedWorktree {
1487 id: db_worktree.id as u64,
1488 abs_path: db_worktree.abs_path,
1489 root_name: db_worktree.root_name,
1490 visible: db_worktree.visible,
1491 updated_entries: Default::default(),
1492 removed_entries: Default::default(),
1493 updated_repositories: Default::default(),
1494 removed_repositories: Default::default(),
1495 diagnostic_summaries: Default::default(),
1496 scan_id: db_worktree.scan_id as u64,
1497 completed_scan_id: db_worktree.completed_scan_id as u64,
1498 };
1499
1500 let rejoined_worktree = rejoined_project
1501 .worktrees
1502 .iter()
1503 .find(|worktree| worktree.id == db_worktree.id as u64);
1504
1505 // File entries
1506 {
1507 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1508 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1509 } else {
1510 worktree_entry::Column::IsDeleted.eq(false)
1511 };
1512
1513 let mut db_entries = worktree_entry::Entity::find()
1514 .filter(
1515 Condition::all()
1516 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1517 .add(entry_filter),
1518 )
1519 .stream(&*tx)
1520 .await?;
1521
1522 while let Some(db_entry) = db_entries.next().await {
1523 let db_entry = db_entry?;
1524 if db_entry.is_deleted {
1525 worktree.removed_entries.push(db_entry.id as u64);
1526 } else {
1527 worktree.updated_entries.push(proto::Entry {
1528 id: db_entry.id as u64,
1529 is_dir: db_entry.is_dir,
1530 path: db_entry.path,
1531 inode: db_entry.inode as u64,
1532 mtime: Some(proto::Timestamp {
1533 seconds: db_entry.mtime_seconds as u64,
1534 nanos: db_entry.mtime_nanos as u32,
1535 }),
1536 is_symlink: db_entry.is_symlink,
1537 is_ignored: db_entry.is_ignored,
1538 });
1539 }
1540 }
1541 }
1542
1543 // Repository Entries
1544 {
1545 let repository_entry_filter =
1546 if let Some(rejoined_worktree) = rejoined_worktree {
1547 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1548 } else {
1549 worktree_repository::Column::IsDeleted.eq(false)
1550 };
1551
1552 let mut db_repositories = worktree_repository::Entity::find()
1553 .filter(
1554 Condition::all()
1555 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1556 .add(repository_entry_filter),
1557 )
1558 .stream(&*tx)
1559 .await?;
1560
1561 while let Some(db_repository) = db_repositories.next().await {
1562 let db_repository = db_repository?;
1563 if db_repository.is_deleted {
1564 worktree
1565 .removed_repositories
1566 .push(db_repository.work_directory_id as u64);
1567 } else {
1568 worktree.updated_repositories.push(proto::RepositoryEntry {
1569 work_directory_id: db_repository.work_directory_id as u64,
1570 branch: db_repository.branch,
1571 removed_statuses: Default::default(),
1572 updated_statuses: Default::default(),
1573
1574 });
1575 }
1576 }
1577 }
1578
1579 worktrees.push(worktree);
1580 }
1581
1582 let language_servers = project
1583 .find_related(language_server::Entity)
1584 .all(&*tx)
1585 .await?
1586 .into_iter()
1587 .map(|language_server| proto::LanguageServer {
1588 id: language_server.id as u64,
1589 name: language_server.name,
1590 })
1591 .collect::<Vec<_>>();
1592
1593 let mut collaborators = project
1594 .find_related(project_collaborator::Entity)
1595 .all(&*tx)
1596 .await?;
1597 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1598 .iter()
1599 .position(|collaborator| collaborator.user_id == user_id)
1600 {
1601 collaborators.swap_remove(self_collaborator_ix)
1602 } else {
1603 continue;
1604 };
1605 let old_connection_id = self_collaborator.connection();
1606 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1607 connection_id: ActiveValue::set(connection.id as i32),
1608 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1609 ..self_collaborator.into_active_model()
1610 })
1611 .exec(&*tx)
1612 .await?;
1613
1614 let collaborators = collaborators
1615 .into_iter()
1616 .map(|collaborator| ProjectCollaborator {
1617 connection_id: collaborator.connection(),
1618 user_id: collaborator.user_id,
1619 replica_id: collaborator.replica_id,
1620 is_host: collaborator.is_host,
1621 })
1622 .collect::<Vec<_>>();
1623
1624 rejoined_projects.push(RejoinedProject {
1625 id: project_id,
1626 old_connection_id,
1627 collaborators,
1628 worktrees,
1629 language_servers,
1630 });
1631 }
1632
1633 let room = self.get_room(room_id, &tx).await?;
1634 Ok(RejoinedRoom {
1635 room,
1636 rejoined_projects,
1637 reshared_projects,
1638 })
1639 })
1640 .await
1641 }
1642
1643 pub async fn leave_room(
1644 &self,
1645 connection: ConnectionId,
1646 ) -> Result<Option<RoomGuard<LeftRoom>>> {
1647 self.optional_room_transaction(|tx| async move {
1648 let leaving_participant = room_participant::Entity::find()
1649 .filter(
1650 Condition::all()
1651 .add(
1652 room_participant::Column::AnsweringConnectionId
1653 .eq(connection.id as i32),
1654 )
1655 .add(
1656 room_participant::Column::AnsweringConnectionServerId
1657 .eq(connection.owner_id as i32),
1658 ),
1659 )
1660 .one(&*tx)
1661 .await?;
1662
1663 if let Some(leaving_participant) = leaving_participant {
1664 // Leave room.
1665 let room_id = leaving_participant.room_id;
1666 room_participant::Entity::delete_by_id(leaving_participant.id)
1667 .exec(&*tx)
1668 .await?;
1669
1670 // Cancel pending calls initiated by the leaving user.
1671 let called_participants = room_participant::Entity::find()
1672 .filter(
1673 Condition::all()
1674 .add(
1675 room_participant::Column::CallingUserId
1676 .eq(leaving_participant.user_id),
1677 )
1678 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1679 )
1680 .all(&*tx)
1681 .await?;
1682 room_participant::Entity::delete_many()
1683 .filter(
1684 room_participant::Column::Id
1685 .is_in(called_participants.iter().map(|participant| participant.id)),
1686 )
1687 .exec(&*tx)
1688 .await?;
1689 let canceled_calls_to_user_ids = called_participants
1690 .into_iter()
1691 .map(|participant| participant.user_id)
1692 .collect();
1693
1694 // Detect left projects.
1695 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1696 enum QueryProjectIds {
1697 ProjectId,
1698 }
1699 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1700 .select_only()
1701 .column_as(
1702 project_collaborator::Column::ProjectId,
1703 QueryProjectIds::ProjectId,
1704 )
1705 .filter(
1706 Condition::all()
1707 .add(
1708 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1709 )
1710 .add(
1711 project_collaborator::Column::ConnectionServerId
1712 .eq(connection.owner_id as i32),
1713 ),
1714 )
1715 .into_values::<_, QueryProjectIds>()
1716 .all(&*tx)
1717 .await?;
1718 let mut left_projects = HashMap::default();
1719 let mut collaborators = project_collaborator::Entity::find()
1720 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1721 .stream(&*tx)
1722 .await?;
1723 while let Some(collaborator) = collaborators.next().await {
1724 let collaborator = collaborator?;
1725 let left_project =
1726 left_projects
1727 .entry(collaborator.project_id)
1728 .or_insert(LeftProject {
1729 id: collaborator.project_id,
1730 host_user_id: Default::default(),
1731 connection_ids: Default::default(),
1732 host_connection_id: Default::default(),
1733 });
1734
1735 let collaborator_connection_id = collaborator.connection();
1736 if collaborator_connection_id != connection {
1737 left_project.connection_ids.push(collaborator_connection_id);
1738 }
1739
1740 if collaborator.is_host {
1741 left_project.host_user_id = collaborator.user_id;
1742 left_project.host_connection_id = collaborator_connection_id;
1743 }
1744 }
1745 drop(collaborators);
1746
1747 // Leave projects.
1748 project_collaborator::Entity::delete_many()
1749 .filter(
1750 Condition::all()
1751 .add(
1752 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1753 )
1754 .add(
1755 project_collaborator::Column::ConnectionServerId
1756 .eq(connection.owner_id as i32),
1757 ),
1758 )
1759 .exec(&*tx)
1760 .await?;
1761
1762 // Unshare projects.
1763 project::Entity::delete_many()
1764 .filter(
1765 Condition::all()
1766 .add(project::Column::RoomId.eq(room_id))
1767 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1768 .add(
1769 project::Column::HostConnectionServerId
1770 .eq(connection.owner_id as i32),
1771 ),
1772 )
1773 .exec(&*tx)
1774 .await?;
1775
1776 let room = self.get_room(room_id, &tx).await?;
1777 if room.participants.is_empty() {
1778 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1779 }
1780
1781 let left_room = LeftRoom {
1782 room,
1783 left_projects,
1784 canceled_calls_to_user_ids,
1785 };
1786
1787 if left_room.room.participants.is_empty() {
1788 self.rooms.remove(&room_id);
1789 }
1790
1791 Ok(Some((room_id, left_room)))
1792 } else {
1793 Ok(None)
1794 }
1795 })
1796 .await
1797 }
1798
1799 pub async fn follow(
1800 &self,
1801 project_id: ProjectId,
1802 leader_connection: ConnectionId,
1803 follower_connection: ConnectionId,
1804 ) -> Result<RoomGuard<proto::Room>> {
1805 let room_id = self.room_id_for_project(project_id).await?;
1806 self.room_transaction(room_id, |tx| async move {
1807 follower::ActiveModel {
1808 room_id: ActiveValue::set(room_id),
1809 project_id: ActiveValue::set(project_id),
1810 leader_connection_server_id: ActiveValue::set(ServerId(
1811 leader_connection.owner_id as i32,
1812 )),
1813 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1814 follower_connection_server_id: ActiveValue::set(ServerId(
1815 follower_connection.owner_id as i32,
1816 )),
1817 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1818 ..Default::default()
1819 }
1820 .insert(&*tx)
1821 .await?;
1822
1823 let room = self.get_room(room_id, &*tx).await?;
1824 Ok(room)
1825 })
1826 .await
1827 }
1828
1829 pub async fn unfollow(
1830 &self,
1831 project_id: ProjectId,
1832 leader_connection: ConnectionId,
1833 follower_connection: ConnectionId,
1834 ) -> Result<RoomGuard<proto::Room>> {
1835 let room_id = self.room_id_for_project(project_id).await?;
1836 self.room_transaction(room_id, |tx| async move {
1837 follower::Entity::delete_many()
1838 .filter(
1839 Condition::all()
1840 .add(follower::Column::ProjectId.eq(project_id))
1841 .add(
1842 follower::Column::LeaderConnectionServerId
1843 .eq(leader_connection.owner_id),
1844 )
1845 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1846 .add(
1847 follower::Column::FollowerConnectionServerId
1848 .eq(follower_connection.owner_id),
1849 )
1850 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1851 )
1852 .exec(&*tx)
1853 .await?;
1854
1855 let room = self.get_room(room_id, &*tx).await?;
1856 Ok(room)
1857 })
1858 .await
1859 }
1860
1861 pub async fn update_room_participant_location(
1862 &self,
1863 room_id: RoomId,
1864 connection: ConnectionId,
1865 location: proto::ParticipantLocation,
1866 ) -> Result<RoomGuard<proto::Room>> {
1867 self.room_transaction(room_id, |tx| async {
1868 let tx = tx;
1869 let location_kind;
1870 let location_project_id;
1871 match location
1872 .variant
1873 .as_ref()
1874 .ok_or_else(|| anyhow!("invalid location"))?
1875 {
1876 proto::participant_location::Variant::SharedProject(project) => {
1877 location_kind = 0;
1878 location_project_id = Some(ProjectId::from_proto(project.id));
1879 }
1880 proto::participant_location::Variant::UnsharedProject(_) => {
1881 location_kind = 1;
1882 location_project_id = None;
1883 }
1884 proto::participant_location::Variant::External(_) => {
1885 location_kind = 2;
1886 location_project_id = None;
1887 }
1888 }
1889
1890 let result = room_participant::Entity::update_many()
1891 .filter(
1892 Condition::all()
1893 .add(room_participant::Column::RoomId.eq(room_id))
1894 .add(
1895 room_participant::Column::AnsweringConnectionId
1896 .eq(connection.id as i32),
1897 )
1898 .add(
1899 room_participant::Column::AnsweringConnectionServerId
1900 .eq(connection.owner_id as i32),
1901 ),
1902 )
1903 .set(room_participant::ActiveModel {
1904 location_kind: ActiveValue::set(Some(location_kind)),
1905 location_project_id: ActiveValue::set(location_project_id),
1906 ..Default::default()
1907 })
1908 .exec(&*tx)
1909 .await?;
1910
1911 if result.rows_affected == 1 {
1912 let room = self.get_room(room_id, &tx).await?;
1913 Ok(room)
1914 } else {
1915 Err(anyhow!("could not update room participant location"))?
1916 }
1917 })
1918 .await
1919 }
1920
1921 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1922 self.transaction(|tx| async move {
1923 let participant = room_participant::Entity::find()
1924 .filter(
1925 Condition::all()
1926 .add(
1927 room_participant::Column::AnsweringConnectionId
1928 .eq(connection.id as i32),
1929 )
1930 .add(
1931 room_participant::Column::AnsweringConnectionServerId
1932 .eq(connection.owner_id as i32),
1933 ),
1934 )
1935 .one(&*tx)
1936 .await?
1937 .ok_or_else(|| anyhow!("not a participant in any room"))?;
1938
1939 room_participant::Entity::update(room_participant::ActiveModel {
1940 answering_connection_lost: ActiveValue::set(true),
1941 ..participant.into_active_model()
1942 })
1943 .exec(&*tx)
1944 .await?;
1945
1946 Ok(())
1947 })
1948 .await
1949 }
1950
1951 fn build_incoming_call(
1952 room: &proto::Room,
1953 called_user_id: UserId,
1954 ) -> Option<proto::IncomingCall> {
1955 let pending_participant = room
1956 .pending_participants
1957 .iter()
1958 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1959
1960 Some(proto::IncomingCall {
1961 room_id: room.id,
1962 calling_user_id: pending_participant.calling_user_id,
1963 participant_user_ids: room
1964 .participants
1965 .iter()
1966 .map(|participant| participant.user_id)
1967 .collect(),
1968 initial_project: room.participants.iter().find_map(|participant| {
1969 let initial_project_id = pending_participant.initial_project_id?;
1970 participant
1971 .projects
1972 .iter()
1973 .find(|project| project.id == initial_project_id)
1974 .cloned()
1975 }),
1976 })
1977 }
1978
1979 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1980 let db_room = room::Entity::find_by_id(room_id)
1981 .one(tx)
1982 .await?
1983 .ok_or_else(|| anyhow!("could not find room"))?;
1984
1985 let mut db_participants = db_room
1986 .find_related(room_participant::Entity)
1987 .stream(tx)
1988 .await?;
1989 let mut participants = HashMap::default();
1990 let mut pending_participants = Vec::new();
1991 while let Some(db_participant) = db_participants.next().await {
1992 let db_participant = db_participant?;
1993 if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1994 .answering_connection_id
1995 .zip(db_participant.answering_connection_server_id)
1996 {
1997 let location = match (
1998 db_participant.location_kind,
1999 db_participant.location_project_id,
2000 ) {
2001 (Some(0), Some(project_id)) => {
2002 Some(proto::participant_location::Variant::SharedProject(
2003 proto::participant_location::SharedProject {
2004 id: project_id.to_proto(),
2005 },
2006 ))
2007 }
2008 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2009 Default::default(),
2010 )),
2011 _ => Some(proto::participant_location::Variant::External(
2012 Default::default(),
2013 )),
2014 };
2015
2016 let answering_connection = ConnectionId {
2017 owner_id: answering_connection_server_id.0 as u32,
2018 id: answering_connection_id as u32,
2019 };
2020 participants.insert(
2021 answering_connection,
2022 proto::Participant {
2023 user_id: db_participant.user_id.to_proto(),
2024 peer_id: Some(answering_connection.into()),
2025 projects: Default::default(),
2026 location: Some(proto::ParticipantLocation { variant: location }),
2027 },
2028 );
2029 } else {
2030 pending_participants.push(proto::PendingParticipant {
2031 user_id: db_participant.user_id.to_proto(),
2032 calling_user_id: db_participant.calling_user_id.to_proto(),
2033 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2034 });
2035 }
2036 }
2037 drop(db_participants);
2038
2039 let mut db_projects = db_room
2040 .find_related(project::Entity)
2041 .find_with_related(worktree::Entity)
2042 .stream(tx)
2043 .await?;
2044
2045 while let Some(row) = db_projects.next().await {
2046 let (db_project, db_worktree) = row?;
2047 let host_connection = db_project.host_connection()?;
2048 if let Some(participant) = participants.get_mut(&host_connection) {
2049 let project = if let Some(project) = participant
2050 .projects
2051 .iter_mut()
2052 .find(|project| project.id == db_project.id.to_proto())
2053 {
2054 project
2055 } else {
2056 participant.projects.push(proto::ParticipantProject {
2057 id: db_project.id.to_proto(),
2058 worktree_root_names: Default::default(),
2059 });
2060 participant.projects.last_mut().unwrap()
2061 };
2062
2063 if let Some(db_worktree) = db_worktree {
2064 if db_worktree.visible {
2065 project.worktree_root_names.push(db_worktree.root_name);
2066 }
2067 }
2068 }
2069 }
2070 drop(db_projects);
2071
2072 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2073 let mut followers = Vec::new();
2074 while let Some(db_follower) = db_followers.next().await {
2075 let db_follower = db_follower?;
2076 followers.push(proto::Follower {
2077 leader_id: Some(db_follower.leader_connection().into()),
2078 follower_id: Some(db_follower.follower_connection().into()),
2079 project_id: db_follower.project_id.to_proto(),
2080 });
2081 }
2082
2083 Ok(proto::Room {
2084 id: db_room.id.to_proto(),
2085 live_kit_room: db_room.live_kit_room,
2086 participants: participants.into_values().collect(),
2087 pending_participants,
2088 followers,
2089 })
2090 }
2091
2092 // projects
2093
2094 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2095 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2096 enum QueryAs {
2097 Count,
2098 }
2099
2100 self.transaction(|tx| async move {
2101 Ok(project::Entity::find()
2102 .select_only()
2103 .column_as(project::Column::Id.count(), QueryAs::Count)
2104 .inner_join(user::Entity)
2105 .filter(user::Column::Admin.eq(false))
2106 .into_values::<_, QueryAs>()
2107 .one(&*tx)
2108 .await?
2109 .unwrap_or(0i64) as usize)
2110 })
2111 .await
2112 }
2113
2114 pub async fn share_project(
2115 &self,
2116 room_id: RoomId,
2117 connection: ConnectionId,
2118 worktrees: &[proto::WorktreeMetadata],
2119 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2120 self.room_transaction(room_id, |tx| async move {
2121 let participant = room_participant::Entity::find()
2122 .filter(
2123 Condition::all()
2124 .add(
2125 room_participant::Column::AnsweringConnectionId
2126 .eq(connection.id as i32),
2127 )
2128 .add(
2129 room_participant::Column::AnsweringConnectionServerId
2130 .eq(connection.owner_id as i32),
2131 ),
2132 )
2133 .one(&*tx)
2134 .await?
2135 .ok_or_else(|| anyhow!("could not find participant"))?;
2136 if participant.room_id != room_id {
2137 return Err(anyhow!("shared project on unexpected room"))?;
2138 }
2139
2140 let project = project::ActiveModel {
2141 room_id: ActiveValue::set(participant.room_id),
2142 host_user_id: ActiveValue::set(participant.user_id),
2143 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2144 host_connection_server_id: ActiveValue::set(Some(ServerId(
2145 connection.owner_id as i32,
2146 ))),
2147 ..Default::default()
2148 }
2149 .insert(&*tx)
2150 .await?;
2151
2152 if !worktrees.is_empty() {
2153 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2154 worktree::ActiveModel {
2155 id: ActiveValue::set(worktree.id as i64),
2156 project_id: ActiveValue::set(project.id),
2157 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2158 root_name: ActiveValue::set(worktree.root_name.clone()),
2159 visible: ActiveValue::set(worktree.visible),
2160 scan_id: ActiveValue::set(0),
2161 completed_scan_id: ActiveValue::set(0),
2162 }
2163 }))
2164 .exec(&*tx)
2165 .await?;
2166 }
2167
2168 project_collaborator::ActiveModel {
2169 project_id: ActiveValue::set(project.id),
2170 connection_id: ActiveValue::set(connection.id as i32),
2171 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2172 user_id: ActiveValue::set(participant.user_id),
2173 replica_id: ActiveValue::set(ReplicaId(0)),
2174 is_host: ActiveValue::set(true),
2175 ..Default::default()
2176 }
2177 .insert(&*tx)
2178 .await?;
2179
2180 let room = self.get_room(room_id, &tx).await?;
2181 Ok((project.id, room))
2182 })
2183 .await
2184 }
2185
2186 pub async fn unshare_project(
2187 &self,
2188 project_id: ProjectId,
2189 connection: ConnectionId,
2190 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2191 let room_id = self.room_id_for_project(project_id).await?;
2192 self.room_transaction(room_id, |tx| async move {
2193 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2194
2195 let project = project::Entity::find_by_id(project_id)
2196 .one(&*tx)
2197 .await?
2198 .ok_or_else(|| anyhow!("project not found"))?;
2199 if project.host_connection()? == connection {
2200 project::Entity::delete(project.into_active_model())
2201 .exec(&*tx)
2202 .await?;
2203 let room = self.get_room(room_id, &tx).await?;
2204 Ok((room, guest_connection_ids))
2205 } else {
2206 Err(anyhow!("cannot unshare a project hosted by another user"))?
2207 }
2208 })
2209 .await
2210 }
2211
2212 pub async fn update_project(
2213 &self,
2214 project_id: ProjectId,
2215 connection: ConnectionId,
2216 worktrees: &[proto::WorktreeMetadata],
2217 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2218 let room_id = self.room_id_for_project(project_id).await?;
2219 self.room_transaction(room_id, |tx| async move {
2220 let project = project::Entity::find_by_id(project_id)
2221 .filter(
2222 Condition::all()
2223 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2224 .add(
2225 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2226 ),
2227 )
2228 .one(&*tx)
2229 .await?
2230 .ok_or_else(|| anyhow!("no such project"))?;
2231
2232 self.update_project_worktrees(project.id, worktrees, &tx)
2233 .await?;
2234
2235 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2236 let room = self.get_room(project.room_id, &tx).await?;
2237 Ok((room, guest_connection_ids))
2238 })
2239 .await
2240 }
2241
2242 async fn update_project_worktrees(
2243 &self,
2244 project_id: ProjectId,
2245 worktrees: &[proto::WorktreeMetadata],
2246 tx: &DatabaseTransaction,
2247 ) -> Result<()> {
2248 if !worktrees.is_empty() {
2249 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2250 id: ActiveValue::set(worktree.id as i64),
2251 project_id: ActiveValue::set(project_id),
2252 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2253 root_name: ActiveValue::set(worktree.root_name.clone()),
2254 visible: ActiveValue::set(worktree.visible),
2255 scan_id: ActiveValue::set(0),
2256 completed_scan_id: ActiveValue::set(0),
2257 }))
2258 .on_conflict(
2259 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2260 .update_column(worktree::Column::RootName)
2261 .to_owned(),
2262 )
2263 .exec(&*tx)
2264 .await?;
2265 }
2266
2267 worktree::Entity::delete_many()
2268 .filter(worktree::Column::ProjectId.eq(project_id).and(
2269 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2270 ))
2271 .exec(&*tx)
2272 .await?;
2273
2274 Ok(())
2275 }
2276
2277 pub async fn update_worktree(
2278 &self,
2279 update: &proto::UpdateWorktree,
2280 connection: ConnectionId,
2281 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2282 let project_id = ProjectId::from_proto(update.project_id);
2283 let worktree_id = update.worktree_id as i64;
2284 let room_id = self.room_id_for_project(project_id).await?;
2285 self.room_transaction(room_id, |tx| async move {
2286 // Ensure the update comes from the host.
2287 let _project = project::Entity::find_by_id(project_id)
2288 .filter(
2289 Condition::all()
2290 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2291 .add(
2292 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2293 ),
2294 )
2295 .one(&*tx)
2296 .await?
2297 .ok_or_else(|| anyhow!("no such project"))?;
2298
2299 // Update metadata.
2300 worktree::Entity::update(worktree::ActiveModel {
2301 id: ActiveValue::set(worktree_id),
2302 project_id: ActiveValue::set(project_id),
2303 root_name: ActiveValue::set(update.root_name.clone()),
2304 scan_id: ActiveValue::set(update.scan_id as i64),
2305 completed_scan_id: if update.is_last_update {
2306 ActiveValue::set(update.scan_id as i64)
2307 } else {
2308 ActiveValue::default()
2309 },
2310 abs_path: ActiveValue::set(update.abs_path.clone()),
2311 ..Default::default()
2312 })
2313 .exec(&*tx)
2314 .await?;
2315
2316 if !update.updated_entries.is_empty() {
2317 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2318 let mtime = entry.mtime.clone().unwrap_or_default();
2319 worktree_entry::ActiveModel {
2320 project_id: ActiveValue::set(project_id),
2321 worktree_id: ActiveValue::set(worktree_id),
2322 id: ActiveValue::set(entry.id as i64),
2323 is_dir: ActiveValue::set(entry.is_dir),
2324 path: ActiveValue::set(entry.path.clone()),
2325 inode: ActiveValue::set(entry.inode as i64),
2326 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2327 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2328 is_symlink: ActiveValue::set(entry.is_symlink),
2329 is_ignored: ActiveValue::set(entry.is_ignored),
2330 is_deleted: ActiveValue::set(false),
2331 scan_id: ActiveValue::set(update.scan_id as i64),
2332 }
2333 }))
2334 .on_conflict(
2335 OnConflict::columns([
2336 worktree_entry::Column::ProjectId,
2337 worktree_entry::Column::WorktreeId,
2338 worktree_entry::Column::Id,
2339 ])
2340 .update_columns([
2341 worktree_entry::Column::IsDir,
2342 worktree_entry::Column::Path,
2343 worktree_entry::Column::Inode,
2344 worktree_entry::Column::MtimeSeconds,
2345 worktree_entry::Column::MtimeNanos,
2346 worktree_entry::Column::IsSymlink,
2347 worktree_entry::Column::IsIgnored,
2348 worktree_entry::Column::ScanId,
2349 ])
2350 .to_owned(),
2351 )
2352 .exec(&*tx)
2353 .await?;
2354 }
2355
2356 if !update.removed_entries.is_empty() {
2357 worktree_entry::Entity::update_many()
2358 .filter(
2359 worktree_entry::Column::ProjectId
2360 .eq(project_id)
2361 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2362 .and(
2363 worktree_entry::Column::Id
2364 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2365 ),
2366 )
2367 .set(worktree_entry::ActiveModel {
2368 is_deleted: ActiveValue::Set(true),
2369 scan_id: ActiveValue::Set(update.scan_id as i64),
2370 ..Default::default()
2371 })
2372 .exec(&*tx)
2373 .await?;
2374 }
2375
2376 if !update.updated_repositories.is_empty() {
2377 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2378 |repository| worktree_repository::ActiveModel {
2379 project_id: ActiveValue::set(project_id),
2380 worktree_id: ActiveValue::set(worktree_id),
2381 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2382 scan_id: ActiveValue::set(update.scan_id as i64),
2383 branch: ActiveValue::set(repository.branch.clone()),
2384 is_deleted: ActiveValue::set(false),
2385 },
2386 ))
2387 .on_conflict(
2388 OnConflict::columns([
2389 worktree_repository::Column::ProjectId,
2390 worktree_repository::Column::WorktreeId,
2391 worktree_repository::Column::WorkDirectoryId,
2392 ])
2393 .update_columns([
2394 worktree_repository::Column::ScanId,
2395 worktree_repository::Column::Branch,
2396 ])
2397 .to_owned(),
2398 )
2399 .exec(&*tx)
2400 .await?;
2401 }
2402
2403 if !update.removed_repositories.is_empty() {
2404 worktree_repository::Entity::update_many()
2405 .filter(
2406 worktree_repository::Column::ProjectId
2407 .eq(project_id)
2408 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2409 .and(
2410 worktree_repository::Column::WorkDirectoryId
2411 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2412 ),
2413 )
2414 .set(worktree_repository::ActiveModel {
2415 is_deleted: ActiveValue::Set(true),
2416 scan_id: ActiveValue::Set(update.scan_id as i64),
2417 ..Default::default()
2418 })
2419 .exec(&*tx)
2420 .await?;
2421 }
2422
2423 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2424 Ok(connection_ids)
2425 })
2426 .await
2427 }
2428
2429 pub async fn update_diagnostic_summary(
2430 &self,
2431 update: &proto::UpdateDiagnosticSummary,
2432 connection: ConnectionId,
2433 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2434 let project_id = ProjectId::from_proto(update.project_id);
2435 let worktree_id = update.worktree_id as i64;
2436 let room_id = self.room_id_for_project(project_id).await?;
2437 self.room_transaction(room_id, |tx| async move {
2438 let summary = update
2439 .summary
2440 .as_ref()
2441 .ok_or_else(|| anyhow!("invalid summary"))?;
2442
2443 // Ensure the update comes from the host.
2444 let project = project::Entity::find_by_id(project_id)
2445 .one(&*tx)
2446 .await?
2447 .ok_or_else(|| anyhow!("no such project"))?;
2448 if project.host_connection()? != connection {
2449 return Err(anyhow!("can't update a project hosted by someone else"))?;
2450 }
2451
2452 // Update summary.
2453 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2454 project_id: ActiveValue::set(project_id),
2455 worktree_id: ActiveValue::set(worktree_id),
2456 path: ActiveValue::set(summary.path.clone()),
2457 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2458 error_count: ActiveValue::set(summary.error_count as i32),
2459 warning_count: ActiveValue::set(summary.warning_count as i32),
2460 ..Default::default()
2461 })
2462 .on_conflict(
2463 OnConflict::columns([
2464 worktree_diagnostic_summary::Column::ProjectId,
2465 worktree_diagnostic_summary::Column::WorktreeId,
2466 worktree_diagnostic_summary::Column::Path,
2467 ])
2468 .update_columns([
2469 worktree_diagnostic_summary::Column::LanguageServerId,
2470 worktree_diagnostic_summary::Column::ErrorCount,
2471 worktree_diagnostic_summary::Column::WarningCount,
2472 ])
2473 .to_owned(),
2474 )
2475 .exec(&*tx)
2476 .await?;
2477
2478 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2479 Ok(connection_ids)
2480 })
2481 .await
2482 }
2483
2484 pub async fn start_language_server(
2485 &self,
2486 update: &proto::StartLanguageServer,
2487 connection: ConnectionId,
2488 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2489 let project_id = ProjectId::from_proto(update.project_id);
2490 let room_id = self.room_id_for_project(project_id).await?;
2491 self.room_transaction(room_id, |tx| async move {
2492 let server = update
2493 .server
2494 .as_ref()
2495 .ok_or_else(|| anyhow!("invalid language server"))?;
2496
2497 // Ensure the update comes from the host.
2498 let project = project::Entity::find_by_id(project_id)
2499 .one(&*tx)
2500 .await?
2501 .ok_or_else(|| anyhow!("no such project"))?;
2502 if project.host_connection()? != connection {
2503 return Err(anyhow!("can't update a project hosted by someone else"))?;
2504 }
2505
2506 // Add the newly-started language server.
2507 language_server::Entity::insert(language_server::ActiveModel {
2508 project_id: ActiveValue::set(project_id),
2509 id: ActiveValue::set(server.id as i64),
2510 name: ActiveValue::set(server.name.clone()),
2511 ..Default::default()
2512 })
2513 .on_conflict(
2514 OnConflict::columns([
2515 language_server::Column::ProjectId,
2516 language_server::Column::Id,
2517 ])
2518 .update_column(language_server::Column::Name)
2519 .to_owned(),
2520 )
2521 .exec(&*tx)
2522 .await?;
2523
2524 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2525 Ok(connection_ids)
2526 })
2527 .await
2528 }
2529
    /// Adds `connection` to `project_id` as a guest collaborator and returns a
    /// snapshot of the project.
    ///
    /// The caller must already be a room participant (matched by its answering
    /// connection), and the project must belong to that participant's room;
    /// otherwise an error is returned. The new collaborator receives the lowest
    /// replica id, starting from 1, that no existing collaborator uses.
    ///
    /// The returned [`Project`] contains all collaborators (including the new
    /// one), every worktree with its non-deleted entries, non-deleted
    /// repositories and diagnostic summaries, and the project's language
    /// servers. The second tuple element is the replica id assigned to the
    /// caller.
    pub async fn join_project(
        &self,
        project_id: ProjectId,
        connection: ConnectionId,
    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
        let room_id = self.room_id_for_project(project_id).await?;
        self.room_transaction(room_id, |tx| async move {
            // Find the room participant that answered the call on this connection.
            let participant = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::AnsweringConnectionId
                                .eq(connection.id as i32),
                        )
                        .add(
                            room_participant::Column::AnsweringConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("must join a room first"))?;

            // A project in a different room is reported exactly like a missing one.
            let project = project::Entity::find_by_id(project_id)
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("no such project"))?;
            if project.room_id != participant.room_id {
                return Err(anyhow!("no such project"))?;
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            // Pick the smallest replica id >= 1 that is not taken yet.
            let replica_ids = collaborators
                .iter()
                .map(|c| c.replica_id)
                .collect::<HashSet<_>>();
            let mut replica_id = ReplicaId(1);
            while replica_ids.contains(&replica_id) {
                replica_id.0 += 1;
            }
            let new_collaborator = project_collaborator::ActiveModel {
                project_id: ActiveValue::set(project_id),
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                user_id: ActiveValue::set(participant.user_id),
                replica_id: ActiveValue::set(replica_id),
                is_host: ActiveValue::set(false),
                ..Default::default()
            }
            .insert(&*tx)
            .await?;
            collaborators.push(new_collaborator);

            // Index the worktrees by id so the streamed rows below can be
            // attached to the worktree they belong to.
            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
            let mut worktrees = db_worktrees
                .into_iter()
                .map(|db_worktree| {
                    (
                        db_worktree.id as u64,
                        Worktree {
                            id: db_worktree.id as u64,
                            abs_path: db_worktree.abs_path,
                            root_name: db_worktree.root_name,
                            visible: db_worktree.visible,
                            entries: Default::default(),
                            repository_entries: Default::default(),
                            diagnostic_summaries: Default::default(),
                            scan_id: db_worktree.scan_id as u64,
                            completed_scan_id: db_worktree.completed_scan_id as u64,
                        },
                    )
                })
                .collect::<BTreeMap<_, _>>();

            // Populate worktree entries.
            // Each stream lives in its own block, so it is dropped before the
            // next query is issued on the transaction.
            {
                let mut db_entries = worktree_entry::Entity::find()
                    .filter(
                        Condition::all()
                            .add(worktree_entry::Column::ProjectId.eq(project_id))
                            .add(worktree_entry::Column::IsDeleted.eq(false)),
                    )
                    .stream(&*tx)
                    .await?;
                while let Some(db_entry) = db_entries.next().await {
                    let db_entry = db_entry?;
                    // Rows whose worktree is unknown are skipped.
                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                        worktree.entries.push(proto::Entry {
                            id: db_entry.id as u64,
                            is_dir: db_entry.is_dir,
                            path: db_entry.path,
                            inode: db_entry.inode as u64,
                            mtime: Some(proto::Timestamp {
                                seconds: db_entry.mtime_seconds as u64,
                                nanos: db_entry.mtime_nanos as u32,
                            }),
                            is_symlink: db_entry.is_symlink,
                            is_ignored: db_entry.is_ignored,
                        });
                    }
                }
            }

            // Populate repository entries.
            {
                let mut db_repository_entries = worktree_repository::Entity::find()
                    .filter(
                        Condition::all()
                            .add(worktree_repository::Column::ProjectId.eq(project_id))
                            .add(worktree_repository::Column::IsDeleted.eq(false)),
                    )
                    .stream(&*tx)
                    .await?;
                while let Some(db_repository_entry) = db_repository_entries.next().await {
                    let db_repository_entry = db_repository_entry?;
                    if let Some(worktree) =
                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
                    {
                        // Status lists are left at their defaults here.
                        worktree.repository_entries.push(proto::RepositoryEntry {
                            work_directory_id: db_repository_entry.work_directory_id as u64,
                            branch: db_repository_entry.branch,
                            removed_statuses: Default::default(),
                            updated_statuses: Default::default(),

                        });
                    }
                }
            }

            // Populate worktree diagnostic summaries.
            {
                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
                    .stream(&*tx)
                    .await?;
                while let Some(db_summary) = db_summaries.next().await {
                    let db_summary = db_summary?;
                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                        worktree
                            .diagnostic_summaries
                            .push(proto::DiagnosticSummary {
                                path: db_summary.path,
                                language_server_id: db_summary.language_server_id as u64,
                                error_count: db_summary.error_count as u32,
                                warning_count: db_summary.warning_count as u32,
                            });
                    }
                }
            }

            // Populate language servers.
            let language_servers = project
                .find_related(language_server::Entity)
                .all(&*tx)
                .await?;

            // Assemble the snapshot handed back to the joining guest.
            let project = Project {
                collaborators: collaborators
                    .into_iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect(),
                worktrees,
                language_servers: language_servers
                    .into_iter()
                    .map(|language_server| proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                    })
                    .collect(),
            };
            Ok((project, replica_id as ReplicaId))
        })
        .await
    }
2712
2713 pub async fn leave_project(
2714 &self,
2715 project_id: ProjectId,
2716 connection: ConnectionId,
2717 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2718 let room_id = self.room_id_for_project(project_id).await?;
2719 self.room_transaction(room_id, |tx| async move {
2720 let result = project_collaborator::Entity::delete_many()
2721 .filter(
2722 Condition::all()
2723 .add(project_collaborator::Column::ProjectId.eq(project_id))
2724 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2725 .add(
2726 project_collaborator::Column::ConnectionServerId
2727 .eq(connection.owner_id as i32),
2728 ),
2729 )
2730 .exec(&*tx)
2731 .await?;
2732 if result.rows_affected == 0 {
2733 Err(anyhow!("not a collaborator on this project"))?;
2734 }
2735
2736 let project = project::Entity::find_by_id(project_id)
2737 .one(&*tx)
2738 .await?
2739 .ok_or_else(|| anyhow!("no such project"))?;
2740 let collaborators = project
2741 .find_related(project_collaborator::Entity)
2742 .all(&*tx)
2743 .await?;
2744 let connection_ids = collaborators
2745 .into_iter()
2746 .map(|collaborator| collaborator.connection())
2747 .collect();
2748
2749 follower::Entity::delete_many()
2750 .filter(
2751 Condition::any()
2752 .add(
2753 Condition::all()
2754 .add(follower::Column::ProjectId.eq(project_id))
2755 .add(
2756 follower::Column::LeaderConnectionServerId
2757 .eq(connection.owner_id),
2758 )
2759 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2760 )
2761 .add(
2762 Condition::all()
2763 .add(follower::Column::ProjectId.eq(project_id))
2764 .add(
2765 follower::Column::FollowerConnectionServerId
2766 .eq(connection.owner_id),
2767 )
2768 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2769 ),
2770 )
2771 .exec(&*tx)
2772 .await?;
2773
2774 let room = self.get_room(project.room_id, &tx).await?;
2775 let left_project = LeftProject {
2776 id: project_id,
2777 host_user_id: project.host_user_id,
2778 host_connection_id: project.host_connection()?,
2779 connection_ids,
2780 };
2781 Ok((room, left_project))
2782 })
2783 .await
2784 }
2785
2786 pub async fn project_collaborators(
2787 &self,
2788 project_id: ProjectId,
2789 connection_id: ConnectionId,
2790 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2791 let room_id = self.room_id_for_project(project_id).await?;
2792 self.room_transaction(room_id, |tx| async move {
2793 let collaborators = project_collaborator::Entity::find()
2794 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2795 .all(&*tx)
2796 .await?
2797 .into_iter()
2798 .map(|collaborator| ProjectCollaborator {
2799 connection_id: collaborator.connection(),
2800 user_id: collaborator.user_id,
2801 replica_id: collaborator.replica_id,
2802 is_host: collaborator.is_host,
2803 })
2804 .collect::<Vec<_>>();
2805
2806 if collaborators
2807 .iter()
2808 .any(|collaborator| collaborator.connection_id == connection_id)
2809 {
2810 Ok(collaborators)
2811 } else {
2812 Err(anyhow!("no such project"))?
2813 }
2814 })
2815 .await
2816 }
2817
2818 pub async fn project_connection_ids(
2819 &self,
2820 project_id: ProjectId,
2821 connection_id: ConnectionId,
2822 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2823 let room_id = self.room_id_for_project(project_id).await?;
2824 self.room_transaction(room_id, |tx| async move {
2825 let mut collaborators = project_collaborator::Entity::find()
2826 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2827 .stream(&*tx)
2828 .await?;
2829
2830 let mut connection_ids = HashSet::default();
2831 while let Some(collaborator) = collaborators.next().await {
2832 let collaborator = collaborator?;
2833 connection_ids.insert(collaborator.connection());
2834 }
2835
2836 if connection_ids.contains(&connection_id) {
2837 Ok(connection_ids)
2838 } else {
2839 Err(anyhow!("no such project"))?
2840 }
2841 })
2842 .await
2843 }
2844
2845 async fn project_guest_connection_ids(
2846 &self,
2847 project_id: ProjectId,
2848 tx: &DatabaseTransaction,
2849 ) -> Result<Vec<ConnectionId>> {
2850 let mut collaborators = project_collaborator::Entity::find()
2851 .filter(
2852 project_collaborator::Column::ProjectId
2853 .eq(project_id)
2854 .and(project_collaborator::Column::IsHost.eq(false)),
2855 )
2856 .stream(tx)
2857 .await?;
2858
2859 let mut guest_connection_ids = Vec::new();
2860 while let Some(collaborator) = collaborators.next().await {
2861 let collaborator = collaborator?;
2862 guest_connection_ids.push(collaborator.connection());
2863 }
2864 Ok(guest_connection_ids)
2865 }
2866
2867 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2868 self.transaction(|tx| async move {
2869 let project = project::Entity::find_by_id(project_id)
2870 .one(&*tx)
2871 .await?
2872 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2873 Ok(project.room_id)
2874 })
2875 .await
2876 }
2877
2878 // access tokens
2879
2880 pub async fn create_access_token(
2881 &self,
2882 user_id: UserId,
2883 access_token_hash: &str,
2884 max_access_token_count: usize,
2885 ) -> Result<AccessTokenId> {
2886 self.transaction(|tx| async {
2887 let tx = tx;
2888
2889 let token = access_token::ActiveModel {
2890 user_id: ActiveValue::set(user_id),
2891 hash: ActiveValue::set(access_token_hash.into()),
2892 ..Default::default()
2893 }
2894 .insert(&*tx)
2895 .await?;
2896
2897 access_token::Entity::delete_many()
2898 .filter(
2899 access_token::Column::Id.in_subquery(
2900 Query::select()
2901 .column(access_token::Column::Id)
2902 .from(access_token::Entity)
2903 .and_where(access_token::Column::UserId.eq(user_id))
2904 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2905 .limit(10000)
2906 .offset(max_access_token_count as u64)
2907 .to_owned(),
2908 ),
2909 )
2910 .exec(&*tx)
2911 .await?;
2912 Ok(token.id)
2913 })
2914 .await
2915 }
2916
2917 pub async fn get_access_token(
2918 &self,
2919 access_token_id: AccessTokenId,
2920 ) -> Result<access_token::Model> {
2921 self.transaction(|tx| async move {
2922 Ok(access_token::Entity::find_by_id(access_token_id)
2923 .one(&*tx)
2924 .await?
2925 .ok_or_else(|| anyhow!("no such access token"))?)
2926 })
2927 .await
2928 }
2929
2930 async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2931 where
2932 F: Send + Fn(TransactionHandle) -> Fut,
2933 Fut: Send + Future<Output = Result<T>>,
2934 {
2935 let body = async {
2936 let mut i = 0;
2937 loop {
2938 let (tx, result) = self.with_transaction(&f).await?;
2939 match result {
2940 Ok(result) => match tx.commit().await.map_err(Into::into) {
2941 Ok(()) => return Ok(result),
2942 Err(error) => {
2943 if !self.retry_on_serialization_error(&error, i).await {
2944 return Err(error);
2945 }
2946 }
2947 },
2948 Err(error) => {
2949 tx.rollback().await?;
2950 if !self.retry_on_serialization_error(&error, i).await {
2951 return Err(error);
2952 }
2953 }
2954 }
2955 i += 1;
2956 }
2957 };
2958
2959 self.run(body).await
2960 }
2961
    /// Runs `f` inside a database transaction whose result may name a room.
    ///
    /// When `f` yields `Some((room_id, data))`, the lock for that room is
    /// acquired *before* the transaction is committed and handed back inside
    /// the returned [`RoomGuard`]. When `f` yields `None`, the transaction is
    /// committed without taking any room lock and `None` is returned. When `f`
    /// fails, the transaction is rolled back.
    ///
    /// Commit errors and errors from `f` lead to another attempt whenever
    /// `retry_on_serialization_error` returns true; otherwise they are
    /// returned to the caller.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; passed to the retry helper.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // The room is only known once `f` has run, so the lock
                        // is taken here. The guard stays alive for the rest of
                        // this arm, including a retry delay after a failed commit.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    // No room involved: commit without locking anything.
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3011
3012 async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3013 where
3014 F: Send + Fn(TransactionHandle) -> Fut,
3015 Fut: Send + Future<Output = Result<T>>,
3016 {
3017 let body = async {
3018 let mut i = 0;
3019 loop {
3020 let lock = self.rooms.entry(room_id).or_default().clone();
3021 let _guard = lock.lock_owned().await;
3022 let (tx, result) = self.with_transaction(&f).await?;
3023 match result {
3024 Ok(data) => match tx.commit().await.map_err(Into::into) {
3025 Ok(()) => {
3026 return Ok(RoomGuard {
3027 data,
3028 _guard,
3029 _not_send: PhantomData,
3030 });
3031 }
3032 Err(error) => {
3033 if !self.retry_on_serialization_error(&error, i).await {
3034 return Err(error);
3035 }
3036 }
3037 },
3038 Err(error) => {
3039 tx.rollback().await?;
3040 if !self.retry_on_serialization_error(&error, i).await {
3041 return Err(error);
3042 }
3043 }
3044 }
3045 i += 1;
3046 }
3047 };
3048
3049 self.run(body).await
3050 }
3051
3052 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3053 where
3054 F: Send + Fn(TransactionHandle) -> Fut,
3055 Fut: Send + Future<Output = Result<T>>,
3056 {
3057 let tx = self
3058 .pool
3059 .begin_with_config(Some(IsolationLevel::Serializable), None)
3060 .await?;
3061
3062 let mut tx = Arc::new(Some(tx));
3063 let result = f(TransactionHandle(tx.clone())).await;
3064 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3065 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3066 };
3067
3068 Ok((tx, result))
3069 }
3070
    /// Drives `future` to completion and returns its output.
    ///
    /// In test builds, a random delay is simulated first when the executor is
    /// `Executor::Deterministic`, and the future is then run with `block_on`
    /// on the runtime stored in `self.runtime` (which must be set, or this
    /// panics). In all other builds the future is simply awaited.
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        #[cfg(test)]
        {
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
3089
3090 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3091 // If the error is due to a failure to serialize concurrent transactions, then retry
3092 // this transaction after a delay. With each subsequent retry, double the delay duration.
3093 // Also vary the delay randomly in order to ensure different database connections retry
3094 // at different times.
3095 if is_serialization_error(error) {
3096 let base_delay = 4_u64 << prev_attempt_count.min(16);
3097 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3098 log::info!(
3099 "retrying transaction after serialization error. delay: {} ms.",
3100 randomized_delay
3101 );
3102 self.executor
3103 .sleep(Duration::from_millis(randomized_delay as u64))
3104 .await;
3105 true
3106 } else {
3107 false
3108 }
3109 }
3110}
3111
3112fn is_serialization_error(error: &Error) -> bool {
3113 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3114 match error {
3115 Error::Database(
3116 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3117 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3118 ) if error
3119 .as_database_error()
3120 .and_then(|error| error.code())
3121 .as_deref()
3122 == Some(SERIALIZATION_FAILURE_CODE) =>
3123 {
3124 true
3125 }
3126 _ => false,
3127 }
3128}
3129
3130struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3131
3132impl Deref for TransactionHandle {
3133 type Target = DatabaseTransaction;
3134
3135 fn deref(&self) -> &Self::Target {
3136 self.0.as_ref().as_ref().unwrap()
3137 }
3138}
3139
3140pub struct RoomGuard<T> {
3141 data: T,
3142 _guard: OwnedMutexGuard<()>,
3143 _not_send: PhantomData<Rc<()>>,
3144}
3145
3146impl<T> Deref for RoomGuard<T> {
3147 type Target = T;
3148
3149 fn deref(&self) -> &T {
3150 &self.data
3151 }
3152}
3153
3154impl<T> DerefMut for RoomGuard<T> {
3155 fn deref_mut(&mut self) -> &mut T {
3156 &mut self.data
3157 }
3158}
3159
/// Serializable parameters describing a user to be created.
// NOTE(review): the code consuming this struct is outside this excerpt;
// field meanings below are taken from their names — confirm.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    /// GitHub login of the user.
    pub github_login: String,
    /// Numeric GitHub id of the user.
    pub github_user_id: i32,
    /// Presumably the number of invites granted to the new user — confirm.
    pub invite_count: i32,
}
3166
/// Result data describing a newly created user.
// NOTE(review): the producer of this struct is outside this excerpt; field
// meanings are taken from their names — confirm.
#[derive(Debug)]
pub struct NewUserResult {
    /// Id of the created user.
    pub user_id: UserId,
    /// Metrics identifier associated with the user.
    pub metrics_id: String,
    /// The user who invited the new user, if any.
    pub inviting_user_id: Option<UserId>,
    /// Device id recorded at signup, if any.
    pub signup_device_id: Option<String>,
}
3174
/// Generates a random invite code by invoking `nanoid!` with a size of 16.
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
3178
/// Generates a random email confirmation code by invoking `nanoid!` with a
/// size of 64.
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
3182
/// Defines a public `i32`-backed newtype `$name` for use as a database id.
///
/// The generated type derives the usual value traits (including serde with a
/// transparent representation), offers `MAX`, `from_proto` and `to_proto`,
/// implements `Display`, and implements the sea-query / sea-orm traits needed
/// to use it as an integer column value.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            // Plain `as` cast: values outside the `i32` range wrap.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            // Plain `as` cast: negative ids are sign-extended.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer variant whose value fits into `i32`.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `ConvertFromU64` carries the name of the target type; sea-orm
                // builds the full error message around it.
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(stringify!($name))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
3301
// Id newtypes used throughout the database layer; each `id_type!` invocation
// defines one public tuple struct wrapping an `i32`.
id_type!(AccessTokenId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
3313
/// A room together with the projects that were rejoined and reshared.
// NOTE(review): the producer of this struct is outside this excerpt;
// presumably it is the result of rejoining a room — confirm.
pub struct RejoinedRoom {
    /// The room's state.
    pub room: proto::Room,
    /// Projects that were rejoined.
    pub rejoined_projects: Vec<RejoinedProject>,
    /// Projects that were reshared.
    pub reshared_projects: Vec<ResharedProject>,
}
3319
/// Description of a project that was reshared.
// NOTE(review): constructed outside this excerpt; field meanings are taken
// from their names and types — confirm.
pub struct ResharedProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Presumably the connection that was associated with the project before
    /// resharing — confirm.
    pub old_connection_id: ConnectionId,
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Metadata of the project's worktrees.
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
3326
/// Per-project data carried in [`RejoinedRoom::rejoined_projects`].
pub struct RejoinedProject {
    /// Identifier of the project.
    pub id: ProjectId,
    /// NOTE(review): presumably the connection that was associated with the
    /// project before the rejoin — confirm against the code that fills this in.
    pub old_connection_id: ConnectionId,
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Worktree state, one [`RejoinedWorktree`] per worktree.
    pub worktrees: Vec<RejoinedWorktree>,
    /// Protobuf descriptions of the project's language servers.
    pub language_servers: Vec<proto::LanguageServer>,
}
3334
/// Worktree state carried inside a [`RejoinedProject`].
///
/// NOTE(review): the `updated_*` / `removed_*` pairs look like a delta against
/// what the rejoining client already has — confirm with the code that builds
/// this value (not visible in this section).
#[derive(Debug)]
pub struct RejoinedWorktree {
    /// Worktree identifier.
    pub id: u64,
    /// Absolute path of the worktree, as a string.
    pub abs_path: String,
    /// Name of the worktree root.
    pub root_name: String,
    /// Visibility flag of the worktree.
    pub visible: bool,
    /// Protobuf entries reported as updated.
    pub updated_entries: Vec<proto::Entry>,
    /// Ids of entries reported as removed.
    pub removed_entries: Vec<u64>,
    /// Protobuf repository entries reported as updated.
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    /// Ids of repository entries reported as removed.
    pub removed_repositories: Vec<u64>,
    /// Protobuf diagnostic summaries for the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// NOTE(review): presumably the id of the most recent scan — confirm.
    pub scan_id: u64,
    /// NOTE(review): presumably the id of the most recent finished scan — confirm.
    pub completed_scan_id: u64,
}
3349
/// A room's protobuf state bundled with the projects and calls affected by a
/// participant leaving.
///
/// NOTE(review): presumably the result of a leave-room query; the producer is
/// outside this section of the file — confirm.
pub struct LeftRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Affected projects, keyed by project id.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    /// Ids of users whose calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3355
/// A room's protobuf state bundled with the users affected by a refresh.
///
/// NOTE(review): presumably the result of a room-refresh query; the producer
/// is outside this section of the file — confirm.
pub struct RefreshedRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Ids of users whose participation was found to be stale.
    pub stale_participant_user_ids: Vec<UserId>,
    /// Ids of users whose calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3361
/// In-memory view of a project: its collaborators, worktrees and language
/// servers.
pub struct Project {
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Worktrees in ascending key order (a `BTreeMap`).
    /// NOTE(review): the key is presumably the worktree id — confirm.
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Protobuf descriptions of the project's language servers.
    pub language_servers: Vec<proto::LanguageServer>,
}
3367
/// A participant in a project, identified by connection, user and replica.
pub struct ProjectCollaborator {
    /// Connection of the collaborator; becomes `peer_id` in `to_proto`.
    pub connection_id: ConnectionId,
    /// User behind the collaborator.
    pub user_id: UserId,
    /// Replica id of the collaborator.
    pub replica_id: ReplicaId,
    /// Whether this collaborator is the project's host. Not included in the
    /// protobuf message built by `to_proto`.
    pub is_host: bool,
}
3374
3375impl ProjectCollaborator {
3376 pub fn to_proto(&self) -> proto::Collaborator {
3377 proto::Collaborator {
3378 peer_id: Some(self.connection_id.into()),
3379 replica_id: self.replica_id.0 as u32,
3380 user_id: self.user_id.to_proto(),
3381 }
3382 }
3383}
3384
/// Per-project data stored in [`LeftRoom::left_projects`].
#[derive(Debug)]
pub struct LeftProject {
    /// Identifier of the project.
    pub id: ProjectId,
    /// User hosting the project.
    pub host_user_id: UserId,
    /// Connection of the project's host.
    pub host_connection_id: ConnectionId,
    /// NOTE(review): presumably the connections of the project's
    /// collaborators (e.g. for notifying them) — confirm against the producer.
    pub connection_ids: Vec<ConnectionId>,
}
3392
/// Full state of a single worktree as held in [`Project::worktrees`].
pub struct Worktree {
    /// Worktree identifier.
    pub id: u64,
    /// Absolute path of the worktree, as a string.
    pub abs_path: String,
    /// Name of the worktree root.
    pub root_name: String,
    /// Visibility flag of the worktree.
    pub visible: bool,
    /// Protobuf entries of the worktree.
    pub entries: Vec<proto::Entry>,
    /// Protobuf repository entries of the worktree.
    pub repository_entries: Vec<proto::RepositoryEntry>,
    /// Protobuf diagnostic summaries for the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// NOTE(review): presumably the id of the most recent scan — confirm.
    pub scan_id: u64,
    /// NOTE(review): presumably the id of the most recent finished scan — confirm.
    pub completed_scan_id: u64,
}
3404
3405#[cfg(test)]
3406pub use test::*;
3407
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use lazy_static::lazy_static;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// Handle to a throwaway database used by tests.
    ///
    /// Dropping the handle tears down the Postgres database it created (see
    /// the `Drop` impl in this module); other backends need no cleanup here.
    pub struct TestDb {
        /// The database under test. `None` only after the handle has been
        /// dropped or if a caller took the value out.
        pub db: Option<Arc<Database>>,
        /// Both constructors in this module leave this as `None`.
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Builds the current-thread Tokio runtime (I/O and time enabled)
        /// that drives all async database work for a test database.
        fn build_runtime() -> tokio::runtime::Runtime {
            tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .expect("failed to build tokio runtime for test db")
        }

        /// Stores `runtime` inside `db` so it stays alive as long as the
        /// database does, then wraps the database in a `TestDb`.
        fn with_runtime(mut db: Database, runtime: tokio::runtime::Runtime) -> Self {
            db.runtime = Some(runtime);
            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a test database backed by in-memory SQLite and loads the
        /// bundled SQLite test schema into it.
        ///
        /// # Panics
        ///
        /// Panics if the runtime cannot be built, the connection fails, or
        /// the schema cannot be applied.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = String::from("sqlite::memory:");
            let runtime = Self::build_runtime();

            let db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            Self::with_runtime(db, runtime)
        }

        /// Creates a freshly named Postgres database on `localhost` and runs
        /// the crate's migrations against it.
        ///
        /// Construction is serialized through a process-wide lock, so only
        /// one thread at a time creates and migrates a test database.
        ///
        /// # Panics
        ///
        /// Panics if the runtime cannot be built, the database cannot be
        /// created or connected to, or the migrations fail.
        pub fn postgres(background: Arc<Background>) -> Self {
            lazy_static! {
                static ref LOCK: Mutex<()> = Mutex::new(());
            }

            let _guard = LOCK.lock();
            // A random suffix keeps test databases from colliding by name.
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = Self::build_runtime();

            let db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            Self::with_runtime(db, runtime)
        }

        /// Returns the database under test.
        ///
        /// # Panics
        ///
        /// Panics if the `db` field has been emptied.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        /// Best-effort teardown: for Postgres, terminates the other backends
        /// connected to the test database and then drops it. Failures are
        /// logged rather than propagated.
        fn drop(&mut self) {
            // Never panic here: a panic while the thread is already unwinding
            // (e.g. after a failed assertion) would abort the test process.
            let db = match self.db.take() {
                Some(db) => db,
                None => return,
            };

            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                let runtime = match db.runtime.as_ref() {
                    Some(runtime) => runtime,
                    // Without a runtime the async cleanup cannot be driven.
                    None => return,
                };

                runtime.block_on(async {
                    use util::ResultExt;
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}