1mod access_token;
2mod channel;
3mod channel_member;
4mod channel_path;
5mod contact;
6mod follower;
7mod language_server;
8mod project;
9mod project_collaborator;
10mod room;
11mod room_participant;
12mod server;
13mod signup;
14#[cfg(test)]
15mod tests;
16mod user;
17mod worktree;
18mod worktree_diagnostic_summary;
19mod worktree_entry;
20mod worktree_repository;
21mod worktree_repository_statuses;
22mod worktree_settings_file;
23
24use crate::executor::Executor;
25use crate::{Error, Result};
26use anyhow::anyhow;
27use collections::{BTreeMap, HashMap, HashSet};
28pub use contact::Contact;
29use dashmap::DashMap;
30use futures::StreamExt;
31use hyper::StatusCode;
32use rand::prelude::StdRng;
33use rand::{Rng, SeedableRng};
34use rpc::{proto, ConnectionId};
35use sea_orm::Condition;
36pub use sea_orm::ConnectOptions;
37use sea_orm::{
38 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
39 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
40 Statement, TransactionTrait,
41};
42use sea_query::{Alias, Expr, OnConflict, Query};
43use serde::{Deserialize, Serialize};
44pub use signup::{Invite, NewSignup, WaitlistSummary};
45use sqlx::migrate::{Migrate, Migration, MigrationSource};
46use sqlx::Connection;
47use std::fmt::Write as _;
48use std::ops::{Deref, DerefMut};
49use std::path::Path;
50use std::time::Duration;
51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
52use tokio::sync::{Mutex, OwnedMutexGuard};
53pub use user::Model as User;
54
/// Handle to the application's database: a sea-orm connection pool plus the
/// bookkeeping the query methods on this type rely on.
pub struct Database {
    /// Connection options the pool was created with; `migrate` reads the URL
    /// from here to open its own connection.
    options: ConnectOptions,
    /// sea-orm connection pool used by the query methods.
    pool: DatabaseConnection,
    /// One async mutex per room id.
    /// NOTE(review): presumably locked by `room_transaction` to serialize work
    /// on a single room — confirm against its definition.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Random number generator behind an async mutex; `new` seeds it with a
    /// fixed value. Its consumer is not visible in this part of the file.
    rng: Mutex<StdRng>,
    /// Executor supplied by the caller of `new`.
    executor: Executor,
    /// Test-only tokio runtime; `new` leaves it unset.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
64
65impl Database {
66 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
67 Ok(Self {
68 options: options.clone(),
69 pool: sea_orm::Database::connect(options).await?,
70 rooms: DashMap::with_capacity(16384),
71 rng: Mutex::new(StdRng::seed_from_u64(0)),
72 executor,
73 #[cfg(test)]
74 runtime: None,
75 })
76 }
77
/// Test-only helper that forgets every per-room lock tracked so far.
#[cfg(test)]
pub fn reset(&self) {
    DashMap::clear(&self.rooms);
}
82
83 pub async fn migrate(
84 &self,
85 migrations_path: &Path,
86 ignore_checksum_mismatch: bool,
87 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
88 let migrations = MigrationSource::resolve(migrations_path)
89 .await
90 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
91
92 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
93
94 connection.ensure_migrations_table().await?;
95 let applied_migrations: HashMap<_, _> = connection
96 .list_applied_migrations()
97 .await?
98 .into_iter()
99 .map(|m| (m.version, m))
100 .collect();
101
102 let mut new_migrations = Vec::new();
103 for migration in migrations {
104 match applied_migrations.get(&migration.version) {
105 Some(applied_migration) => {
106 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
107 {
108 Err(anyhow!(
109 "checksum mismatch for applied migration {}",
110 migration.description
111 ))?;
112 }
113 }
114 None => {
115 let elapsed = connection.apply(&migration).await?;
116 new_migrations.push((migration, elapsed));
117 }
118 }
119 }
120
121 Ok(new_migrations)
122 }
123
124 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
125 self.transaction(|tx| async move {
126 let server = server::ActiveModel {
127 environment: ActiveValue::set(environment.into()),
128 ..Default::default()
129 }
130 .insert(&*tx)
131 .await?;
132 Ok(server.id)
133 })
134 .await
135 }
136
137 pub async fn stale_room_ids(
138 &self,
139 environment: &str,
140 new_server_id: ServerId,
141 ) -> Result<Vec<RoomId>> {
142 self.transaction(|tx| async move {
143 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
144 enum QueryAs {
145 RoomId,
146 }
147
148 let stale_server_epochs = self
149 .stale_server_ids(environment, new_server_id, &tx)
150 .await?;
151 Ok(room_participant::Entity::find()
152 .select_only()
153 .column(room_participant::Column::RoomId)
154 .distinct()
155 .filter(
156 room_participant::Column::AnsweringConnectionServerId
157 .is_in(stale_server_epochs),
158 )
159 .into_values::<_, QueryAs>()
160 .all(&*tx)
161 .await?)
162 })
163 .await
164 }
165
/// Removes participants of `room_id` that are still attached to a server
/// other than `new_server_id`, cancels the calls they had placed, and returns
/// the resulting room state.
///
/// The work runs inside `room_transaction` for `room_id`. The returned
/// [`RefreshedRoom`] carries the room, its channel (if any) and channel
/// members, the user ids of the removed participants, and the user ids whose
/// pending calls were canceled.
pub async fn refresh_room(
    &self,
    room_id: RoomId,
    new_server_id: ServerId,
) -> Result<RoomGuard<RefreshedRoom>> {
    self.room_transaction(room_id, |tx| async move {
        // Stale = in this room, has an answering connection, and that
        // connection lives on a server other than the new one.
        let stale_participant_filter = Condition::all()
            .add(room_participant::Column::RoomId.eq(room_id))
            .add(room_participant::Column::AnsweringConnectionId.is_not_null())
            .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));

        // Collect the user ids first: they are needed after the rows are gone.
        let stale_participant_user_ids = room_participant::Entity::find()
            .filter(stale_participant_filter.clone())
            .all(&*tx)
            .await?
            .into_iter()
            .map(|participant| participant.user_id)
            .collect::<Vec<_>>();

        // Delete participants who failed to reconnect and cancel their calls.
        let mut canceled_calls_to_user_ids = Vec::new();
        room_participant::Entity::delete_many()
            .filter(stale_participant_filter)
            .exec(&*tx)
            .await?;
        // Pending (unanswered) participants that were called by one of the
        // stale users.
        // NOTE(review): this lookup is not restricted to `room_id` — confirm
        // that is intended.
        let called_participants = room_participant::Entity::find()
            .filter(
                Condition::all()
                    .add(
                        room_participant::Column::CallingUserId
                            .is_in(stale_participant_user_ids.iter().copied()),
                    )
                    .add(room_participant::Column::AnsweringConnectionId.is_null()),
            )
            .all(&*tx)
            .await?;
        room_participant::Entity::delete_many()
            .filter(
                room_participant::Column::Id
                    .is_in(called_participants.iter().map(|participant| participant.id)),
            )
            .exec(&*tx)
            .await?;
        canceled_calls_to_user_ids.extend(
            called_participants
                .into_iter()
                .map(|participant| participant.user_id),
        );

        // Re-read the room after the deletions above.
        let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
        let channel_members;
        if let Some(channel_id) = channel_id {
            channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
        } else {
            channel_members = Vec::new();

            // Delete the room if it becomes empty. Only rooms without a
            // channel reach this branch; the room's projects are deleted
            // before the room row itself.
            if room.participants.is_empty() {
                project::Entity::delete_many()
                    .filter(project::Column::RoomId.eq(room_id))
                    .exec(&*tx)
                    .await?;
                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
            }
        };

        Ok(RefreshedRoom {
            room,
            channel_id,
            channel_members,
            stale_participant_user_ids,
            canceled_calls_to_user_ids,
        })
    })
    .await
}
242
243 pub async fn delete_stale_servers(
244 &self,
245 environment: &str,
246 new_server_id: ServerId,
247 ) -> Result<()> {
248 self.transaction(|tx| async move {
249 server::Entity::delete_many()
250 .filter(
251 Condition::all()
252 .add(server::Column::Environment.eq(environment))
253 .add(server::Column::Id.ne(new_server_id)),
254 )
255 .exec(&*tx)
256 .await?;
257 Ok(())
258 })
259 .await
260 }
261
262 async fn stale_server_ids(
263 &self,
264 environment: &str,
265 new_server_id: ServerId,
266 tx: &DatabaseTransaction,
267 ) -> Result<Vec<ServerId>> {
268 let stale_servers = server::Entity::find()
269 .filter(
270 Condition::all()
271 .add(server::Column::Environment.eq(environment))
272 .add(server::Column::Id.ne(new_server_id)),
273 )
274 .all(&*tx)
275 .await?;
276 Ok(stale_servers.into_iter().map(|server| server.id).collect())
277 }
278
279 // users
280
281 pub async fn create_user(
282 &self,
283 email_address: &str,
284 admin: bool,
285 params: NewUserParams,
286 ) -> Result<NewUserResult> {
287 self.transaction(|tx| async {
288 let tx = tx;
289 let user = user::Entity::insert(user::ActiveModel {
290 email_address: ActiveValue::set(Some(email_address.into())),
291 github_login: ActiveValue::set(params.github_login.clone()),
292 github_user_id: ActiveValue::set(Some(params.github_user_id)),
293 admin: ActiveValue::set(admin),
294 metrics_id: ActiveValue::set(Uuid::new_v4()),
295 ..Default::default()
296 })
297 .on_conflict(
298 OnConflict::column(user::Column::GithubLogin)
299 .update_column(user::Column::GithubLogin)
300 .to_owned(),
301 )
302 .exec_with_returning(&*tx)
303 .await?;
304
305 Ok(NewUserResult {
306 user_id: user.id,
307 metrics_id: user.metrics_id.to_string(),
308 signup_device_id: None,
309 inviting_user_id: None,
310 })
311 })
312 .await
313 }
314
315 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
316 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
317 .await
318 }
319
320 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
321 self.transaction(|tx| async {
322 let tx = tx;
323 Ok(user::Entity::find()
324 .filter(user::Column::Id.is_in(ids.iter().copied()))
325 .all(&*tx)
326 .await?)
327 })
328 .await
329 }
330
331 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
332 self.transaction(|tx| async move {
333 Ok(user::Entity::find()
334 .filter(user::Column::GithubLogin.eq(github_login))
335 .one(&*tx)
336 .await?)
337 })
338 .await
339 }
340
341 pub async fn get_or_create_user_by_github_account(
342 &self,
343 github_login: &str,
344 github_user_id: Option<i32>,
345 github_email: Option<&str>,
346 ) -> Result<Option<User>> {
347 self.transaction(|tx| async move {
348 let tx = &*tx;
349 if let Some(github_user_id) = github_user_id {
350 if let Some(user_by_github_user_id) = user::Entity::find()
351 .filter(user::Column::GithubUserId.eq(github_user_id))
352 .one(tx)
353 .await?
354 {
355 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
356 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
357 Ok(Some(user_by_github_user_id.update(tx).await?))
358 } else if let Some(user_by_github_login) = user::Entity::find()
359 .filter(user::Column::GithubLogin.eq(github_login))
360 .one(tx)
361 .await?
362 {
363 let mut user_by_github_login = user_by_github_login.into_active_model();
364 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
365 Ok(Some(user_by_github_login.update(tx).await?))
366 } else {
367 let user = user::Entity::insert(user::ActiveModel {
368 email_address: ActiveValue::set(github_email.map(|email| email.into())),
369 github_login: ActiveValue::set(github_login.into()),
370 github_user_id: ActiveValue::set(Some(github_user_id)),
371 admin: ActiveValue::set(false),
372 invite_count: ActiveValue::set(0),
373 invite_code: ActiveValue::set(None),
374 metrics_id: ActiveValue::set(Uuid::new_v4()),
375 ..Default::default()
376 })
377 .exec_with_returning(&*tx)
378 .await?;
379 Ok(Some(user))
380 }
381 } else {
382 Ok(user::Entity::find()
383 .filter(user::Column::GithubLogin.eq(github_login))
384 .one(tx)
385 .await?)
386 }
387 })
388 .await
389 }
390
391 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
392 self.transaction(|tx| async move {
393 Ok(user::Entity::find()
394 .order_by_asc(user::Column::GithubLogin)
395 .limit(limit as u64)
396 .offset(page as u64 * limit as u64)
397 .all(&*tx)
398 .await?)
399 })
400 .await
401 }
402
403 pub async fn get_users_with_no_invites(
404 &self,
405 invited_by_another_user: bool,
406 ) -> Result<Vec<User>> {
407 self.transaction(|tx| async move {
408 Ok(user::Entity::find()
409 .filter(
410 user::Column::InviteCount
411 .eq(0)
412 .and(if invited_by_another_user {
413 user::Column::InviterId.is_not_null()
414 } else {
415 user::Column::InviterId.is_null()
416 }),
417 )
418 .all(&*tx)
419 .await?)
420 })
421 .await
422 }
423
424 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
425 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
426 enum QueryAs {
427 MetricsId,
428 }
429
430 self.transaction(|tx| async move {
431 let metrics_id: Uuid = user::Entity::find_by_id(id)
432 .select_only()
433 .column(user::Column::MetricsId)
434 .into_values::<_, QueryAs>()
435 .one(&*tx)
436 .await?
437 .ok_or_else(|| anyhow!("could not find user"))?;
438 Ok(metrics_id.to_string())
439 })
440 .await
441 }
442
443 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
444 self.transaction(|tx| async move {
445 user::Entity::update_many()
446 .filter(user::Column::Id.eq(id))
447 .set(user::ActiveModel {
448 admin: ActiveValue::set(is_admin),
449 ..Default::default()
450 })
451 .exec(&*tx)
452 .await?;
453 Ok(())
454 })
455 .await
456 }
457
458 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
459 self.transaction(|tx| async move {
460 user::Entity::update_many()
461 .filter(user::Column::Id.eq(id))
462 .set(user::ActiveModel {
463 connected_once: ActiveValue::set(connected_once),
464 ..Default::default()
465 })
466 .exec(&*tx)
467 .await?;
468 Ok(())
469 })
470 .await
471 }
472
473 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
474 self.transaction(|tx| async move {
475 access_token::Entity::delete_many()
476 .filter(access_token::Column::UserId.eq(id))
477 .exec(&*tx)
478 .await?;
479 user::Entity::delete_by_id(id).exec(&*tx).await?;
480 Ok(())
481 })
482 .await
483 }
484
485 // contacts
486
/// Returns every contact of `user_id` — accepted, incoming and outgoing —
/// sorted by the other user's id.
///
/// Accepted contacts also report whether the other user is busy, which here
/// means a `room_participant` row is joined for them.
pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
    // A contact row plus the two computed "busy" columns selected below.
    #[derive(Debug, FromQueryResult)]
    struct ContactWithUserBusyStatuses {
        user_id_a: UserId,
        user_id_b: UserId,
        a_to_b: bool,
        accepted: bool,
        should_notify: bool,
        user_a_busy: bool,
        user_b_busy: bool,
    }

    self.transaction(|tx| async move {
        // `room_participant` is left-joined twice, once per side of the
        // contact, so each join needs its own alias.
        let user_a_participant = Alias::new("user_a_participant");
        let user_b_participant = Alias::new("user_b_participant");
        let mut db_contacts = contact::Entity::find()
            .column_as(
                Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
                    .is_not_null(),
                "user_a_busy",
            )
            .column_as(
                Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
                    .is_not_null(),
                "user_b_busy",
            )
            .filter(
                contact::Column::UserIdA
                    .eq(user_id)
                    .or(contact::Column::UserIdB.eq(user_id)),
            )
            .join_as(
                JoinType::LeftJoin,
                contact::Relation::UserARoomParticipant.def(),
                user_a_participant,
            )
            .join_as(
                JoinType::LeftJoin,
                contact::Relation::UserBRoomParticipant.def(),
                user_b_participant,
            )
            .into_model::<ContactWithUserBusyStatuses>()
            .stream(&*tx)
            .await?;

        let mut contacts = Vec::new();
        while let Some(db_contact) = db_contacts.next().await {
            let db_contact = db_contact?;
            if db_contact.user_id_a == user_id {
                // We are side A, so the contact is user B.
                if db_contact.accepted {
                    contacts.push(Contact::Accepted {
                        user_id: db_contact.user_id_b,
                        // Only surfaced when the request went from A to B.
                        should_notify: db_contact.should_notify && db_contact.a_to_b,
                        busy: db_contact.user_b_busy,
                    });
                } else if db_contact.a_to_b {
                    // Pending and sent by A (us): outgoing.
                    contacts.push(Contact::Outgoing {
                        user_id: db_contact.user_id_b,
                    })
                } else {
                    // Pending and sent by B: incoming.
                    contacts.push(Contact::Incoming {
                        user_id: db_contact.user_id_b,
                        should_notify: db_contact.should_notify,
                    });
                }
            } else if db_contact.accepted {
                // We are side B, so the contact is user A.
                contacts.push(Contact::Accepted {
                    user_id: db_contact.user_id_a,
                    // Only surfaced when the request went from B to A.
                    should_notify: db_contact.should_notify && !db_contact.a_to_b,
                    busy: db_contact.user_a_busy,
                });
            } else if db_contact.a_to_b {
                // Pending and sent by A (the other user): incoming.
                contacts.push(Contact::Incoming {
                    user_id: db_contact.user_id_a,
                    should_notify: db_contact.should_notify,
                });
            } else {
                // Pending and sent by B (us): outgoing.
                contacts.push(Contact::Outgoing {
                    user_id: db_contact.user_id_a,
                });
            }
        }

        contacts.sort_unstable_by_key(|contact| contact.user_id());

        Ok(contacts)
    })
    .await
}
576
577 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
578 self.transaction(|tx| async move {
579 let participant = room_participant::Entity::find()
580 .filter(room_participant::Column::UserId.eq(user_id))
581 .one(&*tx)
582 .await?;
583 Ok(participant.is_some())
584 })
585 .await
586 }
587
588 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
589 self.transaction(|tx| async move {
590 let (id_a, id_b) = if user_id_1 < user_id_2 {
591 (user_id_1, user_id_2)
592 } else {
593 (user_id_2, user_id_1)
594 };
595
596 Ok(contact::Entity::find()
597 .filter(
598 contact::Column::UserIdA
599 .eq(id_a)
600 .and(contact::Column::UserIdB.eq(id_b))
601 .and(contact::Column::Accepted.eq(true)),
602 )
603 .one(&*tx)
604 .await?
605 .is_some())
606 })
607 .await
608 }
609
/// Records a contact request from `sender_id` to `receiver_id`.
///
/// A new pending contact row is inserted. If a row for this pair already
/// exists, the upsert below may instead mark it accepted, subject to the
/// condition attached to the conflict action.
///
/// # Errors
///
/// Fails with "contact already requested" unless exactly one row was
/// inserted or updated.
pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
    self.transaction(|tx| async move {
        // The pair is stored with the smaller id first; `a_to_b` records
        // whether the request goes from A to B.
        let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
            (sender_id, receiver_id, true)
        } else {
            (receiver_id, sender_id, false)
        };

        let rows_affected = contact::Entity::insert(contact::ActiveModel {
            user_id_a: ActiveValue::set(id_a),
            user_id_b: ActiveValue::set(id_b),
            a_to_b: ActiveValue::set(a_to_b),
            accepted: ActiveValue::set(false),
            should_notify: ActiveValue::set(true),
            ..Default::default()
        })
        .on_conflict(
            // On a conflicting (user_id_a, user_id_b) pair: set
            // accepted = true and should_notify = false, but only when the
            // existing row is not yet accepted AND either
            //   (a_to_b matches and user_id_a = id_b) or
            //   (a_to_b differs and user_id_a = id_a).
            // NOTE(review): presumably the second case is the one that matters
            // — a pending request in the opposite direction gets accepted.
            OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
                .values([
                    (contact::Column::Accepted, true.into()),
                    (contact::Column::ShouldNotify, false.into()),
                ])
                .action_and_where(
                    contact::Column::Accepted.eq(false).and(
                        contact::Column::AToB
                            .eq(a_to_b)
                            .and(contact::Column::UserIdA.eq(id_b))
                            .or(contact::Column::AToB
                                .ne(a_to_b)
                                .and(contact::Column::UserIdA.eq(id_a))),
                    ),
                )
                .to_owned(),
        )
        .exec_without_returning(&*tx)
        .await?;

        // Anything other than exactly one affected row is reported as a
        // duplicate request.
        if rows_affected == 1 {
            Ok(())
        } else {
            Err(anyhow!("contact already requested"))?
        }
    })
    .await
}
655
656 /// Returns a bool indicating whether the removed contact had originally accepted or not
657 ///
658 /// Deletes the contact identified by the requester and responder ids, and then returns
659 /// whether the deleted contact had originally accepted or was a pending contact request.
660 ///
661 /// # Arguments
662 ///
663 /// * `requester_id` - The user that initiates this request
664 /// * `responder_id` - The user that will be removed
665 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
666 self.transaction(|tx| async move {
667 let (id_a, id_b) = if responder_id < requester_id {
668 (responder_id, requester_id)
669 } else {
670 (requester_id, responder_id)
671 };
672
673 let contact = contact::Entity::find()
674 .filter(
675 contact::Column::UserIdA
676 .eq(id_a)
677 .and(contact::Column::UserIdB.eq(id_b)),
678 )
679 .one(&*tx)
680 .await?
681 .ok_or_else(|| anyhow!("no such contact"))?;
682
683 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
684 Ok(contact.accepted)
685 })
686 .await
687 }
688
/// Clears the `should_notify` flag on the contact between `user_id` and
/// `contact_user_id`, provided the notification is one addressed to
/// `user_id`.
///
/// # Errors
///
/// Fails with "no such contact request" if no row matched.
pub async fn dismiss_contact_notification(
    &self,
    user_id: UserId,
    contact_user_id: UserId,
) -> Result<()> {
    self.transaction(|tx| async move {
        // Smaller id first; `a_to_b` here is the direction a request sent
        // BY `user_id` would have.
        let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
            (user_id, contact_user_id, true)
        } else {
            (contact_user_id, user_id, false)
        };

        let result = contact::Entity::update_many()
            .set(contact::ActiveModel {
                should_notify: ActiveValue::set(false),
                ..Default::default()
            })
            .filter(
                // The row must be for this pair and be either
                //   * a request sent by `user_id` that has been accepted, or
                //   * a request sent by the other user that is still pending.
                contact::Column::UserIdA
                    .eq(id_a)
                    .and(contact::Column::UserIdB.eq(id_b))
                    .and(
                        contact::Column::AToB
                            .eq(a_to_b)
                            .and(contact::Column::Accepted.eq(true))
                            .or(contact::Column::AToB
                                .ne(a_to_b)
                                .and(contact::Column::Accepted.eq(false))),
                    ),
            )
            .exec(&*tx)
            .await?;
        if result.rows_affected == 0 {
            Err(anyhow!("no such contact request"))?
        } else {
            Ok(())
        }
    })
    .await
}
729
730 pub async fn respond_to_contact_request(
731 &self,
732 responder_id: UserId,
733 requester_id: UserId,
734 accept: bool,
735 ) -> Result<()> {
736 self.transaction(|tx| async move {
737 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
738 (responder_id, requester_id, false)
739 } else {
740 (requester_id, responder_id, true)
741 };
742 let rows_affected = if accept {
743 let result = contact::Entity::update_many()
744 .set(contact::ActiveModel {
745 accepted: ActiveValue::set(true),
746 should_notify: ActiveValue::set(true),
747 ..Default::default()
748 })
749 .filter(
750 contact::Column::UserIdA
751 .eq(id_a)
752 .and(contact::Column::UserIdB.eq(id_b))
753 .and(contact::Column::AToB.eq(a_to_b)),
754 )
755 .exec(&*tx)
756 .await?;
757 result.rows_affected
758 } else {
759 let result = contact::Entity::delete_many()
760 .filter(
761 contact::Column::UserIdA
762 .eq(id_a)
763 .and(contact::Column::UserIdB.eq(id_b))
764 .and(contact::Column::AToB.eq(a_to_b))
765 .and(contact::Column::Accepted.eq(false)),
766 )
767 .exec(&*tx)
768 .await?;
769
770 result.rows_affected
771 };
772
773 if rows_affected == 1 {
774 Ok(())
775 } else {
776 Err(anyhow!("no such contact request"))?
777 }
778 })
779 .await
780 }
781
/// Builds a `LIKE` pattern that matches the alphanumeric characters of
/// `string` in order, with anything in between.
///
/// Every alphanumeric character is prefixed with `%`, all other characters
/// are dropped, and a final `%` is appended; `"a-b"` becomes `"%a%b%"`.
pub fn fuzzy_like_string(string: &str) -> String {
    // Worst case is one '%' per input byte plus the trailing '%'.
    let mut pattern = String::with_capacity(string.len() * 2 + 1);
    string
        .chars()
        .filter(|ch| ch.is_alphanumeric())
        .for_each(|ch| {
            pattern.push('%');
            pattern.push(ch);
        });
    pattern.push('%');
    pattern
}
793
794 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
795 self.transaction(|tx| async {
796 let tx = tx;
797 let like_string = Self::fuzzy_like_string(name_query);
798 let query = "
799 SELECT users.*
800 FROM users
801 WHERE github_login ILIKE $1
802 ORDER BY github_login <-> $2
803 LIMIT $3
804 ";
805
806 Ok(user::Entity::find()
807 .from_raw_sql(Statement::from_sql_and_values(
808 self.pool.get_database_backend(),
809 query.into(),
810 vec![like_string.into(), name_query.into(), limit.into()],
811 ))
812 .all(&*tx)
813 .await?)
814 })
815 .await
816 }
817
818 // signups
819
820 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
821 self.transaction(|tx| async move {
822 signup::Entity::insert(signup::ActiveModel {
823 email_address: ActiveValue::set(signup.email_address.clone()),
824 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
825 email_confirmation_sent: ActiveValue::set(false),
826 platform_mac: ActiveValue::set(signup.platform_mac),
827 platform_windows: ActiveValue::set(signup.platform_windows),
828 platform_linux: ActiveValue::set(signup.platform_linux),
829 platform_unknown: ActiveValue::set(false),
830 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
831 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
832 device_id: ActiveValue::set(signup.device_id.clone()),
833 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
834 ..Default::default()
835 })
836 .on_conflict(
837 OnConflict::column(signup::Column::EmailAddress)
838 .update_columns([
839 signup::Column::PlatformMac,
840 signup::Column::PlatformWindows,
841 signup::Column::PlatformLinux,
842 signup::Column::EditorFeatures,
843 signup::Column::ProgrammingLanguages,
844 signup::Column::DeviceId,
845 signup::Column::AddedToMailingList,
846 ])
847 .to_owned(),
848 )
849 .exec(&*tx)
850 .await?;
851 Ok(())
852 })
853 .await
854 }
855
856 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
857 self.transaction(|tx| async move {
858 let signup = signup::Entity::find()
859 .filter(signup::Column::EmailAddress.eq(email_address))
860 .one(&*tx)
861 .await?
862 .ok_or_else(|| {
863 anyhow!("signup with email address {} doesn't exist", email_address)
864 })?;
865
866 Ok(signup)
867 })
868 .await
869 }
870
871 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
872 self.transaction(|tx| async move {
873 let query = "
874 SELECT
875 COUNT(*) as count,
876 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
877 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
878 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
879 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
880 FROM (
881 SELECT *
882 FROM signups
883 WHERE
884 NOT email_confirmation_sent
885 ) AS unsent
886 ";
887 Ok(
888 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
889 self.pool.get_database_backend(),
890 query.into(),
891 vec![],
892 ))
893 .one(&*tx)
894 .await?
895 .ok_or_else(|| anyhow!("invalid result"))?,
896 )
897 })
898 .await
899 }
900
901 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
902 let emails = invites
903 .iter()
904 .map(|s| s.email_address.as_str())
905 .collect::<Vec<_>>();
906 self.transaction(|tx| async {
907 let tx = tx;
908 signup::Entity::update_many()
909 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
910 .set(signup::ActiveModel {
911 email_confirmation_sent: ActiveValue::set(true),
912 ..Default::default()
913 })
914 .exec(&*tx)
915 .await?;
916 Ok(())
917 })
918 .await
919 }
920
921 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
922 self.transaction(|tx| async move {
923 Ok(signup::Entity::find()
924 .select_only()
925 .column(signup::Column::EmailAddress)
926 .column(signup::Column::EmailConfirmationCode)
927 .filter(
928 signup::Column::EmailConfirmationSent.eq(false).and(
929 signup::Column::PlatformMac
930 .eq(true)
931 .or(signup::Column::PlatformUnknown.eq(true)),
932 ),
933 )
934 .order_by_asc(signup::Column::CreatedAt)
935 .limit(count as u64)
936 .into_model()
937 .all(&*tx)
938 .await?)
939 })
940 .await
941 }
942
943 // invite codes
944
/// Redeems an invite code for `email_address`: consumes one invite from the
/// code's owner and creates (or re-points) a signup for the address.
///
/// # Errors
///
/// * "email address is already in use" if a user already has this address.
/// * `Error::Http` with `401 Unauthorized` if no user owns `code` with at
///   least one invite remaining.
pub async fn create_invite_from_code(
    &self,
    code: &str,
    email_address: &str,
    device_id: Option<&str>,
    added_to_mailing_list: bool,
) -> Result<Invite> {
    self.transaction(|tx| async move {
        // Refuse addresses that already belong to a user.
        let existing_user = user::Entity::find()
            .filter(user::Column::EmailAddress.eq(email_address))
            .one(&*tx)
            .await?;

        if existing_user.is_some() {
            Err(anyhow!("email address is already in use"))?;
        }

        // The code must belong to a user that still has invites left.
        let inviting_user_with_invites = match user::Entity::find()
            .filter(
                user::Column::InviteCode
                    .eq(code)
                    .and(user::Column::InviteCount.gt(0)),
            )
            .one(&*tx)
            .await?
        {
            Some(inviting_user) => inviting_user,
            None => {
                return Err(Error::Http(
                    StatusCode::UNAUTHORIZED,
                    "unable to find an invite code with invites remaining".to_string(),
                ))?
            }
        };
        // Consume one invite; the count is re-checked in the filter so it is
        // never decremented below zero.
        user::Entity::update_many()
            .filter(
                user::Column::Id
                    .eq(inviting_user_with_invites.id)
                    .and(user::Column::InviteCount.gt(0)),
            )
            .col_expr(
                user::Column::InviteCount,
                Expr::col(user::Column::InviteCount).sub(1),
            )
            .exec(&*tx)
            .await?;

        // Create the signup with the platform marked unknown. If the address
        // is already signed up, only the inviting user is overwritten.
        let signup = signup::Entity::insert(signup::ActiveModel {
            email_address: ActiveValue::set(email_address.into()),
            email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
            email_confirmation_sent: ActiveValue::set(false),
            inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
            platform_linux: ActiveValue::set(false),
            platform_mac: ActiveValue::set(false),
            platform_windows: ActiveValue::set(false),
            platform_unknown: ActiveValue::set(true),
            device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
            added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
            ..Default::default()
        })
        .on_conflict(
            OnConflict::column(signup::Column::EmailAddress)
                .update_column(signup::Column::InvitingUserId)
                .to_owned(),
        )
        .exec_with_returning(&*tx)
        .await?;

        // Built from the returned row, so an existing signup keeps its
        // original confirmation code.
        Ok(Invite {
            email_address: signup.email_address,
            email_confirmation_code: signup.email_confirmation_code,
        })
    })
    .await
}
1020
/// Turns an invite into a user account.
///
/// Returns `Ok(None)` if the invite's signup is already linked to a user.
/// Otherwise the user is created (or updated, on a GitHub login conflict),
/// the signup is linked to it, and — when the signup records an inviting
/// user — an accepted contact between the two is inserted.
///
/// # Errors
///
/// `Error::Http` with `404 Not Found` if no signup matches the invite's
/// email address and confirmation code.
pub async fn create_user_from_invite(
    &self,
    invite: &Invite,
    user: NewUserParams,
) -> Result<Option<NewUserResult>> {
    self.transaction(|tx| async {
        let tx = tx;
        // Both the address and the confirmation code must match.
        let signup = signup::Entity::find()
            .filter(
                signup::Column::EmailAddress
                    .eq(invite.email_address.as_str())
                    .and(
                        signup::Column::EmailConfirmationCode
                            .eq(invite.email_confirmation_code.as_str()),
                    ),
            )
            .one(&*tx)
            .await?
            .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;

        // The invite has already been redeemed.
        if signup.user_id.is_some() {
            return Ok(None);
        }

        // Insert the user; on a GitHub login conflict the existing row's
        // email, GitHub user id and admin flag are overwritten instead.
        let user = user::Entity::insert(user::ActiveModel {
            email_address: ActiveValue::set(Some(invite.email_address.clone())),
            github_login: ActiveValue::set(user.github_login.clone()),
            github_user_id: ActiveValue::set(Some(user.github_user_id)),
            admin: ActiveValue::set(false),
            invite_count: ActiveValue::set(user.invite_count),
            invite_code: ActiveValue::set(Some(random_invite_code())),
            metrics_id: ActiveValue::set(Uuid::new_v4()),
            ..Default::default()
        })
        .on_conflict(
            OnConflict::column(user::Column::GithubLogin)
                .update_columns([
                    user::Column::EmailAddress,
                    user::Column::GithubUserId,
                    user::Column::Admin,
                ])
                .to_owned(),
        )
        .exec_with_returning(&*tx)
        .await?;

        // Link the signup to the user so the invite cannot be reused.
        let mut signup = signup.into_active_model();
        signup.user_id = ActiveValue::set(Some(user.id));
        let signup = signup.update(&*tx).await?;

        if let Some(inviting_user_id) = signup.inviting_user_id {
            // Contacts store the smaller id first; `a_to_b` is the direction
            // inviter -> new user.
            let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
                (inviting_user_id, user.id, true)
            } else {
                (user.id, inviting_user_id, false)
            };

            // Inserted as already accepted; an existing contact is left
            // untouched.
            contact::Entity::insert(contact::ActiveModel {
                user_id_a: ActiveValue::set(user_id_a),
                user_id_b: ActiveValue::set(user_id_b),
                a_to_b: ActiveValue::set(a_to_b),
                should_notify: ActiveValue::set(true),
                accepted: ActiveValue::set(true),
                ..Default::default()
            })
            .on_conflict(OnConflict::new().do_nothing().to_owned())
            .exec_without_returning(&*tx)
            .await?;
        }

        Ok(Some(NewUserResult {
            user_id: user.id,
            metrics_id: user.metrics_id.to_string(),
            inviting_user_id: signup.inviting_user_id,
            signup_device_id: signup.device_id,
        }))
    })
    .await
}
1100
1101 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1102 self.transaction(|tx| async move {
1103 if count > 0 {
1104 user::Entity::update_many()
1105 .filter(
1106 user::Column::Id
1107 .eq(id)
1108 .and(user::Column::InviteCode.is_null()),
1109 )
1110 .set(user::ActiveModel {
1111 invite_code: ActiveValue::set(Some(random_invite_code())),
1112 ..Default::default()
1113 })
1114 .exec(&*tx)
1115 .await?;
1116 }
1117
1118 user::Entity::update_many()
1119 .filter(user::Column::Id.eq(id))
1120 .set(user::ActiveModel {
1121 invite_count: ActiveValue::set(count),
1122 ..Default::default()
1123 })
1124 .exec(&*tx)
1125 .await?;
1126 Ok(())
1127 })
1128 .await
1129 }
1130
1131 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1132 self.transaction(|tx| async move {
1133 match user::Entity::find_by_id(id).one(&*tx).await? {
1134 Some(user) if user.invite_code.is_some() => {
1135 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1136 }
1137 _ => Ok(None),
1138 }
1139 })
1140 .await
1141 }
1142
1143 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1144 self.transaction(|tx| async move {
1145 user::Entity::find()
1146 .filter(user::Column::InviteCode.eq(code))
1147 .one(&*tx)
1148 .await?
1149 .ok_or_else(|| {
1150 Error::Http(
1151 StatusCode::NOT_FOUND,
1152 "that invite code does not exist".to_string(),
1153 )
1154 })
1155 })
1156 .await
1157 }
1158
1159 // rooms
1160
1161 pub async fn incoming_call_for_user(
1162 &self,
1163 user_id: UserId,
1164 ) -> Result<Option<proto::IncomingCall>> {
1165 self.transaction(|tx| async move {
1166 let pending_participant = room_participant::Entity::find()
1167 .filter(
1168 room_participant::Column::UserId
1169 .eq(user_id)
1170 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1171 )
1172 .one(&*tx)
1173 .await?;
1174
1175 if let Some(pending_participant) = pending_participant {
1176 let room = self.get_room(pending_participant.room_id, &tx).await?;
1177 Ok(Self::build_incoming_call(&room, user_id))
1178 } else {
1179 Ok(None)
1180 }
1181 })
1182 .await
1183 }
1184
1185 pub async fn create_room(
1186 &self,
1187 user_id: UserId,
1188 connection: ConnectionId,
1189 live_kit_room: &str,
1190 ) -> Result<proto::Room> {
1191 self.transaction(|tx| async move {
1192 let room = room::ActiveModel {
1193 live_kit_room: ActiveValue::set(live_kit_room.into()),
1194 ..Default::default()
1195 }
1196 .insert(&*tx)
1197 .await?;
1198 room_participant::ActiveModel {
1199 room_id: ActiveValue::set(room.id),
1200 user_id: ActiveValue::set(user_id),
1201 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1202 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1203 connection.owner_id as i32,
1204 ))),
1205 answering_connection_lost: ActiveValue::set(false),
1206 calling_user_id: ActiveValue::set(user_id),
1207 calling_connection_id: ActiveValue::set(connection.id as i32),
1208 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1209 connection.owner_id as i32,
1210 ))),
1211 ..Default::default()
1212 }
1213 .insert(&*tx)
1214 .await?;
1215
1216 let room = self.get_room(room.id, &tx).await?;
1217 Ok(room)
1218 })
1219 .await
1220 }
1221
1222 pub async fn call(
1223 &self,
1224 room_id: RoomId,
1225 calling_user_id: UserId,
1226 calling_connection: ConnectionId,
1227 called_user_id: UserId,
1228 initial_project_id: Option<ProjectId>,
1229 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1230 self.room_transaction(room_id, |tx| async move {
1231 room_participant::ActiveModel {
1232 room_id: ActiveValue::set(room_id),
1233 user_id: ActiveValue::set(called_user_id),
1234 answering_connection_lost: ActiveValue::set(false),
1235 calling_user_id: ActiveValue::set(calling_user_id),
1236 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1237 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1238 calling_connection.owner_id as i32,
1239 ))),
1240 initial_project_id: ActiveValue::set(initial_project_id),
1241 ..Default::default()
1242 }
1243 .insert(&*tx)
1244 .await?;
1245
1246 let room = self.get_room(room_id, &tx).await?;
1247 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1248 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1249 Ok((room, incoming_call))
1250 })
1251 .await
1252 }
1253
1254 pub async fn call_failed(
1255 &self,
1256 room_id: RoomId,
1257 called_user_id: UserId,
1258 ) -> Result<RoomGuard<proto::Room>> {
1259 self.room_transaction(room_id, |tx| async move {
1260 room_participant::Entity::delete_many()
1261 .filter(
1262 room_participant::Column::RoomId
1263 .eq(room_id)
1264 .and(room_participant::Column::UserId.eq(called_user_id)),
1265 )
1266 .exec(&*tx)
1267 .await?;
1268 let room = self.get_room(room_id, &tx).await?;
1269 Ok(room)
1270 })
1271 .await
1272 }
1273
1274 pub async fn decline_call(
1275 &self,
1276 expected_room_id: Option<RoomId>,
1277 user_id: UserId,
1278 ) -> Result<Option<RoomGuard<proto::Room>>> {
1279 self.optional_room_transaction(|tx| async move {
1280 let mut filter = Condition::all()
1281 .add(room_participant::Column::UserId.eq(user_id))
1282 .add(room_participant::Column::AnsweringConnectionId.is_null());
1283 if let Some(room_id) = expected_room_id {
1284 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1285 }
1286 let participant = room_participant::Entity::find()
1287 .filter(filter)
1288 .one(&*tx)
1289 .await?;
1290
1291 let participant = if let Some(participant) = participant {
1292 participant
1293 } else if expected_room_id.is_some() {
1294 return Err(anyhow!("could not find call to decline"))?;
1295 } else {
1296 return Ok(None);
1297 };
1298
1299 let room_id = participant.room_id;
1300 room_participant::Entity::delete(participant.into_active_model())
1301 .exec(&*tx)
1302 .await?;
1303
1304 let room = self.get_room(room_id, &tx).await?;
1305 Ok(Some((room_id, room)))
1306 })
1307 .await
1308 }
1309
1310 pub async fn cancel_call(
1311 &self,
1312 room_id: RoomId,
1313 calling_connection: ConnectionId,
1314 called_user_id: UserId,
1315 ) -> Result<RoomGuard<proto::Room>> {
1316 self.room_transaction(room_id, |tx| async move {
1317 let participant = room_participant::Entity::find()
1318 .filter(
1319 Condition::all()
1320 .add(room_participant::Column::UserId.eq(called_user_id))
1321 .add(room_participant::Column::RoomId.eq(room_id))
1322 .add(
1323 room_participant::Column::CallingConnectionId
1324 .eq(calling_connection.id as i32),
1325 )
1326 .add(
1327 room_participant::Column::CallingConnectionServerId
1328 .eq(calling_connection.owner_id as i32),
1329 )
1330 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1331 )
1332 .one(&*tx)
1333 .await?
1334 .ok_or_else(|| anyhow!("no call to cancel"))?;
1335
1336 room_participant::Entity::delete(participant.into_active_model())
1337 .exec(&*tx)
1338 .await?;
1339
1340 let room = self.get_room(room_id, &tx).await?;
1341 Ok(room)
1342 })
1343 .await
1344 }
1345
1346 pub async fn is_current_room_different_channel(
1347 &self,
1348 user_id: UserId,
1349 channel_id: ChannelId,
1350 ) -> Result<bool> {
1351 self.transaction(|tx| async move {
1352 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1353 enum QueryAs {
1354 ChannelId,
1355 }
1356
1357 let channel_id_model: Option<ChannelId> = room_participant::Entity::find()
1358 .select_only()
1359 .column_as(room::Column::ChannelId, QueryAs::ChannelId)
1360 .inner_join(room::Entity)
1361 .filter(room_participant::Column::UserId.eq(user_id))
1362 .into_values::<_, QueryAs>()
1363 .one(&*tx)
1364 .await?;
1365
1366 let result = channel_id_model
1367 .map(|channel_id_model| channel_id_model != channel_id)
1368 .unwrap_or(false);
1369
1370 Ok(result)
1371 })
1372 .await
1373 }
1374
1375 pub async fn join_room(
1376 &self,
1377 room_id: RoomId,
1378 user_id: UserId,
1379 connection: ConnectionId,
1380 ) -> Result<RoomGuard<JoinRoom>> {
1381 self.room_transaction(room_id, |tx| async move {
1382 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1383 enum QueryChannelId {
1384 ChannelId,
1385 }
1386 let channel_id: Option<ChannelId> = room::Entity::find()
1387 .select_only()
1388 .column(room::Column::ChannelId)
1389 .filter(room::Column::Id.eq(room_id))
1390 .into_values::<_, QueryChannelId>()
1391 .one(&*tx)
1392 .await?
1393 .ok_or_else(|| anyhow!("no such room"))?;
1394
1395 if let Some(channel_id) = channel_id {
1396 self.check_user_is_channel_member(channel_id, user_id, &*tx)
1397 .await?;
1398
1399 room_participant::Entity::insert_many([room_participant::ActiveModel {
1400 room_id: ActiveValue::set(room_id),
1401 user_id: ActiveValue::set(user_id),
1402 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1403 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1404 connection.owner_id as i32,
1405 ))),
1406 answering_connection_lost: ActiveValue::set(false),
1407 calling_user_id: ActiveValue::set(user_id),
1408 calling_connection_id: ActiveValue::set(connection.id as i32),
1409 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1410 connection.owner_id as i32,
1411 ))),
1412 ..Default::default()
1413 }])
1414 .on_conflict(
1415 OnConflict::columns([room_participant::Column::UserId])
1416 .update_columns([
1417 room_participant::Column::AnsweringConnectionId,
1418 room_participant::Column::AnsweringConnectionServerId,
1419 room_participant::Column::AnsweringConnectionLost,
1420 ])
1421 .to_owned(),
1422 )
1423 .exec(&*tx)
1424 .await?;
1425 } else {
1426 let result = room_participant::Entity::update_many()
1427 .filter(
1428 Condition::all()
1429 .add(room_participant::Column::RoomId.eq(room_id))
1430 .add(room_participant::Column::UserId.eq(user_id))
1431 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1432 )
1433 .set(room_participant::ActiveModel {
1434 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1435 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1436 connection.owner_id as i32,
1437 ))),
1438 answering_connection_lost: ActiveValue::set(false),
1439 ..Default::default()
1440 })
1441 .exec(&*tx)
1442 .await?;
1443 if result.rows_affected == 0 {
1444 Err(anyhow!("room does not exist or was already joined"))?;
1445 }
1446 }
1447
1448 let room = self.get_room(room_id, &tx).await?;
1449 let channel_members = if let Some(channel_id) = channel_id {
1450 self.get_channel_members_internal(channel_id, &tx).await?
1451 } else {
1452 Vec::new()
1453 };
1454 Ok(JoinRoom {
1455 room,
1456 channel_id,
1457 channel_members,
1458 })
1459 })
1460 .await
1461 }
1462
    /// Reconnects `user_id` to the room identified by `rejoin_room.id` through a
    /// new `connection`.
    ///
    /// Inside the room transaction this:
    /// 1. repoints the user's participant row at `connection`. The row must
    ///    already have an answering connection that is either flagged as lost
    ///    or hosted on a server other than `connection.owner_id`; otherwise the
    ///    call fails with "room does not exist or was already joined".
    /// 2. for every entry of `rejoin_room.reshared_projects`, verifies the user
    ///    hosts the project, repoints the project and its host collaborator row
    ///    at `connection`, and stores the reported worktrees.
    /// 3. deletes the user's remaining projects in the room that were not
    ///    reshared.
    /// 4. for every entry of `rejoin_room.rejoined_projects`, collects the
    ///    worktree, entry, repository, settings-file and language-server state
    ///    to send back, and repoints the user's collaborator row at
    ///    `connection`. Projects that no longer exist, or in which the user is
    ///    not a collaborator, are skipped.
    ///
    /// Returns the refreshed room with its channel id (if any), the channel
    /// members, and the per-project results of steps 2 and 4.
    pub async fn rejoin_room(
        &self,
        rejoin_room: proto::RejoinRoom,
        user_id: UserId,
        connection: ConnectionId,
    ) -> Result<RoomGuard<RejoinedRoom>> {
        let room_id = RoomId::from_proto(rejoin_room.id);
        self.room_transaction(room_id, |tx| async {
            // The async block is not `move`; rebinding `tx` here uses it by value
            // so that it is owned by the future, while `rejoin_room` is only
            // borrowed.
            let tx = tx;
            // Take over the participant row, but only if the previous connection
            // was lost or lived on a different server.
            let participant_update = room_participant::Entity::update_many()
                .filter(
                    Condition::all()
                        .add(room_participant::Column::RoomId.eq(room_id))
                        .add(room_participant::Column::UserId.eq(user_id))
                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                        .add(
                            Condition::any()
                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                                .add(
                                    room_participant::Column::AnsweringConnectionServerId
                                        .ne(connection.owner_id as i32),
                                ),
                        ),
                )
                .set(room_participant::ActiveModel {
                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    answering_connection_lost: ActiveValue::set(false),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
            if participant_update.rows_affected == 0 {
                return Err(anyhow!("room does not exist or was already joined"))?;
            }

            // Projects the user hosts and shares again from the new connection.
            let mut reshared_projects = Vec::new();
            for reshared_project in &rejoin_room.reshared_projects {
                let project_id = ProjectId::from_proto(reshared_project.project_id);
                let project = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await?
                    .ok_or_else(|| anyhow!("project does not exist"))?;
                // Only the host may reshare a project.
                if project.host_user_id != user_id {
                    return Err(anyhow!("no such project"))?;
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                // Split the host's own row off; the rest are reported back as
                // the project's collaborators.
                let host_ix = collaborators
                    .iter()
                    .position(|collaborator| {
                        collaborator.user_id == user_id && collaborator.is_host
                    })
                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
                let host = collaborators.swap_remove(host_ix);
                let old_connection_id = host.connection();

                // Point both the project and the host collaborator row at the
                // new connection.
                project::Entity::update(project::ActiveModel {
                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    host_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    ..project.into_active_model()
                })
                .exec(&*tx)
                .await?;
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..host.into_active_model()
                })
                .exec(&*tx)
                .await?;

                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                    .await?;

                reshared_projects.push(ResharedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators: collaborators
                        .iter()
                        .map(|collaborator| ProjectCollaborator {
                            connection_id: collaborator.connection(),
                            user_id: collaborator.user_id,
                            replica_id: collaborator.replica_id,
                            is_host: collaborator.is_host,
                        })
                        .collect(),
                    worktrees: reshared_project.worktrees.clone(),
                });
            }

            // Drop the user's projects in this room that were not reshared.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostUserId.eq(user_id))
                        .add(
                            project::Column::Id
                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Projects the user had joined as a guest and wants to resume.
            let mut rejoined_projects = Vec::new();
            for rejoined_project in &rejoin_room.rejoined_projects {
                let project_id = ProjectId::from_proto(rejoined_project.id);
                // A project that no longer exists is silently skipped.
                let Some(project) = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await? else { continue };

                let mut worktrees = Vec::new();
                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
                for db_worktree in db_worktrees {
                    let mut worktree = RejoinedWorktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        updated_entries: Default::default(),
                        removed_entries: Default::default(),
                        updated_repositories: Default::default(),
                        removed_repositories: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    };

                    // The client's own record of this worktree, if it has one.
                    let rejoined_worktree = rejoined_project
                        .worktrees
                        .iter()
                        .find(|worktree| worktree.id == db_worktree.id as u64);

                    // File entries
                    {
                        // If the client already knows the worktree, send only
                        // rows with a scan id above the one it reported
                        // (deleted rows included); otherwise send every
                        // non-deleted row.
                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_entry::Column::IsDeleted.eq(false)
                        };

                        let mut db_entries = worktree_entry::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                    .add(entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_entry) = db_entries.next().await {
                            let db_entry = db_entry?;
                            if db_entry.is_deleted {
                                worktree.removed_entries.push(db_entry.id as u64);
                            } else {
                                worktree.updated_entries.push(proto::Entry {
                                    id: db_entry.id as u64,
                                    is_dir: db_entry.is_dir,
                                    path: db_entry.path,
                                    inode: db_entry.inode as u64,
                                    mtime: Some(proto::Timestamp {
                                        seconds: db_entry.mtime_seconds as u64,
                                        nanos: db_entry.mtime_nanos as u32,
                                    }),
                                    is_symlink: db_entry.is_symlink,
                                    is_ignored: db_entry.is_ignored,
                                    is_external: db_entry.is_external,
                                    git_status: db_entry.git_status.map(|status| status as i32),
                                });
                            }
                        }
                    }

                    // Repository Entries
                    {
                        // Same incremental rule as for file entries above.
                        let repository_entry_filter =
                            if let Some(rejoined_worktree) = rejoined_worktree {
                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                            } else {
                                worktree_repository::Column::IsDeleted.eq(false)
                            };

                        let mut db_repositories = worktree_repository::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                    .add(repository_entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_repository) = db_repositories.next().await {
                            let db_repository = db_repository?;
                            if db_repository.is_deleted {
                                worktree
                                    .removed_repositories
                                    .push(db_repository.work_directory_id as u64);
                            } else {
                                worktree.updated_repositories.push(proto::RepositoryEntry {
                                    work_directory_id: db_repository.work_directory_id as u64,
                                    branch: db_repository.branch,
                                });
                            }
                        }
                    }

                    worktrees.push(worktree);
                }

                let language_servers = project
                    .find_related(language_server::Entity)
                    .all(&*tx)
                    .await?
                    .into_iter()
                    .map(|language_server| proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                    })
                    .collect::<Vec<_>>();

                // Attach each settings file of the project to the worktree it
                // belongs to; files of unknown worktrees are ignored.
                {
                    let mut db_settings_files = worktree_settings_file::Entity::find()
                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                        .stream(&*tx)
                        .await?;
                    while let Some(db_settings_file) = db_settings_files.next().await {
                        let db_settings_file = db_settings_file?;
                        if let Some(worktree) = worktrees
                            .iter_mut()
                            .find(|w| w.id == db_settings_file.worktree_id as u64)
                        {
                            worktree.settings_files.push(WorktreeSettingsFile {
                                path: db_settings_file.path,
                                content: db_settings_file.content,
                            });
                        }
                    }
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                // If the user is not a collaborator of this project, skip it;
                // the state gathered above is discarded.
                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                    .iter()
                    .position(|collaborator| collaborator.user_id == user_id)
                {
                    collaborators.swap_remove(self_collaborator_ix)
                } else {
                    continue;
                };
                let old_connection_id = self_collaborator.connection();
                // Point the user's collaborator row at the new connection.
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..self_collaborator.into_active_model()
                })
                .exec(&*tx)
                .await?;

                let collaborators = collaborators
                    .into_iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect::<Vec<_>>();

                rejoined_projects.push(RejoinedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators,
                    worktrees,
                    language_servers,
                });
            }

            // Load the room after all modifications so the result reflects them.
            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
            let channel_members = if let Some(channel_id) = channel_id {
                self.get_channel_members_internal(channel_id, &tx).await?
            } else {
                Vec::new()
            };

            Ok(RejoinedRoom {
                room,
                channel_id,
                channel_members,
                rejoined_projects,
                reshared_projects,
            })
        })
        .await
    }
1769
    /// Removes the participant answering on `connection` from its room.
    ///
    /// Returns `Ok(None)` when `connection` is not the answering connection of
    /// any participant. Otherwise, in order:
    /// 1. deletes the participant row;
    /// 2. deletes the unanswered calls placed by the leaving user and records
    ///    the called user ids;
    /// 3. records, for every project `connection` collaborates on, the host and
    ///    the other collaborators' connections;
    /// 4. deletes the collaborator rows of `connection` and the projects it
    ///    hosts in the room;
    /// 5. reloads the room and, if it has no participants left, deletes the
    ///    room row unless it belongs to a channel, and removes the room's entry
    ///    from `self.rooms`.
    ///
    /// The returned `LeftRoom` carries the reloaded room, its channel id and
    /// members, the left projects, the canceled calls and whether the room row
    /// was deleted.
    pub async fn leave_room(
        &self,
        connection: ConnectionId,
    ) -> Result<Option<RoomGuard<LeftRoom>>> {
        self.optional_room_transaction(|tx| async move {
            let leaving_participant = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::AnsweringConnectionId
                                .eq(connection.id as i32),
                        )
                        .add(
                            room_participant::Column::AnsweringConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?;

            if let Some(leaving_participant) = leaving_participant {
                // Leave room.
                let room_id = leaving_participant.room_id;
                room_participant::Entity::delete_by_id(leaving_participant.id)
                    .exec(&*tx)
                    .await?;

                // Cancel pending calls initiated by the leaving user.
                let called_participants = room_participant::Entity::find()
                    .filter(
                        Condition::all()
                            .add(
                                room_participant::Column::CallingUserId
                                    .eq(leaving_participant.user_id),
                            )
                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
                    )
                    .all(&*tx)
                    .await?;
                room_participant::Entity::delete_many()
                    .filter(
                        room_participant::Column::Id
                            .is_in(called_participants.iter().map(|participant| participant.id)),
                    )
                    .exec(&*tx)
                    .await?;
                let canceled_calls_to_user_ids = called_participants
                    .into_iter()
                    .map(|participant| participant.user_id)
                    .collect();

                // Detect left projects.
                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
                enum QueryProjectIds {
                    ProjectId,
                }
                // Ids of all projects this connection collaborates on.
                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                    .select_only()
                    .column_as(
                        project_collaborator::Column::ProjectId,
                        QueryProjectIds::ProjectId,
                    )
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .into_values::<_, QueryProjectIds>()
                    .all(&*tx)
                    .await?;
                // Group every collaborator of those projects by project id.
                let mut left_projects = HashMap::default();
                let mut collaborators = project_collaborator::Entity::find()
                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                    .stream(&*tx)
                    .await?;
                while let Some(collaborator) = collaborators.next().await {
                    let collaborator = collaborator?;
                    let left_project =
                        left_projects
                            .entry(collaborator.project_id)
                            .or_insert(LeftProject {
                                id: collaborator.project_id,
                                host_user_id: Default::default(),
                                connection_ids: Default::default(),
                                host_connection_id: Default::default(),
                            });

                    // Collect every connection except the one that is leaving.
                    let collaborator_connection_id = collaborator.connection();
                    if collaborator_connection_id != connection {
                        left_project.connection_ids.push(collaborator_connection_id);
                    }

                    // The host fields keep their defaults until the host row is seen.
                    if collaborator.is_host {
                        left_project.host_user_id = collaborator.user_id;
                        left_project.host_connection_id = collaborator_connection_id;
                    }
                }
                // Release the stream before the deletes below
                // (presumably it holds a borrow of `tx` — confirm).
                drop(collaborators);

                // Leave projects.
                project_collaborator::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // Unshare projects.
                project::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(project::Column::RoomId.eq(room_id))
                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
                            .add(
                                project::Column::HostConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // Reload the room after all deletions; an empty room is removed
                // unless it is tied to a channel.
                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
                let deleted = if room.participants.is_empty() {
                    let result = room::Entity::delete_by_id(room_id)
                        .filter(room::Column::ChannelId.is_null())
                        .exec(&*tx)
                        .await?;
                    result.rows_affected > 0
                } else {
                    false
                };

                let channel_members = if let Some(channel_id) = channel_id {
                    self.get_channel_members_internal(channel_id, &tx).await?
                } else {
                    Vec::new()
                };
                let left_room = LeftRoom {
                    room,
                    channel_id,
                    channel_members,
                    left_projects,
                    canceled_calls_to_user_ids,
                    deleted,
                };

                // Drop the per-room entry from `self.rooms` once nobody is left.
                if left_room.room.participants.is_empty() {
                    self.rooms.remove(&room_id);
                }

                Ok(Some((room_id, left_room)))
            } else {
                Ok(None)
            }
        })
        .await
    }
1939
1940 pub async fn follow(
1941 &self,
1942 project_id: ProjectId,
1943 leader_connection: ConnectionId,
1944 follower_connection: ConnectionId,
1945 ) -> Result<RoomGuard<proto::Room>> {
1946 let room_id = self.room_id_for_project(project_id).await?;
1947 self.room_transaction(room_id, |tx| async move {
1948 follower::ActiveModel {
1949 room_id: ActiveValue::set(room_id),
1950 project_id: ActiveValue::set(project_id),
1951 leader_connection_server_id: ActiveValue::set(ServerId(
1952 leader_connection.owner_id as i32,
1953 )),
1954 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1955 follower_connection_server_id: ActiveValue::set(ServerId(
1956 follower_connection.owner_id as i32,
1957 )),
1958 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1959 ..Default::default()
1960 }
1961 .insert(&*tx)
1962 .await?;
1963
1964 let room = self.get_room(room_id, &*tx).await?;
1965 Ok(room)
1966 })
1967 .await
1968 }
1969
1970 pub async fn unfollow(
1971 &self,
1972 project_id: ProjectId,
1973 leader_connection: ConnectionId,
1974 follower_connection: ConnectionId,
1975 ) -> Result<RoomGuard<proto::Room>> {
1976 let room_id = self.room_id_for_project(project_id).await?;
1977 self.room_transaction(room_id, |tx| async move {
1978 follower::Entity::delete_many()
1979 .filter(
1980 Condition::all()
1981 .add(follower::Column::ProjectId.eq(project_id))
1982 .add(
1983 follower::Column::LeaderConnectionServerId
1984 .eq(leader_connection.owner_id),
1985 )
1986 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1987 .add(
1988 follower::Column::FollowerConnectionServerId
1989 .eq(follower_connection.owner_id),
1990 )
1991 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1992 )
1993 .exec(&*tx)
1994 .await?;
1995
1996 let room = self.get_room(room_id, &*tx).await?;
1997 Ok(room)
1998 })
1999 .await
2000 }
2001
2002 pub async fn update_room_participant_location(
2003 &self,
2004 room_id: RoomId,
2005 connection: ConnectionId,
2006 location: proto::ParticipantLocation,
2007 ) -> Result<RoomGuard<proto::Room>> {
2008 self.room_transaction(room_id, |tx| async {
2009 let tx = tx;
2010 let location_kind;
2011 let location_project_id;
2012 match location
2013 .variant
2014 .as_ref()
2015 .ok_or_else(|| anyhow!("invalid location"))?
2016 {
2017 proto::participant_location::Variant::SharedProject(project) => {
2018 location_kind = 0;
2019 location_project_id = Some(ProjectId::from_proto(project.id));
2020 }
2021 proto::participant_location::Variant::UnsharedProject(_) => {
2022 location_kind = 1;
2023 location_project_id = None;
2024 }
2025 proto::participant_location::Variant::External(_) => {
2026 location_kind = 2;
2027 location_project_id = None;
2028 }
2029 }
2030
2031 let result = room_participant::Entity::update_many()
2032 .filter(
2033 Condition::all()
2034 .add(room_participant::Column::RoomId.eq(room_id))
2035 .add(
2036 room_participant::Column::AnsweringConnectionId
2037 .eq(connection.id as i32),
2038 )
2039 .add(
2040 room_participant::Column::AnsweringConnectionServerId
2041 .eq(connection.owner_id as i32),
2042 ),
2043 )
2044 .set(room_participant::ActiveModel {
2045 location_kind: ActiveValue::set(Some(location_kind)),
2046 location_project_id: ActiveValue::set(location_project_id),
2047 ..Default::default()
2048 })
2049 .exec(&*tx)
2050 .await?;
2051
2052 if result.rows_affected == 1 {
2053 let room = self.get_room(room_id, &tx).await?;
2054 Ok(room)
2055 } else {
2056 Err(anyhow!("could not update room participant location"))?
2057 }
2058 })
2059 .await
2060 }
2061
2062 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2063 self.transaction(|tx| async move {
2064 let participant = room_participant::Entity::find()
2065 .filter(
2066 Condition::all()
2067 .add(
2068 room_participant::Column::AnsweringConnectionId
2069 .eq(connection.id as i32),
2070 )
2071 .add(
2072 room_participant::Column::AnsweringConnectionServerId
2073 .eq(connection.owner_id as i32),
2074 ),
2075 )
2076 .one(&*tx)
2077 .await?
2078 .ok_or_else(|| anyhow!("not a participant in any room"))?;
2079
2080 room_participant::Entity::update(room_participant::ActiveModel {
2081 answering_connection_lost: ActiveValue::set(true),
2082 ..participant.into_active_model()
2083 })
2084 .exec(&*tx)
2085 .await?;
2086
2087 Ok(())
2088 })
2089 .await
2090 }
2091
2092 fn build_incoming_call(
2093 room: &proto::Room,
2094 called_user_id: UserId,
2095 ) -> Option<proto::IncomingCall> {
2096 let pending_participant = room
2097 .pending_participants
2098 .iter()
2099 .find(|participant| participant.user_id == called_user_id.to_proto())?;
2100
2101 Some(proto::IncomingCall {
2102 room_id: room.id,
2103 calling_user_id: pending_participant.calling_user_id,
2104 participant_user_ids: room
2105 .participants
2106 .iter()
2107 .map(|participant| participant.user_id)
2108 .collect(),
2109 initial_project: room.participants.iter().find_map(|participant| {
2110 let initial_project_id = pending_participant.initial_project_id?;
2111 participant
2112 .projects
2113 .iter()
2114 .find(|project| project.id == initial_project_id)
2115 .cloned()
2116 }),
2117 })
2118 }
2119 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2120 let (_, room) = self.get_channel_room(room_id, tx).await?;
2121 Ok(room)
2122 }
2123
    /// Loads the room `room_id` within `tx` and assembles its `proto::Room`
    /// representation, returning it together with the room's channel id.
    ///
    /// The room is built from three related tables:
    /// * participants — rows with an answering connection become joined
    ///   participants (keyed by that connection), all others become pending
    ///   participants;
    /// * projects and their worktrees — each project is attached to the
    ///   participant whose connection hosts it, listing the root names of its
    ///   visible worktrees;
    /// * followers.
    ///
    /// Fails with "could not find room" when the room row does not exist.
    async fn get_channel_room(
        &self,
        room_id: RoomId,
        tx: &DatabaseTransaction,
    ) -> Result<(Option<ChannelId>, proto::Room)> {
        let db_room = room::Entity::find_by_id(room_id)
            .one(tx)
            .await?
            .ok_or_else(|| anyhow!("could not find room"))?;

        let mut db_participants = db_room
            .find_related(room_participant::Entity)
            .stream(tx)
            .await?;
        // Joined participants are keyed by connection so that projects can be
        // attached to their host below.
        let mut participants = HashMap::default();
        let mut pending_participants = Vec::new();
        while let Some(db_participant) = db_participants.next().await {
            let db_participant = db_participant?;
            // A participant counts as joined only if both halves of the
            // answering connection (id and server id) are present.
            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
                .answering_connection_id
                .zip(db_participant.answering_connection_server_id)
            {
                // Location kinds: 0 = shared project (requires a project id),
                // 1 = unshared project, anything else = external.
                let location = match (
                    db_participant.location_kind,
                    db_participant.location_project_id,
                ) {
                    (Some(0), Some(project_id)) => {
                        Some(proto::participant_location::Variant::SharedProject(
                            proto::participant_location::SharedProject {
                                id: project_id.to_proto(),
                            },
                        ))
                    }
                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
                        Default::default(),
                    )),
                    _ => Some(proto::participant_location::Variant::External(
                        Default::default(),
                    )),
                };

                let answering_connection = ConnectionId {
                    owner_id: answering_connection_server_id.0 as u32,
                    id: answering_connection_id as u32,
                };
                participants.insert(
                    answering_connection,
                    proto::Participant {
                        user_id: db_participant.user_id.to_proto(),
                        peer_id: Some(answering_connection.into()),
                        projects: Default::default(),
                        location: Some(proto::ParticipantLocation { variant: location }),
                    },
                );
            } else {
                pending_participants.push(proto::PendingParticipant {
                    user_id: db_participant.user_id.to_proto(),
                    calling_user_id: db_participant.calling_user_id.to_proto(),
                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
                });
            }
        }
        // Release the stream before starting the next query
        // (presumably it holds a borrow of `tx` — confirm).
        drop(db_participants);

        // Each row pairs a project with at most one of its worktrees, so a
        // project can appear in several rows.
        let mut db_projects = db_room
            .find_related(project::Entity)
            .find_with_related(worktree::Entity)
            .stream(tx)
            .await?;

        while let Some(row) = db_projects.next().await {
            let (db_project, db_worktree) = row?;
            let host_connection = db_project.host_connection()?;
            // Projects whose host is not a joined participant are left out.
            if let Some(participant) = participants.get_mut(&host_connection) {
                // Reuse the project entry created by an earlier row, or add it.
                let project = if let Some(project) = participant
                    .projects
                    .iter_mut()
                    .find(|project| project.id == db_project.id.to_proto())
                {
                    project
                } else {
                    participant.projects.push(proto::ParticipantProject {
                        id: db_project.id.to_proto(),
                        worktree_root_names: Default::default(),
                    });
                    participant.projects.last_mut().unwrap()
                };

                // Only visible worktrees contribute a root name.
                if let Some(db_worktree) = db_worktree {
                    if db_worktree.visible {
                        project.worktree_root_names.push(db_worktree.root_name);
                    }
                }
            }
        }
        drop(db_projects);

        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
        let mut followers = Vec::new();
        while let Some(db_follower) = db_followers.next().await {
            let db_follower = db_follower?;
            followers.push(proto::Follower {
                leader_id: Some(db_follower.leader_connection().into()),
                follower_id: Some(db_follower.follower_connection().into()),
                project_id: db_follower.project_id.to_proto(),
            });
        }

        Ok((
            db_room.channel_id,
            proto::Room {
                id: db_room.id.to_proto(),
                live_kit_room: db_room.live_kit_room,
                participants: participants.into_values().collect(),
                pending_participants,
                followers,
            },
        ))
    }
2243
2244 // projects
2245
2246 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2247 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2248 enum QueryAs {
2249 Count,
2250 }
2251
2252 self.transaction(|tx| async move {
2253 Ok(project::Entity::find()
2254 .select_only()
2255 .column_as(project::Column::Id.count(), QueryAs::Count)
2256 .inner_join(user::Entity)
2257 .filter(user::Column::Admin.eq(false))
2258 .into_values::<_, QueryAs>()
2259 .one(&*tx)
2260 .await?
2261 .unwrap_or(0i64) as usize)
2262 })
2263 .await
2264 }
2265
2266 pub async fn share_project(
2267 &self,
2268 room_id: RoomId,
2269 connection: ConnectionId,
2270 worktrees: &[proto::WorktreeMetadata],
2271 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2272 self.room_transaction(room_id, |tx| async move {
2273 let participant = room_participant::Entity::find()
2274 .filter(
2275 Condition::all()
2276 .add(
2277 room_participant::Column::AnsweringConnectionId
2278 .eq(connection.id as i32),
2279 )
2280 .add(
2281 room_participant::Column::AnsweringConnectionServerId
2282 .eq(connection.owner_id as i32),
2283 ),
2284 )
2285 .one(&*tx)
2286 .await?
2287 .ok_or_else(|| anyhow!("could not find participant"))?;
2288 if participant.room_id != room_id {
2289 return Err(anyhow!("shared project on unexpected room"))?;
2290 }
2291
2292 let project = project::ActiveModel {
2293 room_id: ActiveValue::set(participant.room_id),
2294 host_user_id: ActiveValue::set(participant.user_id),
2295 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2296 host_connection_server_id: ActiveValue::set(Some(ServerId(
2297 connection.owner_id as i32,
2298 ))),
2299 ..Default::default()
2300 }
2301 .insert(&*tx)
2302 .await?;
2303
2304 if !worktrees.is_empty() {
2305 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2306 worktree::ActiveModel {
2307 id: ActiveValue::set(worktree.id as i64),
2308 project_id: ActiveValue::set(project.id),
2309 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2310 root_name: ActiveValue::set(worktree.root_name.clone()),
2311 visible: ActiveValue::set(worktree.visible),
2312 scan_id: ActiveValue::set(0),
2313 completed_scan_id: ActiveValue::set(0),
2314 }
2315 }))
2316 .exec(&*tx)
2317 .await?;
2318 }
2319
2320 project_collaborator::ActiveModel {
2321 project_id: ActiveValue::set(project.id),
2322 connection_id: ActiveValue::set(connection.id as i32),
2323 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2324 user_id: ActiveValue::set(participant.user_id),
2325 replica_id: ActiveValue::set(ReplicaId(0)),
2326 is_host: ActiveValue::set(true),
2327 ..Default::default()
2328 }
2329 .insert(&*tx)
2330 .await?;
2331
2332 let room = self.get_room(room_id, &tx).await?;
2333 Ok((project.id, room))
2334 })
2335 .await
2336 }
2337
2338 pub async fn unshare_project(
2339 &self,
2340 project_id: ProjectId,
2341 connection: ConnectionId,
2342 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2343 let room_id = self.room_id_for_project(project_id).await?;
2344 self.room_transaction(room_id, |tx| async move {
2345 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2346
2347 let project = project::Entity::find_by_id(project_id)
2348 .one(&*tx)
2349 .await?
2350 .ok_or_else(|| anyhow!("project not found"))?;
2351 if project.host_connection()? == connection {
2352 project::Entity::delete(project.into_active_model())
2353 .exec(&*tx)
2354 .await?;
2355 let room = self.get_room(room_id, &tx).await?;
2356 Ok((room, guest_connection_ids))
2357 } else {
2358 Err(anyhow!("cannot unshare a project hosted by another user"))?
2359 }
2360 })
2361 .await
2362 }
2363
2364 pub async fn update_project(
2365 &self,
2366 project_id: ProjectId,
2367 connection: ConnectionId,
2368 worktrees: &[proto::WorktreeMetadata],
2369 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2370 let room_id = self.room_id_for_project(project_id).await?;
2371 self.room_transaction(room_id, |tx| async move {
2372 let project = project::Entity::find_by_id(project_id)
2373 .filter(
2374 Condition::all()
2375 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2376 .add(
2377 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2378 ),
2379 )
2380 .one(&*tx)
2381 .await?
2382 .ok_or_else(|| anyhow!("no such project"))?;
2383
2384 self.update_project_worktrees(project.id, worktrees, &tx)
2385 .await?;
2386
2387 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2388 let room = self.get_room(project.room_id, &tx).await?;
2389 Ok((room, guest_connection_ids))
2390 })
2391 .await
2392 }
2393
2394 async fn update_project_worktrees(
2395 &self,
2396 project_id: ProjectId,
2397 worktrees: &[proto::WorktreeMetadata],
2398 tx: &DatabaseTransaction,
2399 ) -> Result<()> {
2400 if !worktrees.is_empty() {
2401 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2402 id: ActiveValue::set(worktree.id as i64),
2403 project_id: ActiveValue::set(project_id),
2404 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2405 root_name: ActiveValue::set(worktree.root_name.clone()),
2406 visible: ActiveValue::set(worktree.visible),
2407 scan_id: ActiveValue::set(0),
2408 completed_scan_id: ActiveValue::set(0),
2409 }))
2410 .on_conflict(
2411 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2412 .update_column(worktree::Column::RootName)
2413 .to_owned(),
2414 )
2415 .exec(&*tx)
2416 .await?;
2417 }
2418
2419 worktree::Entity::delete_many()
2420 .filter(worktree::Column::ProjectId.eq(project_id).and(
2421 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2422 ))
2423 .exec(&*tx)
2424 .await?;
2425
2426 Ok(())
2427 }
2428
2429 pub async fn update_worktree(
2430 &self,
2431 update: &proto::UpdateWorktree,
2432 connection: ConnectionId,
2433 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2434 let project_id = ProjectId::from_proto(update.project_id);
2435 let worktree_id = update.worktree_id as i64;
2436 let room_id = self.room_id_for_project(project_id).await?;
2437 self.room_transaction(room_id, |tx| async move {
2438 // Ensure the update comes from the host.
2439 let _project = project::Entity::find_by_id(project_id)
2440 .filter(
2441 Condition::all()
2442 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2443 .add(
2444 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2445 ),
2446 )
2447 .one(&*tx)
2448 .await?
2449 .ok_or_else(|| anyhow!("no such project"))?;
2450
2451 // Update metadata.
2452 worktree::Entity::update(worktree::ActiveModel {
2453 id: ActiveValue::set(worktree_id),
2454 project_id: ActiveValue::set(project_id),
2455 root_name: ActiveValue::set(update.root_name.clone()),
2456 scan_id: ActiveValue::set(update.scan_id as i64),
2457 completed_scan_id: if update.is_last_update {
2458 ActiveValue::set(update.scan_id as i64)
2459 } else {
2460 ActiveValue::default()
2461 },
2462 abs_path: ActiveValue::set(update.abs_path.clone()),
2463 ..Default::default()
2464 })
2465 .exec(&*tx)
2466 .await?;
2467
2468 if !update.updated_entries.is_empty() {
2469 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2470 let mtime = entry.mtime.clone().unwrap_or_default();
2471 worktree_entry::ActiveModel {
2472 project_id: ActiveValue::set(project_id),
2473 worktree_id: ActiveValue::set(worktree_id),
2474 id: ActiveValue::set(entry.id as i64),
2475 is_dir: ActiveValue::set(entry.is_dir),
2476 path: ActiveValue::set(entry.path.clone()),
2477 inode: ActiveValue::set(entry.inode as i64),
2478 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2479 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2480 is_symlink: ActiveValue::set(entry.is_symlink),
2481 is_ignored: ActiveValue::set(entry.is_ignored),
2482 is_external: ActiveValue::set(entry.is_external),
2483 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2484 is_deleted: ActiveValue::set(false),
2485 scan_id: ActiveValue::set(update.scan_id as i64),
2486 }
2487 }))
2488 .on_conflict(
2489 OnConflict::columns([
2490 worktree_entry::Column::ProjectId,
2491 worktree_entry::Column::WorktreeId,
2492 worktree_entry::Column::Id,
2493 ])
2494 .update_columns([
2495 worktree_entry::Column::IsDir,
2496 worktree_entry::Column::Path,
2497 worktree_entry::Column::Inode,
2498 worktree_entry::Column::MtimeSeconds,
2499 worktree_entry::Column::MtimeNanos,
2500 worktree_entry::Column::IsSymlink,
2501 worktree_entry::Column::IsIgnored,
2502 worktree_entry::Column::GitStatus,
2503 worktree_entry::Column::ScanId,
2504 ])
2505 .to_owned(),
2506 )
2507 .exec(&*tx)
2508 .await?;
2509 }
2510
2511 if !update.removed_entries.is_empty() {
2512 worktree_entry::Entity::update_many()
2513 .filter(
2514 worktree_entry::Column::ProjectId
2515 .eq(project_id)
2516 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2517 .and(
2518 worktree_entry::Column::Id
2519 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2520 ),
2521 )
2522 .set(worktree_entry::ActiveModel {
2523 is_deleted: ActiveValue::Set(true),
2524 scan_id: ActiveValue::Set(update.scan_id as i64),
2525 ..Default::default()
2526 })
2527 .exec(&*tx)
2528 .await?;
2529 }
2530
2531 if !update.updated_repositories.is_empty() {
2532 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2533 |repository| worktree_repository::ActiveModel {
2534 project_id: ActiveValue::set(project_id),
2535 worktree_id: ActiveValue::set(worktree_id),
2536 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2537 scan_id: ActiveValue::set(update.scan_id as i64),
2538 branch: ActiveValue::set(repository.branch.clone()),
2539 is_deleted: ActiveValue::set(false),
2540 },
2541 ))
2542 .on_conflict(
2543 OnConflict::columns([
2544 worktree_repository::Column::ProjectId,
2545 worktree_repository::Column::WorktreeId,
2546 worktree_repository::Column::WorkDirectoryId,
2547 ])
2548 .update_columns([
2549 worktree_repository::Column::ScanId,
2550 worktree_repository::Column::Branch,
2551 ])
2552 .to_owned(),
2553 )
2554 .exec(&*tx)
2555 .await?;
2556 }
2557
2558 if !update.removed_repositories.is_empty() {
2559 worktree_repository::Entity::update_many()
2560 .filter(
2561 worktree_repository::Column::ProjectId
2562 .eq(project_id)
2563 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2564 .and(
2565 worktree_repository::Column::WorkDirectoryId
2566 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2567 ),
2568 )
2569 .set(worktree_repository::ActiveModel {
2570 is_deleted: ActiveValue::Set(true),
2571 scan_id: ActiveValue::Set(update.scan_id as i64),
2572 ..Default::default()
2573 })
2574 .exec(&*tx)
2575 .await?;
2576 }
2577
2578 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2579 Ok(connection_ids)
2580 })
2581 .await
2582 }
2583
2584 pub async fn update_diagnostic_summary(
2585 &self,
2586 update: &proto::UpdateDiagnosticSummary,
2587 connection: ConnectionId,
2588 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2589 let project_id = ProjectId::from_proto(update.project_id);
2590 let worktree_id = update.worktree_id as i64;
2591 let room_id = self.room_id_for_project(project_id).await?;
2592 self.room_transaction(room_id, |tx| async move {
2593 let summary = update
2594 .summary
2595 .as_ref()
2596 .ok_or_else(|| anyhow!("invalid summary"))?;
2597
2598 // Ensure the update comes from the host.
2599 let project = project::Entity::find_by_id(project_id)
2600 .one(&*tx)
2601 .await?
2602 .ok_or_else(|| anyhow!("no such project"))?;
2603 if project.host_connection()? != connection {
2604 return Err(anyhow!("can't update a project hosted by someone else"))?;
2605 }
2606
2607 // Update summary.
2608 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2609 project_id: ActiveValue::set(project_id),
2610 worktree_id: ActiveValue::set(worktree_id),
2611 path: ActiveValue::set(summary.path.clone()),
2612 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2613 error_count: ActiveValue::set(summary.error_count as i32),
2614 warning_count: ActiveValue::set(summary.warning_count as i32),
2615 ..Default::default()
2616 })
2617 .on_conflict(
2618 OnConflict::columns([
2619 worktree_diagnostic_summary::Column::ProjectId,
2620 worktree_diagnostic_summary::Column::WorktreeId,
2621 worktree_diagnostic_summary::Column::Path,
2622 ])
2623 .update_columns([
2624 worktree_diagnostic_summary::Column::LanguageServerId,
2625 worktree_diagnostic_summary::Column::ErrorCount,
2626 worktree_diagnostic_summary::Column::WarningCount,
2627 ])
2628 .to_owned(),
2629 )
2630 .exec(&*tx)
2631 .await?;
2632
2633 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2634 Ok(connection_ids)
2635 })
2636 .await
2637 }
2638
2639 pub async fn start_language_server(
2640 &self,
2641 update: &proto::StartLanguageServer,
2642 connection: ConnectionId,
2643 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2644 let project_id = ProjectId::from_proto(update.project_id);
2645 let room_id = self.room_id_for_project(project_id).await?;
2646 self.room_transaction(room_id, |tx| async move {
2647 let server = update
2648 .server
2649 .as_ref()
2650 .ok_or_else(|| anyhow!("invalid language server"))?;
2651
2652 // Ensure the update comes from the host.
2653 let project = project::Entity::find_by_id(project_id)
2654 .one(&*tx)
2655 .await?
2656 .ok_or_else(|| anyhow!("no such project"))?;
2657 if project.host_connection()? != connection {
2658 return Err(anyhow!("can't update a project hosted by someone else"))?;
2659 }
2660
2661 // Add the newly-started language server.
2662 language_server::Entity::insert(language_server::ActiveModel {
2663 project_id: ActiveValue::set(project_id),
2664 id: ActiveValue::set(server.id as i64),
2665 name: ActiveValue::set(server.name.clone()),
2666 ..Default::default()
2667 })
2668 .on_conflict(
2669 OnConflict::columns([
2670 language_server::Column::ProjectId,
2671 language_server::Column::Id,
2672 ])
2673 .update_column(language_server::Column::Name)
2674 .to_owned(),
2675 )
2676 .exec(&*tx)
2677 .await?;
2678
2679 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2680 Ok(connection_ids)
2681 })
2682 .await
2683 }
2684
2685 pub async fn update_worktree_settings(
2686 &self,
2687 update: &proto::UpdateWorktreeSettings,
2688 connection: ConnectionId,
2689 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2690 let project_id = ProjectId::from_proto(update.project_id);
2691 let room_id = self.room_id_for_project(project_id).await?;
2692 self.room_transaction(room_id, |tx| async move {
2693 // Ensure the update comes from the host.
2694 let project = project::Entity::find_by_id(project_id)
2695 .one(&*tx)
2696 .await?
2697 .ok_or_else(|| anyhow!("no such project"))?;
2698 if project.host_connection()? != connection {
2699 return Err(anyhow!("can't update a project hosted by someone else"))?;
2700 }
2701
2702 if let Some(content) = &update.content {
2703 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2704 project_id: ActiveValue::Set(project_id),
2705 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2706 path: ActiveValue::Set(update.path.clone()),
2707 content: ActiveValue::Set(content.clone()),
2708 })
2709 .on_conflict(
2710 OnConflict::columns([
2711 worktree_settings_file::Column::ProjectId,
2712 worktree_settings_file::Column::WorktreeId,
2713 worktree_settings_file::Column::Path,
2714 ])
2715 .update_column(worktree_settings_file::Column::Content)
2716 .to_owned(),
2717 )
2718 .exec(&*tx)
2719 .await?;
2720 } else {
2721 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2722 project_id: ActiveValue::Set(project_id),
2723 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2724 path: ActiveValue::Set(update.path.clone()),
2725 ..Default::default()
2726 })
2727 .exec(&*tx)
2728 .await?;
2729 }
2730
2731 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2732 Ok(connection_ids)
2733 })
2734 .await
2735 }
2736
2737 pub async fn join_project(
2738 &self,
2739 project_id: ProjectId,
2740 connection: ConnectionId,
2741 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2742 let room_id = self.room_id_for_project(project_id).await?;
2743 self.room_transaction(room_id, |tx| async move {
2744 let participant = room_participant::Entity::find()
2745 .filter(
2746 Condition::all()
2747 .add(
2748 room_participant::Column::AnsweringConnectionId
2749 .eq(connection.id as i32),
2750 )
2751 .add(
2752 room_participant::Column::AnsweringConnectionServerId
2753 .eq(connection.owner_id as i32),
2754 ),
2755 )
2756 .one(&*tx)
2757 .await?
2758 .ok_or_else(|| anyhow!("must join a room first"))?;
2759
2760 let project = project::Entity::find_by_id(project_id)
2761 .one(&*tx)
2762 .await?
2763 .ok_or_else(|| anyhow!("no such project"))?;
2764 if project.room_id != participant.room_id {
2765 return Err(anyhow!("no such project"))?;
2766 }
2767
2768 let mut collaborators = project
2769 .find_related(project_collaborator::Entity)
2770 .all(&*tx)
2771 .await?;
2772 let replica_ids = collaborators
2773 .iter()
2774 .map(|c| c.replica_id)
2775 .collect::<HashSet<_>>();
2776 let mut replica_id = ReplicaId(1);
2777 while replica_ids.contains(&replica_id) {
2778 replica_id.0 += 1;
2779 }
2780 let new_collaborator = project_collaborator::ActiveModel {
2781 project_id: ActiveValue::set(project_id),
2782 connection_id: ActiveValue::set(connection.id as i32),
2783 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2784 user_id: ActiveValue::set(participant.user_id),
2785 replica_id: ActiveValue::set(replica_id),
2786 is_host: ActiveValue::set(false),
2787 ..Default::default()
2788 }
2789 .insert(&*tx)
2790 .await?;
2791 collaborators.push(new_collaborator);
2792
2793 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2794 let mut worktrees = db_worktrees
2795 .into_iter()
2796 .map(|db_worktree| {
2797 (
2798 db_worktree.id as u64,
2799 Worktree {
2800 id: db_worktree.id as u64,
2801 abs_path: db_worktree.abs_path,
2802 root_name: db_worktree.root_name,
2803 visible: db_worktree.visible,
2804 entries: Default::default(),
2805 repository_entries: Default::default(),
2806 diagnostic_summaries: Default::default(),
2807 settings_files: Default::default(),
2808 scan_id: db_worktree.scan_id as u64,
2809 completed_scan_id: db_worktree.completed_scan_id as u64,
2810 },
2811 )
2812 })
2813 .collect::<BTreeMap<_, _>>();
2814
2815 // Populate worktree entries.
2816 {
2817 let mut db_entries = worktree_entry::Entity::find()
2818 .filter(
2819 Condition::all()
2820 .add(worktree_entry::Column::ProjectId.eq(project_id))
2821 .add(worktree_entry::Column::IsDeleted.eq(false)),
2822 )
2823 .stream(&*tx)
2824 .await?;
2825 while let Some(db_entry) = db_entries.next().await {
2826 let db_entry = db_entry?;
2827 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2828 worktree.entries.push(proto::Entry {
2829 id: db_entry.id as u64,
2830 is_dir: db_entry.is_dir,
2831 path: db_entry.path,
2832 inode: db_entry.inode as u64,
2833 mtime: Some(proto::Timestamp {
2834 seconds: db_entry.mtime_seconds as u64,
2835 nanos: db_entry.mtime_nanos as u32,
2836 }),
2837 is_symlink: db_entry.is_symlink,
2838 is_ignored: db_entry.is_ignored,
2839 is_external: db_entry.is_external,
2840 git_status: db_entry.git_status.map(|status| status as i32),
2841 });
2842 }
2843 }
2844 }
2845
2846 // Populate repository entries.
2847 {
2848 let mut db_repository_entries = worktree_repository::Entity::find()
2849 .filter(
2850 Condition::all()
2851 .add(worktree_repository::Column::ProjectId.eq(project_id))
2852 .add(worktree_repository::Column::IsDeleted.eq(false)),
2853 )
2854 .stream(&*tx)
2855 .await?;
2856 while let Some(db_repository_entry) = db_repository_entries.next().await {
2857 let db_repository_entry = db_repository_entry?;
2858 if let Some(worktree) =
2859 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2860 {
2861 worktree.repository_entries.insert(
2862 db_repository_entry.work_directory_id as u64,
2863 proto::RepositoryEntry {
2864 work_directory_id: db_repository_entry.work_directory_id as u64,
2865 branch: db_repository_entry.branch,
2866 },
2867 );
2868 }
2869 }
2870 }
2871
2872 // Populate worktree diagnostic summaries.
2873 {
2874 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2875 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2876 .stream(&*tx)
2877 .await?;
2878 while let Some(db_summary) = db_summaries.next().await {
2879 let db_summary = db_summary?;
2880 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2881 worktree
2882 .diagnostic_summaries
2883 .push(proto::DiagnosticSummary {
2884 path: db_summary.path,
2885 language_server_id: db_summary.language_server_id as u64,
2886 error_count: db_summary.error_count as u32,
2887 warning_count: db_summary.warning_count as u32,
2888 });
2889 }
2890 }
2891 }
2892
2893 // Populate worktree settings files
2894 {
2895 let mut db_settings_files = worktree_settings_file::Entity::find()
2896 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2897 .stream(&*tx)
2898 .await?;
2899 while let Some(db_settings_file) = db_settings_files.next().await {
2900 let db_settings_file = db_settings_file?;
2901 if let Some(worktree) =
2902 worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2903 {
2904 worktree.settings_files.push(WorktreeSettingsFile {
2905 path: db_settings_file.path,
2906 content: db_settings_file.content,
2907 });
2908 }
2909 }
2910 }
2911
2912 // Populate language servers.
2913 let language_servers = project
2914 .find_related(language_server::Entity)
2915 .all(&*tx)
2916 .await?;
2917
2918 let project = Project {
2919 collaborators: collaborators
2920 .into_iter()
2921 .map(|collaborator| ProjectCollaborator {
2922 connection_id: collaborator.connection(),
2923 user_id: collaborator.user_id,
2924 replica_id: collaborator.replica_id,
2925 is_host: collaborator.is_host,
2926 })
2927 .collect(),
2928 worktrees,
2929 language_servers: language_servers
2930 .into_iter()
2931 .map(|language_server| proto::LanguageServer {
2932 id: language_server.id as u64,
2933 name: language_server.name,
2934 })
2935 .collect(),
2936 };
2937 Ok((project, replica_id as ReplicaId))
2938 })
2939 .await
2940 }
2941
2942 pub async fn leave_project(
2943 &self,
2944 project_id: ProjectId,
2945 connection: ConnectionId,
2946 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2947 let room_id = self.room_id_for_project(project_id).await?;
2948 self.room_transaction(room_id, |tx| async move {
2949 let result = project_collaborator::Entity::delete_many()
2950 .filter(
2951 Condition::all()
2952 .add(project_collaborator::Column::ProjectId.eq(project_id))
2953 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2954 .add(
2955 project_collaborator::Column::ConnectionServerId
2956 .eq(connection.owner_id as i32),
2957 ),
2958 )
2959 .exec(&*tx)
2960 .await?;
2961 if result.rows_affected == 0 {
2962 Err(anyhow!("not a collaborator on this project"))?;
2963 }
2964
2965 let project = project::Entity::find_by_id(project_id)
2966 .one(&*tx)
2967 .await?
2968 .ok_or_else(|| anyhow!("no such project"))?;
2969 let collaborators = project
2970 .find_related(project_collaborator::Entity)
2971 .all(&*tx)
2972 .await?;
2973 let connection_ids = collaborators
2974 .into_iter()
2975 .map(|collaborator| collaborator.connection())
2976 .collect();
2977
2978 follower::Entity::delete_many()
2979 .filter(
2980 Condition::any()
2981 .add(
2982 Condition::all()
2983 .add(follower::Column::ProjectId.eq(project_id))
2984 .add(
2985 follower::Column::LeaderConnectionServerId
2986 .eq(connection.owner_id),
2987 )
2988 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2989 )
2990 .add(
2991 Condition::all()
2992 .add(follower::Column::ProjectId.eq(project_id))
2993 .add(
2994 follower::Column::FollowerConnectionServerId
2995 .eq(connection.owner_id),
2996 )
2997 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2998 ),
2999 )
3000 .exec(&*tx)
3001 .await?;
3002
3003 let room = self.get_room(project.room_id, &tx).await?;
3004 let left_project = LeftProject {
3005 id: project_id,
3006 host_user_id: project.host_user_id,
3007 host_connection_id: project.host_connection()?,
3008 connection_ids,
3009 };
3010 Ok((room, left_project))
3011 })
3012 .await
3013 }
3014
3015 pub async fn project_collaborators(
3016 &self,
3017 project_id: ProjectId,
3018 connection_id: ConnectionId,
3019 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
3020 let room_id = self.room_id_for_project(project_id).await?;
3021 self.room_transaction(room_id, |tx| async move {
3022 let collaborators = project_collaborator::Entity::find()
3023 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3024 .all(&*tx)
3025 .await?
3026 .into_iter()
3027 .map(|collaborator| ProjectCollaborator {
3028 connection_id: collaborator.connection(),
3029 user_id: collaborator.user_id,
3030 replica_id: collaborator.replica_id,
3031 is_host: collaborator.is_host,
3032 })
3033 .collect::<Vec<_>>();
3034
3035 if collaborators
3036 .iter()
3037 .any(|collaborator| collaborator.connection_id == connection_id)
3038 {
3039 Ok(collaborators)
3040 } else {
3041 Err(anyhow!("no such project"))?
3042 }
3043 })
3044 .await
3045 }
3046
3047 pub async fn project_connection_ids(
3048 &self,
3049 project_id: ProjectId,
3050 connection_id: ConnectionId,
3051 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3052 let room_id = self.room_id_for_project(project_id).await?;
3053 self.room_transaction(room_id, |tx| async move {
3054 let mut collaborators = project_collaborator::Entity::find()
3055 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3056 .stream(&*tx)
3057 .await?;
3058
3059 let mut connection_ids = HashSet::default();
3060 while let Some(collaborator) = collaborators.next().await {
3061 let collaborator = collaborator?;
3062 connection_ids.insert(collaborator.connection());
3063 }
3064
3065 if connection_ids.contains(&connection_id) {
3066 Ok(connection_ids)
3067 } else {
3068 Err(anyhow!("no such project"))?
3069 }
3070 })
3071 .await
3072 }
3073
3074 async fn project_guest_connection_ids(
3075 &self,
3076 project_id: ProjectId,
3077 tx: &DatabaseTransaction,
3078 ) -> Result<Vec<ConnectionId>> {
3079 let mut collaborators = project_collaborator::Entity::find()
3080 .filter(
3081 project_collaborator::Column::ProjectId
3082 .eq(project_id)
3083 .and(project_collaborator::Column::IsHost.eq(false)),
3084 )
3085 .stream(tx)
3086 .await?;
3087
3088 let mut guest_connection_ids = Vec::new();
3089 while let Some(collaborator) = collaborators.next().await {
3090 let collaborator = collaborator?;
3091 guest_connection_ids.push(collaborator.connection());
3092 }
3093 Ok(guest_connection_ids)
3094 }
3095
3096 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3097 self.transaction(|tx| async move {
3098 let project = project::Entity::find_by_id(project_id)
3099 .one(&*tx)
3100 .await?
3101 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3102 Ok(project.room_id)
3103 })
3104 .await
3105 }
3106
3107 // access tokens
3108
3109 pub async fn create_access_token(
3110 &self,
3111 user_id: UserId,
3112 access_token_hash: &str,
3113 max_access_token_count: usize,
3114 ) -> Result<AccessTokenId> {
3115 self.transaction(|tx| async {
3116 let tx = tx;
3117
3118 let token = access_token::ActiveModel {
3119 user_id: ActiveValue::set(user_id),
3120 hash: ActiveValue::set(access_token_hash.into()),
3121 ..Default::default()
3122 }
3123 .insert(&*tx)
3124 .await?;
3125
3126 access_token::Entity::delete_many()
3127 .filter(
3128 access_token::Column::Id.in_subquery(
3129 Query::select()
3130 .column(access_token::Column::Id)
3131 .from(access_token::Entity)
3132 .and_where(access_token::Column::UserId.eq(user_id))
3133 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3134 .limit(10000)
3135 .offset(max_access_token_count as u64)
3136 .to_owned(),
3137 ),
3138 )
3139 .exec(&*tx)
3140 .await?;
3141 Ok(token.id)
3142 })
3143 .await
3144 }
3145
3146 pub async fn get_access_token(
3147 &self,
3148 access_token_id: AccessTokenId,
3149 ) -> Result<access_token::Model> {
3150 self.transaction(|tx| async move {
3151 Ok(access_token::Entity::find_by_id(access_token_id)
3152 .one(&*tx)
3153 .await?
3154 .ok_or_else(|| anyhow!("no such access token"))?)
3155 })
3156 .await
3157 }
3158
3159 // channels
3160
3161 pub async fn create_root_channel(
3162 &self,
3163 name: &str,
3164 live_kit_room: &str,
3165 creator_id: UserId,
3166 ) -> Result<ChannelId> {
3167 self.create_channel(name, None, live_kit_room, creator_id)
3168 .await
3169 }
3170
3171 pub async fn create_channel(
3172 &self,
3173 name: &str,
3174 parent: Option<ChannelId>,
3175 live_kit_room: &str,
3176 creator_id: UserId,
3177 ) -> Result<ChannelId> {
3178 let name = Self::sanitize_channel_name(name)?;
3179 self.transaction(move |tx| async move {
3180 if let Some(parent) = parent {
3181 self.check_user_is_channel_admin(parent, creator_id, &*tx)
3182 .await?;
3183 }
3184
3185 let channel = channel::ActiveModel {
3186 name: ActiveValue::Set(name.to_string()),
3187 ..Default::default()
3188 }
3189 .insert(&*tx)
3190 .await?;
3191
3192 let channel_paths_stmt;
3193 if let Some(parent) = parent {
3194 let sql = r#"
3195 INSERT INTO channel_paths
3196 (id_path, channel_id)
3197 SELECT
3198 id_path || $1 || '/', $2
3199 FROM
3200 channel_paths
3201 WHERE
3202 channel_id = $3
3203 "#;
3204 channel_paths_stmt = Statement::from_sql_and_values(
3205 self.pool.get_database_backend(),
3206 sql,
3207 [
3208 channel.id.to_proto().into(),
3209 channel.id.to_proto().into(),
3210 parent.to_proto().into(),
3211 ],
3212 );
3213 tx.execute(channel_paths_stmt).await?;
3214 } else {
3215 channel_path::Entity::insert(channel_path::ActiveModel {
3216 channel_id: ActiveValue::Set(channel.id),
3217 id_path: ActiveValue::Set(format!("/{}/", channel.id)),
3218 })
3219 .exec(&*tx)
3220 .await?;
3221 }
3222
3223 channel_member::ActiveModel {
3224 channel_id: ActiveValue::Set(channel.id),
3225 user_id: ActiveValue::Set(creator_id),
3226 accepted: ActiveValue::Set(true),
3227 admin: ActiveValue::Set(true),
3228 ..Default::default()
3229 }
3230 .insert(&*tx)
3231 .await?;
3232
3233 room::ActiveModel {
3234 channel_id: ActiveValue::Set(Some(channel.id)),
3235 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3236 ..Default::default()
3237 }
3238 .insert(&*tx)
3239 .await?;
3240
3241 Ok(channel.id)
3242 })
3243 .await
3244 }
3245
3246 pub async fn remove_channel(
3247 &self,
3248 channel_id: ChannelId,
3249 user_id: UserId,
3250 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3251 self.transaction(move |tx| async move {
3252 self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3253 .await?;
3254
3255 // Don't remove descendant channels that have additional parents.
3256 let mut channels_to_remove = self.get_channel_descendants([channel_id], &*tx).await?;
3257 {
3258 let mut channels_to_keep = channel_path::Entity::find()
3259 .filter(
3260 channel_path::Column::ChannelId
3261 .is_in(
3262 channels_to_remove
3263 .keys()
3264 .copied()
3265 .filter(|&id| id != channel_id),
3266 )
3267 .and(
3268 channel_path::Column::IdPath
3269 .not_like(&format!("%/{}/%", channel_id)),
3270 ),
3271 )
3272 .stream(&*tx)
3273 .await?;
3274 while let Some(row) = channels_to_keep.next().await {
3275 let row = row?;
3276 channels_to_remove.remove(&row.channel_id);
3277 }
3278 }
3279
3280 let channel_ancestors = self.get_channel_ancestors(channel_id, &*tx).await?;
3281 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3282 .filter(channel_member::Column::ChannelId.is_in(channel_ancestors))
3283 .select_only()
3284 .column(channel_member::Column::UserId)
3285 .distinct()
3286 .into_values::<_, QueryUserIds>()
3287 .all(&*tx)
3288 .await?;
3289
3290 channel::Entity::delete_many()
3291 .filter(channel::Column::Id.is_in(channels_to_remove.keys().copied()))
3292 .exec(&*tx)
3293 .await?;
3294
3295 Ok((channels_to_remove.into_keys().collect(), members_to_notify))
3296 })
3297 .await
3298 }
3299
3300 pub async fn invite_channel_member(
3301 &self,
3302 channel_id: ChannelId,
3303 invitee_id: UserId,
3304 inviter_id: UserId,
3305 is_admin: bool,
3306 ) -> Result<()> {
3307 self.transaction(move |tx| async move {
3308 self.check_user_is_channel_admin(channel_id, inviter_id, &*tx)
3309 .await?;
3310
3311 channel_member::ActiveModel {
3312 channel_id: ActiveValue::Set(channel_id),
3313 user_id: ActiveValue::Set(invitee_id),
3314 accepted: ActiveValue::Set(false),
3315 admin: ActiveValue::Set(is_admin),
3316 ..Default::default()
3317 }
3318 .insert(&*tx)
3319 .await?;
3320
3321 Ok(())
3322 })
3323 .await
3324 }
3325
3326 fn sanitize_channel_name(name: &str) -> Result<&str> {
3327 let new_name = name.trim().trim_start_matches('#');
3328 if new_name == "" {
3329 Err(anyhow!("channel name can't be blank"))?;
3330 }
3331 Ok(new_name)
3332 }
3333
3334 pub async fn rename_channel(
3335 &self,
3336 channel_id: ChannelId,
3337 user_id: UserId,
3338 new_name: &str,
3339 ) -> Result<String> {
3340 self.transaction(move |tx| async move {
3341 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
3342
3343 self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3344 .await?;
3345
3346 channel::ActiveModel {
3347 id: ActiveValue::Unchanged(channel_id),
3348 name: ActiveValue::Set(new_name.clone()),
3349 ..Default::default()
3350 }
3351 .update(&*tx)
3352 .await?;
3353
3354 Ok(new_name)
3355 })
3356 .await
3357 }
3358
3359 pub async fn respond_to_channel_invite(
3360 &self,
3361 channel_id: ChannelId,
3362 user_id: UserId,
3363 accept: bool,
3364 ) -> Result<()> {
3365 self.transaction(move |tx| async move {
3366 let rows_affected = if accept {
3367 channel_member::Entity::update_many()
3368 .set(channel_member::ActiveModel {
3369 accepted: ActiveValue::Set(accept),
3370 ..Default::default()
3371 })
3372 .filter(
3373 channel_member::Column::ChannelId
3374 .eq(channel_id)
3375 .and(channel_member::Column::UserId.eq(user_id))
3376 .and(channel_member::Column::Accepted.eq(false)),
3377 )
3378 .exec(&*tx)
3379 .await?
3380 .rows_affected
3381 } else {
3382 channel_member::ActiveModel {
3383 channel_id: ActiveValue::Unchanged(channel_id),
3384 user_id: ActiveValue::Unchanged(user_id),
3385 ..Default::default()
3386 }
3387 .delete(&*tx)
3388 .await?
3389 .rows_affected
3390 };
3391
3392 if rows_affected == 0 {
3393 Err(anyhow!("no such invitation"))?;
3394 }
3395
3396 Ok(())
3397 })
3398 .await
3399 }
3400
3401 pub async fn remove_channel_member(
3402 &self,
3403 channel_id: ChannelId,
3404 member_id: UserId,
3405 remover_id: UserId,
3406 ) -> Result<()> {
3407 self.transaction(|tx| async move {
3408 self.check_user_is_channel_admin(channel_id, remover_id, &*tx)
3409 .await?;
3410
3411 let result = channel_member::Entity::delete_many()
3412 .filter(
3413 channel_member::Column::ChannelId
3414 .eq(channel_id)
3415 .and(channel_member::Column::UserId.eq(member_id)),
3416 )
3417 .exec(&*tx)
3418 .await?;
3419
3420 if result.rows_affected == 0 {
3421 Err(anyhow!("no such member"))?;
3422 }
3423
3424 Ok(())
3425 })
3426 .await
3427 }
3428
3429 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
3430 self.transaction(|tx| async move {
3431 let channel_invites = channel_member::Entity::find()
3432 .filter(
3433 channel_member::Column::UserId
3434 .eq(user_id)
3435 .and(channel_member::Column::Accepted.eq(false)),
3436 )
3437 .all(&*tx)
3438 .await?;
3439
3440 let channels = channel::Entity::find()
3441 .filter(
3442 channel::Column::Id.is_in(
3443 channel_invites
3444 .into_iter()
3445 .map(|channel_member| channel_member.channel_id),
3446 ),
3447 )
3448 .all(&*tx)
3449 .await?;
3450
3451 let channels = channels
3452 .into_iter()
3453 .map(|channel| Channel {
3454 id: channel.id,
3455 name: channel.name,
3456 parent_id: None,
3457 })
3458 .collect();
3459
3460 Ok(channels)
3461 })
3462 .await
3463 }
3464
3465 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
3466 self.transaction(|tx| async move {
3467 let tx = tx;
3468
3469 let channel_memberships = channel_member::Entity::find()
3470 .filter(
3471 channel_member::Column::UserId
3472 .eq(user_id)
3473 .and(channel_member::Column::Accepted.eq(true)),
3474 )
3475 .all(&*tx)
3476 .await?;
3477
3478 let parents_by_child_id = self
3479 .get_channel_descendants(channel_memberships.iter().map(|m| m.channel_id), &*tx)
3480 .await?;
3481
3482 let channels_with_admin_privileges = channel_memberships
3483 .iter()
3484 .filter_map(|membership| membership.admin.then_some(membership.channel_id))
3485 .collect();
3486
3487 let mut channels = Vec::with_capacity(parents_by_child_id.len());
3488 {
3489 let mut rows = channel::Entity::find()
3490 .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3491 .stream(&*tx)
3492 .await?;
3493 while let Some(row) = rows.next().await {
3494 let row = row?;
3495 channels.push(Channel {
3496 id: row.id,
3497 name: row.name,
3498 parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3499 });
3500 }
3501 }
3502
3503 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3504 enum QueryUserIdsAndChannelIds {
3505 ChannelId,
3506 UserId,
3507 }
3508
3509 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3510 {
3511 let mut rows = room_participant::Entity::find()
3512 .inner_join(room::Entity)
3513 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3514 .select_only()
3515 .column(room::Column::ChannelId)
3516 .column(room_participant::Column::UserId)
3517 .into_values::<_, QueryUserIdsAndChannelIds>()
3518 .stream(&*tx)
3519 .await?;
3520 while let Some(row) = rows.next().await {
3521 let row: (ChannelId, UserId) = row?;
3522 channel_participants.entry(row.0).or_default().push(row.1)
3523 }
3524 }
3525
3526 Ok(ChannelsForUser {
3527 channels,
3528 channel_participants,
3529 channels_with_admin_privileges,
3530 })
3531 })
3532 .await
3533 }
3534
3535 pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3536 self.transaction(|tx| async move { self.get_channel_members_internal(id, &*tx).await })
3537 .await
3538 }
3539
3540 pub async fn set_channel_member_admin(
3541 &self,
3542 channel_id: ChannelId,
3543 from: UserId,
3544 for_user: UserId,
3545 admin: bool,
3546 ) -> Result<()> {
3547 self.transaction(|tx| async move {
3548 self.check_user_is_channel_admin(channel_id, from, &*tx)
3549 .await?;
3550
3551 let result = channel_member::Entity::update_many()
3552 .filter(
3553 channel_member::Column::ChannelId
3554 .eq(channel_id)
3555 .and(channel_member::Column::UserId.eq(for_user)),
3556 )
3557 .set(channel_member::ActiveModel {
3558 admin: ActiveValue::set(admin),
3559 ..Default::default()
3560 })
3561 .exec(&*tx)
3562 .await?;
3563
3564 if result.rows_affected == 0 {
3565 Err(anyhow!("no such member"))?;
3566 }
3567
3568 Ok(())
3569 })
3570 .await
3571 }
3572
3573 pub async fn get_channel_member_details(
3574 &self,
3575 channel_id: ChannelId,
3576 user_id: UserId,
3577 ) -> Result<Vec<proto::ChannelMember>> {
3578 self.transaction(|tx| async move {
3579 self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3580 .await?;
3581
3582 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3583 enum QueryMemberDetails {
3584 UserId,
3585 Admin,
3586 IsDirectMember,
3587 Accepted,
3588 }
3589
3590 let tx = tx;
3591 let ancestor_ids = self.get_channel_ancestors(channel_id, &*tx).await?;
3592 let mut stream = channel_member::Entity::find()
3593 .distinct()
3594 .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3595 .select_only()
3596 .column(channel_member::Column::UserId)
3597 .column(channel_member::Column::Admin)
3598 .column_as(
3599 channel_member::Column::ChannelId.eq(channel_id),
3600 QueryMemberDetails::IsDirectMember,
3601 )
3602 .column(channel_member::Column::Accepted)
3603 .order_by_asc(channel_member::Column::UserId)
3604 .into_values::<_, QueryMemberDetails>()
3605 .stream(&*tx)
3606 .await?;
3607
3608 let mut rows = Vec::<proto::ChannelMember>::new();
3609 while let Some(row) = stream.next().await {
3610 let (user_id, is_admin, is_direct_member, is_invite_accepted): (
3611 UserId,
3612 bool,
3613 bool,
3614 bool,
3615 ) = row?;
3616 let kind = match (is_direct_member, is_invite_accepted) {
3617 (true, true) => proto::channel_member::Kind::Member,
3618 (true, false) => proto::channel_member::Kind::Invitee,
3619 (false, true) => proto::channel_member::Kind::AncestorMember,
3620 (false, false) => continue,
3621 };
3622 let user_id = user_id.to_proto();
3623 let kind = kind.into();
3624 if let Some(last_row) = rows.last_mut() {
3625 if last_row.user_id == user_id {
3626 if is_direct_member {
3627 last_row.kind = kind;
3628 last_row.admin = is_admin;
3629 }
3630 continue;
3631 }
3632 }
3633 rows.push(proto::ChannelMember {
3634 user_id,
3635 kind,
3636 admin: is_admin,
3637 });
3638 }
3639
3640 Ok(rows)
3641 })
3642 .await
3643 }
3644
3645 pub async fn get_channel_members_internal(
3646 &self,
3647 id: ChannelId,
3648 tx: &DatabaseTransaction,
3649 ) -> Result<Vec<UserId>> {
3650 let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3651 let user_ids = channel_member::Entity::find()
3652 .distinct()
3653 .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3654 .select_only()
3655 .column(channel_member::Column::UserId)
3656 .into_values::<_, QueryUserIds>()
3657 .all(&*tx)
3658 .await?;
3659 Ok(user_ids)
3660 }
3661
3662 async fn check_user_is_channel_member(
3663 &self,
3664 channel_id: ChannelId,
3665 user_id: UserId,
3666 tx: &DatabaseTransaction,
3667 ) -> Result<()> {
3668 let channel_ids = self.get_channel_ancestors(channel_id, tx).await?;
3669 channel_member::Entity::find()
3670 .filter(
3671 channel_member::Column::ChannelId
3672 .is_in(channel_ids)
3673 .and(channel_member::Column::UserId.eq(user_id)),
3674 )
3675 .one(&*tx)
3676 .await?
3677 .ok_or_else(|| anyhow!("user is not a channel member or channel does not exist"))?;
3678 Ok(())
3679 }
3680
3681 async fn check_user_is_channel_admin(
3682 &self,
3683 channel_id: ChannelId,
3684 user_id: UserId,
3685 tx: &DatabaseTransaction,
3686 ) -> Result<()> {
3687 let channel_ids = self.get_channel_ancestors(channel_id, tx).await?;
3688 channel_member::Entity::find()
3689 .filter(
3690 channel_member::Column::ChannelId
3691 .is_in(channel_ids)
3692 .and(channel_member::Column::UserId.eq(user_id))
3693 .and(channel_member::Column::Admin.eq(true)),
3694 )
3695 .one(&*tx)
3696 .await?
3697 .ok_or_else(|| anyhow!("user is not a channel admin or channel does not exist"))?;
3698 Ok(())
3699 }
3700
3701 async fn get_channel_ancestors(
3702 &self,
3703 channel_id: ChannelId,
3704 tx: &DatabaseTransaction,
3705 ) -> Result<Vec<ChannelId>> {
3706 let paths = channel_path::Entity::find()
3707 .filter(channel_path::Column::ChannelId.eq(channel_id))
3708 .all(tx)
3709 .await?;
3710 let mut channel_ids = Vec::new();
3711 for path in paths {
3712 for id in path.id_path.trim_matches('/').split('/') {
3713 if let Ok(id) = id.parse() {
3714 let id = ChannelId::from_proto(id);
3715 if let Err(ix) = channel_ids.binary_search(&id) {
3716 channel_ids.insert(ix, id);
3717 }
3718 }
3719 }
3720 }
3721 Ok(channel_ids)
3722 }
3723
3724 async fn get_channel_descendants(
3725 &self,
3726 channel_ids: impl IntoIterator<Item = ChannelId>,
3727 tx: &DatabaseTransaction,
3728 ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3729 let mut values = String::new();
3730 for id in channel_ids {
3731 if !values.is_empty() {
3732 values.push_str(", ");
3733 }
3734 write!(&mut values, "({})", id).unwrap();
3735 }
3736
3737 if values.is_empty() {
3738 return Ok(HashMap::default());
3739 }
3740
3741 let sql = format!(
3742 r#"
3743 SELECT
3744 descendant_paths.*
3745 FROM
3746 channel_paths parent_paths, channel_paths descendant_paths
3747 WHERE
3748 parent_paths.channel_id IN ({values}) AND
3749 descendant_paths.id_path LIKE (parent_paths.id_path || '%')
3750 "#
3751 );
3752
3753 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3754
3755 let mut parents_by_child_id = HashMap::default();
3756 let mut paths = channel_path::Entity::find()
3757 .from_raw_sql(stmt)
3758 .stream(tx)
3759 .await?;
3760
3761 while let Some(path) = paths.next().await {
3762 let path = path?;
3763 let ids = path.id_path.trim_matches('/').split('/');
3764 let mut parent_id = None;
3765 for id in ids {
3766 if let Ok(id) = id.parse() {
3767 let id = ChannelId::from_proto(id);
3768 if id == path.channel_id {
3769 break;
3770 }
3771 parent_id = Some(id);
3772 }
3773 }
3774 parents_by_child_id.insert(path.channel_id, parent_id);
3775 }
3776
3777 Ok(parents_by_child_id)
3778 }
3779
3780 /// Returns the channel with the given ID and:
3781 /// - true if the user is a member
3782 /// - false if the user hasn't accepted the invitation yet
3783 pub async fn get_channel(
3784 &self,
3785 channel_id: ChannelId,
3786 user_id: UserId,
3787 ) -> Result<Option<(Channel, bool)>> {
3788 self.transaction(|tx| async move {
3789 let tx = tx;
3790
3791 let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3792
3793 if let Some(channel) = channel {
3794 if self
3795 .check_user_is_channel_member(channel_id, user_id, &*tx)
3796 .await
3797 .is_err()
3798 {
3799 return Ok(None);
3800 }
3801
3802 let channel_membership = channel_member::Entity::find()
3803 .filter(
3804 channel_member::Column::ChannelId
3805 .eq(channel_id)
3806 .and(channel_member::Column::UserId.eq(user_id)),
3807 )
3808 .one(&*tx)
3809 .await?;
3810
3811 let is_accepted = channel_membership
3812 .map(|membership| membership.accepted)
3813 .unwrap_or(false);
3814
3815 Ok(Some((
3816 Channel {
3817 id: channel.id,
3818 name: channel.name,
3819 parent_id: None,
3820 },
3821 is_accepted,
3822 )))
3823 } else {
3824 Ok(None)
3825 }
3826 })
3827 .await
3828 }
3829
3830 pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3831 self.transaction(|tx| async move {
3832 let tx = tx;
3833 let room = channel::Model {
3834 id: channel_id,
3835 ..Default::default()
3836 }
3837 .find_related(room::Entity)
3838 .one(&*tx)
3839 .await?
3840 .ok_or_else(|| anyhow!("invalid channel"))?;
3841 Ok(room.id)
3842 })
3843 .await
3844 }
3845
    /// Runs `f` inside a database transaction and commits it on success.
    ///
    /// Each attempt opens a fresh transaction through `with_transaction`. When `f`
    /// fails the transaction is rolled back; when `f` succeeds it is committed. If
    /// either `f` or the commit fails and `retry_on_serialization_error` reports a
    /// serialization error, the whole attempt (including `f`) is repeated; any other
    /// error is returned. `f` may therefore be invoked several times.
    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; fed into the retry back-off.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(result) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(result),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // A failed rollback is returned as-is and ends the loop.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3877
    /// Like `transaction`, for callbacks that may or may not resolve to a room.
    ///
    /// When `f` yields `Some((room_id, data))`, the mutex stored for that room in
    /// `self.rooms` is locked before the transaction is committed, and the lock is
    /// handed to the caller inside the returned `RoomGuard`. When `f` yields `None`,
    /// the transaction is committed and `None` is returned without taking any lock.
    /// Failures are rolled back and retried under the same rules as `transaction`.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; fed into the retry back-off.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // The room is only known once `f` has run, so the lock is
                        // acquired here, after the callback but before the commit.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // A failed rollback is returned as-is and ends the loop.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3927
    /// Like `transaction`, but serialized per room.
    ///
    /// On every attempt the mutex stored for `room_id` in `self.rooms` is locked
    /// before the transaction is opened. On success the lock travels to the caller
    /// inside the returned `RoomGuard`; on a retry the guard of the failed attempt is
    /// dropped at the end of the loop iteration and re-acquired by the next one.
    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; fed into the retry back-off.
            let mut i = 0;
            loop {
                let lock = self.rooms.entry(room_id).or_default().clone();
                let _guard = lock.lock_owned().await;
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(data) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => {
                            return Ok(RoomGuard {
                                data,
                                _guard,
                                _not_send: PhantomData,
                            });
                        }
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // A failed rollback is returned as-is and ends the loop.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3967
    /// Opens a serializable transaction, runs `f` with a handle to it, and hands the
    /// still-open transaction back together with the outcome of `f`.
    ///
    /// Committing or rolling back is left to the caller.
    ///
    /// # Errors
    ///
    /// Fails when the transaction cannot be started, or when the handle given to `f`
    /// is still alive after `f` has finished, so the transaction cannot be reclaimed.
    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let tx = self
            .pool
            .begin_with_config(Some(IsolationLevel::Serializable), None)
            .await?;

        // The transaction is shared with `f` through an `Arc`; `Arc::get_mut` only
        // succeeds once every clone handed out has been dropped again.
        let mut tx = Arc::new(Some(tx));
        let result = f(TransactionHandle(tx.clone())).await;
        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
            return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
        };

        Ok((tx, result))
    }
3986
    /// Drives `future` to completion.
    ///
    /// Outside of tests the future is simply awaited. In test builds a random delay
    /// is simulated first when the executor is `Executor::Deterministic`, and the
    /// future is then blocked on using the runtime stored in `self.runtime`, which
    /// must have been set (it is unwrapped).
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        #[cfg(test)]
        {
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
4005
4006 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
4007 // If the error is due to a failure to serialize concurrent transactions, then retry
4008 // this transaction after a delay. With each subsequent retry, double the delay duration.
4009 // Also vary the delay randomly in order to ensure different database connections retry
4010 // at different times.
4011 if is_serialization_error(error) {
4012 let base_delay = 4_u64 << prev_attempt_count.min(16);
4013 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
4014 log::info!(
4015 "retrying transaction after serialization error. delay: {} ms.",
4016 randomized_delay
4017 );
4018 self.executor
4019 .sleep(Duration::from_millis(randomized_delay as u64))
4020 .await;
4021 true
4022 } else {
4023 false
4024 }
4025 }
4026}
4027
4028fn is_serialization_error(error: &Error) -> bool {
4029 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
4030 match error {
4031 Error::Database(
4032 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
4033 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
4034 ) if error
4035 .as_database_error()
4036 .and_then(|error| error.code())
4037 .as_deref()
4038 == Some(SERIALIZATION_FAILURE_CODE) =>
4039 {
4040 true
4041 }
4042 _ => false,
4043 }
4044}
4045
4046struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
4047
4048impl Deref for TransactionHandle {
4049 type Target = DatabaseTransaction;
4050
4051 fn deref(&self) -> &Self::Target {
4052 self.0.as_ref().as_ref().unwrap()
4053 }
4054}
4055
4056pub struct RoomGuard<T> {
4057 data: T,
4058 _guard: OwnedMutexGuard<()>,
4059 _not_send: PhantomData<Rc<()>>,
4060}
4061
4062impl<T> Deref for RoomGuard<T> {
4063 type Target = T;
4064
4065 fn deref(&self) -> &T {
4066 &self.data
4067 }
4068}
4069
4070impl<T> DerefMut for RoomGuard<T> {
4071 fn deref_mut(&mut self) -> &mut T {
4072 &mut self.data
4073 }
4074}
4075
4076impl<T> RoomGuard<T> {
4077 pub fn into_inner(self) -> T {
4078 self.data
4079 }
4080}
4081
/// Serializable parameters describing a user to create.
// NOTE(review): field meanings are inferred from their names; confirm against the
// user-creation query that consumes this struct.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    /// GitHub login of the user.
    pub github_login: String,
    /// Numeric GitHub user id.
    pub github_user_id: i32,
    /// Invite count for the user (presumably the number of invites granted).
    pub invite_count: i32,
}
4088
/// Identifiers describing a user record.
// NOTE(review): presumably the result of creating a user; confirm against the
// producing query, which is not visible here.
#[derive(Debug)]
pub struct NewUserResult {
    /// Id of the user.
    pub user_id: UserId,
    /// Metrics identifier associated with the user.
    pub metrics_id: String,
    /// Id of the user who invited this user, if any.
    pub inviting_user_id: Option<UserId>,
    /// Device id recorded at signup, if any.
    pub signup_device_id: Option<String>,
}
4096
/// A channel as returned by the channel queries of `Database`.
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct Channel {
    /// Id of the channel.
    pub id: ChannelId,
    /// Display name of the channel.
    pub name: String,
    /// Id of the parent channel; `None` for a channel without a parent, and also in
    /// results of queries that do not resolve parents.
    pub parent_id: Option<ChannelId>,
}
4103
/// Result of `Database::get_channels_for_user`.
#[derive(Debug, PartialEq)]
pub struct ChannelsForUser {
    /// Channels visible to the user.
    pub channels: Vec<Channel>,
    /// Ids of the users participating in each channel's room, keyed by channel.
    pub channel_participants: HashMap<ChannelId, Vec<UserId>>,
    /// Channels in which the user's own membership row is an admin one.
    pub channels_with_admin_privileges: HashSet<ChannelId>,
}
4110
4111fn random_invite_code() -> String {
4112 nanoid::nanoid!(16)
4113}
4114
4115fn random_email_confirmation_code() -> String {
4116 nanoid::nanoid!(64)
4117}
4118
/// Declares a strongly typed database id: a public newtype around `i32`.
///
/// Besides the derived traits, the generated type gets:
/// - `MAX`, `from_proto` and `to_proto` for converting to and from the `u64` ids
///   used in protobuf messages,
/// - `Display`, which prints the inner integer,
/// - the `sea_query` / `sea_orm` conversions needed to use the type as a column
///   value, as a query result and as a primary key.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Converts a protobuf id into this id type.
            ///
            /// NOTE: this is a plain `as` cast, so values above `i32::MAX` are
            /// truncated rather than rejected.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts this id into its protobuf representation.
            ///
            /// NOTE: this is a plain `as` cast, so a negative id is sign-extended
            /// into a large `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts every non-null integer variant, failing when the value does
            // not fit into an `i32`.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `ConvertFromU64` carries the name of the type that could not be
                // built from a `u64`; the error text itself comes from `DbErr`.
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(stringify!($name))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
4237
// Strongly typed ids generated by `id_type!`; each is an `i32` newtype.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
4251
/// A room together with its channel (if any) and that channel's members.
// NOTE(review): presumably the result of joining a room; confirm against the
// query that produces it.
#[derive(Clone)]
pub struct JoinRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Channel the room belongs to, if any.
    pub channel_id: Option<ChannelId>,
    /// Ids of the members of that channel.
    pub channel_members: Vec<UserId>,
}
4258
/// A room together with the projects that were rejoined and reshared in it.
// NOTE(review): presumably the result of rejoining a room after a reconnect;
// confirm against the query that produces it.
pub struct RejoinedRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Projects that were rejoined.
    pub rejoined_projects: Vec<RejoinedProject>,
    /// Projects that were reshared.
    pub reshared_projects: Vec<ResharedProject>,
    /// Channel the room belongs to, if any.
    pub channel_id: Option<ChannelId>,
    /// Ids of the members of that channel.
    pub channel_members: Vec<UserId>,
}
4266
/// A project entry of `RejoinedRoom::reshared_projects`.
pub struct ResharedProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Connection id that was previously associated with the project.
    pub old_connection_id: ConnectionId,
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Metadata of the project's worktrees.
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
4273
/// A project entry of `RejoinedRoom::rejoined_projects`.
pub struct RejoinedProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Connection id that was previously associated with the project.
    pub old_connection_id: ConnectionId,
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Per-worktree changes for the project.
    pub worktrees: Vec<RejoinedWorktree>,
    /// Language servers of the project.
    pub language_servers: Vec<proto::LanguageServer>,
}
4281
/// Worktree state carried by a `RejoinedProject`.
// NOTE(review): the updated_*/removed_* fields look like a delta relative to what
// the rejoining client already has; confirm against the producing query.
#[derive(Debug)]
pub struct RejoinedWorktree {
    /// Id of the worktree.
    pub id: u64,
    /// Absolute path of the worktree.
    pub abs_path: String,
    /// Name of the worktree root.
    pub root_name: String,
    /// Whether the worktree is visible.
    pub visible: bool,
    /// Entries that were updated.
    pub updated_entries: Vec<proto::Entry>,
    /// Ids of entries that were removed.
    pub removed_entries: Vec<u64>,
    /// Repository entries that were updated.
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    /// Ids of repositories that were removed.
    pub removed_repositories: Vec<u64>,
    /// Diagnostic summaries of the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Settings files of the worktree.
    pub settings_files: Vec<WorktreeSettingsFile>,
    /// Current scan id of the worktree.
    pub scan_id: u64,
    /// Id of the last completed scan.
    pub completed_scan_id: u64,
}
4297
/// A room together with the projects left and calls canceled in it.
// NOTE(review): presumably the result of a participant leaving a room; confirm
// against the query that produces it.
pub struct LeftRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Channel the room belongs to, if any.
    pub channel_id: Option<ChannelId>,
    /// Ids of the members of that channel.
    pub channel_members: Vec<UserId>,
    /// Projects that were left, keyed by project id.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    /// Ids of users whose calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
    /// Whether the room was deleted.
    pub deleted: bool,
}
4306
/// A room together with its stale participants and canceled calls.
// NOTE(review): presumably the result of refreshing a room; confirm against the
// query that produces it.
pub struct RefreshedRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Channel the room belongs to, if any.
    pub channel_id: Option<ChannelId>,
    /// Ids of the members of that channel.
    pub channel_members: Vec<UserId>,
    /// Ids of users whose participation was found to be stale.
    pub stale_participant_user_ids: Vec<UserId>,
    /// Ids of users whose calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
4314
/// A project: its collaborators, worktrees and language servers.
pub struct Project {
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Worktrees of the project, keyed by worktree id.
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Language servers of the project.
    pub language_servers: Vec<proto::LanguageServer>,
}
4320
/// A collaborator of a project.
pub struct ProjectCollaborator {
    /// Connection of the collaborator.
    pub connection_id: ConnectionId,
    /// User behind the connection.
    pub user_id: UserId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Whether this collaborator is the host of the project.
    pub is_host: bool,
}
4327
4328impl ProjectCollaborator {
4329 pub fn to_proto(&self) -> proto::Collaborator {
4330 proto::Collaborator {
4331 peer_id: Some(self.connection_id.into()),
4332 replica_id: self.replica_id.0 as u32,
4333 user_id: self.user_id.to_proto(),
4334 }
4335 }
4336}
4337
/// A project entry of `LeftRoom::left_projects`.
#[derive(Debug)]
pub struct LeftProject {
    /// Id of the project.
    pub id: ProjectId,
    /// User hosting the project.
    pub host_user_id: UserId,
    /// Connection of the host.
    pub host_connection_id: ConnectionId,
    /// Connection ids associated with the project (presumably the remaining
    /// collaborators; confirm against the producing query).
    pub connection_ids: Vec<ConnectionId>,
}
4345
/// A worktree of a `Project`.
pub struct Worktree {
    /// Id of the worktree.
    pub id: u64,
    /// Absolute path of the worktree.
    pub abs_path: String,
    /// Name of the worktree root.
    pub root_name: String,
    /// Whether the worktree is visible.
    pub visible: bool,
    /// Entries of the worktree.
    pub entries: Vec<proto::Entry>,
    /// Repository entries of the worktree, keyed by a `u64` id.
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    /// Diagnostic summaries of the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Settings files of the worktree.
    pub settings_files: Vec<WorktreeSettingsFile>,
    /// Current scan id of the worktree.
    pub scan_id: u64,
    /// Id of the last completed scan.
    pub completed_scan_id: u64,
}
4358
/// A settings file of a worktree.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    /// Path of the settings file.
    pub path: String,
    /// Content of the settings file.
    pub content: String,
}
4364
/// Column selector passed to `into_values` by queries that read a single
/// `user_id` column.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}
4369
4370#[cfg(test)]
4371pub use test::*;
4372
/// Test-only helpers for creating throwaway databases.
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A database created for the duration of a test.
    ///
    /// Dropping a Postgres-backed `TestDb` terminates the other connections to the
    /// test database and then drops the database itself.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and loads the test schema into it.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = "sqlite::memory:".to_string();
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            let mut db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            // `Database::run` blocks on this runtime in test builds.
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a randomly named Postgres database on localhost and runs the
        /// migrations from the crate's `migrations` directory against it.
        pub fn postgres(background: Arc<Background>) -> Self {
            // Held until this function returns, so database creation and migration
            // never run concurrently in one process.
            static LOCK: Mutex<()> = Mutex::new(());

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            let mut db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            // `Database::run` blocks on this runtime in test builds.
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Returns the wrapped database.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    // Terminate every other backend connected to the test database
                    // before dropping it.
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}