1mod access_token;
2mod channel;
3mod channel_member;
4mod channel_parent;
5mod contact;
6mod follower;
7mod language_server;
8mod project;
9mod project_collaborator;
10mod room;
11mod room_participant;
12mod server;
13mod signup;
14#[cfg(test)]
15mod tests;
16mod user;
17mod worktree;
18mod worktree_diagnostic_summary;
19mod worktree_entry;
20mod worktree_repository;
21mod worktree_repository_statuses;
22mod worktree_settings_file;
23
24use crate::executor::Executor;
25use crate::{Error, Result};
26use anyhow::anyhow;
27use collections::{BTreeMap, HashMap, HashSet};
28pub use contact::Contact;
29use dashmap::DashMap;
30use futures::StreamExt;
31use hyper::StatusCode;
32use rand::prelude::StdRng;
33use rand::{Rng, SeedableRng};
34use rpc::{proto, ConnectionId};
35use sea_orm::Condition;
36pub use sea_orm::ConnectOptions;
37use sea_orm::{
38 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
39 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
40 Statement, TransactionTrait,
41};
42use sea_query::{Alias, Expr, OnConflict, Query};
43use serde::{Deserialize, Serialize};
44pub use signup::{Invite, NewSignup, WaitlistSummary};
45use sqlx::migrate::{Migrate, Migration, MigrationSource};
46use sqlx::Connection;
47use std::fmt::Write as _;
48use std::ops::{Deref, DerefMut};
49use std::path::Path;
50use std::time::Duration;
51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
52use tokio::sync::{Mutex, OwnedMutexGuard};
53pub use user::Model as User;
54
/// Handle through which the methods of this file talk to the database.
pub struct Database {
    /// Connect options the pool was opened with; `migrate` reads the URL from here.
    options: ConnectOptions,
    /// SeaORM connection created in `new`.
    pool: DatabaseConnection,
    /// One async mutex per room id.
    /// NOTE(review): presumably used to serialize room transactions — confirm against
    /// `room_transaction`, which is outside this excerpt.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Random number generator, seeded with a fixed value in `new`.
    rng: Mutex<StdRng>,
    /// NOTE(review): executor handle supplied by the caller of `new`; its use is not
    /// visible in this excerpt.
    executor: Executor,
    /// Only present in test builds; `new` initialises it to `None`.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
64
65impl Database {
66 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
67 Ok(Self {
68 options: options.clone(),
69 pool: sea_orm::Database::connect(options).await?,
70 rooms: DashMap::with_capacity(16384),
71 rng: Mutex::new(StdRng::seed_from_u64(0)),
72 executor,
73 #[cfg(test)]
74 runtime: None,
75 })
76 }
77
78 #[cfg(test)]
79 pub fn reset(&self) {
80 self.rooms.clear();
81 }
82
83 pub async fn migrate(
84 &self,
85 migrations_path: &Path,
86 ignore_checksum_mismatch: bool,
87 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
88 let migrations = MigrationSource::resolve(migrations_path)
89 .await
90 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
91
92 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
93
94 connection.ensure_migrations_table().await?;
95 let applied_migrations: HashMap<_, _> = connection
96 .list_applied_migrations()
97 .await?
98 .into_iter()
99 .map(|m| (m.version, m))
100 .collect();
101
102 let mut new_migrations = Vec::new();
103 for migration in migrations {
104 match applied_migrations.get(&migration.version) {
105 Some(applied_migration) => {
106 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
107 {
108 Err(anyhow!(
109 "checksum mismatch for applied migration {}",
110 migration.description
111 ))?;
112 }
113 }
114 None => {
115 let elapsed = connection.apply(&migration).await?;
116 new_migrations.push((migration, elapsed));
117 }
118 }
119 }
120
121 Ok(new_migrations)
122 }
123
124 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
125 self.transaction(|tx| async move {
126 let server = server::ActiveModel {
127 environment: ActiveValue::set(environment.into()),
128 ..Default::default()
129 }
130 .insert(&*tx)
131 .await?;
132 Ok(server.id)
133 })
134 .await
135 }
136
137 pub async fn stale_room_ids(
138 &self,
139 environment: &str,
140 new_server_id: ServerId,
141 ) -> Result<Vec<RoomId>> {
142 self.transaction(|tx| async move {
143 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
144 enum QueryAs {
145 RoomId,
146 }
147
148 let stale_server_epochs = self
149 .stale_server_ids(environment, new_server_id, &tx)
150 .await?;
151 Ok(room_participant::Entity::find()
152 .select_only()
153 .column(room_participant::Column::RoomId)
154 .distinct()
155 .filter(
156 room_participant::Column::AnsweringConnectionServerId
157 .is_in(stale_server_epochs),
158 )
159 .into_values::<_, QueryAs>()
160 .all(&*tx)
161 .await?)
162 })
163 .await
164 }
165
166 pub async fn refresh_room(
167 &self,
168 room_id: RoomId,
169 new_server_id: ServerId,
170 ) -> Result<RoomGuard<RefreshedRoom>> {
171 self.room_transaction(room_id, |tx| async move {
172 let stale_participant_filter = Condition::all()
173 .add(room_participant::Column::RoomId.eq(room_id))
174 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
175 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
176
177 let stale_participant_user_ids = room_participant::Entity::find()
178 .filter(stale_participant_filter.clone())
179 .all(&*tx)
180 .await?
181 .into_iter()
182 .map(|participant| participant.user_id)
183 .collect::<Vec<_>>();
184
185 // Delete participants who failed to reconnect and cancel their calls.
186 let mut canceled_calls_to_user_ids = Vec::new();
187 room_participant::Entity::delete_many()
188 .filter(stale_participant_filter)
189 .exec(&*tx)
190 .await?;
191 let called_participants = room_participant::Entity::find()
192 .filter(
193 Condition::all()
194 .add(
195 room_participant::Column::CallingUserId
196 .is_in(stale_participant_user_ids.iter().copied()),
197 )
198 .add(room_participant::Column::AnsweringConnectionId.is_null()),
199 )
200 .all(&*tx)
201 .await?;
202 room_participant::Entity::delete_many()
203 .filter(
204 room_participant::Column::Id
205 .is_in(called_participants.iter().map(|participant| participant.id)),
206 )
207 .exec(&*tx)
208 .await?;
209 canceled_calls_to_user_ids.extend(
210 called_participants
211 .into_iter()
212 .map(|participant| participant.user_id),
213 );
214
215 let room = self.get_room(room_id, &tx).await?;
216 // Delete the room if it becomes empty.
217 if room.participants.is_empty() {
218 project::Entity::delete_many()
219 .filter(project::Column::RoomId.eq(room_id))
220 .exec(&*tx)
221 .await?;
222 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
223 }
224
225 Ok(RefreshedRoom {
226 room,
227 stale_participant_user_ids,
228 canceled_calls_to_user_ids,
229 })
230 })
231 .await
232 }
233
234 pub async fn delete_stale_servers(
235 &self,
236 environment: &str,
237 new_server_id: ServerId,
238 ) -> Result<()> {
239 self.transaction(|tx| async move {
240 server::Entity::delete_many()
241 .filter(
242 Condition::all()
243 .add(server::Column::Environment.eq(environment))
244 .add(server::Column::Id.ne(new_server_id)),
245 )
246 .exec(&*tx)
247 .await?;
248 Ok(())
249 })
250 .await
251 }
252
253 async fn stale_server_ids(
254 &self,
255 environment: &str,
256 new_server_id: ServerId,
257 tx: &DatabaseTransaction,
258 ) -> Result<Vec<ServerId>> {
259 let stale_servers = server::Entity::find()
260 .filter(
261 Condition::all()
262 .add(server::Column::Environment.eq(environment))
263 .add(server::Column::Id.ne(new_server_id)),
264 )
265 .all(&*tx)
266 .await?;
267 Ok(stale_servers.into_iter().map(|server| server.id).collect())
268 }
269
270 // users
271
272 pub async fn create_user(
273 &self,
274 email_address: &str,
275 admin: bool,
276 params: NewUserParams,
277 ) -> Result<NewUserResult> {
278 self.transaction(|tx| async {
279 let tx = tx;
280 let user = user::Entity::insert(user::ActiveModel {
281 email_address: ActiveValue::set(Some(email_address.into())),
282 github_login: ActiveValue::set(params.github_login.clone()),
283 github_user_id: ActiveValue::set(Some(params.github_user_id)),
284 admin: ActiveValue::set(admin),
285 metrics_id: ActiveValue::set(Uuid::new_v4()),
286 ..Default::default()
287 })
288 .on_conflict(
289 OnConflict::column(user::Column::GithubLogin)
290 .update_column(user::Column::GithubLogin)
291 .to_owned(),
292 )
293 .exec_with_returning(&*tx)
294 .await?;
295
296 Ok(NewUserResult {
297 user_id: user.id,
298 metrics_id: user.metrics_id.to_string(),
299 signup_device_id: None,
300 inviting_user_id: None,
301 })
302 })
303 .await
304 }
305
306 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
307 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
308 .await
309 }
310
311 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
312 self.transaction(|tx| async {
313 let tx = tx;
314 Ok(user::Entity::find()
315 .filter(user::Column::Id.is_in(ids.iter().copied()))
316 .all(&*tx)
317 .await?)
318 })
319 .await
320 }
321
322 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
323 self.transaction(|tx| async move {
324 Ok(user::Entity::find()
325 .filter(user::Column::GithubLogin.eq(github_login))
326 .one(&*tx)
327 .await?)
328 })
329 .await
330 }
331
332 pub async fn get_or_create_user_by_github_account(
333 &self,
334 github_login: &str,
335 github_user_id: Option<i32>,
336 github_email: Option<&str>,
337 ) -> Result<Option<User>> {
338 self.transaction(|tx| async move {
339 let tx = &*tx;
340 if let Some(github_user_id) = github_user_id {
341 if let Some(user_by_github_user_id) = user::Entity::find()
342 .filter(user::Column::GithubUserId.eq(github_user_id))
343 .one(tx)
344 .await?
345 {
346 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
347 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
348 Ok(Some(user_by_github_user_id.update(tx).await?))
349 } else if let Some(user_by_github_login) = user::Entity::find()
350 .filter(user::Column::GithubLogin.eq(github_login))
351 .one(tx)
352 .await?
353 {
354 let mut user_by_github_login = user_by_github_login.into_active_model();
355 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
356 Ok(Some(user_by_github_login.update(tx).await?))
357 } else {
358 let user = user::Entity::insert(user::ActiveModel {
359 email_address: ActiveValue::set(github_email.map(|email| email.into())),
360 github_login: ActiveValue::set(github_login.into()),
361 github_user_id: ActiveValue::set(Some(github_user_id)),
362 admin: ActiveValue::set(false),
363 invite_count: ActiveValue::set(0),
364 invite_code: ActiveValue::set(None),
365 metrics_id: ActiveValue::set(Uuid::new_v4()),
366 ..Default::default()
367 })
368 .exec_with_returning(&*tx)
369 .await?;
370 Ok(Some(user))
371 }
372 } else {
373 Ok(user::Entity::find()
374 .filter(user::Column::GithubLogin.eq(github_login))
375 .one(tx)
376 .await?)
377 }
378 })
379 .await
380 }
381
382 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
383 self.transaction(|tx| async move {
384 Ok(user::Entity::find()
385 .order_by_asc(user::Column::GithubLogin)
386 .limit(limit as u64)
387 .offset(page as u64 * limit as u64)
388 .all(&*tx)
389 .await?)
390 })
391 .await
392 }
393
394 pub async fn get_users_with_no_invites(
395 &self,
396 invited_by_another_user: bool,
397 ) -> Result<Vec<User>> {
398 self.transaction(|tx| async move {
399 Ok(user::Entity::find()
400 .filter(
401 user::Column::InviteCount
402 .eq(0)
403 .and(if invited_by_another_user {
404 user::Column::InviterId.is_not_null()
405 } else {
406 user::Column::InviterId.is_null()
407 }),
408 )
409 .all(&*tx)
410 .await?)
411 })
412 .await
413 }
414
415 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
416 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
417 enum QueryAs {
418 MetricsId,
419 }
420
421 self.transaction(|tx| async move {
422 let metrics_id: Uuid = user::Entity::find_by_id(id)
423 .select_only()
424 .column(user::Column::MetricsId)
425 .into_values::<_, QueryAs>()
426 .one(&*tx)
427 .await?
428 .ok_or_else(|| anyhow!("could not find user"))?;
429 Ok(metrics_id.to_string())
430 })
431 .await
432 }
433
434 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
435 self.transaction(|tx| async move {
436 user::Entity::update_many()
437 .filter(user::Column::Id.eq(id))
438 .set(user::ActiveModel {
439 admin: ActiveValue::set(is_admin),
440 ..Default::default()
441 })
442 .exec(&*tx)
443 .await?;
444 Ok(())
445 })
446 .await
447 }
448
449 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
450 self.transaction(|tx| async move {
451 user::Entity::update_many()
452 .filter(user::Column::Id.eq(id))
453 .set(user::ActiveModel {
454 connected_once: ActiveValue::set(connected_once),
455 ..Default::default()
456 })
457 .exec(&*tx)
458 .await?;
459 Ok(())
460 })
461 .await
462 }
463
464 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
465 self.transaction(|tx| async move {
466 access_token::Entity::delete_many()
467 .filter(access_token::Column::UserId.eq(id))
468 .exec(&*tx)
469 .await?;
470 user::Entity::delete_by_id(id).exec(&*tx).await?;
471 Ok(())
472 })
473 .await
474 }
475
476 // contacts
477
478 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
479 #[derive(Debug, FromQueryResult)]
480 struct ContactWithUserBusyStatuses {
481 user_id_a: UserId,
482 user_id_b: UserId,
483 a_to_b: bool,
484 accepted: bool,
485 should_notify: bool,
486 user_a_busy: bool,
487 user_b_busy: bool,
488 }
489
490 self.transaction(|tx| async move {
491 let user_a_participant = Alias::new("user_a_participant");
492 let user_b_participant = Alias::new("user_b_participant");
493 let mut db_contacts = contact::Entity::find()
494 .column_as(
495 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
496 .is_not_null(),
497 "user_a_busy",
498 )
499 .column_as(
500 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
501 .is_not_null(),
502 "user_b_busy",
503 )
504 .filter(
505 contact::Column::UserIdA
506 .eq(user_id)
507 .or(contact::Column::UserIdB.eq(user_id)),
508 )
509 .join_as(
510 JoinType::LeftJoin,
511 contact::Relation::UserARoomParticipant.def(),
512 user_a_participant,
513 )
514 .join_as(
515 JoinType::LeftJoin,
516 contact::Relation::UserBRoomParticipant.def(),
517 user_b_participant,
518 )
519 .into_model::<ContactWithUserBusyStatuses>()
520 .stream(&*tx)
521 .await?;
522
523 let mut contacts = Vec::new();
524 while let Some(db_contact) = db_contacts.next().await {
525 let db_contact = db_contact?;
526 if db_contact.user_id_a == user_id {
527 if db_contact.accepted {
528 contacts.push(Contact::Accepted {
529 user_id: db_contact.user_id_b,
530 should_notify: db_contact.should_notify && db_contact.a_to_b,
531 busy: db_contact.user_b_busy,
532 });
533 } else if db_contact.a_to_b {
534 contacts.push(Contact::Outgoing {
535 user_id: db_contact.user_id_b,
536 })
537 } else {
538 contacts.push(Contact::Incoming {
539 user_id: db_contact.user_id_b,
540 should_notify: db_contact.should_notify,
541 });
542 }
543 } else if db_contact.accepted {
544 contacts.push(Contact::Accepted {
545 user_id: db_contact.user_id_a,
546 should_notify: db_contact.should_notify && !db_contact.a_to_b,
547 busy: db_contact.user_a_busy,
548 });
549 } else if db_contact.a_to_b {
550 contacts.push(Contact::Incoming {
551 user_id: db_contact.user_id_a,
552 should_notify: db_contact.should_notify,
553 });
554 } else {
555 contacts.push(Contact::Outgoing {
556 user_id: db_contact.user_id_a,
557 });
558 }
559 }
560
561 contacts.sort_unstable_by_key(|contact| contact.user_id());
562
563 Ok(contacts)
564 })
565 .await
566 }
567
568 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
569 self.transaction(|tx| async move {
570 let participant = room_participant::Entity::find()
571 .filter(room_participant::Column::UserId.eq(user_id))
572 .one(&*tx)
573 .await?;
574 Ok(participant.is_some())
575 })
576 .await
577 }
578
579 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
580 self.transaction(|tx| async move {
581 let (id_a, id_b) = if user_id_1 < user_id_2 {
582 (user_id_1, user_id_2)
583 } else {
584 (user_id_2, user_id_1)
585 };
586
587 Ok(contact::Entity::find()
588 .filter(
589 contact::Column::UserIdA
590 .eq(id_a)
591 .and(contact::Column::UserIdB.eq(id_b))
592 .and(contact::Column::Accepted.eq(true)),
593 )
594 .one(&*tx)
595 .await?
596 .is_some())
597 })
598 .await
599 }
600
601 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
602 self.transaction(|tx| async move {
603 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
604 (sender_id, receiver_id, true)
605 } else {
606 (receiver_id, sender_id, false)
607 };
608
609 let rows_affected = contact::Entity::insert(contact::ActiveModel {
610 user_id_a: ActiveValue::set(id_a),
611 user_id_b: ActiveValue::set(id_b),
612 a_to_b: ActiveValue::set(a_to_b),
613 accepted: ActiveValue::set(false),
614 should_notify: ActiveValue::set(true),
615 ..Default::default()
616 })
617 .on_conflict(
618 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
619 .values([
620 (contact::Column::Accepted, true.into()),
621 (contact::Column::ShouldNotify, false.into()),
622 ])
623 .action_and_where(
624 contact::Column::Accepted.eq(false).and(
625 contact::Column::AToB
626 .eq(a_to_b)
627 .and(contact::Column::UserIdA.eq(id_b))
628 .or(contact::Column::AToB
629 .ne(a_to_b)
630 .and(contact::Column::UserIdA.eq(id_a))),
631 ),
632 )
633 .to_owned(),
634 )
635 .exec_without_returning(&*tx)
636 .await?;
637
638 if rows_affected == 1 {
639 Ok(())
640 } else {
641 Err(anyhow!("contact already requested"))?
642 }
643 })
644 .await
645 }
646
647 /// Returns a bool indicating whether the removed contact had originally accepted or not
648 ///
649 /// Deletes the contact identified by the requester and responder ids, and then returns
650 /// whether the deleted contact had originally accepted or was a pending contact request.
651 ///
652 /// # Arguments
653 ///
654 /// * `requester_id` - The user that initiates this request
655 /// * `responder_id` - The user that will be removed
656 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
657 self.transaction(|tx| async move {
658 let (id_a, id_b) = if responder_id < requester_id {
659 (responder_id, requester_id)
660 } else {
661 (requester_id, responder_id)
662 };
663
664 let contact = contact::Entity::find()
665 .filter(
666 contact::Column::UserIdA
667 .eq(id_a)
668 .and(contact::Column::UserIdB.eq(id_b)),
669 )
670 .one(&*tx)
671 .await?
672 .ok_or_else(|| anyhow!("no such contact"))?;
673
674 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
675 Ok(contact.accepted)
676 })
677 .await
678 }
679
680 pub async fn dismiss_contact_notification(
681 &self,
682 user_id: UserId,
683 contact_user_id: UserId,
684 ) -> Result<()> {
685 self.transaction(|tx| async move {
686 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
687 (user_id, contact_user_id, true)
688 } else {
689 (contact_user_id, user_id, false)
690 };
691
692 let result = contact::Entity::update_many()
693 .set(contact::ActiveModel {
694 should_notify: ActiveValue::set(false),
695 ..Default::default()
696 })
697 .filter(
698 contact::Column::UserIdA
699 .eq(id_a)
700 .and(contact::Column::UserIdB.eq(id_b))
701 .and(
702 contact::Column::AToB
703 .eq(a_to_b)
704 .and(contact::Column::Accepted.eq(true))
705 .or(contact::Column::AToB
706 .ne(a_to_b)
707 .and(contact::Column::Accepted.eq(false))),
708 ),
709 )
710 .exec(&*tx)
711 .await?;
712 if result.rows_affected == 0 {
713 Err(anyhow!("no such contact request"))?
714 } else {
715 Ok(())
716 }
717 })
718 .await
719 }
720
721 pub async fn respond_to_contact_request(
722 &self,
723 responder_id: UserId,
724 requester_id: UserId,
725 accept: bool,
726 ) -> Result<()> {
727 self.transaction(|tx| async move {
728 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
729 (responder_id, requester_id, false)
730 } else {
731 (requester_id, responder_id, true)
732 };
733 let rows_affected = if accept {
734 let result = contact::Entity::update_many()
735 .set(contact::ActiveModel {
736 accepted: ActiveValue::set(true),
737 should_notify: ActiveValue::set(true),
738 ..Default::default()
739 })
740 .filter(
741 contact::Column::UserIdA
742 .eq(id_a)
743 .and(contact::Column::UserIdB.eq(id_b))
744 .and(contact::Column::AToB.eq(a_to_b)),
745 )
746 .exec(&*tx)
747 .await?;
748 result.rows_affected
749 } else {
750 let result = contact::Entity::delete_many()
751 .filter(
752 contact::Column::UserIdA
753 .eq(id_a)
754 .and(contact::Column::UserIdB.eq(id_b))
755 .and(contact::Column::AToB.eq(a_to_b))
756 .and(contact::Column::Accepted.eq(false)),
757 )
758 .exec(&*tx)
759 .await?;
760
761 result.rows_affected
762 };
763
764 if rows_affected == 1 {
765 Ok(())
766 } else {
767 Err(anyhow!("no such contact request"))?
768 }
769 })
770 .await
771 }
772
773 pub fn fuzzy_like_string(string: &str) -> String {
774 let mut result = String::with_capacity(string.len() * 2 + 1);
775 for c in string.chars() {
776 if c.is_alphanumeric() {
777 result.push('%');
778 result.push(c);
779 }
780 }
781 result.push('%');
782 result
783 }
784
785 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
786 self.transaction(|tx| async {
787 let tx = tx;
788 let like_string = Self::fuzzy_like_string(name_query);
789 let query = "
790 SELECT users.*
791 FROM users
792 WHERE github_login ILIKE $1
793 ORDER BY github_login <-> $2
794 LIMIT $3
795 ";
796
797 Ok(user::Entity::find()
798 .from_raw_sql(Statement::from_sql_and_values(
799 self.pool.get_database_backend(),
800 query.into(),
801 vec![like_string.into(), name_query.into(), limit.into()],
802 ))
803 .all(&*tx)
804 .await?)
805 })
806 .await
807 }
808
809 // signups
810
811 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
812 self.transaction(|tx| async move {
813 signup::Entity::insert(signup::ActiveModel {
814 email_address: ActiveValue::set(signup.email_address.clone()),
815 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
816 email_confirmation_sent: ActiveValue::set(false),
817 platform_mac: ActiveValue::set(signup.platform_mac),
818 platform_windows: ActiveValue::set(signup.platform_windows),
819 platform_linux: ActiveValue::set(signup.platform_linux),
820 platform_unknown: ActiveValue::set(false),
821 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
822 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
823 device_id: ActiveValue::set(signup.device_id.clone()),
824 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
825 ..Default::default()
826 })
827 .on_conflict(
828 OnConflict::column(signup::Column::EmailAddress)
829 .update_columns([
830 signup::Column::PlatformMac,
831 signup::Column::PlatformWindows,
832 signup::Column::PlatformLinux,
833 signup::Column::EditorFeatures,
834 signup::Column::ProgrammingLanguages,
835 signup::Column::DeviceId,
836 signup::Column::AddedToMailingList,
837 ])
838 .to_owned(),
839 )
840 .exec(&*tx)
841 .await?;
842 Ok(())
843 })
844 .await
845 }
846
847 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
848 self.transaction(|tx| async move {
849 let signup = signup::Entity::find()
850 .filter(signup::Column::EmailAddress.eq(email_address))
851 .one(&*tx)
852 .await?
853 .ok_or_else(|| {
854 anyhow!("signup with email address {} doesn't exist", email_address)
855 })?;
856
857 Ok(signup)
858 })
859 .await
860 }
861
862 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
863 self.transaction(|tx| async move {
864 let query = "
865 SELECT
866 COUNT(*) as count,
867 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
868 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
869 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
870 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
871 FROM (
872 SELECT *
873 FROM signups
874 WHERE
875 NOT email_confirmation_sent
876 ) AS unsent
877 ";
878 Ok(
879 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
880 self.pool.get_database_backend(),
881 query.into(),
882 vec![],
883 ))
884 .one(&*tx)
885 .await?
886 .ok_or_else(|| anyhow!("invalid result"))?,
887 )
888 })
889 .await
890 }
891
892 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
893 let emails = invites
894 .iter()
895 .map(|s| s.email_address.as_str())
896 .collect::<Vec<_>>();
897 self.transaction(|tx| async {
898 let tx = tx;
899 signup::Entity::update_many()
900 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
901 .set(signup::ActiveModel {
902 email_confirmation_sent: ActiveValue::set(true),
903 ..Default::default()
904 })
905 .exec(&*tx)
906 .await?;
907 Ok(())
908 })
909 .await
910 }
911
912 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
913 self.transaction(|tx| async move {
914 Ok(signup::Entity::find()
915 .select_only()
916 .column(signup::Column::EmailAddress)
917 .column(signup::Column::EmailConfirmationCode)
918 .filter(
919 signup::Column::EmailConfirmationSent.eq(false).and(
920 signup::Column::PlatformMac
921 .eq(true)
922 .or(signup::Column::PlatformUnknown.eq(true)),
923 ),
924 )
925 .order_by_asc(signup::Column::CreatedAt)
926 .limit(count as u64)
927 .into_model()
928 .all(&*tx)
929 .await?)
930 })
931 .await
932 }
933
934 // invite codes
935
936 pub async fn create_invite_from_code(
937 &self,
938 code: &str,
939 email_address: &str,
940 device_id: Option<&str>,
941 added_to_mailing_list: bool,
942 ) -> Result<Invite> {
943 self.transaction(|tx| async move {
944 let existing_user = user::Entity::find()
945 .filter(user::Column::EmailAddress.eq(email_address))
946 .one(&*tx)
947 .await?;
948
949 if existing_user.is_some() {
950 Err(anyhow!("email address is already in use"))?;
951 }
952
953 let inviting_user_with_invites = match user::Entity::find()
954 .filter(
955 user::Column::InviteCode
956 .eq(code)
957 .and(user::Column::InviteCount.gt(0)),
958 )
959 .one(&*tx)
960 .await?
961 {
962 Some(inviting_user) => inviting_user,
963 None => {
964 return Err(Error::Http(
965 StatusCode::UNAUTHORIZED,
966 "unable to find an invite code with invites remaining".to_string(),
967 ))?
968 }
969 };
970 user::Entity::update_many()
971 .filter(
972 user::Column::Id
973 .eq(inviting_user_with_invites.id)
974 .and(user::Column::InviteCount.gt(0)),
975 )
976 .col_expr(
977 user::Column::InviteCount,
978 Expr::col(user::Column::InviteCount).sub(1),
979 )
980 .exec(&*tx)
981 .await?;
982
983 let signup = signup::Entity::insert(signup::ActiveModel {
984 email_address: ActiveValue::set(email_address.into()),
985 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
986 email_confirmation_sent: ActiveValue::set(false),
987 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
988 platform_linux: ActiveValue::set(false),
989 platform_mac: ActiveValue::set(false),
990 platform_windows: ActiveValue::set(false),
991 platform_unknown: ActiveValue::set(true),
992 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
993 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
994 ..Default::default()
995 })
996 .on_conflict(
997 OnConflict::column(signup::Column::EmailAddress)
998 .update_column(signup::Column::InvitingUserId)
999 .to_owned(),
1000 )
1001 .exec_with_returning(&*tx)
1002 .await?;
1003
1004 Ok(Invite {
1005 email_address: signup.email_address,
1006 email_confirmation_code: signup.email_confirmation_code,
1007 })
1008 })
1009 .await
1010 }
1011
1012 pub async fn create_user_from_invite(
1013 &self,
1014 invite: &Invite,
1015 user: NewUserParams,
1016 ) -> Result<Option<NewUserResult>> {
1017 self.transaction(|tx| async {
1018 let tx = tx;
1019 let signup = signup::Entity::find()
1020 .filter(
1021 signup::Column::EmailAddress
1022 .eq(invite.email_address.as_str())
1023 .and(
1024 signup::Column::EmailConfirmationCode
1025 .eq(invite.email_confirmation_code.as_str()),
1026 ),
1027 )
1028 .one(&*tx)
1029 .await?
1030 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1031
1032 if signup.user_id.is_some() {
1033 return Ok(None);
1034 }
1035
1036 let user = user::Entity::insert(user::ActiveModel {
1037 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1038 github_login: ActiveValue::set(user.github_login.clone()),
1039 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1040 admin: ActiveValue::set(false),
1041 invite_count: ActiveValue::set(user.invite_count),
1042 invite_code: ActiveValue::set(Some(random_invite_code())),
1043 metrics_id: ActiveValue::set(Uuid::new_v4()),
1044 ..Default::default()
1045 })
1046 .on_conflict(
1047 OnConflict::column(user::Column::GithubLogin)
1048 .update_columns([
1049 user::Column::EmailAddress,
1050 user::Column::GithubUserId,
1051 user::Column::Admin,
1052 ])
1053 .to_owned(),
1054 )
1055 .exec_with_returning(&*tx)
1056 .await?;
1057
1058 let mut signup = signup.into_active_model();
1059 signup.user_id = ActiveValue::set(Some(user.id));
1060 let signup = signup.update(&*tx).await?;
1061
1062 if let Some(inviting_user_id) = signup.inviting_user_id {
1063 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1064 (inviting_user_id, user.id, true)
1065 } else {
1066 (user.id, inviting_user_id, false)
1067 };
1068
1069 contact::Entity::insert(contact::ActiveModel {
1070 user_id_a: ActiveValue::set(user_id_a),
1071 user_id_b: ActiveValue::set(user_id_b),
1072 a_to_b: ActiveValue::set(a_to_b),
1073 should_notify: ActiveValue::set(true),
1074 accepted: ActiveValue::set(true),
1075 ..Default::default()
1076 })
1077 .on_conflict(OnConflict::new().do_nothing().to_owned())
1078 .exec_without_returning(&*tx)
1079 .await?;
1080 }
1081
1082 Ok(Some(NewUserResult {
1083 user_id: user.id,
1084 metrics_id: user.metrics_id.to_string(),
1085 inviting_user_id: signup.inviting_user_id,
1086 signup_device_id: signup.device_id,
1087 }))
1088 })
1089 .await
1090 }
1091
1092 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1093 self.transaction(|tx| async move {
1094 if count > 0 {
1095 user::Entity::update_many()
1096 .filter(
1097 user::Column::Id
1098 .eq(id)
1099 .and(user::Column::InviteCode.is_null()),
1100 )
1101 .set(user::ActiveModel {
1102 invite_code: ActiveValue::set(Some(random_invite_code())),
1103 ..Default::default()
1104 })
1105 .exec(&*tx)
1106 .await?;
1107 }
1108
1109 user::Entity::update_many()
1110 .filter(user::Column::Id.eq(id))
1111 .set(user::ActiveModel {
1112 invite_count: ActiveValue::set(count),
1113 ..Default::default()
1114 })
1115 .exec(&*tx)
1116 .await?;
1117 Ok(())
1118 })
1119 .await
1120 }
1121
1122 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1123 self.transaction(|tx| async move {
1124 match user::Entity::find_by_id(id).one(&*tx).await? {
1125 Some(user) if user.invite_code.is_some() => {
1126 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1127 }
1128 _ => Ok(None),
1129 }
1130 })
1131 .await
1132 }
1133
1134 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1135 self.transaction(|tx| async move {
1136 user::Entity::find()
1137 .filter(user::Column::InviteCode.eq(code))
1138 .one(&*tx)
1139 .await?
1140 .ok_or_else(|| {
1141 Error::Http(
1142 StatusCode::NOT_FOUND,
1143 "that invite code does not exist".to_string(),
1144 )
1145 })
1146 })
1147 .await
1148 }
1149
1150 // rooms
1151
1152 pub async fn incoming_call_for_user(
1153 &self,
1154 user_id: UserId,
1155 ) -> Result<Option<proto::IncomingCall>> {
1156 self.transaction(|tx| async move {
1157 let pending_participant = room_participant::Entity::find()
1158 .filter(
1159 room_participant::Column::UserId
1160 .eq(user_id)
1161 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1162 )
1163 .one(&*tx)
1164 .await?;
1165
1166 if let Some(pending_participant) = pending_participant {
1167 let room = self.get_room(pending_participant.room_id, &tx).await?;
1168 Ok(Self::build_incoming_call(&room, user_id))
1169 } else {
1170 Ok(None)
1171 }
1172 })
1173 .await
1174 }
1175
1176 pub async fn create_room(
1177 &self,
1178 user_id: UserId,
1179 connection: ConnectionId,
1180 live_kit_room: &str,
1181 ) -> Result<proto::Room> {
1182 self.transaction(|tx| async move {
1183 let room = room::ActiveModel {
1184 live_kit_room: ActiveValue::set(live_kit_room.into()),
1185 ..Default::default()
1186 }
1187 .insert(&*tx)
1188 .await?;
1189 room_participant::ActiveModel {
1190 room_id: ActiveValue::set(room.id),
1191 user_id: ActiveValue::set(user_id),
1192 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1193 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1194 connection.owner_id as i32,
1195 ))),
1196 answering_connection_lost: ActiveValue::set(false),
1197 calling_user_id: ActiveValue::set(user_id),
1198 calling_connection_id: ActiveValue::set(connection.id as i32),
1199 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1200 connection.owner_id as i32,
1201 ))),
1202 ..Default::default()
1203 }
1204 .insert(&*tx)
1205 .await?;
1206
1207 let room = self.get_room(room.id, &tx).await?;
1208 Ok(room)
1209 })
1210 .await
1211 }
1212
1213 pub async fn call(
1214 &self,
1215 room_id: RoomId,
1216 calling_user_id: UserId,
1217 calling_connection: ConnectionId,
1218 called_user_id: UserId,
1219 initial_project_id: Option<ProjectId>,
1220 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1221 self.room_transaction(room_id, |tx| async move {
1222 room_participant::ActiveModel {
1223 room_id: ActiveValue::set(room_id),
1224 user_id: ActiveValue::set(called_user_id),
1225 answering_connection_lost: ActiveValue::set(false),
1226 calling_user_id: ActiveValue::set(calling_user_id),
1227 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1228 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1229 calling_connection.owner_id as i32,
1230 ))),
1231 initial_project_id: ActiveValue::set(initial_project_id),
1232 ..Default::default()
1233 }
1234 .insert(&*tx)
1235 .await?;
1236
1237 let room = self.get_room(room_id, &tx).await?;
1238 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1239 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1240 Ok((room, incoming_call))
1241 })
1242 .await
1243 }
1244
1245 pub async fn call_failed(
1246 &self,
1247 room_id: RoomId,
1248 called_user_id: UserId,
1249 ) -> Result<RoomGuard<proto::Room>> {
1250 self.room_transaction(room_id, |tx| async move {
1251 room_participant::Entity::delete_many()
1252 .filter(
1253 room_participant::Column::RoomId
1254 .eq(room_id)
1255 .and(room_participant::Column::UserId.eq(called_user_id)),
1256 )
1257 .exec(&*tx)
1258 .await?;
1259 let room = self.get_room(room_id, &tx).await?;
1260 Ok(room)
1261 })
1262 .await
1263 }
1264
1265 pub async fn decline_call(
1266 &self,
1267 expected_room_id: Option<RoomId>,
1268 user_id: UserId,
1269 ) -> Result<Option<RoomGuard<proto::Room>>> {
1270 self.optional_room_transaction(|tx| async move {
1271 let mut filter = Condition::all()
1272 .add(room_participant::Column::UserId.eq(user_id))
1273 .add(room_participant::Column::AnsweringConnectionId.is_null());
1274 if let Some(room_id) = expected_room_id {
1275 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1276 }
1277 let participant = room_participant::Entity::find()
1278 .filter(filter)
1279 .one(&*tx)
1280 .await?;
1281
1282 let participant = if let Some(participant) = participant {
1283 participant
1284 } else if expected_room_id.is_some() {
1285 return Err(anyhow!("could not find call to decline"))?;
1286 } else {
1287 return Ok(None);
1288 };
1289
1290 let room_id = participant.room_id;
1291 room_participant::Entity::delete(participant.into_active_model())
1292 .exec(&*tx)
1293 .await?;
1294
1295 let room = self.get_room(room_id, &tx).await?;
1296 Ok(Some((room_id, room)))
1297 })
1298 .await
1299 }
1300
1301 pub async fn cancel_call(
1302 &self,
1303 room_id: RoomId,
1304 calling_connection: ConnectionId,
1305 called_user_id: UserId,
1306 ) -> Result<RoomGuard<proto::Room>> {
1307 self.room_transaction(room_id, |tx| async move {
1308 let participant = room_participant::Entity::find()
1309 .filter(
1310 Condition::all()
1311 .add(room_participant::Column::UserId.eq(called_user_id))
1312 .add(room_participant::Column::RoomId.eq(room_id))
1313 .add(
1314 room_participant::Column::CallingConnectionId
1315 .eq(calling_connection.id as i32),
1316 )
1317 .add(
1318 room_participant::Column::CallingConnectionServerId
1319 .eq(calling_connection.owner_id as i32),
1320 )
1321 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1322 )
1323 .one(&*tx)
1324 .await?
1325 .ok_or_else(|| anyhow!("no call to cancel"))?;
1326
1327 room_participant::Entity::delete(participant.into_active_model())
1328 .exec(&*tx)
1329 .await?;
1330
1331 let room = self.get_room(room_id, &tx).await?;
1332 Ok(room)
1333 })
1334 .await
1335 }
1336
1337 pub async fn join_room(
1338 &self,
1339 room_id: RoomId,
1340 user_id: UserId,
1341 channel_id: Option<ChannelId>,
1342 connection: ConnectionId,
1343 ) -> Result<RoomGuard<proto::Room>> {
1344 self.room_transaction(room_id, |tx| async move {
1345 if let Some(channel_id) = channel_id {
1346 channel_member::Entity::find()
1347 .filter(
1348 channel_member::Column::ChannelId
1349 .eq(channel_id)
1350 .and(channel_member::Column::UserId.eq(user_id))
1351 .and(channel_member::Column::Accepted.eq(true)),
1352 )
1353 .one(&*tx)
1354 .await?
1355 .ok_or_else(|| anyhow!("no such channel membership"))?;
1356
1357 room_participant::ActiveModel {
1358 room_id: ActiveValue::set(room_id),
1359 user_id: ActiveValue::set(user_id),
1360 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1361 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1362 connection.owner_id as i32,
1363 ))),
1364 answering_connection_lost: ActiveValue::set(false),
1365 // Redundant for the channel join use case, used for channel and call invitations
1366 calling_user_id: ActiveValue::set(user_id),
1367 calling_connection_id: ActiveValue::set(connection.id as i32),
1368 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1369 connection.owner_id as i32,
1370 ))),
1371 ..Default::default()
1372 }
1373 .insert(&*tx)
1374 .await?;
1375 } else {
1376 let result = room_participant::Entity::update_many()
1377 .filter(
1378 Condition::all()
1379 .add(room_participant::Column::RoomId.eq(room_id))
1380 .add(room_participant::Column::UserId.eq(user_id))
1381 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1382 )
1383 .set(room_participant::ActiveModel {
1384 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1385 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1386 connection.owner_id as i32,
1387 ))),
1388 answering_connection_lost: ActiveValue::set(false),
1389 ..Default::default()
1390 })
1391 .exec(&*tx)
1392 .await?;
1393 if result.rows_affected == 0 {
1394 Err(anyhow!("room does not exist or was already joined"))?;
1395 }
1396 }
1397
1398 let room = self.get_room(room_id, &tx).await?;
1399 Ok(room)
1400 })
1401 .await
1402 }
1403
/// Re-attaches `user_id` to the room named in `rejoin_room` over a new
/// `connection`, and reconciles the projects they host or had joined.
///
/// Steps, all inside one `room_transaction` for the room:
/// 1. Point the user's participant row at `connection`. Only rows that
///    already had an answering connection which was either marked lost or
///    belonged to a different server are eligible; if none is updated the
///    call fails.
/// 2. For each entry in `rejoin_room.reshared_projects`, verify the user is
///    the host, move the project and the host collaborator row to the new
///    connection, and update the project's worktrees.
/// 3. Delete the user's other hosted projects in this room (those that were
///    not reshared).
/// 4. For each entry in `rejoin_room.rejoined_projects`, gather worktree
///    entries, repositories, settings files, language servers and
///    collaborators, and move the user's collaborator row to the new
///    connection. Projects that no longer exist, or in which the user is not
///    a collaborator, are silently skipped.
///
/// Returns the refreshed room plus the reshared and rejoined project data.
pub async fn rejoin_room(
    &self,
    rejoin_room: proto::RejoinRoom,
    user_id: UserId,
    connection: ConnectionId,
) -> Result<RoomGuard<RejoinedRoom>> {
    let room_id = RoomId::from_proto(rejoin_room.id);
    self.room_transaction(room_id, |tx| async {
        // The block is not `move`, so rebind `tx` to take ownership of it here.
        let tx = tx;
        // Claim the participant row for the new connection. The row must have
        // been answered before, and must either be flagged as lost or belong
        // to a different server than the one we are reconnecting to.
        let participant_update = room_participant::Entity::update_many()
            .filter(
                Condition::all()
                    .add(room_participant::Column::RoomId.eq(room_id))
                    .add(room_participant::Column::UserId.eq(user_id))
                    .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                    .add(
                        Condition::any()
                            .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                            .add(
                                room_participant::Column::AnsweringConnectionServerId
                                    .ne(connection.owner_id as i32),
                            ),
                    ),
            )
            .set(room_participant::ActiveModel {
                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                answering_connection_server_id: ActiveValue::set(Some(ServerId(
                    connection.owner_id as i32,
                ))),
                answering_connection_lost: ActiveValue::set(false),
                ..Default::default()
            })
            .exec(&*tx)
            .await?;
        if participant_update.rows_affected == 0 {
            return Err(anyhow!("room does not exist or was already joined"))?;
        }

        // Projects the user hosts and wants to keep sharing.
        let mut reshared_projects = Vec::new();
        for reshared_project in &rejoin_room.reshared_projects {
            let project_id = ProjectId::from_proto(reshared_project.project_id);
            let project = project::Entity::find_by_id(project_id)
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("project does not exist"))?;
            // Only the host may reshare a project.
            if project.host_user_id != user_id {
                return Err(anyhow!("no such project"))?;
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            // Pull the host's own row out of the list; the remainder are the
            // other collaborators reported back in `ResharedProject`.
            let host_ix = collaborators
                .iter()
                .position(|collaborator| {
                    collaborator.user_id == user_id && collaborator.is_host
                })
                .ok_or_else(|| anyhow!("host not found among collaborators"))?;
            let host = collaborators.swap_remove(host_ix);
            let old_connection_id = host.connection();

            // Move both the project and the host collaborator row to the new
            // connection.
            project::Entity::update(project::ActiveModel {
                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                host_connection_server_id: ActiveValue::set(Some(ServerId(
                    connection.owner_id as i32,
                ))),
                ..project.into_active_model()
            })
            .exec(&*tx)
            .await?;
            project_collaborator::Entity::update(project_collaborator::ActiveModel {
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                ..host.into_active_model()
            })
            .exec(&*tx)
            .await?;

            self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                .await?;

            reshared_projects.push(ResharedProject {
                id: project_id,
                old_connection_id,
                collaborators: collaborators
                    .iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect(),
                worktrees: reshared_project.worktrees.clone(),
            });
        }

        // Drop every project this user hosts in the room that was not
        // reshared above.
        project::Entity::delete_many()
            .filter(
                Condition::all()
                    .add(project::Column::RoomId.eq(room_id))
                    .add(project::Column::HostUserId.eq(user_id))
                    .add(
                        project::Column::Id
                            .is_not_in(reshared_projects.iter().map(|project| project.id)),
                    ),
            )
            .exec(&*tx)
            .await?;

        // Projects the user had joined as a guest.
        let mut rejoined_projects = Vec::new();
        for rejoined_project in &rejoin_room.rejoined_projects {
            let project_id = ProjectId::from_proto(rejoined_project.id);
            // A project that no longer exists is skipped rather than treated
            // as an error.
            let Some(project) = project::Entity::find_by_id(project_id)
                .one(&*tx)
                .await? else { continue };

            let mut worktrees = Vec::new();
            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
            for db_worktree in db_worktrees {
                let mut worktree = RejoinedWorktree {
                    id: db_worktree.id as u64,
                    abs_path: db_worktree.abs_path,
                    root_name: db_worktree.root_name,
                    visible: db_worktree.visible,
                    updated_entries: Default::default(),
                    removed_entries: Default::default(),
                    updated_repositories: Default::default(),
                    removed_repositories: Default::default(),
                    diagnostic_summaries: Default::default(),
                    settings_files: Default::default(),
                    scan_id: db_worktree.scan_id as u64,
                    completed_scan_id: db_worktree.completed_scan_id as u64,
                };

                // The client's last known state of this worktree, if it
                // reported one.
                let rejoined_worktree = rejoined_project
                    .worktrees
                    .iter()
                    .find(|worktree| worktree.id == db_worktree.id as u64);

                // File entries
                {
                    // If the client knows the worktree, select only rows with
                    // a scan id newer than the one it reported (deleted rows
                    // included, so removals can be relayed); otherwise select
                    // every non-deleted row.
                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                    } else {
                        worktree_entry::Column::IsDeleted.eq(false)
                    };

                    let mut db_entries = worktree_entry::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_entry::Column::ProjectId.eq(project.id))
                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                .add(entry_filter),
                        )
                        .stream(&*tx)
                        .await?;

                    while let Some(db_entry) = db_entries.next().await {
                        let db_entry = db_entry?;
                        if db_entry.is_deleted {
                            worktree.removed_entries.push(db_entry.id as u64);
                        } else {
                            worktree.updated_entries.push(proto::Entry {
                                id: db_entry.id as u64,
                                is_dir: db_entry.is_dir,
                                path: db_entry.path,
                                inode: db_entry.inode as u64,
                                mtime: Some(proto::Timestamp {
                                    seconds: db_entry.mtime_seconds as u64,
                                    nanos: db_entry.mtime_nanos as u32,
                                }),
                                is_symlink: db_entry.is_symlink,
                                is_ignored: db_entry.is_ignored,
                                is_external: db_entry.is_external,
                                git_status: db_entry.git_status.map(|status| status as i32),
                            });
                        }
                    }
                }

                // Repository Entries
                {
                    // Same selection rule as for file entries above.
                    let repository_entry_filter =
                        if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_repository::Column::IsDeleted.eq(false)
                        };

                    let mut db_repositories = worktree_repository::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_repository::Column::ProjectId.eq(project.id))
                                .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                .add(repository_entry_filter),
                        )
                        .stream(&*tx)
                        .await?;

                    while let Some(db_repository) = db_repositories.next().await {
                        let db_repository = db_repository?;
                        if db_repository.is_deleted {
                            worktree
                                .removed_repositories
                                .push(db_repository.work_directory_id as u64);
                        } else {
                            worktree.updated_repositories.push(proto::RepositoryEntry {
                                work_directory_id: db_repository.work_directory_id as u64,
                                branch: db_repository.branch,
                            });
                        }
                    }
                }

                worktrees.push(worktree);
            }

            let language_servers = project
                .find_related(language_server::Entity)
                .all(&*tx)
                .await?
                .into_iter()
                .map(|language_server| proto::LanguageServer {
                    id: language_server.id as u64,
                    name: language_server.name,
                })
                .collect::<Vec<_>>();

            // Settings files are loaded per project and attached to the
            // matching worktree; rows for unknown worktrees are ignored.
            {
                let mut db_settings_files = worktree_settings_file::Entity::find()
                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                    .stream(&*tx)
                    .await?;
                while let Some(db_settings_file) = db_settings_files.next().await {
                    let db_settings_file = db_settings_file?;
                    if let Some(worktree) = worktrees
                        .iter_mut()
                        .find(|w| w.id == db_settings_file.worktree_id as u64)
                    {
                        worktree.settings_files.push(WorktreeSettingsFile {
                            path: db_settings_file.path,
                            content: db_settings_file.content,
                        });
                    }
                }
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            // If the user is no longer a collaborator, skip the project and
            // discard the data gathered for it above.
            let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                .iter()
                .position(|collaborator| collaborator.user_id == user_id)
            {
                collaborators.swap_remove(self_collaborator_ix)
            } else {
                continue;
            };
            let old_connection_id = self_collaborator.connection();
            project_collaborator::Entity::update(project_collaborator::ActiveModel {
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                ..self_collaborator.into_active_model()
            })
            .exec(&*tx)
            .await?;

            let collaborators = collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                })
                .collect::<Vec<_>>();

            rejoined_projects.push(RejoinedProject {
                id: project_id,
                old_connection_id,
                collaborators,
                worktrees,
                language_servers,
            });
        }

        // Read the room last so it reflects every change made above.
        let room = self.get_room(room_id, &tx).await?;
        Ok(RejoinedRoom {
            room,
            rejoined_projects,
            reshared_projects,
        })
    })
    .await
}
1702
/// Removes the participant attached to `connection` from its room and cleans
/// up everything that depended on that connection.
///
/// Returns `Ok(None)` when the connection is not an answering connection of
/// any participant. Otherwise it deletes the participant, cancels the pending
/// calls that user had placed, removes the connection from every project it
/// collaborated on, deletes the projects it hosted in the room, and deletes
/// the room itself if it is now empty and has no channel id. The returned
/// `LeftRoom` carries the refreshed room, the affected projects, the user ids
/// whose calls were canceled, and whether the room row was deleted.
pub async fn leave_room(
    &self,
    connection: ConnectionId,
) -> Result<Option<RoomGuard<LeftRoom>>> {
    self.optional_room_transaction(|tx| async move {
        let leaving_participant = room_participant::Entity::find()
            .filter(
                Condition::all()
                    .add(
                        room_participant::Column::AnsweringConnectionId
                            .eq(connection.id as i32),
                    )
                    .add(
                        room_participant::Column::AnsweringConnectionServerId
                            .eq(connection.owner_id as i32),
                    ),
            )
            .one(&*tx)
            .await?;

        if let Some(leaving_participant) = leaving_participant {
            // Leave room.
            let room_id = leaving_participant.room_id;
            room_participant::Entity::delete_by_id(leaving_participant.id)
                .exec(&*tx)
                .await?;

            // Cancel pending calls initiated by the leaving user.
            // NOTE(review): this filter is not restricted to `room_id`;
            // confirm a user can only have pending outgoing calls in one room.
            let called_participants = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::CallingUserId
                                .eq(leaving_participant.user_id),
                        )
                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
                )
                .all(&*tx)
                .await?;
            room_participant::Entity::delete_many()
                .filter(
                    room_participant::Column::Id
                        .is_in(called_participants.iter().map(|participant| participant.id)),
                )
                .exec(&*tx)
                .await?;
            let canceled_calls_to_user_ids = called_participants
                .into_iter()
                .map(|participant| participant.user_id)
                .collect();

            // Detect left projects.
            // Helper column enum so the query can select just the project id.
            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
            enum QueryProjectIds {
                ProjectId,
            }
            let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                .select_only()
                .column_as(
                    project_collaborator::Column::ProjectId,
                    QueryProjectIds::ProjectId,
                )
                .filter(
                    Condition::all()
                        .add(
                            project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                        )
                        .add(
                            project_collaborator::Column::ConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .into_values::<_, QueryProjectIds>()
                .all(&*tx)
                .await?;
            // For every affected project, record its host and the connections
            // of the collaborators other than the one that is leaving.
            let mut left_projects = HashMap::default();
            let mut collaborators = project_collaborator::Entity::find()
                .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                .stream(&*tx)
                .await?;
            while let Some(collaborator) = collaborators.next().await {
                let collaborator = collaborator?;
                let left_project =
                    left_projects
                        .entry(collaborator.project_id)
                        .or_insert(LeftProject {
                            id: collaborator.project_id,
                            host_user_id: Default::default(),
                            connection_ids: Default::default(),
                            host_connection_id: Default::default(),
                        });

                let collaborator_connection_id = collaborator.connection();
                if collaborator_connection_id != connection {
                    left_project.connection_ids.push(collaborator_connection_id);
                }

                // The host fields start as defaults and are filled in once the
                // host's row is seen.
                if collaborator.is_host {
                    left_project.host_user_id = collaborator.user_id;
                    left_project.host_connection_id = collaborator_connection_id;
                }
            }
            // Presumably ends the stream's use of `tx` before the statements
            // below run on it — TODO confirm.
            drop(collaborators);

            // Leave projects.
            project_collaborator::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(
                            project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                        )
                        .add(
                            project_collaborator::Column::ConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Unshare projects.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
                        .add(
                            project::Column::HostConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Read the room after all deletions; an empty room is deleted
            // unless it has a channel id, in which case `deleted` stays false.
            let room = self.get_room(room_id, &tx).await?;
            let deleted = if room.participants.is_empty() {
                let result = room::Entity::delete_by_id(room_id)
                    .filter(room::Column::ChannelId.is_null())
                    .exec(&*tx)
                    .await?;
                result.rows_affected > 0
            } else {
                false
            };

            let left_room = LeftRoom {
                room,
                left_projects,
                canceled_calls_to_user_ids,
                deleted,
            };

            // An empty room no longer needs its entry in the in-memory
            // `rooms` map. This runs whether or not the row was deleted.
            if left_room.room.participants.is_empty() {
                self.rooms.remove(&room_id);
            }

            Ok(Some((room_id, left_room)))
        } else {
            Ok(None)
        }
    })
    .await
}
1865
1866 pub async fn follow(
1867 &self,
1868 project_id: ProjectId,
1869 leader_connection: ConnectionId,
1870 follower_connection: ConnectionId,
1871 ) -> Result<RoomGuard<proto::Room>> {
1872 let room_id = self.room_id_for_project(project_id).await?;
1873 self.room_transaction(room_id, |tx| async move {
1874 follower::ActiveModel {
1875 room_id: ActiveValue::set(room_id),
1876 project_id: ActiveValue::set(project_id),
1877 leader_connection_server_id: ActiveValue::set(ServerId(
1878 leader_connection.owner_id as i32,
1879 )),
1880 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1881 follower_connection_server_id: ActiveValue::set(ServerId(
1882 follower_connection.owner_id as i32,
1883 )),
1884 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1885 ..Default::default()
1886 }
1887 .insert(&*tx)
1888 .await?;
1889
1890 let room = self.get_room(room_id, &*tx).await?;
1891 Ok(room)
1892 })
1893 .await
1894 }
1895
1896 pub async fn unfollow(
1897 &self,
1898 project_id: ProjectId,
1899 leader_connection: ConnectionId,
1900 follower_connection: ConnectionId,
1901 ) -> Result<RoomGuard<proto::Room>> {
1902 let room_id = self.room_id_for_project(project_id).await?;
1903 self.room_transaction(room_id, |tx| async move {
1904 follower::Entity::delete_many()
1905 .filter(
1906 Condition::all()
1907 .add(follower::Column::ProjectId.eq(project_id))
1908 .add(
1909 follower::Column::LeaderConnectionServerId
1910 .eq(leader_connection.owner_id),
1911 )
1912 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1913 .add(
1914 follower::Column::FollowerConnectionServerId
1915 .eq(follower_connection.owner_id),
1916 )
1917 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1918 )
1919 .exec(&*tx)
1920 .await?;
1921
1922 let room = self.get_room(room_id, &*tx).await?;
1923 Ok(room)
1924 })
1925 .await
1926 }
1927
1928 pub async fn update_room_participant_location(
1929 &self,
1930 room_id: RoomId,
1931 connection: ConnectionId,
1932 location: proto::ParticipantLocation,
1933 ) -> Result<RoomGuard<proto::Room>> {
1934 self.room_transaction(room_id, |tx| async {
1935 let tx = tx;
1936 let location_kind;
1937 let location_project_id;
1938 match location
1939 .variant
1940 .as_ref()
1941 .ok_or_else(|| anyhow!("invalid location"))?
1942 {
1943 proto::participant_location::Variant::SharedProject(project) => {
1944 location_kind = 0;
1945 location_project_id = Some(ProjectId::from_proto(project.id));
1946 }
1947 proto::participant_location::Variant::UnsharedProject(_) => {
1948 location_kind = 1;
1949 location_project_id = None;
1950 }
1951 proto::participant_location::Variant::External(_) => {
1952 location_kind = 2;
1953 location_project_id = None;
1954 }
1955 }
1956
1957 let result = room_participant::Entity::update_many()
1958 .filter(
1959 Condition::all()
1960 .add(room_participant::Column::RoomId.eq(room_id))
1961 .add(
1962 room_participant::Column::AnsweringConnectionId
1963 .eq(connection.id as i32),
1964 )
1965 .add(
1966 room_participant::Column::AnsweringConnectionServerId
1967 .eq(connection.owner_id as i32),
1968 ),
1969 )
1970 .set(room_participant::ActiveModel {
1971 location_kind: ActiveValue::set(Some(location_kind)),
1972 location_project_id: ActiveValue::set(location_project_id),
1973 ..Default::default()
1974 })
1975 .exec(&*tx)
1976 .await?;
1977
1978 if result.rows_affected == 1 {
1979 let room = self.get_room(room_id, &tx).await?;
1980 Ok(room)
1981 } else {
1982 Err(anyhow!("could not update room participant location"))?
1983 }
1984 })
1985 .await
1986 }
1987
1988 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1989 self.transaction(|tx| async move {
1990 let participant = room_participant::Entity::find()
1991 .filter(
1992 Condition::all()
1993 .add(
1994 room_participant::Column::AnsweringConnectionId
1995 .eq(connection.id as i32),
1996 )
1997 .add(
1998 room_participant::Column::AnsweringConnectionServerId
1999 .eq(connection.owner_id as i32),
2000 ),
2001 )
2002 .one(&*tx)
2003 .await?
2004 .ok_or_else(|| anyhow!("not a participant in any room"))?;
2005
2006 room_participant::Entity::update(room_participant::ActiveModel {
2007 answering_connection_lost: ActiveValue::set(true),
2008 ..participant.into_active_model()
2009 })
2010 .exec(&*tx)
2011 .await?;
2012
2013 Ok(())
2014 })
2015 .await
2016 }
2017
2018 fn build_incoming_call(
2019 room: &proto::Room,
2020 called_user_id: UserId,
2021 ) -> Option<proto::IncomingCall> {
2022 let pending_participant = room
2023 .pending_participants
2024 .iter()
2025 .find(|participant| participant.user_id == called_user_id.to_proto())?;
2026
2027 Some(proto::IncomingCall {
2028 room_id: room.id,
2029 calling_user_id: pending_participant.calling_user_id,
2030 participant_user_ids: room
2031 .participants
2032 .iter()
2033 .map(|participant| participant.user_id)
2034 .collect(),
2035 initial_project: room.participants.iter().find_map(|participant| {
2036 let initial_project_id = pending_participant.initial_project_id?;
2037 participant
2038 .projects
2039 .iter()
2040 .find(|project| project.id == initial_project_id)
2041 .cloned()
2042 }),
2043 })
2044 }
2045
/// Loads room `room_id` and assembles its `proto::Room` representation using
/// the transaction `tx`.
///
/// Participant rows with both an answering connection id and server id
/// become joined participants; all others become pending participants.
/// Projects are attached to the participant whose connection hosts them, and
/// follower rows are copied into `followers`.
///
/// Fails with "could not find room" when the room row does not exist.
async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
    let db_room = room::Entity::find_by_id(room_id)
        .one(tx)
        .await?
        .ok_or_else(|| anyhow!("could not find room"))?;

    let mut db_participants = db_room
        .find_related(room_participant::Entity)
        .stream(tx)
        .await?;
    // Joined participants are keyed by connection so projects can be matched
    // to their host below.
    let mut participants = HashMap::default();
    let mut pending_participants = Vec::new();
    while let Some(db_participant) = db_participants.next().await {
        let db_participant = db_participant?;
        if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
            .answering_connection_id
            .zip(db_participant.answering_connection_server_id)
        {
            // Kind 0 needs a project id to count as a shared project; kind 1
            // is an unshared project; every other combination is reported as
            // external.
            let location = match (
                db_participant.location_kind,
                db_participant.location_project_id,
            ) {
                (Some(0), Some(project_id)) => {
                    Some(proto::participant_location::Variant::SharedProject(
                        proto::participant_location::SharedProject {
                            id: project_id.to_proto(),
                        },
                    ))
                }
                (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
                    Default::default(),
                )),
                _ => Some(proto::participant_location::Variant::External(
                    Default::default(),
                )),
            };

            let answering_connection = ConnectionId {
                owner_id: answering_connection_server_id.0 as u32,
                id: answering_connection_id as u32,
            };
            participants.insert(
                answering_connection,
                proto::Participant {
                    user_id: db_participant.user_id.to_proto(),
                    peer_id: Some(answering_connection.into()),
                    projects: Default::default(),
                    location: Some(proto::ParticipantLocation { variant: location }),
                },
            );
        } else {
            pending_participants.push(proto::PendingParticipant {
                user_id: db_participant.user_id.to_proto(),
                calling_user_id: db_participant.calling_user_id.to_proto(),
                initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
            });
        }
    }
    // Presumably ends the stream's use of `tx` before the next query — TODO
    // confirm.
    drop(db_participants);

    let mut db_projects = db_room
        .find_related(project::Entity)
        .find_with_related(worktree::Entity)
        .stream(tx)
        .await?;

    // Each row pairs a project with at most one worktree, so the same project
    // can appear on several rows.
    while let Some(row) = db_projects.next().await {
        let (db_project, db_worktree) = row?;
        let host_connection = db_project.host_connection()?;
        // Projects whose host connection is not a joined participant are
        // left out of the result.
        if let Some(participant) = participants.get_mut(&host_connection) {
            // Reuse the project entry if an earlier row already created it.
            let project = if let Some(project) = participant
                .projects
                .iter_mut()
                .find(|project| project.id == db_project.id.to_proto())
            {
                project
            } else {
                participant.projects.push(proto::ParticipantProject {
                    id: db_project.id.to_proto(),
                    worktree_root_names: Default::default(),
                });
                participant.projects.last_mut().unwrap()
            };

            // Only visible worktrees contribute a root name.
            if let Some(db_worktree) = db_worktree {
                if db_worktree.visible {
                    project.worktree_root_names.push(db_worktree.root_name);
                }
            }
        }
    }
    drop(db_projects);

    let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
    let mut followers = Vec::new();
    while let Some(db_follower) = db_followers.next().await {
        let db_follower = db_follower?;
        followers.push(proto::Follower {
            leader_id: Some(db_follower.leader_connection().into()),
            follower_id: Some(db_follower.follower_connection().into()),
            project_id: db_follower.project_id.to_proto(),
        });
    }

    Ok(proto::Room {
        id: db_room.id.to_proto(),
        live_kit_room: db_room.live_kit_room,
        participants: participants.into_values().collect(),
        pending_participants,
        followers,
    })
}
2158
2159 // projects
2160
2161 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2162 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2163 enum QueryAs {
2164 Count,
2165 }
2166
2167 self.transaction(|tx| async move {
2168 Ok(project::Entity::find()
2169 .select_only()
2170 .column_as(project::Column::Id.count(), QueryAs::Count)
2171 .inner_join(user::Entity)
2172 .filter(user::Column::Admin.eq(false))
2173 .into_values::<_, QueryAs>()
2174 .one(&*tx)
2175 .await?
2176 .unwrap_or(0i64) as usize)
2177 })
2178 .await
2179 }
2180
2181 pub async fn share_project(
2182 &self,
2183 room_id: RoomId,
2184 connection: ConnectionId,
2185 worktrees: &[proto::WorktreeMetadata],
2186 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2187 self.room_transaction(room_id, |tx| async move {
2188 let participant = room_participant::Entity::find()
2189 .filter(
2190 Condition::all()
2191 .add(
2192 room_participant::Column::AnsweringConnectionId
2193 .eq(connection.id as i32),
2194 )
2195 .add(
2196 room_participant::Column::AnsweringConnectionServerId
2197 .eq(connection.owner_id as i32),
2198 ),
2199 )
2200 .one(&*tx)
2201 .await?
2202 .ok_or_else(|| anyhow!("could not find participant"))?;
2203 if participant.room_id != room_id {
2204 return Err(anyhow!("shared project on unexpected room"))?;
2205 }
2206
2207 let project = project::ActiveModel {
2208 room_id: ActiveValue::set(participant.room_id),
2209 host_user_id: ActiveValue::set(participant.user_id),
2210 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2211 host_connection_server_id: ActiveValue::set(Some(ServerId(
2212 connection.owner_id as i32,
2213 ))),
2214 ..Default::default()
2215 }
2216 .insert(&*tx)
2217 .await?;
2218
2219 if !worktrees.is_empty() {
2220 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2221 worktree::ActiveModel {
2222 id: ActiveValue::set(worktree.id as i64),
2223 project_id: ActiveValue::set(project.id),
2224 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2225 root_name: ActiveValue::set(worktree.root_name.clone()),
2226 visible: ActiveValue::set(worktree.visible),
2227 scan_id: ActiveValue::set(0),
2228 completed_scan_id: ActiveValue::set(0),
2229 }
2230 }))
2231 .exec(&*tx)
2232 .await?;
2233 }
2234
2235 project_collaborator::ActiveModel {
2236 project_id: ActiveValue::set(project.id),
2237 connection_id: ActiveValue::set(connection.id as i32),
2238 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2239 user_id: ActiveValue::set(participant.user_id),
2240 replica_id: ActiveValue::set(ReplicaId(0)),
2241 is_host: ActiveValue::set(true),
2242 ..Default::default()
2243 }
2244 .insert(&*tx)
2245 .await?;
2246
2247 let room = self.get_room(room_id, &tx).await?;
2248 Ok((project.id, room))
2249 })
2250 .await
2251 }
2252
2253 pub async fn unshare_project(
2254 &self,
2255 project_id: ProjectId,
2256 connection: ConnectionId,
2257 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2258 let room_id = self.room_id_for_project(project_id).await?;
2259 self.room_transaction(room_id, |tx| async move {
2260 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2261
2262 let project = project::Entity::find_by_id(project_id)
2263 .one(&*tx)
2264 .await?
2265 .ok_or_else(|| anyhow!("project not found"))?;
2266 if project.host_connection()? == connection {
2267 project::Entity::delete(project.into_active_model())
2268 .exec(&*tx)
2269 .await?;
2270 let room = self.get_room(room_id, &tx).await?;
2271 Ok((room, guest_connection_ids))
2272 } else {
2273 Err(anyhow!("cannot unshare a project hosted by another user"))?
2274 }
2275 })
2276 .await
2277 }
2278
2279 pub async fn update_project(
2280 &self,
2281 project_id: ProjectId,
2282 connection: ConnectionId,
2283 worktrees: &[proto::WorktreeMetadata],
2284 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2285 let room_id = self.room_id_for_project(project_id).await?;
2286 self.room_transaction(room_id, |tx| async move {
2287 let project = project::Entity::find_by_id(project_id)
2288 .filter(
2289 Condition::all()
2290 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2291 .add(
2292 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2293 ),
2294 )
2295 .one(&*tx)
2296 .await?
2297 .ok_or_else(|| anyhow!("no such project"))?;
2298
2299 self.update_project_worktrees(project.id, worktrees, &tx)
2300 .await?;
2301
2302 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2303 let room = self.get_room(project.room_id, &tx).await?;
2304 Ok((room, guest_connection_ids))
2305 })
2306 .await
2307 }
2308
2309 async fn update_project_worktrees(
2310 &self,
2311 project_id: ProjectId,
2312 worktrees: &[proto::WorktreeMetadata],
2313 tx: &DatabaseTransaction,
2314 ) -> Result<()> {
2315 if !worktrees.is_empty() {
2316 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2317 id: ActiveValue::set(worktree.id as i64),
2318 project_id: ActiveValue::set(project_id),
2319 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2320 root_name: ActiveValue::set(worktree.root_name.clone()),
2321 visible: ActiveValue::set(worktree.visible),
2322 scan_id: ActiveValue::set(0),
2323 completed_scan_id: ActiveValue::set(0),
2324 }))
2325 .on_conflict(
2326 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2327 .update_column(worktree::Column::RootName)
2328 .to_owned(),
2329 )
2330 .exec(&*tx)
2331 .await?;
2332 }
2333
2334 worktree::Entity::delete_many()
2335 .filter(worktree::Column::ProjectId.eq(project_id).and(
2336 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2337 ))
2338 .exec(&*tx)
2339 .await?;
2340
2341 Ok(())
2342 }
2343
2344 pub async fn update_worktree(
2345 &self,
2346 update: &proto::UpdateWorktree,
2347 connection: ConnectionId,
2348 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2349 let project_id = ProjectId::from_proto(update.project_id);
2350 let worktree_id = update.worktree_id as i64;
2351 let room_id = self.room_id_for_project(project_id).await?;
2352 self.room_transaction(room_id, |tx| async move {
2353 // Ensure the update comes from the host.
2354 let _project = project::Entity::find_by_id(project_id)
2355 .filter(
2356 Condition::all()
2357 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2358 .add(
2359 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2360 ),
2361 )
2362 .one(&*tx)
2363 .await?
2364 .ok_or_else(|| anyhow!("no such project"))?;
2365
2366 // Update metadata.
2367 worktree::Entity::update(worktree::ActiveModel {
2368 id: ActiveValue::set(worktree_id),
2369 project_id: ActiveValue::set(project_id),
2370 root_name: ActiveValue::set(update.root_name.clone()),
2371 scan_id: ActiveValue::set(update.scan_id as i64),
2372 completed_scan_id: if update.is_last_update {
2373 ActiveValue::set(update.scan_id as i64)
2374 } else {
2375 ActiveValue::default()
2376 },
2377 abs_path: ActiveValue::set(update.abs_path.clone()),
2378 ..Default::default()
2379 })
2380 .exec(&*tx)
2381 .await?;
2382
2383 if !update.updated_entries.is_empty() {
2384 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2385 let mtime = entry.mtime.clone().unwrap_or_default();
2386 worktree_entry::ActiveModel {
2387 project_id: ActiveValue::set(project_id),
2388 worktree_id: ActiveValue::set(worktree_id),
2389 id: ActiveValue::set(entry.id as i64),
2390 is_dir: ActiveValue::set(entry.is_dir),
2391 path: ActiveValue::set(entry.path.clone()),
2392 inode: ActiveValue::set(entry.inode as i64),
2393 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2394 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2395 is_symlink: ActiveValue::set(entry.is_symlink),
2396 is_ignored: ActiveValue::set(entry.is_ignored),
2397 is_external: ActiveValue::set(entry.is_external),
2398 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2399 is_deleted: ActiveValue::set(false),
2400 scan_id: ActiveValue::set(update.scan_id as i64),
2401 }
2402 }))
2403 .on_conflict(
2404 OnConflict::columns([
2405 worktree_entry::Column::ProjectId,
2406 worktree_entry::Column::WorktreeId,
2407 worktree_entry::Column::Id,
2408 ])
2409 .update_columns([
2410 worktree_entry::Column::IsDir,
2411 worktree_entry::Column::Path,
2412 worktree_entry::Column::Inode,
2413 worktree_entry::Column::MtimeSeconds,
2414 worktree_entry::Column::MtimeNanos,
2415 worktree_entry::Column::IsSymlink,
2416 worktree_entry::Column::IsIgnored,
2417 worktree_entry::Column::GitStatus,
2418 worktree_entry::Column::ScanId,
2419 ])
2420 .to_owned(),
2421 )
2422 .exec(&*tx)
2423 .await?;
2424 }
2425
2426 if !update.removed_entries.is_empty() {
2427 worktree_entry::Entity::update_many()
2428 .filter(
2429 worktree_entry::Column::ProjectId
2430 .eq(project_id)
2431 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2432 .and(
2433 worktree_entry::Column::Id
2434 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2435 ),
2436 )
2437 .set(worktree_entry::ActiveModel {
2438 is_deleted: ActiveValue::Set(true),
2439 scan_id: ActiveValue::Set(update.scan_id as i64),
2440 ..Default::default()
2441 })
2442 .exec(&*tx)
2443 .await?;
2444 }
2445
2446 if !update.updated_repositories.is_empty() {
2447 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2448 |repository| worktree_repository::ActiveModel {
2449 project_id: ActiveValue::set(project_id),
2450 worktree_id: ActiveValue::set(worktree_id),
2451 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2452 scan_id: ActiveValue::set(update.scan_id as i64),
2453 branch: ActiveValue::set(repository.branch.clone()),
2454 is_deleted: ActiveValue::set(false),
2455 },
2456 ))
2457 .on_conflict(
2458 OnConflict::columns([
2459 worktree_repository::Column::ProjectId,
2460 worktree_repository::Column::WorktreeId,
2461 worktree_repository::Column::WorkDirectoryId,
2462 ])
2463 .update_columns([
2464 worktree_repository::Column::ScanId,
2465 worktree_repository::Column::Branch,
2466 ])
2467 .to_owned(),
2468 )
2469 .exec(&*tx)
2470 .await?;
2471 }
2472
2473 if !update.removed_repositories.is_empty() {
2474 worktree_repository::Entity::update_many()
2475 .filter(
2476 worktree_repository::Column::ProjectId
2477 .eq(project_id)
2478 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2479 .and(
2480 worktree_repository::Column::WorkDirectoryId
2481 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2482 ),
2483 )
2484 .set(worktree_repository::ActiveModel {
2485 is_deleted: ActiveValue::Set(true),
2486 scan_id: ActiveValue::Set(update.scan_id as i64),
2487 ..Default::default()
2488 })
2489 .exec(&*tx)
2490 .await?;
2491 }
2492
2493 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2494 Ok(connection_ids)
2495 })
2496 .await
2497 }
2498
2499 pub async fn update_diagnostic_summary(
2500 &self,
2501 update: &proto::UpdateDiagnosticSummary,
2502 connection: ConnectionId,
2503 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2504 let project_id = ProjectId::from_proto(update.project_id);
2505 let worktree_id = update.worktree_id as i64;
2506 let room_id = self.room_id_for_project(project_id).await?;
2507 self.room_transaction(room_id, |tx| async move {
2508 let summary = update
2509 .summary
2510 .as_ref()
2511 .ok_or_else(|| anyhow!("invalid summary"))?;
2512
2513 // Ensure the update comes from the host.
2514 let project = project::Entity::find_by_id(project_id)
2515 .one(&*tx)
2516 .await?
2517 .ok_or_else(|| anyhow!("no such project"))?;
2518 if project.host_connection()? != connection {
2519 return Err(anyhow!("can't update a project hosted by someone else"))?;
2520 }
2521
2522 // Update summary.
2523 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2524 project_id: ActiveValue::set(project_id),
2525 worktree_id: ActiveValue::set(worktree_id),
2526 path: ActiveValue::set(summary.path.clone()),
2527 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2528 error_count: ActiveValue::set(summary.error_count as i32),
2529 warning_count: ActiveValue::set(summary.warning_count as i32),
2530 ..Default::default()
2531 })
2532 .on_conflict(
2533 OnConflict::columns([
2534 worktree_diagnostic_summary::Column::ProjectId,
2535 worktree_diagnostic_summary::Column::WorktreeId,
2536 worktree_diagnostic_summary::Column::Path,
2537 ])
2538 .update_columns([
2539 worktree_diagnostic_summary::Column::LanguageServerId,
2540 worktree_diagnostic_summary::Column::ErrorCount,
2541 worktree_diagnostic_summary::Column::WarningCount,
2542 ])
2543 .to_owned(),
2544 )
2545 .exec(&*tx)
2546 .await?;
2547
2548 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2549 Ok(connection_ids)
2550 })
2551 .await
2552 }
2553
2554 pub async fn start_language_server(
2555 &self,
2556 update: &proto::StartLanguageServer,
2557 connection: ConnectionId,
2558 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2559 let project_id = ProjectId::from_proto(update.project_id);
2560 let room_id = self.room_id_for_project(project_id).await?;
2561 self.room_transaction(room_id, |tx| async move {
2562 let server = update
2563 .server
2564 .as_ref()
2565 .ok_or_else(|| anyhow!("invalid language server"))?;
2566
2567 // Ensure the update comes from the host.
2568 let project = project::Entity::find_by_id(project_id)
2569 .one(&*tx)
2570 .await?
2571 .ok_or_else(|| anyhow!("no such project"))?;
2572 if project.host_connection()? != connection {
2573 return Err(anyhow!("can't update a project hosted by someone else"))?;
2574 }
2575
2576 // Add the newly-started language server.
2577 language_server::Entity::insert(language_server::ActiveModel {
2578 project_id: ActiveValue::set(project_id),
2579 id: ActiveValue::set(server.id as i64),
2580 name: ActiveValue::set(server.name.clone()),
2581 ..Default::default()
2582 })
2583 .on_conflict(
2584 OnConflict::columns([
2585 language_server::Column::ProjectId,
2586 language_server::Column::Id,
2587 ])
2588 .update_column(language_server::Column::Name)
2589 .to_owned(),
2590 )
2591 .exec(&*tx)
2592 .await?;
2593
2594 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2595 Ok(connection_ids)
2596 })
2597 .await
2598 }
2599
2600 pub async fn update_worktree_settings(
2601 &self,
2602 update: &proto::UpdateWorktreeSettings,
2603 connection: ConnectionId,
2604 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2605 let project_id = ProjectId::from_proto(update.project_id);
2606 let room_id = self.room_id_for_project(project_id).await?;
2607 self.room_transaction(room_id, |tx| async move {
2608 // Ensure the update comes from the host.
2609 let project = project::Entity::find_by_id(project_id)
2610 .one(&*tx)
2611 .await?
2612 .ok_or_else(|| anyhow!("no such project"))?;
2613 if project.host_connection()? != connection {
2614 return Err(anyhow!("can't update a project hosted by someone else"))?;
2615 }
2616
2617 if let Some(content) = &update.content {
2618 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2619 project_id: ActiveValue::Set(project_id),
2620 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2621 path: ActiveValue::Set(update.path.clone()),
2622 content: ActiveValue::Set(content.clone()),
2623 })
2624 .on_conflict(
2625 OnConflict::columns([
2626 worktree_settings_file::Column::ProjectId,
2627 worktree_settings_file::Column::WorktreeId,
2628 worktree_settings_file::Column::Path,
2629 ])
2630 .update_column(worktree_settings_file::Column::Content)
2631 .to_owned(),
2632 )
2633 .exec(&*tx)
2634 .await?;
2635 } else {
2636 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2637 project_id: ActiveValue::Set(project_id),
2638 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2639 path: ActiveValue::Set(update.path.clone()),
2640 ..Default::default()
2641 })
2642 .exec(&*tx)
2643 .await?;
2644 }
2645
2646 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2647 Ok(connection_ids)
2648 })
2649 .await
2650 }
2651
2652 pub async fn join_project(
2653 &self,
2654 project_id: ProjectId,
2655 connection: ConnectionId,
2656 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2657 let room_id = self.room_id_for_project(project_id).await?;
2658 self.room_transaction(room_id, |tx| async move {
2659 let participant = room_participant::Entity::find()
2660 .filter(
2661 Condition::all()
2662 .add(
2663 room_participant::Column::AnsweringConnectionId
2664 .eq(connection.id as i32),
2665 )
2666 .add(
2667 room_participant::Column::AnsweringConnectionServerId
2668 .eq(connection.owner_id as i32),
2669 ),
2670 )
2671 .one(&*tx)
2672 .await?
2673 .ok_or_else(|| anyhow!("must join a room first"))?;
2674
2675 let project = project::Entity::find_by_id(project_id)
2676 .one(&*tx)
2677 .await?
2678 .ok_or_else(|| anyhow!("no such project"))?;
2679 if project.room_id != participant.room_id {
2680 return Err(anyhow!("no such project"))?;
2681 }
2682
2683 let mut collaborators = project
2684 .find_related(project_collaborator::Entity)
2685 .all(&*tx)
2686 .await?;
2687 let replica_ids = collaborators
2688 .iter()
2689 .map(|c| c.replica_id)
2690 .collect::<HashSet<_>>();
2691 let mut replica_id = ReplicaId(1);
2692 while replica_ids.contains(&replica_id) {
2693 replica_id.0 += 1;
2694 }
2695 let new_collaborator = project_collaborator::ActiveModel {
2696 project_id: ActiveValue::set(project_id),
2697 connection_id: ActiveValue::set(connection.id as i32),
2698 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2699 user_id: ActiveValue::set(participant.user_id),
2700 replica_id: ActiveValue::set(replica_id),
2701 is_host: ActiveValue::set(false),
2702 ..Default::default()
2703 }
2704 .insert(&*tx)
2705 .await?;
2706 collaborators.push(new_collaborator);
2707
2708 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2709 let mut worktrees = db_worktrees
2710 .into_iter()
2711 .map(|db_worktree| {
2712 (
2713 db_worktree.id as u64,
2714 Worktree {
2715 id: db_worktree.id as u64,
2716 abs_path: db_worktree.abs_path,
2717 root_name: db_worktree.root_name,
2718 visible: db_worktree.visible,
2719 entries: Default::default(),
2720 repository_entries: Default::default(),
2721 diagnostic_summaries: Default::default(),
2722 settings_files: Default::default(),
2723 scan_id: db_worktree.scan_id as u64,
2724 completed_scan_id: db_worktree.completed_scan_id as u64,
2725 },
2726 )
2727 })
2728 .collect::<BTreeMap<_, _>>();
2729
2730 // Populate worktree entries.
2731 {
2732 let mut db_entries = worktree_entry::Entity::find()
2733 .filter(
2734 Condition::all()
2735 .add(worktree_entry::Column::ProjectId.eq(project_id))
2736 .add(worktree_entry::Column::IsDeleted.eq(false)),
2737 )
2738 .stream(&*tx)
2739 .await?;
2740 while let Some(db_entry) = db_entries.next().await {
2741 let db_entry = db_entry?;
2742 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2743 worktree.entries.push(proto::Entry {
2744 id: db_entry.id as u64,
2745 is_dir: db_entry.is_dir,
2746 path: db_entry.path,
2747 inode: db_entry.inode as u64,
2748 mtime: Some(proto::Timestamp {
2749 seconds: db_entry.mtime_seconds as u64,
2750 nanos: db_entry.mtime_nanos as u32,
2751 }),
2752 is_symlink: db_entry.is_symlink,
2753 is_ignored: db_entry.is_ignored,
2754 is_external: db_entry.is_external,
2755 git_status: db_entry.git_status.map(|status| status as i32),
2756 });
2757 }
2758 }
2759 }
2760
2761 // Populate repository entries.
2762 {
2763 let mut db_repository_entries = worktree_repository::Entity::find()
2764 .filter(
2765 Condition::all()
2766 .add(worktree_repository::Column::ProjectId.eq(project_id))
2767 .add(worktree_repository::Column::IsDeleted.eq(false)),
2768 )
2769 .stream(&*tx)
2770 .await?;
2771 while let Some(db_repository_entry) = db_repository_entries.next().await {
2772 let db_repository_entry = db_repository_entry?;
2773 if let Some(worktree) =
2774 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2775 {
2776 worktree.repository_entries.insert(
2777 db_repository_entry.work_directory_id as u64,
2778 proto::RepositoryEntry {
2779 work_directory_id: db_repository_entry.work_directory_id as u64,
2780 branch: db_repository_entry.branch,
2781 },
2782 );
2783 }
2784 }
2785 }
2786
2787 // Populate worktree diagnostic summaries.
2788 {
2789 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2790 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2791 .stream(&*tx)
2792 .await?;
2793 while let Some(db_summary) = db_summaries.next().await {
2794 let db_summary = db_summary?;
2795 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2796 worktree
2797 .diagnostic_summaries
2798 .push(proto::DiagnosticSummary {
2799 path: db_summary.path,
2800 language_server_id: db_summary.language_server_id as u64,
2801 error_count: db_summary.error_count as u32,
2802 warning_count: db_summary.warning_count as u32,
2803 });
2804 }
2805 }
2806 }
2807
2808 // Populate worktree settings files
2809 {
2810 let mut db_settings_files = worktree_settings_file::Entity::find()
2811 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2812 .stream(&*tx)
2813 .await?;
2814 while let Some(db_settings_file) = db_settings_files.next().await {
2815 let db_settings_file = db_settings_file?;
2816 if let Some(worktree) =
2817 worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2818 {
2819 worktree.settings_files.push(WorktreeSettingsFile {
2820 path: db_settings_file.path,
2821 content: db_settings_file.content,
2822 });
2823 }
2824 }
2825 }
2826
2827 // Populate language servers.
2828 let language_servers = project
2829 .find_related(language_server::Entity)
2830 .all(&*tx)
2831 .await?;
2832
2833 let project = Project {
2834 collaborators: collaborators
2835 .into_iter()
2836 .map(|collaborator| ProjectCollaborator {
2837 connection_id: collaborator.connection(),
2838 user_id: collaborator.user_id,
2839 replica_id: collaborator.replica_id,
2840 is_host: collaborator.is_host,
2841 })
2842 .collect(),
2843 worktrees,
2844 language_servers: language_servers
2845 .into_iter()
2846 .map(|language_server| proto::LanguageServer {
2847 id: language_server.id as u64,
2848 name: language_server.name,
2849 })
2850 .collect(),
2851 };
2852 Ok((project, replica_id as ReplicaId))
2853 })
2854 .await
2855 }
2856
2857 pub async fn leave_project(
2858 &self,
2859 project_id: ProjectId,
2860 connection: ConnectionId,
2861 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2862 let room_id = self.room_id_for_project(project_id).await?;
2863 self.room_transaction(room_id, |tx| async move {
2864 let result = project_collaborator::Entity::delete_many()
2865 .filter(
2866 Condition::all()
2867 .add(project_collaborator::Column::ProjectId.eq(project_id))
2868 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2869 .add(
2870 project_collaborator::Column::ConnectionServerId
2871 .eq(connection.owner_id as i32),
2872 ),
2873 )
2874 .exec(&*tx)
2875 .await?;
2876 if result.rows_affected == 0 {
2877 Err(anyhow!("not a collaborator on this project"))?;
2878 }
2879
2880 let project = project::Entity::find_by_id(project_id)
2881 .one(&*tx)
2882 .await?
2883 .ok_or_else(|| anyhow!("no such project"))?;
2884 let collaborators = project
2885 .find_related(project_collaborator::Entity)
2886 .all(&*tx)
2887 .await?;
2888 let connection_ids = collaborators
2889 .into_iter()
2890 .map(|collaborator| collaborator.connection())
2891 .collect();
2892
2893 follower::Entity::delete_many()
2894 .filter(
2895 Condition::any()
2896 .add(
2897 Condition::all()
2898 .add(follower::Column::ProjectId.eq(project_id))
2899 .add(
2900 follower::Column::LeaderConnectionServerId
2901 .eq(connection.owner_id),
2902 )
2903 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2904 )
2905 .add(
2906 Condition::all()
2907 .add(follower::Column::ProjectId.eq(project_id))
2908 .add(
2909 follower::Column::FollowerConnectionServerId
2910 .eq(connection.owner_id),
2911 )
2912 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2913 ),
2914 )
2915 .exec(&*tx)
2916 .await?;
2917
2918 let room = self.get_room(project.room_id, &tx).await?;
2919 let left_project = LeftProject {
2920 id: project_id,
2921 host_user_id: project.host_user_id,
2922 host_connection_id: project.host_connection()?,
2923 connection_ids,
2924 };
2925 Ok((room, left_project))
2926 })
2927 .await
2928 }
2929
2930 pub async fn project_collaborators(
2931 &self,
2932 project_id: ProjectId,
2933 connection_id: ConnectionId,
2934 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2935 let room_id = self.room_id_for_project(project_id).await?;
2936 self.room_transaction(room_id, |tx| async move {
2937 let collaborators = project_collaborator::Entity::find()
2938 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2939 .all(&*tx)
2940 .await?
2941 .into_iter()
2942 .map(|collaborator| ProjectCollaborator {
2943 connection_id: collaborator.connection(),
2944 user_id: collaborator.user_id,
2945 replica_id: collaborator.replica_id,
2946 is_host: collaborator.is_host,
2947 })
2948 .collect::<Vec<_>>();
2949
2950 if collaborators
2951 .iter()
2952 .any(|collaborator| collaborator.connection_id == connection_id)
2953 {
2954 Ok(collaborators)
2955 } else {
2956 Err(anyhow!("no such project"))?
2957 }
2958 })
2959 .await
2960 }
2961
2962 pub async fn project_connection_ids(
2963 &self,
2964 project_id: ProjectId,
2965 connection_id: ConnectionId,
2966 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2967 let room_id = self.room_id_for_project(project_id).await?;
2968 self.room_transaction(room_id, |tx| async move {
2969 let mut collaborators = project_collaborator::Entity::find()
2970 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2971 .stream(&*tx)
2972 .await?;
2973
2974 let mut connection_ids = HashSet::default();
2975 while let Some(collaborator) = collaborators.next().await {
2976 let collaborator = collaborator?;
2977 connection_ids.insert(collaborator.connection());
2978 }
2979
2980 if connection_ids.contains(&connection_id) {
2981 Ok(connection_ids)
2982 } else {
2983 Err(anyhow!("no such project"))?
2984 }
2985 })
2986 .await
2987 }
2988
2989 async fn project_guest_connection_ids(
2990 &self,
2991 project_id: ProjectId,
2992 tx: &DatabaseTransaction,
2993 ) -> Result<Vec<ConnectionId>> {
2994 let mut collaborators = project_collaborator::Entity::find()
2995 .filter(
2996 project_collaborator::Column::ProjectId
2997 .eq(project_id)
2998 .and(project_collaborator::Column::IsHost.eq(false)),
2999 )
3000 .stream(tx)
3001 .await?;
3002
3003 let mut guest_connection_ids = Vec::new();
3004 while let Some(collaborator) = collaborators.next().await {
3005 let collaborator = collaborator?;
3006 guest_connection_ids.push(collaborator.connection());
3007 }
3008 Ok(guest_connection_ids)
3009 }
3010
3011 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3012 self.transaction(|tx| async move {
3013 let project = project::Entity::find_by_id(project_id)
3014 .one(&*tx)
3015 .await?
3016 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3017 Ok(project.room_id)
3018 })
3019 .await
3020 }
3021
3022 // access tokens
3023
3024 pub async fn create_access_token(
3025 &self,
3026 user_id: UserId,
3027 access_token_hash: &str,
3028 max_access_token_count: usize,
3029 ) -> Result<AccessTokenId> {
3030 self.transaction(|tx| async {
3031 let tx = tx;
3032
3033 let token = access_token::ActiveModel {
3034 user_id: ActiveValue::set(user_id),
3035 hash: ActiveValue::set(access_token_hash.into()),
3036 ..Default::default()
3037 }
3038 .insert(&*tx)
3039 .await?;
3040
3041 access_token::Entity::delete_many()
3042 .filter(
3043 access_token::Column::Id.in_subquery(
3044 Query::select()
3045 .column(access_token::Column::Id)
3046 .from(access_token::Entity)
3047 .and_where(access_token::Column::UserId.eq(user_id))
3048 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3049 .limit(10000)
3050 .offset(max_access_token_count as u64)
3051 .to_owned(),
3052 ),
3053 )
3054 .exec(&*tx)
3055 .await?;
3056 Ok(token.id)
3057 })
3058 .await
3059 }
3060
3061 pub async fn get_access_token(
3062 &self,
3063 access_token_id: AccessTokenId,
3064 ) -> Result<access_token::Model> {
3065 self.transaction(|tx| async move {
3066 Ok(access_token::Entity::find_by_id(access_token_id)
3067 .one(&*tx)
3068 .await?
3069 .ok_or_else(|| anyhow!("no such access token"))?)
3070 })
3071 .await
3072 }
3073
3074 // channels
3075
3076 pub async fn create_root_channel(
3077 &self,
3078 name: &str,
3079 live_kit_room: &str,
3080 creator_id: UserId,
3081 ) -> Result<ChannelId> {
3082 self.create_channel(name, None, live_kit_room, creator_id)
3083 .await
3084 }
3085
3086 pub async fn create_channel(
3087 &self,
3088 name: &str,
3089 parent: Option<ChannelId>,
3090 live_kit_room: &str,
3091 creator_id: UserId,
3092 ) -> Result<ChannelId> {
3093 self.transaction(move |tx| async move {
3094 let tx = tx;
3095
3096 if let Some(parent) = parent {
3097 let channels = self.get_channel_ancestors(parent, &*tx).await?;
3098 channel_member::Entity::find()
3099 .filter(channel_member::Column::ChannelId.is_in(channels.iter().copied()))
3100 .filter(
3101 channel_member::Column::UserId
3102 .eq(creator_id)
3103 .and(channel_member::Column::Accepted.eq(true)),
3104 )
3105 .one(&*tx)
3106 .await?
3107 .ok_or_else(|| {
3108 anyhow!("User does not have the permissions to create this channel")
3109 })?;
3110 }
3111
3112 let channel = channel::ActiveModel {
3113 name: ActiveValue::Set(name.to_string()),
3114 ..Default::default()
3115 };
3116
3117 let channel = channel.insert(&*tx).await?;
3118
3119 if let Some(parent) = parent {
3120 channel_parent::ActiveModel {
3121 child_id: ActiveValue::Set(channel.id),
3122 parent_id: ActiveValue::Set(parent),
3123 }
3124 .insert(&*tx)
3125 .await?;
3126 }
3127
3128 channel_member::ActiveModel {
3129 channel_id: ActiveValue::Set(channel.id),
3130 user_id: ActiveValue::Set(creator_id),
3131 accepted: ActiveValue::Set(true),
3132 admin: ActiveValue::Set(true),
3133 ..Default::default()
3134 }
3135 .insert(&*tx)
3136 .await?;
3137
3138 room::ActiveModel {
3139 channel_id: ActiveValue::Set(Some(channel.id)),
3140 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3141 ..Default::default()
3142 }
3143 .insert(&*tx)
3144 .await?;
3145
3146 Ok(channel.id)
3147 })
3148 .await
3149 }
3150
3151 pub async fn remove_channel(
3152 &self,
3153 channel_id: ChannelId,
3154 user_id: UserId,
3155 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3156 self.transaction(move |tx| async move {
3157 let tx = tx;
3158
3159 // Check if user is an admin
3160 channel_member::Entity::find()
3161 .filter(
3162 channel_member::Column::ChannelId
3163 .eq(channel_id)
3164 .and(channel_member::Column::UserId.eq(user_id))
3165 .and(channel_member::Column::Admin.eq(true)),
3166 )
3167 .one(&*tx)
3168 .await?
3169 .ok_or_else(|| anyhow!("user is not allowed to remove this channel"))?;
3170
3171 let mut descendants = self.get_channel_descendants([channel_id], &*tx).await?;
3172
3173 // Keep channels which have another active
3174 let mut channels_to_keep = channel_parent::Entity::find()
3175 .filter(
3176 channel_parent::Column::ChildId
3177 .is_in(descendants.keys().copied().filter(|&id| id != channel_id))
3178 .and(
3179 channel_parent::Column::ParentId.is_not_in(descendants.keys().copied()),
3180 ),
3181 )
3182 .stream(&*tx)
3183 .await?;
3184
3185 while let Some(row) = channels_to_keep.next().await {
3186 let row = row?;
3187 descendants.remove(&row.child_id);
3188 }
3189
3190 drop(channels_to_keep);
3191
3192 let channels_to_remove = descendants.keys().copied().collect::<Vec<_>>();
3193
3194 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3195 .filter(channel_member::Column::ChannelId.is_in(channels_to_remove.iter().copied()))
3196 .select_only()
3197 .column(channel_member::Column::UserId)
3198 .distinct()
3199 .into_values::<_, QueryUserIds>()
3200 .all(&*tx)
3201 .await?;
3202
3203 // Channel members and parents should delete via cascade
3204 channel::Entity::delete_many()
3205 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
3206 .exec(&*tx)
3207 .await?;
3208
3209 Ok((channels_to_remove, members_to_notify))
3210 })
3211 .await
3212 }
3213
3214 pub async fn invite_channel_member(
3215 &self,
3216 channel_id: ChannelId,
3217 invitee_id: UserId,
3218 inviter_id: UserId,
3219 is_admin: bool,
3220 ) -> Result<()> {
3221 self.transaction(move |tx| async move {
3222 let tx = tx;
3223
3224 // Check if inviter is a member
3225 channel_member::Entity::find()
3226 .filter(
3227 channel_member::Column::ChannelId
3228 .eq(channel_id)
3229 .and(channel_member::Column::UserId.eq(inviter_id))
3230 .and(channel_member::Column::Admin.eq(true)),
3231 )
3232 .one(&*tx)
3233 .await?
3234 .ok_or_else(|| {
3235 anyhow!("Inviter does not have permissions to invite the invitee")
3236 })?;
3237
3238 let channel_membership = channel_member::ActiveModel {
3239 channel_id: ActiveValue::Set(channel_id),
3240 user_id: ActiveValue::Set(invitee_id),
3241 accepted: ActiveValue::Set(false),
3242 admin: ActiveValue::Set(is_admin),
3243 ..Default::default()
3244 };
3245
3246 channel_membership.insert(&*tx).await?;
3247
3248 Ok(())
3249 })
3250 .await
3251 }
3252
3253 pub async fn respond_to_channel_invite(
3254 &self,
3255 channel_id: ChannelId,
3256 user_id: UserId,
3257 accept: bool,
3258 ) -> Result<()> {
3259 self.transaction(move |tx| async move {
3260 let tx = tx;
3261
3262 let rows_affected = if accept {
3263 channel_member::Entity::update_many()
3264 .set(channel_member::ActiveModel {
3265 accepted: ActiveValue::Set(accept),
3266 ..Default::default()
3267 })
3268 .filter(
3269 channel_member::Column::ChannelId
3270 .eq(channel_id)
3271 .and(channel_member::Column::UserId.eq(user_id))
3272 .and(channel_member::Column::Accepted.eq(false)),
3273 )
3274 .exec(&*tx)
3275 .await?
3276 .rows_affected
3277 } else {
3278 channel_member::ActiveModel {
3279 channel_id: ActiveValue::Unchanged(channel_id),
3280 user_id: ActiveValue::Unchanged(user_id),
3281 ..Default::default()
3282 }
3283 .delete(&*tx)
3284 .await?
3285 .rows_affected
3286 };
3287
3288 if rows_affected == 0 {
3289 Err(anyhow!("no such invitation"))?;
3290 }
3291
3292 Ok(())
3293 })
3294 .await
3295 }
3296
3297 pub async fn get_channel_invites(&self, user_id: UserId) -> Result<Vec<Channel>> {
3298 self.transaction(|tx| async move {
3299 let tx = tx;
3300
3301 let channel_invites = channel_member::Entity::find()
3302 .filter(
3303 channel_member::Column::UserId
3304 .eq(user_id)
3305 .and(channel_member::Column::Accepted.eq(false)),
3306 )
3307 .all(&*tx)
3308 .await?;
3309
3310 let channels = channel::Entity::find()
3311 .filter(
3312 channel::Column::Id.is_in(
3313 channel_invites
3314 .into_iter()
3315 .map(|channel_member| channel_member.channel_id),
3316 ),
3317 )
3318 .all(&*tx)
3319 .await?;
3320
3321 let channels = channels
3322 .into_iter()
3323 .map(|channel| Channel {
3324 id: channel.id,
3325 name: channel.name,
3326 parent_id: None,
3327 })
3328 .collect();
3329
3330 Ok(channels)
3331 })
3332 .await
3333 }
3334
3335 pub async fn get_channels(&self, user_id: UserId) -> Result<Vec<Channel>> {
3336 self.transaction(|tx| async move {
3337 let tx = tx;
3338
3339 let starting_channel_ids: Vec<ChannelId> = channel_member::Entity::find()
3340 .filter(
3341 channel_member::Column::UserId
3342 .eq(user_id)
3343 .and(channel_member::Column::Accepted.eq(true)),
3344 )
3345 .select_only()
3346 .column(channel_member::Column::ChannelId)
3347 .into_values::<_, QueryChannelIds>()
3348 .all(&*tx)
3349 .await?;
3350
3351 let parents_by_child_id = self
3352 .get_channel_descendants(starting_channel_ids, &*tx)
3353 .await?;
3354
3355 let mut channels = Vec::with_capacity(parents_by_child_id.len());
3356 let mut rows = channel::Entity::find()
3357 .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3358 .stream(&*tx)
3359 .await?;
3360
3361 while let Some(row) = rows.next().await {
3362 let row = row?;
3363 channels.push(Channel {
3364 id: row.id,
3365 name: row.name,
3366 parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3367 });
3368 }
3369
3370 drop(rows);
3371
3372 Ok(channels)
3373 })
3374 .await
3375 }
3376
3377 pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3378 self.transaction(|tx| async move {
3379 let tx = tx;
3380 let ancestor_ids = self.get_channel_ancestors(id, &*tx).await?;
3381 let user_ids = channel_member::Entity::find()
3382 .distinct()
3383 .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3384 .select_only()
3385 .column(channel_member::Column::UserId)
3386 .into_values::<_, QueryUserIds>()
3387 .all(&*tx)
3388 .await?;
3389 Ok(user_ids)
3390 })
3391 .await
3392 }
3393
3394 async fn get_channel_ancestors(
3395 &self,
3396 id: ChannelId,
3397 tx: &DatabaseTransaction,
3398 ) -> Result<Vec<ChannelId>> {
3399 let sql = format!(
3400 r#"
3401 WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3402 SELECT CAST(NULL as INTEGER) as child_id, root_ids.column1 as parent_id
3403 FROM (VALUES ({})) as root_ids
3404 UNION
3405 SELECT channel_parents.child_id, channel_parents.parent_id
3406 FROM channel_parents, channel_tree
3407 WHERE channel_parents.child_id = channel_tree.parent_id
3408 )
3409 SELECT DISTINCT channel_tree.parent_id
3410 FROM channel_tree
3411 "#,
3412 id
3413 );
3414
3415 #[derive(FromQueryResult, Debug, PartialEq)]
3416 pub struct ChannelParent {
3417 pub parent_id: ChannelId,
3418 }
3419
3420 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3421
3422 let mut channel_ids_stream = channel_parent::Entity::find()
3423 .from_raw_sql(stmt)
3424 .into_model::<ChannelParent>()
3425 .stream(&*tx)
3426 .await?;
3427
3428 let mut channel_ids = vec![];
3429 while let Some(channel_id) = channel_ids_stream.next().await {
3430 channel_ids.push(channel_id?.parent_id);
3431 }
3432
3433 Ok(channel_ids)
3434 }
3435
3436 async fn get_channel_descendants(
3437 &self,
3438 channel_ids: impl IntoIterator<Item = ChannelId>,
3439 tx: &DatabaseTransaction,
3440 ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3441 let mut values = String::new();
3442 for id in channel_ids {
3443 if !values.is_empty() {
3444 values.push_str(", ");
3445 }
3446 write!(&mut values, "({})", id).unwrap();
3447 }
3448
3449 if values.is_empty() {
3450 return Ok(HashMap::default());
3451 }
3452
3453 let sql = format!(
3454 r#"
3455 WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3456 SELECT root_ids.column1 as child_id, CAST(NULL as INTEGER) as parent_id
3457 FROM (VALUES {}) as root_ids
3458 UNION
3459 SELECT channel_parents.child_id, channel_parents.parent_id
3460 FROM channel_parents, channel_tree
3461 WHERE channel_parents.parent_id = channel_tree.child_id
3462 )
3463 SELECT channel_tree.child_id, channel_tree.parent_id
3464 FROM channel_tree
3465 ORDER BY child_id, parent_id IS NOT NULL
3466 "#,
3467 values
3468 );
3469
3470 #[derive(FromQueryResult, Debug, PartialEq)]
3471 pub struct ChannelParent {
3472 pub child_id: ChannelId,
3473 pub parent_id: Option<ChannelId>,
3474 }
3475
3476 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3477
3478 let mut parents_by_child_id = HashMap::default();
3479 let mut parents = channel_parent::Entity::find()
3480 .from_raw_sql(stmt)
3481 .into_model::<ChannelParent>()
3482 .stream(tx)
3483 .await?;
3484
3485 while let Some(parent) = parents.next().await {
3486 let parent = parent?;
3487 parents_by_child_id.insert(parent.child_id, parent.parent_id);
3488 }
3489
3490 Ok(parents_by_child_id)
3491 }
3492
3493 pub async fn get_channel(&self, channel_id: ChannelId) -> Result<Option<Channel>> {
3494 self.transaction(|tx| async move {
3495 let tx = tx;
3496 let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3497
3498 Ok(channel.map(|channel| Channel {
3499 id: channel.id,
3500 name: channel.name,
3501 parent_id: None,
3502 }))
3503 })
3504 .await
3505 }
3506
3507 pub async fn get_channel_room(&self, channel_id: ChannelId) -> Result<RoomId> {
3508 self.transaction(|tx| async move {
3509 let tx = tx;
3510 let room = channel::Model {
3511 id: channel_id,
3512 ..Default::default()
3513 }
3514 .find_related(room::Entity)
3515 .one(&*tx)
3516 .await?
3517 .ok_or_else(|| anyhow!("invalid channel"))?;
3518 Ok(room.id)
3519 })
3520 .await
3521 }
3522
    /// Runs `f` inside a database transaction and commits it when `f` succeeds.
    ///
    /// `f` may be invoked more than once: when the callback's error or the commit error
    /// makes `retry_on_serialization_error` return `true`, the whole attempt is repeated
    /// with a fresh transaction. Any other error is returned to the caller. A callback
    /// error first triggers a rollback; if the rollback itself fails, that failure is
    /// returned instead.
    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; forwarded to the retry helper.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(result) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(result),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3554
    /// Like `transaction`, for callbacks that may or may not resolve to a room.
    ///
    /// When `f` yields `Some((room_id, data))`, the mutex for that room (created in
    /// `self.rooms` on first use) is acquired *before* the commit, and the returned
    /// `RoomGuard` keeps holding it. When `f` yields `None`, the transaction is committed
    /// and `None` is returned. Errors and retries are handled as in `transaction`.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; forwarded to the retry helper.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // The room is only known once `f` has run, so the lock is taken
                        // here, after the callback and before the commit.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3604
    /// Like `transaction`, but serialized per room.
    ///
    /// The mutex for `room_id` (created in `self.rooms` on first use) is acquired before
    /// each attempt begins its transaction. On success the lock guard is moved into the
    /// returned `RoomGuard`, so the room stays locked for as long as the caller keeps that
    /// value. Errors and retries are handled as in `transaction`.
    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; forwarded to the retry helper.
            let mut i = 0;
            loop {
                // Lock first, then start the transaction. The guard lives until the end
                // of this loop iteration unless it is moved into the returned `RoomGuard`.
                let lock = self.rooms.entry(room_id).or_default().clone();
                let _guard = lock.lock_owned().await;
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(data) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => {
                            return Ok(RoomGuard {
                                data,
                                _guard,
                                _not_send: PhantomData,
                            });
                        }
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3644
3645 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3646 where
3647 F: Send + Fn(TransactionHandle) -> Fut,
3648 Fut: Send + Future<Output = Result<T>>,
3649 {
3650 let tx = self
3651 .pool
3652 .begin_with_config(Some(IsolationLevel::Serializable), None)
3653 .await?;
3654
3655 let mut tx = Arc::new(Some(tx));
3656 let result = f(TransactionHandle(tx.clone())).await;
3657 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3658 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3659 };
3660
3661 Ok((tx, result))
3662 }
3663
    /// Drives `future` to completion and returns its result.
    ///
    /// In non-test builds this simply awaits the future. In test builds it first awaits
    /// `simulate_random_delay` when the executor is `Executor::Deterministic`, and then
    /// blocks on the future using the runtime stored in `self.runtime`; it panics if that
    /// runtime has not been set.
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        #[cfg(test)]
        {
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
3682
3683 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3684 // If the error is due to a failure to serialize concurrent transactions, then retry
3685 // this transaction after a delay. With each subsequent retry, double the delay duration.
3686 // Also vary the delay randomly in order to ensure different database connections retry
3687 // at different times.
3688 if is_serialization_error(error) {
3689 let base_delay = 4_u64 << prev_attempt_count.min(16);
3690 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3691 log::info!(
3692 "retrying transaction after serialization error. delay: {} ms.",
3693 randomized_delay
3694 );
3695 self.executor
3696 .sleep(Duration::from_millis(randomized_delay as u64))
3697 .await;
3698 true
3699 } else {
3700 false
3701 }
3702 }
3703}
3704
3705fn is_serialization_error(error: &Error) -> bool {
3706 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3707 match error {
3708 Error::Database(
3709 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3710 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3711 ) if error
3712 .as_database_error()
3713 .and_then(|error| error.code())
3714 .as_deref()
3715 == Some(SERIALIZATION_FAILURE_CODE) =>
3716 {
3717 true
3718 }
3719 _ => false,
3720 }
3721}
3722
3723struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3724
3725impl Deref for TransactionHandle {
3726 type Target = DatabaseTransaction;
3727
3728 fn deref(&self) -> &Self::Target {
3729 self.0.as_ref().as_ref().unwrap()
3730 }
3731}
3732
3733pub struct RoomGuard<T> {
3734 data: T,
3735 _guard: OwnedMutexGuard<()>,
3736 _not_send: PhantomData<Rc<()>>,
3737}
3738
3739impl<T> Deref for RoomGuard<T> {
3740 type Target = T;
3741
3742 fn deref(&self) -> &T {
3743 &self.data
3744 }
3745}
3746
3747impl<T> DerefMut for RoomGuard<T> {
3748 fn deref_mut(&mut self) -> &mut T {
3749 &mut self.data
3750 }
3751}
3752
/// GitHub identity and invite count describing a new user.
///
/// Serializable and deserializable via serde. Presumably the input to user creation,
/// which is outside this section — confirm.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    pub github_login: String,
    pub github_user_id: i32,
    pub invite_count: i32,
}
3759
/// Identifiers describing a newly created user.
///
/// Presumably the result of user creation, which is outside this section — confirm.
#[derive(Debug)]
pub struct NewUserResult {
    pub user_id: UserId,
    pub metrics_id: String,
    /// `None` when no inviting user is recorded.
    pub inviting_user_id: Option<UserId>,
    /// `None` when no signup device id is recorded.
    pub signup_device_id: Option<String>,
}
3767
/// A channel as returned by the channel queries on `Database`.
///
/// `parent_id` is filled in by `get_channels` from the channel hierarchy;
/// `get_channel` and `get_channel_invites` always leave it as `None`.
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    pub parent_id: Option<ChannelId>,
}
3774
/// Generates a random invite code via `nanoid!`, requesting a length of 16.
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
3778
/// Generates a random email confirmation code via `nanoid!`, requesting a length of 64.
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
3782
// Declares a strongly typed database id named `$name`.
//
// The generated type is a `Copy` newtype around `i32` that:
//   * serializes transparently as its inner integer (serde),
//   * converts to and from the `u64` ids used in protobuf messages,
//   * displays as the bare integer,
//   * implements the sea_orm / sea_query traits needed to use it as a column value.
macro_rules! id_type {
    ($name:ident) => {
        /// Database identifier newtype wrapping an `i32`.
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds an id from its protobuf representation.
            ///
            /// Uses an `as` cast, so values outside the `i32` range are truncated.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts the id into its protobuf representation.
            ///
            /// Uses an `as` cast, so a negative id is sign-extended.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer variant whose value fits into an `i32`;
            // everything else (including nulls) is a `ValueTypeErr`.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            // Fails when `n` does not fit into an `i32`. The message names the direction
            // of the conversion that actually failed (u64 -> id).
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(concat!(
                        "error converting u64 to ",
                        stringify!($name)
                    ))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
3901
// Typed ids generated by `id_type!`. Each one is a distinct `i32` newtype, so an id of
// one kind cannot be passed where another kind is expected.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
3915
/// A room together with the projects that were rejoined and reshared.
///
/// Presumably produced when a connection rejoins a room; the producer is outside this
/// section — confirm.
pub struct RejoinedRoom {
    pub room: proto::Room,
    pub rejoined_projects: Vec<RejoinedProject>,
    pub reshared_projects: Vec<ResharedProject>,
}
3921
/// A reshared project: its id, the previous connection id, its collaborators and the
/// metadata of its worktrees.
pub struct ResharedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
3928
/// A rejoined project: its id, the previous connection id, its collaborators, the state
/// of its worktrees and its language servers.
pub struct RejoinedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<RejoinedWorktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
3936
/// Worktree state carried by a `RejoinedProject`.
///
/// Entries and repositories are split into `updated_*` and `removed_*` lists, which
/// suggests a delta rather than a full snapshot — confirm against the producer, which is
/// outside this section.
#[derive(Debug)]
pub struct RejoinedWorktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub updated_entries: Vec<proto::Entry>,
    pub removed_entries: Vec<u64>,
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    pub removed_repositories: Vec<u64>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
3952
/// A room together with the projects that were left (keyed by project id), the users
/// whose calls were canceled, and a `deleted` flag.
///
/// Presumably produced when a participant leaves a room, with `deleted` indicating that
/// the room itself was removed — confirm against the producer.
pub struct LeftRoom {
    pub room: proto::Room,
    pub left_projects: HashMap<ProjectId, LeftProject>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
    pub deleted: bool,
}
3959
/// A room together with the ids of stale participants and of users whose calls were
/// canceled.
///
/// Presumably produced when a room is refreshed; the producer is outside this section —
/// confirm.
pub struct RefreshedRoom {
    pub room: proto::Room,
    pub stale_participant_user_ids: Vec<UserId>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
3965
/// A project's collaborators, worktrees and language servers.
pub struct Project {
    pub collaborators: Vec<ProjectCollaborator>,
    /// Worktrees in a `BTreeMap` keyed by `u64` (presumably the worktree id — confirm).
    pub worktrees: BTreeMap<u64, Worktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
3971
/// One collaborator of a project: the connection and user, the replica id, and whether
/// this collaborator is the host.
pub struct ProjectCollaborator {
    pub connection_id: ConnectionId,
    pub user_id: UserId,
    pub replica_id: ReplicaId,
    pub is_host: bool,
}
3978
3979impl ProjectCollaborator {
3980 pub fn to_proto(&self) -> proto::Collaborator {
3981 proto::Collaborator {
3982 peer_id: Some(self.connection_id.into()),
3983 replica_id: self.replica_id.0 as u32,
3984 user_id: self.user_id.to_proto(),
3985 }
3986 }
3987}
3988
/// A left project: its id, the host's user and connection ids, and a list of
/// connection ids.
///
/// NOTE(review): which connections `connection_ids` covers is not visible in this
/// section — confirm against the producer.
#[derive(Debug)]
pub struct LeftProject {
    pub id: ProjectId,
    pub host_user_id: UserId,
    pub host_connection_id: ConnectionId,
    pub connection_ids: Vec<ConnectionId>,
}
3996
/// A worktree of a `Project`: its entries, repository entries, diagnostic summaries and
/// settings files, plus scan ids.
pub struct Worktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    /// Repository entries in a `BTreeMap` keyed by `u64` (key meaning not visible here).
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
4009
/// A settings file of a worktree: its path and its textual content.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    pub path: String,
    pub content: String,
}
4015
/// Column selector passed to `into_values` when a query selects only the `channel_id`
/// column (see `get_channels`).
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryChannelIds {
    ChannelId,
}
4020
/// Column selector passed to `into_values` when a query selects only the `user_id`
/// column (see `get_channel_members`).
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}
4025
4026#[cfg(test)]
4027pub use test::*;
4028
// Test-only helpers for creating throwaway databases.
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A `Database` for use in tests, backed by in-memory SQLite or by a freshly created
    /// Postgres database that is dropped again when the `TestDb` is dropped.
    pub struct TestDb {
        /// Always `Some` until `Drop` takes it.
        pub db: Option<Arc<Database>>,
        /// Set to `None` by both constructors in this module.
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and loads the test schema into it.
        ///
        /// A dedicated current-thread tokio runtime is created, used for setup, and then
        /// stored on the `Database`. Panics if any setup step fails.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = format!("sqlite::memory:");
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            let mut db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                // The schema file is embedded at compile time and executed as a single
                // statement.
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a Postgres database with a random name on `localhost` and runs the
        /// migrations from the crate's `migrations` directory against it.
        ///
        /// Panics if the database cannot be created or migrated.
        pub fn postgres(background: Arc<Background>) -> Self {
            // Held for the rest of this function, so concurrent callers set up their
            // databases one at a time.
            static LOCK: Mutex<()> = Mutex::new(());

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            let mut db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Returns the wrapped database.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        /// For Postgres-backed instances, terminates the other backends connected to the
        /// test database and then drops it. Failures are logged rather than propagated.
        /// Nothing is done for other backends.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}