1mod access_token;
2mod channel;
3mod channel_member;
4mod channel_path;
5mod contact;
6mod follower;
7mod language_server;
8mod project;
9mod project_collaborator;
10mod room;
11mod room_participant;
12mod server;
13mod signup;
14#[cfg(test)]
15mod tests;
16mod user;
17mod worktree;
18mod worktree_diagnostic_summary;
19mod worktree_entry;
20mod worktree_repository;
21mod worktree_repository_statuses;
22mod worktree_settings_file;
23
24use crate::executor::Executor;
25use crate::{Error, Result};
26use anyhow::anyhow;
27use collections::{BTreeMap, HashMap, HashSet};
28pub use contact::Contact;
29use dashmap::DashMap;
30use futures::StreamExt;
31use hyper::StatusCode;
32use rand::prelude::StdRng;
33use rand::{Rng, SeedableRng};
34use rpc::{proto, ConnectionId};
35use sea_orm::Condition;
36pub use sea_orm::ConnectOptions;
37use sea_orm::{
38 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
39 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
40 Statement, TransactionTrait,
41};
42use sea_query::{Alias, Expr, OnConflict, Query};
43use serde::{Deserialize, Serialize};
44pub use signup::{Invite, NewSignup, WaitlistSummary};
45use sqlx::migrate::{Migrate, Migration, MigrationSource};
46use sqlx::Connection;
47use std::fmt::Write as _;
48use std::ops::{Deref, DerefMut};
49use std::path::Path;
50use std::time::Duration;
51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
52use tokio::sync::{Mutex, OwnedMutexGuard};
53pub use user::Model as User;
54
55pub struct Database {
56 options: ConnectOptions,
57 pool: DatabaseConnection,
58 rooms: DashMap<RoomId, Arc<Mutex<()>>>,
59 rng: Mutex<StdRng>,
60 executor: Executor,
61 #[cfg(test)]
62 runtime: Option<tokio::runtime::Runtime>,
63}
64
65impl Database {
66 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
67 Ok(Self {
68 options: options.clone(),
69 pool: sea_orm::Database::connect(options).await?,
70 rooms: DashMap::with_capacity(16384),
71 rng: Mutex::new(StdRng::seed_from_u64(0)),
72 executor,
73 #[cfg(test)]
74 runtime: None,
75 })
76 }
77
78 #[cfg(test)]
79 pub fn reset(&self) {
80 self.rooms.clear();
81 }
82
83 pub async fn migrate(
84 &self,
85 migrations_path: &Path,
86 ignore_checksum_mismatch: bool,
87 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
88 let migrations = MigrationSource::resolve(migrations_path)
89 .await
90 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
91
92 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
93
94 connection.ensure_migrations_table().await?;
95 let applied_migrations: HashMap<_, _> = connection
96 .list_applied_migrations()
97 .await?
98 .into_iter()
99 .map(|m| (m.version, m))
100 .collect();
101
102 let mut new_migrations = Vec::new();
103 for migration in migrations {
104 match applied_migrations.get(&migration.version) {
105 Some(applied_migration) => {
106 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
107 {
108 Err(anyhow!(
109 "checksum mismatch for applied migration {}",
110 migration.description
111 ))?;
112 }
113 }
114 None => {
115 let elapsed = connection.apply(&migration).await?;
116 new_migrations.push((migration, elapsed));
117 }
118 }
119 }
120
121 Ok(new_migrations)
122 }
123
124 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
125 self.transaction(|tx| async move {
126 let server = server::ActiveModel {
127 environment: ActiveValue::set(environment.into()),
128 ..Default::default()
129 }
130 .insert(&*tx)
131 .await?;
132 Ok(server.id)
133 })
134 .await
135 }
136
137 pub async fn stale_room_ids(
138 &self,
139 environment: &str,
140 new_server_id: ServerId,
141 ) -> Result<Vec<RoomId>> {
142 self.transaction(|tx| async move {
143 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
144 enum QueryAs {
145 RoomId,
146 }
147
148 let stale_server_epochs = self
149 .stale_server_ids(environment, new_server_id, &tx)
150 .await?;
151 Ok(room_participant::Entity::find()
152 .select_only()
153 .column(room_participant::Column::RoomId)
154 .distinct()
155 .filter(
156 room_participant::Column::AnsweringConnectionServerId
157 .is_in(stale_server_epochs),
158 )
159 .into_values::<_, QueryAs>()
160 .all(&*tx)
161 .await?)
162 })
163 .await
164 }
165
166 pub async fn refresh_room(
167 &self,
168 room_id: RoomId,
169 new_server_id: ServerId,
170 ) -> Result<RoomGuard<RefreshedRoom>> {
171 self.room_transaction(room_id, |tx| async move {
172 let stale_participant_filter = Condition::all()
173 .add(room_participant::Column::RoomId.eq(room_id))
174 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
175 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
176
177 let stale_participant_user_ids = room_participant::Entity::find()
178 .filter(stale_participant_filter.clone())
179 .all(&*tx)
180 .await?
181 .into_iter()
182 .map(|participant| participant.user_id)
183 .collect::<Vec<_>>();
184
185 // Delete participants who failed to reconnect and cancel their calls.
186 let mut canceled_calls_to_user_ids = Vec::new();
187 room_participant::Entity::delete_many()
188 .filter(stale_participant_filter)
189 .exec(&*tx)
190 .await?;
191 let called_participants = room_participant::Entity::find()
192 .filter(
193 Condition::all()
194 .add(
195 room_participant::Column::CallingUserId
196 .is_in(stale_participant_user_ids.iter().copied()),
197 )
198 .add(room_participant::Column::AnsweringConnectionId.is_null()),
199 )
200 .all(&*tx)
201 .await?;
202 room_participant::Entity::delete_many()
203 .filter(
204 room_participant::Column::Id
205 .is_in(called_participants.iter().map(|participant| participant.id)),
206 )
207 .exec(&*tx)
208 .await?;
209 canceled_calls_to_user_ids.extend(
210 called_participants
211 .into_iter()
212 .map(|participant| participant.user_id),
213 );
214
215 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
216 let channel_members;
217 if let Some(channel_id) = channel_id {
218 channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
219 } else {
220 channel_members = Vec::new();
221
222 // Delete the room if it becomes empty.
223 if room.participants.is_empty() {
224 project::Entity::delete_many()
225 .filter(project::Column::RoomId.eq(room_id))
226 .exec(&*tx)
227 .await?;
228 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
229 }
230 };
231
232 Ok(RefreshedRoom {
233 room,
234 channel_id,
235 channel_members,
236 stale_participant_user_ids,
237 canceled_calls_to_user_ids,
238 })
239 })
240 .await
241 }
242
243 pub async fn delete_stale_servers(
244 &self,
245 environment: &str,
246 new_server_id: ServerId,
247 ) -> Result<()> {
248 self.transaction(|tx| async move {
249 server::Entity::delete_many()
250 .filter(
251 Condition::all()
252 .add(server::Column::Environment.eq(environment))
253 .add(server::Column::Id.ne(new_server_id)),
254 )
255 .exec(&*tx)
256 .await?;
257 Ok(())
258 })
259 .await
260 }
261
262 async fn stale_server_ids(
263 &self,
264 environment: &str,
265 new_server_id: ServerId,
266 tx: &DatabaseTransaction,
267 ) -> Result<Vec<ServerId>> {
268 let stale_servers = server::Entity::find()
269 .filter(
270 Condition::all()
271 .add(server::Column::Environment.eq(environment))
272 .add(server::Column::Id.ne(new_server_id)),
273 )
274 .all(&*tx)
275 .await?;
276 Ok(stale_servers.into_iter().map(|server| server.id).collect())
277 }
278
279 // users
280
281 pub async fn create_user(
282 &self,
283 email_address: &str,
284 admin: bool,
285 params: NewUserParams,
286 ) -> Result<NewUserResult> {
287 self.transaction(|tx| async {
288 let tx = tx;
289 let user = user::Entity::insert(user::ActiveModel {
290 email_address: ActiveValue::set(Some(email_address.into())),
291 github_login: ActiveValue::set(params.github_login.clone()),
292 github_user_id: ActiveValue::set(Some(params.github_user_id)),
293 admin: ActiveValue::set(admin),
294 metrics_id: ActiveValue::set(Uuid::new_v4()),
295 ..Default::default()
296 })
297 .on_conflict(
298 OnConflict::column(user::Column::GithubLogin)
299 .update_column(user::Column::GithubLogin)
300 .to_owned(),
301 )
302 .exec_with_returning(&*tx)
303 .await?;
304
305 Ok(NewUserResult {
306 user_id: user.id,
307 metrics_id: user.metrics_id.to_string(),
308 signup_device_id: None,
309 inviting_user_id: None,
310 })
311 })
312 .await
313 }
314
315 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
316 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
317 .await
318 }
319
320 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
321 self.transaction(|tx| async {
322 let tx = tx;
323 Ok(user::Entity::find()
324 .filter(user::Column::Id.is_in(ids.iter().copied()))
325 .all(&*tx)
326 .await?)
327 })
328 .await
329 }
330
331 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
332 self.transaction(|tx| async move {
333 Ok(user::Entity::find()
334 .filter(user::Column::GithubLogin.eq(github_login))
335 .one(&*tx)
336 .await?)
337 })
338 .await
339 }
340
341 pub async fn get_or_create_user_by_github_account(
342 &self,
343 github_login: &str,
344 github_user_id: Option<i32>,
345 github_email: Option<&str>,
346 ) -> Result<Option<User>> {
347 self.transaction(|tx| async move {
348 let tx = &*tx;
349 if let Some(github_user_id) = github_user_id {
350 if let Some(user_by_github_user_id) = user::Entity::find()
351 .filter(user::Column::GithubUserId.eq(github_user_id))
352 .one(tx)
353 .await?
354 {
355 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
356 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
357 Ok(Some(user_by_github_user_id.update(tx).await?))
358 } else if let Some(user_by_github_login) = user::Entity::find()
359 .filter(user::Column::GithubLogin.eq(github_login))
360 .one(tx)
361 .await?
362 {
363 let mut user_by_github_login = user_by_github_login.into_active_model();
364 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
365 Ok(Some(user_by_github_login.update(tx).await?))
366 } else {
367 let user = user::Entity::insert(user::ActiveModel {
368 email_address: ActiveValue::set(github_email.map(|email| email.into())),
369 github_login: ActiveValue::set(github_login.into()),
370 github_user_id: ActiveValue::set(Some(github_user_id)),
371 admin: ActiveValue::set(false),
372 invite_count: ActiveValue::set(0),
373 invite_code: ActiveValue::set(None),
374 metrics_id: ActiveValue::set(Uuid::new_v4()),
375 ..Default::default()
376 })
377 .exec_with_returning(&*tx)
378 .await?;
379 Ok(Some(user))
380 }
381 } else {
382 Ok(user::Entity::find()
383 .filter(user::Column::GithubLogin.eq(github_login))
384 .one(tx)
385 .await?)
386 }
387 })
388 .await
389 }
390
391 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
392 self.transaction(|tx| async move {
393 Ok(user::Entity::find()
394 .order_by_asc(user::Column::GithubLogin)
395 .limit(limit as u64)
396 .offset(page as u64 * limit as u64)
397 .all(&*tx)
398 .await?)
399 })
400 .await
401 }
402
403 pub async fn get_users_with_no_invites(
404 &self,
405 invited_by_another_user: bool,
406 ) -> Result<Vec<User>> {
407 self.transaction(|tx| async move {
408 Ok(user::Entity::find()
409 .filter(
410 user::Column::InviteCount
411 .eq(0)
412 .and(if invited_by_another_user {
413 user::Column::InviterId.is_not_null()
414 } else {
415 user::Column::InviterId.is_null()
416 }),
417 )
418 .all(&*tx)
419 .await?)
420 })
421 .await
422 }
423
424 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
425 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
426 enum QueryAs {
427 MetricsId,
428 }
429
430 self.transaction(|tx| async move {
431 let metrics_id: Uuid = user::Entity::find_by_id(id)
432 .select_only()
433 .column(user::Column::MetricsId)
434 .into_values::<_, QueryAs>()
435 .one(&*tx)
436 .await?
437 .ok_or_else(|| anyhow!("could not find user"))?;
438 Ok(metrics_id.to_string())
439 })
440 .await
441 }
442
443 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
444 self.transaction(|tx| async move {
445 user::Entity::update_many()
446 .filter(user::Column::Id.eq(id))
447 .set(user::ActiveModel {
448 admin: ActiveValue::set(is_admin),
449 ..Default::default()
450 })
451 .exec(&*tx)
452 .await?;
453 Ok(())
454 })
455 .await
456 }
457
458 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
459 self.transaction(|tx| async move {
460 user::Entity::update_many()
461 .filter(user::Column::Id.eq(id))
462 .set(user::ActiveModel {
463 connected_once: ActiveValue::set(connected_once),
464 ..Default::default()
465 })
466 .exec(&*tx)
467 .await?;
468 Ok(())
469 })
470 .await
471 }
472
473 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
474 self.transaction(|tx| async move {
475 access_token::Entity::delete_many()
476 .filter(access_token::Column::UserId.eq(id))
477 .exec(&*tx)
478 .await?;
479 user::Entity::delete_by_id(id).exec(&*tx).await?;
480 Ok(())
481 })
482 .await
483 }
484
485 // contacts
486
487 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
488 #[derive(Debug, FromQueryResult)]
489 struct ContactWithUserBusyStatuses {
490 user_id_a: UserId,
491 user_id_b: UserId,
492 a_to_b: bool,
493 accepted: bool,
494 should_notify: bool,
495 user_a_busy: bool,
496 user_b_busy: bool,
497 }
498
499 self.transaction(|tx| async move {
500 let user_a_participant = Alias::new("user_a_participant");
501 let user_b_participant = Alias::new("user_b_participant");
502 let mut db_contacts = contact::Entity::find()
503 .column_as(
504 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
505 .is_not_null(),
506 "user_a_busy",
507 )
508 .column_as(
509 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
510 .is_not_null(),
511 "user_b_busy",
512 )
513 .filter(
514 contact::Column::UserIdA
515 .eq(user_id)
516 .or(contact::Column::UserIdB.eq(user_id)),
517 )
518 .join_as(
519 JoinType::LeftJoin,
520 contact::Relation::UserARoomParticipant.def(),
521 user_a_participant,
522 )
523 .join_as(
524 JoinType::LeftJoin,
525 contact::Relation::UserBRoomParticipant.def(),
526 user_b_participant,
527 )
528 .into_model::<ContactWithUserBusyStatuses>()
529 .stream(&*tx)
530 .await?;
531
532 let mut contacts = Vec::new();
533 while let Some(db_contact) = db_contacts.next().await {
534 let db_contact = db_contact?;
535 if db_contact.user_id_a == user_id {
536 if db_contact.accepted {
537 contacts.push(Contact::Accepted {
538 user_id: db_contact.user_id_b,
539 should_notify: db_contact.should_notify && db_contact.a_to_b,
540 busy: db_contact.user_b_busy,
541 });
542 } else if db_contact.a_to_b {
543 contacts.push(Contact::Outgoing {
544 user_id: db_contact.user_id_b,
545 })
546 } else {
547 contacts.push(Contact::Incoming {
548 user_id: db_contact.user_id_b,
549 should_notify: db_contact.should_notify,
550 });
551 }
552 } else if db_contact.accepted {
553 contacts.push(Contact::Accepted {
554 user_id: db_contact.user_id_a,
555 should_notify: db_contact.should_notify && !db_contact.a_to_b,
556 busy: db_contact.user_a_busy,
557 });
558 } else if db_contact.a_to_b {
559 contacts.push(Contact::Incoming {
560 user_id: db_contact.user_id_a,
561 should_notify: db_contact.should_notify,
562 });
563 } else {
564 contacts.push(Contact::Outgoing {
565 user_id: db_contact.user_id_a,
566 });
567 }
568 }
569
570 contacts.sort_unstable_by_key(|contact| contact.user_id());
571
572 Ok(contacts)
573 })
574 .await
575 }
576
577 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
578 self.transaction(|tx| async move {
579 let participant = room_participant::Entity::find()
580 .filter(room_participant::Column::UserId.eq(user_id))
581 .one(&*tx)
582 .await?;
583 Ok(participant.is_some())
584 })
585 .await
586 }
587
588 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
589 self.transaction(|tx| async move {
590 let (id_a, id_b) = if user_id_1 < user_id_2 {
591 (user_id_1, user_id_2)
592 } else {
593 (user_id_2, user_id_1)
594 };
595
596 Ok(contact::Entity::find()
597 .filter(
598 contact::Column::UserIdA
599 .eq(id_a)
600 .and(contact::Column::UserIdB.eq(id_b))
601 .and(contact::Column::Accepted.eq(true)),
602 )
603 .one(&*tx)
604 .await?
605 .is_some())
606 })
607 .await
608 }
609
610 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
611 self.transaction(|tx| async move {
612 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
613 (sender_id, receiver_id, true)
614 } else {
615 (receiver_id, sender_id, false)
616 };
617
618 let rows_affected = contact::Entity::insert(contact::ActiveModel {
619 user_id_a: ActiveValue::set(id_a),
620 user_id_b: ActiveValue::set(id_b),
621 a_to_b: ActiveValue::set(a_to_b),
622 accepted: ActiveValue::set(false),
623 should_notify: ActiveValue::set(true),
624 ..Default::default()
625 })
626 .on_conflict(
627 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
628 .values([
629 (contact::Column::Accepted, true.into()),
630 (contact::Column::ShouldNotify, false.into()),
631 ])
632 .action_and_where(
633 contact::Column::Accepted.eq(false).and(
634 contact::Column::AToB
635 .eq(a_to_b)
636 .and(contact::Column::UserIdA.eq(id_b))
637 .or(contact::Column::AToB
638 .ne(a_to_b)
639 .and(contact::Column::UserIdA.eq(id_a))),
640 ),
641 )
642 .to_owned(),
643 )
644 .exec_without_returning(&*tx)
645 .await?;
646
647 if rows_affected == 1 {
648 Ok(())
649 } else {
650 Err(anyhow!("contact already requested"))?
651 }
652 })
653 .await
654 }
655
656 /// Returns a bool indicating whether the removed contact had originally accepted or not
657 ///
658 /// Deletes the contact identified by the requester and responder ids, and then returns
659 /// whether the deleted contact had originally accepted or was a pending contact request.
660 ///
661 /// # Arguments
662 ///
663 /// * `requester_id` - The user that initiates this request
664 /// * `responder_id` - The user that will be removed
665 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
666 self.transaction(|tx| async move {
667 let (id_a, id_b) = if responder_id < requester_id {
668 (responder_id, requester_id)
669 } else {
670 (requester_id, responder_id)
671 };
672
673 let contact = contact::Entity::find()
674 .filter(
675 contact::Column::UserIdA
676 .eq(id_a)
677 .and(contact::Column::UserIdB.eq(id_b)),
678 )
679 .one(&*tx)
680 .await?
681 .ok_or_else(|| anyhow!("no such contact"))?;
682
683 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
684 Ok(contact.accepted)
685 })
686 .await
687 }
688
689 pub async fn dismiss_contact_notification(
690 &self,
691 user_id: UserId,
692 contact_user_id: UserId,
693 ) -> Result<()> {
694 self.transaction(|tx| async move {
695 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
696 (user_id, contact_user_id, true)
697 } else {
698 (contact_user_id, user_id, false)
699 };
700
701 let result = contact::Entity::update_many()
702 .set(contact::ActiveModel {
703 should_notify: ActiveValue::set(false),
704 ..Default::default()
705 })
706 .filter(
707 contact::Column::UserIdA
708 .eq(id_a)
709 .and(contact::Column::UserIdB.eq(id_b))
710 .and(
711 contact::Column::AToB
712 .eq(a_to_b)
713 .and(contact::Column::Accepted.eq(true))
714 .or(contact::Column::AToB
715 .ne(a_to_b)
716 .and(contact::Column::Accepted.eq(false))),
717 ),
718 )
719 .exec(&*tx)
720 .await?;
721 if result.rows_affected == 0 {
722 Err(anyhow!("no such contact request"))?
723 } else {
724 Ok(())
725 }
726 })
727 .await
728 }
729
730 pub async fn respond_to_contact_request(
731 &self,
732 responder_id: UserId,
733 requester_id: UserId,
734 accept: bool,
735 ) -> Result<()> {
736 self.transaction(|tx| async move {
737 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
738 (responder_id, requester_id, false)
739 } else {
740 (requester_id, responder_id, true)
741 };
742 let rows_affected = if accept {
743 let result = contact::Entity::update_many()
744 .set(contact::ActiveModel {
745 accepted: ActiveValue::set(true),
746 should_notify: ActiveValue::set(true),
747 ..Default::default()
748 })
749 .filter(
750 contact::Column::UserIdA
751 .eq(id_a)
752 .and(contact::Column::UserIdB.eq(id_b))
753 .and(contact::Column::AToB.eq(a_to_b)),
754 )
755 .exec(&*tx)
756 .await?;
757 result.rows_affected
758 } else {
759 let result = contact::Entity::delete_many()
760 .filter(
761 contact::Column::UserIdA
762 .eq(id_a)
763 .and(contact::Column::UserIdB.eq(id_b))
764 .and(contact::Column::AToB.eq(a_to_b))
765 .and(contact::Column::Accepted.eq(false)),
766 )
767 .exec(&*tx)
768 .await?;
769
770 result.rows_affected
771 };
772
773 if rows_affected == 1 {
774 Ok(())
775 } else {
776 Err(anyhow!("no such contact request"))?
777 }
778 })
779 .await
780 }
781
782 pub fn fuzzy_like_string(string: &str) -> String {
783 let mut result = String::with_capacity(string.len() * 2 + 1);
784 for c in string.chars() {
785 if c.is_alphanumeric() {
786 result.push('%');
787 result.push(c);
788 }
789 }
790 result.push('%');
791 result
792 }
793
794 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
795 self.transaction(|tx| async {
796 let tx = tx;
797 let like_string = Self::fuzzy_like_string(name_query);
798 let query = "
799 SELECT users.*
800 FROM users
801 WHERE github_login ILIKE $1
802 ORDER BY github_login <-> $2
803 LIMIT $3
804 ";
805
806 Ok(user::Entity::find()
807 .from_raw_sql(Statement::from_sql_and_values(
808 self.pool.get_database_backend(),
809 query.into(),
810 vec![like_string.into(), name_query.into(), limit.into()],
811 ))
812 .all(&*tx)
813 .await?)
814 })
815 .await
816 }
817
818 // signups
819
820 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
821 self.transaction(|tx| async move {
822 signup::Entity::insert(signup::ActiveModel {
823 email_address: ActiveValue::set(signup.email_address.clone()),
824 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
825 email_confirmation_sent: ActiveValue::set(false),
826 platform_mac: ActiveValue::set(signup.platform_mac),
827 platform_windows: ActiveValue::set(signup.platform_windows),
828 platform_linux: ActiveValue::set(signup.platform_linux),
829 platform_unknown: ActiveValue::set(false),
830 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
831 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
832 device_id: ActiveValue::set(signup.device_id.clone()),
833 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
834 ..Default::default()
835 })
836 .on_conflict(
837 OnConflict::column(signup::Column::EmailAddress)
838 .update_columns([
839 signup::Column::PlatformMac,
840 signup::Column::PlatformWindows,
841 signup::Column::PlatformLinux,
842 signup::Column::EditorFeatures,
843 signup::Column::ProgrammingLanguages,
844 signup::Column::DeviceId,
845 signup::Column::AddedToMailingList,
846 ])
847 .to_owned(),
848 )
849 .exec(&*tx)
850 .await?;
851 Ok(())
852 })
853 .await
854 }
855
856 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
857 self.transaction(|tx| async move {
858 let signup = signup::Entity::find()
859 .filter(signup::Column::EmailAddress.eq(email_address))
860 .one(&*tx)
861 .await?
862 .ok_or_else(|| {
863 anyhow!("signup with email address {} doesn't exist", email_address)
864 })?;
865
866 Ok(signup)
867 })
868 .await
869 }
870
871 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
872 self.transaction(|tx| async move {
873 let query = "
874 SELECT
875 COUNT(*) as count,
876 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
877 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
878 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
879 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
880 FROM (
881 SELECT *
882 FROM signups
883 WHERE
884 NOT email_confirmation_sent
885 ) AS unsent
886 ";
887 Ok(
888 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
889 self.pool.get_database_backend(),
890 query.into(),
891 vec![],
892 ))
893 .one(&*tx)
894 .await?
895 .ok_or_else(|| anyhow!("invalid result"))?,
896 )
897 })
898 .await
899 }
900
901 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
902 let emails = invites
903 .iter()
904 .map(|s| s.email_address.as_str())
905 .collect::<Vec<_>>();
906 self.transaction(|tx| async {
907 let tx = tx;
908 signup::Entity::update_many()
909 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
910 .set(signup::ActiveModel {
911 email_confirmation_sent: ActiveValue::set(true),
912 ..Default::default()
913 })
914 .exec(&*tx)
915 .await?;
916 Ok(())
917 })
918 .await
919 }
920
921 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
922 self.transaction(|tx| async move {
923 Ok(signup::Entity::find()
924 .select_only()
925 .column(signup::Column::EmailAddress)
926 .column(signup::Column::EmailConfirmationCode)
927 .filter(
928 signup::Column::EmailConfirmationSent.eq(false).and(
929 signup::Column::PlatformMac
930 .eq(true)
931 .or(signup::Column::PlatformUnknown.eq(true)),
932 ),
933 )
934 .order_by_asc(signup::Column::CreatedAt)
935 .limit(count as u64)
936 .into_model()
937 .all(&*tx)
938 .await?)
939 })
940 .await
941 }
942
943 // invite codes
944
945 pub async fn create_invite_from_code(
946 &self,
947 code: &str,
948 email_address: &str,
949 device_id: Option<&str>,
950 added_to_mailing_list: bool,
951 ) -> Result<Invite> {
952 self.transaction(|tx| async move {
953 let existing_user = user::Entity::find()
954 .filter(user::Column::EmailAddress.eq(email_address))
955 .one(&*tx)
956 .await?;
957
958 if existing_user.is_some() {
959 Err(anyhow!("email address is already in use"))?;
960 }
961
962 let inviting_user_with_invites = match user::Entity::find()
963 .filter(
964 user::Column::InviteCode
965 .eq(code)
966 .and(user::Column::InviteCount.gt(0)),
967 )
968 .one(&*tx)
969 .await?
970 {
971 Some(inviting_user) => inviting_user,
972 None => {
973 return Err(Error::Http(
974 StatusCode::UNAUTHORIZED,
975 "unable to find an invite code with invites remaining".to_string(),
976 ))?
977 }
978 };
979 user::Entity::update_many()
980 .filter(
981 user::Column::Id
982 .eq(inviting_user_with_invites.id)
983 .and(user::Column::InviteCount.gt(0)),
984 )
985 .col_expr(
986 user::Column::InviteCount,
987 Expr::col(user::Column::InviteCount).sub(1),
988 )
989 .exec(&*tx)
990 .await?;
991
992 let signup = signup::Entity::insert(signup::ActiveModel {
993 email_address: ActiveValue::set(email_address.into()),
994 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
995 email_confirmation_sent: ActiveValue::set(false),
996 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
997 platform_linux: ActiveValue::set(false),
998 platform_mac: ActiveValue::set(false),
999 platform_windows: ActiveValue::set(false),
1000 platform_unknown: ActiveValue::set(true),
1001 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
1002 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
1003 ..Default::default()
1004 })
1005 .on_conflict(
1006 OnConflict::column(signup::Column::EmailAddress)
1007 .update_column(signup::Column::InvitingUserId)
1008 .to_owned(),
1009 )
1010 .exec_with_returning(&*tx)
1011 .await?;
1012
1013 Ok(Invite {
1014 email_address: signup.email_address,
1015 email_confirmation_code: signup.email_confirmation_code,
1016 })
1017 })
1018 .await
1019 }
1020
1021 pub async fn create_user_from_invite(
1022 &self,
1023 invite: &Invite,
1024 user: NewUserParams,
1025 ) -> Result<Option<NewUserResult>> {
1026 self.transaction(|tx| async {
1027 let tx = tx;
1028 let signup = signup::Entity::find()
1029 .filter(
1030 signup::Column::EmailAddress
1031 .eq(invite.email_address.as_str())
1032 .and(
1033 signup::Column::EmailConfirmationCode
1034 .eq(invite.email_confirmation_code.as_str()),
1035 ),
1036 )
1037 .one(&*tx)
1038 .await?
1039 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1040
1041 if signup.user_id.is_some() {
1042 return Ok(None);
1043 }
1044
1045 let user = user::Entity::insert(user::ActiveModel {
1046 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1047 github_login: ActiveValue::set(user.github_login.clone()),
1048 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1049 admin: ActiveValue::set(false),
1050 invite_count: ActiveValue::set(user.invite_count),
1051 invite_code: ActiveValue::set(Some(random_invite_code())),
1052 metrics_id: ActiveValue::set(Uuid::new_v4()),
1053 ..Default::default()
1054 })
1055 .on_conflict(
1056 OnConflict::column(user::Column::GithubLogin)
1057 .update_columns([
1058 user::Column::EmailAddress,
1059 user::Column::GithubUserId,
1060 user::Column::Admin,
1061 ])
1062 .to_owned(),
1063 )
1064 .exec_with_returning(&*tx)
1065 .await?;
1066
1067 let mut signup = signup.into_active_model();
1068 signup.user_id = ActiveValue::set(Some(user.id));
1069 let signup = signup.update(&*tx).await?;
1070
1071 if let Some(inviting_user_id) = signup.inviting_user_id {
1072 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1073 (inviting_user_id, user.id, true)
1074 } else {
1075 (user.id, inviting_user_id, false)
1076 };
1077
1078 contact::Entity::insert(contact::ActiveModel {
1079 user_id_a: ActiveValue::set(user_id_a),
1080 user_id_b: ActiveValue::set(user_id_b),
1081 a_to_b: ActiveValue::set(a_to_b),
1082 should_notify: ActiveValue::set(true),
1083 accepted: ActiveValue::set(true),
1084 ..Default::default()
1085 })
1086 .on_conflict(OnConflict::new().do_nothing().to_owned())
1087 .exec_without_returning(&*tx)
1088 .await?;
1089 }
1090
1091 Ok(Some(NewUserResult {
1092 user_id: user.id,
1093 metrics_id: user.metrics_id.to_string(),
1094 inviting_user_id: signup.inviting_user_id,
1095 signup_device_id: signup.device_id,
1096 }))
1097 })
1098 .await
1099 }
1100
1101 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1102 self.transaction(|tx| async move {
1103 if count > 0 {
1104 user::Entity::update_many()
1105 .filter(
1106 user::Column::Id
1107 .eq(id)
1108 .and(user::Column::InviteCode.is_null()),
1109 )
1110 .set(user::ActiveModel {
1111 invite_code: ActiveValue::set(Some(random_invite_code())),
1112 ..Default::default()
1113 })
1114 .exec(&*tx)
1115 .await?;
1116 }
1117
1118 user::Entity::update_many()
1119 .filter(user::Column::Id.eq(id))
1120 .set(user::ActiveModel {
1121 invite_count: ActiveValue::set(count),
1122 ..Default::default()
1123 })
1124 .exec(&*tx)
1125 .await?;
1126 Ok(())
1127 })
1128 .await
1129 }
1130
1131 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1132 self.transaction(|tx| async move {
1133 match user::Entity::find_by_id(id).one(&*tx).await? {
1134 Some(user) if user.invite_code.is_some() => {
1135 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1136 }
1137 _ => Ok(None),
1138 }
1139 })
1140 .await
1141 }
1142
1143 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1144 self.transaction(|tx| async move {
1145 user::Entity::find()
1146 .filter(user::Column::InviteCode.eq(code))
1147 .one(&*tx)
1148 .await?
1149 .ok_or_else(|| {
1150 Error::Http(
1151 StatusCode::NOT_FOUND,
1152 "that invite code does not exist".to_string(),
1153 )
1154 })
1155 })
1156 .await
1157 }
1158
1159 // rooms
1160
1161 pub async fn incoming_call_for_user(
1162 &self,
1163 user_id: UserId,
1164 ) -> Result<Option<proto::IncomingCall>> {
1165 self.transaction(|tx| async move {
1166 let pending_participant = room_participant::Entity::find()
1167 .filter(
1168 room_participant::Column::UserId
1169 .eq(user_id)
1170 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1171 )
1172 .one(&*tx)
1173 .await?;
1174
1175 if let Some(pending_participant) = pending_participant {
1176 let room = self.get_room(pending_participant.room_id, &tx).await?;
1177 Ok(Self::build_incoming_call(&room, user_id))
1178 } else {
1179 Ok(None)
1180 }
1181 })
1182 .await
1183 }
1184
1185 pub async fn create_room(
1186 &self,
1187 user_id: UserId,
1188 connection: ConnectionId,
1189 live_kit_room: &str,
1190 ) -> Result<proto::Room> {
1191 self.transaction(|tx| async move {
1192 let room = room::ActiveModel {
1193 live_kit_room: ActiveValue::set(live_kit_room.into()),
1194 ..Default::default()
1195 }
1196 .insert(&*tx)
1197 .await?;
1198 room_participant::ActiveModel {
1199 room_id: ActiveValue::set(room.id),
1200 user_id: ActiveValue::set(user_id),
1201 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1202 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1203 connection.owner_id as i32,
1204 ))),
1205 answering_connection_lost: ActiveValue::set(false),
1206 calling_user_id: ActiveValue::set(user_id),
1207 calling_connection_id: ActiveValue::set(connection.id as i32),
1208 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1209 connection.owner_id as i32,
1210 ))),
1211 ..Default::default()
1212 }
1213 .insert(&*tx)
1214 .await?;
1215
1216 let room = self.get_room(room.id, &tx).await?;
1217 Ok(room)
1218 })
1219 .await
1220 }
1221
1222 pub async fn call(
1223 &self,
1224 room_id: RoomId,
1225 calling_user_id: UserId,
1226 calling_connection: ConnectionId,
1227 called_user_id: UserId,
1228 initial_project_id: Option<ProjectId>,
1229 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1230 self.room_transaction(room_id, |tx| async move {
1231 room_participant::ActiveModel {
1232 room_id: ActiveValue::set(room_id),
1233 user_id: ActiveValue::set(called_user_id),
1234 answering_connection_lost: ActiveValue::set(false),
1235 calling_user_id: ActiveValue::set(calling_user_id),
1236 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1237 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1238 calling_connection.owner_id as i32,
1239 ))),
1240 initial_project_id: ActiveValue::set(initial_project_id),
1241 ..Default::default()
1242 }
1243 .insert(&*tx)
1244 .await?;
1245
1246 let room = self.get_room(room_id, &tx).await?;
1247 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1248 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1249 Ok((room, incoming_call))
1250 })
1251 .await
1252 }
1253
1254 pub async fn call_failed(
1255 &self,
1256 room_id: RoomId,
1257 called_user_id: UserId,
1258 ) -> Result<RoomGuard<proto::Room>> {
1259 self.room_transaction(room_id, |tx| async move {
1260 room_participant::Entity::delete_many()
1261 .filter(
1262 room_participant::Column::RoomId
1263 .eq(room_id)
1264 .and(room_participant::Column::UserId.eq(called_user_id)),
1265 )
1266 .exec(&*tx)
1267 .await?;
1268 let room = self.get_room(room_id, &tx).await?;
1269 Ok(room)
1270 })
1271 .await
1272 }
1273
1274 pub async fn decline_call(
1275 &self,
1276 expected_room_id: Option<RoomId>,
1277 user_id: UserId,
1278 ) -> Result<Option<RoomGuard<proto::Room>>> {
1279 self.optional_room_transaction(|tx| async move {
1280 let mut filter = Condition::all()
1281 .add(room_participant::Column::UserId.eq(user_id))
1282 .add(room_participant::Column::AnsweringConnectionId.is_null());
1283 if let Some(room_id) = expected_room_id {
1284 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1285 }
1286 let participant = room_participant::Entity::find()
1287 .filter(filter)
1288 .one(&*tx)
1289 .await?;
1290
1291 let participant = if let Some(participant) = participant {
1292 participant
1293 } else if expected_room_id.is_some() {
1294 return Err(anyhow!("could not find call to decline"))?;
1295 } else {
1296 return Ok(None);
1297 };
1298
1299 let room_id = participant.room_id;
1300 room_participant::Entity::delete(participant.into_active_model())
1301 .exec(&*tx)
1302 .await?;
1303
1304 let room = self.get_room(room_id, &tx).await?;
1305 Ok(Some((room_id, room)))
1306 })
1307 .await
1308 }
1309
1310 pub async fn cancel_call(
1311 &self,
1312 room_id: RoomId,
1313 calling_connection: ConnectionId,
1314 called_user_id: UserId,
1315 ) -> Result<RoomGuard<proto::Room>> {
1316 self.room_transaction(room_id, |tx| async move {
1317 let participant = room_participant::Entity::find()
1318 .filter(
1319 Condition::all()
1320 .add(room_participant::Column::UserId.eq(called_user_id))
1321 .add(room_participant::Column::RoomId.eq(room_id))
1322 .add(
1323 room_participant::Column::CallingConnectionId
1324 .eq(calling_connection.id as i32),
1325 )
1326 .add(
1327 room_participant::Column::CallingConnectionServerId
1328 .eq(calling_connection.owner_id as i32),
1329 )
1330 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1331 )
1332 .one(&*tx)
1333 .await?
1334 .ok_or_else(|| anyhow!("no call to cancel"))?;
1335
1336 room_participant::Entity::delete(participant.into_active_model())
1337 .exec(&*tx)
1338 .await?;
1339
1340 let room = self.get_room(room_id, &tx).await?;
1341 Ok(room)
1342 })
1343 .await
1344 }
1345
1346 pub async fn is_current_room_different_channel(
1347 &self,
1348 user_id: UserId,
1349 channel_id: ChannelId,
1350 ) -> Result<bool> {
1351 self.transaction(|tx| async move {
1352 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1353 enum QueryAs {
1354 ChannelId,
1355 }
1356
1357 let channel_id_model: Option<ChannelId> = room_participant::Entity::find()
1358 .select_only()
1359 .column_as(room::Column::ChannelId, QueryAs::ChannelId)
1360 .inner_join(room::Entity)
1361 .filter(room_participant::Column::UserId.eq(user_id))
1362 .into_values::<_, QueryAs>()
1363 .one(&*tx)
1364 .await?;
1365
1366 let result = channel_id_model
1367 .map(|channel_id_model| channel_id_model != channel_id)
1368 .unwrap_or(false);
1369
1370 Ok(result)
1371 })
1372 .await
1373 }
1374
1375 pub async fn join_room(
1376 &self,
1377 room_id: RoomId,
1378 user_id: UserId,
1379 connection: ConnectionId,
1380 ) -> Result<RoomGuard<JoinRoom>> {
1381 self.room_transaction(room_id, |tx| async move {
1382 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1383 enum QueryChannelId {
1384 ChannelId,
1385 }
1386 let channel_id: Option<ChannelId> = room::Entity::find()
1387 .select_only()
1388 .column(room::Column::ChannelId)
1389 .filter(room::Column::Id.eq(room_id))
1390 .into_values::<_, QueryChannelId>()
1391 .one(&*tx)
1392 .await?
1393 .ok_or_else(|| anyhow!("no such room"))?;
1394
1395 if let Some(channel_id) = channel_id {
1396 self.check_user_is_channel_member(channel_id, user_id, &*tx)
1397 .await?;
1398
1399 room_participant::Entity::insert_many([room_participant::ActiveModel {
1400 room_id: ActiveValue::set(room_id),
1401 user_id: ActiveValue::set(user_id),
1402 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1403 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1404 connection.owner_id as i32,
1405 ))),
1406 answering_connection_lost: ActiveValue::set(false),
1407 calling_user_id: ActiveValue::set(user_id),
1408 calling_connection_id: ActiveValue::set(connection.id as i32),
1409 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1410 connection.owner_id as i32,
1411 ))),
1412 ..Default::default()
1413 }])
1414 .on_conflict(
1415 OnConflict::columns([room_participant::Column::UserId])
1416 .update_columns([
1417 room_participant::Column::AnsweringConnectionId,
1418 room_participant::Column::AnsweringConnectionServerId,
1419 room_participant::Column::AnsweringConnectionLost,
1420 ])
1421 .to_owned(),
1422 )
1423 .exec(&*tx)
1424 .await?;
1425 } else {
1426 let result = room_participant::Entity::update_many()
1427 .filter(
1428 Condition::all()
1429 .add(room_participant::Column::RoomId.eq(room_id))
1430 .add(room_participant::Column::UserId.eq(user_id))
1431 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1432 )
1433 .set(room_participant::ActiveModel {
1434 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1435 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1436 connection.owner_id as i32,
1437 ))),
1438 answering_connection_lost: ActiveValue::set(false),
1439 ..Default::default()
1440 })
1441 .exec(&*tx)
1442 .await?;
1443 if result.rows_affected == 0 {
1444 Err(anyhow!("room does not exist or was already joined"))?;
1445 }
1446 }
1447
1448 let room = self.get_room(room_id, &tx).await?;
1449 let channel_members = if let Some(channel_id) = channel_id {
1450 self.get_channel_members_internal(channel_id, &tx).await?
1451 } else {
1452 Vec::new()
1453 };
1454 Ok(JoinRoom {
1455 room,
1456 channel_id,
1457 channel_members,
1458 })
1459 })
1460 .await
1461 }
1462
    /// Re-attaches `user_id` to the room named in `rejoin_room` over a new
    /// `connection`.
    ///
    /// Everything runs inside a room transaction for that room, in this order:
    ///
    /// 1. The user's participant row is pointed at `connection`, but only if it
    ///    already had an answering connection that was either flagged as lost
    ///    or belonged to a different server.
    /// 2. For every reshared project, the user must be the host; the project
    ///    and the host's collaborator row are moved to `connection` and the
    ///    project's worktrees are updated from the request.
    /// 3. All other projects hosted by the user in this room are deleted.
    /// 4. For every rejoined project that still exists and lists the user as a
    ///    collaborator, the worktree entries, repositories, settings files,
    ///    language servers and remaining collaborators are collected, and the
    ///    user's collaborator row is moved to `connection`.
    ///
    /// # Errors
    /// Fails with "room does not exist or was already joined" when step 1
    /// updates no row, with "project does not exist" / "no such project" when a
    /// reshared project is missing or not hosted by `user_id`, and with
    /// "host not found among collaborators" when the host row is missing.
    pub async fn rejoin_room(
        &self,
        rejoin_room: proto::RejoinRoom,
        user_id: UserId,
        connection: ConnectionId,
    ) -> Result<RoomGuard<RejoinedRoom>> {
        let room_id = RoomId::from_proto(rejoin_room.id);
        self.room_transaction(room_id, |tx| async {
            // Take ownership of `tx` inside this non-`move` async block;
            // presumably so that `rejoin_room` is only borrowed by the block.
            let tx = tx;
            let participant_update = room_participant::Entity::update_many()
                .filter(
                    Condition::all()
                        .add(room_participant::Column::RoomId.eq(room_id))
                        .add(room_participant::Column::UserId.eq(user_id))
                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                        .add(
                            // Either the old connection was marked lost, or it
                            // lived on another server.
                            Condition::any()
                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                                .add(
                                    room_participant::Column::AnsweringConnectionServerId
                                        .ne(connection.owner_id as i32),
                                ),
                        ),
                )
                .set(room_participant::ActiveModel {
                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    answering_connection_lost: ActiveValue::set(false),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
            if participant_update.rows_affected == 0 {
                return Err(anyhow!("room does not exist or was already joined"))?;
            }

            // Projects the user hosts and is sharing again over the new connection.
            let mut reshared_projects = Vec::new();
            for reshared_project in &rejoin_room.reshared_projects {
                let project_id = ProjectId::from_proto(reshared_project.project_id);
                let project = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await?
                    .ok_or_else(|| anyhow!("project does not exist"))?;
                if project.host_user_id != user_id {
                    return Err(anyhow!("no such project"))?;
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                let host_ix = collaborators
                    .iter()
                    .position(|collaborator| {
                        collaborator.user_id == user_id && collaborator.is_host
                    })
                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
                // Remove the host from the list; what remains are the other collaborators.
                let host = collaborators.swap_remove(host_ix);
                let old_connection_id = host.connection();

                // Move both the project and the host's collaborator row to the new connection.
                project::Entity::update(project::ActiveModel {
                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    host_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    ..project.into_active_model()
                })
                .exec(&*tx)
                .await?;
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..host.into_active_model()
                })
                .exec(&*tx)
                .await?;

                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                    .await?;

                reshared_projects.push(ResharedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators: collaborators
                        .iter()
                        .map(|collaborator| ProjectCollaborator {
                            connection_id: collaborator.connection(),
                            user_id: collaborator.user_id,
                            replica_id: collaborator.replica_id,
                            is_host: collaborator.is_host,
                        })
                        .collect(),
                    worktrees: reshared_project.worktrees.clone(),
                });
            }

            // Delete the projects this user hosted in the room but did not reshare.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostUserId.eq(user_id))
                        .add(
                            project::Column::Id
                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Projects the user had joined as a collaborator and wants to rejoin.
            let mut rejoined_projects = Vec::new();
            for rejoined_project in &rejoin_room.rejoined_projects {
                let project_id = ProjectId::from_proto(rejoined_project.id);
                // Projects that no longer exist are silently skipped.
                let Some(project) = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await? else { continue };

                let mut worktrees = Vec::new();
                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
                for db_worktree in db_worktrees {
                    let mut worktree = RejoinedWorktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        updated_entries: Default::default(),
                        removed_entries: Default::default(),
                        updated_repositories: Default::default(),
                        removed_repositories: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    };

                    // The client's own record of this worktree, if it reported one.
                    let rejoined_worktree = rejoined_project
                        .worktrees
                        .iter()
                        .find(|worktree| worktree.id == db_worktree.id as u64);

                    // File entries
                    {
                        // If the client reported this worktree, select only entries with a
                        // scan id greater than the one it reported; otherwise select every
                        // entry that is not deleted.
                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_entry::Column::IsDeleted.eq(false)
                        };

                        let mut db_entries = worktree_entry::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                    .add(entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_entry) = db_entries.next().await {
                            let db_entry = db_entry?;
                            if db_entry.is_deleted {
                                worktree.removed_entries.push(db_entry.id as u64);
                            } else {
                                worktree.updated_entries.push(proto::Entry {
                                    id: db_entry.id as u64,
                                    is_dir: db_entry.is_dir,
                                    path: db_entry.path,
                                    inode: db_entry.inode as u64,
                                    mtime: Some(proto::Timestamp {
                                        seconds: db_entry.mtime_seconds as u64,
                                        nanos: db_entry.mtime_nanos as u32,
                                    }),
                                    is_symlink: db_entry.is_symlink,
                                    is_ignored: db_entry.is_ignored,
                                    is_external: db_entry.is_external,
                                    git_status: db_entry.git_status.map(|status| status as i32),
                                });
                            }
                        }
                    }

                    // Repository entries, selected with the same rule as file entries.
                    {
                        let repository_entry_filter =
                            if let Some(rejoined_worktree) = rejoined_worktree {
                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                            } else {
                                worktree_repository::Column::IsDeleted.eq(false)
                            };

                        let mut db_repositories = worktree_repository::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                    .add(repository_entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_repository) = db_repositories.next().await {
                            let db_repository = db_repository?;
                            if db_repository.is_deleted {
                                worktree
                                    .removed_repositories
                                    .push(db_repository.work_directory_id as u64);
                            } else {
                                worktree.updated_repositories.push(proto::RepositoryEntry {
                                    work_directory_id: db_repository.work_directory_id as u64,
                                    branch: db_repository.branch,
                                });
                            }
                        }
                    }

                    worktrees.push(worktree);
                }

                let language_servers = project
                    .find_related(language_server::Entity)
                    .all(&*tx)
                    .await?
                    .into_iter()
                    .map(|language_server| proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                    })
                    .collect::<Vec<_>>();

                // Attach each settings file to the worktree it belongs to; files whose
                // worktree was not collected above are ignored.
                {
                    let mut db_settings_files = worktree_settings_file::Entity::find()
                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                        .stream(&*tx)
                        .await?;
                    while let Some(db_settings_file) = db_settings_files.next().await {
                        let db_settings_file = db_settings_file?;
                        if let Some(worktree) = worktrees
                            .iter_mut()
                            .find(|w| w.id == db_settings_file.worktree_id as u64)
                        {
                            worktree.settings_files.push(WorktreeSettingsFile {
                                path: db_settings_file.path,
                                content: db_settings_file.content,
                            });
                        }
                    }
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                // Skip the project entirely if the user is not one of its collaborators.
                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                    .iter()
                    .position(|collaborator| collaborator.user_id == user_id)
                {
                    collaborators.swap_remove(self_collaborator_ix)
                } else {
                    continue;
                };
                let old_connection_id = self_collaborator.connection();
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..self_collaborator.into_active_model()
                })
                .exec(&*tx)
                .await?;

                let collaborators = collaborators
                    .into_iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect::<Vec<_>>();

                rejoined_projects.push(RejoinedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators,
                    worktrees,
                    language_servers,
                });
            }

            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
            // Channel members are only looked up for rooms that belong to a channel.
            let channel_members = if let Some(channel_id) = channel_id {
                self.get_channel_members_internal(channel_id, &tx).await?
            } else {
                Vec::new()
            };

            Ok(RejoinedRoom {
                room,
                channel_id,
                channel_members,
                rejoined_projects,
                reshared_projects,
            })
        })
        .await
    }
1769
    /// Removes the participant whose answering connection is `connection` from
    /// its room and cleans up everything tied to that connection.
    ///
    /// Returns `Ok(None)` when the connection is not an answering connection of
    /// any participant. Otherwise, in order, it:
    ///
    /// 1. deletes the participant row;
    /// 2. deletes the unanswered participant rows whose calling user is the
    ///    leaving user and reports the called user ids;
    /// 3. records, for every project this connection collaborated on, the
    ///    host and the connections of the other collaborators;
    /// 4. deletes this connection's collaborator rows and the projects it
    ///    hosted in the room;
    /// 5. deletes the room row if it has no participants left and no channel.
    ///
    /// The resulting `LeftRoom` is returned together with the room id.
    pub async fn leave_room(
        &self,
        connection: ConnectionId,
    ) -> Result<Option<RoomGuard<LeftRoom>>> {
        self.optional_room_transaction(|tx| async move {
            let leaving_participant = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::AnsweringConnectionId
                                .eq(connection.id as i32),
                        )
                        .add(
                            room_participant::Column::AnsweringConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?;

            if let Some(leaving_participant) = leaving_participant {
                // Leave room.
                let room_id = leaving_participant.room_id;
                room_participant::Entity::delete_by_id(leaving_participant.id)
                    .exec(&*tx)
                    .await?;

                // Cancel pending calls initiated by the leaving user.
                // Note that this filter matches on the calling user only, not on the room.
                let called_participants = room_participant::Entity::find()
                    .filter(
                        Condition::all()
                            .add(
                                room_participant::Column::CallingUserId
                                    .eq(leaving_participant.user_id),
                            )
                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
                    )
                    .all(&*tx)
                    .await?;
                room_participant::Entity::delete_many()
                    .filter(
                        room_participant::Column::Id
                            .is_in(called_participants.iter().map(|participant| participant.id)),
                    )
                    .exec(&*tx)
                    .await?;
                let canceled_calls_to_user_ids = called_participants
                    .into_iter()
                    .map(|participant| participant.user_id)
                    .collect();

                // Detect left projects.
                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
                enum QueryProjectIds {
                    ProjectId,
                }
                // Ids of every project this connection is a collaborator on.
                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                    .select_only()
                    .column_as(
                        project_collaborator::Column::ProjectId,
                        QueryProjectIds::ProjectId,
                    )
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .into_values::<_, QueryProjectIds>()
                    .all(&*tx)
                    .await?;
                // Keyed by project id; filled from all collaborators of those projects.
                let mut left_projects = HashMap::default();
                let mut collaborators = project_collaborator::Entity::find()
                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                    .stream(&*tx)
                    .await?;
                while let Some(collaborator) = collaborators.next().await {
                    let collaborator = collaborator?;
                    let left_project =
                        left_projects
                            .entry(collaborator.project_id)
                            .or_insert(LeftProject {
                                id: collaborator.project_id,
                                host_user_id: Default::default(),
                                connection_ids: Default::default(),
                                host_connection_id: Default::default(),
                            });

                    // Collect every collaborator connection except the leaving one.
                    let collaborator_connection_id = collaborator.connection();
                    if collaborator_connection_id != connection {
                        left_project.connection_ids.push(collaborator_connection_id);
                    }

                    if collaborator.is_host {
                        left_project.host_user_id = collaborator.user_id;
                        left_project.host_connection_id = collaborator_connection_id;
                    }
                }
                // Explicitly end the stream before running further statements on `tx`.
                drop(collaborators);

                // Leave projects.
                project_collaborator::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // Unshare projects.
                project::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(project::Column::RoomId.eq(room_id))
                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
                            .add(
                                project::Column::HostConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
                // An empty room is deleted only when it has no channel; `deleted`
                // reflects whether a row was actually removed.
                let deleted = if room.participants.is_empty() {
                    let result = room::Entity::delete_by_id(room_id)
                        .filter(room::Column::ChannelId.is_null())
                        .exec(&*tx)
                        .await?;
                    result.rows_affected > 0
                } else {
                    false
                };

                let channel_members = if let Some(channel_id) = channel_id {
                    self.get_channel_members_internal(channel_id, &tx).await?
                } else {
                    Vec::new()
                };
                let left_room = LeftRoom {
                    room,
                    channel_id,
                    channel_members,
                    left_projects,
                    canceled_calls_to_user_ids,
                    deleted,
                };

                // Forget the in-memory `rooms` entry once nobody is left in the room.
                if left_room.room.participants.is_empty() {
                    self.rooms.remove(&room_id);
                }

                Ok(Some((room_id, left_room)))
            } else {
                Ok(None)
            }
        })
        .await
    }
1939
1940 pub async fn follow(
1941 &self,
1942 project_id: ProjectId,
1943 leader_connection: ConnectionId,
1944 follower_connection: ConnectionId,
1945 ) -> Result<RoomGuard<proto::Room>> {
1946 let room_id = self.room_id_for_project(project_id).await?;
1947 self.room_transaction(room_id, |tx| async move {
1948 follower::ActiveModel {
1949 room_id: ActiveValue::set(room_id),
1950 project_id: ActiveValue::set(project_id),
1951 leader_connection_server_id: ActiveValue::set(ServerId(
1952 leader_connection.owner_id as i32,
1953 )),
1954 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1955 follower_connection_server_id: ActiveValue::set(ServerId(
1956 follower_connection.owner_id as i32,
1957 )),
1958 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1959 ..Default::default()
1960 }
1961 .insert(&*tx)
1962 .await?;
1963
1964 let room = self.get_room(room_id, &*tx).await?;
1965 Ok(room)
1966 })
1967 .await
1968 }
1969
1970 pub async fn unfollow(
1971 &self,
1972 project_id: ProjectId,
1973 leader_connection: ConnectionId,
1974 follower_connection: ConnectionId,
1975 ) -> Result<RoomGuard<proto::Room>> {
1976 let room_id = self.room_id_for_project(project_id).await?;
1977 self.room_transaction(room_id, |tx| async move {
1978 follower::Entity::delete_many()
1979 .filter(
1980 Condition::all()
1981 .add(follower::Column::ProjectId.eq(project_id))
1982 .add(
1983 follower::Column::LeaderConnectionServerId
1984 .eq(leader_connection.owner_id),
1985 )
1986 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1987 .add(
1988 follower::Column::FollowerConnectionServerId
1989 .eq(follower_connection.owner_id),
1990 )
1991 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1992 )
1993 .exec(&*tx)
1994 .await?;
1995
1996 let room = self.get_room(room_id, &*tx).await?;
1997 Ok(room)
1998 })
1999 .await
2000 }
2001
2002 pub async fn update_room_participant_location(
2003 &self,
2004 room_id: RoomId,
2005 connection: ConnectionId,
2006 location: proto::ParticipantLocation,
2007 ) -> Result<RoomGuard<proto::Room>> {
2008 self.room_transaction(room_id, |tx| async {
2009 let tx = tx;
2010 let location_kind;
2011 let location_project_id;
2012 match location
2013 .variant
2014 .as_ref()
2015 .ok_or_else(|| anyhow!("invalid location"))?
2016 {
2017 proto::participant_location::Variant::SharedProject(project) => {
2018 location_kind = 0;
2019 location_project_id = Some(ProjectId::from_proto(project.id));
2020 }
2021 proto::participant_location::Variant::UnsharedProject(_) => {
2022 location_kind = 1;
2023 location_project_id = None;
2024 }
2025 proto::participant_location::Variant::External(_) => {
2026 location_kind = 2;
2027 location_project_id = None;
2028 }
2029 }
2030
2031 let result = room_participant::Entity::update_many()
2032 .filter(
2033 Condition::all()
2034 .add(room_participant::Column::RoomId.eq(room_id))
2035 .add(
2036 room_participant::Column::AnsweringConnectionId
2037 .eq(connection.id as i32),
2038 )
2039 .add(
2040 room_participant::Column::AnsweringConnectionServerId
2041 .eq(connection.owner_id as i32),
2042 ),
2043 )
2044 .set(room_participant::ActiveModel {
2045 location_kind: ActiveValue::set(Some(location_kind)),
2046 location_project_id: ActiveValue::set(location_project_id),
2047 ..Default::default()
2048 })
2049 .exec(&*tx)
2050 .await?;
2051
2052 if result.rows_affected == 1 {
2053 let room = self.get_room(room_id, &tx).await?;
2054 Ok(room)
2055 } else {
2056 Err(anyhow!("could not update room participant location"))?
2057 }
2058 })
2059 .await
2060 }
2061
2062 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2063 self.transaction(|tx| async move {
2064 let participant = room_participant::Entity::find()
2065 .filter(
2066 Condition::all()
2067 .add(
2068 room_participant::Column::AnsweringConnectionId
2069 .eq(connection.id as i32),
2070 )
2071 .add(
2072 room_participant::Column::AnsweringConnectionServerId
2073 .eq(connection.owner_id as i32),
2074 ),
2075 )
2076 .one(&*tx)
2077 .await?
2078 .ok_or_else(|| anyhow!("not a participant in any room"))?;
2079
2080 room_participant::Entity::update(room_participant::ActiveModel {
2081 answering_connection_lost: ActiveValue::set(true),
2082 ..participant.into_active_model()
2083 })
2084 .exec(&*tx)
2085 .await?;
2086
2087 Ok(())
2088 })
2089 .await
2090 }
2091
2092 fn build_incoming_call(
2093 room: &proto::Room,
2094 called_user_id: UserId,
2095 ) -> Option<proto::IncomingCall> {
2096 let pending_participant = room
2097 .pending_participants
2098 .iter()
2099 .find(|participant| participant.user_id == called_user_id.to_proto())?;
2100
2101 Some(proto::IncomingCall {
2102 room_id: room.id,
2103 calling_user_id: pending_participant.calling_user_id,
2104 participant_user_ids: room
2105 .participants
2106 .iter()
2107 .map(|participant| participant.user_id)
2108 .collect(),
2109 initial_project: room.participants.iter().find_map(|participant| {
2110 let initial_project_id = pending_participant.initial_project_id?;
2111 participant
2112 .projects
2113 .iter()
2114 .find(|project| project.id == initial_project_id)
2115 .cloned()
2116 }),
2117 })
2118 }
2119 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2120 let (_, room) = self.get_channel_room(room_id, tx).await?;
2121 Ok(room)
2122 }
2123
    /// Loads the room `room_id` within `tx` and assembles its protobuf
    /// representation, returned together with the room's channel id (if any).
    ///
    /// Participant rows that have both an answering connection id and server id
    /// become `participants`; all other rows become `pending_participants`.
    /// Projects are attached to the participant whose connection is the
    /// project's host connection, and the room's followers are listed as well.
    ///
    /// # Errors
    /// Fails with "could not find room" when the room row does not exist, and
    /// propagates any error from the queries or from `host_connection()`.
    async fn get_channel_room(
        &self,
        room_id: RoomId,
        tx: &DatabaseTransaction,
    ) -> Result<(Option<ChannelId>, proto::Room)> {
        let db_room = room::Entity::find_by_id(room_id)
            .one(tx)
            .await?
            .ok_or_else(|| anyhow!("could not find room"))?;

        let mut db_participants = db_room
            .find_related(room_participant::Entity)
            .stream(tx)
            .await?;
        // Joined participants, keyed by their answering connection.
        let mut participants = HashMap::default();
        let mut pending_participants = Vec::new();
        while let Some(db_participant) = db_participants.next().await {
            let db_participant = db_participant?;
            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
                .answering_connection_id
                .zip(db_participant.answering_connection_server_id)
            {
                // Kind 0 with a project id is a shared project and kind 1 an unshared
                // project; every other combination is reported as external.
                let location = match (
                    db_participant.location_kind,
                    db_participant.location_project_id,
                ) {
                    (Some(0), Some(project_id)) => {
                        Some(proto::participant_location::Variant::SharedProject(
                            proto::participant_location::SharedProject {
                                id: project_id.to_proto(),
                            },
                        ))
                    }
                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
                        Default::default(),
                    )),
                    _ => Some(proto::participant_location::Variant::External(
                        Default::default(),
                    )),
                };

                let answering_connection = ConnectionId {
                    owner_id: answering_connection_server_id.0 as u32,
                    id: answering_connection_id as u32,
                };
                participants.insert(
                    answering_connection,
                    proto::Participant {
                        user_id: db_participant.user_id.to_proto(),
                        peer_id: Some(answering_connection.into()),
                        projects: Default::default(),
                        location: Some(proto::ParticipantLocation { variant: location }),
                    },
                );
            } else {
                // No complete answering connection: the call has not been answered yet.
                pending_participants.push(proto::PendingParticipant {
                    user_id: db_participant.user_id.to_proto(),
                    calling_user_id: db_participant.calling_user_id.to_proto(),
                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
                });
            }
        }
        // Explicitly end the participant stream before starting the next query on `tx`.
        drop(db_participants);

        let mut db_projects = db_room
            .find_related(project::Entity)
            .find_with_related(worktree::Entity)
            .stream(tx)
            .await?;

        // Each row pairs a project with an optional worktree, so the same project
        // may appear in several rows.
        while let Some(row) = db_projects.next().await {
            let (db_project, db_worktree) = row?;
            let host_connection = db_project.host_connection()?;
            // Projects whose host connection is not a joined participant are left out.
            if let Some(participant) = participants.get_mut(&host_connection) {
                let project = if let Some(project) = participant
                    .projects
                    .iter_mut()
                    .find(|project| project.id == db_project.id.to_proto())
                {
                    project
                } else {
                    participant.projects.push(proto::ParticipantProject {
                        id: db_project.id.to_proto(),
                        worktree_root_names: Default::default(),
                    });
                    participant.projects.last_mut().unwrap()
                };

                // Only visible worktrees contribute a root name.
                if let Some(db_worktree) = db_worktree {
                    if db_worktree.visible {
                        project.worktree_root_names.push(db_worktree.root_name);
                    }
                }
            }
        }
        // Explicitly end the project stream before starting the next query on `tx`.
        drop(db_projects);

        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
        let mut followers = Vec::new();
        while let Some(db_follower) = db_followers.next().await {
            let db_follower = db_follower?;
            followers.push(proto::Follower {
                leader_id: Some(db_follower.leader_connection().into()),
                follower_id: Some(db_follower.follower_connection().into()),
                project_id: db_follower.project_id.to_proto(),
            });
        }

        Ok((
            db_room.channel_id,
            proto::Room {
                id: db_room.id.to_proto(),
                live_kit_room: db_room.live_kit_room,
                participants: participants.into_values().collect(),
                pending_participants,
                followers,
            },
        ))
    }
2243
2244 // projects
2245
2246 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2247 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2248 enum QueryAs {
2249 Count,
2250 }
2251
2252 self.transaction(|tx| async move {
2253 Ok(project::Entity::find()
2254 .select_only()
2255 .column_as(project::Column::Id.count(), QueryAs::Count)
2256 .inner_join(user::Entity)
2257 .filter(user::Column::Admin.eq(false))
2258 .into_values::<_, QueryAs>()
2259 .one(&*tx)
2260 .await?
2261 .unwrap_or(0i64) as usize)
2262 })
2263 .await
2264 }
2265
2266 pub async fn share_project(
2267 &self,
2268 room_id: RoomId,
2269 connection: ConnectionId,
2270 worktrees: &[proto::WorktreeMetadata],
2271 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2272 self.room_transaction(room_id, |tx| async move {
2273 let participant = room_participant::Entity::find()
2274 .filter(
2275 Condition::all()
2276 .add(
2277 room_participant::Column::AnsweringConnectionId
2278 .eq(connection.id as i32),
2279 )
2280 .add(
2281 room_participant::Column::AnsweringConnectionServerId
2282 .eq(connection.owner_id as i32),
2283 ),
2284 )
2285 .one(&*tx)
2286 .await?
2287 .ok_or_else(|| anyhow!("could not find participant"))?;
2288 if participant.room_id != room_id {
2289 return Err(anyhow!("shared project on unexpected room"))?;
2290 }
2291
2292 let project = project::ActiveModel {
2293 room_id: ActiveValue::set(participant.room_id),
2294 host_user_id: ActiveValue::set(participant.user_id),
2295 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2296 host_connection_server_id: ActiveValue::set(Some(ServerId(
2297 connection.owner_id as i32,
2298 ))),
2299 ..Default::default()
2300 }
2301 .insert(&*tx)
2302 .await?;
2303
2304 if !worktrees.is_empty() {
2305 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2306 worktree::ActiveModel {
2307 id: ActiveValue::set(worktree.id as i64),
2308 project_id: ActiveValue::set(project.id),
2309 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2310 root_name: ActiveValue::set(worktree.root_name.clone()),
2311 visible: ActiveValue::set(worktree.visible),
2312 scan_id: ActiveValue::set(0),
2313 completed_scan_id: ActiveValue::set(0),
2314 }
2315 }))
2316 .exec(&*tx)
2317 .await?;
2318 }
2319
2320 project_collaborator::ActiveModel {
2321 project_id: ActiveValue::set(project.id),
2322 connection_id: ActiveValue::set(connection.id as i32),
2323 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2324 user_id: ActiveValue::set(participant.user_id),
2325 replica_id: ActiveValue::set(ReplicaId(0)),
2326 is_host: ActiveValue::set(true),
2327 ..Default::default()
2328 }
2329 .insert(&*tx)
2330 .await?;
2331
2332 let room = self.get_room(room_id, &tx).await?;
2333 Ok((project.id, room))
2334 })
2335 .await
2336 }
2337
2338 pub async fn unshare_project(
2339 &self,
2340 project_id: ProjectId,
2341 connection: ConnectionId,
2342 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2343 let room_id = self.room_id_for_project(project_id).await?;
2344 self.room_transaction(room_id, |tx| async move {
2345 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2346
2347 let project = project::Entity::find_by_id(project_id)
2348 .one(&*tx)
2349 .await?
2350 .ok_or_else(|| anyhow!("project not found"))?;
2351 if project.host_connection()? == connection {
2352 project::Entity::delete(project.into_active_model())
2353 .exec(&*tx)
2354 .await?;
2355 let room = self.get_room(room_id, &tx).await?;
2356 Ok((room, guest_connection_ids))
2357 } else {
2358 Err(anyhow!("cannot unshare a project hosted by another user"))?
2359 }
2360 })
2361 .await
2362 }
2363
2364 pub async fn update_project(
2365 &self,
2366 project_id: ProjectId,
2367 connection: ConnectionId,
2368 worktrees: &[proto::WorktreeMetadata],
2369 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2370 let room_id = self.room_id_for_project(project_id).await?;
2371 self.room_transaction(room_id, |tx| async move {
2372 let project = project::Entity::find_by_id(project_id)
2373 .filter(
2374 Condition::all()
2375 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2376 .add(
2377 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2378 ),
2379 )
2380 .one(&*tx)
2381 .await?
2382 .ok_or_else(|| anyhow!("no such project"))?;
2383
2384 self.update_project_worktrees(project.id, worktrees, &tx)
2385 .await?;
2386
2387 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2388 let room = self.get_room(project.room_id, &tx).await?;
2389 Ok((room, guest_connection_ids))
2390 })
2391 .await
2392 }
2393
2394 async fn update_project_worktrees(
2395 &self,
2396 project_id: ProjectId,
2397 worktrees: &[proto::WorktreeMetadata],
2398 tx: &DatabaseTransaction,
2399 ) -> Result<()> {
2400 if !worktrees.is_empty() {
2401 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2402 id: ActiveValue::set(worktree.id as i64),
2403 project_id: ActiveValue::set(project_id),
2404 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2405 root_name: ActiveValue::set(worktree.root_name.clone()),
2406 visible: ActiveValue::set(worktree.visible),
2407 scan_id: ActiveValue::set(0),
2408 completed_scan_id: ActiveValue::set(0),
2409 }))
2410 .on_conflict(
2411 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2412 .update_column(worktree::Column::RootName)
2413 .to_owned(),
2414 )
2415 .exec(&*tx)
2416 .await?;
2417 }
2418
2419 worktree::Entity::delete_many()
2420 .filter(worktree::Column::ProjectId.eq(project_id).and(
2421 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2422 ))
2423 .exec(&*tx)
2424 .await?;
2425
2426 Ok(())
2427 }
2428
2429 pub async fn update_worktree(
2430 &self,
2431 update: &proto::UpdateWorktree,
2432 connection: ConnectionId,
2433 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2434 let project_id = ProjectId::from_proto(update.project_id);
2435 let worktree_id = update.worktree_id as i64;
2436 let room_id = self.room_id_for_project(project_id).await?;
2437 self.room_transaction(room_id, |tx| async move {
2438 // Ensure the update comes from the host.
2439 let _project = project::Entity::find_by_id(project_id)
2440 .filter(
2441 Condition::all()
2442 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2443 .add(
2444 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2445 ),
2446 )
2447 .one(&*tx)
2448 .await?
2449 .ok_or_else(|| anyhow!("no such project"))?;
2450
2451 // Update metadata.
2452 worktree::Entity::update(worktree::ActiveModel {
2453 id: ActiveValue::set(worktree_id),
2454 project_id: ActiveValue::set(project_id),
2455 root_name: ActiveValue::set(update.root_name.clone()),
2456 scan_id: ActiveValue::set(update.scan_id as i64),
2457 completed_scan_id: if update.is_last_update {
2458 ActiveValue::set(update.scan_id as i64)
2459 } else {
2460 ActiveValue::default()
2461 },
2462 abs_path: ActiveValue::set(update.abs_path.clone()),
2463 ..Default::default()
2464 })
2465 .exec(&*tx)
2466 .await?;
2467
2468 if !update.updated_entries.is_empty() {
2469 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2470 let mtime = entry.mtime.clone().unwrap_or_default();
2471 worktree_entry::ActiveModel {
2472 project_id: ActiveValue::set(project_id),
2473 worktree_id: ActiveValue::set(worktree_id),
2474 id: ActiveValue::set(entry.id as i64),
2475 is_dir: ActiveValue::set(entry.is_dir),
2476 path: ActiveValue::set(entry.path.clone()),
2477 inode: ActiveValue::set(entry.inode as i64),
2478 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2479 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2480 is_symlink: ActiveValue::set(entry.is_symlink),
2481 is_ignored: ActiveValue::set(entry.is_ignored),
2482 is_external: ActiveValue::set(entry.is_external),
2483 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2484 is_deleted: ActiveValue::set(false),
2485 scan_id: ActiveValue::set(update.scan_id as i64),
2486 }
2487 }))
2488 .on_conflict(
2489 OnConflict::columns([
2490 worktree_entry::Column::ProjectId,
2491 worktree_entry::Column::WorktreeId,
2492 worktree_entry::Column::Id,
2493 ])
2494 .update_columns([
2495 worktree_entry::Column::IsDir,
2496 worktree_entry::Column::Path,
2497 worktree_entry::Column::Inode,
2498 worktree_entry::Column::MtimeSeconds,
2499 worktree_entry::Column::MtimeNanos,
2500 worktree_entry::Column::IsSymlink,
2501 worktree_entry::Column::IsIgnored,
2502 worktree_entry::Column::GitStatus,
2503 worktree_entry::Column::ScanId,
2504 ])
2505 .to_owned(),
2506 )
2507 .exec(&*tx)
2508 .await?;
2509 }
2510
2511 if !update.removed_entries.is_empty() {
2512 worktree_entry::Entity::update_many()
2513 .filter(
2514 worktree_entry::Column::ProjectId
2515 .eq(project_id)
2516 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2517 .and(
2518 worktree_entry::Column::Id
2519 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2520 ),
2521 )
2522 .set(worktree_entry::ActiveModel {
2523 is_deleted: ActiveValue::Set(true),
2524 scan_id: ActiveValue::Set(update.scan_id as i64),
2525 ..Default::default()
2526 })
2527 .exec(&*tx)
2528 .await?;
2529 }
2530
2531 if !update.updated_repositories.is_empty() {
2532 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2533 |repository| worktree_repository::ActiveModel {
2534 project_id: ActiveValue::set(project_id),
2535 worktree_id: ActiveValue::set(worktree_id),
2536 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2537 scan_id: ActiveValue::set(update.scan_id as i64),
2538 branch: ActiveValue::set(repository.branch.clone()),
2539 is_deleted: ActiveValue::set(false),
2540 },
2541 ))
2542 .on_conflict(
2543 OnConflict::columns([
2544 worktree_repository::Column::ProjectId,
2545 worktree_repository::Column::WorktreeId,
2546 worktree_repository::Column::WorkDirectoryId,
2547 ])
2548 .update_columns([
2549 worktree_repository::Column::ScanId,
2550 worktree_repository::Column::Branch,
2551 ])
2552 .to_owned(),
2553 )
2554 .exec(&*tx)
2555 .await?;
2556 }
2557
2558 if !update.removed_repositories.is_empty() {
2559 worktree_repository::Entity::update_many()
2560 .filter(
2561 worktree_repository::Column::ProjectId
2562 .eq(project_id)
2563 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2564 .and(
2565 worktree_repository::Column::WorkDirectoryId
2566 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2567 ),
2568 )
2569 .set(worktree_repository::ActiveModel {
2570 is_deleted: ActiveValue::Set(true),
2571 scan_id: ActiveValue::Set(update.scan_id as i64),
2572 ..Default::default()
2573 })
2574 .exec(&*tx)
2575 .await?;
2576 }
2577
2578 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2579 Ok(connection_ids)
2580 })
2581 .await
2582 }
2583
2584 pub async fn update_diagnostic_summary(
2585 &self,
2586 update: &proto::UpdateDiagnosticSummary,
2587 connection: ConnectionId,
2588 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2589 let project_id = ProjectId::from_proto(update.project_id);
2590 let worktree_id = update.worktree_id as i64;
2591 let room_id = self.room_id_for_project(project_id).await?;
2592 self.room_transaction(room_id, |tx| async move {
2593 let summary = update
2594 .summary
2595 .as_ref()
2596 .ok_or_else(|| anyhow!("invalid summary"))?;
2597
2598 // Ensure the update comes from the host.
2599 let project = project::Entity::find_by_id(project_id)
2600 .one(&*tx)
2601 .await?
2602 .ok_or_else(|| anyhow!("no such project"))?;
2603 if project.host_connection()? != connection {
2604 return Err(anyhow!("can't update a project hosted by someone else"))?;
2605 }
2606
2607 // Update summary.
2608 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2609 project_id: ActiveValue::set(project_id),
2610 worktree_id: ActiveValue::set(worktree_id),
2611 path: ActiveValue::set(summary.path.clone()),
2612 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2613 error_count: ActiveValue::set(summary.error_count as i32),
2614 warning_count: ActiveValue::set(summary.warning_count as i32),
2615 ..Default::default()
2616 })
2617 .on_conflict(
2618 OnConflict::columns([
2619 worktree_diagnostic_summary::Column::ProjectId,
2620 worktree_diagnostic_summary::Column::WorktreeId,
2621 worktree_diagnostic_summary::Column::Path,
2622 ])
2623 .update_columns([
2624 worktree_diagnostic_summary::Column::LanguageServerId,
2625 worktree_diagnostic_summary::Column::ErrorCount,
2626 worktree_diagnostic_summary::Column::WarningCount,
2627 ])
2628 .to_owned(),
2629 )
2630 .exec(&*tx)
2631 .await?;
2632
2633 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2634 Ok(connection_ids)
2635 })
2636 .await
2637 }
2638
2639 pub async fn start_language_server(
2640 &self,
2641 update: &proto::StartLanguageServer,
2642 connection: ConnectionId,
2643 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2644 let project_id = ProjectId::from_proto(update.project_id);
2645 let room_id = self.room_id_for_project(project_id).await?;
2646 self.room_transaction(room_id, |tx| async move {
2647 let server = update
2648 .server
2649 .as_ref()
2650 .ok_or_else(|| anyhow!("invalid language server"))?;
2651
2652 // Ensure the update comes from the host.
2653 let project = project::Entity::find_by_id(project_id)
2654 .one(&*tx)
2655 .await?
2656 .ok_or_else(|| anyhow!("no such project"))?;
2657 if project.host_connection()? != connection {
2658 return Err(anyhow!("can't update a project hosted by someone else"))?;
2659 }
2660
2661 // Add the newly-started language server.
2662 language_server::Entity::insert(language_server::ActiveModel {
2663 project_id: ActiveValue::set(project_id),
2664 id: ActiveValue::set(server.id as i64),
2665 name: ActiveValue::set(server.name.clone()),
2666 ..Default::default()
2667 })
2668 .on_conflict(
2669 OnConflict::columns([
2670 language_server::Column::ProjectId,
2671 language_server::Column::Id,
2672 ])
2673 .update_column(language_server::Column::Name)
2674 .to_owned(),
2675 )
2676 .exec(&*tx)
2677 .await?;
2678
2679 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2680 Ok(connection_ids)
2681 })
2682 .await
2683 }
2684
2685 pub async fn update_worktree_settings(
2686 &self,
2687 update: &proto::UpdateWorktreeSettings,
2688 connection: ConnectionId,
2689 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2690 let project_id = ProjectId::from_proto(update.project_id);
2691 let room_id = self.room_id_for_project(project_id).await?;
2692 self.room_transaction(room_id, |tx| async move {
2693 // Ensure the update comes from the host.
2694 let project = project::Entity::find_by_id(project_id)
2695 .one(&*tx)
2696 .await?
2697 .ok_or_else(|| anyhow!("no such project"))?;
2698 if project.host_connection()? != connection {
2699 return Err(anyhow!("can't update a project hosted by someone else"))?;
2700 }
2701
2702 if let Some(content) = &update.content {
2703 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2704 project_id: ActiveValue::Set(project_id),
2705 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2706 path: ActiveValue::Set(update.path.clone()),
2707 content: ActiveValue::Set(content.clone()),
2708 })
2709 .on_conflict(
2710 OnConflict::columns([
2711 worktree_settings_file::Column::ProjectId,
2712 worktree_settings_file::Column::WorktreeId,
2713 worktree_settings_file::Column::Path,
2714 ])
2715 .update_column(worktree_settings_file::Column::Content)
2716 .to_owned(),
2717 )
2718 .exec(&*tx)
2719 .await?;
2720 } else {
2721 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2722 project_id: ActiveValue::Set(project_id),
2723 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2724 path: ActiveValue::Set(update.path.clone()),
2725 ..Default::default()
2726 })
2727 .exec(&*tx)
2728 .await?;
2729 }
2730
2731 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2732 Ok(connection_ids)
2733 })
2734 .await
2735 }
2736
2737 pub async fn join_project(
2738 &self,
2739 project_id: ProjectId,
2740 connection: ConnectionId,
2741 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2742 let room_id = self.room_id_for_project(project_id).await?;
2743 self.room_transaction(room_id, |tx| async move {
2744 let participant = room_participant::Entity::find()
2745 .filter(
2746 Condition::all()
2747 .add(
2748 room_participant::Column::AnsweringConnectionId
2749 .eq(connection.id as i32),
2750 )
2751 .add(
2752 room_participant::Column::AnsweringConnectionServerId
2753 .eq(connection.owner_id as i32),
2754 ),
2755 )
2756 .one(&*tx)
2757 .await?
2758 .ok_or_else(|| anyhow!("must join a room first"))?;
2759
2760 let project = project::Entity::find_by_id(project_id)
2761 .one(&*tx)
2762 .await?
2763 .ok_or_else(|| anyhow!("no such project"))?;
2764 if project.room_id != participant.room_id {
2765 return Err(anyhow!("no such project"))?;
2766 }
2767
2768 let mut collaborators = project
2769 .find_related(project_collaborator::Entity)
2770 .all(&*tx)
2771 .await?;
2772 let replica_ids = collaborators
2773 .iter()
2774 .map(|c| c.replica_id)
2775 .collect::<HashSet<_>>();
2776 let mut replica_id = ReplicaId(1);
2777 while replica_ids.contains(&replica_id) {
2778 replica_id.0 += 1;
2779 }
2780 let new_collaborator = project_collaborator::ActiveModel {
2781 project_id: ActiveValue::set(project_id),
2782 connection_id: ActiveValue::set(connection.id as i32),
2783 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2784 user_id: ActiveValue::set(participant.user_id),
2785 replica_id: ActiveValue::set(replica_id),
2786 is_host: ActiveValue::set(false),
2787 ..Default::default()
2788 }
2789 .insert(&*tx)
2790 .await?;
2791 collaborators.push(new_collaborator);
2792
2793 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2794 let mut worktrees = db_worktrees
2795 .into_iter()
2796 .map(|db_worktree| {
2797 (
2798 db_worktree.id as u64,
2799 Worktree {
2800 id: db_worktree.id as u64,
2801 abs_path: db_worktree.abs_path,
2802 root_name: db_worktree.root_name,
2803 visible: db_worktree.visible,
2804 entries: Default::default(),
2805 repository_entries: Default::default(),
2806 diagnostic_summaries: Default::default(),
2807 settings_files: Default::default(),
2808 scan_id: db_worktree.scan_id as u64,
2809 completed_scan_id: db_worktree.completed_scan_id as u64,
2810 },
2811 )
2812 })
2813 .collect::<BTreeMap<_, _>>();
2814
2815 // Populate worktree entries.
2816 {
2817 let mut db_entries = worktree_entry::Entity::find()
2818 .filter(
2819 Condition::all()
2820 .add(worktree_entry::Column::ProjectId.eq(project_id))
2821 .add(worktree_entry::Column::IsDeleted.eq(false)),
2822 )
2823 .stream(&*tx)
2824 .await?;
2825 while let Some(db_entry) = db_entries.next().await {
2826 let db_entry = db_entry?;
2827 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2828 worktree.entries.push(proto::Entry {
2829 id: db_entry.id as u64,
2830 is_dir: db_entry.is_dir,
2831 path: db_entry.path,
2832 inode: db_entry.inode as u64,
2833 mtime: Some(proto::Timestamp {
2834 seconds: db_entry.mtime_seconds as u64,
2835 nanos: db_entry.mtime_nanos as u32,
2836 }),
2837 is_symlink: db_entry.is_symlink,
2838 is_ignored: db_entry.is_ignored,
2839 is_external: db_entry.is_external,
2840 git_status: db_entry.git_status.map(|status| status as i32),
2841 });
2842 }
2843 }
2844 }
2845
2846 // Populate repository entries.
2847 {
2848 let mut db_repository_entries = worktree_repository::Entity::find()
2849 .filter(
2850 Condition::all()
2851 .add(worktree_repository::Column::ProjectId.eq(project_id))
2852 .add(worktree_repository::Column::IsDeleted.eq(false)),
2853 )
2854 .stream(&*tx)
2855 .await?;
2856 while let Some(db_repository_entry) = db_repository_entries.next().await {
2857 let db_repository_entry = db_repository_entry?;
2858 if let Some(worktree) =
2859 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2860 {
2861 worktree.repository_entries.insert(
2862 db_repository_entry.work_directory_id as u64,
2863 proto::RepositoryEntry {
2864 work_directory_id: db_repository_entry.work_directory_id as u64,
2865 branch: db_repository_entry.branch,
2866 },
2867 );
2868 }
2869 }
2870 }
2871
2872 // Populate worktree diagnostic summaries.
2873 {
2874 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2875 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2876 .stream(&*tx)
2877 .await?;
2878 while let Some(db_summary) = db_summaries.next().await {
2879 let db_summary = db_summary?;
2880 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2881 worktree
2882 .diagnostic_summaries
2883 .push(proto::DiagnosticSummary {
2884 path: db_summary.path,
2885 language_server_id: db_summary.language_server_id as u64,
2886 error_count: db_summary.error_count as u32,
2887 warning_count: db_summary.warning_count as u32,
2888 });
2889 }
2890 }
2891 }
2892
2893 // Populate worktree settings files
2894 {
2895 let mut db_settings_files = worktree_settings_file::Entity::find()
2896 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2897 .stream(&*tx)
2898 .await?;
2899 while let Some(db_settings_file) = db_settings_files.next().await {
2900 let db_settings_file = db_settings_file?;
2901 if let Some(worktree) =
2902 worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2903 {
2904 worktree.settings_files.push(WorktreeSettingsFile {
2905 path: db_settings_file.path,
2906 content: db_settings_file.content,
2907 });
2908 }
2909 }
2910 }
2911
2912 // Populate language servers.
2913 let language_servers = project
2914 .find_related(language_server::Entity)
2915 .all(&*tx)
2916 .await?;
2917
2918 let project = Project {
2919 collaborators: collaborators
2920 .into_iter()
2921 .map(|collaborator| ProjectCollaborator {
2922 connection_id: collaborator.connection(),
2923 user_id: collaborator.user_id,
2924 replica_id: collaborator.replica_id,
2925 is_host: collaborator.is_host,
2926 })
2927 .collect(),
2928 worktrees,
2929 language_servers: language_servers
2930 .into_iter()
2931 .map(|language_server| proto::LanguageServer {
2932 id: language_server.id as u64,
2933 name: language_server.name,
2934 })
2935 .collect(),
2936 };
2937 Ok((project, replica_id as ReplicaId))
2938 })
2939 .await
2940 }
2941
2942 pub async fn leave_project(
2943 &self,
2944 project_id: ProjectId,
2945 connection: ConnectionId,
2946 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2947 let room_id = self.room_id_for_project(project_id).await?;
2948 self.room_transaction(room_id, |tx| async move {
2949 let result = project_collaborator::Entity::delete_many()
2950 .filter(
2951 Condition::all()
2952 .add(project_collaborator::Column::ProjectId.eq(project_id))
2953 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2954 .add(
2955 project_collaborator::Column::ConnectionServerId
2956 .eq(connection.owner_id as i32),
2957 ),
2958 )
2959 .exec(&*tx)
2960 .await?;
2961 if result.rows_affected == 0 {
2962 Err(anyhow!("not a collaborator on this project"))?;
2963 }
2964
2965 let project = project::Entity::find_by_id(project_id)
2966 .one(&*tx)
2967 .await?
2968 .ok_or_else(|| anyhow!("no such project"))?;
2969 let collaborators = project
2970 .find_related(project_collaborator::Entity)
2971 .all(&*tx)
2972 .await?;
2973 let connection_ids = collaborators
2974 .into_iter()
2975 .map(|collaborator| collaborator.connection())
2976 .collect();
2977
2978 follower::Entity::delete_many()
2979 .filter(
2980 Condition::any()
2981 .add(
2982 Condition::all()
2983 .add(follower::Column::ProjectId.eq(project_id))
2984 .add(
2985 follower::Column::LeaderConnectionServerId
2986 .eq(connection.owner_id),
2987 )
2988 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2989 )
2990 .add(
2991 Condition::all()
2992 .add(follower::Column::ProjectId.eq(project_id))
2993 .add(
2994 follower::Column::FollowerConnectionServerId
2995 .eq(connection.owner_id),
2996 )
2997 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2998 ),
2999 )
3000 .exec(&*tx)
3001 .await?;
3002
3003 let room = self.get_room(project.room_id, &tx).await?;
3004 let left_project = LeftProject {
3005 id: project_id,
3006 host_user_id: project.host_user_id,
3007 host_connection_id: project.host_connection()?,
3008 connection_ids,
3009 };
3010 Ok((room, left_project))
3011 })
3012 .await
3013 }
3014
3015 pub async fn project_collaborators(
3016 &self,
3017 project_id: ProjectId,
3018 connection_id: ConnectionId,
3019 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
3020 let room_id = self.room_id_for_project(project_id).await?;
3021 self.room_transaction(room_id, |tx| async move {
3022 let collaborators = project_collaborator::Entity::find()
3023 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3024 .all(&*tx)
3025 .await?
3026 .into_iter()
3027 .map(|collaborator| ProjectCollaborator {
3028 connection_id: collaborator.connection(),
3029 user_id: collaborator.user_id,
3030 replica_id: collaborator.replica_id,
3031 is_host: collaborator.is_host,
3032 })
3033 .collect::<Vec<_>>();
3034
3035 if collaborators
3036 .iter()
3037 .any(|collaborator| collaborator.connection_id == connection_id)
3038 {
3039 Ok(collaborators)
3040 } else {
3041 Err(anyhow!("no such project"))?
3042 }
3043 })
3044 .await
3045 }
3046
3047 pub async fn project_connection_ids(
3048 &self,
3049 project_id: ProjectId,
3050 connection_id: ConnectionId,
3051 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3052 let room_id = self.room_id_for_project(project_id).await?;
3053 self.room_transaction(room_id, |tx| async move {
3054 let mut collaborators = project_collaborator::Entity::find()
3055 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3056 .stream(&*tx)
3057 .await?;
3058
3059 let mut connection_ids = HashSet::default();
3060 while let Some(collaborator) = collaborators.next().await {
3061 let collaborator = collaborator?;
3062 connection_ids.insert(collaborator.connection());
3063 }
3064
3065 if connection_ids.contains(&connection_id) {
3066 Ok(connection_ids)
3067 } else {
3068 Err(anyhow!("no such project"))?
3069 }
3070 })
3071 .await
3072 }
3073
3074 async fn project_guest_connection_ids(
3075 &self,
3076 project_id: ProjectId,
3077 tx: &DatabaseTransaction,
3078 ) -> Result<Vec<ConnectionId>> {
3079 let mut collaborators = project_collaborator::Entity::find()
3080 .filter(
3081 project_collaborator::Column::ProjectId
3082 .eq(project_id)
3083 .and(project_collaborator::Column::IsHost.eq(false)),
3084 )
3085 .stream(tx)
3086 .await?;
3087
3088 let mut guest_connection_ids = Vec::new();
3089 while let Some(collaborator) = collaborators.next().await {
3090 let collaborator = collaborator?;
3091 guest_connection_ids.push(collaborator.connection());
3092 }
3093 Ok(guest_connection_ids)
3094 }
3095
3096 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3097 self.transaction(|tx| async move {
3098 let project = project::Entity::find_by_id(project_id)
3099 .one(&*tx)
3100 .await?
3101 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3102 Ok(project.room_id)
3103 })
3104 .await
3105 }
3106
3107 // access tokens
3108
3109 pub async fn create_access_token(
3110 &self,
3111 user_id: UserId,
3112 access_token_hash: &str,
3113 max_access_token_count: usize,
3114 ) -> Result<AccessTokenId> {
3115 self.transaction(|tx| async {
3116 let tx = tx;
3117
3118 let token = access_token::ActiveModel {
3119 user_id: ActiveValue::set(user_id),
3120 hash: ActiveValue::set(access_token_hash.into()),
3121 ..Default::default()
3122 }
3123 .insert(&*tx)
3124 .await?;
3125
3126 access_token::Entity::delete_many()
3127 .filter(
3128 access_token::Column::Id.in_subquery(
3129 Query::select()
3130 .column(access_token::Column::Id)
3131 .from(access_token::Entity)
3132 .and_where(access_token::Column::UserId.eq(user_id))
3133 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3134 .limit(10000)
3135 .offset(max_access_token_count as u64)
3136 .to_owned(),
3137 ),
3138 )
3139 .exec(&*tx)
3140 .await?;
3141 Ok(token.id)
3142 })
3143 .await
3144 }
3145
3146 pub async fn get_access_token(
3147 &self,
3148 access_token_id: AccessTokenId,
3149 ) -> Result<access_token::Model> {
3150 self.transaction(|tx| async move {
3151 Ok(access_token::Entity::find_by_id(access_token_id)
3152 .one(&*tx)
3153 .await?
3154 .ok_or_else(|| anyhow!("no such access token"))?)
3155 })
3156 .await
3157 }
3158
3159 // channels
3160
3161 pub async fn create_root_channel(
3162 &self,
3163 name: &str,
3164 live_kit_room: &str,
3165 creator_id: UserId,
3166 ) -> Result<ChannelId> {
3167 self.create_channel(name, None, live_kit_room, creator_id)
3168 .await
3169 }
3170
3171 pub async fn create_channel(
3172 &self,
3173 name: &str,
3174 parent: Option<ChannelId>,
3175 live_kit_room: &str,
3176 creator_id: UserId,
3177 ) -> Result<ChannelId> {
3178 let name = Self::sanitize_channel_name(name)?;
3179 self.transaction(move |tx| async move {
3180 if let Some(parent) = parent {
3181 self.check_user_is_channel_admin(parent, creator_id, &*tx)
3182 .await?;
3183 }
3184
3185 let channel = channel::ActiveModel {
3186 name: ActiveValue::Set(name.to_string()),
3187 ..Default::default()
3188 }
3189 .insert(&*tx)
3190 .await?;
3191
3192 let channel_paths_stmt;
3193 if let Some(parent) = parent {
3194 let sql = r#"
3195 INSERT INTO channel_paths
3196 (id_path, channel_id)
3197 SELECT
3198 id_path || $1 || '/', $2
3199 FROM
3200 channel_paths
3201 WHERE
3202 channel_id = $3
3203 "#;
3204 channel_paths_stmt = Statement::from_sql_and_values(
3205 self.pool.get_database_backend(),
3206 sql,
3207 [
3208 channel.id.to_proto().into(),
3209 channel.id.to_proto().into(),
3210 parent.to_proto().into(),
3211 ],
3212 );
3213 tx.execute(channel_paths_stmt).await?;
3214 } else {
3215 channel_path::Entity::insert(channel_path::ActiveModel {
3216 channel_id: ActiveValue::Set(channel.id),
3217 id_path: ActiveValue::Set(format!("/{}/", channel.id)),
3218 })
3219 .exec(&*tx)
3220 .await?;
3221 }
3222
3223 channel_member::ActiveModel {
3224 channel_id: ActiveValue::Set(channel.id),
3225 user_id: ActiveValue::Set(creator_id),
3226 accepted: ActiveValue::Set(true),
3227 admin: ActiveValue::Set(true),
3228 ..Default::default()
3229 }
3230 .insert(&*tx)
3231 .await?;
3232
3233 room::ActiveModel {
3234 channel_id: ActiveValue::Set(Some(channel.id)),
3235 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3236 ..Default::default()
3237 }
3238 .insert(&*tx)
3239 .await?;
3240
3241 Ok(channel.id)
3242 })
3243 .await
3244 }
3245
3246 pub async fn remove_channel(
3247 &self,
3248 channel_id: ChannelId,
3249 user_id: UserId,
3250 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3251 self.transaction(move |tx| async move {
3252 self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3253 .await?;
3254
3255 // Don't remove descendant channels that have additional parents.
3256 let mut channels_to_remove = self.get_channel_descendants([channel_id], &*tx).await?;
3257 {
3258 let mut channels_to_keep = channel_path::Entity::find()
3259 .filter(
3260 channel_path::Column::ChannelId
3261 .is_in(
3262 channels_to_remove
3263 .keys()
3264 .copied()
3265 .filter(|&id| id != channel_id),
3266 )
3267 .and(
3268 channel_path::Column::IdPath
3269 .not_like(&format!("%/{}/%", channel_id)),
3270 ),
3271 )
3272 .stream(&*tx)
3273 .await?;
3274 while let Some(row) = channels_to_keep.next().await {
3275 let row = row?;
3276 channels_to_remove.remove(&row.channel_id);
3277 }
3278 }
3279
3280 let channel_ancestors = self.get_channel_ancestors(channel_id, &*tx).await?;
3281 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3282 .filter(channel_member::Column::ChannelId.is_in(channel_ancestors))
3283 .select_only()
3284 .column(channel_member::Column::UserId)
3285 .distinct()
3286 .into_values::<_, QueryUserIds>()
3287 .all(&*tx)
3288 .await?;
3289
3290 channel::Entity::delete_many()
3291 .filter(channel::Column::Id.is_in(channels_to_remove.keys().copied()))
3292 .exec(&*tx)
3293 .await?;
3294
3295 Ok((channels_to_remove.into_keys().collect(), members_to_notify))
3296 })
3297 .await
3298 }
3299
3300 pub async fn invite_channel_member(
3301 &self,
3302 channel_id: ChannelId,
3303 invitee_id: UserId,
3304 inviter_id: UserId,
3305 is_admin: bool,
3306 ) -> Result<()> {
3307 self.transaction(move |tx| async move {
3308 self.check_user_is_channel_admin(channel_id, inviter_id, &*tx)
3309 .await?;
3310
3311 channel_member::ActiveModel {
3312 channel_id: ActiveValue::Set(channel_id),
3313 user_id: ActiveValue::Set(invitee_id),
3314 accepted: ActiveValue::Set(false),
3315 admin: ActiveValue::Set(is_admin),
3316 ..Default::default()
3317 }
3318 .insert(&*tx)
3319 .await?;
3320
3321 Ok(())
3322 })
3323 .await
3324 }
3325
3326 fn sanitize_channel_name(name: &str) -> Result<&str> {
3327 let new_name = name.trim().trim_start_matches('#');
3328 if new_name == "" {
3329 Err(anyhow!("channel name can't be blank"))?;
3330 }
3331 Ok(new_name)
3332 }
3333
3334 pub async fn rename_channel(
3335 &self,
3336 channel_id: ChannelId,
3337 user_id: UserId,
3338 new_name: &str,
3339 ) -> Result<String> {
3340 self.transaction(move |tx| async move {
3341 let new_name = Self::sanitize_channel_name(new_name)?.to_string();
3342
3343 self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3344 .await?;
3345
3346 channel::ActiveModel {
3347 id: ActiveValue::Unchanged(channel_id),
3348 name: ActiveValue::Set(new_name.clone()),
3349 ..Default::default()
3350 }
3351 .update(&*tx)
3352 .await?;
3353
3354 Ok(new_name)
3355 })
3356 .await
3357 }
3358
3359 pub async fn respond_to_channel_invite(
3360 &self,
3361 channel_id: ChannelId,
3362 user_id: UserId,
3363 accept: bool,
3364 ) -> Result<()> {
3365 self.transaction(move |tx| async move {
3366 let rows_affected = if accept {
3367 channel_member::Entity::update_many()
3368 .set(channel_member::ActiveModel {
3369 accepted: ActiveValue::Set(accept),
3370 ..Default::default()
3371 })
3372 .filter(
3373 channel_member::Column::ChannelId
3374 .eq(channel_id)
3375 .and(channel_member::Column::UserId.eq(user_id))
3376 .and(channel_member::Column::Accepted.eq(false)),
3377 )
3378 .exec(&*tx)
3379 .await?
3380 .rows_affected
3381 } else {
3382 channel_member::ActiveModel {
3383 channel_id: ActiveValue::Unchanged(channel_id),
3384 user_id: ActiveValue::Unchanged(user_id),
3385 ..Default::default()
3386 }
3387 .delete(&*tx)
3388 .await?
3389 .rows_affected
3390 };
3391
3392 if rows_affected == 0 {
3393 Err(anyhow!("no such invitation"))?;
3394 }
3395
3396 Ok(())
3397 })
3398 .await
3399 }
3400
3401 pub async fn remove_channel_member(
3402 &self,
3403 channel_id: ChannelId,
3404 member_id: UserId,
3405 remover_id: UserId,
3406 ) -> Result<()> {
3407 self.transaction(|tx| async move {
3408 self.check_user_is_channel_admin(channel_id, remover_id, &*tx)
3409 .await?;
3410
3411 let result = channel_member::Entity::delete_many()
3412 .filter(
3413 channel_member::Column::ChannelId
3414 .eq(channel_id)
3415 .and(channel_member::Column::UserId.eq(member_id)),
3416 )
3417 .exec(&*tx)
3418 .await?;
3419
3420 if result.rows_affected == 0 {
3421 Err(anyhow!("no such member"))?;
3422 }
3423
3424 Ok(())
3425 })
3426 .await
3427 }
3428
3429 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
3430 self.transaction(|tx| async move {
3431 let channel_invites = channel_member::Entity::find()
3432 .filter(
3433 channel_member::Column::UserId
3434 .eq(user_id)
3435 .and(channel_member::Column::Accepted.eq(false)),
3436 )
3437 .all(&*tx)
3438 .await?;
3439
3440 let channels = channel::Entity::find()
3441 .filter(
3442 channel::Column::Id.is_in(
3443 channel_invites
3444 .into_iter()
3445 .map(|channel_member| channel_member.channel_id),
3446 ),
3447 )
3448 .all(&*tx)
3449 .await?;
3450
3451 let channels = channels
3452 .into_iter()
3453 .map(|channel| Channel {
3454 id: channel.id,
3455 name: channel.name,
3456 parent_id: None,
3457 })
3458 .collect();
3459
3460 Ok(channels)
3461 })
3462 .await
3463 }
3464
3465 pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
3466 self.transaction(|tx| async move {
3467 let tx = tx;
3468
3469 let channel_memberships = channel_member::Entity::find()
3470 .filter(
3471 channel_member::Column::UserId
3472 .eq(user_id)
3473 .and(channel_member::Column::Accepted.eq(true)),
3474 )
3475 .all(&*tx)
3476 .await?;
3477
3478 let parents_by_child_id = self
3479 .get_channel_descendants(channel_memberships.iter().map(|m| m.channel_id), &*tx)
3480 .await?;
3481
3482 let channels_with_admin_privileges = channel_memberships
3483 .iter()
3484 .filter_map(|membership| membership.admin.then_some(membership.channel_id))
3485 .collect();
3486
3487 let mut channels = Vec::with_capacity(parents_by_child_id.len());
3488 {
3489 let mut rows = channel::Entity::find()
3490 .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3491 .stream(&*tx)
3492 .await?;
3493 while let Some(row) = rows.next().await {
3494 let row = row?;
3495 channels.push(Channel {
3496 id: row.id,
3497 name: row.name,
3498 parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3499 });
3500 }
3501 }
3502
3503 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3504 enum QueryUserIdsAndChannelIds {
3505 ChannelId,
3506 UserId,
3507 }
3508
3509 let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3510 {
3511 let mut rows = room_participant::Entity::find()
3512 .inner_join(room::Entity)
3513 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3514 .select_only()
3515 .column(room::Column::ChannelId)
3516 .column(room_participant::Column::UserId)
3517 .into_values::<_, QueryUserIdsAndChannelIds>()
3518 .stream(&*tx)
3519 .await?;
3520 while let Some(row) = rows.next().await {
3521 let row: (ChannelId, UserId) = row?;
3522 channel_participants.entry(row.0).or_default().push(row.1)
3523 }
3524 }
3525
3526 Ok(ChannelsForUser {
3527 channels,
3528 channel_participants,
3529 channels_with_admin_privileges,
3530 })
3531 })
3532 .await
3533 }
3534
3535 pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3536 self.transaction(|tx| async move { self.get_channel_members_internal(id, &*tx).await })
3537 .await
3538 }
3539
3540 pub async fn set_channel_member_admin(
3541 &self,
3542 channel_id: ChannelId,
3543 from: UserId,
3544 for_user: UserId,
3545 admin: bool,
3546 ) -> Result<()> {
3547 self.transaction(|tx| async move {
3548 self.check_user_is_channel_admin(channel_id, from, &*tx)
3549 .await?;
3550
3551 let result = channel_member::Entity::update_many()
3552 .filter(
3553 channel_member::Column::ChannelId
3554 .eq(channel_id)
3555 .and(channel_member::Column::UserId.eq(for_user)),
3556 )
3557 .set(channel_member::ActiveModel {
3558 admin: ActiveValue::set(admin),
3559 ..Default::default()
3560 })
3561 .exec(&*tx)
3562 .await?;
3563
3564 if result.rows_affected == 0 {
3565 Err(anyhow!("no such member"))?;
3566 }
3567
3568 Ok(())
3569 })
3570 .await
3571 }
3572
    /// Lists the members of `channel_id`, one entry per user.
    ///
    /// `user_id` must pass the admin check for the channel. Membership rows are
    /// read for every channel named in `channel_id`'s stored paths and collapsed
    /// so that each user appears once; a row on `channel_id` itself takes
    /// precedence over rows on other channels.
    pub async fn get_channel_member_details(
        &self,
        channel_id: ChannelId,
        user_id: UserId,
    ) -> Result<Vec<proto::ChannelMember>> {
        self.transaction(|tx| async move {
            self.check_user_is_channel_admin(channel_id, user_id, &*tx)
                .await?;

            // Column layout of the query below; the order must match the tuple that
            // each row is decoded into.
            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
            enum QueryMemberDetails {
                UserId,
                Admin,
                IsDirectMember,
                Accepted,
            }

            let tx = tx;
            let ancestor_ids = self.get_channel_ancestors(channel_id, &*tx).await?;
            let mut stream = channel_member::Entity::find()
                .distinct()
                .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
                .select_only()
                .column(channel_member::Column::UserId)
                .column(channel_member::Column::Admin)
                // Computed column: true when the row belongs to `channel_id` itself.
                .column_as(
                    channel_member::Column::ChannelId.eq(channel_id),
                    QueryMemberDetails::IsDirectMember,
                )
                .column(channel_member::Column::Accepted)
                // Rows of the same user must be adjacent for the merge below.
                .order_by_asc(channel_member::Column::UserId)
                .into_values::<_, QueryMemberDetails>()
                .stream(&*tx)
                .await?;

            let mut rows = Vec::<proto::ChannelMember>::new();
            while let Some(row) = stream.next().await {
                let (user_id, is_admin, is_direct_member, is_invite_accepted): (
                    UserId,
                    bool,
                    bool,
                    bool,
                ) = row?;
                let kind = match (is_direct_member, is_invite_accepted) {
                    (true, true) => proto::channel_member::Kind::Member,
                    (true, false) => proto::channel_member::Kind::Invitee,
                    (false, true) => proto::channel_member::Kind::AncestorMember,
                    // A pending invitation on another channel is not reported.
                    (false, false) => continue,
                };
                let user_id = user_id.to_proto();
                let kind = kind.into();
                if let Some(last_row) = rows.last_mut() {
                    if last_row.user_id == user_id {
                        // Same user as the previous entry: only a direct row may
                        // overwrite what was already recorded.
                        if is_direct_member {
                            last_row.kind = kind;
                            last_row.admin = is_admin;
                        }
                        continue;
                    }
                }
                rows.push(proto::ChannelMember {
                    user_id,
                    kind,
                    admin: is_admin,
                });
            }

            Ok(rows)
        })
        .await
    }
3644
3645 pub async fn get_channel_members_internal(
3646 &self,
3647 id: ChannelId,
3648 tx: &DatabaseTransaction,
3649 ) -> Result<Vec<UserId>> {
3650 let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3651 let user_ids = channel_member::Entity::find()
3652 .distinct()
3653 .filter(
3654 channel_member::Column::ChannelId
3655 .is_in(ancestor_ids.iter().copied())
3656 .and(channel_member::Column::Accepted.eq(true)),
3657 )
3658 .select_only()
3659 .column(channel_member::Column::UserId)
3660 .into_values::<_, QueryUserIds>()
3661 .all(&*tx)
3662 .await?;
3663 Ok(user_ids)
3664 }
3665
3666 async fn check_user_is_channel_member(
3667 &self,
3668 channel_id: ChannelId,
3669 user_id: UserId,
3670 tx: &DatabaseTransaction,
3671 ) -> Result<()> {
3672 let channel_ids = self.get_channel_ancestors(channel_id, tx).await?;
3673 channel_member::Entity::find()
3674 .filter(
3675 channel_member::Column::ChannelId
3676 .is_in(channel_ids)
3677 .and(channel_member::Column::UserId.eq(user_id)),
3678 )
3679 .one(&*tx)
3680 .await?
3681 .ok_or_else(|| anyhow!("user is not a channel member or channel does not exist"))?;
3682 Ok(())
3683 }
3684
3685 async fn check_user_is_channel_admin(
3686 &self,
3687 channel_id: ChannelId,
3688 user_id: UserId,
3689 tx: &DatabaseTransaction,
3690 ) -> Result<()> {
3691 let channel_ids = self.get_channel_ancestors(channel_id, tx).await?;
3692 channel_member::Entity::find()
3693 .filter(
3694 channel_member::Column::ChannelId
3695 .is_in(channel_ids)
3696 .and(channel_member::Column::UserId.eq(user_id))
3697 .and(channel_member::Column::Admin.eq(true)),
3698 )
3699 .one(&*tx)
3700 .await?
3701 .ok_or_else(|| anyhow!("user is not a channel admin or channel does not exist"))?;
3702 Ok(())
3703 }
3704
3705 async fn get_channel_ancestors(
3706 &self,
3707 channel_id: ChannelId,
3708 tx: &DatabaseTransaction,
3709 ) -> Result<Vec<ChannelId>> {
3710 let paths = channel_path::Entity::find()
3711 .filter(channel_path::Column::ChannelId.eq(channel_id))
3712 .all(tx)
3713 .await?;
3714 let mut channel_ids = Vec::new();
3715 for path in paths {
3716 for id in path.id_path.trim_matches('/').split('/') {
3717 if let Ok(id) = id.parse() {
3718 let id = ChannelId::from_proto(id);
3719 if let Err(ix) = channel_ids.binary_search(&id) {
3720 channel_ids.insert(ix, id);
3721 }
3722 }
3723 }
3724 }
3725 Ok(channel_ids)
3726 }
3727
3728 async fn get_channel_descendants(
3729 &self,
3730 channel_ids: impl IntoIterator<Item = ChannelId>,
3731 tx: &DatabaseTransaction,
3732 ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3733 let mut values = String::new();
3734 for id in channel_ids {
3735 if !values.is_empty() {
3736 values.push_str(", ");
3737 }
3738 write!(&mut values, "({})", id).unwrap();
3739 }
3740
3741 if values.is_empty() {
3742 return Ok(HashMap::default());
3743 }
3744
3745 let sql = format!(
3746 r#"
3747 SELECT
3748 descendant_paths.*
3749 FROM
3750 channel_paths parent_paths, channel_paths descendant_paths
3751 WHERE
3752 parent_paths.channel_id IN ({values}) AND
3753 descendant_paths.id_path LIKE (parent_paths.id_path || '%')
3754 "#
3755 );
3756
3757 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3758
3759 let mut parents_by_child_id = HashMap::default();
3760 let mut paths = channel_path::Entity::find()
3761 .from_raw_sql(stmt)
3762 .stream(tx)
3763 .await?;
3764
3765 while let Some(path) = paths.next().await {
3766 let path = path?;
3767 let ids = path.id_path.trim_matches('/').split('/');
3768 let mut parent_id = None;
3769 for id in ids {
3770 if let Ok(id) = id.parse() {
3771 let id = ChannelId::from_proto(id);
3772 if id == path.channel_id {
3773 break;
3774 }
3775 parent_id = Some(id);
3776 }
3777 }
3778 parents_by_child_id.insert(path.channel_id, parent_id);
3779 }
3780
3781 Ok(parents_by_child_id)
3782 }
3783
3784 /// Returns the channel with the given ID and:
3785 /// - true if the user is a member
3786 /// - false if the user hasn't accepted the invitation yet
3787 pub async fn get_channel(
3788 &self,
3789 channel_id: ChannelId,
3790 user_id: UserId,
3791 ) -> Result<Option<(Channel, bool)>> {
3792 self.transaction(|tx| async move {
3793 let tx = tx;
3794
3795 let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3796
3797 if let Some(channel) = channel {
3798 if self
3799 .check_user_is_channel_member(channel_id, user_id, &*tx)
3800 .await
3801 .is_err()
3802 {
3803 return Ok(None);
3804 }
3805
3806 let channel_membership = channel_member::Entity::find()
3807 .filter(
3808 channel_member::Column::ChannelId
3809 .eq(channel_id)
3810 .and(channel_member::Column::UserId.eq(user_id)),
3811 )
3812 .one(&*tx)
3813 .await?;
3814
3815 let is_accepted = channel_membership
3816 .map(|membership| membership.accepted)
3817 .unwrap_or(false);
3818
3819 Ok(Some((
3820 Channel {
3821 id: channel.id,
3822 name: channel.name,
3823 parent_id: None,
3824 },
3825 is_accepted,
3826 )))
3827 } else {
3828 Ok(None)
3829 }
3830 })
3831 .await
3832 }
3833
3834 pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3835 self.transaction(|tx| async move {
3836 let tx = tx;
3837 let room = channel::Model {
3838 id: channel_id,
3839 ..Default::default()
3840 }
3841 .find_related(room::Entity)
3842 .one(&*tx)
3843 .await?
3844 .ok_or_else(|| anyhow!("invalid channel"))?;
3845 Ok(room.id)
3846 })
3847 .await
3848 }
3849
    /// Runs `f` inside a database transaction and commits it on success.
    ///
    /// When `f` fails, the transaction is rolled back. Whenever `f` or the commit
    /// fails with a serialization error (see `retry_on_serialization_error`), the
    /// whole attempt is repeated after a delay, so `f` may be invoked several
    /// times. Any other error is returned to the caller.
    async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; it determines the retry delay.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(result) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(result),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // A failed rollback is returned in place of `error`.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3881
    /// Like `transaction`, for closures that may or may not end up touching a room.
    ///
    /// When `f` yields `Some((room_id, data))`, the lock stored for that room in
    /// `self.rooms` is acquired before the transaction is committed, and `data` is
    /// returned wrapped in a `RoomGuard` that keeps holding the lock. When `f`
    /// yields `None`, the transaction is committed and `None` is returned.
    /// Serialization errors are retried as in `transaction`.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; it determines the retry delay.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // The room is only known once `f` has run, so its lock is
                        // taken here, just before committing.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // A failed rollback is returned in place of `error`.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3931
    /// Like `transaction`, but for a room that is known up front.
    ///
    /// On every attempt, the lock stored for `room_id` in `self.rooms` is acquired
    /// before the transaction begins. On success the result of `f` is returned
    /// wrapped in a `RoomGuard` that keeps holding the lock. Serialization errors
    /// are retried as in `transaction`.
    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; it determines the retry delay.
            let mut i = 0;
            loop {
                // The guard is a local of the loop body, so a retry drops it and
                // acquires the lock again.
                let lock = self.rooms.entry(room_id).or_default().clone();
                let _guard = lock.lock_owned().await;
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(data) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => {
                            return Ok(RoomGuard {
                                data,
                                _guard,
                                _not_send: PhantomData,
                            });
                        }
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // A failed rollback is returned in place of `error`.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3971
3972 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3973 where
3974 F: Send + Fn(TransactionHandle) -> Fut,
3975 Fut: Send + Future<Output = Result<T>>,
3976 {
3977 let tx = self
3978 .pool
3979 .begin_with_config(Some(IsolationLevel::Serializable), None)
3980 .await?;
3981
3982 let mut tx = Arc::new(Some(tx));
3983 let result = f(TransactionHandle(tx.clone())).await;
3984 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3985 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3986 };
3987
3988 Ok((tx, result))
3989 }
3990
    /// Drives `future` to completion and returns its output.
    ///
    /// Outside of tests the future is simply awaited. In test builds it is run
    /// with `block_on` on the runtime stored in `self.runtime`, after a simulated
    /// random delay when the executor is the deterministic one.
    ///
    /// # Panics
    ///
    /// In test builds, panics when `self.runtime` is `None`.
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        #[cfg(test)]
        {
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
4009
4010 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
4011 // If the error is due to a failure to serialize concurrent transactions, then retry
4012 // this transaction after a delay. With each subsequent retry, double the delay duration.
4013 // Also vary the delay randomly in order to ensure different database connections retry
4014 // at different times.
4015 if is_serialization_error(error) {
4016 let base_delay = 4_u64 << prev_attempt_count.min(16);
4017 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
4018 log::info!(
4019 "retrying transaction after serialization error. delay: {} ms.",
4020 randomized_delay
4021 );
4022 self.executor
4023 .sleep(Duration::from_millis(randomized_delay as u64))
4024 .await;
4025 true
4026 } else {
4027 false
4028 }
4029 }
4030}
4031
4032fn is_serialization_error(error: &Error) -> bool {
4033 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
4034 match error {
4035 Error::Database(
4036 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
4037 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
4038 ) if error
4039 .as_database_error()
4040 .and_then(|error| error.code())
4041 .as_deref()
4042 == Some(SERIALIZATION_FAILURE_CODE) =>
4043 {
4044 true
4045 }
4046 _ => false,
4047 }
4048}
4049
4050struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
4051
4052impl Deref for TransactionHandle {
4053 type Target = DatabaseTransaction;
4054
4055 fn deref(&self) -> &Self::Target {
4056 self.0.as_ref().as_ref().unwrap()
4057 }
4058}
4059
/// A value of type `T` bundled with an owned mutex guard.
///
/// The guard is released when the `RoomGuard` is dropped or unwrapped with
/// `into_inner`. The value is reachable through `Deref`/`DerefMut`.
pub struct RoomGuard<T> {
    // The wrapped value.
    data: T,
    // Held only for its lifetime; never read.
    _guard: OwnedMutexGuard<()>,
    // `Rc` is not `Send`, so this marker makes `RoomGuard` not `Send` either.
    _not_send: PhantomData<Rc<()>>,
}

impl<T> Deref for RoomGuard<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.data
    }
}

impl<T> DerefMut for RoomGuard<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.data
    }
}

impl<T> RoomGuard<T> {
    /// Consumes the guard and returns the wrapped value, dropping the lock guard.
    pub fn into_inner(self) -> T {
        self.data
    }
}
4085
/// Serializable bundle of GitHub identity fields and an invite count.
// NOTE(review): presumably the input for creating a user; the consumer is
// outside this part of the file.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    pub github_login: String,
    pub github_user_id: i32,
    pub invite_count: i32,
}
4092
/// Ids and metadata describing a user record.
// NOTE(review): presumably the outcome of creating a user; the producer is
// outside this part of the file.
#[derive(Debug)]
pub struct NewUserResult {
    pub user_id: UserId,
    pub metrics_id: String,
    // Optional: not every result carries an inviter or a signup device.
    pub inviting_user_id: Option<UserId>,
    pub signup_device_id: Option<String>,
}
4100
/// A channel as returned by the channel queries of `Database`.
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    /// The channel's parent, when one was determined. Several queries in this
    /// file (`get_channel`, `get_channel_invites_for_user`) always leave it `None`.
    pub parent_id: Option<ChannelId>,
}
4107
/// Result of `Database::get_channels_for_user`.
#[derive(Debug, PartialEq)]
pub struct ChannelsForUser {
    /// The channels returned for the user.
    pub channels: Vec<Channel>,
    /// Ids of the room participants, grouped by the channel their room belongs to.
    pub channel_participants: HashMap<ChannelId, Vec<UserId>>,
    /// Channels on which the user's own membership row has `admin` set.
    pub channels_with_admin_privileges: HashSet<ChannelId>,
}
4114
/// Generates a random 16-character id with `nanoid`.
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
4118
/// Generates a random 64-character id with `nanoid`.
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
4122
/// Declares `$name` as a public newtype around `i32` for use as a typed id.
///
/// Besides the derived traits, the generated type gets conversions to and from
/// the `u64` representation used by the protocol, a `Display` impl that prints
/// the inner integer, and the sea-orm / sea-query trait impls needed to use the
/// type as a column value.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            // NOTE(review): `as` silently truncates values that don't fit in an
            // `i32`; callers rely on this being infallible, so it is left as is.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            // NOTE(review): a negative id is sign-extended by this cast.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer value that fits in an `i32`; everything
            // else, including out-of-range integers, is a type error.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            // Fails when `n` does not fit in an `i32`. The message names the actual
            // direction of the conversion (from `u64` into the id type).
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(concat!(
                        "error converting u64 to ",
                        stringify!($name)
                    ))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
4241
// One `i32`-backed id newtype per kind of record, so that ids of different
// kinds cannot be mixed up.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
4255
/// A room's protocol state together with its optional channel and that
/// channel's member ids.
// NOTE(review): presumably returned when a user joins a room; the producer is
// outside this part of the file.
#[derive(Clone)]
pub struct JoinRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
}
4262
/// A room's protocol state plus the projects that were rejoined and reshared,
/// and the room's optional channel with its member ids.
// NOTE(review): presumably returned when a user rejoins a room; the producer is
// outside this part of the file.
pub struct RejoinedRoom {
    pub room: proto::Room,
    pub rejoined_projects: Vec<RejoinedProject>,
    pub reshared_projects: Vec<ResharedProject>,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
}
4270
/// A project entry of `RejoinedRoom::reshared_projects`: the project's id, a
/// previous connection id, its collaborators, and its worktree metadata.
pub struct ResharedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
4277
/// A project entry of `RejoinedRoom::rejoined_projects`: the project's id, a
/// previous connection id, its collaborators, worktrees, and language servers.
pub struct RejoinedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<RejoinedWorktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
4285
/// A worktree entry of `RejoinedProject::worktrees`, expressed as sets of
/// updated and removed entries and repositories.
// NOTE(review): the updated/removed lists presumably form a delta against what
// the rejoining client already has; confirm against the producer.
#[derive(Debug)]
pub struct RejoinedWorktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub updated_entries: Vec<proto::Entry>,
    pub removed_entries: Vec<u64>,
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    pub removed_repositories: Vec<u64>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
4301
/// A room's protocol state, its optional channel and member ids, the projects
/// that were left (keyed by project id), user ids whose calls were canceled,
/// and a `deleted` flag.
// NOTE(review): presumably returned when a user leaves a room, with `deleted`
// telling whether the room itself was removed; confirm against the producer.
pub struct LeftRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
    pub left_projects: HashMap<ProjectId, LeftProject>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
    pub deleted: bool,
}
4310
/// A room's protocol state, its optional channel and member ids, plus the ids
/// of stale participants and of users whose calls were canceled.
// NOTE(review): presumably returned when a room is refreshed; the producer is
// outside this part of the file.
pub struct RefreshedRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
    pub stale_participant_user_ids: Vec<UserId>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
4318
/// A project's collaborators, worktrees, and language servers.
pub struct Project {
    pub collaborators: Vec<ProjectCollaborator>,
    // NOTE(review): presumably keyed by worktree id; confirm against the producer.
    pub worktrees: BTreeMap<u64, Worktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
4324
/// One collaborator on a project: a connection, the user behind it, the replica
/// id assigned to it, and whether it is the host.
pub struct ProjectCollaborator {
    pub connection_id: ConnectionId,
    pub user_id: UserId,
    pub replica_id: ReplicaId,
    pub is_host: bool,
}

impl ProjectCollaborator {
    /// Converts this collaborator into its protocol message. `is_host` is not
    /// part of the message.
    pub fn to_proto(&self) -> proto::Collaborator {
        proto::Collaborator {
            peer_id: Some(self.connection_id.into()),
            // NOTE(review): `as` reinterprets a negative replica id as a large
            // unsigned value.
            replica_id: self.replica_id.0 as u32,
            user_id: self.user_id.to_proto(),
        }
    }
}
4341
/// A project entry of `LeftRoom::left_projects`: the project's id, its host
/// (user and connection), and a list of connection ids.
// NOTE(review): `connection_ids` presumably lists the remaining collaborators'
// connections; confirm against the producer.
#[derive(Debug)]
pub struct LeftProject {
    pub id: ProjectId,
    pub host_user_id: UserId,
    pub host_connection_id: ConnectionId,
    pub connection_ids: Vec<ConnectionId>,
}
4349
/// The full state of one worktree of a `Project`: its entries, repository
/// entries, diagnostic summaries, settings files, and scan counters.
pub struct Worktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    // NOTE(review): the `u64` key is not explained by the code in view.
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
4362
/// A settings file inside a worktree: its path and its content.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    pub path: String,
    pub content: String,
}
4368
/// Column layout for queries that select only a user id and decode the rows
/// with `into_values`.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}
4373
4374#[cfg(test)]
4375pub use test::*;
4376
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A `Database` set up for tests, backed by either SQLite or Postgres.
    pub struct TestDb {
        // `Option` so that `Drop` can take the database out.
        pub db: Option<Arc<Database>>,
        // Both constructors in this module leave this `None`.
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Creates a database on an in-memory SQLite instance and applies the test
        /// schema to it.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = format!("sqlite::memory:");
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            let mut db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            // `Database::run` needs this runtime in test builds.
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a fresh, randomly named Postgres database on localhost and runs
        /// the migrations against it.
        pub fn postgres(background: Arc<Background>) -> Self {
            // Held for the whole setup, so only one caller at a time runs it.
            static LOCK: Mutex<()> = Mutex::new(());

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            let mut db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            // `Database::run` needs this runtime in test builds.
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Returns the wrapped database. Panics once it has been taken by `drop`.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        /// For Postgres, terminates the other backends connected to the test
        /// database and then drops the database. Failures are only logged.
        /// Nothing is done for other backends.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}