1mod access_token;
2mod contact;
3mod follower;
4mod language_server;
5mod project;
6mod project_collaborator;
7mod room;
8mod room_participant;
9mod server;
10mod signup;
11#[cfg(test)]
12mod tests;
13mod user;
14mod worktree;
15mod worktree_diagnostic_summary;
16mod worktree_entry;
17
18use crate::{Error, Result};
19use anyhow::anyhow;
20use collections::{BTreeMap, HashMap, HashSet};
21pub use contact::Contact;
22use dashmap::DashMap;
23use futures::StreamExt;
24use hyper::StatusCode;
25use rpc::{proto, ConnectionId};
26use sea_orm::Condition;
27pub use sea_orm::ConnectOptions;
28use sea_orm::{
29 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
30 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
31 Statement, TransactionTrait,
32};
33use sea_query::{Alias, Expr, OnConflict, Query};
34use serde::{Deserialize, Serialize};
35pub use signup::{Invite, NewSignup, WaitlistSummary};
36use sqlx::migrate::{Migrate, Migration, MigrationSource};
37use sqlx::Connection;
38use std::ops::{Deref, DerefMut};
39use std::path::Path;
40use std::time::Duration;
41use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
42use tokio::sync::{Mutex, OwnedMutexGuard};
43pub use user::Model as User;
44
45pub struct Database {
46 options: ConnectOptions,
47 pool: DatabaseConnection,
48 rooms: DashMap<RoomId, Arc<Mutex<()>>>,
49 #[cfg(test)]
50 background: Option<std::sync::Arc<gpui::executor::Background>>,
51 #[cfg(test)]
52 runtime: Option<tokio::runtime::Runtime>,
53}
54
55impl Database {
56 pub async fn new(options: ConnectOptions) -> Result<Self> {
57 Ok(Self {
58 options: options.clone(),
59 pool: sea_orm::Database::connect(options).await?,
60 rooms: DashMap::with_capacity(16384),
61 #[cfg(test)]
62 background: None,
63 #[cfg(test)]
64 runtime: None,
65 })
66 }
67
68 #[cfg(test)]
69 pub fn reset(&self) {
70 self.rooms.clear();
71 }
72
73 pub async fn migrate(
74 &self,
75 migrations_path: &Path,
76 ignore_checksum_mismatch: bool,
77 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
78 let migrations = MigrationSource::resolve(migrations_path)
79 .await
80 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
81
82 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
83
84 connection.ensure_migrations_table().await?;
85 let applied_migrations: HashMap<_, _> = connection
86 .list_applied_migrations()
87 .await?
88 .into_iter()
89 .map(|m| (m.version, m))
90 .collect();
91
92 let mut new_migrations = Vec::new();
93 for migration in migrations {
94 match applied_migrations.get(&migration.version) {
95 Some(applied_migration) => {
96 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
97 {
98 Err(anyhow!(
99 "checksum mismatch for applied migration {}",
100 migration.description
101 ))?;
102 }
103 }
104 None => {
105 let elapsed = connection.apply(&migration).await?;
106 new_migrations.push((migration, elapsed));
107 }
108 }
109 }
110
111 Ok(new_migrations)
112 }
113
114 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
115 self.transaction(|tx| async move {
116 let server = server::ActiveModel {
117 environment: ActiveValue::set(environment.into()),
118 ..Default::default()
119 }
120 .insert(&*tx)
121 .await?;
122 Ok(server.id)
123 })
124 .await
125 }
126
127 pub async fn stale_room_ids(
128 &self,
129 environment: &str,
130 new_server_id: ServerId,
131 ) -> Result<Vec<RoomId>> {
132 self.transaction(|tx| async move {
133 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
134 enum QueryAs {
135 RoomId,
136 }
137
138 let stale_server_epochs = self
139 .stale_server_ids(environment, new_server_id, &tx)
140 .await?;
141 Ok(room_participant::Entity::find()
142 .select_only()
143 .column(room_participant::Column::RoomId)
144 .distinct()
145 .filter(
146 room_participant::Column::AnsweringConnectionServerId
147 .is_in(stale_server_epochs),
148 )
149 .into_values::<_, QueryAs>()
150 .all(&*tx)
151 .await?)
152 })
153 .await
154 }
155
156 pub async fn refresh_room(
157 &self,
158 room_id: RoomId,
159 new_server_id: ServerId,
160 ) -> Result<RoomGuard<RefreshedRoom>> {
161 self.room_transaction(room_id, |tx| async move {
162 let stale_participant_filter = Condition::all()
163 .add(room_participant::Column::RoomId.eq(room_id))
164 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
165 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
166
167 let stale_participant_user_ids = room_participant::Entity::find()
168 .filter(stale_participant_filter.clone())
169 .all(&*tx)
170 .await?
171 .into_iter()
172 .map(|participant| participant.user_id)
173 .collect::<Vec<_>>();
174
175 // Delete participants who failed to reconnect.
176 room_participant::Entity::delete_many()
177 .filter(stale_participant_filter)
178 .exec(&*tx)
179 .await?;
180
181 let room = self.get_room(room_id, &tx).await?;
182 let mut canceled_calls_to_user_ids = Vec::new();
183 // Delete the room if it becomes empty and cancel pending calls.
184 if room.participants.is_empty() {
185 canceled_calls_to_user_ids.extend(
186 room.pending_participants
187 .iter()
188 .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
189 );
190 room_participant::Entity::delete_many()
191 .filter(room_participant::Column::RoomId.eq(room_id))
192 .exec(&*tx)
193 .await?;
194 project::Entity::delete_many()
195 .filter(project::Column::RoomId.eq(room_id))
196 .exec(&*tx)
197 .await?;
198 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
199 }
200
201 Ok(RefreshedRoom {
202 room,
203 stale_participant_user_ids,
204 canceled_calls_to_user_ids,
205 })
206 })
207 .await
208 }
209
210 pub async fn delete_stale_servers(
211 &self,
212 environment: &str,
213 new_server_id: ServerId,
214 ) -> Result<()> {
215 self.transaction(|tx| async move {
216 server::Entity::delete_many()
217 .filter(
218 Condition::all()
219 .add(server::Column::Environment.eq(environment))
220 .add(server::Column::Id.ne(new_server_id)),
221 )
222 .exec(&*tx)
223 .await?;
224 Ok(())
225 })
226 .await
227 }
228
229 async fn stale_server_ids(
230 &self,
231 environment: &str,
232 new_server_id: ServerId,
233 tx: &DatabaseTransaction,
234 ) -> Result<Vec<ServerId>> {
235 let stale_servers = server::Entity::find()
236 .filter(
237 Condition::all()
238 .add(server::Column::Environment.eq(environment))
239 .add(server::Column::Id.ne(new_server_id)),
240 )
241 .all(&*tx)
242 .await?;
243 Ok(stale_servers.into_iter().map(|server| server.id).collect())
244 }
245
246 // users
247
248 pub async fn create_user(
249 &self,
250 email_address: &str,
251 admin: bool,
252 params: NewUserParams,
253 ) -> Result<NewUserResult> {
254 self.transaction(|tx| async {
255 let tx = tx;
256 let user = user::Entity::insert(user::ActiveModel {
257 email_address: ActiveValue::set(Some(email_address.into())),
258 github_login: ActiveValue::set(params.github_login.clone()),
259 github_user_id: ActiveValue::set(Some(params.github_user_id)),
260 admin: ActiveValue::set(admin),
261 metrics_id: ActiveValue::set(Uuid::new_v4()),
262 ..Default::default()
263 })
264 .on_conflict(
265 OnConflict::column(user::Column::GithubLogin)
266 .update_column(user::Column::GithubLogin)
267 .to_owned(),
268 )
269 .exec_with_returning(&*tx)
270 .await?;
271
272 Ok(NewUserResult {
273 user_id: user.id,
274 metrics_id: user.metrics_id.to_string(),
275 signup_device_id: None,
276 inviting_user_id: None,
277 })
278 })
279 .await
280 }
281
282 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
283 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
284 .await
285 }
286
287 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
288 self.transaction(|tx| async {
289 let tx = tx;
290 Ok(user::Entity::find()
291 .filter(user::Column::Id.is_in(ids.iter().copied()))
292 .all(&*tx)
293 .await?)
294 })
295 .await
296 }
297
298 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
299 self.transaction(|tx| async move {
300 Ok(user::Entity::find()
301 .filter(user::Column::GithubLogin.eq(github_login))
302 .one(&*tx)
303 .await?)
304 })
305 .await
306 }
307
308 pub async fn get_or_create_user_by_github_account(
309 &self,
310 github_login: &str,
311 github_user_id: Option<i32>,
312 github_email: Option<&str>,
313 ) -> Result<Option<User>> {
314 self.transaction(|tx| async move {
315 let tx = &*tx;
316 if let Some(github_user_id) = github_user_id {
317 if let Some(user_by_github_user_id) = user::Entity::find()
318 .filter(user::Column::GithubUserId.eq(github_user_id))
319 .one(tx)
320 .await?
321 {
322 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
323 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
324 Ok(Some(user_by_github_user_id.update(tx).await?))
325 } else if let Some(user_by_github_login) = user::Entity::find()
326 .filter(user::Column::GithubLogin.eq(github_login))
327 .one(tx)
328 .await?
329 {
330 let mut user_by_github_login = user_by_github_login.into_active_model();
331 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
332 Ok(Some(user_by_github_login.update(tx).await?))
333 } else {
334 let user = user::Entity::insert(user::ActiveModel {
335 email_address: ActiveValue::set(github_email.map(|email| email.into())),
336 github_login: ActiveValue::set(github_login.into()),
337 github_user_id: ActiveValue::set(Some(github_user_id)),
338 admin: ActiveValue::set(false),
339 invite_count: ActiveValue::set(0),
340 invite_code: ActiveValue::set(None),
341 metrics_id: ActiveValue::set(Uuid::new_v4()),
342 ..Default::default()
343 })
344 .exec_with_returning(&*tx)
345 .await?;
346 Ok(Some(user))
347 }
348 } else {
349 Ok(user::Entity::find()
350 .filter(user::Column::GithubLogin.eq(github_login))
351 .one(tx)
352 .await?)
353 }
354 })
355 .await
356 }
357
358 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
359 self.transaction(|tx| async move {
360 Ok(user::Entity::find()
361 .order_by_asc(user::Column::GithubLogin)
362 .limit(limit as u64)
363 .offset(page as u64 * limit as u64)
364 .all(&*tx)
365 .await?)
366 })
367 .await
368 }
369
370 pub async fn get_users_with_no_invites(
371 &self,
372 invited_by_another_user: bool,
373 ) -> Result<Vec<User>> {
374 self.transaction(|tx| async move {
375 Ok(user::Entity::find()
376 .filter(
377 user::Column::InviteCount
378 .eq(0)
379 .and(if invited_by_another_user {
380 user::Column::InviterId.is_not_null()
381 } else {
382 user::Column::InviterId.is_null()
383 }),
384 )
385 .all(&*tx)
386 .await?)
387 })
388 .await
389 }
390
391 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
392 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
393 enum QueryAs {
394 MetricsId,
395 }
396
397 self.transaction(|tx| async move {
398 let metrics_id: Uuid = user::Entity::find_by_id(id)
399 .select_only()
400 .column(user::Column::MetricsId)
401 .into_values::<_, QueryAs>()
402 .one(&*tx)
403 .await?
404 .ok_or_else(|| anyhow!("could not find user"))?;
405 Ok(metrics_id.to_string())
406 })
407 .await
408 }
409
410 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
411 self.transaction(|tx| async move {
412 user::Entity::update_many()
413 .filter(user::Column::Id.eq(id))
414 .set(user::ActiveModel {
415 admin: ActiveValue::set(is_admin),
416 ..Default::default()
417 })
418 .exec(&*tx)
419 .await?;
420 Ok(())
421 })
422 .await
423 }
424
425 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
426 self.transaction(|tx| async move {
427 user::Entity::update_many()
428 .filter(user::Column::Id.eq(id))
429 .set(user::ActiveModel {
430 connected_once: ActiveValue::set(connected_once),
431 ..Default::default()
432 })
433 .exec(&*tx)
434 .await?;
435 Ok(())
436 })
437 .await
438 }
439
440 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
441 self.transaction(|tx| async move {
442 access_token::Entity::delete_many()
443 .filter(access_token::Column::UserId.eq(id))
444 .exec(&*tx)
445 .await?;
446 user::Entity::delete_by_id(id).exec(&*tx).await?;
447 Ok(())
448 })
449 .await
450 }
451
452 // contacts
453
454 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
455 #[derive(Debug, FromQueryResult)]
456 struct ContactWithUserBusyStatuses {
457 user_id_a: UserId,
458 user_id_b: UserId,
459 a_to_b: bool,
460 accepted: bool,
461 should_notify: bool,
462 user_a_busy: bool,
463 user_b_busy: bool,
464 }
465
466 self.transaction(|tx| async move {
467 let user_a_participant = Alias::new("user_a_participant");
468 let user_b_participant = Alias::new("user_b_participant");
469 let mut db_contacts = contact::Entity::find()
470 .column_as(
471 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
472 .is_not_null(),
473 "user_a_busy",
474 )
475 .column_as(
476 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
477 .is_not_null(),
478 "user_b_busy",
479 )
480 .filter(
481 contact::Column::UserIdA
482 .eq(user_id)
483 .or(contact::Column::UserIdB.eq(user_id)),
484 )
485 .join_as(
486 JoinType::LeftJoin,
487 contact::Relation::UserARoomParticipant.def(),
488 user_a_participant,
489 )
490 .join_as(
491 JoinType::LeftJoin,
492 contact::Relation::UserBRoomParticipant.def(),
493 user_b_participant,
494 )
495 .into_model::<ContactWithUserBusyStatuses>()
496 .stream(&*tx)
497 .await?;
498
499 let mut contacts = Vec::new();
500 while let Some(db_contact) = db_contacts.next().await {
501 let db_contact = db_contact?;
502 if db_contact.user_id_a == user_id {
503 if db_contact.accepted {
504 contacts.push(Contact::Accepted {
505 user_id: db_contact.user_id_b,
506 should_notify: db_contact.should_notify && db_contact.a_to_b,
507 busy: db_contact.user_b_busy,
508 });
509 } else if db_contact.a_to_b {
510 contacts.push(Contact::Outgoing {
511 user_id: db_contact.user_id_b,
512 })
513 } else {
514 contacts.push(Contact::Incoming {
515 user_id: db_contact.user_id_b,
516 should_notify: db_contact.should_notify,
517 });
518 }
519 } else if db_contact.accepted {
520 contacts.push(Contact::Accepted {
521 user_id: db_contact.user_id_a,
522 should_notify: db_contact.should_notify && !db_contact.a_to_b,
523 busy: db_contact.user_a_busy,
524 });
525 } else if db_contact.a_to_b {
526 contacts.push(Contact::Incoming {
527 user_id: db_contact.user_id_a,
528 should_notify: db_contact.should_notify,
529 });
530 } else {
531 contacts.push(Contact::Outgoing {
532 user_id: db_contact.user_id_a,
533 });
534 }
535 }
536
537 contacts.sort_unstable_by_key(|contact| contact.user_id());
538
539 Ok(contacts)
540 })
541 .await
542 }
543
544 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
545 self.transaction(|tx| async move {
546 let participant = room_participant::Entity::find()
547 .filter(room_participant::Column::UserId.eq(user_id))
548 .one(&*tx)
549 .await?;
550 Ok(participant.is_some())
551 })
552 .await
553 }
554
555 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
556 self.transaction(|tx| async move {
557 let (id_a, id_b) = if user_id_1 < user_id_2 {
558 (user_id_1, user_id_2)
559 } else {
560 (user_id_2, user_id_1)
561 };
562
563 Ok(contact::Entity::find()
564 .filter(
565 contact::Column::UserIdA
566 .eq(id_a)
567 .and(contact::Column::UserIdB.eq(id_b))
568 .and(contact::Column::Accepted.eq(true)),
569 )
570 .one(&*tx)
571 .await?
572 .is_some())
573 })
574 .await
575 }
576
577 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
578 self.transaction(|tx| async move {
579 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
580 (sender_id, receiver_id, true)
581 } else {
582 (receiver_id, sender_id, false)
583 };
584
585 let rows_affected = contact::Entity::insert(contact::ActiveModel {
586 user_id_a: ActiveValue::set(id_a),
587 user_id_b: ActiveValue::set(id_b),
588 a_to_b: ActiveValue::set(a_to_b),
589 accepted: ActiveValue::set(false),
590 should_notify: ActiveValue::set(true),
591 ..Default::default()
592 })
593 .on_conflict(
594 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
595 .values([
596 (contact::Column::Accepted, true.into()),
597 (contact::Column::ShouldNotify, false.into()),
598 ])
599 .action_and_where(
600 contact::Column::Accepted.eq(false).and(
601 contact::Column::AToB
602 .eq(a_to_b)
603 .and(contact::Column::UserIdA.eq(id_b))
604 .or(contact::Column::AToB
605 .ne(a_to_b)
606 .and(contact::Column::UserIdA.eq(id_a))),
607 ),
608 )
609 .to_owned(),
610 )
611 .exec_without_returning(&*tx)
612 .await?;
613
614 if rows_affected == 1 {
615 Ok(())
616 } else {
617 Err(anyhow!("contact already requested"))?
618 }
619 })
620 .await
621 }
622
623 /// Returns a bool indicating whether the removed contact had originally accepted or not
624 ///
625 /// Deletes the contact identified by the requester and responder ids, and then returns
626 /// whether the deleted contact had originally accepted or was a pending contact request.
627 ///
628 /// # Arguments
629 ///
630 /// * `requester_id` - The user that initiates this request
631 /// * `responder_id` - The user that will be removed
632 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
633 self.transaction(|tx| async move {
634 let (id_a, id_b) = if responder_id < requester_id {
635 (responder_id, requester_id)
636 } else {
637 (requester_id, responder_id)
638 };
639
640 let contact = contact::Entity::find()
641 .filter(
642 contact::Column::UserIdA
643 .eq(id_a)
644 .and(contact::Column::UserIdB.eq(id_b)),
645 )
646 .one(&*tx)
647 .await?
648 .ok_or_else(|| anyhow!("no such contact"))?;
649
650 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
651 Ok(contact.accepted)
652 })
653 .await
654 }
655
656 pub async fn dismiss_contact_notification(
657 &self,
658 user_id: UserId,
659 contact_user_id: UserId,
660 ) -> Result<()> {
661 self.transaction(|tx| async move {
662 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
663 (user_id, contact_user_id, true)
664 } else {
665 (contact_user_id, user_id, false)
666 };
667
668 let result = contact::Entity::update_many()
669 .set(contact::ActiveModel {
670 should_notify: ActiveValue::set(false),
671 ..Default::default()
672 })
673 .filter(
674 contact::Column::UserIdA
675 .eq(id_a)
676 .and(contact::Column::UserIdB.eq(id_b))
677 .and(
678 contact::Column::AToB
679 .eq(a_to_b)
680 .and(contact::Column::Accepted.eq(true))
681 .or(contact::Column::AToB
682 .ne(a_to_b)
683 .and(contact::Column::Accepted.eq(false))),
684 ),
685 )
686 .exec(&*tx)
687 .await?;
688 if result.rows_affected == 0 {
689 Err(anyhow!("no such contact request"))?
690 } else {
691 Ok(())
692 }
693 })
694 .await
695 }
696
697 pub async fn respond_to_contact_request(
698 &self,
699 responder_id: UserId,
700 requester_id: UserId,
701 accept: bool,
702 ) -> Result<()> {
703 self.transaction(|tx| async move {
704 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
705 (responder_id, requester_id, false)
706 } else {
707 (requester_id, responder_id, true)
708 };
709 let rows_affected = if accept {
710 let result = contact::Entity::update_many()
711 .set(contact::ActiveModel {
712 accepted: ActiveValue::set(true),
713 should_notify: ActiveValue::set(true),
714 ..Default::default()
715 })
716 .filter(
717 contact::Column::UserIdA
718 .eq(id_a)
719 .and(contact::Column::UserIdB.eq(id_b))
720 .and(contact::Column::AToB.eq(a_to_b)),
721 )
722 .exec(&*tx)
723 .await?;
724 result.rows_affected
725 } else {
726 let result = contact::Entity::delete_many()
727 .filter(
728 contact::Column::UserIdA
729 .eq(id_a)
730 .and(contact::Column::UserIdB.eq(id_b))
731 .and(contact::Column::AToB.eq(a_to_b))
732 .and(contact::Column::Accepted.eq(false)),
733 )
734 .exec(&*tx)
735 .await?;
736
737 result.rows_affected
738 };
739
740 if rows_affected == 1 {
741 Ok(())
742 } else {
743 Err(anyhow!("no such contact request"))?
744 }
745 })
746 .await
747 }
748
749 pub fn fuzzy_like_string(string: &str) -> String {
750 let mut result = String::with_capacity(string.len() * 2 + 1);
751 for c in string.chars() {
752 if c.is_alphanumeric() {
753 result.push('%');
754 result.push(c);
755 }
756 }
757 result.push('%');
758 result
759 }
760
761 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
762 self.transaction(|tx| async {
763 let tx = tx;
764 let like_string = Self::fuzzy_like_string(name_query);
765 let query = "
766 SELECT users.*
767 FROM users
768 WHERE github_login ILIKE $1
769 ORDER BY github_login <-> $2
770 LIMIT $3
771 ";
772
773 Ok(user::Entity::find()
774 .from_raw_sql(Statement::from_sql_and_values(
775 self.pool.get_database_backend(),
776 query.into(),
777 vec![like_string.into(), name_query.into(), limit.into()],
778 ))
779 .all(&*tx)
780 .await?)
781 })
782 .await
783 }
784
785 // signups
786
787 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
788 self.transaction(|tx| async move {
789 signup::Entity::insert(signup::ActiveModel {
790 email_address: ActiveValue::set(signup.email_address.clone()),
791 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
792 email_confirmation_sent: ActiveValue::set(false),
793 platform_mac: ActiveValue::set(signup.platform_mac),
794 platform_windows: ActiveValue::set(signup.platform_windows),
795 platform_linux: ActiveValue::set(signup.platform_linux),
796 platform_unknown: ActiveValue::set(false),
797 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
798 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
799 device_id: ActiveValue::set(signup.device_id.clone()),
800 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
801 ..Default::default()
802 })
803 .on_conflict(
804 OnConflict::column(signup::Column::EmailAddress)
805 .update_columns([
806 signup::Column::PlatformMac,
807 signup::Column::PlatformWindows,
808 signup::Column::PlatformLinux,
809 signup::Column::EditorFeatures,
810 signup::Column::ProgrammingLanguages,
811 signup::Column::DeviceId,
812 signup::Column::AddedToMailingList,
813 ])
814 .to_owned(),
815 )
816 .exec(&*tx)
817 .await?;
818 Ok(())
819 })
820 .await
821 }
822
823 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
824 self.transaction(|tx| async move {
825 let signup = signup::Entity::find()
826 .filter(signup::Column::EmailAddress.eq(email_address))
827 .one(&*tx)
828 .await?
829 .ok_or_else(|| {
830 anyhow!("signup with email address {} doesn't exist", email_address)
831 })?;
832
833 Ok(signup)
834 })
835 .await
836 }
837
838 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
839 self.transaction(|tx| async move {
840 let query = "
841 SELECT
842 COUNT(*) as count,
843 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
844 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
845 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
846 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
847 FROM (
848 SELECT *
849 FROM signups
850 WHERE
851 NOT email_confirmation_sent
852 ) AS unsent
853 ";
854 Ok(
855 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
856 self.pool.get_database_backend(),
857 query.into(),
858 vec![],
859 ))
860 .one(&*tx)
861 .await?
862 .ok_or_else(|| anyhow!("invalid result"))?,
863 )
864 })
865 .await
866 }
867
868 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
869 let emails = invites
870 .iter()
871 .map(|s| s.email_address.as_str())
872 .collect::<Vec<_>>();
873 self.transaction(|tx| async {
874 let tx = tx;
875 signup::Entity::update_many()
876 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
877 .set(signup::ActiveModel {
878 email_confirmation_sent: ActiveValue::set(true),
879 ..Default::default()
880 })
881 .exec(&*tx)
882 .await?;
883 Ok(())
884 })
885 .await
886 }
887
888 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
889 self.transaction(|tx| async move {
890 Ok(signup::Entity::find()
891 .select_only()
892 .column(signup::Column::EmailAddress)
893 .column(signup::Column::EmailConfirmationCode)
894 .filter(
895 signup::Column::EmailConfirmationSent.eq(false).and(
896 signup::Column::PlatformMac
897 .eq(true)
898 .or(signup::Column::PlatformUnknown.eq(true)),
899 ),
900 )
901 .order_by_asc(signup::Column::CreatedAt)
902 .limit(count as u64)
903 .into_model()
904 .all(&*tx)
905 .await?)
906 })
907 .await
908 }
909
910 // invite codes
911
912 pub async fn create_invite_from_code(
913 &self,
914 code: &str,
915 email_address: &str,
916 device_id: Option<&str>,
917 added_to_mailing_list: bool,
918 ) -> Result<Invite> {
919 self.transaction(|tx| async move {
920 let existing_user = user::Entity::find()
921 .filter(user::Column::EmailAddress.eq(email_address))
922 .one(&*tx)
923 .await?;
924
925 if existing_user.is_some() {
926 Err(anyhow!("email address is already in use"))?;
927 }
928
929 let inviting_user_with_invites = match user::Entity::find()
930 .filter(
931 user::Column::InviteCode
932 .eq(code)
933 .and(user::Column::InviteCount.gt(0)),
934 )
935 .one(&*tx)
936 .await?
937 {
938 Some(inviting_user) => inviting_user,
939 None => {
940 return Err(Error::Http(
941 StatusCode::UNAUTHORIZED,
942 "unable to find an invite code with invites remaining".to_string(),
943 ))?
944 }
945 };
946 user::Entity::update_many()
947 .filter(
948 user::Column::Id
949 .eq(inviting_user_with_invites.id)
950 .and(user::Column::InviteCount.gt(0)),
951 )
952 .col_expr(
953 user::Column::InviteCount,
954 Expr::col(user::Column::InviteCount).sub(1),
955 )
956 .exec(&*tx)
957 .await?;
958
959 let signup = signup::Entity::insert(signup::ActiveModel {
960 email_address: ActiveValue::set(email_address.into()),
961 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
962 email_confirmation_sent: ActiveValue::set(false),
963 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
964 platform_linux: ActiveValue::set(false),
965 platform_mac: ActiveValue::set(false),
966 platform_windows: ActiveValue::set(false),
967 platform_unknown: ActiveValue::set(true),
968 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
969 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
970 ..Default::default()
971 })
972 .on_conflict(
973 OnConflict::column(signup::Column::EmailAddress)
974 .update_column(signup::Column::InvitingUserId)
975 .to_owned(),
976 )
977 .exec_with_returning(&*tx)
978 .await?;
979
980 Ok(Invite {
981 email_address: signup.email_address,
982 email_confirmation_code: signup.email_confirmation_code,
983 })
984 })
985 .await
986 }
987
988 pub async fn create_user_from_invite(
989 &self,
990 invite: &Invite,
991 user: NewUserParams,
992 ) -> Result<Option<NewUserResult>> {
993 self.transaction(|tx| async {
994 let tx = tx;
995 let signup = signup::Entity::find()
996 .filter(
997 signup::Column::EmailAddress
998 .eq(invite.email_address.as_str())
999 .and(
1000 signup::Column::EmailConfirmationCode
1001 .eq(invite.email_confirmation_code.as_str()),
1002 ),
1003 )
1004 .one(&*tx)
1005 .await?
1006 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1007
1008 if signup.user_id.is_some() {
1009 return Ok(None);
1010 }
1011
1012 let user = user::Entity::insert(user::ActiveModel {
1013 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1014 github_login: ActiveValue::set(user.github_login.clone()),
1015 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1016 admin: ActiveValue::set(false),
1017 invite_count: ActiveValue::set(user.invite_count),
1018 invite_code: ActiveValue::set(Some(random_invite_code())),
1019 metrics_id: ActiveValue::set(Uuid::new_v4()),
1020 ..Default::default()
1021 })
1022 .on_conflict(
1023 OnConflict::column(user::Column::GithubLogin)
1024 .update_columns([
1025 user::Column::EmailAddress,
1026 user::Column::GithubUserId,
1027 user::Column::Admin,
1028 ])
1029 .to_owned(),
1030 )
1031 .exec_with_returning(&*tx)
1032 .await?;
1033
1034 let mut signup = signup.into_active_model();
1035 signup.user_id = ActiveValue::set(Some(user.id));
1036 let signup = signup.update(&*tx).await?;
1037
1038 if let Some(inviting_user_id) = signup.inviting_user_id {
1039 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1040 (inviting_user_id, user.id, true)
1041 } else {
1042 (user.id, inviting_user_id, false)
1043 };
1044
1045 contact::Entity::insert(contact::ActiveModel {
1046 user_id_a: ActiveValue::set(user_id_a),
1047 user_id_b: ActiveValue::set(user_id_b),
1048 a_to_b: ActiveValue::set(a_to_b),
1049 should_notify: ActiveValue::set(true),
1050 accepted: ActiveValue::set(true),
1051 ..Default::default()
1052 })
1053 .on_conflict(OnConflict::new().do_nothing().to_owned())
1054 .exec_without_returning(&*tx)
1055 .await?;
1056 }
1057
1058 Ok(Some(NewUserResult {
1059 user_id: user.id,
1060 metrics_id: user.metrics_id.to_string(),
1061 inviting_user_id: signup.inviting_user_id,
1062 signup_device_id: signup.device_id,
1063 }))
1064 })
1065 .await
1066 }
1067
1068 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1069 self.transaction(|tx| async move {
1070 if count > 0 {
1071 user::Entity::update_many()
1072 .filter(
1073 user::Column::Id
1074 .eq(id)
1075 .and(user::Column::InviteCode.is_null()),
1076 )
1077 .set(user::ActiveModel {
1078 invite_code: ActiveValue::set(Some(random_invite_code())),
1079 ..Default::default()
1080 })
1081 .exec(&*tx)
1082 .await?;
1083 }
1084
1085 user::Entity::update_many()
1086 .filter(user::Column::Id.eq(id))
1087 .set(user::ActiveModel {
1088 invite_count: ActiveValue::set(count),
1089 ..Default::default()
1090 })
1091 .exec(&*tx)
1092 .await?;
1093 Ok(())
1094 })
1095 .await
1096 }
1097
1098 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1099 self.transaction(|tx| async move {
1100 match user::Entity::find_by_id(id).one(&*tx).await? {
1101 Some(user) if user.invite_code.is_some() => {
1102 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1103 }
1104 _ => Ok(None),
1105 }
1106 })
1107 .await
1108 }
1109
1110 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1111 self.transaction(|tx| async move {
1112 user::Entity::find()
1113 .filter(user::Column::InviteCode.eq(code))
1114 .one(&*tx)
1115 .await?
1116 .ok_or_else(|| {
1117 Error::Http(
1118 StatusCode::NOT_FOUND,
1119 "that invite code does not exist".to_string(),
1120 )
1121 })
1122 })
1123 .await
1124 }
1125
1126 // rooms
1127
1128 pub async fn incoming_call_for_user(
1129 &self,
1130 user_id: UserId,
1131 ) -> Result<Option<proto::IncomingCall>> {
1132 self.transaction(|tx| async move {
1133 let pending_participant = room_participant::Entity::find()
1134 .filter(
1135 room_participant::Column::UserId
1136 .eq(user_id)
1137 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1138 )
1139 .one(&*tx)
1140 .await?;
1141
1142 if let Some(pending_participant) = pending_participant {
1143 let room = self.get_room(pending_participant.room_id, &tx).await?;
1144 Ok(Self::build_incoming_call(&room, user_id))
1145 } else {
1146 Ok(None)
1147 }
1148 })
1149 .await
1150 }
1151
1152 pub async fn create_room(
1153 &self,
1154 user_id: UserId,
1155 connection: ConnectionId,
1156 live_kit_room: &str,
1157 ) -> Result<proto::Room> {
1158 self.transaction(|tx| async move {
1159 let room = room::ActiveModel {
1160 live_kit_room: ActiveValue::set(live_kit_room.into()),
1161 ..Default::default()
1162 }
1163 .insert(&*tx)
1164 .await?;
1165 room_participant::ActiveModel {
1166 room_id: ActiveValue::set(room.id),
1167 user_id: ActiveValue::set(user_id),
1168 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1169 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1170 connection.owner_id as i32,
1171 ))),
1172 answering_connection_lost: ActiveValue::set(false),
1173 calling_user_id: ActiveValue::set(user_id),
1174 calling_connection_id: ActiveValue::set(connection.id as i32),
1175 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1176 connection.owner_id as i32,
1177 ))),
1178 ..Default::default()
1179 }
1180 .insert(&*tx)
1181 .await?;
1182
1183 let room = self.get_room(room.id, &tx).await?;
1184 Ok(room)
1185 })
1186 .await
1187 }
1188
1189 pub async fn call(
1190 &self,
1191 room_id: RoomId,
1192 calling_user_id: UserId,
1193 calling_connection: ConnectionId,
1194 called_user_id: UserId,
1195 initial_project_id: Option<ProjectId>,
1196 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1197 self.room_transaction(room_id, |tx| async move {
1198 room_participant::ActiveModel {
1199 room_id: ActiveValue::set(room_id),
1200 user_id: ActiveValue::set(called_user_id),
1201 answering_connection_lost: ActiveValue::set(false),
1202 calling_user_id: ActiveValue::set(calling_user_id),
1203 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1204 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1205 calling_connection.owner_id as i32,
1206 ))),
1207 initial_project_id: ActiveValue::set(initial_project_id),
1208 ..Default::default()
1209 }
1210 .insert(&*tx)
1211 .await?;
1212
1213 let room = self.get_room(room_id, &tx).await?;
1214 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1215 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1216 Ok((room, incoming_call))
1217 })
1218 .await
1219 }
1220
1221 pub async fn call_failed(
1222 &self,
1223 room_id: RoomId,
1224 called_user_id: UserId,
1225 ) -> Result<RoomGuard<proto::Room>> {
1226 self.room_transaction(room_id, |tx| async move {
1227 room_participant::Entity::delete_many()
1228 .filter(
1229 room_participant::Column::RoomId
1230 .eq(room_id)
1231 .and(room_participant::Column::UserId.eq(called_user_id)),
1232 )
1233 .exec(&*tx)
1234 .await?;
1235 let room = self.get_room(room_id, &tx).await?;
1236 Ok(room)
1237 })
1238 .await
1239 }
1240
1241 pub async fn decline_call(
1242 &self,
1243 expected_room_id: Option<RoomId>,
1244 user_id: UserId,
1245 ) -> Result<Option<RoomGuard<proto::Room>>> {
1246 self.optional_room_transaction(|tx| async move {
1247 let mut filter = Condition::all()
1248 .add(room_participant::Column::UserId.eq(user_id))
1249 .add(room_participant::Column::AnsweringConnectionId.is_null());
1250 if let Some(room_id) = expected_room_id {
1251 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1252 }
1253 let participant = room_participant::Entity::find()
1254 .filter(filter)
1255 .one(&*tx)
1256 .await?;
1257
1258 let participant = if let Some(participant) = participant {
1259 participant
1260 } else if expected_room_id.is_some() {
1261 return Err(anyhow!("could not find call to decline"))?;
1262 } else {
1263 return Ok(None);
1264 };
1265
1266 let room_id = participant.room_id;
1267 room_participant::Entity::delete(participant.into_active_model())
1268 .exec(&*tx)
1269 .await?;
1270
1271 let room = self.get_room(room_id, &tx).await?;
1272 Ok(Some((room_id, room)))
1273 })
1274 .await
1275 }
1276
1277 pub async fn cancel_call(
1278 &self,
1279 room_id: RoomId,
1280 calling_connection: ConnectionId,
1281 called_user_id: UserId,
1282 ) -> Result<RoomGuard<proto::Room>> {
1283 self.room_transaction(room_id, |tx| async move {
1284 let participant = room_participant::Entity::find()
1285 .filter(
1286 Condition::all()
1287 .add(room_participant::Column::UserId.eq(called_user_id))
1288 .add(room_participant::Column::RoomId.eq(room_id))
1289 .add(
1290 room_participant::Column::CallingConnectionId
1291 .eq(calling_connection.id as i32),
1292 )
1293 .add(
1294 room_participant::Column::CallingConnectionServerId
1295 .eq(calling_connection.owner_id as i32),
1296 )
1297 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1298 )
1299 .one(&*tx)
1300 .await?
1301 .ok_or_else(|| anyhow!("no call to cancel"))?;
1302
1303 room_participant::Entity::delete(participant.into_active_model())
1304 .exec(&*tx)
1305 .await?;
1306
1307 let room = self.get_room(room_id, &tx).await?;
1308 Ok(room)
1309 })
1310 .await
1311 }
1312
1313 pub async fn join_room(
1314 &self,
1315 room_id: RoomId,
1316 user_id: UserId,
1317 connection: ConnectionId,
1318 ) -> Result<RoomGuard<proto::Room>> {
1319 self.room_transaction(room_id, |tx| async move {
1320 let result = room_participant::Entity::update_many()
1321 .filter(
1322 Condition::all()
1323 .add(room_participant::Column::RoomId.eq(room_id))
1324 .add(room_participant::Column::UserId.eq(user_id))
1325 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1326 )
1327 .set(room_participant::ActiveModel {
1328 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1329 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1330 connection.owner_id as i32,
1331 ))),
1332 answering_connection_lost: ActiveValue::set(false),
1333 ..Default::default()
1334 })
1335 .exec(&*tx)
1336 .await?;
1337 if result.rows_affected == 0 {
1338 Err(anyhow!("room does not exist or was already joined"))?
1339 } else {
1340 let room = self.get_room(room_id, &tx).await?;
1341 Ok(room)
1342 }
1343 })
1344 .await
1345 }
1346
1347 pub async fn rejoin_room(
1348 &self,
1349 rejoin_room: proto::RejoinRoom,
1350 user_id: UserId,
1351 connection: ConnectionId,
1352 ) -> Result<RoomGuard<RejoinedRoom>> {
1353 let room_id = RoomId::from_proto(rejoin_room.id);
1354 self.room_transaction(room_id, |tx| async {
1355 let tx = tx;
1356 let participant_update = room_participant::Entity::update_many()
1357 .filter(
1358 Condition::all()
1359 .add(room_participant::Column::RoomId.eq(room_id))
1360 .add(room_participant::Column::UserId.eq(user_id))
1361 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1362 .add(
1363 Condition::any()
1364 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1365 .add(
1366 room_participant::Column::AnsweringConnectionServerId
1367 .ne(connection.owner_id as i32),
1368 ),
1369 ),
1370 )
1371 .set(room_participant::ActiveModel {
1372 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1373 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1374 connection.owner_id as i32,
1375 ))),
1376 answering_connection_lost: ActiveValue::set(false),
1377 ..Default::default()
1378 })
1379 .exec(&*tx)
1380 .await?;
1381 if participant_update.rows_affected == 0 {
1382 return Err(anyhow!("room does not exist or was already joined"))?;
1383 }
1384
1385 let mut reshared_projects = Vec::new();
1386 for reshared_project in &rejoin_room.reshared_projects {
1387 let project_id = ProjectId::from_proto(reshared_project.project_id);
1388 let project = project::Entity::find_by_id(project_id)
1389 .one(&*tx)
1390 .await?
1391 .ok_or_else(|| anyhow!("project does not exist"))?;
1392 if project.host_user_id != user_id {
1393 return Err(anyhow!("no such project"))?;
1394 }
1395
1396 let mut collaborators = project
1397 .find_related(project_collaborator::Entity)
1398 .all(&*tx)
1399 .await?;
1400 let host_ix = collaborators
1401 .iter()
1402 .position(|collaborator| {
1403 collaborator.user_id == user_id && collaborator.is_host
1404 })
1405 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1406 let host = collaborators.swap_remove(host_ix);
1407 let old_connection_id = host.connection();
1408
1409 project::Entity::update(project::ActiveModel {
1410 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1411 host_connection_server_id: ActiveValue::set(Some(ServerId(
1412 connection.owner_id as i32,
1413 ))),
1414 ..project.into_active_model()
1415 })
1416 .exec(&*tx)
1417 .await?;
1418 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1419 connection_id: ActiveValue::set(connection.id as i32),
1420 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1421 ..host.into_active_model()
1422 })
1423 .exec(&*tx)
1424 .await?;
1425
1426 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1427 .await?;
1428
1429 reshared_projects.push(ResharedProject {
1430 id: project_id,
1431 old_connection_id,
1432 collaborators: collaborators
1433 .iter()
1434 .map(|collaborator| ProjectCollaborator {
1435 connection_id: collaborator.connection(),
1436 user_id: collaborator.user_id,
1437 replica_id: collaborator.replica_id,
1438 is_host: collaborator.is_host,
1439 })
1440 .collect(),
1441 worktrees: reshared_project.worktrees.clone(),
1442 });
1443 }
1444
1445 project::Entity::delete_many()
1446 .filter(
1447 Condition::all()
1448 .add(project::Column::RoomId.eq(room_id))
1449 .add(project::Column::HostUserId.eq(user_id))
1450 .add(
1451 project::Column::Id
1452 .is_not_in(reshared_projects.iter().map(|project| project.id)),
1453 ),
1454 )
1455 .exec(&*tx)
1456 .await?;
1457
1458 let mut rejoined_projects = Vec::new();
1459 for rejoined_project in &rejoin_room.rejoined_projects {
1460 let project_id = ProjectId::from_proto(rejoined_project.id);
1461 let Some(project) = project::Entity::find_by_id(project_id)
1462 .one(&*tx)
1463 .await? else { continue };
1464
1465 let mut worktrees = Vec::new();
1466 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1467 for db_worktree in db_worktrees {
1468 let mut worktree = RejoinedWorktree {
1469 id: db_worktree.id as u64,
1470 abs_path: db_worktree.abs_path,
1471 root_name: db_worktree.root_name,
1472 visible: db_worktree.visible,
1473 updated_entries: Default::default(),
1474 removed_entries: Default::default(),
1475 diagnostic_summaries: Default::default(),
1476 scan_id: db_worktree.scan_id as u64,
1477 completed_scan_id: db_worktree.completed_scan_id as u64,
1478 };
1479
1480 let rejoined_worktree = rejoined_project
1481 .worktrees
1482 .iter()
1483 .find(|worktree| worktree.id == db_worktree.id as u64);
1484 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1485 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1486 } else {
1487 worktree_entry::Column::IsDeleted.eq(false)
1488 };
1489
1490 let mut db_entries = worktree_entry::Entity::find()
1491 .filter(
1492 Condition::all()
1493 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1494 .add(entry_filter),
1495 )
1496 .stream(&*tx)
1497 .await?;
1498
1499 while let Some(db_entry) = db_entries.next().await {
1500 let db_entry = db_entry?;
1501 if db_entry.is_deleted {
1502 worktree.removed_entries.push(db_entry.id as u64);
1503 } else {
1504 worktree.updated_entries.push(proto::Entry {
1505 id: db_entry.id as u64,
1506 is_dir: db_entry.is_dir,
1507 path: db_entry.path,
1508 inode: db_entry.inode as u64,
1509 mtime: Some(proto::Timestamp {
1510 seconds: db_entry.mtime_seconds as u64,
1511 nanos: db_entry.mtime_nanos as u32,
1512 }),
1513 is_symlink: db_entry.is_symlink,
1514 is_ignored: db_entry.is_ignored,
1515 });
1516 }
1517 }
1518
1519 worktrees.push(worktree);
1520 }
1521
1522 let language_servers = project
1523 .find_related(language_server::Entity)
1524 .all(&*tx)
1525 .await?
1526 .into_iter()
1527 .map(|language_server| proto::LanguageServer {
1528 id: language_server.id as u64,
1529 name: language_server.name,
1530 })
1531 .collect::<Vec<_>>();
1532
1533 let mut collaborators = project
1534 .find_related(project_collaborator::Entity)
1535 .all(&*tx)
1536 .await?;
1537 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1538 .iter()
1539 .position(|collaborator| collaborator.user_id == user_id)
1540 {
1541 collaborators.swap_remove(self_collaborator_ix)
1542 } else {
1543 continue;
1544 };
1545 let old_connection_id = self_collaborator.connection();
1546 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1547 connection_id: ActiveValue::set(connection.id as i32),
1548 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1549 ..self_collaborator.into_active_model()
1550 })
1551 .exec(&*tx)
1552 .await?;
1553
1554 let collaborators = collaborators
1555 .into_iter()
1556 .map(|collaborator| ProjectCollaborator {
1557 connection_id: collaborator.connection(),
1558 user_id: collaborator.user_id,
1559 replica_id: collaborator.replica_id,
1560 is_host: collaborator.is_host,
1561 })
1562 .collect::<Vec<_>>();
1563
1564 rejoined_projects.push(RejoinedProject {
1565 id: project_id,
1566 old_connection_id,
1567 collaborators,
1568 worktrees,
1569 language_servers,
1570 });
1571 }
1572
1573 let room = self.get_room(room_id, &tx).await?;
1574 Ok(RejoinedRoom {
1575 room,
1576 rejoined_projects,
1577 reshared_projects,
1578 })
1579 })
1580 .await
1581 }
1582
1583 pub async fn leave_room(
1584 &self,
1585 connection: ConnectionId,
1586 ) -> Result<Option<RoomGuard<LeftRoom>>> {
1587 self.optional_room_transaction(|tx| async move {
1588 let leaving_participant = room_participant::Entity::find()
1589 .filter(
1590 Condition::all()
1591 .add(
1592 room_participant::Column::AnsweringConnectionId
1593 .eq(connection.id as i32),
1594 )
1595 .add(
1596 room_participant::Column::AnsweringConnectionServerId
1597 .eq(connection.owner_id as i32),
1598 ),
1599 )
1600 .one(&*tx)
1601 .await?;
1602
1603 if let Some(leaving_participant) = leaving_participant {
1604 // Leave room.
1605 let room_id = leaving_participant.room_id;
1606 room_participant::Entity::delete_by_id(leaving_participant.id)
1607 .exec(&*tx)
1608 .await?;
1609
1610 // Cancel pending calls initiated by the leaving user.
1611 let called_participants = room_participant::Entity::find()
1612 .filter(
1613 Condition::all()
1614 .add(
1615 room_participant::Column::CallingUserId
1616 .eq(leaving_participant.user_id),
1617 )
1618 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1619 )
1620 .all(&*tx)
1621 .await?;
1622 room_participant::Entity::delete_many()
1623 .filter(
1624 room_participant::Column::Id
1625 .is_in(called_participants.iter().map(|participant| participant.id)),
1626 )
1627 .exec(&*tx)
1628 .await?;
1629 let canceled_calls_to_user_ids = called_participants
1630 .into_iter()
1631 .map(|participant| participant.user_id)
1632 .collect();
1633
1634 // Detect left projects.
1635 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1636 enum QueryProjectIds {
1637 ProjectId,
1638 }
1639 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1640 .select_only()
1641 .column_as(
1642 project_collaborator::Column::ProjectId,
1643 QueryProjectIds::ProjectId,
1644 )
1645 .filter(
1646 Condition::all()
1647 .add(
1648 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1649 )
1650 .add(
1651 project_collaborator::Column::ConnectionServerId
1652 .eq(connection.owner_id as i32),
1653 ),
1654 )
1655 .into_values::<_, QueryProjectIds>()
1656 .all(&*tx)
1657 .await?;
1658 let mut left_projects = HashMap::default();
1659 let mut collaborators = project_collaborator::Entity::find()
1660 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1661 .stream(&*tx)
1662 .await?;
1663 while let Some(collaborator) = collaborators.next().await {
1664 let collaborator = collaborator?;
1665 let left_project =
1666 left_projects
1667 .entry(collaborator.project_id)
1668 .or_insert(LeftProject {
1669 id: collaborator.project_id,
1670 host_user_id: Default::default(),
1671 connection_ids: Default::default(),
1672 host_connection_id: Default::default(),
1673 });
1674
1675 let collaborator_connection_id = collaborator.connection();
1676 if collaborator_connection_id != connection {
1677 left_project.connection_ids.push(collaborator_connection_id);
1678 }
1679
1680 if collaborator.is_host {
1681 left_project.host_user_id = collaborator.user_id;
1682 left_project.host_connection_id = collaborator_connection_id;
1683 }
1684 }
1685 drop(collaborators);
1686
1687 // Leave projects.
1688 project_collaborator::Entity::delete_many()
1689 .filter(
1690 Condition::all()
1691 .add(
1692 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1693 )
1694 .add(
1695 project_collaborator::Column::ConnectionServerId
1696 .eq(connection.owner_id as i32),
1697 ),
1698 )
1699 .exec(&*tx)
1700 .await?;
1701
1702 // Unshare projects.
1703 project::Entity::delete_many()
1704 .filter(
1705 Condition::all()
1706 .add(project::Column::RoomId.eq(room_id))
1707 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1708 .add(
1709 project::Column::HostConnectionServerId
1710 .eq(connection.owner_id as i32),
1711 ),
1712 )
1713 .exec(&*tx)
1714 .await?;
1715
1716 let room = self.get_room(room_id, &tx).await?;
1717 if room.participants.is_empty() {
1718 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1719 }
1720
1721 let left_room = LeftRoom {
1722 room,
1723 left_projects,
1724 canceled_calls_to_user_ids,
1725 };
1726
1727 if left_room.room.participants.is_empty() {
1728 self.rooms.remove(&room_id);
1729 }
1730
1731 Ok(Some((room_id, left_room)))
1732 } else {
1733 Ok(None)
1734 }
1735 })
1736 .await
1737 }
1738
1739 pub async fn follow(
1740 &self,
1741 project_id: ProjectId,
1742 leader_connection: ConnectionId,
1743 follower_connection: ConnectionId,
1744 ) -> Result<RoomGuard<proto::Room>> {
1745 let room_id = self.room_id_for_project(project_id).await?;
1746 self.room_transaction(room_id, |tx| async move {
1747 follower::ActiveModel {
1748 room_id: ActiveValue::set(room_id),
1749 project_id: ActiveValue::set(project_id),
1750 leader_connection_server_id: ActiveValue::set(ServerId(
1751 leader_connection.owner_id as i32,
1752 )),
1753 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1754 follower_connection_server_id: ActiveValue::set(ServerId(
1755 follower_connection.owner_id as i32,
1756 )),
1757 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1758 ..Default::default()
1759 }
1760 .insert(&*tx)
1761 .await?;
1762
1763 let room = self.get_room(room_id, &*tx).await?;
1764 Ok(room)
1765 })
1766 .await
1767 }
1768
1769 pub async fn unfollow(
1770 &self,
1771 project_id: ProjectId,
1772 leader_connection: ConnectionId,
1773 follower_connection: ConnectionId,
1774 ) -> Result<RoomGuard<proto::Room>> {
1775 let room_id = self.room_id_for_project(project_id).await?;
1776 self.room_transaction(room_id, |tx| async move {
1777 follower::Entity::delete_many()
1778 .filter(
1779 Condition::all()
1780 .add(follower::Column::ProjectId.eq(project_id))
1781 .add(
1782 follower::Column::LeaderConnectionServerId
1783 .eq(leader_connection.owner_id),
1784 )
1785 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1786 .add(
1787 follower::Column::FollowerConnectionServerId
1788 .eq(follower_connection.owner_id),
1789 )
1790 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1791 )
1792 .exec(&*tx)
1793 .await?;
1794
1795 let room = self.get_room(room_id, &*tx).await?;
1796 Ok(room)
1797 })
1798 .await
1799 }
1800
1801 pub async fn update_room_participant_location(
1802 &self,
1803 room_id: RoomId,
1804 connection: ConnectionId,
1805 location: proto::ParticipantLocation,
1806 ) -> Result<RoomGuard<proto::Room>> {
1807 self.room_transaction(room_id, |tx| async {
1808 let tx = tx;
1809 let location_kind;
1810 let location_project_id;
1811 match location
1812 .variant
1813 .as_ref()
1814 .ok_or_else(|| anyhow!("invalid location"))?
1815 {
1816 proto::participant_location::Variant::SharedProject(project) => {
1817 location_kind = 0;
1818 location_project_id = Some(ProjectId::from_proto(project.id));
1819 }
1820 proto::participant_location::Variant::UnsharedProject(_) => {
1821 location_kind = 1;
1822 location_project_id = None;
1823 }
1824 proto::participant_location::Variant::External(_) => {
1825 location_kind = 2;
1826 location_project_id = None;
1827 }
1828 }
1829
1830 let result = room_participant::Entity::update_many()
1831 .filter(
1832 Condition::all()
1833 .add(room_participant::Column::RoomId.eq(room_id))
1834 .add(
1835 room_participant::Column::AnsweringConnectionId
1836 .eq(connection.id as i32),
1837 )
1838 .add(
1839 room_participant::Column::AnsweringConnectionServerId
1840 .eq(connection.owner_id as i32),
1841 ),
1842 )
1843 .set(room_participant::ActiveModel {
1844 location_kind: ActiveValue::set(Some(location_kind)),
1845 location_project_id: ActiveValue::set(location_project_id),
1846 ..Default::default()
1847 })
1848 .exec(&*tx)
1849 .await?;
1850
1851 if result.rows_affected == 1 {
1852 let room = self.get_room(room_id, &tx).await?;
1853 Ok(room)
1854 } else {
1855 Err(anyhow!("could not update room participant location"))?
1856 }
1857 })
1858 .await
1859 }
1860
1861 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1862 self.transaction(|tx| async move {
1863 let participant = room_participant::Entity::find()
1864 .filter(
1865 Condition::all()
1866 .add(
1867 room_participant::Column::AnsweringConnectionId
1868 .eq(connection.id as i32),
1869 )
1870 .add(
1871 room_participant::Column::AnsweringConnectionServerId
1872 .eq(connection.owner_id as i32),
1873 ),
1874 )
1875 .one(&*tx)
1876 .await?
1877 .ok_or_else(|| anyhow!("not a participant in any room"))?;
1878
1879 room_participant::Entity::update(room_participant::ActiveModel {
1880 answering_connection_lost: ActiveValue::set(true),
1881 ..participant.into_active_model()
1882 })
1883 .exec(&*tx)
1884 .await?;
1885
1886 Ok(())
1887 })
1888 .await
1889 }
1890
1891 fn build_incoming_call(
1892 room: &proto::Room,
1893 called_user_id: UserId,
1894 ) -> Option<proto::IncomingCall> {
1895 let pending_participant = room
1896 .pending_participants
1897 .iter()
1898 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1899
1900 Some(proto::IncomingCall {
1901 room_id: room.id,
1902 calling_user_id: pending_participant.calling_user_id,
1903 participant_user_ids: room
1904 .participants
1905 .iter()
1906 .map(|participant| participant.user_id)
1907 .collect(),
1908 initial_project: room.participants.iter().find_map(|participant| {
1909 let initial_project_id = pending_participant.initial_project_id?;
1910 participant
1911 .projects
1912 .iter()
1913 .find(|project| project.id == initial_project_id)
1914 .cloned()
1915 }),
1916 })
1917 }
1918
1919 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1920 let db_room = room::Entity::find_by_id(room_id)
1921 .one(tx)
1922 .await?
1923 .ok_or_else(|| anyhow!("could not find room"))?;
1924
1925 let mut db_participants = db_room
1926 .find_related(room_participant::Entity)
1927 .stream(tx)
1928 .await?;
1929 let mut participants = HashMap::default();
1930 let mut pending_participants = Vec::new();
1931 while let Some(db_participant) = db_participants.next().await {
1932 let db_participant = db_participant?;
1933 if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1934 .answering_connection_id
1935 .zip(db_participant.answering_connection_server_id)
1936 {
1937 let location = match (
1938 db_participant.location_kind,
1939 db_participant.location_project_id,
1940 ) {
1941 (Some(0), Some(project_id)) => {
1942 Some(proto::participant_location::Variant::SharedProject(
1943 proto::participant_location::SharedProject {
1944 id: project_id.to_proto(),
1945 },
1946 ))
1947 }
1948 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1949 Default::default(),
1950 )),
1951 _ => Some(proto::participant_location::Variant::External(
1952 Default::default(),
1953 )),
1954 };
1955
1956 let answering_connection = ConnectionId {
1957 owner_id: answering_connection_server_id.0 as u32,
1958 id: answering_connection_id as u32,
1959 };
1960 participants.insert(
1961 answering_connection,
1962 proto::Participant {
1963 user_id: db_participant.user_id.to_proto(),
1964 peer_id: Some(answering_connection.into()),
1965 projects: Default::default(),
1966 location: Some(proto::ParticipantLocation { variant: location }),
1967 },
1968 );
1969 } else {
1970 pending_participants.push(proto::PendingParticipant {
1971 user_id: db_participant.user_id.to_proto(),
1972 calling_user_id: db_participant.calling_user_id.to_proto(),
1973 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1974 });
1975 }
1976 }
1977 drop(db_participants);
1978
1979 let mut db_projects = db_room
1980 .find_related(project::Entity)
1981 .find_with_related(worktree::Entity)
1982 .stream(tx)
1983 .await?;
1984
1985 while let Some(row) = db_projects.next().await {
1986 let (db_project, db_worktree) = row?;
1987 let host_connection = db_project.host_connection()?;
1988 if let Some(participant) = participants.get_mut(&host_connection) {
1989 let project = if let Some(project) = participant
1990 .projects
1991 .iter_mut()
1992 .find(|project| project.id == db_project.id.to_proto())
1993 {
1994 project
1995 } else {
1996 participant.projects.push(proto::ParticipantProject {
1997 id: db_project.id.to_proto(),
1998 worktree_root_names: Default::default(),
1999 });
2000 participant.projects.last_mut().unwrap()
2001 };
2002
2003 if let Some(db_worktree) = db_worktree {
2004 if db_worktree.visible {
2005 project.worktree_root_names.push(db_worktree.root_name);
2006 }
2007 }
2008 }
2009 }
2010 drop(db_projects);
2011
2012 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2013 let mut followers = Vec::new();
2014 while let Some(db_follower) = db_followers.next().await {
2015 let db_follower = db_follower?;
2016 followers.push(proto::Follower {
2017 leader_id: Some(db_follower.leader_connection().into()),
2018 follower_id: Some(db_follower.follower_connection().into()),
2019 project_id: db_follower.project_id.to_proto(),
2020 });
2021 }
2022
2023 Ok(proto::Room {
2024 id: db_room.id.to_proto(),
2025 live_kit_room: db_room.live_kit_room,
2026 participants: participants.into_values().collect(),
2027 pending_participants,
2028 followers,
2029 })
2030 }
2031
2032 // projects
2033
2034 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2035 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2036 enum QueryAs {
2037 Count,
2038 }
2039
2040 self.transaction(|tx| async move {
2041 Ok(project::Entity::find()
2042 .select_only()
2043 .column_as(project::Column::Id.count(), QueryAs::Count)
2044 .inner_join(user::Entity)
2045 .filter(user::Column::Admin.eq(false))
2046 .into_values::<_, QueryAs>()
2047 .one(&*tx)
2048 .await?
2049 .unwrap_or(0i64) as usize)
2050 })
2051 .await
2052 }
2053
2054 pub async fn share_project(
2055 &self,
2056 room_id: RoomId,
2057 connection: ConnectionId,
2058 worktrees: &[proto::WorktreeMetadata],
2059 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2060 self.room_transaction(room_id, |tx| async move {
2061 let participant = room_participant::Entity::find()
2062 .filter(
2063 Condition::all()
2064 .add(
2065 room_participant::Column::AnsweringConnectionId
2066 .eq(connection.id as i32),
2067 )
2068 .add(
2069 room_participant::Column::AnsweringConnectionServerId
2070 .eq(connection.owner_id as i32),
2071 ),
2072 )
2073 .one(&*tx)
2074 .await?
2075 .ok_or_else(|| anyhow!("could not find participant"))?;
2076 if participant.room_id != room_id {
2077 return Err(anyhow!("shared project on unexpected room"))?;
2078 }
2079
2080 let project = project::ActiveModel {
2081 room_id: ActiveValue::set(participant.room_id),
2082 host_user_id: ActiveValue::set(participant.user_id),
2083 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2084 host_connection_server_id: ActiveValue::set(Some(ServerId(
2085 connection.owner_id as i32,
2086 ))),
2087 ..Default::default()
2088 }
2089 .insert(&*tx)
2090 .await?;
2091
2092 if !worktrees.is_empty() {
2093 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2094 worktree::ActiveModel {
2095 id: ActiveValue::set(worktree.id as i64),
2096 project_id: ActiveValue::set(project.id),
2097 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2098 root_name: ActiveValue::set(worktree.root_name.clone()),
2099 visible: ActiveValue::set(worktree.visible),
2100 scan_id: ActiveValue::set(0),
2101 completed_scan_id: ActiveValue::set(0),
2102 }
2103 }))
2104 .exec(&*tx)
2105 .await?;
2106 }
2107
2108 project_collaborator::ActiveModel {
2109 project_id: ActiveValue::set(project.id),
2110 connection_id: ActiveValue::set(connection.id as i32),
2111 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2112 user_id: ActiveValue::set(participant.user_id),
2113 replica_id: ActiveValue::set(ReplicaId(0)),
2114 is_host: ActiveValue::set(true),
2115 ..Default::default()
2116 }
2117 .insert(&*tx)
2118 .await?;
2119
2120 let room = self.get_room(room_id, &tx).await?;
2121 Ok((project.id, room))
2122 })
2123 .await
2124 }
2125
2126 pub async fn unshare_project(
2127 &self,
2128 project_id: ProjectId,
2129 connection: ConnectionId,
2130 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2131 let room_id = self.room_id_for_project(project_id).await?;
2132 self.room_transaction(room_id, |tx| async move {
2133 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2134
2135 let project = project::Entity::find_by_id(project_id)
2136 .one(&*tx)
2137 .await?
2138 .ok_or_else(|| anyhow!("project not found"))?;
2139 if project.host_connection()? == connection {
2140 project::Entity::delete(project.into_active_model())
2141 .exec(&*tx)
2142 .await?;
2143 let room = self.get_room(room_id, &tx).await?;
2144 Ok((room, guest_connection_ids))
2145 } else {
2146 Err(anyhow!("cannot unshare a project hosted by another user"))?
2147 }
2148 })
2149 .await
2150 }
2151
2152 pub async fn update_project(
2153 &self,
2154 project_id: ProjectId,
2155 connection: ConnectionId,
2156 worktrees: &[proto::WorktreeMetadata],
2157 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2158 let room_id = self.room_id_for_project(project_id).await?;
2159 self.room_transaction(room_id, |tx| async move {
2160 let project = project::Entity::find_by_id(project_id)
2161 .filter(
2162 Condition::all()
2163 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2164 .add(
2165 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2166 ),
2167 )
2168 .one(&*tx)
2169 .await?
2170 .ok_or_else(|| anyhow!("no such project"))?;
2171
2172 self.update_project_worktrees(project.id, worktrees, &tx)
2173 .await?;
2174
2175 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2176 let room = self.get_room(project.room_id, &tx).await?;
2177 Ok((room, guest_connection_ids))
2178 })
2179 .await
2180 }
2181
2182 async fn update_project_worktrees(
2183 &self,
2184 project_id: ProjectId,
2185 worktrees: &[proto::WorktreeMetadata],
2186 tx: &DatabaseTransaction,
2187 ) -> Result<()> {
2188 if !worktrees.is_empty() {
2189 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2190 id: ActiveValue::set(worktree.id as i64),
2191 project_id: ActiveValue::set(project_id),
2192 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2193 root_name: ActiveValue::set(worktree.root_name.clone()),
2194 visible: ActiveValue::set(worktree.visible),
2195 scan_id: ActiveValue::set(0),
2196 completed_scan_id: ActiveValue::set(0),
2197 }))
2198 .on_conflict(
2199 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2200 .update_column(worktree::Column::RootName)
2201 .to_owned(),
2202 )
2203 .exec(&*tx)
2204 .await?;
2205 }
2206
2207 worktree::Entity::delete_many()
2208 .filter(worktree::Column::ProjectId.eq(project_id).and(
2209 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2210 ))
2211 .exec(&*tx)
2212 .await?;
2213
2214 Ok(())
2215 }
2216
2217 pub async fn update_worktree(
2218 &self,
2219 update: &proto::UpdateWorktree,
2220 connection: ConnectionId,
2221 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2222 let project_id = ProjectId::from_proto(update.project_id);
2223 let worktree_id = update.worktree_id as i64;
2224 let room_id = self.room_id_for_project(project_id).await?;
2225 self.room_transaction(room_id, |tx| async move {
2226 // Ensure the update comes from the host.
2227 let _project = project::Entity::find_by_id(project_id)
2228 .filter(
2229 Condition::all()
2230 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2231 .add(
2232 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2233 ),
2234 )
2235 .one(&*tx)
2236 .await?
2237 .ok_or_else(|| anyhow!("no such project"))?;
2238
2239 // Update metadata.
2240 worktree::Entity::update(worktree::ActiveModel {
2241 id: ActiveValue::set(worktree_id),
2242 project_id: ActiveValue::set(project_id),
2243 root_name: ActiveValue::set(update.root_name.clone()),
2244 scan_id: ActiveValue::set(update.scan_id as i64),
2245 completed_scan_id: if update.is_last_update {
2246 ActiveValue::set(update.scan_id as i64)
2247 } else {
2248 ActiveValue::default()
2249 },
2250 abs_path: ActiveValue::set(update.abs_path.clone()),
2251 ..Default::default()
2252 })
2253 .exec(&*tx)
2254 .await?;
2255
2256 if !update.updated_entries.is_empty() {
2257 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2258 let mtime = entry.mtime.clone().unwrap_or_default();
2259 worktree_entry::ActiveModel {
2260 project_id: ActiveValue::set(project_id),
2261 worktree_id: ActiveValue::set(worktree_id),
2262 id: ActiveValue::set(entry.id as i64),
2263 is_dir: ActiveValue::set(entry.is_dir),
2264 path: ActiveValue::set(entry.path.clone()),
2265 inode: ActiveValue::set(entry.inode as i64),
2266 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2267 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2268 is_symlink: ActiveValue::set(entry.is_symlink),
2269 is_ignored: ActiveValue::set(entry.is_ignored),
2270 is_deleted: ActiveValue::set(false),
2271 scan_id: ActiveValue::set(update.scan_id as i64),
2272 }
2273 }))
2274 .on_conflict(
2275 OnConflict::columns([
2276 worktree_entry::Column::ProjectId,
2277 worktree_entry::Column::WorktreeId,
2278 worktree_entry::Column::Id,
2279 ])
2280 .update_columns([
2281 worktree_entry::Column::IsDir,
2282 worktree_entry::Column::Path,
2283 worktree_entry::Column::Inode,
2284 worktree_entry::Column::MtimeSeconds,
2285 worktree_entry::Column::MtimeNanos,
2286 worktree_entry::Column::IsSymlink,
2287 worktree_entry::Column::IsIgnored,
2288 worktree_entry::Column::ScanId,
2289 ])
2290 .to_owned(),
2291 )
2292 .exec(&*tx)
2293 .await?;
2294 }
2295
2296 if !update.removed_entries.is_empty() {
2297 worktree_entry::Entity::update_many()
2298 .filter(
2299 worktree_entry::Column::ProjectId
2300 .eq(project_id)
2301 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2302 .and(
2303 worktree_entry::Column::Id
2304 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2305 ),
2306 )
2307 .set(worktree_entry::ActiveModel {
2308 is_deleted: ActiveValue::Set(true),
2309 scan_id: ActiveValue::Set(update.scan_id as i64),
2310 ..Default::default()
2311 })
2312 .exec(&*tx)
2313 .await?;
2314 }
2315
2316 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2317 Ok(connection_ids)
2318 })
2319 .await
2320 }
2321
2322 pub async fn update_diagnostic_summary(
2323 &self,
2324 update: &proto::UpdateDiagnosticSummary,
2325 connection: ConnectionId,
2326 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2327 let project_id = ProjectId::from_proto(update.project_id);
2328 let worktree_id = update.worktree_id as i64;
2329 let room_id = self.room_id_for_project(project_id).await?;
2330 self.room_transaction(room_id, |tx| async move {
2331 let summary = update
2332 .summary
2333 .as_ref()
2334 .ok_or_else(|| anyhow!("invalid summary"))?;
2335
2336 // Ensure the update comes from the host.
2337 let project = project::Entity::find_by_id(project_id)
2338 .one(&*tx)
2339 .await?
2340 .ok_or_else(|| anyhow!("no such project"))?;
2341 if project.host_connection()? != connection {
2342 return Err(anyhow!("can't update a project hosted by someone else"))?;
2343 }
2344
2345 // Update summary.
2346 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2347 project_id: ActiveValue::set(project_id),
2348 worktree_id: ActiveValue::set(worktree_id),
2349 path: ActiveValue::set(summary.path.clone()),
2350 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2351 error_count: ActiveValue::set(summary.error_count as i32),
2352 warning_count: ActiveValue::set(summary.warning_count as i32),
2353 ..Default::default()
2354 })
2355 .on_conflict(
2356 OnConflict::columns([
2357 worktree_diagnostic_summary::Column::ProjectId,
2358 worktree_diagnostic_summary::Column::WorktreeId,
2359 worktree_diagnostic_summary::Column::Path,
2360 ])
2361 .update_columns([
2362 worktree_diagnostic_summary::Column::LanguageServerId,
2363 worktree_diagnostic_summary::Column::ErrorCount,
2364 worktree_diagnostic_summary::Column::WarningCount,
2365 ])
2366 .to_owned(),
2367 )
2368 .exec(&*tx)
2369 .await?;
2370
2371 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2372 Ok(connection_ids)
2373 })
2374 .await
2375 }
2376
2377 pub async fn start_language_server(
2378 &self,
2379 update: &proto::StartLanguageServer,
2380 connection: ConnectionId,
2381 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2382 let project_id = ProjectId::from_proto(update.project_id);
2383 let room_id = self.room_id_for_project(project_id).await?;
2384 self.room_transaction(room_id, |tx| async move {
2385 let server = update
2386 .server
2387 .as_ref()
2388 .ok_or_else(|| anyhow!("invalid language server"))?;
2389
2390 // Ensure the update comes from the host.
2391 let project = project::Entity::find_by_id(project_id)
2392 .one(&*tx)
2393 .await?
2394 .ok_or_else(|| anyhow!("no such project"))?;
2395 if project.host_connection()? != connection {
2396 return Err(anyhow!("can't update a project hosted by someone else"))?;
2397 }
2398
2399 // Add the newly-started language server.
2400 language_server::Entity::insert(language_server::ActiveModel {
2401 project_id: ActiveValue::set(project_id),
2402 id: ActiveValue::set(server.id as i64),
2403 name: ActiveValue::set(server.name.clone()),
2404 ..Default::default()
2405 })
2406 .on_conflict(
2407 OnConflict::columns([
2408 language_server::Column::ProjectId,
2409 language_server::Column::Id,
2410 ])
2411 .update_column(language_server::Column::Name)
2412 .to_owned(),
2413 )
2414 .exec(&*tx)
2415 .await?;
2416
2417 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2418 Ok(connection_ids)
2419 })
2420 .await
2421 }
2422
2423 pub async fn join_project(
2424 &self,
2425 project_id: ProjectId,
2426 connection: ConnectionId,
2427 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2428 let room_id = self.room_id_for_project(project_id).await?;
2429 self.room_transaction(room_id, |tx| async move {
2430 let participant = room_participant::Entity::find()
2431 .filter(
2432 Condition::all()
2433 .add(
2434 room_participant::Column::AnsweringConnectionId
2435 .eq(connection.id as i32),
2436 )
2437 .add(
2438 room_participant::Column::AnsweringConnectionServerId
2439 .eq(connection.owner_id as i32),
2440 ),
2441 )
2442 .one(&*tx)
2443 .await?
2444 .ok_or_else(|| anyhow!("must join a room first"))?;
2445
2446 let project = project::Entity::find_by_id(project_id)
2447 .one(&*tx)
2448 .await?
2449 .ok_or_else(|| anyhow!("no such project"))?;
2450 if project.room_id != participant.room_id {
2451 return Err(anyhow!("no such project"))?;
2452 }
2453
2454 let mut collaborators = project
2455 .find_related(project_collaborator::Entity)
2456 .all(&*tx)
2457 .await?;
2458 let replica_ids = collaborators
2459 .iter()
2460 .map(|c| c.replica_id)
2461 .collect::<HashSet<_>>();
2462 let mut replica_id = ReplicaId(1);
2463 while replica_ids.contains(&replica_id) {
2464 replica_id.0 += 1;
2465 }
2466 let new_collaborator = project_collaborator::ActiveModel {
2467 project_id: ActiveValue::set(project_id),
2468 connection_id: ActiveValue::set(connection.id as i32),
2469 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2470 user_id: ActiveValue::set(participant.user_id),
2471 replica_id: ActiveValue::set(replica_id),
2472 is_host: ActiveValue::set(false),
2473 ..Default::default()
2474 }
2475 .insert(&*tx)
2476 .await?;
2477 collaborators.push(new_collaborator);
2478
2479 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2480 let mut worktrees = db_worktrees
2481 .into_iter()
2482 .map(|db_worktree| {
2483 (
2484 db_worktree.id as u64,
2485 Worktree {
2486 id: db_worktree.id as u64,
2487 abs_path: db_worktree.abs_path,
2488 root_name: db_worktree.root_name,
2489 visible: db_worktree.visible,
2490 entries: Default::default(),
2491 diagnostic_summaries: Default::default(),
2492 scan_id: db_worktree.scan_id as u64,
2493 completed_scan_id: db_worktree.completed_scan_id as u64,
2494 },
2495 )
2496 })
2497 .collect::<BTreeMap<_, _>>();
2498
2499 // Populate worktree entries.
2500 {
2501 let mut db_entries = worktree_entry::Entity::find()
2502 .filter(
2503 Condition::all()
2504 .add(worktree_entry::Column::ProjectId.eq(project_id))
2505 .add(worktree_entry::Column::IsDeleted.eq(false)),
2506 )
2507 .stream(&*tx)
2508 .await?;
2509 while let Some(db_entry) = db_entries.next().await {
2510 let db_entry = db_entry?;
2511 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2512 worktree.entries.push(proto::Entry {
2513 id: db_entry.id as u64,
2514 is_dir: db_entry.is_dir,
2515 path: db_entry.path,
2516 inode: db_entry.inode as u64,
2517 mtime: Some(proto::Timestamp {
2518 seconds: db_entry.mtime_seconds as u64,
2519 nanos: db_entry.mtime_nanos as u32,
2520 }),
2521 is_symlink: db_entry.is_symlink,
2522 is_ignored: db_entry.is_ignored,
2523 });
2524 }
2525 }
2526 }
2527
2528 // Populate worktree diagnostic summaries.
2529 {
2530 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2531 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2532 .stream(&*tx)
2533 .await?;
2534 while let Some(db_summary) = db_summaries.next().await {
2535 let db_summary = db_summary?;
2536 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2537 worktree
2538 .diagnostic_summaries
2539 .push(proto::DiagnosticSummary {
2540 path: db_summary.path,
2541 language_server_id: db_summary.language_server_id as u64,
2542 error_count: db_summary.error_count as u32,
2543 warning_count: db_summary.warning_count as u32,
2544 });
2545 }
2546 }
2547 }
2548
2549 // Populate language servers.
2550 let language_servers = project
2551 .find_related(language_server::Entity)
2552 .all(&*tx)
2553 .await?;
2554
2555 let project = Project {
2556 collaborators: collaborators
2557 .into_iter()
2558 .map(|collaborator| ProjectCollaborator {
2559 connection_id: collaborator.connection(),
2560 user_id: collaborator.user_id,
2561 replica_id: collaborator.replica_id,
2562 is_host: collaborator.is_host,
2563 })
2564 .collect(),
2565 worktrees,
2566 language_servers: language_servers
2567 .into_iter()
2568 .map(|language_server| proto::LanguageServer {
2569 id: language_server.id as u64,
2570 name: language_server.name,
2571 })
2572 .collect(),
2573 };
2574 Ok((project, replica_id as ReplicaId))
2575 })
2576 .await
2577 }
2578
2579 pub async fn leave_project(
2580 &self,
2581 project_id: ProjectId,
2582 connection: ConnectionId,
2583 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2584 let room_id = self.room_id_for_project(project_id).await?;
2585 self.room_transaction(room_id, |tx| async move {
2586 let result = project_collaborator::Entity::delete_many()
2587 .filter(
2588 Condition::all()
2589 .add(project_collaborator::Column::ProjectId.eq(project_id))
2590 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2591 .add(
2592 project_collaborator::Column::ConnectionServerId
2593 .eq(connection.owner_id as i32),
2594 ),
2595 )
2596 .exec(&*tx)
2597 .await?;
2598 if result.rows_affected == 0 {
2599 Err(anyhow!("not a collaborator on this project"))?;
2600 }
2601
2602 let project = project::Entity::find_by_id(project_id)
2603 .one(&*tx)
2604 .await?
2605 .ok_or_else(|| anyhow!("no such project"))?;
2606 let collaborators = project
2607 .find_related(project_collaborator::Entity)
2608 .all(&*tx)
2609 .await?;
2610 let connection_ids = collaborators
2611 .into_iter()
2612 .map(|collaborator| collaborator.connection())
2613 .collect();
2614
2615 follower::Entity::delete_many()
2616 .filter(
2617 Condition::any()
2618 .add(
2619 Condition::all()
2620 .add(follower::Column::ProjectId.eq(project_id))
2621 .add(
2622 follower::Column::LeaderConnectionServerId
2623 .eq(connection.owner_id),
2624 )
2625 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2626 )
2627 .add(
2628 Condition::all()
2629 .add(follower::Column::ProjectId.eq(project_id))
2630 .add(
2631 follower::Column::FollowerConnectionServerId
2632 .eq(connection.owner_id),
2633 )
2634 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2635 ),
2636 )
2637 .exec(&*tx)
2638 .await?;
2639
2640 let room = self.get_room(project.room_id, &tx).await?;
2641 let left_project = LeftProject {
2642 id: project_id,
2643 host_user_id: project.host_user_id,
2644 host_connection_id: project.host_connection()?,
2645 connection_ids,
2646 };
2647 Ok((room, left_project))
2648 })
2649 .await
2650 }
2651
2652 pub async fn project_collaborators(
2653 &self,
2654 project_id: ProjectId,
2655 connection_id: ConnectionId,
2656 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2657 let room_id = self.room_id_for_project(project_id).await?;
2658 self.room_transaction(room_id, |tx| async move {
2659 let collaborators = project_collaborator::Entity::find()
2660 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2661 .all(&*tx)
2662 .await?
2663 .into_iter()
2664 .map(|collaborator| ProjectCollaborator {
2665 connection_id: collaborator.connection(),
2666 user_id: collaborator.user_id,
2667 replica_id: collaborator.replica_id,
2668 is_host: collaborator.is_host,
2669 })
2670 .collect::<Vec<_>>();
2671
2672 if collaborators
2673 .iter()
2674 .any(|collaborator| collaborator.connection_id == connection_id)
2675 {
2676 Ok(collaborators)
2677 } else {
2678 Err(anyhow!("no such project"))?
2679 }
2680 })
2681 .await
2682 }
2683
2684 pub async fn project_connection_ids(
2685 &self,
2686 project_id: ProjectId,
2687 connection_id: ConnectionId,
2688 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2689 let room_id = self.room_id_for_project(project_id).await?;
2690 self.room_transaction(room_id, |tx| async move {
2691 let mut collaborators = project_collaborator::Entity::find()
2692 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2693 .stream(&*tx)
2694 .await?;
2695
2696 let mut connection_ids = HashSet::default();
2697 while let Some(collaborator) = collaborators.next().await {
2698 let collaborator = collaborator?;
2699 connection_ids.insert(collaborator.connection());
2700 }
2701
2702 if connection_ids.contains(&connection_id) {
2703 Ok(connection_ids)
2704 } else {
2705 Err(anyhow!("no such project"))?
2706 }
2707 })
2708 .await
2709 }
2710
2711 async fn project_guest_connection_ids(
2712 &self,
2713 project_id: ProjectId,
2714 tx: &DatabaseTransaction,
2715 ) -> Result<Vec<ConnectionId>> {
2716 let mut collaborators = project_collaborator::Entity::find()
2717 .filter(
2718 project_collaborator::Column::ProjectId
2719 .eq(project_id)
2720 .and(project_collaborator::Column::IsHost.eq(false)),
2721 )
2722 .stream(tx)
2723 .await?;
2724
2725 let mut guest_connection_ids = Vec::new();
2726 while let Some(collaborator) = collaborators.next().await {
2727 let collaborator = collaborator?;
2728 guest_connection_ids.push(collaborator.connection());
2729 }
2730 Ok(guest_connection_ids)
2731 }
2732
2733 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
2734 self.transaction(|tx| async move {
2735 let project = project::Entity::find_by_id(project_id)
2736 .one(&*tx)
2737 .await?
2738 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
2739 Ok(project.room_id)
2740 })
2741 .await
2742 }
2743
2744 // access tokens
2745
2746 pub async fn create_access_token_hash(
2747 &self,
2748 user_id: UserId,
2749 access_token_hash: &str,
2750 max_access_token_count: usize,
2751 ) -> Result<()> {
2752 self.transaction(|tx| async {
2753 let tx = tx;
2754
2755 access_token::ActiveModel {
2756 user_id: ActiveValue::set(user_id),
2757 hash: ActiveValue::set(access_token_hash.into()),
2758 ..Default::default()
2759 }
2760 .insert(&*tx)
2761 .await?;
2762
2763 access_token::Entity::delete_many()
2764 .filter(
2765 access_token::Column::Id.in_subquery(
2766 Query::select()
2767 .column(access_token::Column::Id)
2768 .from(access_token::Entity)
2769 .and_where(access_token::Column::UserId.eq(user_id))
2770 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2771 .limit(10000)
2772 .offset(max_access_token_count as u64)
2773 .to_owned(),
2774 ),
2775 )
2776 .exec(&*tx)
2777 .await?;
2778 Ok(())
2779 })
2780 .await
2781 }
2782
2783 pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2784 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2785 enum QueryAs {
2786 Hash,
2787 }
2788
2789 self.transaction(|tx| async move {
2790 Ok(access_token::Entity::find()
2791 .select_only()
2792 .column(access_token::Column::Hash)
2793 .filter(access_token::Column::UserId.eq(user_id))
2794 .order_by_desc(access_token::Column::Id)
2795 .into_values::<_, QueryAs>()
2796 .all(&*tx)
2797 .await?)
2798 })
2799 .await
2800 }
2801
2802 async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2803 where
2804 F: Send + Fn(TransactionHandle) -> Fut,
2805 Fut: Send + Future<Output = Result<T>>,
2806 {
2807 let body = async {
2808 loop {
2809 let (tx, result) = self.with_transaction(&f).await?;
2810 match result {
2811 Ok(result) => {
2812 match tx.commit().await.map_err(Into::into) {
2813 Ok(()) => return Ok(result),
2814 Err(error) => {
2815 if is_serialization_error(&error) {
2816 // Retry (don't break the loop)
2817 } else {
2818 return Err(error);
2819 }
2820 }
2821 }
2822 }
2823 Err(error) => {
2824 tx.rollback().await?;
2825 if is_serialization_error(&error) {
2826 // Retry (don't break the loop)
2827 } else {
2828 return Err(error);
2829 }
2830 }
2831 }
2832 }
2833 };
2834
2835 self.run(body).await
2836 }
2837
2838 async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
2839 where
2840 F: Send + Fn(TransactionHandle) -> Fut,
2841 Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
2842 {
2843 let body = async {
2844 loop {
2845 let (tx, result) = self.with_transaction(&f).await?;
2846 match result {
2847 Ok(Some((room_id, data))) => {
2848 let lock = self.rooms.entry(room_id).or_default().clone();
2849 let _guard = lock.lock_owned().await;
2850 match tx.commit().await.map_err(Into::into) {
2851 Ok(()) => {
2852 return Ok(Some(RoomGuard {
2853 data,
2854 _guard,
2855 _not_send: PhantomData,
2856 }));
2857 }
2858 Err(error) => {
2859 if is_serialization_error(&error) {
2860 // Retry (don't break the loop)
2861 } else {
2862 return Err(error);
2863 }
2864 }
2865 }
2866 }
2867 Ok(None) => {
2868 match tx.commit().await.map_err(Into::into) {
2869 Ok(()) => return Ok(None),
2870 Err(error) => {
2871 if is_serialization_error(&error) {
2872 // Retry (don't break the loop)
2873 } else {
2874 return Err(error);
2875 }
2876 }
2877 }
2878 }
2879 Err(error) => {
2880 tx.rollback().await?;
2881 if is_serialization_error(&error) {
2882 // Retry (don't break the loop)
2883 } else {
2884 return Err(error);
2885 }
2886 }
2887 }
2888 }
2889 };
2890
2891 self.run(body).await
2892 }
2893
2894 async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
2895 where
2896 F: Send + Fn(TransactionHandle) -> Fut,
2897 Fut: Send + Future<Output = Result<T>>,
2898 {
2899 let body = async {
2900 loop {
2901 let lock = self.rooms.entry(room_id).or_default().clone();
2902 let _guard = lock.lock_owned().await;
2903 let (tx, result) = self.with_transaction(&f).await?;
2904 match result {
2905 Ok(data) => {
2906 match tx.commit().await.map_err(Into::into) {
2907 Ok(()) => {
2908 return Ok(RoomGuard {
2909 data,
2910 _guard,
2911 _not_send: PhantomData,
2912 });
2913 }
2914 Err(error) => {
2915 if is_serialization_error(&error) {
2916 // Retry (don't break the loop)
2917 } else {
2918 return Err(error);
2919 }
2920 }
2921 }
2922 }
2923 Err(error) => {
2924 tx.rollback().await?;
2925 if is_serialization_error(&error) {
2926 // Retry (don't break the loop)
2927 } else {
2928 return Err(error);
2929 }
2930 }
2931 }
2932 }
2933 };
2934
2935 self.run(body).await
2936 }
2937
2938 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2939 where
2940 F: Send + Fn(TransactionHandle) -> Fut,
2941 Fut: Send + Future<Output = Result<T>>,
2942 {
2943 let tx = self
2944 .pool
2945 .begin_with_config(Some(IsolationLevel::Serializable), None)
2946 .await?;
2947
2948 let mut tx = Arc::new(Some(tx));
2949 let result = f(TransactionHandle(tx.clone())).await;
2950 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2951 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2952 };
2953
2954 Ok((tx, result))
2955 }
2956
2957 async fn run<F, T>(&self, future: F) -> T
2958 where
2959 F: Future<Output = T>,
2960 {
2961 #[cfg(test)]
2962 {
2963 if let Some(background) = self.background.as_ref() {
2964 background.simulate_random_delay().await;
2965 }
2966
2967 self.runtime.as_ref().unwrap().block_on(future)
2968 }
2969
2970 #[cfg(not(test))]
2971 {
2972 future.await
2973 }
2974 }
2975}
2976
2977fn is_serialization_error(error: &Error) -> bool {
2978 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2979 match error {
2980 Error::Database(
2981 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2982 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2983 ) if error
2984 .as_database_error()
2985 .and_then(|error| error.code())
2986 .as_deref()
2987 == Some(SERIALIZATION_FAILURE_CODE) =>
2988 {
2989 true
2990 }
2991 _ => false,
2992 }
2993}
2994
2995struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2996
2997impl Deref for TransactionHandle {
2998 type Target = DatabaseTransaction;
2999
3000 fn deref(&self) -> &Self::Target {
3001 self.0.as_ref().as_ref().unwrap()
3002 }
3003}
3004
3005pub struct RoomGuard<T> {
3006 data: T,
3007 _guard: OwnedMutexGuard<()>,
3008 _not_send: PhantomData<Rc<()>>,
3009}
3010
3011impl<T> Deref for RoomGuard<T> {
3012 type Target = T;
3013
3014 fn deref(&self) -> &T {
3015 &self.data
3016 }
3017}
3018
3019impl<T> DerefMut for RoomGuard<T> {
3020 fn deref_mut(&mut self) -> &mut T {
3021 &mut self.data
3022 }
3023}
3024
3025#[derive(Debug, Serialize, Deserialize)]
3026pub struct NewUserParams {
3027 pub github_login: String,
3028 pub github_user_id: i32,
3029 pub invite_count: i32,
3030}
3031
3032#[derive(Debug)]
3033pub struct NewUserResult {
3034 pub user_id: UserId,
3035 pub metrics_id: String,
3036 pub inviting_user_id: Option<UserId>,
3037 pub signup_device_id: Option<String>,
3038}
3039
3040fn random_invite_code() -> String {
3041 nanoid::nanoid!(16)
3042}
3043
3044fn random_email_confirmation_code() -> String {
3045 nanoid::nanoid!(64)
3046}
3047
3048macro_rules! id_type {
3049 ($name:ident) => {
3050 #[derive(
3051 Clone,
3052 Copy,
3053 Debug,
3054 Default,
3055 PartialEq,
3056 Eq,
3057 PartialOrd,
3058 Ord,
3059 Hash,
3060 Serialize,
3061 Deserialize,
3062 )]
3063 #[serde(transparent)]
3064 pub struct $name(pub i32);
3065
3066 impl $name {
3067 #[allow(unused)]
3068 pub const MAX: Self = Self(i32::MAX);
3069
3070 #[allow(unused)]
3071 pub fn from_proto(value: u64) -> Self {
3072 Self(value as i32)
3073 }
3074
3075 #[allow(unused)]
3076 pub fn to_proto(self) -> u64 {
3077 self.0 as u64
3078 }
3079 }
3080
3081 impl std::fmt::Display for $name {
3082 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
3083 self.0.fmt(f)
3084 }
3085 }
3086
3087 impl From<$name> for sea_query::Value {
3088 fn from(value: $name) -> Self {
3089 sea_query::Value::Int(Some(value.0))
3090 }
3091 }
3092
3093 impl sea_orm::TryGetable for $name {
3094 fn try_get(
3095 res: &sea_orm::QueryResult,
3096 pre: &str,
3097 col: &str,
3098 ) -> Result<Self, sea_orm::TryGetError> {
3099 Ok(Self(i32::try_get(res, pre, col)?))
3100 }
3101 }
3102
3103 impl sea_query::ValueType for $name {
3104 fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
3105 match v {
3106 Value::TinyInt(Some(int)) => {
3107 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3108 }
3109 Value::SmallInt(Some(int)) => {
3110 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3111 }
3112 Value::Int(Some(int)) => {
3113 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3114 }
3115 Value::BigInt(Some(int)) => {
3116 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3117 }
3118 Value::TinyUnsigned(Some(int)) => {
3119 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3120 }
3121 Value::SmallUnsigned(Some(int)) => {
3122 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3123 }
3124 Value::Unsigned(Some(int)) => {
3125 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3126 }
3127 Value::BigUnsigned(Some(int)) => {
3128 Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
3129 }
3130 _ => Err(sea_query::ValueTypeErr),
3131 }
3132 }
3133
3134 fn type_name() -> String {
3135 stringify!($name).into()
3136 }
3137
3138 fn array_type() -> sea_query::ArrayType {
3139 sea_query::ArrayType::Int
3140 }
3141
3142 fn column_type() -> sea_query::ColumnType {
3143 sea_query::ColumnType::Integer(None)
3144 }
3145 }
3146
3147 impl sea_orm::TryFromU64 for $name {
3148 fn try_from_u64(n: u64) -> Result<Self, DbErr> {
3149 Ok(Self(n.try_into().map_err(|_| {
3150 DbErr::ConvertFromU64(concat!(
3151 "error converting ",
3152 stringify!($name),
3153 " to u64"
3154 ))
3155 })?))
3156 }
3157 }
3158
3159 impl sea_query::Nullable for $name {
3160 fn null() -> Value {
3161 Value::Int(None)
3162 }
3163 }
3164 };
3165}
3166
3167id_type!(AccessTokenId);
3168id_type!(ContactId);
3169id_type!(FollowerId);
3170id_type!(RoomId);
3171id_type!(RoomParticipantId);
3172id_type!(ProjectId);
3173id_type!(ProjectCollaboratorId);
3174id_type!(ReplicaId);
3175id_type!(ServerId);
3176id_type!(SignupId);
3177id_type!(UserId);
3178
3179pub struct RejoinedRoom {
3180 pub room: proto::Room,
3181 pub rejoined_projects: Vec<RejoinedProject>,
3182 pub reshared_projects: Vec<ResharedProject>,
3183}
3184
3185pub struct ResharedProject {
3186 pub id: ProjectId,
3187 pub old_connection_id: ConnectionId,
3188 pub collaborators: Vec<ProjectCollaborator>,
3189 pub worktrees: Vec<proto::WorktreeMetadata>,
3190}
3191
3192pub struct RejoinedProject {
3193 pub id: ProjectId,
3194 pub old_connection_id: ConnectionId,
3195 pub collaborators: Vec<ProjectCollaborator>,
3196 pub worktrees: Vec<RejoinedWorktree>,
3197 pub language_servers: Vec<proto::LanguageServer>,
3198}
3199
3200#[derive(Debug)]
3201pub struct RejoinedWorktree {
3202 pub id: u64,
3203 pub abs_path: String,
3204 pub root_name: String,
3205 pub visible: bool,
3206 pub updated_entries: Vec<proto::Entry>,
3207 pub removed_entries: Vec<u64>,
3208 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3209 pub scan_id: u64,
3210 pub completed_scan_id: u64,
3211}
3212
3213pub struct LeftRoom {
3214 pub room: proto::Room,
3215 pub left_projects: HashMap<ProjectId, LeftProject>,
3216 pub canceled_calls_to_user_ids: Vec<UserId>,
3217}
3218
3219pub struct RefreshedRoom {
3220 pub room: proto::Room,
3221 pub stale_participant_user_ids: Vec<UserId>,
3222 pub canceled_calls_to_user_ids: Vec<UserId>,
3223}
3224
3225pub struct Project {
3226 pub collaborators: Vec<ProjectCollaborator>,
3227 pub worktrees: BTreeMap<u64, Worktree>,
3228 pub language_servers: Vec<proto::LanguageServer>,
3229}
3230
3231pub struct ProjectCollaborator {
3232 pub connection_id: ConnectionId,
3233 pub user_id: UserId,
3234 pub replica_id: ReplicaId,
3235 pub is_host: bool,
3236}
3237
3238impl ProjectCollaborator {
3239 pub fn to_proto(&self) -> proto::Collaborator {
3240 proto::Collaborator {
3241 peer_id: Some(self.connection_id.into()),
3242 replica_id: self.replica_id.0 as u32,
3243 user_id: self.user_id.to_proto(),
3244 }
3245 }
3246}
3247
3248#[derive(Debug)]
3249pub struct LeftProject {
3250 pub id: ProjectId,
3251 pub host_user_id: UserId,
3252 pub host_connection_id: ConnectionId,
3253 pub connection_ids: Vec<ConnectionId>,
3254}
3255
3256pub struct Worktree {
3257 pub id: u64,
3258 pub abs_path: String,
3259 pub root_name: String,
3260 pub visible: bool,
3261 pub entries: Vec<proto::Entry>,
3262 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
3263 pub scan_id: u64,
3264 pub completed_scan_id: u64,
3265}
3266
3267#[cfg(test)]
3268pub use test::*;
3269
3270#[cfg(test)]
3271mod test {
3272 use super::*;
3273 use gpui::executor::Background;
3274 use lazy_static::lazy_static;
3275 use parking_lot::Mutex;
3276 use rand::prelude::*;
3277 use sea_orm::ConnectionTrait;
3278 use sqlx::migrate::MigrateDatabase;
3279 use std::sync::Arc;
3280
3281 pub struct TestDb {
3282 pub db: Option<Arc<Database>>,
3283 pub connection: Option<sqlx::AnyConnection>,
3284 }
3285
3286 impl TestDb {
3287 pub fn sqlite(background: Arc<Background>) -> Self {
3288 let url = format!("sqlite::memory:");
3289 let runtime = tokio::runtime::Builder::new_current_thread()
3290 .enable_io()
3291 .enable_time()
3292 .build()
3293 .unwrap();
3294
3295 let mut db = runtime.block_on(async {
3296 let mut options = ConnectOptions::new(url);
3297 options.max_connections(5);
3298 let db = Database::new(options).await.unwrap();
3299 let sql = include_str!(concat!(
3300 env!("CARGO_MANIFEST_DIR"),
3301 "/migrations.sqlite/20221109000000_test_schema.sql"
3302 ));
3303 db.pool
3304 .execute(sea_orm::Statement::from_string(
3305 db.pool.get_database_backend(),
3306 sql.into(),
3307 ))
3308 .await
3309 .unwrap();
3310 db
3311 });
3312
3313 db.background = Some(background);
3314 db.runtime = Some(runtime);
3315
3316 Self {
3317 db: Some(Arc::new(db)),
3318 connection: None,
3319 }
3320 }
3321
3322 pub fn postgres(background: Arc<Background>) -> Self {
3323 lazy_static! {
3324 static ref LOCK: Mutex<()> = Mutex::new(());
3325 }
3326
3327 let _guard = LOCK.lock();
3328 let mut rng = StdRng::from_entropy();
3329 let url = format!(
3330 "postgres://postgres@localhost/zed-test-{}",
3331 rng.gen::<u128>()
3332 );
3333 let runtime = tokio::runtime::Builder::new_current_thread()
3334 .enable_io()
3335 .enable_time()
3336 .build()
3337 .unwrap();
3338
3339 let mut db = runtime.block_on(async {
3340 sqlx::Postgres::create_database(&url)
3341 .await
3342 .expect("failed to create test db");
3343 let mut options = ConnectOptions::new(url);
3344 options
3345 .max_connections(5)
3346 .idle_timeout(Duration::from_secs(0));
3347 let db = Database::new(options).await.unwrap();
3348 let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
3349 db.migrate(Path::new(migrations_path), false).await.unwrap();
3350 db
3351 });
3352
3353 db.background = Some(background);
3354 db.runtime = Some(runtime);
3355
3356 Self {
3357 db: Some(Arc::new(db)),
3358 connection: None,
3359 }
3360 }
3361
3362 pub fn db(&self) -> &Arc<Database> {
3363 self.db.as_ref().unwrap()
3364 }
3365 }
3366
3367 impl Drop for TestDb {
3368 fn drop(&mut self) {
3369 let db = self.db.take().unwrap();
3370 if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
3371 db.runtime.as_ref().unwrap().block_on(async {
3372 use util::ResultExt;
3373 let query = "
3374 SELECT pg_terminate_backend(pg_stat_activity.pid)
3375 FROM pg_stat_activity
3376 WHERE
3377 pg_stat_activity.datname = current_database() AND
3378 pid <> pg_backend_pid();
3379 ";
3380 db.pool
3381 .execute(sea_orm::Statement::from_string(
3382 db.pool.get_database_backend(),
3383 query.into(),
3384 ))
3385 .await
3386 .log_err();
3387 sqlx::Postgres::drop_database(db.options.get_url())
3388 .await
3389 .log_err();
3390 })
3391 }
3392 }
3393 }
3394}