1mod access_token;
2mod channel;
3mod channel_member;
4mod channel_parent;
5mod contact;
6mod follower;
7mod language_server;
8mod project;
9mod project_collaborator;
10mod room;
11mod room_participant;
12mod server;
13mod signup;
14#[cfg(test)]
15mod tests;
16mod user;
17mod worktree;
18mod worktree_diagnostic_summary;
19mod worktree_entry;
20mod worktree_repository;
21mod worktree_repository_statuses;
22mod worktree_settings_file;
23
24use crate::executor::Executor;
25use crate::{Error, Result};
26use anyhow::anyhow;
27use collections::{BTreeMap, HashMap, HashSet};
28pub use contact::Contact;
29use dashmap::DashMap;
30use futures::StreamExt;
31use hyper::StatusCode;
32use rand::prelude::StdRng;
33use rand::{Rng, SeedableRng};
34use rpc::{proto, ConnectionId};
35use sea_orm::Condition;
36pub use sea_orm::ConnectOptions;
37use sea_orm::{
38 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
39 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
40 Statement, TransactionTrait,
41};
42use sea_query::{Alias, Expr, OnConflict, Query};
43use serde::{Deserialize, Serialize};
44pub use signup::{Invite, NewSignup, WaitlistSummary};
45use sqlx::migrate::{Migrate, Migration, MigrationSource};
46use sqlx::Connection;
47use std::fmt::Write as _;
48use std::ops::{Deref, DerefMut};
49use std::path::Path;
50use std::time::Duration;
51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
52use tokio::sync::{Mutex, OwnedMutexGuard};
53pub use user::Model as User;
54
/// Handle to the application's database: the connection plus supporting state.
pub struct Database {
    /// Connection options this database was created with; `migrate` reads the URL from here.
    options: ConnectOptions,
    /// The sea-orm connection established in `new`.
    pool: DatabaseConnection,
    /// One async mutex per room id.
    /// NOTE(review): presumably used to serialize per-room transactions — the locking code
    /// is not visible in this part of the file; confirm.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Random number generator; `new` seeds it with the fixed value 0.
    rng: Mutex<StdRng>,
    /// Executor handed to `new` by the caller.
    executor: Executor,
    /// Test-only runtime slot; `new` initializes it to `None`.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
64
65impl Database {
66 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
67 Ok(Self {
68 options: options.clone(),
69 pool: sea_orm::Database::connect(options).await?,
70 rooms: DashMap::with_capacity(16384),
71 rng: Mutex::new(StdRng::seed_from_u64(0)),
72 executor,
73 #[cfg(test)]
74 runtime: None,
75 })
76 }
77
    /// Removes every entry from the `rooms` map. Only compiled for tests.
    #[cfg(test)]
    pub fn reset(&self) {
        self.rooms.clear();
    }
82
83 pub async fn migrate(
84 &self,
85 migrations_path: &Path,
86 ignore_checksum_mismatch: bool,
87 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
88 let migrations = MigrationSource::resolve(migrations_path)
89 .await
90 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
91
92 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
93
94 connection.ensure_migrations_table().await?;
95 let applied_migrations: HashMap<_, _> = connection
96 .list_applied_migrations()
97 .await?
98 .into_iter()
99 .map(|m| (m.version, m))
100 .collect();
101
102 let mut new_migrations = Vec::new();
103 for migration in migrations {
104 match applied_migrations.get(&migration.version) {
105 Some(applied_migration) => {
106 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
107 {
108 Err(anyhow!(
109 "checksum mismatch for applied migration {}",
110 migration.description
111 ))?;
112 }
113 }
114 None => {
115 let elapsed = connection.apply(&migration).await?;
116 new_migrations.push((migration, elapsed));
117 }
118 }
119 }
120
121 Ok(new_migrations)
122 }
123
124 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
125 self.transaction(|tx| async move {
126 let server = server::ActiveModel {
127 environment: ActiveValue::set(environment.into()),
128 ..Default::default()
129 }
130 .insert(&*tx)
131 .await?;
132 Ok(server.id)
133 })
134 .await
135 }
136
137 pub async fn stale_room_ids(
138 &self,
139 environment: &str,
140 new_server_id: ServerId,
141 ) -> Result<Vec<RoomId>> {
142 self.transaction(|tx| async move {
143 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
144 enum QueryAs {
145 RoomId,
146 }
147
148 let stale_server_epochs = self
149 .stale_server_ids(environment, new_server_id, &tx)
150 .await?;
151 Ok(room_participant::Entity::find()
152 .select_only()
153 .column(room_participant::Column::RoomId)
154 .distinct()
155 .filter(
156 room_participant::Column::AnsweringConnectionServerId
157 .is_in(stale_server_epochs),
158 )
159 .into_values::<_, QueryAs>()
160 .all(&*tx)
161 .await?)
162 })
163 .await
164 }
165
    /// Drops stale participants from `room_id` and returns the refreshed room state.
    ///
    /// A participant is stale when it has an answering connection whose server id differs
    /// from `new_server_id`. Unanswered calls placed by stale users are deleted as well.
    /// When the room has no channel and ends up without participants, the room and its
    /// projects are deleted.
    ///
    /// The returned [`RefreshedRoom`] carries the room, its channel id and members (empty
    /// when there is no channel), the user ids of the removed stale participants, and the
    /// user ids whose pending calls were canceled.
    pub async fn refresh_room(
        &self,
        room_id: RoomId,
        new_server_id: ServerId,
    ) -> Result<RoomGuard<RefreshedRoom>> {
        self.room_transaction(room_id, |tx| async move {
            // Participants of this room that answered on a server other than the new one.
            let stale_participant_filter = Condition::all()
                .add(room_participant::Column::RoomId.eq(room_id))
                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));

            // Remember who is stale before the rows are deleted below.
            let stale_participant_user_ids = room_participant::Entity::find()
                .filter(stale_participant_filter.clone())
                .all(&*tx)
                .await?
                .into_iter()
                .map(|participant| participant.user_id)
                .collect::<Vec<_>>();

            // Delete participants who failed to reconnect and cancel their calls.
            let mut canceled_calls_to_user_ids = Vec::new();
            room_participant::Entity::delete_many()
                .filter(stale_participant_filter)
                .exec(&*tx)
                .await?;
            // Unanswered participant rows whose caller was one of the stale users.
            // Note that this query is not restricted to `room_id`.
            let called_participants = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::CallingUserId
                                .is_in(stale_participant_user_ids.iter().copied()),
                        )
                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
                )
                .all(&*tx)
                .await?;
            room_participant::Entity::delete_many()
                .filter(
                    room_participant::Column::Id
                        .is_in(called_participants.iter().map(|participant| participant.id)),
                )
                .exec(&*tx)
                .await?;
            canceled_calls_to_user_ids.extend(
                called_participants
                    .into_iter()
                    .map(|participant| participant.user_id),
            );

            // Re-read the room after the deletions so the result reflects the cleaned-up state.
            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
            let channel_members;
            if let Some(channel_id) = channel_id {
                channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
            } else {
                channel_members = Vec::new();

                // Delete the room if it becomes empty.
                if room.participants.is_empty() {
                    project::Entity::delete_many()
                        .filter(project::Column::RoomId.eq(room_id))
                        .exec(&*tx)
                        .await?;
                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
                }
            };

            Ok(RefreshedRoom {
                room,
                channel_id,
                channel_members,
                stale_participant_user_ids,
                canceled_calls_to_user_ids,
            })
        })
        .await
    }
242
243 pub async fn delete_stale_servers(
244 &self,
245 environment: &str,
246 new_server_id: ServerId,
247 ) -> Result<()> {
248 self.transaction(|tx| async move {
249 server::Entity::delete_many()
250 .filter(
251 Condition::all()
252 .add(server::Column::Environment.eq(environment))
253 .add(server::Column::Id.ne(new_server_id)),
254 )
255 .exec(&*tx)
256 .await?;
257 Ok(())
258 })
259 .await
260 }
261
262 async fn stale_server_ids(
263 &self,
264 environment: &str,
265 new_server_id: ServerId,
266 tx: &DatabaseTransaction,
267 ) -> Result<Vec<ServerId>> {
268 let stale_servers = server::Entity::find()
269 .filter(
270 Condition::all()
271 .add(server::Column::Environment.eq(environment))
272 .add(server::Column::Id.ne(new_server_id)),
273 )
274 .all(&*tx)
275 .await?;
276 Ok(stale_servers.into_iter().map(|server| server.id).collect())
277 }
278
279 // users
280
281 pub async fn create_user(
282 &self,
283 email_address: &str,
284 admin: bool,
285 params: NewUserParams,
286 ) -> Result<NewUserResult> {
287 self.transaction(|tx| async {
288 let tx = tx;
289 let user = user::Entity::insert(user::ActiveModel {
290 email_address: ActiveValue::set(Some(email_address.into())),
291 github_login: ActiveValue::set(params.github_login.clone()),
292 github_user_id: ActiveValue::set(Some(params.github_user_id)),
293 admin: ActiveValue::set(admin),
294 metrics_id: ActiveValue::set(Uuid::new_v4()),
295 ..Default::default()
296 })
297 .on_conflict(
298 OnConflict::column(user::Column::GithubLogin)
299 .update_column(user::Column::GithubLogin)
300 .to_owned(),
301 )
302 .exec_with_returning(&*tx)
303 .await?;
304
305 Ok(NewUserResult {
306 user_id: user.id,
307 metrics_id: user.metrics_id.to_string(),
308 signup_device_id: None,
309 inviting_user_id: None,
310 })
311 })
312 .await
313 }
314
315 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
316 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
317 .await
318 }
319
320 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
321 self.transaction(|tx| async {
322 let tx = tx;
323 Ok(user::Entity::find()
324 .filter(user::Column::Id.is_in(ids.iter().copied()))
325 .all(&*tx)
326 .await?)
327 })
328 .await
329 }
330
331 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
332 self.transaction(|tx| async move {
333 Ok(user::Entity::find()
334 .filter(user::Column::GithubLogin.eq(github_login))
335 .one(&*tx)
336 .await?)
337 })
338 .await
339 }
340
341 pub async fn get_or_create_user_by_github_account(
342 &self,
343 github_login: &str,
344 github_user_id: Option<i32>,
345 github_email: Option<&str>,
346 ) -> Result<Option<User>> {
347 self.transaction(|tx| async move {
348 let tx = &*tx;
349 if let Some(github_user_id) = github_user_id {
350 if let Some(user_by_github_user_id) = user::Entity::find()
351 .filter(user::Column::GithubUserId.eq(github_user_id))
352 .one(tx)
353 .await?
354 {
355 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
356 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
357 Ok(Some(user_by_github_user_id.update(tx).await?))
358 } else if let Some(user_by_github_login) = user::Entity::find()
359 .filter(user::Column::GithubLogin.eq(github_login))
360 .one(tx)
361 .await?
362 {
363 let mut user_by_github_login = user_by_github_login.into_active_model();
364 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
365 Ok(Some(user_by_github_login.update(tx).await?))
366 } else {
367 let user = user::Entity::insert(user::ActiveModel {
368 email_address: ActiveValue::set(github_email.map(|email| email.into())),
369 github_login: ActiveValue::set(github_login.into()),
370 github_user_id: ActiveValue::set(Some(github_user_id)),
371 admin: ActiveValue::set(false),
372 invite_count: ActiveValue::set(0),
373 invite_code: ActiveValue::set(None),
374 metrics_id: ActiveValue::set(Uuid::new_v4()),
375 ..Default::default()
376 })
377 .exec_with_returning(&*tx)
378 .await?;
379 Ok(Some(user))
380 }
381 } else {
382 Ok(user::Entity::find()
383 .filter(user::Column::GithubLogin.eq(github_login))
384 .one(tx)
385 .await?)
386 }
387 })
388 .await
389 }
390
391 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
392 self.transaction(|tx| async move {
393 Ok(user::Entity::find()
394 .order_by_asc(user::Column::GithubLogin)
395 .limit(limit as u64)
396 .offset(page as u64 * limit as u64)
397 .all(&*tx)
398 .await?)
399 })
400 .await
401 }
402
403 pub async fn get_users_with_no_invites(
404 &self,
405 invited_by_another_user: bool,
406 ) -> Result<Vec<User>> {
407 self.transaction(|tx| async move {
408 Ok(user::Entity::find()
409 .filter(
410 user::Column::InviteCount
411 .eq(0)
412 .and(if invited_by_another_user {
413 user::Column::InviterId.is_not_null()
414 } else {
415 user::Column::InviterId.is_null()
416 }),
417 )
418 .all(&*tx)
419 .await?)
420 })
421 .await
422 }
423
424 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
425 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
426 enum QueryAs {
427 MetricsId,
428 }
429
430 self.transaction(|tx| async move {
431 let metrics_id: Uuid = user::Entity::find_by_id(id)
432 .select_only()
433 .column(user::Column::MetricsId)
434 .into_values::<_, QueryAs>()
435 .one(&*tx)
436 .await?
437 .ok_or_else(|| anyhow!("could not find user"))?;
438 Ok(metrics_id.to_string())
439 })
440 .await
441 }
442
443 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
444 self.transaction(|tx| async move {
445 user::Entity::update_many()
446 .filter(user::Column::Id.eq(id))
447 .set(user::ActiveModel {
448 admin: ActiveValue::set(is_admin),
449 ..Default::default()
450 })
451 .exec(&*tx)
452 .await?;
453 Ok(())
454 })
455 .await
456 }
457
458 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
459 self.transaction(|tx| async move {
460 user::Entity::update_many()
461 .filter(user::Column::Id.eq(id))
462 .set(user::ActiveModel {
463 connected_once: ActiveValue::set(connected_once),
464 ..Default::default()
465 })
466 .exec(&*tx)
467 .await?;
468 Ok(())
469 })
470 .await
471 }
472
473 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
474 self.transaction(|tx| async move {
475 access_token::Entity::delete_many()
476 .filter(access_token::Column::UserId.eq(id))
477 .exec(&*tx)
478 .await?;
479 user::Entity::delete_by_id(id).exec(&*tx).await?;
480 Ok(())
481 })
482 .await
483 }
484
485 // contacts
486
487 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
488 #[derive(Debug, FromQueryResult)]
489 struct ContactWithUserBusyStatuses {
490 user_id_a: UserId,
491 user_id_b: UserId,
492 a_to_b: bool,
493 accepted: bool,
494 should_notify: bool,
495 user_a_busy: bool,
496 user_b_busy: bool,
497 }
498
499 self.transaction(|tx| async move {
500 let user_a_participant = Alias::new("user_a_participant");
501 let user_b_participant = Alias::new("user_b_participant");
502 let mut db_contacts = contact::Entity::find()
503 .column_as(
504 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
505 .is_not_null(),
506 "user_a_busy",
507 )
508 .column_as(
509 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
510 .is_not_null(),
511 "user_b_busy",
512 )
513 .filter(
514 contact::Column::UserIdA
515 .eq(user_id)
516 .or(contact::Column::UserIdB.eq(user_id)),
517 )
518 .join_as(
519 JoinType::LeftJoin,
520 contact::Relation::UserARoomParticipant.def(),
521 user_a_participant,
522 )
523 .join_as(
524 JoinType::LeftJoin,
525 contact::Relation::UserBRoomParticipant.def(),
526 user_b_participant,
527 )
528 .into_model::<ContactWithUserBusyStatuses>()
529 .stream(&*tx)
530 .await?;
531
532 let mut contacts = Vec::new();
533 while let Some(db_contact) = db_contacts.next().await {
534 let db_contact = db_contact?;
535 if db_contact.user_id_a == user_id {
536 if db_contact.accepted {
537 contacts.push(Contact::Accepted {
538 user_id: db_contact.user_id_b,
539 should_notify: db_contact.should_notify && db_contact.a_to_b,
540 busy: db_contact.user_b_busy,
541 });
542 } else if db_contact.a_to_b {
543 contacts.push(Contact::Outgoing {
544 user_id: db_contact.user_id_b,
545 })
546 } else {
547 contacts.push(Contact::Incoming {
548 user_id: db_contact.user_id_b,
549 should_notify: db_contact.should_notify,
550 });
551 }
552 } else if db_contact.accepted {
553 contacts.push(Contact::Accepted {
554 user_id: db_contact.user_id_a,
555 should_notify: db_contact.should_notify && !db_contact.a_to_b,
556 busy: db_contact.user_a_busy,
557 });
558 } else if db_contact.a_to_b {
559 contacts.push(Contact::Incoming {
560 user_id: db_contact.user_id_a,
561 should_notify: db_contact.should_notify,
562 });
563 } else {
564 contacts.push(Contact::Outgoing {
565 user_id: db_contact.user_id_a,
566 });
567 }
568 }
569
570 contacts.sort_unstable_by_key(|contact| contact.user_id());
571
572 Ok(contacts)
573 })
574 .await
575 }
576
577 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
578 self.transaction(|tx| async move {
579 let participant = room_participant::Entity::find()
580 .filter(room_participant::Column::UserId.eq(user_id))
581 .one(&*tx)
582 .await?;
583 Ok(participant.is_some())
584 })
585 .await
586 }
587
588 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
589 self.transaction(|tx| async move {
590 let (id_a, id_b) = if user_id_1 < user_id_2 {
591 (user_id_1, user_id_2)
592 } else {
593 (user_id_2, user_id_1)
594 };
595
596 Ok(contact::Entity::find()
597 .filter(
598 contact::Column::UserIdA
599 .eq(id_a)
600 .and(contact::Column::UserIdB.eq(id_b))
601 .and(contact::Column::Accepted.eq(true)),
602 )
603 .one(&*tx)
604 .await?
605 .is_some())
606 })
607 .await
608 }
609
    /// Records a contact request from `sender_id` to `receiver_id`.
    ///
    /// A new, unaccepted contact row with `should_notify` set is inserted. When a row for
    /// this pair already exists, the conflict clause instead marks it accepted (and clears
    /// `should_notify`), but only if it is still pending and satisfies the direction
    /// predicate below — in practice, a pending request that the receiver had previously
    /// sent to the sender.
    ///
    /// # Errors
    ///
    /// Fails with "contact already requested" unless exactly one row was inserted or
    /// updated.
    pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
        self.transaction(|tx| async move {
            // Rows store the smaller id as user A; `a_to_b` records who asked whom.
            let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
                (sender_id, receiver_id, true)
            } else {
                (receiver_id, sender_id, false)
            };

            let rows_affected = contact::Entity::insert(contact::ActiveModel {
                user_id_a: ActiveValue::set(id_a),
                user_id_b: ActiveValue::set(id_b),
                a_to_b: ActiveValue::set(a_to_b),
                accepted: ActiveValue::set(false),
                should_notify: ActiveValue::set(true),
                ..Default::default()
            })
            .on_conflict(
                OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
                    .values([
                        (contact::Column::Accepted, true.into()),
                        (contact::Column::ShouldNotify, false.into()),
                    ])
                    // Only upgrade a still-pending row, and only when either
                    // (same direction AND user A is `id_b`) or
                    // (opposite direction AND user A is `id_a`).
                    // NOTE(review): the conflicting row always has user A == `id_a`, so the
                    // first alternative looks reachable only when both ids are equal — confirm.
                    .action_and_where(
                        contact::Column::Accepted.eq(false).and(
                            contact::Column::AToB
                                .eq(a_to_b)
                                .and(contact::Column::UserIdA.eq(id_b))
                                .or(contact::Column::AToB
                                    .ne(a_to_b)
                                    .and(contact::Column::UserIdA.eq(id_a))),
                        ),
                    )
                    .to_owned(),
            )
            .exec_without_returning(&*tx)
            .await?;

            // Anything other than one affected row is reported as an already-existing request.
            if rows_affected == 1 {
                Ok(())
            } else {
                Err(anyhow!("contact already requested"))?
            }
        })
        .await
    }
655
656 /// Returns a bool indicating whether the removed contact had originally accepted or not
657 ///
658 /// Deletes the contact identified by the requester and responder ids, and then returns
659 /// whether the deleted contact had originally accepted or was a pending contact request.
660 ///
661 /// # Arguments
662 ///
663 /// * `requester_id` - The user that initiates this request
664 /// * `responder_id` - The user that will be removed
665 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
666 self.transaction(|tx| async move {
667 let (id_a, id_b) = if responder_id < requester_id {
668 (responder_id, requester_id)
669 } else {
670 (requester_id, responder_id)
671 };
672
673 let contact = contact::Entity::find()
674 .filter(
675 contact::Column::UserIdA
676 .eq(id_a)
677 .and(contact::Column::UserIdB.eq(id_b)),
678 )
679 .one(&*tx)
680 .await?
681 .ok_or_else(|| anyhow!("no such contact"))?;
682
683 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
684 Ok(contact.accepted)
685 })
686 .await
687 }
688
689 pub async fn dismiss_contact_notification(
690 &self,
691 user_id: UserId,
692 contact_user_id: UserId,
693 ) -> Result<()> {
694 self.transaction(|tx| async move {
695 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
696 (user_id, contact_user_id, true)
697 } else {
698 (contact_user_id, user_id, false)
699 };
700
701 let result = contact::Entity::update_many()
702 .set(contact::ActiveModel {
703 should_notify: ActiveValue::set(false),
704 ..Default::default()
705 })
706 .filter(
707 contact::Column::UserIdA
708 .eq(id_a)
709 .and(contact::Column::UserIdB.eq(id_b))
710 .and(
711 contact::Column::AToB
712 .eq(a_to_b)
713 .and(contact::Column::Accepted.eq(true))
714 .or(contact::Column::AToB
715 .ne(a_to_b)
716 .and(contact::Column::Accepted.eq(false))),
717 ),
718 )
719 .exec(&*tx)
720 .await?;
721 if result.rows_affected == 0 {
722 Err(anyhow!("no such contact request"))?
723 } else {
724 Ok(())
725 }
726 })
727 .await
728 }
729
730 pub async fn respond_to_contact_request(
731 &self,
732 responder_id: UserId,
733 requester_id: UserId,
734 accept: bool,
735 ) -> Result<()> {
736 self.transaction(|tx| async move {
737 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
738 (responder_id, requester_id, false)
739 } else {
740 (requester_id, responder_id, true)
741 };
742 let rows_affected = if accept {
743 let result = contact::Entity::update_many()
744 .set(contact::ActiveModel {
745 accepted: ActiveValue::set(true),
746 should_notify: ActiveValue::set(true),
747 ..Default::default()
748 })
749 .filter(
750 contact::Column::UserIdA
751 .eq(id_a)
752 .and(contact::Column::UserIdB.eq(id_b))
753 .and(contact::Column::AToB.eq(a_to_b)),
754 )
755 .exec(&*tx)
756 .await?;
757 result.rows_affected
758 } else {
759 let result = contact::Entity::delete_many()
760 .filter(
761 contact::Column::UserIdA
762 .eq(id_a)
763 .and(contact::Column::UserIdB.eq(id_b))
764 .and(contact::Column::AToB.eq(a_to_b))
765 .and(contact::Column::Accepted.eq(false)),
766 )
767 .exec(&*tx)
768 .await?;
769
770 result.rows_affected
771 };
772
773 if rows_affected == 1 {
774 Ok(())
775 } else {
776 Err(anyhow!("no such contact request"))?
777 }
778 })
779 .await
780 }
781
782 pub fn fuzzy_like_string(string: &str) -> String {
783 let mut result = String::with_capacity(string.len() * 2 + 1);
784 for c in string.chars() {
785 if c.is_alphanumeric() {
786 result.push('%');
787 result.push(c);
788 }
789 }
790 result.push('%');
791 result
792 }
793
    /// Searches users whose GitHub login fuzzily matches `name_query`.
    ///
    /// The login must match the case-insensitive pattern produced by
    /// `fuzzy_like_string(name_query)`; results are ordered by `github_login <-> name_query`
    /// and capped at `limit` rows.
    /// NOTE(review): `<->` is presumably the Postgres trigram distance operator — confirm
    /// that the required extension is installed.
    pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
        self.transaction(|tx| async {
            let tx = tx;
            let like_string = Self::fuzzy_like_string(name_query);
            // $1 = LIKE pattern, $2 = raw query used for ordering, $3 = row limit.
            let query = "
                SELECT users.*
                FROM users
                WHERE github_login ILIKE $1
                ORDER BY github_login <-> $2
                LIMIT $3
            ";

            Ok(user::Entity::find()
                .from_raw_sql(Statement::from_sql_and_values(
                    self.pool.get_database_backend(),
                    query.into(),
                    vec![like_string.into(), name_query.into(), limit.into()],
                ))
                .all(&*tx)
                .await?)
        })
        .await
    }
817
818 // signups
819
820 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
821 self.transaction(|tx| async move {
822 signup::Entity::insert(signup::ActiveModel {
823 email_address: ActiveValue::set(signup.email_address.clone()),
824 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
825 email_confirmation_sent: ActiveValue::set(false),
826 platform_mac: ActiveValue::set(signup.platform_mac),
827 platform_windows: ActiveValue::set(signup.platform_windows),
828 platform_linux: ActiveValue::set(signup.platform_linux),
829 platform_unknown: ActiveValue::set(false),
830 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
831 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
832 device_id: ActiveValue::set(signup.device_id.clone()),
833 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
834 ..Default::default()
835 })
836 .on_conflict(
837 OnConflict::column(signup::Column::EmailAddress)
838 .update_columns([
839 signup::Column::PlatformMac,
840 signup::Column::PlatformWindows,
841 signup::Column::PlatformLinux,
842 signup::Column::EditorFeatures,
843 signup::Column::ProgrammingLanguages,
844 signup::Column::DeviceId,
845 signup::Column::AddedToMailingList,
846 ])
847 .to_owned(),
848 )
849 .exec(&*tx)
850 .await?;
851 Ok(())
852 })
853 .await
854 }
855
856 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
857 self.transaction(|tx| async move {
858 let signup = signup::Entity::find()
859 .filter(signup::Column::EmailAddress.eq(email_address))
860 .one(&*tx)
861 .await?
862 .ok_or_else(|| {
863 anyhow!("signup with email address {} doesn't exist", email_address)
864 })?;
865
866 Ok(signup)
867 })
868 .await
869 }
870
    /// Summarizes the signups whose confirmation email has not been sent yet.
    ///
    /// The result holds the total number of such signups plus one count per platform flag
    /// (linux, mac, windows, unknown). A signup with several platform flags set is counted
    /// once per flag.
    ///
    /// # Errors
    ///
    /// Fails with "invalid result" if the aggregate query yields no row.
    pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
        self.transaction(|tx| async move {
            // COALESCE keeps the per-platform sums at 0 instead of NULL when nothing is unsent.
            let query = "
                SELECT
                    COUNT(*) as count,
                    COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
                    COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
                    COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
                    COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
                FROM (
                    SELECT *
                    FROM signups
                    WHERE
                        NOT email_confirmation_sent
                ) AS unsent
            ";
            Ok(
                WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
                    self.pool.get_database_backend(),
                    query.into(),
                    vec![],
                ))
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("invalid result"))?,
            )
        })
        .await
    }
900
901 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
902 let emails = invites
903 .iter()
904 .map(|s| s.email_address.as_str())
905 .collect::<Vec<_>>();
906 self.transaction(|tx| async {
907 let tx = tx;
908 signup::Entity::update_many()
909 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
910 .set(signup::ActiveModel {
911 email_confirmation_sent: ActiveValue::set(true),
912 ..Default::default()
913 })
914 .exec(&*tx)
915 .await?;
916 Ok(())
917 })
918 .await
919 }
920
921 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
922 self.transaction(|tx| async move {
923 Ok(signup::Entity::find()
924 .select_only()
925 .column(signup::Column::EmailAddress)
926 .column(signup::Column::EmailConfirmationCode)
927 .filter(
928 signup::Column::EmailConfirmationSent.eq(false).and(
929 signup::Column::PlatformMac
930 .eq(true)
931 .or(signup::Column::PlatformUnknown.eq(true)),
932 ),
933 )
934 .order_by_asc(signup::Column::CreatedAt)
935 .limit(count as u64)
936 .into_model()
937 .all(&*tx)
938 .await?)
939 })
940 .await
941 }
942
943 // invite codes
944
945 pub async fn create_invite_from_code(
946 &self,
947 code: &str,
948 email_address: &str,
949 device_id: Option<&str>,
950 added_to_mailing_list: bool,
951 ) -> Result<Invite> {
952 self.transaction(|tx| async move {
953 let existing_user = user::Entity::find()
954 .filter(user::Column::EmailAddress.eq(email_address))
955 .one(&*tx)
956 .await?;
957
958 if existing_user.is_some() {
959 Err(anyhow!("email address is already in use"))?;
960 }
961
962 let inviting_user_with_invites = match user::Entity::find()
963 .filter(
964 user::Column::InviteCode
965 .eq(code)
966 .and(user::Column::InviteCount.gt(0)),
967 )
968 .one(&*tx)
969 .await?
970 {
971 Some(inviting_user) => inviting_user,
972 None => {
973 return Err(Error::Http(
974 StatusCode::UNAUTHORIZED,
975 "unable to find an invite code with invites remaining".to_string(),
976 ))?
977 }
978 };
979 user::Entity::update_many()
980 .filter(
981 user::Column::Id
982 .eq(inviting_user_with_invites.id)
983 .and(user::Column::InviteCount.gt(0)),
984 )
985 .col_expr(
986 user::Column::InviteCount,
987 Expr::col(user::Column::InviteCount).sub(1),
988 )
989 .exec(&*tx)
990 .await?;
991
992 let signup = signup::Entity::insert(signup::ActiveModel {
993 email_address: ActiveValue::set(email_address.into()),
994 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
995 email_confirmation_sent: ActiveValue::set(false),
996 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
997 platform_linux: ActiveValue::set(false),
998 platform_mac: ActiveValue::set(false),
999 platform_windows: ActiveValue::set(false),
1000 platform_unknown: ActiveValue::set(true),
1001 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
1002 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
1003 ..Default::default()
1004 })
1005 .on_conflict(
1006 OnConflict::column(signup::Column::EmailAddress)
1007 .update_column(signup::Column::InvitingUserId)
1008 .to_owned(),
1009 )
1010 .exec_with_returning(&*tx)
1011 .await?;
1012
1013 Ok(Invite {
1014 email_address: signup.email_address,
1015 email_confirmation_code: signup.email_confirmation_code,
1016 })
1017 })
1018 .await
1019 }
1020
    /// Creates a user from a confirmed invite and links the signup to that user.
    ///
    /// Returns `Ok(None)` when the signup matching the invite already has a user. Otherwise
    /// the user is inserted (or, on a GitHub-login conflict, has its email address, GitHub
    /// user id and admin flag overwritten), the signup is updated with the user's id, and —
    /// if the signup records an inviting user — an accepted contact between the inviter and
    /// the new user is inserted unless one already exists.
    ///
    /// # Errors
    ///
    /// Fails with HTTP 404 "no such invite" when no signup matches both the email address
    /// and the confirmation code of `invite`.
    pub async fn create_user_from_invite(
        &self,
        invite: &Invite,
        user: NewUserParams,
    ) -> Result<Option<NewUserResult>> {
        self.transaction(|tx| async {
            let tx = tx;
            // The invite is only valid if both the address and the confirmation code match.
            let signup = signup::Entity::find()
                .filter(
                    signup::Column::EmailAddress
                        .eq(invite.email_address.as_str())
                        .and(
                            signup::Column::EmailConfirmationCode
                                .eq(invite.email_confirmation_code.as_str()),
                        ),
                )
                .one(&*tx)
                .await?
                .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;

            // The invite was already redeemed.
            if signup.user_id.is_some() {
                return Ok(None);
            }

            let user = user::Entity::insert(user::ActiveModel {
                email_address: ActiveValue::set(Some(invite.email_address.clone())),
                github_login: ActiveValue::set(user.github_login.clone()),
                github_user_id: ActiveValue::set(Some(user.github_user_id)),
                admin: ActiveValue::set(false),
                invite_count: ActiveValue::set(user.invite_count),
                invite_code: ActiveValue::set(Some(random_invite_code())),
                metrics_id: ActiveValue::set(Uuid::new_v4()),
                ..Default::default()
            })
            .on_conflict(
                OnConflict::column(user::Column::GithubLogin)
                    .update_columns([
                        user::Column::EmailAddress,
                        user::Column::GithubUserId,
                        user::Column::Admin,
                    ])
                    .to_owned(),
            )
            .exec_with_returning(&*tx)
            .await?;

            // Mark the signup as redeemed by this user.
            let mut signup = signup.into_active_model();
            signup.user_id = ActiveValue::set(Some(user.id));
            let signup = signup.update(&*tx).await?;

            if let Some(inviting_user_id) = signup.inviting_user_id {
                // Contact rows store the smaller id as user A; `a_to_b` marks the inviter
                // as the requesting side.
                let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
                    (inviting_user_id, user.id, true)
                } else {
                    (user.id, inviting_user_id, false)
                };

                contact::Entity::insert(contact::ActiveModel {
                    user_id_a: ActiveValue::set(user_id_a),
                    user_id_b: ActiveValue::set(user_id_b),
                    a_to_b: ActiveValue::set(a_to_b),
                    should_notify: ActiveValue::set(true),
                    accepted: ActiveValue::set(true),
                    ..Default::default()
                })
                .on_conflict(OnConflict::new().do_nothing().to_owned())
                .exec_without_returning(&*tx)
                .await?;
            }

            Ok(Some(NewUserResult {
                user_id: user.id,
                metrics_id: user.metrics_id.to_string(),
                inviting_user_id: signup.inviting_user_id,
                signup_device_id: signup.device_id,
            }))
        })
        .await
    }
1100
1101 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1102 self.transaction(|tx| async move {
1103 if count > 0 {
1104 user::Entity::update_many()
1105 .filter(
1106 user::Column::Id
1107 .eq(id)
1108 .and(user::Column::InviteCode.is_null()),
1109 )
1110 .set(user::ActiveModel {
1111 invite_code: ActiveValue::set(Some(random_invite_code())),
1112 ..Default::default()
1113 })
1114 .exec(&*tx)
1115 .await?;
1116 }
1117
1118 user::Entity::update_many()
1119 .filter(user::Column::Id.eq(id))
1120 .set(user::ActiveModel {
1121 invite_count: ActiveValue::set(count),
1122 ..Default::default()
1123 })
1124 .exec(&*tx)
1125 .await?;
1126 Ok(())
1127 })
1128 .await
1129 }
1130
1131 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1132 self.transaction(|tx| async move {
1133 match user::Entity::find_by_id(id).one(&*tx).await? {
1134 Some(user) if user.invite_code.is_some() => {
1135 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1136 }
1137 _ => Ok(None),
1138 }
1139 })
1140 .await
1141 }
1142
1143 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1144 self.transaction(|tx| async move {
1145 user::Entity::find()
1146 .filter(user::Column::InviteCode.eq(code))
1147 .one(&*tx)
1148 .await?
1149 .ok_or_else(|| {
1150 Error::Http(
1151 StatusCode::NOT_FOUND,
1152 "that invite code does not exist".to_string(),
1153 )
1154 })
1155 })
1156 .await
1157 }
1158
1159 // rooms
1160
1161 pub async fn incoming_call_for_user(
1162 &self,
1163 user_id: UserId,
1164 ) -> Result<Option<proto::IncomingCall>> {
1165 self.transaction(|tx| async move {
1166 let pending_participant = room_participant::Entity::find()
1167 .filter(
1168 room_participant::Column::UserId
1169 .eq(user_id)
1170 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1171 )
1172 .one(&*tx)
1173 .await?;
1174
1175 if let Some(pending_participant) = pending_participant {
1176 let room = self.get_room(pending_participant.room_id, &tx).await?;
1177 Ok(Self::build_incoming_call(&room, user_id))
1178 } else {
1179 Ok(None)
1180 }
1181 })
1182 .await
1183 }
1184
1185 pub async fn create_room(
1186 &self,
1187 user_id: UserId,
1188 connection: ConnectionId,
1189 live_kit_room: &str,
1190 ) -> Result<proto::Room> {
1191 self.transaction(|tx| async move {
1192 let room = room::ActiveModel {
1193 live_kit_room: ActiveValue::set(live_kit_room.into()),
1194 ..Default::default()
1195 }
1196 .insert(&*tx)
1197 .await?;
1198 room_participant::ActiveModel {
1199 room_id: ActiveValue::set(room.id),
1200 user_id: ActiveValue::set(user_id),
1201 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1202 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1203 connection.owner_id as i32,
1204 ))),
1205 answering_connection_lost: ActiveValue::set(false),
1206 calling_user_id: ActiveValue::set(user_id),
1207 calling_connection_id: ActiveValue::set(connection.id as i32),
1208 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1209 connection.owner_id as i32,
1210 ))),
1211 ..Default::default()
1212 }
1213 .insert(&*tx)
1214 .await?;
1215
1216 let room = self.get_room(room.id, &tx).await?;
1217 Ok(room)
1218 })
1219 .await
1220 }
1221
1222 pub async fn call(
1223 &self,
1224 room_id: RoomId,
1225 calling_user_id: UserId,
1226 calling_connection: ConnectionId,
1227 called_user_id: UserId,
1228 initial_project_id: Option<ProjectId>,
1229 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1230 self.room_transaction(room_id, |tx| async move {
1231 room_participant::ActiveModel {
1232 room_id: ActiveValue::set(room_id),
1233 user_id: ActiveValue::set(called_user_id),
1234 answering_connection_lost: ActiveValue::set(false),
1235 calling_user_id: ActiveValue::set(calling_user_id),
1236 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1237 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1238 calling_connection.owner_id as i32,
1239 ))),
1240 initial_project_id: ActiveValue::set(initial_project_id),
1241 ..Default::default()
1242 }
1243 .insert(&*tx)
1244 .await?;
1245
1246 let room = self.get_room(room_id, &tx).await?;
1247 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1248 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1249 Ok((room, incoming_call))
1250 })
1251 .await
1252 }
1253
1254 pub async fn call_failed(
1255 &self,
1256 room_id: RoomId,
1257 called_user_id: UserId,
1258 ) -> Result<RoomGuard<proto::Room>> {
1259 self.room_transaction(room_id, |tx| async move {
1260 room_participant::Entity::delete_many()
1261 .filter(
1262 room_participant::Column::RoomId
1263 .eq(room_id)
1264 .and(room_participant::Column::UserId.eq(called_user_id)),
1265 )
1266 .exec(&*tx)
1267 .await?;
1268 let room = self.get_room(room_id, &tx).await?;
1269 Ok(room)
1270 })
1271 .await
1272 }
1273
1274 pub async fn decline_call(
1275 &self,
1276 expected_room_id: Option<RoomId>,
1277 user_id: UserId,
1278 ) -> Result<Option<RoomGuard<proto::Room>>> {
1279 self.optional_room_transaction(|tx| async move {
1280 let mut filter = Condition::all()
1281 .add(room_participant::Column::UserId.eq(user_id))
1282 .add(room_participant::Column::AnsweringConnectionId.is_null());
1283 if let Some(room_id) = expected_room_id {
1284 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1285 }
1286 let participant = room_participant::Entity::find()
1287 .filter(filter)
1288 .one(&*tx)
1289 .await?;
1290
1291 let participant = if let Some(participant) = participant {
1292 participant
1293 } else if expected_room_id.is_some() {
1294 return Err(anyhow!("could not find call to decline"))?;
1295 } else {
1296 return Ok(None);
1297 };
1298
1299 let room_id = participant.room_id;
1300 room_participant::Entity::delete(participant.into_active_model())
1301 .exec(&*tx)
1302 .await?;
1303
1304 let room = self.get_room(room_id, &tx).await?;
1305 Ok(Some((room_id, room)))
1306 })
1307 .await
1308 }
1309
1310 pub async fn cancel_call(
1311 &self,
1312 room_id: RoomId,
1313 calling_connection: ConnectionId,
1314 called_user_id: UserId,
1315 ) -> Result<RoomGuard<proto::Room>> {
1316 self.room_transaction(room_id, |tx| async move {
1317 let participant = room_participant::Entity::find()
1318 .filter(
1319 Condition::all()
1320 .add(room_participant::Column::UserId.eq(called_user_id))
1321 .add(room_participant::Column::RoomId.eq(room_id))
1322 .add(
1323 room_participant::Column::CallingConnectionId
1324 .eq(calling_connection.id as i32),
1325 )
1326 .add(
1327 room_participant::Column::CallingConnectionServerId
1328 .eq(calling_connection.owner_id as i32),
1329 )
1330 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1331 )
1332 .one(&*tx)
1333 .await?
1334 .ok_or_else(|| anyhow!("no call to cancel"))?;
1335
1336 room_participant::Entity::delete(participant.into_active_model())
1337 .exec(&*tx)
1338 .await?;
1339
1340 let room = self.get_room(room_id, &tx).await?;
1341 Ok(room)
1342 })
1343 .await
1344 }
1345
1346 pub async fn is_current_room_different_channel(
1347 &self,
1348 user_id: UserId,
1349 channel_id: ChannelId,
1350 ) -> Result<bool> {
1351 self.transaction(|tx| async move {
1352 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1353 enum QueryAs {
1354 ChannelId,
1355 }
1356
1357 let channel_id_model: Option<ChannelId> = room_participant::Entity::find()
1358 .select_only()
1359 .column_as(room::Column::ChannelId, QueryAs::ChannelId)
1360 .inner_join(room::Entity)
1361 .filter(room_participant::Column::UserId.eq(user_id))
1362 .into_values::<_, QueryAs>()
1363 .one(&*tx)
1364 .await?;
1365
1366 let result = channel_id_model
1367 .map(|channel_id_model| channel_id_model != channel_id)
1368 .unwrap_or(false);
1369
1370 Ok(result)
1371 })
1372 .await
1373 }
1374
1375 pub async fn join_room(
1376 &self,
1377 room_id: RoomId,
1378 user_id: UserId,
1379 channel_id: Option<ChannelId>,
1380 connection: ConnectionId,
1381 ) -> Result<RoomGuard<JoinRoom>> {
1382 self.room_transaction(room_id, |tx| async move {
1383 if let Some(channel_id) = channel_id {
1384 channel_member::Entity::find()
1385 .filter(
1386 channel_member::Column::ChannelId
1387 .eq(channel_id)
1388 .and(channel_member::Column::UserId.eq(user_id))
1389 .and(channel_member::Column::Accepted.eq(true)),
1390 )
1391 .one(&*tx)
1392 .await?
1393 .ok_or_else(|| anyhow!("no such channel membership"))?;
1394
1395 room_participant::ActiveModel {
1396 room_id: ActiveValue::set(room_id),
1397 user_id: ActiveValue::set(user_id),
1398 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1399 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1400 connection.owner_id as i32,
1401 ))),
1402 answering_connection_lost: ActiveValue::set(false),
1403 // Redundant for the channel join use case, used for channel and call invitations
1404 calling_user_id: ActiveValue::set(user_id),
1405 calling_connection_id: ActiveValue::set(connection.id as i32),
1406 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1407 connection.owner_id as i32,
1408 ))),
1409 ..Default::default()
1410 }
1411 .insert(&*tx)
1412 .await?;
1413 } else {
1414 let result = room_participant::Entity::update_many()
1415 .filter(
1416 Condition::all()
1417 .add(room_participant::Column::RoomId.eq(room_id))
1418 .add(room_participant::Column::UserId.eq(user_id))
1419 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1420 )
1421 .set(room_participant::ActiveModel {
1422 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1423 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1424 connection.owner_id as i32,
1425 ))),
1426 answering_connection_lost: ActiveValue::set(false),
1427 ..Default::default()
1428 })
1429 .exec(&*tx)
1430 .await?;
1431 if result.rows_affected == 0 {
1432 Err(anyhow!("room does not exist or was already joined"))?;
1433 }
1434 }
1435
1436 let room = self.get_room(room_id, &tx).await?;
1437 let channel_members = if let Some(channel_id) = channel_id {
1438 self.get_channel_members_internal(channel_id, &tx).await?
1439 } else {
1440 Vec::new()
1441 };
1442 Ok(JoinRoom {
1443 room,
1444 channel_id,
1445 channel_members,
1446 })
1447 })
1448 .await
1449 }
1450
/// Re-attaches `user_id` to the room named in `rejoin_room`, using `connection`.
///
/// Everything runs inside one room transaction:
/// 1. The user's answered participant row is pointed at `connection`. The row
///    only matches if its connection was marked lost or belongs to a different
///    server than `connection`; otherwise the call fails.
/// 2. Each project in `rejoin_room.reshared_projects` must be hosted by the
///    user; its host connection and host collaborator row are moved to
///    `connection` and its worktrees are updated.
/// 3. Projects in this room hosted by the user that were not reshared are deleted.
/// 4. For each project in `rejoin_room.rejoined_projects` that still exists and
///    in which the user is a collaborator, the worktree state is collected and
///    the user's collaborator row is moved to `connection`.
///
/// Returns the reloaded room, its channel (if any) with members, and the
/// reshared / rejoined project descriptions.
pub async fn rejoin_room(
    &self,
    rejoin_room: proto::RejoinRoom,
    user_id: UserId,
    connection: ConnectionId,
) -> Result<RoomGuard<RejoinedRoom>> {
    let room_id = RoomId::from_proto(rejoin_room.id);
    // Note: this is a non-`move` async block; `rejoin_room` is only borrowed below.
    self.room_transaction(room_id, |tx| async {
        let tx = tx;
        // Match only an answered row of this user in this room whose connection
        // is flagged lost or lives on another server than the new connection.
        let participant_update = room_participant::Entity::update_many()
            .filter(
                Condition::all()
                    .add(room_participant::Column::RoomId.eq(room_id))
                    .add(room_participant::Column::UserId.eq(user_id))
                    .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                    .add(
                        Condition::any()
                            .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                            .add(
                                room_participant::Column::AnsweringConnectionServerId
                                    .ne(connection.owner_id as i32),
                            ),
                    ),
            )
            .set(room_participant::ActiveModel {
                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                answering_connection_server_id: ActiveValue::set(Some(ServerId(
                    connection.owner_id as i32,
                ))),
                answering_connection_lost: ActiveValue::set(false),
                ..Default::default()
            })
            .exec(&*tx)
            .await?;
        if participant_update.rows_affected == 0 {
            return Err(anyhow!("room does not exist or was already joined"))?;
        }

        // Projects the user hosts and wants to keep sharing.
        let mut reshared_projects = Vec::new();
        for reshared_project in &rejoin_room.reshared_projects {
            let project_id = ProjectId::from_proto(reshared_project.project_id);
            let project = project::Entity::find_by_id(project_id)
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("project does not exist"))?;
            // Only the host may reshare a project.
            if project.host_user_id != user_id {
                return Err(anyhow!("no such project"))?;
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            let host_ix = collaborators
                .iter()
                .position(|collaborator| {
                    collaborator.user_id == user_id && collaborator.is_host
                })
                .ok_or_else(|| anyhow!("host not found among collaborators"))?;
            // Taking the host out leaves only the other collaborators in the list.
            let host = collaborators.swap_remove(host_ix);
            let old_connection_id = host.connection();

            // Move both the project's host connection and the host's
            // collaborator row to the new connection.
            project::Entity::update(project::ActiveModel {
                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                host_connection_server_id: ActiveValue::set(Some(ServerId(
                    connection.owner_id as i32,
                ))),
                ..project.into_active_model()
            })
            .exec(&*tx)
            .await?;
            project_collaborator::Entity::update(project_collaborator::ActiveModel {
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                ..host.into_active_model()
            })
            .exec(&*tx)
            .await?;

            self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                .await?;

            reshared_projects.push(ResharedProject {
                id: project_id,
                old_connection_id,
                collaborators: collaborators
                    .iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect(),
                worktrees: reshared_project.worktrees.clone(),
            });
        }

        // Delete the user's hosted projects in this room that were not reshared.
        project::Entity::delete_many()
            .filter(
                Condition::all()
                    .add(project::Column::RoomId.eq(room_id))
                    .add(project::Column::HostUserId.eq(user_id))
                    .add(
                        project::Column::Id
                            .is_not_in(reshared_projects.iter().map(|project| project.id)),
                    ),
            )
            .exec(&*tx)
            .await?;

        // Projects the user had joined as a collaborator.
        let mut rejoined_projects = Vec::new();
        for rejoined_project in &rejoin_room.rejoined_projects {
            let project_id = ProjectId::from_proto(rejoined_project.id);
            // A project that no longer exists is silently skipped.
            let Some(project) = project::Entity::find_by_id(project_id)
                .one(&*tx)
                .await? else { continue };

            let mut worktrees = Vec::new();
            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
            for db_worktree in db_worktrees {
                let mut worktree = RejoinedWorktree {
                    id: db_worktree.id as u64,
                    abs_path: db_worktree.abs_path,
                    root_name: db_worktree.root_name,
                    visible: db_worktree.visible,
                    updated_entries: Default::default(),
                    removed_entries: Default::default(),
                    updated_repositories: Default::default(),
                    removed_repositories: Default::default(),
                    diagnostic_summaries: Default::default(),
                    settings_files: Default::default(),
                    scan_id: db_worktree.scan_id as u64,
                    completed_scan_id: db_worktree.completed_scan_id as u64,
                };

                // The request's record of this worktree, if it listed one.
                let rejoined_worktree = rejoined_project
                    .worktrees
                    .iter()
                    .find(|worktree| worktree.id == db_worktree.id as u64);

                // File entries
                {
                    // If the request listed this worktree, select rows with a scan
                    // id greater than the one it reported (deleted rows included);
                    // otherwise select every non-deleted row.
                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                    } else {
                        worktree_entry::Column::IsDeleted.eq(false)
                    };

                    let mut db_entries = worktree_entry::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_entry::Column::ProjectId.eq(project.id))
                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                .add(entry_filter),
                        )
                        .stream(&*tx)
                        .await?;

                    while let Some(db_entry) = db_entries.next().await {
                        let db_entry = db_entry?;
                        if db_entry.is_deleted {
                            worktree.removed_entries.push(db_entry.id as u64);
                        } else {
                            worktree.updated_entries.push(proto::Entry {
                                id: db_entry.id as u64,
                                is_dir: db_entry.is_dir,
                                path: db_entry.path,
                                inode: db_entry.inode as u64,
                                mtime: Some(proto::Timestamp {
                                    seconds: db_entry.mtime_seconds as u64,
                                    nanos: db_entry.mtime_nanos as u32,
                                }),
                                is_symlink: db_entry.is_symlink,
                                is_ignored: db_entry.is_ignored,
                                is_external: db_entry.is_external,
                                git_status: db_entry.git_status.map(|status| status as i32),
                            });
                        }
                    }
                }

                // Repository Entries
                {
                    // Same selection rule as for file entries above.
                    let repository_entry_filter =
                        if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_repository::Column::IsDeleted.eq(false)
                        };

                    let mut db_repositories = worktree_repository::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_repository::Column::ProjectId.eq(project.id))
                                .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                .add(repository_entry_filter),
                        )
                        .stream(&*tx)
                        .await?;

                    while let Some(db_repository) = db_repositories.next().await {
                        let db_repository = db_repository?;
                        if db_repository.is_deleted {
                            worktree
                                .removed_repositories
                                .push(db_repository.work_directory_id as u64);
                        } else {
                            worktree.updated_repositories.push(proto::RepositoryEntry {
                                work_directory_id: db_repository.work_directory_id as u64,
                                branch: db_repository.branch,
                            });
                        }
                    }
                }

                worktrees.push(worktree);
            }

            let language_servers = project
                .find_related(language_server::Entity)
                .all(&*tx)
                .await?
                .into_iter()
                .map(|language_server| proto::LanguageServer {
                    id: language_server.id as u64,
                    name: language_server.name,
                })
                .collect::<Vec<_>>();

            // Attach each settings file of the project to the worktree it
            // belongs to; files whose worktree was not collected are ignored.
            {
                let mut db_settings_files = worktree_settings_file::Entity::find()
                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                    .stream(&*tx)
                    .await?;
                while let Some(db_settings_file) = db_settings_files.next().await {
                    let db_settings_file = db_settings_file?;
                    if let Some(worktree) = worktrees
                        .iter_mut()
                        .find(|w| w.id == db_settings_file.worktree_id as u64)
                    {
                        worktree.settings_files.push(WorktreeSettingsFile {
                            path: db_settings_file.path,
                            content: db_settings_file.content,
                        });
                    }
                }
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            // If the user is not a collaborator of this project, skip it.
            let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                .iter()
                .position(|collaborator| collaborator.user_id == user_id)
            {
                collaborators.swap_remove(self_collaborator_ix)
            } else {
                continue;
            };
            let old_connection_id = self_collaborator.connection();
            // Move the user's collaborator row to the new connection.
            project_collaborator::Entity::update(project_collaborator::ActiveModel {
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                ..self_collaborator.into_active_model()
            })
            .exec(&*tx)
            .await?;

            let collaborators = collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                })
                .collect::<Vec<_>>();

            rejoined_projects.push(RejoinedProject {
                id: project_id,
                old_connection_id,
                collaborators,
                worktrees,
                language_servers,
            });
        }

        let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;

        let channel_members = if let Some(channel_id) = channel_id {
            self.get_channel_members_internal(channel_id, &tx).await?
        } else {
            Vec::new()
        };

        Ok(RejoinedRoom {
            room,
            channel_id,
            channel_members,
            rejoined_projects,
            reshared_projects,
        })
    })
    .await
}
1758
/// Removes the participant answering on `connection` from its room.
///
/// Returns `Ok(None)` when `connection` is not the answering connection of any
/// participant. Otherwise, within one transaction, it:
/// - deletes the participant row,
/// - deletes unanswered calls placed by the leaving user and records the
///   called user ids,
/// - collects, per project this connection collaborated on, the other
///   collaborators' connections and the host,
/// - deletes this connection's collaborator rows and the projects it hosted
///   in the room,
/// - deletes the room itself if it has no participants left and is not tied
///   to a channel.
///
/// The result carries the reloaded room, channel information, the left
/// projects, the canceled calls and whether the room row was deleted.
pub async fn leave_room(
    &self,
    connection: ConnectionId,
) -> Result<Option<RoomGuard<LeftRoom>>> {
    self.optional_room_transaction(|tx| async move {
        let leaving_participant = room_participant::Entity::find()
            .filter(
                Condition::all()
                    .add(
                        room_participant::Column::AnsweringConnectionId
                            .eq(connection.id as i32),
                    )
                    .add(
                        room_participant::Column::AnsweringConnectionServerId
                            .eq(connection.owner_id as i32),
                    ),
            )
            .one(&*tx)
            .await?;

        if let Some(leaving_participant) = leaving_participant {
            // Leave room.
            let room_id = leaving_participant.room_id;
            room_participant::Entity::delete_by_id(leaving_participant.id)
                .exec(&*tx)
                .await?;

            // Cancel pending calls initiated by the leaving user.
            // Note: this filter is not restricted to `room_id`.
            let called_participants = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::CallingUserId
                                .eq(leaving_participant.user_id),
                        )
                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
                )
                .all(&*tx)
                .await?;
            room_participant::Entity::delete_many()
                .filter(
                    room_participant::Column::Id
                        .is_in(called_participants.iter().map(|participant| participant.id)),
                )
                .exec(&*tx)
                .await?;
            let canceled_calls_to_user_ids = called_participants
                .into_iter()
                .map(|participant| participant.user_id)
                .collect();

            // Detect left projects.
            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
            enum QueryProjectIds {
                ProjectId,
            }
            // Ids of all projects in which this connection is a collaborator.
            let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                .select_only()
                .column_as(
                    project_collaborator::Column::ProjectId,
                    QueryProjectIds::ProjectId,
                )
                .filter(
                    Condition::all()
                        .add(
                            project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                        )
                        .add(
                            project_collaborator::Column::ConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .into_values::<_, QueryProjectIds>()
                .all(&*tx)
                .await?;
            // Walk every collaborator of those projects and group them by project.
            let mut left_projects = HashMap::default();
            let mut collaborators = project_collaborator::Entity::find()
                .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                .stream(&*tx)
                .await?;
            while let Some(collaborator) = collaborators.next().await {
                let collaborator = collaborator?;
                let left_project =
                    left_projects
                        .entry(collaborator.project_id)
                        .or_insert(LeftProject {
                            id: collaborator.project_id,
                            host_user_id: Default::default(),
                            connection_ids: Default::default(),
                            host_connection_id: Default::default(),
                        });

                // Record every collaborator connection except the leaving one.
                let collaborator_connection_id = collaborator.connection();
                if collaborator_connection_id != connection {
                    left_project.connection_ids.push(collaborator_connection_id);
                }

                if collaborator.is_host {
                    left_project.host_user_id = collaborator.user_id;
                    left_project.host_connection_id = collaborator_connection_id;
                }
            }
            // Release the stream before running further statements on `tx`
            // (presumably it holds on to the transaction while alive — TODO confirm).
            drop(collaborators);

            // Leave projects.
            project_collaborator::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(
                            project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                        )
                        .add(
                            project_collaborator::Column::ConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Unshare projects.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
                        .add(
                            project::Column::HostConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .exec(&*tx)
                .await?;

            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
            // An empty room is deleted only when it has no channel; `deleted`
            // reflects whether a row was actually removed.
            let deleted = if room.participants.is_empty() {
                let result = room::Entity::delete_by_id(room_id)
                    .filter(room::Column::ChannelId.is_null())
                    .exec(&*tx)
                    .await?;
                result.rows_affected > 0
            } else {
                false
            };

            let channel_members = if let Some(channel_id) = channel_id {
                self.get_channel_members_internal(channel_id, &tx).await?
            } else {
                Vec::new()
            };
            let left_room = LeftRoom {
                room,
                channel_id,
                channel_members,
                left_projects,
                canceled_calls_to_user_ids,
                deleted,
            };

            // With nobody left, drop this room's entry from the in-memory `rooms` map.
            if left_room.room.participants.is_empty() {
                self.rooms.remove(&room_id);
            }

            Ok(Some((room_id, left_room)))
        } else {
            Ok(None)
        }
    })
    .await
}
1928
1929 pub async fn follow(
1930 &self,
1931 project_id: ProjectId,
1932 leader_connection: ConnectionId,
1933 follower_connection: ConnectionId,
1934 ) -> Result<RoomGuard<proto::Room>> {
1935 let room_id = self.room_id_for_project(project_id).await?;
1936 self.room_transaction(room_id, |tx| async move {
1937 follower::ActiveModel {
1938 room_id: ActiveValue::set(room_id),
1939 project_id: ActiveValue::set(project_id),
1940 leader_connection_server_id: ActiveValue::set(ServerId(
1941 leader_connection.owner_id as i32,
1942 )),
1943 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1944 follower_connection_server_id: ActiveValue::set(ServerId(
1945 follower_connection.owner_id as i32,
1946 )),
1947 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1948 ..Default::default()
1949 }
1950 .insert(&*tx)
1951 .await?;
1952
1953 let room = self.get_room(room_id, &*tx).await?;
1954 Ok(room)
1955 })
1956 .await
1957 }
1958
1959 pub async fn unfollow(
1960 &self,
1961 project_id: ProjectId,
1962 leader_connection: ConnectionId,
1963 follower_connection: ConnectionId,
1964 ) -> Result<RoomGuard<proto::Room>> {
1965 let room_id = self.room_id_for_project(project_id).await?;
1966 self.room_transaction(room_id, |tx| async move {
1967 follower::Entity::delete_many()
1968 .filter(
1969 Condition::all()
1970 .add(follower::Column::ProjectId.eq(project_id))
1971 .add(
1972 follower::Column::LeaderConnectionServerId
1973 .eq(leader_connection.owner_id),
1974 )
1975 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1976 .add(
1977 follower::Column::FollowerConnectionServerId
1978 .eq(follower_connection.owner_id),
1979 )
1980 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1981 )
1982 .exec(&*tx)
1983 .await?;
1984
1985 let room = self.get_room(room_id, &*tx).await?;
1986 Ok(room)
1987 })
1988 .await
1989 }
1990
1991 pub async fn update_room_participant_location(
1992 &self,
1993 room_id: RoomId,
1994 connection: ConnectionId,
1995 location: proto::ParticipantLocation,
1996 ) -> Result<RoomGuard<proto::Room>> {
1997 self.room_transaction(room_id, |tx| async {
1998 let tx = tx;
1999 let location_kind;
2000 let location_project_id;
2001 match location
2002 .variant
2003 .as_ref()
2004 .ok_or_else(|| anyhow!("invalid location"))?
2005 {
2006 proto::participant_location::Variant::SharedProject(project) => {
2007 location_kind = 0;
2008 location_project_id = Some(ProjectId::from_proto(project.id));
2009 }
2010 proto::participant_location::Variant::UnsharedProject(_) => {
2011 location_kind = 1;
2012 location_project_id = None;
2013 }
2014 proto::participant_location::Variant::External(_) => {
2015 location_kind = 2;
2016 location_project_id = None;
2017 }
2018 }
2019
2020 let result = room_participant::Entity::update_many()
2021 .filter(
2022 Condition::all()
2023 .add(room_participant::Column::RoomId.eq(room_id))
2024 .add(
2025 room_participant::Column::AnsweringConnectionId
2026 .eq(connection.id as i32),
2027 )
2028 .add(
2029 room_participant::Column::AnsweringConnectionServerId
2030 .eq(connection.owner_id as i32),
2031 ),
2032 )
2033 .set(room_participant::ActiveModel {
2034 location_kind: ActiveValue::set(Some(location_kind)),
2035 location_project_id: ActiveValue::set(location_project_id),
2036 ..Default::default()
2037 })
2038 .exec(&*tx)
2039 .await?;
2040
2041 if result.rows_affected == 1 {
2042 let room = self.get_room(room_id, &tx).await?;
2043 Ok(room)
2044 } else {
2045 Err(anyhow!("could not update room participant location"))?
2046 }
2047 })
2048 .await
2049 }
2050
2051 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2052 self.transaction(|tx| async move {
2053 let participant = room_participant::Entity::find()
2054 .filter(
2055 Condition::all()
2056 .add(
2057 room_participant::Column::AnsweringConnectionId
2058 .eq(connection.id as i32),
2059 )
2060 .add(
2061 room_participant::Column::AnsweringConnectionServerId
2062 .eq(connection.owner_id as i32),
2063 ),
2064 )
2065 .one(&*tx)
2066 .await?
2067 .ok_or_else(|| anyhow!("not a participant in any room"))?;
2068
2069 room_participant::Entity::update(room_participant::ActiveModel {
2070 answering_connection_lost: ActiveValue::set(true),
2071 ..participant.into_active_model()
2072 })
2073 .exec(&*tx)
2074 .await?;
2075
2076 Ok(())
2077 })
2078 .await
2079 }
2080
2081 fn build_incoming_call(
2082 room: &proto::Room,
2083 called_user_id: UserId,
2084 ) -> Option<proto::IncomingCall> {
2085 let pending_participant = room
2086 .pending_participants
2087 .iter()
2088 .find(|participant| participant.user_id == called_user_id.to_proto())?;
2089
2090 Some(proto::IncomingCall {
2091 room_id: room.id,
2092 calling_user_id: pending_participant.calling_user_id,
2093 participant_user_ids: room
2094 .participants
2095 .iter()
2096 .map(|participant| participant.user_id)
2097 .collect(),
2098 initial_project: room.participants.iter().find_map(|participant| {
2099 let initial_project_id = pending_participant.initial_project_id?;
2100 participant
2101 .projects
2102 .iter()
2103 .find(|project| project.id == initial_project_id)
2104 .cloned()
2105 }),
2106 })
2107 }
2108 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2109 let (_, room) = self.get_channel_room(room_id, tx).await?;
2110 Ok(room)
2111 }
2112
/// Loads the room `room_id` within `tx` and assembles its protobuf form.
///
/// Returns the room's channel id (if any) together with a `proto::Room`
/// containing:
/// - `participants`: rows that have both an answering connection id and
///   server id, each with its location and the projects it hosts,
/// - `pending_participants`: all other participant rows,
/// - `followers`: the room's follower rows.
///
/// Fails with "could not find room" when the room row does not exist.
async fn get_channel_room(
    &self,
    room_id: RoomId,
    tx: &DatabaseTransaction,
) -> Result<(Option<ChannelId>, proto::Room)> {
    let db_room = room::Entity::find_by_id(room_id)
        .one(tx)
        .await?
        .ok_or_else(|| anyhow!("could not find room"))?;

    let mut db_participants = db_room
        .find_related(room_participant::Entity)
        .stream(tx)
        .await?;
    // Answered participants are keyed by their connection so hosted projects
    // can be attached to them below.
    let mut participants = HashMap::default();
    let mut pending_participants = Vec::new();
    while let Some(db_participant) = db_participants.next().await {
        let db_participant = db_participant?;
        // A participant counts as answered only if both the connection id and
        // the server id are present.
        if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
            .answering_connection_id
            .zip(db_participant.answering_connection_server_id)
        {
            // Kind 0 with a project id is a shared project, kind 1 an unshared
            // project; every other combination is reported as external.
            let location = match (
                db_participant.location_kind,
                db_participant.location_project_id,
            ) {
                (Some(0), Some(project_id)) => {
                    Some(proto::participant_location::Variant::SharedProject(
                        proto::participant_location::SharedProject {
                            id: project_id.to_proto(),
                        },
                    ))
                }
                (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
                    Default::default(),
                )),
                _ => Some(proto::participant_location::Variant::External(
                    Default::default(),
                )),
            };

            let answering_connection = ConnectionId {
                owner_id: answering_connection_server_id.0 as u32,
                id: answering_connection_id as u32,
            };
            participants.insert(
                answering_connection,
                proto::Participant {
                    user_id: db_participant.user_id.to_proto(),
                    peer_id: Some(answering_connection.into()),
                    projects: Default::default(),
                    location: Some(proto::ParticipantLocation { variant: location }),
                },
            );
        } else {
            pending_participants.push(proto::PendingParticipant {
                user_id: db_participant.user_id.to_proto(),
                calling_user_id: db_participant.calling_user_id.to_proto(),
                initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
            });
        }
    }
    // Release the stream before running the next query on `tx`.
    drop(db_participants);

    let mut db_projects = db_room
        .find_related(project::Entity)
        .find_with_related(worktree::Entity)
        .stream(tx)
        .await?;

    // Each row pairs a project with at most one of its worktrees, so the same
    // project can show up in several rows.
    while let Some(row) = db_projects.next().await {
        let (db_project, db_worktree) = row?;
        let host_connection = db_project.host_connection()?;
        // Projects whose host connection is not an answered participant are skipped.
        if let Some(participant) = participants.get_mut(&host_connection) {
            // Reuse the project entry created by an earlier row, or add a new one.
            let project = if let Some(project) = participant
                .projects
                .iter_mut()
                .find(|project| project.id == db_project.id.to_proto())
            {
                project
            } else {
                participant.projects.push(proto::ParticipantProject {
                    id: db_project.id.to_proto(),
                    worktree_root_names: Default::default(),
                });
                participant.projects.last_mut().unwrap()
            };

            // Only visible worktrees contribute a root name.
            if let Some(db_worktree) = db_worktree {
                if db_worktree.visible {
                    project.worktree_root_names.push(db_worktree.root_name);
                }
            }
        }
    }
    drop(db_projects);

    let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
    let mut followers = Vec::new();
    while let Some(db_follower) = db_followers.next().await {
        let db_follower = db_follower?;
        followers.push(proto::Follower {
            leader_id: Some(db_follower.leader_connection().into()),
            follower_id: Some(db_follower.follower_connection().into()),
            project_id: db_follower.project_id.to_proto(),
        });
    }

    Ok((
        db_room.channel_id,
        proto::Room {
            id: db_room.id.to_proto(),
            live_kit_room: db_room.live_kit_room,
            participants: participants.into_values().collect(),
            pending_participants,
            followers,
        },
    ))
}
2232
2233 // projects
2234
2235 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2236 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2237 enum QueryAs {
2238 Count,
2239 }
2240
2241 self.transaction(|tx| async move {
2242 Ok(project::Entity::find()
2243 .select_only()
2244 .column_as(project::Column::Id.count(), QueryAs::Count)
2245 .inner_join(user::Entity)
2246 .filter(user::Column::Admin.eq(false))
2247 .into_values::<_, QueryAs>()
2248 .one(&*tx)
2249 .await?
2250 .unwrap_or(0i64) as usize)
2251 })
2252 .await
2253 }
2254
2255 pub async fn share_project(
2256 &self,
2257 room_id: RoomId,
2258 connection: ConnectionId,
2259 worktrees: &[proto::WorktreeMetadata],
2260 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2261 self.room_transaction(room_id, |tx| async move {
2262 let participant = room_participant::Entity::find()
2263 .filter(
2264 Condition::all()
2265 .add(
2266 room_participant::Column::AnsweringConnectionId
2267 .eq(connection.id as i32),
2268 )
2269 .add(
2270 room_participant::Column::AnsweringConnectionServerId
2271 .eq(connection.owner_id as i32),
2272 ),
2273 )
2274 .one(&*tx)
2275 .await?
2276 .ok_or_else(|| anyhow!("could not find participant"))?;
2277 if participant.room_id != room_id {
2278 return Err(anyhow!("shared project on unexpected room"))?;
2279 }
2280
2281 let project = project::ActiveModel {
2282 room_id: ActiveValue::set(participant.room_id),
2283 host_user_id: ActiveValue::set(participant.user_id),
2284 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2285 host_connection_server_id: ActiveValue::set(Some(ServerId(
2286 connection.owner_id as i32,
2287 ))),
2288 ..Default::default()
2289 }
2290 .insert(&*tx)
2291 .await?;
2292
2293 if !worktrees.is_empty() {
2294 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2295 worktree::ActiveModel {
2296 id: ActiveValue::set(worktree.id as i64),
2297 project_id: ActiveValue::set(project.id),
2298 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2299 root_name: ActiveValue::set(worktree.root_name.clone()),
2300 visible: ActiveValue::set(worktree.visible),
2301 scan_id: ActiveValue::set(0),
2302 completed_scan_id: ActiveValue::set(0),
2303 }
2304 }))
2305 .exec(&*tx)
2306 .await?;
2307 }
2308
2309 project_collaborator::ActiveModel {
2310 project_id: ActiveValue::set(project.id),
2311 connection_id: ActiveValue::set(connection.id as i32),
2312 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2313 user_id: ActiveValue::set(participant.user_id),
2314 replica_id: ActiveValue::set(ReplicaId(0)),
2315 is_host: ActiveValue::set(true),
2316 ..Default::default()
2317 }
2318 .insert(&*tx)
2319 .await?;
2320
2321 let room = self.get_room(room_id, &tx).await?;
2322 Ok((project.id, room))
2323 })
2324 .await
2325 }
2326
2327 pub async fn unshare_project(
2328 &self,
2329 project_id: ProjectId,
2330 connection: ConnectionId,
2331 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2332 let room_id = self.room_id_for_project(project_id).await?;
2333 self.room_transaction(room_id, |tx| async move {
2334 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2335
2336 let project = project::Entity::find_by_id(project_id)
2337 .one(&*tx)
2338 .await?
2339 .ok_or_else(|| anyhow!("project not found"))?;
2340 if project.host_connection()? == connection {
2341 project::Entity::delete(project.into_active_model())
2342 .exec(&*tx)
2343 .await?;
2344 let room = self.get_room(room_id, &tx).await?;
2345 Ok((room, guest_connection_ids))
2346 } else {
2347 Err(anyhow!("cannot unshare a project hosted by another user"))?
2348 }
2349 })
2350 .await
2351 }
2352
2353 pub async fn update_project(
2354 &self,
2355 project_id: ProjectId,
2356 connection: ConnectionId,
2357 worktrees: &[proto::WorktreeMetadata],
2358 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2359 let room_id = self.room_id_for_project(project_id).await?;
2360 self.room_transaction(room_id, |tx| async move {
2361 let project = project::Entity::find_by_id(project_id)
2362 .filter(
2363 Condition::all()
2364 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2365 .add(
2366 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2367 ),
2368 )
2369 .one(&*tx)
2370 .await?
2371 .ok_or_else(|| anyhow!("no such project"))?;
2372
2373 self.update_project_worktrees(project.id, worktrees, &tx)
2374 .await?;
2375
2376 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2377 let room = self.get_room(project.room_id, &tx).await?;
2378 Ok((room, guest_connection_ids))
2379 })
2380 .await
2381 }
2382
2383 async fn update_project_worktrees(
2384 &self,
2385 project_id: ProjectId,
2386 worktrees: &[proto::WorktreeMetadata],
2387 tx: &DatabaseTransaction,
2388 ) -> Result<()> {
2389 if !worktrees.is_empty() {
2390 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2391 id: ActiveValue::set(worktree.id as i64),
2392 project_id: ActiveValue::set(project_id),
2393 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2394 root_name: ActiveValue::set(worktree.root_name.clone()),
2395 visible: ActiveValue::set(worktree.visible),
2396 scan_id: ActiveValue::set(0),
2397 completed_scan_id: ActiveValue::set(0),
2398 }))
2399 .on_conflict(
2400 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2401 .update_column(worktree::Column::RootName)
2402 .to_owned(),
2403 )
2404 .exec(&*tx)
2405 .await?;
2406 }
2407
2408 worktree::Entity::delete_many()
2409 .filter(worktree::Column::ProjectId.eq(project_id).and(
2410 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2411 ))
2412 .exec(&*tx)
2413 .await?;
2414
2415 Ok(())
2416 }
2417
2418 pub async fn update_worktree(
2419 &self,
2420 update: &proto::UpdateWorktree,
2421 connection: ConnectionId,
2422 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2423 let project_id = ProjectId::from_proto(update.project_id);
2424 let worktree_id = update.worktree_id as i64;
2425 let room_id = self.room_id_for_project(project_id).await?;
2426 self.room_transaction(room_id, |tx| async move {
2427 // Ensure the update comes from the host.
2428 let _project = project::Entity::find_by_id(project_id)
2429 .filter(
2430 Condition::all()
2431 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2432 .add(
2433 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2434 ),
2435 )
2436 .one(&*tx)
2437 .await?
2438 .ok_or_else(|| anyhow!("no such project"))?;
2439
2440 // Update metadata.
2441 worktree::Entity::update(worktree::ActiveModel {
2442 id: ActiveValue::set(worktree_id),
2443 project_id: ActiveValue::set(project_id),
2444 root_name: ActiveValue::set(update.root_name.clone()),
2445 scan_id: ActiveValue::set(update.scan_id as i64),
2446 completed_scan_id: if update.is_last_update {
2447 ActiveValue::set(update.scan_id as i64)
2448 } else {
2449 ActiveValue::default()
2450 },
2451 abs_path: ActiveValue::set(update.abs_path.clone()),
2452 ..Default::default()
2453 })
2454 .exec(&*tx)
2455 .await?;
2456
2457 if !update.updated_entries.is_empty() {
2458 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2459 let mtime = entry.mtime.clone().unwrap_or_default();
2460 worktree_entry::ActiveModel {
2461 project_id: ActiveValue::set(project_id),
2462 worktree_id: ActiveValue::set(worktree_id),
2463 id: ActiveValue::set(entry.id as i64),
2464 is_dir: ActiveValue::set(entry.is_dir),
2465 path: ActiveValue::set(entry.path.clone()),
2466 inode: ActiveValue::set(entry.inode as i64),
2467 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2468 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2469 is_symlink: ActiveValue::set(entry.is_symlink),
2470 is_ignored: ActiveValue::set(entry.is_ignored),
2471 is_external: ActiveValue::set(entry.is_external),
2472 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2473 is_deleted: ActiveValue::set(false),
2474 scan_id: ActiveValue::set(update.scan_id as i64),
2475 }
2476 }))
2477 .on_conflict(
2478 OnConflict::columns([
2479 worktree_entry::Column::ProjectId,
2480 worktree_entry::Column::WorktreeId,
2481 worktree_entry::Column::Id,
2482 ])
2483 .update_columns([
2484 worktree_entry::Column::IsDir,
2485 worktree_entry::Column::Path,
2486 worktree_entry::Column::Inode,
2487 worktree_entry::Column::MtimeSeconds,
2488 worktree_entry::Column::MtimeNanos,
2489 worktree_entry::Column::IsSymlink,
2490 worktree_entry::Column::IsIgnored,
2491 worktree_entry::Column::GitStatus,
2492 worktree_entry::Column::ScanId,
2493 ])
2494 .to_owned(),
2495 )
2496 .exec(&*tx)
2497 .await?;
2498 }
2499
2500 if !update.removed_entries.is_empty() {
2501 worktree_entry::Entity::update_many()
2502 .filter(
2503 worktree_entry::Column::ProjectId
2504 .eq(project_id)
2505 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2506 .and(
2507 worktree_entry::Column::Id
2508 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2509 ),
2510 )
2511 .set(worktree_entry::ActiveModel {
2512 is_deleted: ActiveValue::Set(true),
2513 scan_id: ActiveValue::Set(update.scan_id as i64),
2514 ..Default::default()
2515 })
2516 .exec(&*tx)
2517 .await?;
2518 }
2519
2520 if !update.updated_repositories.is_empty() {
2521 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2522 |repository| worktree_repository::ActiveModel {
2523 project_id: ActiveValue::set(project_id),
2524 worktree_id: ActiveValue::set(worktree_id),
2525 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2526 scan_id: ActiveValue::set(update.scan_id as i64),
2527 branch: ActiveValue::set(repository.branch.clone()),
2528 is_deleted: ActiveValue::set(false),
2529 },
2530 ))
2531 .on_conflict(
2532 OnConflict::columns([
2533 worktree_repository::Column::ProjectId,
2534 worktree_repository::Column::WorktreeId,
2535 worktree_repository::Column::WorkDirectoryId,
2536 ])
2537 .update_columns([
2538 worktree_repository::Column::ScanId,
2539 worktree_repository::Column::Branch,
2540 ])
2541 .to_owned(),
2542 )
2543 .exec(&*tx)
2544 .await?;
2545 }
2546
2547 if !update.removed_repositories.is_empty() {
2548 worktree_repository::Entity::update_many()
2549 .filter(
2550 worktree_repository::Column::ProjectId
2551 .eq(project_id)
2552 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2553 .and(
2554 worktree_repository::Column::WorkDirectoryId
2555 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2556 ),
2557 )
2558 .set(worktree_repository::ActiveModel {
2559 is_deleted: ActiveValue::Set(true),
2560 scan_id: ActiveValue::Set(update.scan_id as i64),
2561 ..Default::default()
2562 })
2563 .exec(&*tx)
2564 .await?;
2565 }
2566
2567 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2568 Ok(connection_ids)
2569 })
2570 .await
2571 }
2572
2573 pub async fn update_diagnostic_summary(
2574 &self,
2575 update: &proto::UpdateDiagnosticSummary,
2576 connection: ConnectionId,
2577 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2578 let project_id = ProjectId::from_proto(update.project_id);
2579 let worktree_id = update.worktree_id as i64;
2580 let room_id = self.room_id_for_project(project_id).await?;
2581 self.room_transaction(room_id, |tx| async move {
2582 let summary = update
2583 .summary
2584 .as_ref()
2585 .ok_or_else(|| anyhow!("invalid summary"))?;
2586
2587 // Ensure the update comes from the host.
2588 let project = project::Entity::find_by_id(project_id)
2589 .one(&*tx)
2590 .await?
2591 .ok_or_else(|| anyhow!("no such project"))?;
2592 if project.host_connection()? != connection {
2593 return Err(anyhow!("can't update a project hosted by someone else"))?;
2594 }
2595
2596 // Update summary.
2597 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2598 project_id: ActiveValue::set(project_id),
2599 worktree_id: ActiveValue::set(worktree_id),
2600 path: ActiveValue::set(summary.path.clone()),
2601 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2602 error_count: ActiveValue::set(summary.error_count as i32),
2603 warning_count: ActiveValue::set(summary.warning_count as i32),
2604 ..Default::default()
2605 })
2606 .on_conflict(
2607 OnConflict::columns([
2608 worktree_diagnostic_summary::Column::ProjectId,
2609 worktree_diagnostic_summary::Column::WorktreeId,
2610 worktree_diagnostic_summary::Column::Path,
2611 ])
2612 .update_columns([
2613 worktree_diagnostic_summary::Column::LanguageServerId,
2614 worktree_diagnostic_summary::Column::ErrorCount,
2615 worktree_diagnostic_summary::Column::WarningCount,
2616 ])
2617 .to_owned(),
2618 )
2619 .exec(&*tx)
2620 .await?;
2621
2622 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2623 Ok(connection_ids)
2624 })
2625 .await
2626 }
2627
2628 pub async fn start_language_server(
2629 &self,
2630 update: &proto::StartLanguageServer,
2631 connection: ConnectionId,
2632 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2633 let project_id = ProjectId::from_proto(update.project_id);
2634 let room_id = self.room_id_for_project(project_id).await?;
2635 self.room_transaction(room_id, |tx| async move {
2636 let server = update
2637 .server
2638 .as_ref()
2639 .ok_or_else(|| anyhow!("invalid language server"))?;
2640
2641 // Ensure the update comes from the host.
2642 let project = project::Entity::find_by_id(project_id)
2643 .one(&*tx)
2644 .await?
2645 .ok_or_else(|| anyhow!("no such project"))?;
2646 if project.host_connection()? != connection {
2647 return Err(anyhow!("can't update a project hosted by someone else"))?;
2648 }
2649
2650 // Add the newly-started language server.
2651 language_server::Entity::insert(language_server::ActiveModel {
2652 project_id: ActiveValue::set(project_id),
2653 id: ActiveValue::set(server.id as i64),
2654 name: ActiveValue::set(server.name.clone()),
2655 ..Default::default()
2656 })
2657 .on_conflict(
2658 OnConflict::columns([
2659 language_server::Column::ProjectId,
2660 language_server::Column::Id,
2661 ])
2662 .update_column(language_server::Column::Name)
2663 .to_owned(),
2664 )
2665 .exec(&*tx)
2666 .await?;
2667
2668 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2669 Ok(connection_ids)
2670 })
2671 .await
2672 }
2673
2674 pub async fn update_worktree_settings(
2675 &self,
2676 update: &proto::UpdateWorktreeSettings,
2677 connection: ConnectionId,
2678 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2679 let project_id = ProjectId::from_proto(update.project_id);
2680 let room_id = self.room_id_for_project(project_id).await?;
2681 self.room_transaction(room_id, |tx| async move {
2682 // Ensure the update comes from the host.
2683 let project = project::Entity::find_by_id(project_id)
2684 .one(&*tx)
2685 .await?
2686 .ok_or_else(|| anyhow!("no such project"))?;
2687 if project.host_connection()? != connection {
2688 return Err(anyhow!("can't update a project hosted by someone else"))?;
2689 }
2690
2691 if let Some(content) = &update.content {
2692 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2693 project_id: ActiveValue::Set(project_id),
2694 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2695 path: ActiveValue::Set(update.path.clone()),
2696 content: ActiveValue::Set(content.clone()),
2697 })
2698 .on_conflict(
2699 OnConflict::columns([
2700 worktree_settings_file::Column::ProjectId,
2701 worktree_settings_file::Column::WorktreeId,
2702 worktree_settings_file::Column::Path,
2703 ])
2704 .update_column(worktree_settings_file::Column::Content)
2705 .to_owned(),
2706 )
2707 .exec(&*tx)
2708 .await?;
2709 } else {
2710 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2711 project_id: ActiveValue::Set(project_id),
2712 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2713 path: ActiveValue::Set(update.path.clone()),
2714 ..Default::default()
2715 })
2716 .exec(&*tx)
2717 .await?;
2718 }
2719
2720 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2721 Ok(connection_ids)
2722 })
2723 .await
2724 }
2725
    /// Adds `connection` as a guest collaborator on `project_id` and returns a
    /// snapshot of the project.
    ///
    /// The caller must be a room participant answering on `connection`, and the
    /// project must belong to that participant's room. The new collaborator is
    /// given the smallest replica id, starting at 1, that no existing
    /// collaborator of the project uses.
    ///
    /// The returned `Project` contains all collaborators (including the new
    /// one), the project's worktrees with their non-deleted entries,
    /// non-deleted repository entries, diagnostic summaries and settings files,
    /// and the project's language servers. The assigned replica id is returned
    /// alongside it.
    ///
    /// # Errors
    ///
    /// Fails with "must join a room first" when no participant answers on
    /// `connection`, and with "no such project" when the project is missing or
    /// lives in a different room.
    pub async fn join_project(
        &self,
        project_id: ProjectId,
        connection: ConnectionId,
    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
        let room_id = self.room_id_for_project(project_id).await?;
        self.room_transaction(room_id, |tx| async move {
            let participant = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::AnsweringConnectionId
                                .eq(connection.id as i32),
                        )
                        .add(
                            room_participant::Column::AnsweringConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("must join a room first"))?;

            let project = project::Entity::find_by_id(project_id)
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("no such project"))?;
            // A project in another room is reported exactly like a missing one.
            if project.room_id != participant.room_id {
                return Err(anyhow!("no such project"))?;
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            // Pick the lowest replica id >= 1 that is not taken yet.
            let replica_ids = collaborators
                .iter()
                .map(|c| c.replica_id)
                .collect::<HashSet<_>>();
            let mut replica_id = ReplicaId(1);
            while replica_ids.contains(&replica_id) {
                replica_id.0 += 1;
            }
            let new_collaborator = project_collaborator::ActiveModel {
                project_id: ActiveValue::set(project_id),
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                user_id: ActiveValue::set(participant.user_id),
                replica_id: ActiveValue::set(replica_id),
                is_host: ActiveValue::set(false),
                ..Default::default()
            }
            .insert(&*tx)
            .await?;
            collaborators.push(new_collaborator);

            // Worktrees are keyed by id so the streamed rows below can be
            // attached to the worktree they belong to.
            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
            let mut worktrees = db_worktrees
                .into_iter()
                .map(|db_worktree| {
                    (
                        db_worktree.id as u64,
                        Worktree {
                            id: db_worktree.id as u64,
                            abs_path: db_worktree.abs_path,
                            root_name: db_worktree.root_name,
                            visible: db_worktree.visible,
                            entries: Default::default(),
                            repository_entries: Default::default(),
                            diagnostic_summaries: Default::default(),
                            settings_files: Default::default(),
                            scan_id: db_worktree.scan_id as u64,
                            completed_scan_id: db_worktree.completed_scan_id as u64,
                        },
                    )
                })
                .collect::<BTreeMap<_, _>>();

            // Populate worktree entries.
            // Each streaming query lives in its own block, so the stream is
            // dropped at the end of the block, before the next query starts.
            {
                let mut db_entries = worktree_entry::Entity::find()
                    .filter(
                        Condition::all()
                            .add(worktree_entry::Column::ProjectId.eq(project_id))
                            .add(worktree_entry::Column::IsDeleted.eq(false)),
                    )
                    .stream(&*tx)
                    .await?;
                while let Some(db_entry) = db_entries.next().await {
                    let db_entry = db_entry?;
                    // Rows whose worktree is not in the map are skipped.
                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                        worktree.entries.push(proto::Entry {
                            id: db_entry.id as u64,
                            is_dir: db_entry.is_dir,
                            path: db_entry.path,
                            inode: db_entry.inode as u64,
                            mtime: Some(proto::Timestamp {
                                seconds: db_entry.mtime_seconds as u64,
                                nanos: db_entry.mtime_nanos as u32,
                            }),
                            is_symlink: db_entry.is_symlink,
                            is_ignored: db_entry.is_ignored,
                            is_external: db_entry.is_external,
                            git_status: db_entry.git_status.map(|status| status as i32),
                        });
                    }
                }
            }

            // Populate repository entries.
            {
                let mut db_repository_entries = worktree_repository::Entity::find()
                    .filter(
                        Condition::all()
                            .add(worktree_repository::Column::ProjectId.eq(project_id))
                            .add(worktree_repository::Column::IsDeleted.eq(false)),
                    )
                    .stream(&*tx)
                    .await?;
                while let Some(db_repository_entry) = db_repository_entries.next().await {
                    let db_repository_entry = db_repository_entry?;
                    if let Some(worktree) =
                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
                    {
                        worktree.repository_entries.insert(
                            db_repository_entry.work_directory_id as u64,
                            proto::RepositoryEntry {
                                work_directory_id: db_repository_entry.work_directory_id as u64,
                                branch: db_repository_entry.branch,
                            },
                        );
                    }
                }
            }

            // Populate worktree diagnostic summaries.
            {
                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
                    .stream(&*tx)
                    .await?;
                while let Some(db_summary) = db_summaries.next().await {
                    let db_summary = db_summary?;
                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                        worktree
                            .diagnostic_summaries
                            .push(proto::DiagnosticSummary {
                                path: db_summary.path,
                                language_server_id: db_summary.language_server_id as u64,
                                error_count: db_summary.error_count as u32,
                                warning_count: db_summary.warning_count as u32,
                            });
                    }
                }
            }

            // Populate worktree settings files
            {
                let mut db_settings_files = worktree_settings_file::Entity::find()
                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                    .stream(&*tx)
                    .await?;
                while let Some(db_settings_file) = db_settings_files.next().await {
                    let db_settings_file = db_settings_file?;
                    if let Some(worktree) =
                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
                    {
                        worktree.settings_files.push(WorktreeSettingsFile {
                            path: db_settings_file.path,
                            content: db_settings_file.content,
                        });
                    }
                }
            }

            // Populate language servers.
            let language_servers = project
                .find_related(language_server::Entity)
                .all(&*tx)
                .await?;

            let project = Project {
                collaborators: collaborators
                    .into_iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect(),
                worktrees,
                language_servers: language_servers
                    .into_iter()
                    .map(|language_server| proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                    })
                    .collect(),
            };
            Ok((project, replica_id as ReplicaId))
        })
        .await
    }
2930
2931 pub async fn leave_project(
2932 &self,
2933 project_id: ProjectId,
2934 connection: ConnectionId,
2935 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2936 let room_id = self.room_id_for_project(project_id).await?;
2937 self.room_transaction(room_id, |tx| async move {
2938 let result = project_collaborator::Entity::delete_many()
2939 .filter(
2940 Condition::all()
2941 .add(project_collaborator::Column::ProjectId.eq(project_id))
2942 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2943 .add(
2944 project_collaborator::Column::ConnectionServerId
2945 .eq(connection.owner_id as i32),
2946 ),
2947 )
2948 .exec(&*tx)
2949 .await?;
2950 if result.rows_affected == 0 {
2951 Err(anyhow!("not a collaborator on this project"))?;
2952 }
2953
2954 let project = project::Entity::find_by_id(project_id)
2955 .one(&*tx)
2956 .await?
2957 .ok_or_else(|| anyhow!("no such project"))?;
2958 let collaborators = project
2959 .find_related(project_collaborator::Entity)
2960 .all(&*tx)
2961 .await?;
2962 let connection_ids = collaborators
2963 .into_iter()
2964 .map(|collaborator| collaborator.connection())
2965 .collect();
2966
2967 follower::Entity::delete_many()
2968 .filter(
2969 Condition::any()
2970 .add(
2971 Condition::all()
2972 .add(follower::Column::ProjectId.eq(project_id))
2973 .add(
2974 follower::Column::LeaderConnectionServerId
2975 .eq(connection.owner_id),
2976 )
2977 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2978 )
2979 .add(
2980 Condition::all()
2981 .add(follower::Column::ProjectId.eq(project_id))
2982 .add(
2983 follower::Column::FollowerConnectionServerId
2984 .eq(connection.owner_id),
2985 )
2986 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2987 ),
2988 )
2989 .exec(&*tx)
2990 .await?;
2991
2992 let room = self.get_room(project.room_id, &tx).await?;
2993 let left_project = LeftProject {
2994 id: project_id,
2995 host_user_id: project.host_user_id,
2996 host_connection_id: project.host_connection()?,
2997 connection_ids,
2998 };
2999 Ok((room, left_project))
3000 })
3001 .await
3002 }
3003
3004 pub async fn project_collaborators(
3005 &self,
3006 project_id: ProjectId,
3007 connection_id: ConnectionId,
3008 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
3009 let room_id = self.room_id_for_project(project_id).await?;
3010 self.room_transaction(room_id, |tx| async move {
3011 let collaborators = project_collaborator::Entity::find()
3012 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3013 .all(&*tx)
3014 .await?
3015 .into_iter()
3016 .map(|collaborator| ProjectCollaborator {
3017 connection_id: collaborator.connection(),
3018 user_id: collaborator.user_id,
3019 replica_id: collaborator.replica_id,
3020 is_host: collaborator.is_host,
3021 })
3022 .collect::<Vec<_>>();
3023
3024 if collaborators
3025 .iter()
3026 .any(|collaborator| collaborator.connection_id == connection_id)
3027 {
3028 Ok(collaborators)
3029 } else {
3030 Err(anyhow!("no such project"))?
3031 }
3032 })
3033 .await
3034 }
3035
3036 pub async fn project_connection_ids(
3037 &self,
3038 project_id: ProjectId,
3039 connection_id: ConnectionId,
3040 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3041 let room_id = self.room_id_for_project(project_id).await?;
3042 self.room_transaction(room_id, |tx| async move {
3043 let mut collaborators = project_collaborator::Entity::find()
3044 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3045 .stream(&*tx)
3046 .await?;
3047
3048 let mut connection_ids = HashSet::default();
3049 while let Some(collaborator) = collaborators.next().await {
3050 let collaborator = collaborator?;
3051 connection_ids.insert(collaborator.connection());
3052 }
3053
3054 if connection_ids.contains(&connection_id) {
3055 Ok(connection_ids)
3056 } else {
3057 Err(anyhow!("no such project"))?
3058 }
3059 })
3060 .await
3061 }
3062
3063 async fn project_guest_connection_ids(
3064 &self,
3065 project_id: ProjectId,
3066 tx: &DatabaseTransaction,
3067 ) -> Result<Vec<ConnectionId>> {
3068 let mut collaborators = project_collaborator::Entity::find()
3069 .filter(
3070 project_collaborator::Column::ProjectId
3071 .eq(project_id)
3072 .and(project_collaborator::Column::IsHost.eq(false)),
3073 )
3074 .stream(tx)
3075 .await?;
3076
3077 let mut guest_connection_ids = Vec::new();
3078 while let Some(collaborator) = collaborators.next().await {
3079 let collaborator = collaborator?;
3080 guest_connection_ids.push(collaborator.connection());
3081 }
3082 Ok(guest_connection_ids)
3083 }
3084
3085 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3086 self.transaction(|tx| async move {
3087 let project = project::Entity::find_by_id(project_id)
3088 .one(&*tx)
3089 .await?
3090 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3091 Ok(project.room_id)
3092 })
3093 .await
3094 }
3095
3096 // access tokens
3097
3098 pub async fn create_access_token(
3099 &self,
3100 user_id: UserId,
3101 access_token_hash: &str,
3102 max_access_token_count: usize,
3103 ) -> Result<AccessTokenId> {
3104 self.transaction(|tx| async {
3105 let tx = tx;
3106
3107 let token = access_token::ActiveModel {
3108 user_id: ActiveValue::set(user_id),
3109 hash: ActiveValue::set(access_token_hash.into()),
3110 ..Default::default()
3111 }
3112 .insert(&*tx)
3113 .await?;
3114
3115 access_token::Entity::delete_many()
3116 .filter(
3117 access_token::Column::Id.in_subquery(
3118 Query::select()
3119 .column(access_token::Column::Id)
3120 .from(access_token::Entity)
3121 .and_where(access_token::Column::UserId.eq(user_id))
3122 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3123 .limit(10000)
3124 .offset(max_access_token_count as u64)
3125 .to_owned(),
3126 ),
3127 )
3128 .exec(&*tx)
3129 .await?;
3130 Ok(token.id)
3131 })
3132 .await
3133 }
3134
3135 pub async fn get_access_token(
3136 &self,
3137 access_token_id: AccessTokenId,
3138 ) -> Result<access_token::Model> {
3139 self.transaction(|tx| async move {
3140 Ok(access_token::Entity::find_by_id(access_token_id)
3141 .one(&*tx)
3142 .await?
3143 .ok_or_else(|| anyhow!("no such access token"))?)
3144 })
3145 .await
3146 }
3147
3148 // channels
3149
3150 pub async fn create_root_channel(
3151 &self,
3152 name: &str,
3153 live_kit_room: &str,
3154 creator_id: UserId,
3155 ) -> Result<ChannelId> {
3156 self.create_channel(name, None, live_kit_room, creator_id)
3157 .await
3158 }
3159
3160 pub async fn create_channel(
3161 &self,
3162 name: &str,
3163 parent: Option<ChannelId>,
3164 live_kit_room: &str,
3165 creator_id: UserId,
3166 ) -> Result<ChannelId> {
3167 self.transaction(move |tx| async move {
3168 if let Some(parent) = parent {
3169 self.check_user_is_channel_admin(parent, creator_id, &*tx)
3170 .await?;
3171 }
3172
3173 let channel = channel::ActiveModel {
3174 name: ActiveValue::Set(name.to_string()),
3175 ..Default::default()
3176 }
3177 .insert(&*tx)
3178 .await?;
3179
3180 if let Some(parent) = parent {
3181 channel_parent::ActiveModel {
3182 child_id: ActiveValue::Set(channel.id),
3183 parent_id: ActiveValue::Set(parent),
3184 }
3185 .insert(&*tx)
3186 .await?;
3187 }
3188
3189 channel_member::ActiveModel {
3190 channel_id: ActiveValue::Set(channel.id),
3191 user_id: ActiveValue::Set(creator_id),
3192 accepted: ActiveValue::Set(true),
3193 admin: ActiveValue::Set(true),
3194 ..Default::default()
3195 }
3196 .insert(&*tx)
3197 .await?;
3198
3199 room::ActiveModel {
3200 channel_id: ActiveValue::Set(Some(channel.id)),
3201 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3202 ..Default::default()
3203 }
3204 .insert(&*tx)
3205 .await?;
3206
3207 Ok(channel.id)
3208 })
3209 .await
3210 }
3211
    /// Deletes `channel_id` and its descendant channels on behalf of `user_id`.
    ///
    /// `user_id` must pass the channel-admin check for `channel_id`. A
    /// descendant that also has a parent outside the set being removed is kept.
    ///
    /// Returns the ids of the channels that were deleted and the distinct user
    /// ids of the members of the channels returned by `get_channel_ancestors`
    /// for `channel_id`.
    pub async fn remove_channel(
        &self,
        channel_id: ChannelId,
        user_id: UserId,
    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
        self.transaction(move |tx| async move {
            self.check_user_is_channel_admin(channel_id, user_id, &*tx)
                .await?;

            // Don't remove descendant channels that have additional parents.
            let mut channels_to_remove = self.get_channel_descendants([channel_id], &*tx).await?;
            {
                // Find parent links whose child is in the removal set (other than
                // `channel_id` itself) but whose parent is not.
                let mut channels_to_keep = channel_parent::Entity::find()
                    .filter(
                        channel_parent::Column::ChildId
                            .is_in(
                                channels_to_remove
                                    .keys()
                                    .copied()
                                    .filter(|&id| id != channel_id),
                            )
                            .and(
                                channel_parent::Column::ParentId
                                    .is_not_in(channels_to_remove.keys().copied()),
                            ),
                    )
                    .stream(&*tx)
                    .await?;
                while let Some(row) = channels_to_keep.next().await {
                    let row = row?;
                    channels_to_remove.remove(&row.child_id);
                }
            }

            // The members to notify are collected before the channels are deleted.
            let channel_ancestors = self.get_channel_ancestors(channel_id, &*tx).await?;
            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
                .filter(channel_member::Column::ChannelId.is_in(channel_ancestors))
                .select_only()
                .column(channel_member::Column::UserId)
                .distinct()
                .into_values::<_, QueryUserIds>()
                .all(&*tx)
                .await?;

            channel::Entity::delete_many()
                .filter(channel::Column::Id.is_in(channels_to_remove.keys().copied()))
                .exec(&*tx)
                .await?;

            Ok((channels_to_remove.into_keys().collect(), members_to_notify))
        })
        .await
    }
3265
3266 pub async fn invite_channel_member(
3267 &self,
3268 channel_id: ChannelId,
3269 invitee_id: UserId,
3270 inviter_id: UserId,
3271 is_admin: bool,
3272 ) -> Result<()> {
3273 self.transaction(move |tx| async move {
3274 self.check_user_is_channel_admin(channel_id, inviter_id, &*tx)
3275 .await?;
3276
3277 channel_member::ActiveModel {
3278 channel_id: ActiveValue::Set(channel_id),
3279 user_id: ActiveValue::Set(invitee_id),
3280 accepted: ActiveValue::Set(false),
3281 admin: ActiveValue::Set(is_admin),
3282 ..Default::default()
3283 }
3284 .insert(&*tx)
3285 .await?;
3286
3287 Ok(())
3288 })
3289 .await
3290 }
3291
3292 pub async fn respond_to_channel_invite(
3293 &self,
3294 channel_id: ChannelId,
3295 user_id: UserId,
3296 accept: bool,
3297 ) -> Result<()> {
3298 self.transaction(move |tx| async move {
3299 let rows_affected = if accept {
3300 channel_member::Entity::update_many()
3301 .set(channel_member::ActiveModel {
3302 accepted: ActiveValue::Set(accept),
3303 ..Default::default()
3304 })
3305 .filter(
3306 channel_member::Column::ChannelId
3307 .eq(channel_id)
3308 .and(channel_member::Column::UserId.eq(user_id))
3309 .and(channel_member::Column::Accepted.eq(false)),
3310 )
3311 .exec(&*tx)
3312 .await?
3313 .rows_affected
3314 } else {
3315 channel_member::ActiveModel {
3316 channel_id: ActiveValue::Unchanged(channel_id),
3317 user_id: ActiveValue::Unchanged(user_id),
3318 ..Default::default()
3319 }
3320 .delete(&*tx)
3321 .await?
3322 .rows_affected
3323 };
3324
3325 if rows_affected == 0 {
3326 Err(anyhow!("no such invitation"))?;
3327 }
3328
3329 Ok(())
3330 })
3331 .await
3332 }
3333
3334 pub async fn remove_channel_member(
3335 &self,
3336 channel_id: ChannelId,
3337 member_id: UserId,
3338 remover_id: UserId,
3339 ) -> Result<()> {
3340 self.transaction(|tx| async move {
3341 self.check_user_is_channel_admin(channel_id, remover_id, &*tx)
3342 .await?;
3343
3344 let result = channel_member::Entity::delete_many()
3345 .filter(
3346 channel_member::Column::ChannelId
3347 .eq(channel_id)
3348 .and(channel_member::Column::UserId.eq(member_id)),
3349 )
3350 .exec(&*tx)
3351 .await?;
3352
3353 if result.rows_affected == 0 {
3354 Err(anyhow!("no such member"))?;
3355 }
3356
3357 Ok(())
3358 })
3359 .await
3360 }
3361
3362 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
3363 self.transaction(|tx| async move {
3364 let channel_invites = channel_member::Entity::find()
3365 .filter(
3366 channel_member::Column::UserId
3367 .eq(user_id)
3368 .and(channel_member::Column::Accepted.eq(false)),
3369 )
3370 .all(&*tx)
3371 .await?;
3372
3373 let channels = channel::Entity::find()
3374 .filter(
3375 channel::Column::Id.is_in(
3376 channel_invites
3377 .into_iter()
3378 .map(|channel_member| channel_member.channel_id),
3379 ),
3380 )
3381 .all(&*tx)
3382 .await?;
3383
3384 let channels = channels
3385 .into_iter()
3386 .map(|channel| Channel {
3387 id: channel.id,
3388 name: channel.name,
3389 user_is_admin: false,
3390 parent_id: None,
3391 })
3392 .collect();
3393
3394 Ok(channels)
3395 })
3396 .await
3397 }
3398
3399 pub async fn get_channels_for_user(
3400 &self,
3401 user_id: UserId,
3402 ) -> Result<(Vec<Channel>, HashMap<ChannelId, Vec<UserId>>)> {
3403 self.transaction(|tx| async move {
3404 let tx = tx;
3405
3406 let channel_memberships = channel_member::Entity::find()
3407 .filter(
3408 channel_member::Column::UserId
3409 .eq(user_id)
3410 .and(channel_member::Column::Accepted.eq(true)),
3411 )
3412 .all(&*tx)
3413 .await?;
3414
3415 let admin_channel_ids = channel_memberships
3416 .iter()
3417 .filter_map(|m| m.admin.then_some(m.channel_id))
3418 .collect::<HashSet<_>>();
3419 let parents_by_child_id = self
3420 .get_channel_descendants(channel_memberships.iter().map(|m| m.channel_id), &*tx)
3421 .await?;
3422
3423 let mut channels = Vec::with_capacity(parents_by_child_id.len());
3424 {
3425 let mut rows = channel::Entity::find()
3426 .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3427 .stream(&*tx)
3428 .await?;
3429 while let Some(row) = rows.next().await {
3430 let row = row?;
3431 channels.push(Channel {
3432 id: row.id,
3433 name: row.name,
3434 user_is_admin: admin_channel_ids.contains(&row.id),
3435 parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3436 });
3437 }
3438 }
3439
3440 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3441 enum QueryUserIdsAndChannelIds {
3442 ChannelId,
3443 UserId,
3444 }
3445
3446 let mut participants_by_channel: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3447 {
3448 let mut rows = room_participant::Entity::find()
3449 .inner_join(room::Entity)
3450 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3451 .select_only()
3452 .column(room::Column::ChannelId)
3453 .column(room_participant::Column::UserId)
3454 .into_values::<_, QueryUserIdsAndChannelIds>()
3455 .stream(&*tx)
3456 .await?;
3457 while let Some(row) = rows.next().await {
3458 let row: (ChannelId, UserId) = row?;
3459 participants_by_channel
3460 .entry(row.0)
3461 .or_default()
3462 .push(row.1)
3463 }
3464 }
3465
3466 Ok((channels, participants_by_channel))
3467 })
3468 .await
3469 }
3470
3471 pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3472 self.transaction(|tx| async move { self.get_channel_members_internal(id, &*tx).await })
3473 .await
3474 }
3475
3476 pub async fn set_channel_member_admin(
3477 &self,
3478 channel_id: ChannelId,
3479 from: UserId,
3480 for_user: UserId,
3481 admin: bool,
3482 ) -> Result<()> {
3483 self.transaction(|tx| async move {
3484 self.check_user_is_channel_admin(channel_id, from, &*tx)
3485 .await?;
3486
3487 let result = channel_member::Entity::update_many()
3488 .filter(
3489 channel_member::Column::ChannelId
3490 .eq(channel_id)
3491 .and(channel_member::Column::UserId.eq(for_user)),
3492 )
3493 .set(channel_member::ActiveModel {
3494 admin: ActiveValue::set(admin),
3495 ..Default::default()
3496 })
3497 .exec(&*tx)
3498 .await?;
3499
3500 if result.rows_affected == 0 {
3501 Err(anyhow!("no such member"))?;
3502 }
3503
3504 Ok(())
3505 })
3506 .await
3507 }
3508
3509 pub async fn get_channel_member_details(
3510 &self,
3511 channel_id: ChannelId,
3512 user_id: UserId,
3513 ) -> Result<Vec<proto::ChannelMember>> {
3514 self.transaction(|tx| async move {
3515 self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3516 .await?;
3517
3518 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3519 enum QueryMemberDetails {
3520 UserId,
3521 Admin,
3522 IsDirectMember,
3523 Accepted,
3524 }
3525
3526 let tx = tx;
3527 let ancestor_ids = self.get_channel_ancestors(channel_id, &*tx).await?;
3528 let mut stream = channel_member::Entity::find()
3529 .distinct()
3530 .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3531 .select_only()
3532 .column(channel_member::Column::UserId)
3533 .column(channel_member::Column::Admin)
3534 .column_as(
3535 channel_member::Column::ChannelId.eq(channel_id),
3536 QueryMemberDetails::IsDirectMember,
3537 )
3538 .column(channel_member::Column::Accepted)
3539 .order_by_asc(channel_member::Column::UserId)
3540 .into_values::<_, QueryMemberDetails>()
3541 .stream(&*tx)
3542 .await?;
3543
3544 let mut rows = Vec::<proto::ChannelMember>::new();
3545 while let Some(row) = stream.next().await {
3546 let (user_id, is_admin, is_direct_member, is_invite_accepted): (
3547 UserId,
3548 bool,
3549 bool,
3550 bool,
3551 ) = row?;
3552 let kind = match (is_direct_member, is_invite_accepted) {
3553 (true, true) => proto::channel_member::Kind::Member,
3554 (true, false) => proto::channel_member::Kind::Invitee,
3555 (false, true) => proto::channel_member::Kind::AncestorMember,
3556 (false, false) => continue,
3557 };
3558 let user_id = user_id.to_proto();
3559 let kind = kind.into();
3560 if let Some(last_row) = rows.last_mut() {
3561 if last_row.user_id == user_id {
3562 if is_direct_member {
3563 last_row.kind = kind;
3564 last_row.admin = is_admin;
3565 }
3566 continue;
3567 }
3568 }
3569 rows.push(proto::ChannelMember {
3570 user_id,
3571 kind,
3572 admin: is_admin,
3573 });
3574 }
3575
3576 Ok(rows)
3577 })
3578 .await
3579 }
3580
3581 pub async fn get_channel_members_internal(
3582 &self,
3583 id: ChannelId,
3584 tx: &DatabaseTransaction,
3585 ) -> Result<Vec<UserId>> {
3586 let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3587 let user_ids = channel_member::Entity::find()
3588 .distinct()
3589 .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3590 .select_only()
3591 .column(channel_member::Column::UserId)
3592 .into_values::<_, QueryUserIds>()
3593 .all(&*tx)
3594 .await?;
3595 Ok(user_ids)
3596 }
3597
3598 async fn check_user_is_channel_admin(
3599 &self,
3600 channel_id: ChannelId,
3601 user_id: UserId,
3602 tx: &DatabaseTransaction,
3603 ) -> Result<()> {
3604 let channel_ids = self.get_channel_ancestors(channel_id, tx).await?;
3605 channel_member::Entity::find()
3606 .filter(
3607 channel_member::Column::ChannelId
3608 .is_in(channel_ids)
3609 .and(channel_member::Column::UserId.eq(user_id))
3610 .and(channel_member::Column::Admin.eq(true)),
3611 )
3612 .one(&*tx)
3613 .await?
3614 .ok_or_else(|| anyhow!("user is not allowed to remove this channel"))?;
3615 Ok(())
3616 }
3617
3618 async fn get_channel_ancestors(
3619 &self,
3620 channel_id: ChannelId,
3621 tx: &DatabaseTransaction,
3622 ) -> Result<Vec<ChannelId>> {
3623 let sql = format!(
3624 r#"
3625 WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3626 SELECT CAST(NULL as INTEGER) as child_id, root_ids.column1 as parent_id
3627 FROM (VALUES ({})) as root_ids
3628 UNION
3629 SELECT channel_parents.child_id, channel_parents.parent_id
3630 FROM channel_parents, channel_tree
3631 WHERE channel_parents.child_id = channel_tree.parent_id
3632 )
3633 SELECT DISTINCT channel_tree.parent_id
3634 FROM channel_tree
3635 "#,
3636 channel_id
3637 );
3638
3639 #[derive(FromQueryResult, Debug, PartialEq)]
3640 pub struct ChannelParent {
3641 pub parent_id: ChannelId,
3642 }
3643
3644 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3645
3646 let mut channel_ids_stream = channel_parent::Entity::find()
3647 .from_raw_sql(stmt)
3648 .into_model::<ChannelParent>()
3649 .stream(&*tx)
3650 .await?;
3651
3652 let mut channel_ids = vec![];
3653 while let Some(channel_id) = channel_ids_stream.next().await {
3654 channel_ids.push(channel_id?.parent_id);
3655 }
3656
3657 Ok(channel_ids)
3658 }
3659
3660 async fn get_channel_descendants(
3661 &self,
3662 channel_ids: impl IntoIterator<Item = ChannelId>,
3663 tx: &DatabaseTransaction,
3664 ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3665 let mut values = String::new();
3666 for id in channel_ids {
3667 if !values.is_empty() {
3668 values.push_str(", ");
3669 }
3670 write!(&mut values, "({})", id).unwrap();
3671 }
3672
3673 if values.is_empty() {
3674 return Ok(HashMap::default());
3675 }
3676
3677 let sql = format!(
3678 r#"
3679 WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3680 SELECT root_ids.column1 as child_id, CAST(NULL as INTEGER) as parent_id
3681 FROM (VALUES {values}) as root_ids
3682 UNION
3683 SELECT channel_parents.child_id, channel_parents.parent_id
3684 FROM channel_parents, channel_tree
3685 WHERE channel_parents.parent_id = channel_tree.child_id
3686 )
3687 SELECT channel_tree.child_id, channel_tree.parent_id
3688 FROM channel_tree
3689 ORDER BY child_id, parent_id IS NOT NULL
3690 "#,
3691 );
3692
3693 #[derive(FromQueryResult, Debug, PartialEq)]
3694 pub struct ChannelParent {
3695 pub child_id: ChannelId,
3696 pub parent_id: Option<ChannelId>,
3697 }
3698
3699 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3700
3701 let mut parents_by_child_id = HashMap::default();
3702 let mut parents = channel_parent::Entity::find()
3703 .from_raw_sql(stmt)
3704 .into_model::<ChannelParent>()
3705 .stream(tx)
3706 .await?;
3707
3708 while let Some(parent) = parents.next().await {
3709 let parent = parent?;
3710 parents_by_child_id.insert(parent.child_id, parent.parent_id);
3711 }
3712
3713 Ok(parents_by_child_id)
3714 }
3715
3716 pub async fn get_channel(
3717 &self,
3718 channel_id: ChannelId,
3719 user_id: UserId,
3720 ) -> Result<Option<Channel>> {
3721 self.transaction(|tx| async move {
3722 let tx = tx;
3723 let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3724 let user_is_admin = channel_member::Entity::find()
3725 .filter(
3726 channel_member::Column::ChannelId
3727 .eq(channel_id)
3728 .and(channel_member::Column::UserId.eq(user_id))
3729 .and(channel_member::Column::Admin.eq(true)),
3730 )
3731 .count(&*tx)
3732 .await?
3733 > 0;
3734
3735 Ok(channel.map(|channel| Channel {
3736 id: channel.id,
3737 name: channel.name,
3738 user_is_admin,
3739 parent_id: None,
3740 }))
3741 })
3742 .await
3743 }
3744
3745 pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3746 self.transaction(|tx| async move {
3747 let tx = tx;
3748 let room = channel::Model {
3749 id: channel_id,
3750 ..Default::default()
3751 }
3752 .find_related(room::Entity)
3753 .one(&*tx)
3754 .await?
3755 .ok_or_else(|| anyhow!("invalid channel"))?;
3756 Ok(room.id)
3757 })
3758 .await
3759 }
3760
    /// Runs `f` inside a database transaction, retrying on serialization failures.
    ///
    /// Every attempt opens a fresh transaction through `with_transaction`. If `f` succeeds
    /// the transaction is committed; if `f` fails it is rolled back. When the failure (from
    /// `f` or from the commit) is accepted by `retry_on_serialization_error`, the attempt
    /// is repeated with an incremented attempt counter; any other error is returned.
    /// Because of the retries, `f` may be invoked more than once.
    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; passed to the retry helper.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(result) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(result),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // Roll back before deciding whether to retry; a rollback failure
                        // is returned immediately.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3792
    /// Like `transaction`, but for closures that may or may not resolve to a room.
    ///
    /// When `f` yields `Some((room_id, data))`, the per-room mutex for `room_id` is locked
    /// after `f` has run and before the transaction is committed; on a successful commit
    /// the lock is handed to the caller inside the returned `RoomGuard`. When `f` yields
    /// `None`, the transaction is committed without taking any room lock and `Ok(None)` is
    /// returned. Failures are rolled back / retried the same way as in `transaction`.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; passed to the retry helper.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // Create the room's mutex on first use, then hold it across the commit.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                // `_guard` goes out of scope at the end of this arm, so the
                                // room lock is released before the next attempt.
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3842
    /// Runs `f` in a transaction while holding the per-room mutex for `room_id`.
    ///
    /// Unlike `optional_room_transaction`, the room lock is acquired before the
    /// transaction is opened. On success the lock is returned to the caller inside the
    /// `RoomGuard`. On a retry the guard from the failed attempt is dropped at the end of
    /// the loop iteration and the lock is acquired again for the next attempt.
    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; passed to the retry helper.
            let mut i = 0;
            loop {
                // Create the room's mutex on first use and lock it for this attempt.
                let lock = self.rooms.entry(room_id).or_default().clone();
                let _guard = lock.lock_owned().await;
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(data) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => {
                            return Ok(RoomGuard {
                                data,
                                _guard,
                                _not_send: PhantomData,
                            });
                        }
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3882
    /// Opens a serializable transaction, runs `f` against it once, and hands back both the
    /// still-open transaction and `f`'s result so the caller can commit or roll back.
    ///
    /// # Errors
    /// Fails if the transaction cannot be started, or if a `TransactionHandle` clone is
    /// still alive after `f`'s future completed (the transaction cannot be reclaimed).
    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let tx = self
            .pool
            .begin_with_config(Some(IsolationLevel::Serializable), None)
            .await?;

        // The transaction is shared with `f` through an `Arc`; the `Option` allows it to be
        // moved back out once this function holds the only remaining reference.
        let mut tx = Arc::new(Some(tx));
        let result = f(TransactionHandle(tx.clone())).await;
        // `Arc::get_mut` only succeeds when no other reference to the `Arc` exists.
        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
        };

        Ok((tx, result))
    }
3901
    /// Drives `future` to completion and returns its result.
    ///
    /// In non-test builds the future is simply awaited. In test builds a random delay is
    /// first simulated when the executor is `Executor::Deterministic`, and the future is
    /// then run with `block_on` on the database's own `runtime`, which must have been set
    /// (the `unwrap` panics otherwise).
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        #[cfg(test)]
        {
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
3920
3921 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3922 // If the error is due to a failure to serialize concurrent transactions, then retry
3923 // this transaction after a delay. With each subsequent retry, double the delay duration.
3924 // Also vary the delay randomly in order to ensure different database connections retry
3925 // at different times.
3926 if is_serialization_error(error) {
3927 let base_delay = 4_u64 << prev_attempt_count.min(16);
3928 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3929 log::info!(
3930 "retrying transaction after serialization error. delay: {} ms.",
3931 randomized_delay
3932 );
3933 self.executor
3934 .sleep(Duration::from_millis(randomized_delay as u64))
3935 .await;
3936 true
3937 } else {
3938 false
3939 }
3940 }
3941}
3942
3943fn is_serialization_error(error: &Error) -> bool {
3944 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3945 match error {
3946 Error::Database(
3947 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3948 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3949 ) if error
3950 .as_database_error()
3951 .and_then(|error| error.code())
3952 .as_deref()
3953 == Some(SERIALIZATION_FAILURE_CODE) =>
3954 {
3955 true
3956 }
3957 _ => false,
3958 }
3959}
3960
3961struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3962
3963impl Deref for TransactionHandle {
3964 type Target = DatabaseTransaction;
3965
3966 fn deref(&self) -> &Self::Target {
3967 self.0.as_ref().as_ref().unwrap()
3968 }
3969}
3970
/// A value produced by a room transaction, bundled with the lock guard of that room's
/// mutex. The room stays locked for as long as this guard is alive.
pub struct RoomGuard<T> {
    // The transaction's result; exposed through `Deref`/`DerefMut`.
    data: T,
    // Held only for its drop behavior: releases the room mutex when the guard is dropped.
    _guard: OwnedMutexGuard<()>,
    // `Rc` is `!Send`, so this marker makes `RoomGuard` `!Send` as well.
    _not_send: PhantomData<Rc<()>>,
}

impl<T> Deref for RoomGuard<T> {
    type Target = T;

    /// Borrows the guarded data.
    fn deref(&self) -> &T {
        &self.data
    }
}

impl<T> DerefMut for RoomGuard<T> {
    /// Mutably borrows the guarded data.
    fn deref_mut(&mut self) -> &mut T {
        &mut self.data
    }
}
3990
/// Serializable parameter bundle describing a new user.
// NOTE(review): presumably consumed by the user-creation methods earlier in this file — confirm.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    pub github_login: String,
    pub github_user_id: i32,
    pub invite_count: i32,
}
3997
/// Result bundle describing a newly created user.
// NOTE(review): field meanings are inferred from their names; the producer is outside this chunk.
#[derive(Debug)]
pub struct NewUserResult {
    pub user_id: UserId,
    pub metrics_id: String,
    pub inviting_user_id: Option<UserId>,
    pub signup_device_id: Option<String>,
}
4005
/// A channel as reported to a particular user by the channel queries in this file.
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    /// Whether the querying user has an admin membership row for this channel.
    pub user_is_admin: bool,
    /// Parent channel, when the producing query resolves one; several queries always
    /// leave this as `None`.
    pub parent_id: Option<ChannelId>,
}
4013
/// Generates a random invite code: a nanoid of length 16.
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
4017
/// Generates a random email confirmation code: a nanoid of length 64.
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
4021
/// Declares a strongly typed database id: a public newtype around `i32` named `$name`.
///
/// Besides the derived traits, the generated type gets:
/// - `MAX`, `from_proto` and `to_proto` helpers for converting to/from the `u64` ids
///   used in protocol messages,
/// - `Display` (prints the inner integer),
/// - the SeaORM / SeaQuery glue (`Value` conversion, `TryGetable`, `ValueType`,
///   `TryFromU64`, `Nullable`) needed to use the type directly as a column value.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Converts a protocol id with an `as` cast; values that do not fit in an
            /// `i32` are truncated rather than rejected.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts to a protocol id with an `as` cast; a negative id is
            /// sign-extended rather than rejected.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer variant whose value fits in an `i32`;
            // everything else (including NULLs) is a type error.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `ConvertFromU64` carries the name of the target type; the error's own
                // message supplies the surrounding wording, so pass just the type name.
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(stringify!($name))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
4140
// Strongly typed ids generated by the `id_type!` macro; each one is a distinct
// newtype around `i32`.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
4154
/// Room state bundled with the channel (if any) the room is attached to.
// NOTE(review): presumably returned by the room-joining methods earlier in this file — confirm.
#[derive(Clone)]
pub struct JoinRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
}
4161
/// Room state plus the projects that were rejoined or reshared, bundled with the channel
/// (if any) the room is attached to.
// NOTE(review): the producer is outside this chunk; field meanings are inferred from names.
pub struct RejoinedRoom {
    pub room: proto::Room,
    pub rejoined_projects: Vec<RejoinedProject>,
    pub reshared_projects: Vec<ResharedProject>,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
}
4169
/// Description of a project entry in `RejoinedRoom::reshared_projects`.
// NOTE(review): `old_connection_id` is presumably the connection used before the
// reconnect — confirm against the producer.
pub struct ResharedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
4176
/// Description of a project entry in `RejoinedRoom::rejoined_projects`.
// NOTE(review): `old_connection_id` is presumably the connection used before the
// reconnect — confirm against the producer.
pub struct RejoinedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<RejoinedWorktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
4184
/// Per-worktree payload of a `RejoinedProject`: worktree metadata together with lists of
/// updated and removed entries and repositories, diagnostics and settings files.
// NOTE(review): presumably a delta relative to the state the client last saw — confirm.
#[derive(Debug)]
pub struct RejoinedWorktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub updated_entries: Vec<proto::Entry>,
    pub removed_entries: Vec<u64>,
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    pub removed_repositories: Vec<u64>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
4200
/// Room state bundled with the projects that were left and the calls that were canceled.
// NOTE(review): `deleted` presumably reports that the room itself was removed — confirm
// against the producer, which is outside this chunk.
pub struct LeftRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
    pub left_projects: HashMap<ProjectId, LeftProject>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
    pub deleted: bool,
}
4209
/// Room state bundled with the stale participants and canceled calls found by a refresh.
// NOTE(review): the producer is outside this chunk; field meanings are inferred from names.
pub struct RefreshedRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
    pub stale_participant_user_ids: Vec<UserId>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
4217
/// Snapshot of a project: its collaborators, its worktrees keyed by a `u64`
/// (presumably the worktree id — confirm), and its language servers.
pub struct Project {
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: BTreeMap<u64, Worktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
4223
/// A participant in a project, identified by connection, user and replica id.
pub struct ProjectCollaborator {
    pub connection_id: ConnectionId,
    pub user_id: UserId,
    pub replica_id: ReplicaId,
    pub is_host: bool,
}

impl ProjectCollaborator {
    /// Converts to the protocol representation. `is_host` is not part of the message,
    /// and the replica id is converted with an `as u32` cast.
    pub fn to_proto(&self) -> proto::Collaborator {
        proto::Collaborator {
            peer_id: Some(self.connection_id.into()),
            replica_id: self.replica_id.0 as u32,
            user_id: self.user_id.to_proto(),
        }
    }
}
4240
/// Description of a project entry in `LeftRoom::left_projects`: the project, its host
/// (user and connection), and a list of connection ids.
// NOTE(review): `connection_ids` presumably lists the connections to notify — confirm.
#[derive(Debug)]
pub struct LeftProject {
    pub id: ProjectId,
    pub host_user_id: UserId,
    pub host_connection_id: ConnectionId,
    pub connection_ids: Vec<ConnectionId>,
}
4248
/// Full state of one worktree of a `Project`: metadata, entries, repository entries
/// keyed by a `u64`, diagnostics, settings files and scan counters.
// NOTE(review): the key of `repository_entries` is presumably the repository's work
// directory entry id — confirm against the producer.
pub struct Worktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
4261
/// A settings file belonging to a worktree: its path and its textual content.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    pub path: String,
    pub content: String,
}
4267
/// Column selector used with `into_values` by queries in this file that select only the
/// `user_id` column.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}
4272
4273#[cfg(test)]
4274pub use test::*;
4275
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A throwaway database for tests, backed either by in-memory SQLite or by a freshly
    /// created Postgres database that is dropped again when the `TestDb` is dropped.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and loads the bundled test schema into it.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = String::from("sqlite::memory:");
            let runtime = Self::new_runtime();

            let db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();

                let schema = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                let create_schema = sea_orm::Statement::from_string(
                    db.pool.get_database_backend(),
                    schema.into(),
                );
                db.pool.execute(create_schema).await.unwrap();
                db
            });

            Self::with_runtime(db, runtime)
        }

        /// Creates a uniquely named Postgres database on localhost and runs the
        /// migrations against it. Construction is serialized through a static lock.
        pub fn postgres(background: Arc<Background>) -> Self {
            static LOCK: Mutex<()> = Mutex::new(());

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = Self::new_runtime();

            let db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");

                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();

                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            Self::with_runtime(db, runtime)
        }

        /// Returns the wrapped database; panics if it has already been taken.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }

        /// Builds the single-threaded tokio runtime (I/O and time enabled) on which the
        /// test database performs its work.
        fn new_runtime() -> tokio::runtime::Runtime {
            tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap()
        }

        /// Stores `runtime` inside `db` and wraps the result in a `TestDb`.
        fn with_runtime(mut db: Database, runtime: tokio::runtime::Runtime) -> Self {
            db.runtime = Some(runtime);
            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }
    }

    impl Drop for TestDb {
        /// For Postgres-backed instances: terminates the other backends connected to the
        /// test database and then drops it. Failures are logged, not propagated.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            let backend = db.pool.get_database_backend();
            if !matches!(backend, sea_orm::DatabaseBackend::Postgres) {
                return;
            }

            db.runtime.as_ref().unwrap().block_on(async {
                use util::ResultExt;
                let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                let terminate_backends = sea_orm::Statement::from_string(
                    db.pool.get_database_backend(),
                    query.into(),
                );
                db.pool.execute(terminate_backends).await.log_err();
                sqlx::Postgres::drop_database(db.options.get_url())
                    .await
                    .log_err();
            })
        }
    }
}