1mod access_token;
2mod channel;
3mod channel_member;
4mod channel_parent;
5mod contact;
6mod follower;
7mod language_server;
8mod project;
9mod project_collaborator;
10mod room;
11mod room_participant;
12mod server;
13mod signup;
14#[cfg(test)]
15mod tests;
16mod user;
17mod worktree;
18mod worktree_diagnostic_summary;
19mod worktree_entry;
20mod worktree_repository;
21mod worktree_repository_statuses;
22mod worktree_settings_file;
23
24use crate::executor::Executor;
25use crate::{Error, Result};
26use anyhow::anyhow;
27use collections::{BTreeMap, HashMap, HashSet};
28pub use contact::Contact;
29use dashmap::DashMap;
30use futures::StreamExt;
31use hyper::StatusCode;
32use rand::prelude::StdRng;
33use rand::{Rng, SeedableRng};
34use rpc::{proto, ConnectionId};
35use sea_orm::Condition;
36pub use sea_orm::ConnectOptions;
37use sea_orm::{
38 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
39 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
40 Statement, TransactionTrait,
41};
42use sea_query::{Alias, Expr, OnConflict, Query};
43use serde::{Deserialize, Serialize};
44pub use signup::{Invite, NewSignup, WaitlistSummary};
45use sqlx::migrate::{Migrate, Migration, MigrationSource};
46use sqlx::Connection;
47use std::fmt::Write as _;
48use std::ops::{Deref, DerefMut};
49use std::path::Path;
50use std::time::Duration;
51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
52use tokio::sync::{Mutex, OwnedMutexGuard};
53pub use user::Model as User;
54
/// Handle to the application's database.
///
/// Bundles the connection options, the `sea_orm` connection and a few pieces
/// of in-process state that live alongside it.
pub struct Database {
    /// Options the connection was opened with; `migrate` reads the URL from here.
    options: ConnectOptions,
    /// The `sea_orm` connection that `Database::new` creates from `options`.
    pool: DatabaseConnection,
    /// One async mutex per room id.
    // NOTE(review): presumably used to serialize per-room transactions — confirm
    // against `room_transaction`, which is not visible here.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Random number generator; `Database::new` seeds it with the constant `0`.
    rng: Mutex<StdRng>,
    /// Executor supplied by the caller of `Database::new`.
    executor: Executor,
    /// Only present in test builds; `Database::new` initializes it to `None`.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
64
65impl Database {
66 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
67 Ok(Self {
68 options: options.clone(),
69 pool: sea_orm::Database::connect(options).await?,
70 rooms: DashMap::with_capacity(16384),
71 rng: Mutex::new(StdRng::seed_from_u64(0)),
72 executor,
73 #[cfg(test)]
74 runtime: None,
75 })
76 }
77
/// Removes every entry from the room map. Only compiled in test builds.
#[cfg(test)]
pub fn reset(&self) {
    let rooms = &self.rooms;
    rooms.clear();
}
82
83 pub async fn migrate(
84 &self,
85 migrations_path: &Path,
86 ignore_checksum_mismatch: bool,
87 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
88 let migrations = MigrationSource::resolve(migrations_path)
89 .await
90 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
91
92 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
93
94 connection.ensure_migrations_table().await?;
95 let applied_migrations: HashMap<_, _> = connection
96 .list_applied_migrations()
97 .await?
98 .into_iter()
99 .map(|m| (m.version, m))
100 .collect();
101
102 let mut new_migrations = Vec::new();
103 for migration in migrations {
104 match applied_migrations.get(&migration.version) {
105 Some(applied_migration) => {
106 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
107 {
108 Err(anyhow!(
109 "checksum mismatch for applied migration {}",
110 migration.description
111 ))?;
112 }
113 }
114 None => {
115 let elapsed = connection.apply(&migration).await?;
116 new_migrations.push((migration, elapsed));
117 }
118 }
119 }
120
121 Ok(new_migrations)
122 }
123
124 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
125 self.transaction(|tx| async move {
126 let server = server::ActiveModel {
127 environment: ActiveValue::set(environment.into()),
128 ..Default::default()
129 }
130 .insert(&*tx)
131 .await?;
132 Ok(server.id)
133 })
134 .await
135 }
136
137 pub async fn stale_room_ids(
138 &self,
139 environment: &str,
140 new_server_id: ServerId,
141 ) -> Result<Vec<RoomId>> {
142 self.transaction(|tx| async move {
143 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
144 enum QueryAs {
145 RoomId,
146 }
147
148 let stale_server_epochs = self
149 .stale_server_ids(environment, new_server_id, &tx)
150 .await?;
151 Ok(room_participant::Entity::find()
152 .select_only()
153 .column(room_participant::Column::RoomId)
154 .distinct()
155 .filter(
156 room_participant::Column::AnsweringConnectionServerId
157 .is_in(stale_server_epochs),
158 )
159 .into_values::<_, QueryAs>()
160 .all(&*tx)
161 .await?)
162 })
163 .await
164 }
165
/// Removes from `room_id` every participant whose answering connection
/// belongs to a server other than `new_server_id`, cancels the unanswered
/// calls those users had placed, and returns the resulting room state.
///
/// The returned [`RefreshedRoom`] carries the room as read after the
/// deletions, its channel id and channel members (empty when the room has no
/// channel), the user ids of the removed participants and the user ids whose
/// pending calls were canceled. If the room ends up without participants, its
/// projects and the room row itself are deleted as well.
pub async fn refresh_room(
    &self,
    room_id: RoomId,
    new_server_id: ServerId,
) -> Result<RoomGuard<RefreshedRoom>> {
    self.room_transaction(room_id, |tx| async move {
        // Participants of this room that answered on a connection owned by a
        // server other than `new_server_id`.
        let stale_participant_filter = Condition::all()
            .add(room_participant::Column::RoomId.eq(room_id))
            .add(room_participant::Column::AnsweringConnectionId.is_not_null())
            .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));

        // Record who is about to be removed before the rows are deleted.
        let stale_participant_user_ids = room_participant::Entity::find()
            .filter(stale_participant_filter.clone())
            .all(&*tx)
            .await?
            .into_iter()
            .map(|participant| participant.user_id)
            .collect::<Vec<_>>();

        // Delete participants who failed to reconnect and cancel their calls.
        let mut canceled_calls_to_user_ids = Vec::new();
        room_participant::Entity::delete_many()
            .filter(stale_participant_filter)
            .exec(&*tx)
            .await?;
        // Unanswered participants (no answering connection) that were called by
        // one of the removed users.
        // NOTE(review): this lookup is not restricted to `room_id` — confirm
        // that is intended.
        let called_participants = room_participant::Entity::find()
            .filter(
                Condition::all()
                    .add(
                        room_participant::Column::CallingUserId
                            .is_in(stale_participant_user_ids.iter().copied()),
                    )
                    .add(room_participant::Column::AnsweringConnectionId.is_null()),
            )
            .all(&*tx)
            .await?;
        room_participant::Entity::delete_many()
            .filter(
                room_participant::Column::Id
                    .is_in(called_participants.iter().map(|participant| participant.id)),
            )
            .exec(&*tx)
            .await?;
        canceled_calls_to_user_ids.extend(
            called_participants
                .into_iter()
                .map(|participant| participant.user_id),
        );

        // Read the room (and its channel, if any) after the deletions above.
        let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
        let channel_members = if let Some(channel_id) = channel_id {
            self.get_channel_members_internal(channel_id, &tx).await?
        } else {
            Vec::new()
        };

        // Delete the room if it becomes empty.
        if room.participants.is_empty() {
            project::Entity::delete_many()
                .filter(project::Column::RoomId.eq(room_id))
                .exec(&*tx)
                .await?;
            room::Entity::delete_by_id(room_id).exec(&*tx).await?;
        }

        Ok(RefreshedRoom {
            room,
            channel_id,
            channel_members,
            stale_participant_user_ids,
            canceled_calls_to_user_ids,
        })
    })
    .await
}
241
242 pub async fn delete_stale_servers(
243 &self,
244 environment: &str,
245 new_server_id: ServerId,
246 ) -> Result<()> {
247 self.transaction(|tx| async move {
248 server::Entity::delete_many()
249 .filter(
250 Condition::all()
251 .add(server::Column::Environment.eq(environment))
252 .add(server::Column::Id.ne(new_server_id)),
253 )
254 .exec(&*tx)
255 .await?;
256 Ok(())
257 })
258 .await
259 }
260
261 async fn stale_server_ids(
262 &self,
263 environment: &str,
264 new_server_id: ServerId,
265 tx: &DatabaseTransaction,
266 ) -> Result<Vec<ServerId>> {
267 let stale_servers = server::Entity::find()
268 .filter(
269 Condition::all()
270 .add(server::Column::Environment.eq(environment))
271 .add(server::Column::Id.ne(new_server_id)),
272 )
273 .all(&*tx)
274 .await?;
275 Ok(stale_servers.into_iter().map(|server| server.id).collect())
276 }
277
278 // users
279
280 pub async fn create_user(
281 &self,
282 email_address: &str,
283 admin: bool,
284 params: NewUserParams,
285 ) -> Result<NewUserResult> {
286 self.transaction(|tx| async {
287 let tx = tx;
288 let user = user::Entity::insert(user::ActiveModel {
289 email_address: ActiveValue::set(Some(email_address.into())),
290 github_login: ActiveValue::set(params.github_login.clone()),
291 github_user_id: ActiveValue::set(Some(params.github_user_id)),
292 admin: ActiveValue::set(admin),
293 metrics_id: ActiveValue::set(Uuid::new_v4()),
294 ..Default::default()
295 })
296 .on_conflict(
297 OnConflict::column(user::Column::GithubLogin)
298 .update_column(user::Column::GithubLogin)
299 .to_owned(),
300 )
301 .exec_with_returning(&*tx)
302 .await?;
303
304 Ok(NewUserResult {
305 user_id: user.id,
306 metrics_id: user.metrics_id.to_string(),
307 signup_device_id: None,
308 inviting_user_id: None,
309 })
310 })
311 .await
312 }
313
314 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
315 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
316 .await
317 }
318
319 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
320 self.transaction(|tx| async {
321 let tx = tx;
322 Ok(user::Entity::find()
323 .filter(user::Column::Id.is_in(ids.iter().copied()))
324 .all(&*tx)
325 .await?)
326 })
327 .await
328 }
329
330 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
331 self.transaction(|tx| async move {
332 Ok(user::Entity::find()
333 .filter(user::Column::GithubLogin.eq(github_login))
334 .one(&*tx)
335 .await?)
336 })
337 .await
338 }
339
340 pub async fn get_or_create_user_by_github_account(
341 &self,
342 github_login: &str,
343 github_user_id: Option<i32>,
344 github_email: Option<&str>,
345 ) -> Result<Option<User>> {
346 self.transaction(|tx| async move {
347 let tx = &*tx;
348 if let Some(github_user_id) = github_user_id {
349 if let Some(user_by_github_user_id) = user::Entity::find()
350 .filter(user::Column::GithubUserId.eq(github_user_id))
351 .one(tx)
352 .await?
353 {
354 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
355 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
356 Ok(Some(user_by_github_user_id.update(tx).await?))
357 } else if let Some(user_by_github_login) = user::Entity::find()
358 .filter(user::Column::GithubLogin.eq(github_login))
359 .one(tx)
360 .await?
361 {
362 let mut user_by_github_login = user_by_github_login.into_active_model();
363 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
364 Ok(Some(user_by_github_login.update(tx).await?))
365 } else {
366 let user = user::Entity::insert(user::ActiveModel {
367 email_address: ActiveValue::set(github_email.map(|email| email.into())),
368 github_login: ActiveValue::set(github_login.into()),
369 github_user_id: ActiveValue::set(Some(github_user_id)),
370 admin: ActiveValue::set(false),
371 invite_count: ActiveValue::set(0),
372 invite_code: ActiveValue::set(None),
373 metrics_id: ActiveValue::set(Uuid::new_v4()),
374 ..Default::default()
375 })
376 .exec_with_returning(&*tx)
377 .await?;
378 Ok(Some(user))
379 }
380 } else {
381 Ok(user::Entity::find()
382 .filter(user::Column::GithubLogin.eq(github_login))
383 .one(tx)
384 .await?)
385 }
386 })
387 .await
388 }
389
390 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
391 self.transaction(|tx| async move {
392 Ok(user::Entity::find()
393 .order_by_asc(user::Column::GithubLogin)
394 .limit(limit as u64)
395 .offset(page as u64 * limit as u64)
396 .all(&*tx)
397 .await?)
398 })
399 .await
400 }
401
402 pub async fn get_users_with_no_invites(
403 &self,
404 invited_by_another_user: bool,
405 ) -> Result<Vec<User>> {
406 self.transaction(|tx| async move {
407 Ok(user::Entity::find()
408 .filter(
409 user::Column::InviteCount
410 .eq(0)
411 .and(if invited_by_another_user {
412 user::Column::InviterId.is_not_null()
413 } else {
414 user::Column::InviterId.is_null()
415 }),
416 )
417 .all(&*tx)
418 .await?)
419 })
420 .await
421 }
422
423 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
424 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
425 enum QueryAs {
426 MetricsId,
427 }
428
429 self.transaction(|tx| async move {
430 let metrics_id: Uuid = user::Entity::find_by_id(id)
431 .select_only()
432 .column(user::Column::MetricsId)
433 .into_values::<_, QueryAs>()
434 .one(&*tx)
435 .await?
436 .ok_or_else(|| anyhow!("could not find user"))?;
437 Ok(metrics_id.to_string())
438 })
439 .await
440 }
441
442 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
443 self.transaction(|tx| async move {
444 user::Entity::update_many()
445 .filter(user::Column::Id.eq(id))
446 .set(user::ActiveModel {
447 admin: ActiveValue::set(is_admin),
448 ..Default::default()
449 })
450 .exec(&*tx)
451 .await?;
452 Ok(())
453 })
454 .await
455 }
456
457 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
458 self.transaction(|tx| async move {
459 user::Entity::update_many()
460 .filter(user::Column::Id.eq(id))
461 .set(user::ActiveModel {
462 connected_once: ActiveValue::set(connected_once),
463 ..Default::default()
464 })
465 .exec(&*tx)
466 .await?;
467 Ok(())
468 })
469 .await
470 }
471
472 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
473 self.transaction(|tx| async move {
474 access_token::Entity::delete_many()
475 .filter(access_token::Column::UserId.eq(id))
476 .exec(&*tx)
477 .await?;
478 user::Entity::delete_by_id(id).exec(&*tx).await?;
479 Ok(())
480 })
481 .await
482 }
483
484 // contacts
485
486 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
487 #[derive(Debug, FromQueryResult)]
488 struct ContactWithUserBusyStatuses {
489 user_id_a: UserId,
490 user_id_b: UserId,
491 a_to_b: bool,
492 accepted: bool,
493 should_notify: bool,
494 user_a_busy: bool,
495 user_b_busy: bool,
496 }
497
498 self.transaction(|tx| async move {
499 let user_a_participant = Alias::new("user_a_participant");
500 let user_b_participant = Alias::new("user_b_participant");
501 let mut db_contacts = contact::Entity::find()
502 .column_as(
503 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
504 .is_not_null(),
505 "user_a_busy",
506 )
507 .column_as(
508 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
509 .is_not_null(),
510 "user_b_busy",
511 )
512 .filter(
513 contact::Column::UserIdA
514 .eq(user_id)
515 .or(contact::Column::UserIdB.eq(user_id)),
516 )
517 .join_as(
518 JoinType::LeftJoin,
519 contact::Relation::UserARoomParticipant.def(),
520 user_a_participant,
521 )
522 .join_as(
523 JoinType::LeftJoin,
524 contact::Relation::UserBRoomParticipant.def(),
525 user_b_participant,
526 )
527 .into_model::<ContactWithUserBusyStatuses>()
528 .stream(&*tx)
529 .await?;
530
531 let mut contacts = Vec::new();
532 while let Some(db_contact) = db_contacts.next().await {
533 let db_contact = db_contact?;
534 if db_contact.user_id_a == user_id {
535 if db_contact.accepted {
536 contacts.push(Contact::Accepted {
537 user_id: db_contact.user_id_b,
538 should_notify: db_contact.should_notify && db_contact.a_to_b,
539 busy: db_contact.user_b_busy,
540 });
541 } else if db_contact.a_to_b {
542 contacts.push(Contact::Outgoing {
543 user_id: db_contact.user_id_b,
544 })
545 } else {
546 contacts.push(Contact::Incoming {
547 user_id: db_contact.user_id_b,
548 should_notify: db_contact.should_notify,
549 });
550 }
551 } else if db_contact.accepted {
552 contacts.push(Contact::Accepted {
553 user_id: db_contact.user_id_a,
554 should_notify: db_contact.should_notify && !db_contact.a_to_b,
555 busy: db_contact.user_a_busy,
556 });
557 } else if db_contact.a_to_b {
558 contacts.push(Contact::Incoming {
559 user_id: db_contact.user_id_a,
560 should_notify: db_contact.should_notify,
561 });
562 } else {
563 contacts.push(Contact::Outgoing {
564 user_id: db_contact.user_id_a,
565 });
566 }
567 }
568
569 contacts.sort_unstable_by_key(|contact| contact.user_id());
570
571 Ok(contacts)
572 })
573 .await
574 }
575
576 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
577 self.transaction(|tx| async move {
578 let participant = room_participant::Entity::find()
579 .filter(room_participant::Column::UserId.eq(user_id))
580 .one(&*tx)
581 .await?;
582 Ok(participant.is_some())
583 })
584 .await
585 }
586
587 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
588 self.transaction(|tx| async move {
589 let (id_a, id_b) = if user_id_1 < user_id_2 {
590 (user_id_1, user_id_2)
591 } else {
592 (user_id_2, user_id_1)
593 };
594
595 Ok(contact::Entity::find()
596 .filter(
597 contact::Column::UserIdA
598 .eq(id_a)
599 .and(contact::Column::UserIdB.eq(id_b))
600 .and(contact::Column::Accepted.eq(true)),
601 )
602 .one(&*tx)
603 .await?
604 .is_some())
605 })
606 .await
607 }
608
609 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
610 self.transaction(|tx| async move {
611 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
612 (sender_id, receiver_id, true)
613 } else {
614 (receiver_id, sender_id, false)
615 };
616
617 let rows_affected = contact::Entity::insert(contact::ActiveModel {
618 user_id_a: ActiveValue::set(id_a),
619 user_id_b: ActiveValue::set(id_b),
620 a_to_b: ActiveValue::set(a_to_b),
621 accepted: ActiveValue::set(false),
622 should_notify: ActiveValue::set(true),
623 ..Default::default()
624 })
625 .on_conflict(
626 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
627 .values([
628 (contact::Column::Accepted, true.into()),
629 (contact::Column::ShouldNotify, false.into()),
630 ])
631 .action_and_where(
632 contact::Column::Accepted.eq(false).and(
633 contact::Column::AToB
634 .eq(a_to_b)
635 .and(contact::Column::UserIdA.eq(id_b))
636 .or(contact::Column::AToB
637 .ne(a_to_b)
638 .and(contact::Column::UserIdA.eq(id_a))),
639 ),
640 )
641 .to_owned(),
642 )
643 .exec_without_returning(&*tx)
644 .await?;
645
646 if rows_affected == 1 {
647 Ok(())
648 } else {
649 Err(anyhow!("contact already requested"))?
650 }
651 })
652 .await
653 }
654
655 /// Returns a bool indicating whether the removed contact had originally accepted or not
656 ///
657 /// Deletes the contact identified by the requester and responder ids, and then returns
658 /// whether the deleted contact had originally accepted or was a pending contact request.
659 ///
660 /// # Arguments
661 ///
662 /// * `requester_id` - The user that initiates this request
663 /// * `responder_id` - The user that will be removed
664 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
665 self.transaction(|tx| async move {
666 let (id_a, id_b) = if responder_id < requester_id {
667 (responder_id, requester_id)
668 } else {
669 (requester_id, responder_id)
670 };
671
672 let contact = contact::Entity::find()
673 .filter(
674 contact::Column::UserIdA
675 .eq(id_a)
676 .and(contact::Column::UserIdB.eq(id_b)),
677 )
678 .one(&*tx)
679 .await?
680 .ok_or_else(|| anyhow!("no such contact"))?;
681
682 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
683 Ok(contact.accepted)
684 })
685 .await
686 }
687
688 pub async fn dismiss_contact_notification(
689 &self,
690 user_id: UserId,
691 contact_user_id: UserId,
692 ) -> Result<()> {
693 self.transaction(|tx| async move {
694 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
695 (user_id, contact_user_id, true)
696 } else {
697 (contact_user_id, user_id, false)
698 };
699
700 let result = contact::Entity::update_many()
701 .set(contact::ActiveModel {
702 should_notify: ActiveValue::set(false),
703 ..Default::default()
704 })
705 .filter(
706 contact::Column::UserIdA
707 .eq(id_a)
708 .and(contact::Column::UserIdB.eq(id_b))
709 .and(
710 contact::Column::AToB
711 .eq(a_to_b)
712 .and(contact::Column::Accepted.eq(true))
713 .or(contact::Column::AToB
714 .ne(a_to_b)
715 .and(contact::Column::Accepted.eq(false))),
716 ),
717 )
718 .exec(&*tx)
719 .await?;
720 if result.rows_affected == 0 {
721 Err(anyhow!("no such contact request"))?
722 } else {
723 Ok(())
724 }
725 })
726 .await
727 }
728
729 pub async fn respond_to_contact_request(
730 &self,
731 responder_id: UserId,
732 requester_id: UserId,
733 accept: bool,
734 ) -> Result<()> {
735 self.transaction(|tx| async move {
736 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
737 (responder_id, requester_id, false)
738 } else {
739 (requester_id, responder_id, true)
740 };
741 let rows_affected = if accept {
742 let result = contact::Entity::update_many()
743 .set(contact::ActiveModel {
744 accepted: ActiveValue::set(true),
745 should_notify: ActiveValue::set(true),
746 ..Default::default()
747 })
748 .filter(
749 contact::Column::UserIdA
750 .eq(id_a)
751 .and(contact::Column::UserIdB.eq(id_b))
752 .and(contact::Column::AToB.eq(a_to_b)),
753 )
754 .exec(&*tx)
755 .await?;
756 result.rows_affected
757 } else {
758 let result = contact::Entity::delete_many()
759 .filter(
760 contact::Column::UserIdA
761 .eq(id_a)
762 .and(contact::Column::UserIdB.eq(id_b))
763 .and(contact::Column::AToB.eq(a_to_b))
764 .and(contact::Column::Accepted.eq(false)),
765 )
766 .exec(&*tx)
767 .await?;
768
769 result.rows_affected
770 };
771
772 if rows_affected == 1 {
773 Ok(())
774 } else {
775 Err(anyhow!("no such contact request"))?
776 }
777 })
778 .await
779 }
780
/// Builds a SQL `LIKE` pattern that matches the alphanumeric characters of
/// `string` in order, with anything in between.
///
/// Every alphanumeric character is emitted preceded by `%`; all other
/// characters are dropped, and a final `%` is appended. For example,
/// `"a-b"` becomes `"%a%b%"` and the empty string becomes `"%"`.
pub fn fuzzy_like_string(string: &str) -> String {
    let buffer = String::with_capacity(string.len() * 2 + 1);
    let mut pattern = string
        .chars()
        .filter(|ch| ch.is_alphanumeric())
        .fold(buffer, |mut acc, ch| {
            acc.push('%');
            acc.push(ch);
            acc
        });
    pattern.push('%');
    pattern
}
792
793 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
794 self.transaction(|tx| async {
795 let tx = tx;
796 let like_string = Self::fuzzy_like_string(name_query);
797 let query = "
798 SELECT users.*
799 FROM users
800 WHERE github_login ILIKE $1
801 ORDER BY github_login <-> $2
802 LIMIT $3
803 ";
804
805 Ok(user::Entity::find()
806 .from_raw_sql(Statement::from_sql_and_values(
807 self.pool.get_database_backend(),
808 query.into(),
809 vec![like_string.into(), name_query.into(), limit.into()],
810 ))
811 .all(&*tx)
812 .await?)
813 })
814 .await
815 }
816
817 // signups
818
819 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
820 self.transaction(|tx| async move {
821 signup::Entity::insert(signup::ActiveModel {
822 email_address: ActiveValue::set(signup.email_address.clone()),
823 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
824 email_confirmation_sent: ActiveValue::set(false),
825 platform_mac: ActiveValue::set(signup.platform_mac),
826 platform_windows: ActiveValue::set(signup.platform_windows),
827 platform_linux: ActiveValue::set(signup.platform_linux),
828 platform_unknown: ActiveValue::set(false),
829 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
830 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
831 device_id: ActiveValue::set(signup.device_id.clone()),
832 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
833 ..Default::default()
834 })
835 .on_conflict(
836 OnConflict::column(signup::Column::EmailAddress)
837 .update_columns([
838 signup::Column::PlatformMac,
839 signup::Column::PlatformWindows,
840 signup::Column::PlatformLinux,
841 signup::Column::EditorFeatures,
842 signup::Column::ProgrammingLanguages,
843 signup::Column::DeviceId,
844 signup::Column::AddedToMailingList,
845 ])
846 .to_owned(),
847 )
848 .exec(&*tx)
849 .await?;
850 Ok(())
851 })
852 .await
853 }
854
855 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
856 self.transaction(|tx| async move {
857 let signup = signup::Entity::find()
858 .filter(signup::Column::EmailAddress.eq(email_address))
859 .one(&*tx)
860 .await?
861 .ok_or_else(|| {
862 anyhow!("signup with email address {} doesn't exist", email_address)
863 })?;
864
865 Ok(signup)
866 })
867 .await
868 }
869
870 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
871 self.transaction(|tx| async move {
872 let query = "
873 SELECT
874 COUNT(*) as count,
875 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
876 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
877 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
878 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
879 FROM (
880 SELECT *
881 FROM signups
882 WHERE
883 NOT email_confirmation_sent
884 ) AS unsent
885 ";
886 Ok(
887 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
888 self.pool.get_database_backend(),
889 query.into(),
890 vec![],
891 ))
892 .one(&*tx)
893 .await?
894 .ok_or_else(|| anyhow!("invalid result"))?,
895 )
896 })
897 .await
898 }
899
900 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
901 let emails = invites
902 .iter()
903 .map(|s| s.email_address.as_str())
904 .collect::<Vec<_>>();
905 self.transaction(|tx| async {
906 let tx = tx;
907 signup::Entity::update_many()
908 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
909 .set(signup::ActiveModel {
910 email_confirmation_sent: ActiveValue::set(true),
911 ..Default::default()
912 })
913 .exec(&*tx)
914 .await?;
915 Ok(())
916 })
917 .await
918 }
919
920 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
921 self.transaction(|tx| async move {
922 Ok(signup::Entity::find()
923 .select_only()
924 .column(signup::Column::EmailAddress)
925 .column(signup::Column::EmailConfirmationCode)
926 .filter(
927 signup::Column::EmailConfirmationSent.eq(false).and(
928 signup::Column::PlatformMac
929 .eq(true)
930 .or(signup::Column::PlatformUnknown.eq(true)),
931 ),
932 )
933 .order_by_asc(signup::Column::CreatedAt)
934 .limit(count as u64)
935 .into_model()
936 .all(&*tx)
937 .await?)
938 })
939 .await
940 }
941
942 // invite codes
943
944 pub async fn create_invite_from_code(
945 &self,
946 code: &str,
947 email_address: &str,
948 device_id: Option<&str>,
949 added_to_mailing_list: bool,
950 ) -> Result<Invite> {
951 self.transaction(|tx| async move {
952 let existing_user = user::Entity::find()
953 .filter(user::Column::EmailAddress.eq(email_address))
954 .one(&*tx)
955 .await?;
956
957 if existing_user.is_some() {
958 Err(anyhow!("email address is already in use"))?;
959 }
960
961 let inviting_user_with_invites = match user::Entity::find()
962 .filter(
963 user::Column::InviteCode
964 .eq(code)
965 .and(user::Column::InviteCount.gt(0)),
966 )
967 .one(&*tx)
968 .await?
969 {
970 Some(inviting_user) => inviting_user,
971 None => {
972 return Err(Error::Http(
973 StatusCode::UNAUTHORIZED,
974 "unable to find an invite code with invites remaining".to_string(),
975 ))?
976 }
977 };
978 user::Entity::update_many()
979 .filter(
980 user::Column::Id
981 .eq(inviting_user_with_invites.id)
982 .and(user::Column::InviteCount.gt(0)),
983 )
984 .col_expr(
985 user::Column::InviteCount,
986 Expr::col(user::Column::InviteCount).sub(1),
987 )
988 .exec(&*tx)
989 .await?;
990
991 let signup = signup::Entity::insert(signup::ActiveModel {
992 email_address: ActiveValue::set(email_address.into()),
993 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
994 email_confirmation_sent: ActiveValue::set(false),
995 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
996 platform_linux: ActiveValue::set(false),
997 platform_mac: ActiveValue::set(false),
998 platform_windows: ActiveValue::set(false),
999 platform_unknown: ActiveValue::set(true),
1000 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
1001 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
1002 ..Default::default()
1003 })
1004 .on_conflict(
1005 OnConflict::column(signup::Column::EmailAddress)
1006 .update_column(signup::Column::InvitingUserId)
1007 .to_owned(),
1008 )
1009 .exec_with_returning(&*tx)
1010 .await?;
1011
1012 Ok(Invite {
1013 email_address: signup.email_address,
1014 email_confirmation_code: signup.email_confirmation_code,
1015 })
1016 })
1017 .await
1018 }
1019
1020 pub async fn create_user_from_invite(
1021 &self,
1022 invite: &Invite,
1023 user: NewUserParams,
1024 ) -> Result<Option<NewUserResult>> {
1025 self.transaction(|tx| async {
1026 let tx = tx;
1027 let signup = signup::Entity::find()
1028 .filter(
1029 signup::Column::EmailAddress
1030 .eq(invite.email_address.as_str())
1031 .and(
1032 signup::Column::EmailConfirmationCode
1033 .eq(invite.email_confirmation_code.as_str()),
1034 ),
1035 )
1036 .one(&*tx)
1037 .await?
1038 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1039
1040 if signup.user_id.is_some() {
1041 return Ok(None);
1042 }
1043
1044 let user = user::Entity::insert(user::ActiveModel {
1045 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1046 github_login: ActiveValue::set(user.github_login.clone()),
1047 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1048 admin: ActiveValue::set(false),
1049 invite_count: ActiveValue::set(user.invite_count),
1050 invite_code: ActiveValue::set(Some(random_invite_code())),
1051 metrics_id: ActiveValue::set(Uuid::new_v4()),
1052 ..Default::default()
1053 })
1054 .on_conflict(
1055 OnConflict::column(user::Column::GithubLogin)
1056 .update_columns([
1057 user::Column::EmailAddress,
1058 user::Column::GithubUserId,
1059 user::Column::Admin,
1060 ])
1061 .to_owned(),
1062 )
1063 .exec_with_returning(&*tx)
1064 .await?;
1065
1066 let mut signup = signup.into_active_model();
1067 signup.user_id = ActiveValue::set(Some(user.id));
1068 let signup = signup.update(&*tx).await?;
1069
1070 if let Some(inviting_user_id) = signup.inviting_user_id {
1071 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1072 (inviting_user_id, user.id, true)
1073 } else {
1074 (user.id, inviting_user_id, false)
1075 };
1076
1077 contact::Entity::insert(contact::ActiveModel {
1078 user_id_a: ActiveValue::set(user_id_a),
1079 user_id_b: ActiveValue::set(user_id_b),
1080 a_to_b: ActiveValue::set(a_to_b),
1081 should_notify: ActiveValue::set(true),
1082 accepted: ActiveValue::set(true),
1083 ..Default::default()
1084 })
1085 .on_conflict(OnConflict::new().do_nothing().to_owned())
1086 .exec_without_returning(&*tx)
1087 .await?;
1088 }
1089
1090 Ok(Some(NewUserResult {
1091 user_id: user.id,
1092 metrics_id: user.metrics_id.to_string(),
1093 inviting_user_id: signup.inviting_user_id,
1094 signup_device_id: signup.device_id,
1095 }))
1096 })
1097 .await
1098 }
1099
1100 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1101 self.transaction(|tx| async move {
1102 if count > 0 {
1103 user::Entity::update_many()
1104 .filter(
1105 user::Column::Id
1106 .eq(id)
1107 .and(user::Column::InviteCode.is_null()),
1108 )
1109 .set(user::ActiveModel {
1110 invite_code: ActiveValue::set(Some(random_invite_code())),
1111 ..Default::default()
1112 })
1113 .exec(&*tx)
1114 .await?;
1115 }
1116
1117 user::Entity::update_many()
1118 .filter(user::Column::Id.eq(id))
1119 .set(user::ActiveModel {
1120 invite_count: ActiveValue::set(count),
1121 ..Default::default()
1122 })
1123 .exec(&*tx)
1124 .await?;
1125 Ok(())
1126 })
1127 .await
1128 }
1129
1130 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1131 self.transaction(|tx| async move {
1132 match user::Entity::find_by_id(id).one(&*tx).await? {
1133 Some(user) if user.invite_code.is_some() => {
1134 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1135 }
1136 _ => Ok(None),
1137 }
1138 })
1139 .await
1140 }
1141
1142 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1143 self.transaction(|tx| async move {
1144 user::Entity::find()
1145 .filter(user::Column::InviteCode.eq(code))
1146 .one(&*tx)
1147 .await?
1148 .ok_or_else(|| {
1149 Error::Http(
1150 StatusCode::NOT_FOUND,
1151 "that invite code does not exist".to_string(),
1152 )
1153 })
1154 })
1155 .await
1156 }
1157
1158 // rooms
1159
1160 pub async fn incoming_call_for_user(
1161 &self,
1162 user_id: UserId,
1163 ) -> Result<Option<proto::IncomingCall>> {
1164 self.transaction(|tx| async move {
1165 let pending_participant = room_participant::Entity::find()
1166 .filter(
1167 room_participant::Column::UserId
1168 .eq(user_id)
1169 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1170 )
1171 .one(&*tx)
1172 .await?;
1173
1174 if let Some(pending_participant) = pending_participant {
1175 let room = self.get_room(pending_participant.room_id, &tx).await?;
1176 Ok(Self::build_incoming_call(&room, user_id))
1177 } else {
1178 Ok(None)
1179 }
1180 })
1181 .await
1182 }
1183
1184 pub async fn create_room(
1185 &self,
1186 user_id: UserId,
1187 connection: ConnectionId,
1188 live_kit_room: &str,
1189 ) -> Result<proto::Room> {
1190 self.transaction(|tx| async move {
1191 let room = room::ActiveModel {
1192 live_kit_room: ActiveValue::set(live_kit_room.into()),
1193 ..Default::default()
1194 }
1195 .insert(&*tx)
1196 .await?;
1197 room_participant::ActiveModel {
1198 room_id: ActiveValue::set(room.id),
1199 user_id: ActiveValue::set(user_id),
1200 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1201 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1202 connection.owner_id as i32,
1203 ))),
1204 answering_connection_lost: ActiveValue::set(false),
1205 calling_user_id: ActiveValue::set(user_id),
1206 calling_connection_id: ActiveValue::set(connection.id as i32),
1207 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1208 connection.owner_id as i32,
1209 ))),
1210 ..Default::default()
1211 }
1212 .insert(&*tx)
1213 .await?;
1214
1215 let room = self.get_room(room.id, &tx).await?;
1216 Ok(room)
1217 })
1218 .await
1219 }
1220
1221 pub async fn call(
1222 &self,
1223 room_id: RoomId,
1224 calling_user_id: UserId,
1225 calling_connection: ConnectionId,
1226 called_user_id: UserId,
1227 initial_project_id: Option<ProjectId>,
1228 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1229 self.room_transaction(room_id, |tx| async move {
1230 room_participant::ActiveModel {
1231 room_id: ActiveValue::set(room_id),
1232 user_id: ActiveValue::set(called_user_id),
1233 answering_connection_lost: ActiveValue::set(false),
1234 calling_user_id: ActiveValue::set(calling_user_id),
1235 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1236 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1237 calling_connection.owner_id as i32,
1238 ))),
1239 initial_project_id: ActiveValue::set(initial_project_id),
1240 ..Default::default()
1241 }
1242 .insert(&*tx)
1243 .await?;
1244
1245 let room = self.get_room(room_id, &tx).await?;
1246 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1247 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1248 Ok((room, incoming_call))
1249 })
1250 .await
1251 }
1252
1253 pub async fn call_failed(
1254 &self,
1255 room_id: RoomId,
1256 called_user_id: UserId,
1257 ) -> Result<RoomGuard<proto::Room>> {
1258 self.room_transaction(room_id, |tx| async move {
1259 room_participant::Entity::delete_many()
1260 .filter(
1261 room_participant::Column::RoomId
1262 .eq(room_id)
1263 .and(room_participant::Column::UserId.eq(called_user_id)),
1264 )
1265 .exec(&*tx)
1266 .await?;
1267 let room = self.get_room(room_id, &tx).await?;
1268 Ok(room)
1269 })
1270 .await
1271 }
1272
1273 pub async fn decline_call(
1274 &self,
1275 expected_room_id: Option<RoomId>,
1276 user_id: UserId,
1277 ) -> Result<Option<RoomGuard<proto::Room>>> {
1278 self.optional_room_transaction(|tx| async move {
1279 let mut filter = Condition::all()
1280 .add(room_participant::Column::UserId.eq(user_id))
1281 .add(room_participant::Column::AnsweringConnectionId.is_null());
1282 if let Some(room_id) = expected_room_id {
1283 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1284 }
1285 let participant = room_participant::Entity::find()
1286 .filter(filter)
1287 .one(&*tx)
1288 .await?;
1289
1290 let participant = if let Some(participant) = participant {
1291 participant
1292 } else if expected_room_id.is_some() {
1293 return Err(anyhow!("could not find call to decline"))?;
1294 } else {
1295 return Ok(None);
1296 };
1297
1298 let room_id = participant.room_id;
1299 room_participant::Entity::delete(participant.into_active_model())
1300 .exec(&*tx)
1301 .await?;
1302
1303 let room = self.get_room(room_id, &tx).await?;
1304 Ok(Some((room_id, room)))
1305 })
1306 .await
1307 }
1308
1309 pub async fn cancel_call(
1310 &self,
1311 room_id: RoomId,
1312 calling_connection: ConnectionId,
1313 called_user_id: UserId,
1314 ) -> Result<RoomGuard<proto::Room>> {
1315 self.room_transaction(room_id, |tx| async move {
1316 let participant = room_participant::Entity::find()
1317 .filter(
1318 Condition::all()
1319 .add(room_participant::Column::UserId.eq(called_user_id))
1320 .add(room_participant::Column::RoomId.eq(room_id))
1321 .add(
1322 room_participant::Column::CallingConnectionId
1323 .eq(calling_connection.id as i32),
1324 )
1325 .add(
1326 room_participant::Column::CallingConnectionServerId
1327 .eq(calling_connection.owner_id as i32),
1328 )
1329 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1330 )
1331 .one(&*tx)
1332 .await?
1333 .ok_or_else(|| anyhow!("no call to cancel"))?;
1334
1335 room_participant::Entity::delete(participant.into_active_model())
1336 .exec(&*tx)
1337 .await?;
1338
1339 let room = self.get_room(room_id, &tx).await?;
1340 Ok(room)
1341 })
1342 .await
1343 }
1344
1345 pub async fn join_room(
1346 &self,
1347 room_id: RoomId,
1348 user_id: UserId,
1349 channel_id: Option<ChannelId>,
1350 connection: ConnectionId,
1351 ) -> Result<RoomGuard<JoinRoom>> {
1352 self.room_transaction(room_id, |tx| async move {
1353 if let Some(channel_id) = channel_id {
1354 channel_member::Entity::find()
1355 .filter(
1356 channel_member::Column::ChannelId
1357 .eq(channel_id)
1358 .and(channel_member::Column::UserId.eq(user_id))
1359 .and(channel_member::Column::Accepted.eq(true)),
1360 )
1361 .one(&*tx)
1362 .await?
1363 .ok_or_else(|| anyhow!("no such channel membership"))?;
1364
1365 room_participant::ActiveModel {
1366 room_id: ActiveValue::set(room_id),
1367 user_id: ActiveValue::set(user_id),
1368 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1369 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1370 connection.owner_id as i32,
1371 ))),
1372 answering_connection_lost: ActiveValue::set(false),
1373 // Redundant for the channel join use case, used for channel and call invitations
1374 calling_user_id: ActiveValue::set(user_id),
1375 calling_connection_id: ActiveValue::set(connection.id as i32),
1376 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1377 connection.owner_id as i32,
1378 ))),
1379 ..Default::default()
1380 }
1381 .insert(&*tx)
1382 .await?;
1383 } else {
1384 let result = room_participant::Entity::update_many()
1385 .filter(
1386 Condition::all()
1387 .add(room_participant::Column::RoomId.eq(room_id))
1388 .add(room_participant::Column::UserId.eq(user_id))
1389 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1390 )
1391 .set(room_participant::ActiveModel {
1392 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1393 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1394 connection.owner_id as i32,
1395 ))),
1396 answering_connection_lost: ActiveValue::set(false),
1397 ..Default::default()
1398 })
1399 .exec(&*tx)
1400 .await?;
1401 if result.rows_affected == 0 {
1402 Err(anyhow!("room does not exist or was already joined"))?;
1403 }
1404 }
1405
1406 let room = self.get_room(room_id, &tx).await?;
1407 let channel_members = if let Some(channel_id) = channel_id {
1408 self.get_channel_members_internal(channel_id, &tx).await?
1409 } else {
1410 Vec::new()
1411 };
1412 Ok(JoinRoom {
1413 room,
1414 channel_id,
1415 channel_members,
1416 })
1417 })
1418 .await
1419 }
1420
/// Re-attaches `user_id` to the room named in `rejoin_room` using a new
/// `connection`.
///
/// The steps, all inside one room transaction:
///
/// 1. Point the user's participant row at `connection`. The row must already
///    have an answering connection that is either marked lost or owned by a
///    different server than `connection`.
/// 2. For every entry in `rejoin_room.reshared_projects`, move the project
///    and its host collaborator row to `connection` and update its
///    worktrees.
/// 3. Delete the user's other projects in this room (those not reshared).
/// 4. For every entry in `rejoin_room.rejoined_projects`, collect the
///    worktree, repository, language-server and settings-file state to send
///    back, and move the user's collaborator row to `connection`.
/// 5. Load the room and, when it belongs to a channel, the channel members.
///
/// # Errors
///
/// Fails with "room does not exist or was already joined" when step 1
/// updates no row, and with "project does not exist", "no such project" or
/// "host not found among collaborators" for invalid reshared projects.
/// Database errors are propagated.
pub async fn rejoin_room(
    &self,
    rejoin_room: proto::RejoinRoom,
    user_id: UserId,
    connection: ConnectionId,
) -> Result<RoomGuard<RejoinedRoom>> {
    let room_id = RoomId::from_proto(rejoin_room.id);
    self.room_transaction(room_id, |tx| async {
        // This async block is not `move`; rebinding `tx` by value makes the
        // block take ownership of the handle while everything else is borrowed.
        let tx = tx;
        // Step 1: only a participant whose connection was lost, or lived on
        // another server, may be taken over by the new connection.
        let participant_update = room_participant::Entity::update_many()
            .filter(
                Condition::all()
                    .add(room_participant::Column::RoomId.eq(room_id))
                    .add(room_participant::Column::UserId.eq(user_id))
                    .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                    .add(
                        Condition::any()
                            .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                            .add(
                                room_participant::Column::AnsweringConnectionServerId
                                    .ne(connection.owner_id as i32),
                            ),
                    ),
            )
            .set(room_participant::ActiveModel {
                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                answering_connection_server_id: ActiveValue::set(Some(ServerId(
                    connection.owner_id as i32,
                ))),
                answering_connection_lost: ActiveValue::set(false),
                ..Default::default()
            })
            .exec(&*tx)
            .await?;
        if participant_update.rows_affected == 0 {
            return Err(anyhow!("room does not exist or was already joined"))?;
        }

        // Step 2: projects this user hosts and wants to keep sharing.
        let mut reshared_projects = Vec::new();
        for reshared_project in &rejoin_room.reshared_projects {
            let project_id = ProjectId::from_proto(reshared_project.project_id);
            let project = project::Entity::find_by_id(project_id)
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("project does not exist"))?;
            // Only the host may reshare a project.
            if project.host_user_id != user_id {
                return Err(anyhow!("no such project"))?;
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            // Pull the host's own row out of the list; the remaining rows are
            // the other collaborators reported back to the caller.
            let host_ix = collaborators
                .iter()
                .position(|collaborator| {
                    collaborator.user_id == user_id && collaborator.is_host
                })
                .ok_or_else(|| anyhow!("host not found among collaborators"))?;
            let host = collaborators.swap_remove(host_ix);
            let old_connection_id = host.connection();

            // Re-home both the project and the host collaborator row on the
            // new connection.
            project::Entity::update(project::ActiveModel {
                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                host_connection_server_id: ActiveValue::set(Some(ServerId(
                    connection.owner_id as i32,
                ))),
                ..project.into_active_model()
            })
            .exec(&*tx)
            .await?;
            project_collaborator::Entity::update(project_collaborator::ActiveModel {
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                ..host.into_active_model()
            })
            .exec(&*tx)
            .await?;

            self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                .await?;

            reshared_projects.push(ResharedProject {
                id: project_id,
                old_connection_id,
                collaborators: collaborators
                    .iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect(),
                worktrees: reshared_project.worktrees.clone(),
            });
        }

        // Step 3: drop the user's projects in this room that were not reshared.
        project::Entity::delete_many()
            .filter(
                Condition::all()
                    .add(project::Column::RoomId.eq(room_id))
                    .add(project::Column::HostUserId.eq(user_id))
                    .add(
                        project::Column::Id
                            .is_not_in(reshared_projects.iter().map(|project| project.id)),
                    ),
            )
            .exec(&*tx)
            .await?;

        // Step 4: projects hosted by someone else that this user rejoins.
        let mut rejoined_projects = Vec::new();
        for rejoined_project in &rejoin_room.rejoined_projects {
            let project_id = ProjectId::from_proto(rejoined_project.id);
            // A project that no longer exists is skipped rather than reported
            // as an error.
            let Some(project) = project::Entity::find_by_id(project_id)
                .one(&*tx)
                .await? else { continue };

            let mut worktrees = Vec::new();
            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
            for db_worktree in db_worktrees {
                let mut worktree = RejoinedWorktree {
                    id: db_worktree.id as u64,
                    abs_path: db_worktree.abs_path,
                    root_name: db_worktree.root_name,
                    visible: db_worktree.visible,
                    updated_entries: Default::default(),
                    removed_entries: Default::default(),
                    updated_repositories: Default::default(),
                    removed_repositories: Default::default(),
                    diagnostic_summaries: Default::default(),
                    settings_files: Default::default(),
                    scan_id: db_worktree.scan_id as u64,
                    completed_scan_id: db_worktree.completed_scan_id as u64,
                };

                // The client's own record of this worktree, if it sent one.
                let rejoined_worktree = rejoined_project
                    .worktrees
                    .iter()
                    .find(|worktree| worktree.id == db_worktree.id as u64);

                // File entries
                {
                    // If the client reported this worktree, select only rows with
                    // a scan id greater than the one it reported; otherwise select
                    // every row that is not deleted.
                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                    } else {
                        worktree_entry::Column::IsDeleted.eq(false)
                    };

                    let mut db_entries = worktree_entry::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_entry::Column::ProjectId.eq(project.id))
                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                .add(entry_filter),
                        )
                        .stream(&*tx)
                        .await?;

                    // Deleted rows become removals; all others become updates.
                    while let Some(db_entry) = db_entries.next().await {
                        let db_entry = db_entry?;
                        if db_entry.is_deleted {
                            worktree.removed_entries.push(db_entry.id as u64);
                        } else {
                            worktree.updated_entries.push(proto::Entry {
                                id: db_entry.id as u64,
                                is_dir: db_entry.is_dir,
                                path: db_entry.path,
                                inode: db_entry.inode as u64,
                                mtime: Some(proto::Timestamp {
                                    seconds: db_entry.mtime_seconds as u64,
                                    nanos: db_entry.mtime_nanos as u32,
                                }),
                                is_symlink: db_entry.is_symlink,
                                is_ignored: db_entry.is_ignored,
                                is_external: db_entry.is_external,
                                git_status: db_entry.git_status.map(|status| status as i32),
                            });
                        }
                    }
                }

                // Repository entries, selected with the same rule as file entries.
                {
                    let repository_entry_filter =
                        if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_repository::Column::IsDeleted.eq(false)
                        };

                    let mut db_repositories = worktree_repository::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_repository::Column::ProjectId.eq(project.id))
                                .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                .add(repository_entry_filter),
                        )
                        .stream(&*tx)
                        .await?;

                    while let Some(db_repository) = db_repositories.next().await {
                        let db_repository = db_repository?;
                        if db_repository.is_deleted {
                            worktree
                                .removed_repositories
                                .push(db_repository.work_directory_id as u64);
                        } else {
                            worktree.updated_repositories.push(proto::RepositoryEntry {
                                work_directory_id: db_repository.work_directory_id as u64,
                                branch: db_repository.branch,
                            });
                        }
                    }
                }

                worktrees.push(worktree);
            }

            let language_servers = project
                .find_related(language_server::Entity)
                .all(&*tx)
                .await?
                .into_iter()
                .map(|language_server| proto::LanguageServer {
                    id: language_server.id as u64,
                    name: language_server.name,
                })
                .collect::<Vec<_>>();

            // Attach each settings file to the worktree it belongs to; files
            // whose worktree was not collected above are ignored.
            {
                let mut db_settings_files = worktree_settings_file::Entity::find()
                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                    .stream(&*tx)
                    .await?;
                while let Some(db_settings_file) = db_settings_files.next().await {
                    let db_settings_file = db_settings_file?;
                    if let Some(worktree) = worktrees
                        .iter_mut()
                        .find(|w| w.id == db_settings_file.worktree_id as u64)
                    {
                        worktree.settings_files.push(WorktreeSettingsFile {
                            path: db_settings_file.path,
                            content: db_settings_file.content,
                        });
                    }
                }
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            // If the user is not a collaborator of this project, skip it
            // without reporting an error.
            let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                .iter()
                .position(|collaborator| collaborator.user_id == user_id)
            {
                collaborators.swap_remove(self_collaborator_ix)
            } else {
                continue;
            };
            let old_connection_id = self_collaborator.connection();
            // Move the user's collaborator row to the new connection.
            project_collaborator::Entity::update(project_collaborator::ActiveModel {
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                ..self_collaborator.into_active_model()
            })
            .exec(&*tx)
            .await?;

            let collaborators = collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                })
                .collect::<Vec<_>>();

            rejoined_projects.push(RejoinedProject {
                id: project_id,
                old_connection_id,
                collaborators,
                worktrees,
                language_servers,
            });
        }

        // Step 5: return the room as it looks after all of the changes above.
        let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;

        let channel_members = if let Some(channel_id) = channel_id {
            self.get_channel_members_internal(channel_id, &tx).await?
        } else {
            Vec::new()
        };

        Ok(RejoinedRoom {
            room,
            channel_id,
            channel_members,
            rejoined_projects,
            reshared_projects,
        })
    })
    .await
}
1728
/// Removes the participant attached to `connection` from its room.
///
/// Returns `Ok(None)` when `connection` is not the answering connection of
/// any participant. Otherwise, in one transaction, it:
///
/// * deletes the participant row,
/// * cancels the pending calls placed by that user,
/// * removes the connection from every project it collaborated on and
///   deletes the projects it hosted in the room,
/// * deletes the room itself if it has no participants left and has no
///   channel.
///
/// The returned `LeftRoom` holds the room as it was loaded just before the
/// room row itself may be deleted, the affected projects, the users whose
/// calls were canceled, and whether the room row was deleted.
pub async fn leave_room(
    &self,
    connection: ConnectionId,
) -> Result<Option<RoomGuard<LeftRoom>>> {
    self.optional_room_transaction(|tx| async move {
        let leaving_participant = room_participant::Entity::find()
            .filter(
                Condition::all()
                    .add(
                        room_participant::Column::AnsweringConnectionId
                            .eq(connection.id as i32),
                    )
                    .add(
                        room_participant::Column::AnsweringConnectionServerId
                            .eq(connection.owner_id as i32),
                    ),
            )
            .one(&*tx)
            .await?;

        if let Some(leaving_participant) = leaving_participant {
            // Leave room.
            let room_id = leaving_participant.room_id;
            room_participant::Entity::delete_by_id(leaving_participant.id)
                .exec(&*tx)
                .await?;

            // Cancel pending calls initiated by the leaving user.
            // NOTE(review): this filter matches on the calling user only; it is
            // not restricted to `room_id`.
            let called_participants = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::CallingUserId
                                .eq(leaving_participant.user_id),
                        )
                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
                )
                .all(&*tx)
                .await?;
            room_participant::Entity::delete_many()
                .filter(
                    room_participant::Column::Id
                        .is_in(called_participants.iter().map(|participant| participant.id)),
                )
                .exec(&*tx)
                .await?;
            let canceled_calls_to_user_ids = called_participants
                .into_iter()
                .map(|participant| participant.user_id)
                .collect();

            // Detect left projects.
            // Helper column enum used to select only `project_id` values.
            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
            enum QueryProjectIds {
                ProjectId,
            }
            // Ids of every project this connection is a collaborator on.
            let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                .select_only()
                .column_as(
                    project_collaborator::Column::ProjectId,
                    QueryProjectIds::ProjectId,
                )
                .filter(
                    Condition::all()
                        .add(
                            project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                        )
                        .add(
                            project_collaborator::Column::ConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .into_values::<_, QueryProjectIds>()
                .all(&*tx)
                .await?;
            // Group all collaborators of those projects by project, recording
            // the other connections and the host of each one.
            let mut left_projects = HashMap::default();
            let mut collaborators = project_collaborator::Entity::find()
                .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                .stream(&*tx)
                .await?;
            while let Some(collaborator) = collaborators.next().await {
                let collaborator = collaborator?;
                let left_project =
                    left_projects
                        .entry(collaborator.project_id)
                        .or_insert(LeftProject {
                            id: collaborator.project_id,
                            host_user_id: Default::default(),
                            connection_ids: Default::default(),
                            host_connection_id: Default::default(),
                        });

                // The leaving connection itself is not listed in `connection_ids`.
                let collaborator_connection_id = collaborator.connection();
                if collaborator_connection_id != connection {
                    left_project.connection_ids.push(collaborator_connection_id);
                }

                if collaborator.is_host {
                    left_project.host_user_id = collaborator.user_id;
                    left_project.host_connection_id = collaborator_connection_id;
                }
            }
            // Presumably ends the stream's borrow of the transaction before the
            // next statements run on it; confirm against the stream type.
            drop(collaborators);

            // Leave projects.
            project_collaborator::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(
                            project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                        )
                        .add(
                            project_collaborator::Column::ConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Unshare projects.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
                        .add(
                            project::Column::HostConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .exec(&*tx)
                .await?;

            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
            // An empty room is deleted only when it has no channel; `deleted`
            // reports whether a row was actually removed.
            let deleted = if room.participants.is_empty() {
                let result = room::Entity::delete_by_id(room_id)
                    .filter(room::Column::ChannelId.is_null())
                    .exec(&*tx)
                    .await?;
                result.rows_affected > 0
            } else {
                false
            };

            let channel_members = if let Some(channel_id) = channel_id {
                self.get_channel_members_internal(channel_id, &tx).await?
            } else {
                Vec::new()
            };
            let left_room = LeftRoom {
                room,
                channel_id,
                channel_members,
                left_projects,
                canceled_calls_to_user_ids,
                deleted,
            };

            // Remove the per-room mutex from `self.rooms` once the room has no
            // participants, whether or not the room row was deleted.
            if left_room.room.participants.is_empty() {
                self.rooms.remove(&room_id);
            }

            Ok(Some((room_id, left_room)))
        } else {
            Ok(None)
        }
    })
    .await
}
1898
1899 pub async fn follow(
1900 &self,
1901 project_id: ProjectId,
1902 leader_connection: ConnectionId,
1903 follower_connection: ConnectionId,
1904 ) -> Result<RoomGuard<proto::Room>> {
1905 let room_id = self.room_id_for_project(project_id).await?;
1906 self.room_transaction(room_id, |tx| async move {
1907 follower::ActiveModel {
1908 room_id: ActiveValue::set(room_id),
1909 project_id: ActiveValue::set(project_id),
1910 leader_connection_server_id: ActiveValue::set(ServerId(
1911 leader_connection.owner_id as i32,
1912 )),
1913 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1914 follower_connection_server_id: ActiveValue::set(ServerId(
1915 follower_connection.owner_id as i32,
1916 )),
1917 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1918 ..Default::default()
1919 }
1920 .insert(&*tx)
1921 .await?;
1922
1923 let room = self.get_room(room_id, &*tx).await?;
1924 Ok(room)
1925 })
1926 .await
1927 }
1928
1929 pub async fn unfollow(
1930 &self,
1931 project_id: ProjectId,
1932 leader_connection: ConnectionId,
1933 follower_connection: ConnectionId,
1934 ) -> Result<RoomGuard<proto::Room>> {
1935 let room_id = self.room_id_for_project(project_id).await?;
1936 self.room_transaction(room_id, |tx| async move {
1937 follower::Entity::delete_many()
1938 .filter(
1939 Condition::all()
1940 .add(follower::Column::ProjectId.eq(project_id))
1941 .add(
1942 follower::Column::LeaderConnectionServerId
1943 .eq(leader_connection.owner_id),
1944 )
1945 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1946 .add(
1947 follower::Column::FollowerConnectionServerId
1948 .eq(follower_connection.owner_id),
1949 )
1950 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1951 )
1952 .exec(&*tx)
1953 .await?;
1954
1955 let room = self.get_room(room_id, &*tx).await?;
1956 Ok(room)
1957 })
1958 .await
1959 }
1960
1961 pub async fn update_room_participant_location(
1962 &self,
1963 room_id: RoomId,
1964 connection: ConnectionId,
1965 location: proto::ParticipantLocation,
1966 ) -> Result<RoomGuard<proto::Room>> {
1967 self.room_transaction(room_id, |tx| async {
1968 let tx = tx;
1969 let location_kind;
1970 let location_project_id;
1971 match location
1972 .variant
1973 .as_ref()
1974 .ok_or_else(|| anyhow!("invalid location"))?
1975 {
1976 proto::participant_location::Variant::SharedProject(project) => {
1977 location_kind = 0;
1978 location_project_id = Some(ProjectId::from_proto(project.id));
1979 }
1980 proto::participant_location::Variant::UnsharedProject(_) => {
1981 location_kind = 1;
1982 location_project_id = None;
1983 }
1984 proto::participant_location::Variant::External(_) => {
1985 location_kind = 2;
1986 location_project_id = None;
1987 }
1988 }
1989
1990 let result = room_participant::Entity::update_many()
1991 .filter(
1992 Condition::all()
1993 .add(room_participant::Column::RoomId.eq(room_id))
1994 .add(
1995 room_participant::Column::AnsweringConnectionId
1996 .eq(connection.id as i32),
1997 )
1998 .add(
1999 room_participant::Column::AnsweringConnectionServerId
2000 .eq(connection.owner_id as i32),
2001 ),
2002 )
2003 .set(room_participant::ActiveModel {
2004 location_kind: ActiveValue::set(Some(location_kind)),
2005 location_project_id: ActiveValue::set(location_project_id),
2006 ..Default::default()
2007 })
2008 .exec(&*tx)
2009 .await?;
2010
2011 if result.rows_affected == 1 {
2012 let room = self.get_room(room_id, &tx).await?;
2013 Ok(room)
2014 } else {
2015 Err(anyhow!("could not update room participant location"))?
2016 }
2017 })
2018 .await
2019 }
2020
2021 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2022 self.transaction(|tx| async move {
2023 let participant = room_participant::Entity::find()
2024 .filter(
2025 Condition::all()
2026 .add(
2027 room_participant::Column::AnsweringConnectionId
2028 .eq(connection.id as i32),
2029 )
2030 .add(
2031 room_participant::Column::AnsweringConnectionServerId
2032 .eq(connection.owner_id as i32),
2033 ),
2034 )
2035 .one(&*tx)
2036 .await?
2037 .ok_or_else(|| anyhow!("not a participant in any room"))?;
2038
2039 room_participant::Entity::update(room_participant::ActiveModel {
2040 answering_connection_lost: ActiveValue::set(true),
2041 ..participant.into_active_model()
2042 })
2043 .exec(&*tx)
2044 .await?;
2045
2046 Ok(())
2047 })
2048 .await
2049 }
2050
2051 fn build_incoming_call(
2052 room: &proto::Room,
2053 called_user_id: UserId,
2054 ) -> Option<proto::IncomingCall> {
2055 let pending_participant = room
2056 .pending_participants
2057 .iter()
2058 .find(|participant| participant.user_id == called_user_id.to_proto())?;
2059
2060 Some(proto::IncomingCall {
2061 room_id: room.id,
2062 calling_user_id: pending_participant.calling_user_id,
2063 participant_user_ids: room
2064 .participants
2065 .iter()
2066 .map(|participant| participant.user_id)
2067 .collect(),
2068 initial_project: room.participants.iter().find_map(|participant| {
2069 let initial_project_id = pending_participant.initial_project_id?;
2070 participant
2071 .projects
2072 .iter()
2073 .find(|project| project.id == initial_project_id)
2074 .cloned()
2075 }),
2076 })
2077 }
2078 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2079 let (_, room) = self.get_channel_room(room_id, tx).await?;
2080 Ok(room)
2081 }
2082
/// Loads `room_id` and assembles its `proto::Room` representation.
///
/// Returns the room's `channel_id` column alongside a `proto::Room` made up
/// of:
///
/// * `participants`: rows that have both an answering connection id and an
///   answering server id, each with the projects hosted on that connection,
/// * `pending_participants`: all other participant rows,
/// * `followers`: every follower row related to the room.
///
/// # Errors
///
/// Fails with "could not find room" when the room does not exist, and
/// propagates database errors and errors from `host_connection()`.
async fn get_channel_room(
    &self,
    room_id: RoomId,
    tx: &DatabaseTransaction,
) -> Result<(Option<ChannelId>, proto::Room)> {
    let db_room = room::Entity::find_by_id(room_id)
        .one(tx)
        .await?
        .ok_or_else(|| anyhow!("could not find room"))?;

    let mut db_participants = db_room
        .find_related(room_participant::Entity)
        .stream(tx)
        .await?;
    // Active participants are keyed by their answering connection so that
    // projects can be attached to their host further down.
    let mut participants = HashMap::default();
    let mut pending_participants = Vec::new();
    while let Some(db_participant) = db_participants.next().await {
        let db_participant = db_participant?;
        // A participant is active only when both halves of the answering
        // connection are present; otherwise it is listed as pending.
        if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
            .answering_connection_id
            .zip(db_participant.answering_connection_server_id)
        {
            // Kind 0 with a project id is a shared project and kind 1 is an
            // unshared project; every other combination maps to `External`.
            let location = match (
                db_participant.location_kind,
                db_participant.location_project_id,
            ) {
                (Some(0), Some(project_id)) => {
                    Some(proto::participant_location::Variant::SharedProject(
                        proto::participant_location::SharedProject {
                            id: project_id.to_proto(),
                        },
                    ))
                }
                (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
                    Default::default(),
                )),
                _ => Some(proto::participant_location::Variant::External(
                    Default::default(),
                )),
            };

            let answering_connection = ConnectionId {
                owner_id: answering_connection_server_id.0 as u32,
                id: answering_connection_id as u32,
            };
            participants.insert(
                answering_connection,
                proto::Participant {
                    user_id: db_participant.user_id.to_proto(),
                    peer_id: Some(answering_connection.into()),
                    projects: Default::default(),
                    location: Some(proto::ParticipantLocation { variant: location }),
                },
            );
        } else {
            pending_participants.push(proto::PendingParticipant {
                user_id: db_participant.user_id.to_proto(),
                calling_user_id: db_participant.calling_user_id.to_proto(),
                initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
            });
        }
    }
    // Presumably ends the stream's borrow of `tx` before the next query is
    // issued; confirm against the stream type.
    drop(db_participants);

    let mut db_projects = db_room
        .find_related(project::Entity)
        .find_with_related(worktree::Entity)
        .stream(tx)
        .await?;

    // Each row pairs a project with at most one of its worktrees, so the same
    // project can appear in several rows.
    while let Some(row) = db_projects.next().await {
        let (db_project, db_worktree) = row?;
        let host_connection = db_project.host_connection()?;
        // Projects whose host connection is not an active participant are
        // left out of the result.
        if let Some(participant) = participants.get_mut(&host_connection) {
            // Reuse the entry created by an earlier row of the same project,
            // or create it on first sight.
            let project = if let Some(project) = participant
                .projects
                .iter_mut()
                .find(|project| project.id == db_project.id.to_proto())
            {
                project
            } else {
                participant.projects.push(proto::ParticipantProject {
                    id: db_project.id.to_proto(),
                    worktree_root_names: Default::default(),
                });
                participant.projects.last_mut().unwrap()
            };

            // Only visible worktrees contribute a root name.
            if let Some(db_worktree) = db_worktree {
                if db_worktree.visible {
                    project.worktree_root_names.push(db_worktree.root_name);
                }
            }
        }
    }
    drop(db_projects);

    let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
    let mut followers = Vec::new();
    while let Some(db_follower) = db_followers.next().await {
        let db_follower = db_follower?;
        followers.push(proto::Follower {
            leader_id: Some(db_follower.leader_connection().into()),
            follower_id: Some(db_follower.follower_connection().into()),
            project_id: db_follower.project_id.to_proto(),
        });
    }

    Ok((
        db_room.channel_id,
        proto::Room {
            id: db_room.id.to_proto(),
            live_kit_room: db_room.live_kit_room,
            participants: participants.into_values().collect(),
            pending_participants,
            followers,
        },
    ))
}
2202
2203 async fn get_channel_members_for_room(
2204 &self,
2205 room_id: RoomId,
2206 tx: &DatabaseTransaction,
2207 ) -> Result<Vec<UserId>> {
2208 let db_room = room::Model {
2209 id: room_id,
2210 ..Default::default()
2211 };
2212
2213 let channel_users =
2214 if let Some(channel) = db_room.find_related(channel::Entity).one(tx).await? {
2215 self.get_channel_members_internal(channel.id, tx).await?
2216 } else {
2217 Vec::new()
2218 };
2219
2220 Ok(channel_users)
2221 }
2222
2223 // projects
2224
2225 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2226 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2227 enum QueryAs {
2228 Count,
2229 }
2230
2231 self.transaction(|tx| async move {
2232 Ok(project::Entity::find()
2233 .select_only()
2234 .column_as(project::Column::Id.count(), QueryAs::Count)
2235 .inner_join(user::Entity)
2236 .filter(user::Column::Admin.eq(false))
2237 .into_values::<_, QueryAs>()
2238 .one(&*tx)
2239 .await?
2240 .unwrap_or(0i64) as usize)
2241 })
2242 .await
2243 }
2244
2245 pub async fn share_project(
2246 &self,
2247 room_id: RoomId,
2248 connection: ConnectionId,
2249 worktrees: &[proto::WorktreeMetadata],
2250 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2251 self.room_transaction(room_id, |tx| async move {
2252 let participant = room_participant::Entity::find()
2253 .filter(
2254 Condition::all()
2255 .add(
2256 room_participant::Column::AnsweringConnectionId
2257 .eq(connection.id as i32),
2258 )
2259 .add(
2260 room_participant::Column::AnsweringConnectionServerId
2261 .eq(connection.owner_id as i32),
2262 ),
2263 )
2264 .one(&*tx)
2265 .await?
2266 .ok_or_else(|| anyhow!("could not find participant"))?;
2267 if participant.room_id != room_id {
2268 return Err(anyhow!("shared project on unexpected room"))?;
2269 }
2270
2271 let project = project::ActiveModel {
2272 room_id: ActiveValue::set(participant.room_id),
2273 host_user_id: ActiveValue::set(participant.user_id),
2274 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2275 host_connection_server_id: ActiveValue::set(Some(ServerId(
2276 connection.owner_id as i32,
2277 ))),
2278 ..Default::default()
2279 }
2280 .insert(&*tx)
2281 .await?;
2282
2283 if !worktrees.is_empty() {
2284 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2285 worktree::ActiveModel {
2286 id: ActiveValue::set(worktree.id as i64),
2287 project_id: ActiveValue::set(project.id),
2288 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2289 root_name: ActiveValue::set(worktree.root_name.clone()),
2290 visible: ActiveValue::set(worktree.visible),
2291 scan_id: ActiveValue::set(0),
2292 completed_scan_id: ActiveValue::set(0),
2293 }
2294 }))
2295 .exec(&*tx)
2296 .await?;
2297 }
2298
2299 project_collaborator::ActiveModel {
2300 project_id: ActiveValue::set(project.id),
2301 connection_id: ActiveValue::set(connection.id as i32),
2302 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2303 user_id: ActiveValue::set(participant.user_id),
2304 replica_id: ActiveValue::set(ReplicaId(0)),
2305 is_host: ActiveValue::set(true),
2306 ..Default::default()
2307 }
2308 .insert(&*tx)
2309 .await?;
2310
2311 let room = self.get_room(room_id, &tx).await?;
2312 Ok((project.id, room))
2313 })
2314 .await
2315 }
2316
2317 pub async fn unshare_project(
2318 &self,
2319 project_id: ProjectId,
2320 connection: ConnectionId,
2321 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2322 let room_id = self.room_id_for_project(project_id).await?;
2323 self.room_transaction(room_id, |tx| async move {
2324 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2325
2326 let project = project::Entity::find_by_id(project_id)
2327 .one(&*tx)
2328 .await?
2329 .ok_or_else(|| anyhow!("project not found"))?;
2330 if project.host_connection()? == connection {
2331 project::Entity::delete(project.into_active_model())
2332 .exec(&*tx)
2333 .await?;
2334 let room = self.get_room(room_id, &tx).await?;
2335 Ok((room, guest_connection_ids))
2336 } else {
2337 Err(anyhow!("cannot unshare a project hosted by another user"))?
2338 }
2339 })
2340 .await
2341 }
2342
2343 pub async fn update_project(
2344 &self,
2345 project_id: ProjectId,
2346 connection: ConnectionId,
2347 worktrees: &[proto::WorktreeMetadata],
2348 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2349 let room_id = self.room_id_for_project(project_id).await?;
2350 self.room_transaction(room_id, |tx| async move {
2351 let project = project::Entity::find_by_id(project_id)
2352 .filter(
2353 Condition::all()
2354 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2355 .add(
2356 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2357 ),
2358 )
2359 .one(&*tx)
2360 .await?
2361 .ok_or_else(|| anyhow!("no such project"))?;
2362
2363 self.update_project_worktrees(project.id, worktrees, &tx)
2364 .await?;
2365
2366 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2367 let room = self.get_room(project.room_id, &tx).await?;
2368 Ok((room, guest_connection_ids))
2369 })
2370 .await
2371 }
2372
2373 async fn update_project_worktrees(
2374 &self,
2375 project_id: ProjectId,
2376 worktrees: &[proto::WorktreeMetadata],
2377 tx: &DatabaseTransaction,
2378 ) -> Result<()> {
2379 if !worktrees.is_empty() {
2380 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2381 id: ActiveValue::set(worktree.id as i64),
2382 project_id: ActiveValue::set(project_id),
2383 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2384 root_name: ActiveValue::set(worktree.root_name.clone()),
2385 visible: ActiveValue::set(worktree.visible),
2386 scan_id: ActiveValue::set(0),
2387 completed_scan_id: ActiveValue::set(0),
2388 }))
2389 .on_conflict(
2390 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2391 .update_column(worktree::Column::RootName)
2392 .to_owned(),
2393 )
2394 .exec(&*tx)
2395 .await?;
2396 }
2397
2398 worktree::Entity::delete_many()
2399 .filter(worktree::Column::ProjectId.eq(project_id).and(
2400 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2401 ))
2402 .exec(&*tx)
2403 .await?;
2404
2405 Ok(())
2406 }
2407
2408 pub async fn update_worktree(
2409 &self,
2410 update: &proto::UpdateWorktree,
2411 connection: ConnectionId,
2412 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2413 let project_id = ProjectId::from_proto(update.project_id);
2414 let worktree_id = update.worktree_id as i64;
2415 let room_id = self.room_id_for_project(project_id).await?;
2416 self.room_transaction(room_id, |tx| async move {
2417 // Ensure the update comes from the host.
2418 let _project = project::Entity::find_by_id(project_id)
2419 .filter(
2420 Condition::all()
2421 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2422 .add(
2423 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2424 ),
2425 )
2426 .one(&*tx)
2427 .await?
2428 .ok_or_else(|| anyhow!("no such project"))?;
2429
2430 // Update metadata.
2431 worktree::Entity::update(worktree::ActiveModel {
2432 id: ActiveValue::set(worktree_id),
2433 project_id: ActiveValue::set(project_id),
2434 root_name: ActiveValue::set(update.root_name.clone()),
2435 scan_id: ActiveValue::set(update.scan_id as i64),
2436 completed_scan_id: if update.is_last_update {
2437 ActiveValue::set(update.scan_id as i64)
2438 } else {
2439 ActiveValue::default()
2440 },
2441 abs_path: ActiveValue::set(update.abs_path.clone()),
2442 ..Default::default()
2443 })
2444 .exec(&*tx)
2445 .await?;
2446
2447 if !update.updated_entries.is_empty() {
2448 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2449 let mtime = entry.mtime.clone().unwrap_or_default();
2450 worktree_entry::ActiveModel {
2451 project_id: ActiveValue::set(project_id),
2452 worktree_id: ActiveValue::set(worktree_id),
2453 id: ActiveValue::set(entry.id as i64),
2454 is_dir: ActiveValue::set(entry.is_dir),
2455 path: ActiveValue::set(entry.path.clone()),
2456 inode: ActiveValue::set(entry.inode as i64),
2457 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2458 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2459 is_symlink: ActiveValue::set(entry.is_symlink),
2460 is_ignored: ActiveValue::set(entry.is_ignored),
2461 is_external: ActiveValue::set(entry.is_external),
2462 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2463 is_deleted: ActiveValue::set(false),
2464 scan_id: ActiveValue::set(update.scan_id as i64),
2465 }
2466 }))
2467 .on_conflict(
2468 OnConflict::columns([
2469 worktree_entry::Column::ProjectId,
2470 worktree_entry::Column::WorktreeId,
2471 worktree_entry::Column::Id,
2472 ])
2473 .update_columns([
2474 worktree_entry::Column::IsDir,
2475 worktree_entry::Column::Path,
2476 worktree_entry::Column::Inode,
2477 worktree_entry::Column::MtimeSeconds,
2478 worktree_entry::Column::MtimeNanos,
2479 worktree_entry::Column::IsSymlink,
2480 worktree_entry::Column::IsIgnored,
2481 worktree_entry::Column::GitStatus,
2482 worktree_entry::Column::ScanId,
2483 ])
2484 .to_owned(),
2485 )
2486 .exec(&*tx)
2487 .await?;
2488 }
2489
2490 if !update.removed_entries.is_empty() {
2491 worktree_entry::Entity::update_many()
2492 .filter(
2493 worktree_entry::Column::ProjectId
2494 .eq(project_id)
2495 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2496 .and(
2497 worktree_entry::Column::Id
2498 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2499 ),
2500 )
2501 .set(worktree_entry::ActiveModel {
2502 is_deleted: ActiveValue::Set(true),
2503 scan_id: ActiveValue::Set(update.scan_id as i64),
2504 ..Default::default()
2505 })
2506 .exec(&*tx)
2507 .await?;
2508 }
2509
2510 if !update.updated_repositories.is_empty() {
2511 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2512 |repository| worktree_repository::ActiveModel {
2513 project_id: ActiveValue::set(project_id),
2514 worktree_id: ActiveValue::set(worktree_id),
2515 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2516 scan_id: ActiveValue::set(update.scan_id as i64),
2517 branch: ActiveValue::set(repository.branch.clone()),
2518 is_deleted: ActiveValue::set(false),
2519 },
2520 ))
2521 .on_conflict(
2522 OnConflict::columns([
2523 worktree_repository::Column::ProjectId,
2524 worktree_repository::Column::WorktreeId,
2525 worktree_repository::Column::WorkDirectoryId,
2526 ])
2527 .update_columns([
2528 worktree_repository::Column::ScanId,
2529 worktree_repository::Column::Branch,
2530 ])
2531 .to_owned(),
2532 )
2533 .exec(&*tx)
2534 .await?;
2535 }
2536
2537 if !update.removed_repositories.is_empty() {
2538 worktree_repository::Entity::update_many()
2539 .filter(
2540 worktree_repository::Column::ProjectId
2541 .eq(project_id)
2542 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2543 .and(
2544 worktree_repository::Column::WorkDirectoryId
2545 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2546 ),
2547 )
2548 .set(worktree_repository::ActiveModel {
2549 is_deleted: ActiveValue::Set(true),
2550 scan_id: ActiveValue::Set(update.scan_id as i64),
2551 ..Default::default()
2552 })
2553 .exec(&*tx)
2554 .await?;
2555 }
2556
2557 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2558 Ok(connection_ids)
2559 })
2560 .await
2561 }
2562
2563 pub async fn update_diagnostic_summary(
2564 &self,
2565 update: &proto::UpdateDiagnosticSummary,
2566 connection: ConnectionId,
2567 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2568 let project_id = ProjectId::from_proto(update.project_id);
2569 let worktree_id = update.worktree_id as i64;
2570 let room_id = self.room_id_for_project(project_id).await?;
2571 self.room_transaction(room_id, |tx| async move {
2572 let summary = update
2573 .summary
2574 .as_ref()
2575 .ok_or_else(|| anyhow!("invalid summary"))?;
2576
2577 // Ensure the update comes from the host.
2578 let project = project::Entity::find_by_id(project_id)
2579 .one(&*tx)
2580 .await?
2581 .ok_or_else(|| anyhow!("no such project"))?;
2582 if project.host_connection()? != connection {
2583 return Err(anyhow!("can't update a project hosted by someone else"))?;
2584 }
2585
2586 // Update summary.
2587 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2588 project_id: ActiveValue::set(project_id),
2589 worktree_id: ActiveValue::set(worktree_id),
2590 path: ActiveValue::set(summary.path.clone()),
2591 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2592 error_count: ActiveValue::set(summary.error_count as i32),
2593 warning_count: ActiveValue::set(summary.warning_count as i32),
2594 ..Default::default()
2595 })
2596 .on_conflict(
2597 OnConflict::columns([
2598 worktree_diagnostic_summary::Column::ProjectId,
2599 worktree_diagnostic_summary::Column::WorktreeId,
2600 worktree_diagnostic_summary::Column::Path,
2601 ])
2602 .update_columns([
2603 worktree_diagnostic_summary::Column::LanguageServerId,
2604 worktree_diagnostic_summary::Column::ErrorCount,
2605 worktree_diagnostic_summary::Column::WarningCount,
2606 ])
2607 .to_owned(),
2608 )
2609 .exec(&*tx)
2610 .await?;
2611
2612 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2613 Ok(connection_ids)
2614 })
2615 .await
2616 }
2617
2618 pub async fn start_language_server(
2619 &self,
2620 update: &proto::StartLanguageServer,
2621 connection: ConnectionId,
2622 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2623 let project_id = ProjectId::from_proto(update.project_id);
2624 let room_id = self.room_id_for_project(project_id).await?;
2625 self.room_transaction(room_id, |tx| async move {
2626 let server = update
2627 .server
2628 .as_ref()
2629 .ok_or_else(|| anyhow!("invalid language server"))?;
2630
2631 // Ensure the update comes from the host.
2632 let project = project::Entity::find_by_id(project_id)
2633 .one(&*tx)
2634 .await?
2635 .ok_or_else(|| anyhow!("no such project"))?;
2636 if project.host_connection()? != connection {
2637 return Err(anyhow!("can't update a project hosted by someone else"))?;
2638 }
2639
2640 // Add the newly-started language server.
2641 language_server::Entity::insert(language_server::ActiveModel {
2642 project_id: ActiveValue::set(project_id),
2643 id: ActiveValue::set(server.id as i64),
2644 name: ActiveValue::set(server.name.clone()),
2645 ..Default::default()
2646 })
2647 .on_conflict(
2648 OnConflict::columns([
2649 language_server::Column::ProjectId,
2650 language_server::Column::Id,
2651 ])
2652 .update_column(language_server::Column::Name)
2653 .to_owned(),
2654 )
2655 .exec(&*tx)
2656 .await?;
2657
2658 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2659 Ok(connection_ids)
2660 })
2661 .await
2662 }
2663
2664 pub async fn update_worktree_settings(
2665 &self,
2666 update: &proto::UpdateWorktreeSettings,
2667 connection: ConnectionId,
2668 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2669 let project_id = ProjectId::from_proto(update.project_id);
2670 let room_id = self.room_id_for_project(project_id).await?;
2671 self.room_transaction(room_id, |tx| async move {
2672 // Ensure the update comes from the host.
2673 let project = project::Entity::find_by_id(project_id)
2674 .one(&*tx)
2675 .await?
2676 .ok_or_else(|| anyhow!("no such project"))?;
2677 if project.host_connection()? != connection {
2678 return Err(anyhow!("can't update a project hosted by someone else"))?;
2679 }
2680
2681 if let Some(content) = &update.content {
2682 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2683 project_id: ActiveValue::Set(project_id),
2684 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2685 path: ActiveValue::Set(update.path.clone()),
2686 content: ActiveValue::Set(content.clone()),
2687 })
2688 .on_conflict(
2689 OnConflict::columns([
2690 worktree_settings_file::Column::ProjectId,
2691 worktree_settings_file::Column::WorktreeId,
2692 worktree_settings_file::Column::Path,
2693 ])
2694 .update_column(worktree_settings_file::Column::Content)
2695 .to_owned(),
2696 )
2697 .exec(&*tx)
2698 .await?;
2699 } else {
2700 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2701 project_id: ActiveValue::Set(project_id),
2702 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2703 path: ActiveValue::Set(update.path.clone()),
2704 ..Default::default()
2705 })
2706 .exec(&*tx)
2707 .await?;
2708 }
2709
2710 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2711 Ok(connection_ids)
2712 })
2713 .await
2714 }
2715
2716 pub async fn join_project(
2717 &self,
2718 project_id: ProjectId,
2719 connection: ConnectionId,
2720 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2721 let room_id = self.room_id_for_project(project_id).await?;
2722 self.room_transaction(room_id, |tx| async move {
2723 let participant = room_participant::Entity::find()
2724 .filter(
2725 Condition::all()
2726 .add(
2727 room_participant::Column::AnsweringConnectionId
2728 .eq(connection.id as i32),
2729 )
2730 .add(
2731 room_participant::Column::AnsweringConnectionServerId
2732 .eq(connection.owner_id as i32),
2733 ),
2734 )
2735 .one(&*tx)
2736 .await?
2737 .ok_or_else(|| anyhow!("must join a room first"))?;
2738
2739 let project = project::Entity::find_by_id(project_id)
2740 .one(&*tx)
2741 .await?
2742 .ok_or_else(|| anyhow!("no such project"))?;
2743 if project.room_id != participant.room_id {
2744 return Err(anyhow!("no such project"))?;
2745 }
2746
2747 let mut collaborators = project
2748 .find_related(project_collaborator::Entity)
2749 .all(&*tx)
2750 .await?;
2751 let replica_ids = collaborators
2752 .iter()
2753 .map(|c| c.replica_id)
2754 .collect::<HashSet<_>>();
2755 let mut replica_id = ReplicaId(1);
2756 while replica_ids.contains(&replica_id) {
2757 replica_id.0 += 1;
2758 }
2759 let new_collaborator = project_collaborator::ActiveModel {
2760 project_id: ActiveValue::set(project_id),
2761 connection_id: ActiveValue::set(connection.id as i32),
2762 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2763 user_id: ActiveValue::set(participant.user_id),
2764 replica_id: ActiveValue::set(replica_id),
2765 is_host: ActiveValue::set(false),
2766 ..Default::default()
2767 }
2768 .insert(&*tx)
2769 .await?;
2770 collaborators.push(new_collaborator);
2771
2772 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2773 let mut worktrees = db_worktrees
2774 .into_iter()
2775 .map(|db_worktree| {
2776 (
2777 db_worktree.id as u64,
2778 Worktree {
2779 id: db_worktree.id as u64,
2780 abs_path: db_worktree.abs_path,
2781 root_name: db_worktree.root_name,
2782 visible: db_worktree.visible,
2783 entries: Default::default(),
2784 repository_entries: Default::default(),
2785 diagnostic_summaries: Default::default(),
2786 settings_files: Default::default(),
2787 scan_id: db_worktree.scan_id as u64,
2788 completed_scan_id: db_worktree.completed_scan_id as u64,
2789 },
2790 )
2791 })
2792 .collect::<BTreeMap<_, _>>();
2793
2794 // Populate worktree entries.
2795 {
2796 let mut db_entries = worktree_entry::Entity::find()
2797 .filter(
2798 Condition::all()
2799 .add(worktree_entry::Column::ProjectId.eq(project_id))
2800 .add(worktree_entry::Column::IsDeleted.eq(false)),
2801 )
2802 .stream(&*tx)
2803 .await?;
2804 while let Some(db_entry) = db_entries.next().await {
2805 let db_entry = db_entry?;
2806 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2807 worktree.entries.push(proto::Entry {
2808 id: db_entry.id as u64,
2809 is_dir: db_entry.is_dir,
2810 path: db_entry.path,
2811 inode: db_entry.inode as u64,
2812 mtime: Some(proto::Timestamp {
2813 seconds: db_entry.mtime_seconds as u64,
2814 nanos: db_entry.mtime_nanos as u32,
2815 }),
2816 is_symlink: db_entry.is_symlink,
2817 is_ignored: db_entry.is_ignored,
2818 is_external: db_entry.is_external,
2819 git_status: db_entry.git_status.map(|status| status as i32),
2820 });
2821 }
2822 }
2823 }
2824
2825 // Populate repository entries.
2826 {
2827 let mut db_repository_entries = worktree_repository::Entity::find()
2828 .filter(
2829 Condition::all()
2830 .add(worktree_repository::Column::ProjectId.eq(project_id))
2831 .add(worktree_repository::Column::IsDeleted.eq(false)),
2832 )
2833 .stream(&*tx)
2834 .await?;
2835 while let Some(db_repository_entry) = db_repository_entries.next().await {
2836 let db_repository_entry = db_repository_entry?;
2837 if let Some(worktree) =
2838 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2839 {
2840 worktree.repository_entries.insert(
2841 db_repository_entry.work_directory_id as u64,
2842 proto::RepositoryEntry {
2843 work_directory_id: db_repository_entry.work_directory_id as u64,
2844 branch: db_repository_entry.branch,
2845 },
2846 );
2847 }
2848 }
2849 }
2850
2851 // Populate worktree diagnostic summaries.
2852 {
2853 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2854 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2855 .stream(&*tx)
2856 .await?;
2857 while let Some(db_summary) = db_summaries.next().await {
2858 let db_summary = db_summary?;
2859 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2860 worktree
2861 .diagnostic_summaries
2862 .push(proto::DiagnosticSummary {
2863 path: db_summary.path,
2864 language_server_id: db_summary.language_server_id as u64,
2865 error_count: db_summary.error_count as u32,
2866 warning_count: db_summary.warning_count as u32,
2867 });
2868 }
2869 }
2870 }
2871
2872 // Populate worktree settings files
2873 {
2874 let mut db_settings_files = worktree_settings_file::Entity::find()
2875 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2876 .stream(&*tx)
2877 .await?;
2878 while let Some(db_settings_file) = db_settings_files.next().await {
2879 let db_settings_file = db_settings_file?;
2880 if let Some(worktree) =
2881 worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2882 {
2883 worktree.settings_files.push(WorktreeSettingsFile {
2884 path: db_settings_file.path,
2885 content: db_settings_file.content,
2886 });
2887 }
2888 }
2889 }
2890
2891 // Populate language servers.
2892 let language_servers = project
2893 .find_related(language_server::Entity)
2894 .all(&*tx)
2895 .await?;
2896
2897 let project = Project {
2898 collaborators: collaborators
2899 .into_iter()
2900 .map(|collaborator| ProjectCollaborator {
2901 connection_id: collaborator.connection(),
2902 user_id: collaborator.user_id,
2903 replica_id: collaborator.replica_id,
2904 is_host: collaborator.is_host,
2905 })
2906 .collect(),
2907 worktrees,
2908 language_servers: language_servers
2909 .into_iter()
2910 .map(|language_server| proto::LanguageServer {
2911 id: language_server.id as u64,
2912 name: language_server.name,
2913 })
2914 .collect(),
2915 };
2916 Ok((project, replica_id as ReplicaId))
2917 })
2918 .await
2919 }
2920
2921 pub async fn leave_project(
2922 &self,
2923 project_id: ProjectId,
2924 connection: ConnectionId,
2925 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2926 let room_id = self.room_id_for_project(project_id).await?;
2927 self.room_transaction(room_id, |tx| async move {
2928 let result = project_collaborator::Entity::delete_many()
2929 .filter(
2930 Condition::all()
2931 .add(project_collaborator::Column::ProjectId.eq(project_id))
2932 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2933 .add(
2934 project_collaborator::Column::ConnectionServerId
2935 .eq(connection.owner_id as i32),
2936 ),
2937 )
2938 .exec(&*tx)
2939 .await?;
2940 if result.rows_affected == 0 {
2941 Err(anyhow!("not a collaborator on this project"))?;
2942 }
2943
2944 let project = project::Entity::find_by_id(project_id)
2945 .one(&*tx)
2946 .await?
2947 .ok_or_else(|| anyhow!("no such project"))?;
2948 let collaborators = project
2949 .find_related(project_collaborator::Entity)
2950 .all(&*tx)
2951 .await?;
2952 let connection_ids = collaborators
2953 .into_iter()
2954 .map(|collaborator| collaborator.connection())
2955 .collect();
2956
2957 follower::Entity::delete_many()
2958 .filter(
2959 Condition::any()
2960 .add(
2961 Condition::all()
2962 .add(follower::Column::ProjectId.eq(project_id))
2963 .add(
2964 follower::Column::LeaderConnectionServerId
2965 .eq(connection.owner_id),
2966 )
2967 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2968 )
2969 .add(
2970 Condition::all()
2971 .add(follower::Column::ProjectId.eq(project_id))
2972 .add(
2973 follower::Column::FollowerConnectionServerId
2974 .eq(connection.owner_id),
2975 )
2976 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2977 ),
2978 )
2979 .exec(&*tx)
2980 .await?;
2981
2982 let room = self.get_room(project.room_id, &tx).await?;
2983 let left_project = LeftProject {
2984 id: project_id,
2985 host_user_id: project.host_user_id,
2986 host_connection_id: project.host_connection()?,
2987 connection_ids,
2988 };
2989 Ok((room, left_project))
2990 })
2991 .await
2992 }
2993
2994 pub async fn project_collaborators(
2995 &self,
2996 project_id: ProjectId,
2997 connection_id: ConnectionId,
2998 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2999 let room_id = self.room_id_for_project(project_id).await?;
3000 self.room_transaction(room_id, |tx| async move {
3001 let collaborators = project_collaborator::Entity::find()
3002 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3003 .all(&*tx)
3004 .await?
3005 .into_iter()
3006 .map(|collaborator| ProjectCollaborator {
3007 connection_id: collaborator.connection(),
3008 user_id: collaborator.user_id,
3009 replica_id: collaborator.replica_id,
3010 is_host: collaborator.is_host,
3011 })
3012 .collect::<Vec<_>>();
3013
3014 if collaborators
3015 .iter()
3016 .any(|collaborator| collaborator.connection_id == connection_id)
3017 {
3018 Ok(collaborators)
3019 } else {
3020 Err(anyhow!("no such project"))?
3021 }
3022 })
3023 .await
3024 }
3025
3026 pub async fn project_connection_ids(
3027 &self,
3028 project_id: ProjectId,
3029 connection_id: ConnectionId,
3030 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3031 let room_id = self.room_id_for_project(project_id).await?;
3032 self.room_transaction(room_id, |tx| async move {
3033 let mut collaborators = project_collaborator::Entity::find()
3034 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3035 .stream(&*tx)
3036 .await?;
3037
3038 let mut connection_ids = HashSet::default();
3039 while let Some(collaborator) = collaborators.next().await {
3040 let collaborator = collaborator?;
3041 connection_ids.insert(collaborator.connection());
3042 }
3043
3044 if connection_ids.contains(&connection_id) {
3045 Ok(connection_ids)
3046 } else {
3047 Err(anyhow!("no such project"))?
3048 }
3049 })
3050 .await
3051 }
3052
3053 async fn project_guest_connection_ids(
3054 &self,
3055 project_id: ProjectId,
3056 tx: &DatabaseTransaction,
3057 ) -> Result<Vec<ConnectionId>> {
3058 let mut collaborators = project_collaborator::Entity::find()
3059 .filter(
3060 project_collaborator::Column::ProjectId
3061 .eq(project_id)
3062 .and(project_collaborator::Column::IsHost.eq(false)),
3063 )
3064 .stream(tx)
3065 .await?;
3066
3067 let mut guest_connection_ids = Vec::new();
3068 while let Some(collaborator) = collaborators.next().await {
3069 let collaborator = collaborator?;
3070 guest_connection_ids.push(collaborator.connection());
3071 }
3072 Ok(guest_connection_ids)
3073 }
3074
3075 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3076 self.transaction(|tx| async move {
3077 let project = project::Entity::find_by_id(project_id)
3078 .one(&*tx)
3079 .await?
3080 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3081 Ok(project.room_id)
3082 })
3083 .await
3084 }
3085
3086 // access tokens
3087
3088 pub async fn create_access_token(
3089 &self,
3090 user_id: UserId,
3091 access_token_hash: &str,
3092 max_access_token_count: usize,
3093 ) -> Result<AccessTokenId> {
3094 self.transaction(|tx| async {
3095 let tx = tx;
3096
3097 let token = access_token::ActiveModel {
3098 user_id: ActiveValue::set(user_id),
3099 hash: ActiveValue::set(access_token_hash.into()),
3100 ..Default::default()
3101 }
3102 .insert(&*tx)
3103 .await?;
3104
3105 access_token::Entity::delete_many()
3106 .filter(
3107 access_token::Column::Id.in_subquery(
3108 Query::select()
3109 .column(access_token::Column::Id)
3110 .from(access_token::Entity)
3111 .and_where(access_token::Column::UserId.eq(user_id))
3112 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3113 .limit(10000)
3114 .offset(max_access_token_count as u64)
3115 .to_owned(),
3116 ),
3117 )
3118 .exec(&*tx)
3119 .await?;
3120 Ok(token.id)
3121 })
3122 .await
3123 }
3124
3125 pub async fn get_access_token(
3126 &self,
3127 access_token_id: AccessTokenId,
3128 ) -> Result<access_token::Model> {
3129 self.transaction(|tx| async move {
3130 Ok(access_token::Entity::find_by_id(access_token_id)
3131 .one(&*tx)
3132 .await?
3133 .ok_or_else(|| anyhow!("no such access token"))?)
3134 })
3135 .await
3136 }
3137
3138 // channels
3139
3140 pub async fn create_root_channel(
3141 &self,
3142 name: &str,
3143 live_kit_room: &str,
3144 creator_id: UserId,
3145 ) -> Result<ChannelId> {
3146 self.create_channel(name, None, live_kit_room, creator_id)
3147 .await
3148 }
3149
3150 pub async fn create_channel(
3151 &self,
3152 name: &str,
3153 parent: Option<ChannelId>,
3154 live_kit_room: &str,
3155 creator_id: UserId,
3156 ) -> Result<ChannelId> {
3157 self.transaction(move |tx| async move {
3158 let tx = tx;
3159
3160 if let Some(parent) = parent {
3161 let channels = self.get_channel_ancestors(parent, &*tx).await?;
3162 channel_member::Entity::find()
3163 .filter(channel_member::Column::ChannelId.is_in(channels.iter().copied()))
3164 .filter(
3165 channel_member::Column::UserId
3166 .eq(creator_id)
3167 .and(channel_member::Column::Accepted.eq(true)),
3168 )
3169 .one(&*tx)
3170 .await?
3171 .ok_or_else(|| {
3172 anyhow!("User does not have the permissions to create this channel")
3173 })?;
3174 }
3175
3176 let channel = channel::ActiveModel {
3177 name: ActiveValue::Set(name.to_string()),
3178 ..Default::default()
3179 };
3180
3181 let channel = channel.insert(&*tx).await?;
3182
3183 if let Some(parent) = parent {
3184 channel_parent::ActiveModel {
3185 child_id: ActiveValue::Set(channel.id),
3186 parent_id: ActiveValue::Set(parent),
3187 }
3188 .insert(&*tx)
3189 .await?;
3190 }
3191
3192 channel_member::ActiveModel {
3193 channel_id: ActiveValue::Set(channel.id),
3194 user_id: ActiveValue::Set(creator_id),
3195 accepted: ActiveValue::Set(true),
3196 admin: ActiveValue::Set(true),
3197 ..Default::default()
3198 }
3199 .insert(&*tx)
3200 .await?;
3201
3202 room::ActiveModel {
3203 channel_id: ActiveValue::Set(Some(channel.id)),
3204 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3205 ..Default::default()
3206 }
3207 .insert(&*tx)
3208 .await?;
3209
3210 Ok(channel.id)
3211 })
3212 .await
3213 }
3214
3215 pub async fn remove_channel(
3216 &self,
3217 channel_id: ChannelId,
3218 user_id: UserId,
3219 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3220 self.transaction(move |tx| async move {
3221 let tx = tx;
3222
3223 // Check if user is an admin
3224 channel_member::Entity::find()
3225 .filter(
3226 channel_member::Column::ChannelId
3227 .eq(channel_id)
3228 .and(channel_member::Column::UserId.eq(user_id))
3229 .and(channel_member::Column::Admin.eq(true)),
3230 )
3231 .one(&*tx)
3232 .await?
3233 .ok_or_else(|| anyhow!("user is not allowed to remove this channel"))?;
3234
3235 let mut descendants = self.get_channel_descendants([channel_id], &*tx).await?;
3236
3237 // Keep channels which have another active
3238 let mut channels_to_keep = channel_parent::Entity::find()
3239 .filter(
3240 channel_parent::Column::ChildId
3241 .is_in(descendants.keys().copied().filter(|&id| id != channel_id))
3242 .and(
3243 channel_parent::Column::ParentId.is_not_in(descendants.keys().copied()),
3244 ),
3245 )
3246 .stream(&*tx)
3247 .await?;
3248
3249 while let Some(row) = channels_to_keep.next().await {
3250 let row = row?;
3251 descendants.remove(&row.child_id);
3252 }
3253
3254 drop(channels_to_keep);
3255
3256 let channels_to_remove = descendants.keys().copied().collect::<Vec<_>>();
3257
3258 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3259 .filter(channel_member::Column::ChannelId.is_in(channels_to_remove.iter().copied()))
3260 .select_only()
3261 .column(channel_member::Column::UserId)
3262 .distinct()
3263 .into_values::<_, QueryUserIds>()
3264 .all(&*tx)
3265 .await?;
3266
3267 // Channel members and parents should delete via cascade
3268 channel::Entity::delete_many()
3269 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
3270 .exec(&*tx)
3271 .await?;
3272
3273 Ok((channels_to_remove, members_to_notify))
3274 })
3275 .await
3276 }
3277
3278 pub async fn invite_channel_member(
3279 &self,
3280 channel_id: ChannelId,
3281 invitee_id: UserId,
3282 inviter_id: UserId,
3283 is_admin: bool,
3284 ) -> Result<()> {
3285 self.transaction(move |tx| async move {
3286 let tx = tx;
3287
3288 // Check if inviter is a member
3289 channel_member::Entity::find()
3290 .filter(
3291 channel_member::Column::ChannelId
3292 .eq(channel_id)
3293 .and(channel_member::Column::UserId.eq(inviter_id))
3294 .and(channel_member::Column::Admin.eq(true)),
3295 )
3296 .one(&*tx)
3297 .await?
3298 .ok_or_else(|| {
3299 anyhow!("Inviter does not have permissions to invite the invitee")
3300 })?;
3301
3302 let channel_membership = channel_member::ActiveModel {
3303 channel_id: ActiveValue::Set(channel_id),
3304 user_id: ActiveValue::Set(invitee_id),
3305 accepted: ActiveValue::Set(false),
3306 admin: ActiveValue::Set(is_admin),
3307 ..Default::default()
3308 };
3309
3310 channel_membership.insert(&*tx).await?;
3311
3312 Ok(())
3313 })
3314 .await
3315 }
3316
3317 pub async fn respond_to_channel_invite(
3318 &self,
3319 channel_id: ChannelId,
3320 user_id: UserId,
3321 accept: bool,
3322 ) -> Result<()> {
3323 self.transaction(move |tx| async move {
3324 let tx = tx;
3325
3326 let rows_affected = if accept {
3327 channel_member::Entity::update_many()
3328 .set(channel_member::ActiveModel {
3329 accepted: ActiveValue::Set(accept),
3330 ..Default::default()
3331 })
3332 .filter(
3333 channel_member::Column::ChannelId
3334 .eq(channel_id)
3335 .and(channel_member::Column::UserId.eq(user_id))
3336 .and(channel_member::Column::Accepted.eq(false)),
3337 )
3338 .exec(&*tx)
3339 .await?
3340 .rows_affected
3341 } else {
3342 channel_member::ActiveModel {
3343 channel_id: ActiveValue::Unchanged(channel_id),
3344 user_id: ActiveValue::Unchanged(user_id),
3345 ..Default::default()
3346 }
3347 .delete(&*tx)
3348 .await?
3349 .rows_affected
3350 };
3351
3352 if rows_affected == 0 {
3353 Err(anyhow!("no such invitation"))?;
3354 }
3355
3356 Ok(())
3357 })
3358 .await
3359 }
3360
3361 pub async fn get_channel_invites(&self, user_id: UserId) -> Result<Vec<Channel>> {
3362 self.transaction(|tx| async move {
3363 let tx = tx;
3364
3365 let channel_invites = channel_member::Entity::find()
3366 .filter(
3367 channel_member::Column::UserId
3368 .eq(user_id)
3369 .and(channel_member::Column::Accepted.eq(false)),
3370 )
3371 .all(&*tx)
3372 .await?;
3373
3374 let channels = channel::Entity::find()
3375 .filter(
3376 channel::Column::Id.is_in(
3377 channel_invites
3378 .into_iter()
3379 .map(|channel_member| channel_member.channel_id),
3380 ),
3381 )
3382 .all(&*tx)
3383 .await?;
3384
3385 let channels = channels
3386 .into_iter()
3387 .map(|channel| Channel {
3388 id: channel.id,
3389 name: channel.name,
3390 parent_id: None,
3391 })
3392 .collect();
3393
3394 Ok(channels)
3395 })
3396 .await
3397 }
3398
3399 pub async fn get_channels(
3400 &self,
3401 user_id: UserId,
3402 ) -> Result<(Vec<Channel>, HashMap<ChannelId, Vec<UserId>>)> {
3403 self.transaction(|tx| async move {
3404 let tx = tx;
3405
3406 let starting_channel_ids: Vec<ChannelId> = channel_member::Entity::find()
3407 .filter(
3408 channel_member::Column::UserId
3409 .eq(user_id)
3410 .and(channel_member::Column::Accepted.eq(true)),
3411 )
3412 .select_only()
3413 .column(channel_member::Column::ChannelId)
3414 .into_values::<_, QueryChannelIds>()
3415 .all(&*tx)
3416 .await?;
3417
3418 let parents_by_child_id = self
3419 .get_channel_descendants(starting_channel_ids, &*tx)
3420 .await?;
3421
3422 let mut channels = Vec::with_capacity(parents_by_child_id.len());
3423 let mut rows = channel::Entity::find()
3424 .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3425 .stream(&*tx)
3426 .await?;
3427
3428 while let Some(row) = rows.next().await {
3429 let row = row?;
3430 channels.push(Channel {
3431 id: row.id,
3432 name: row.name,
3433 parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3434 });
3435 }
3436
3437 drop(rows);
3438
3439 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3440 enum QueryUserIdsAndChannelIds {
3441 ChannelId,
3442 UserId,
3443 }
3444
3445 let mut participants = room_participant::Entity::find()
3446 .inner_join(room::Entity)
3447 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3448 .select_only()
3449 .column(room::Column::ChannelId)
3450 .column(room_participant::Column::UserId)
3451 .into_values::<_, QueryUserIdsAndChannelIds>()
3452 .stream(&*tx)
3453 .await?;
3454
3455 let mut participant_map: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3456 while let Some(row) = participants.next().await {
3457 let row: (ChannelId, UserId) = row?;
3458 participant_map.entry(row.0).or_default().push(row.1)
3459 }
3460
3461 drop(participants);
3462
3463 Ok((channels, participant_map))
3464 })
3465 .await
3466 }
3467
3468 pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3469 self.transaction(|tx| async move {
3470 let tx = tx;
3471 let user_ids = self.get_channel_members_internal(id, &*tx).await?;
3472 Ok(user_ids)
3473 })
3474 .await
3475 }
3476
3477 pub async fn get_channel_members_internal(
3478 &self,
3479 id: ChannelId,
3480 tx: &DatabaseTransaction,
3481 ) -> Result<Vec<UserId>> {
3482 let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3483 let user_ids = channel_member::Entity::find()
3484 .distinct()
3485 .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3486 .select_only()
3487 .column(channel_member::Column::UserId)
3488 .into_values::<_, QueryUserIds>()
3489 .all(&*tx)
3490 .await?;
3491 Ok(user_ids)
3492 }
3493
/// Returns channel `id` together with the ids of all of its ancestors.
///
/// The lookup is a recursive CTE over `channel_parents`: it is seeded with `id` as a
/// `parent_id` and repeatedly follows rows whose `child_id` equals an already
/// collected `parent_id`. The seed row is part of the result, so the returned list
/// always contains `id` itself. No ordering of the returned ids is requested.
async fn get_channel_ancestors(
    &self,
    id: ChannelId,
    tx: &DatabaseTransaction,
) -> Result<Vec<ChannelId>> {
    // `id` is a typed integer id, so interpolating it into the SQL text is safe.
    let sql = format!(
        r#"
        WITH RECURSIVE channel_tree(child_id, parent_id) AS (
                SELECT CAST(NULL as INTEGER) as child_id, root_ids.column1 as parent_id
                FROM (VALUES ({})) as root_ids
            UNION
                SELECT channel_parents.child_id, channel_parents.parent_id
                FROM channel_parents, channel_tree
                WHERE channel_parents.child_id = channel_tree.parent_id
        )
        SELECT DISTINCT channel_tree.parent_id
        FROM channel_tree
        "#,
        id
    );

    // Row shape of the query above: a single `parent_id` column.
    #[derive(FromQueryResult, Debug, PartialEq)]
    pub struct ChannelParent {
        pub parent_id: ChannelId,
    }

    let stmt = Statement::from_string(self.pool.get_database_backend(), sql);

    let mut channel_ids_stream = channel_parent::Entity::find()
        .from_raw_sql(stmt)
        .into_model::<ChannelParent>()
        .stream(&*tx)
        .await?;

    let mut channel_ids = vec![];
    while let Some(channel_id) = channel_ids_stream.next().await {
        channel_ids.push(channel_id?.parent_id);
    }

    Ok(channel_ids)
}
3535
/// Returns the given channels plus all of their descendants, as a map from channel
/// id to that channel's parent id within the traversed tree.
///
/// The lookup is a recursive CTE over `channel_parents`, seeded with one row per id
/// in `channel_ids` (with a `NULL` parent) and extended with every row whose
/// `parent_id` is an already collected `child_id`.
///
/// A root that is not reached through another collected channel maps to `None`.
/// When several rows exist for the same child, later rows overwrite earlier ones in
/// the map; the `ORDER BY` puts `NULL` parents first, so a non-`NULL` parent wins
/// over the `NULL` seed row. Which parent wins among several non-`NULL` parents is
/// not determined by the query.
///
/// An empty `channel_ids` yields an empty map without touching the database.
async fn get_channel_descendants(
    &self,
    channel_ids: impl IntoIterator<Item = ChannelId>,
    tx: &DatabaseTransaction,
) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
    // Build the `(id), (id), ...` row list for the `VALUES` clause. The ids are typed
    // integers, so interpolating them into the SQL text is safe.
    let mut values = String::new();
    for id in channel_ids {
        if !values.is_empty() {
            values.push_str(", ");
        }
        write!(&mut values, "({})", id).unwrap();
    }

    // An empty `VALUES` clause would not be valid SQL, so bail out early.
    if values.is_empty() {
        return Ok(HashMap::default());
    }

    let sql = format!(
        r#"
        WITH RECURSIVE channel_tree(child_id, parent_id) AS (
                SELECT root_ids.column1 as child_id, CAST(NULL as INTEGER) as parent_id
                FROM (VALUES {}) as root_ids
            UNION
                SELECT channel_parents.child_id, channel_parents.parent_id
                FROM channel_parents, channel_tree
                WHERE channel_parents.parent_id = channel_tree.child_id
        )
        SELECT channel_tree.child_id, channel_tree.parent_id
        FROM channel_tree
        ORDER BY child_id, parent_id IS NOT NULL
        "#,
        values
    );

    // Row shape of the query above.
    #[derive(FromQueryResult, Debug, PartialEq)]
    pub struct ChannelParent {
        pub child_id: ChannelId,
        pub parent_id: Option<ChannelId>,
    }

    let stmt = Statement::from_string(self.pool.get_database_backend(), sql);

    let mut parents_by_child_id = HashMap::default();
    let mut parents = channel_parent::Entity::find()
        .from_raw_sql(stmt)
        .into_model::<ChannelParent>()
        .stream(tx)
        .await?;

    while let Some(parent) = parents.next().await {
        let parent = parent?;
        // Later rows for the same child replace earlier ones (see the `ORDER BY`).
        parents_by_child_id.insert(parent.child_id, parent.parent_id);
    }

    Ok(parents_by_child_id)
}
3592
3593 pub async fn get_channel(&self, channel_id: ChannelId) -> Result<Option<Channel>> {
3594 self.transaction(|tx| async move {
3595 let tx = tx;
3596 let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3597
3598 Ok(channel.map(|channel| Channel {
3599 id: channel.id,
3600 name: channel.name,
3601 parent_id: None,
3602 }))
3603 })
3604 .await
3605 }
3606
3607 pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3608 self.transaction(|tx| async move {
3609 let tx = tx;
3610 let room = channel::Model {
3611 id: channel_id,
3612 ..Default::default()
3613 }
3614 .find_related(room::Entity)
3615 .one(&*tx)
3616 .await?
3617 .ok_or_else(|| anyhow!("invalid channel"))?;
3618 Ok(room.id)
3619 })
3620 .await
3621 }
3622
/// Runs `f` inside a serializable database transaction, retrying on serialization
/// failures.
///
/// `f` is called once per attempt with a fresh [`TransactionHandle`]. If it returns
/// `Ok`, the transaction is committed; if it returns `Err`, the transaction is rolled
/// back. When the resulting error (from `f` or from the commit) is a serialization
/// failure, the whole attempt is repeated after the delay applied by
/// `retry_on_serialization_error`; any other error is returned to the caller.
///
/// There is no upper bound on the number of attempts, and `f` must therefore be safe
/// to call repeatedly.
async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
where
    F: Send + Fn(TransactionHandle) -> Fut,
    Fut: Send + Future<Output = Result<T>>,
{
    let body = async {
        // Number of attempts made so far; drives the retry back-off.
        let mut i = 0;
        loop {
            let (tx, result) = self.with_transaction(&f).await?;
            match result {
                Ok(result) => match tx.commit().await.map_err(Into::into) {
                    Ok(()) => return Ok(result),
                    Err(error) => {
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                },
                Err(error) => {
                    // A failed rollback is reported instead of the original error.
                    tx.rollback().await?;
                    if !self.retry_on_serialization_error(&error, i).await {
                        return Err(error);
                    }
                }
            }
            i += 1;
        }
    };

    self.run(body).await
}
3654
/// Like `transaction`, but for closures that may or may not end up operating on a
/// room.
///
/// When `f` yields `Some((room_id, data))`, the in-memory lock for `room_id` is
/// acquired *before* the transaction is committed, and the returned [`RoomGuard`]
/// keeps that lock held for as long as the caller keeps the guard. When `f` yields
/// `None`, the transaction is committed without taking any room lock and `Ok(None)`
/// is returned.
///
/// Errors are handled as in `transaction`: an `Err` from `f` rolls the transaction
/// back, and serialization failures (from `f` or from the commit) cause the whole
/// attempt to be retried without an upper bound on the number of attempts.
async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
where
    F: Send + Fn(TransactionHandle) -> Fut,
    Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
{
    let body = async {
        // Number of attempts made so far; drives the retry back-off.
        let mut i = 0;
        loop {
            let (tx, result) = self.with_transaction(&f).await?;
            match result {
                Ok(Some((room_id, data))) => {
                    // The room is only known once `f` has run, so the lock is taken
                    // here, after the body but before the commit.
                    let lock = self.rooms.entry(room_id).or_default().clone();
                    let _guard = lock.lock_owned().await;
                    match tx.commit().await.map_err(Into::into) {
                        Ok(()) => {
                            return Ok(Some(RoomGuard {
                                data,
                                _guard,
                                _not_send: PhantomData,
                            }));
                        }
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    }
                }
                Ok(None) => match tx.commit().await.map_err(Into::into) {
                    Ok(()) => return Ok(None),
                    Err(error) => {
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                },
                Err(error) => {
                    // A failed rollback is reported instead of the original error.
                    tx.rollback().await?;
                    if !self.retry_on_serialization_error(&error, i).await {
                        return Err(error);
                    }
                }
            }
            i += 1;
        }
    };

    self.run(body).await
}
3704
/// Like `transaction`, but holds the in-memory lock for `room_id` while `f` runs and
/// while the transaction is committed.
///
/// On success, the returned [`RoomGuard`] wraps the value produced by `f` and keeps
/// the room lock held for as long as the caller keeps the guard.
///
/// The lock is re-acquired at the start of every attempt. Errors are handled as in
/// `transaction`: an `Err` from `f` rolls the transaction back, and serialization
/// failures (from `f` or from the commit) cause the whole attempt to be retried
/// without an upper bound on the number of attempts.
async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
where
    F: Send + Fn(TransactionHandle) -> Fut,
    Fut: Send + Future<Output = Result<T>>,
{
    let body = async {
        // Number of attempts made so far; drives the retry back-off.
        let mut i = 0;
        loop {
            // Take the room lock first so the whole attempt runs under it.
            let lock = self.rooms.entry(room_id).or_default().clone();
            let _guard = lock.lock_owned().await;
            let (tx, result) = self.with_transaction(&f).await?;
            match result {
                Ok(data) => match tx.commit().await.map_err(Into::into) {
                    Ok(()) => {
                        return Ok(RoomGuard {
                            data,
                            _guard,
                            _not_send: PhantomData,
                        });
                    }
                    Err(error) => {
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                },
                Err(error) => {
                    // A failed rollback is reported instead of the original error.
                    tx.rollback().await?;
                    if !self.retry_on_serialization_error(&error, i).await {
                        return Err(error);
                    }
                }
            }
            i += 1;
        }
    };

    self.run(body).await
}
3744
3745 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3746 where
3747 F: Send + Fn(TransactionHandle) -> Fut,
3748 Fut: Send + Future<Output = Result<T>>,
3749 {
3750 let tx = self
3751 .pool
3752 .begin_with_config(Some(IsolationLevel::Serializable), None)
3753 .await?;
3754
3755 let mut tx = Arc::new(Some(tx));
3756 let result = f(TransactionHandle(tx.clone())).await;
3757 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3758 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3759 };
3760
3761 Ok((tx, result))
3762 }
3763
/// Drives `future` to completion and returns its result.
///
/// In non-test builds this simply awaits the future. In test builds, a random delay
/// is simulated first when the executor is `Executor::Deterministic`, and the future
/// is then run to completion on the database's own tokio runtime via `block_on`.
///
/// # Panics
///
/// In test builds, panics if `self.runtime` has not been set.
async fn run<F, T>(&self, future: F) -> Result<T>
where
    F: Future<Output = Result<T>>,
{
    #[cfg(test)]
    {
        if let Executor::Deterministic(executor) = &self.executor {
            executor.simulate_random_delay().await;
        }

        // Blocks the current thread until the future has finished.
        self.runtime.as_ref().unwrap().block_on(future)
    }

    #[cfg(not(test))]
    {
        future.await
    }
}
3782
3783 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3784 // If the error is due to a failure to serialize concurrent transactions, then retry
3785 // this transaction after a delay. With each subsequent retry, double the delay duration.
3786 // Also vary the delay randomly in order to ensure different database connections retry
3787 // at different times.
3788 if is_serialization_error(error) {
3789 let base_delay = 4_u64 << prev_attempt_count.min(16);
3790 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3791 log::info!(
3792 "retrying transaction after serialization error. delay: {} ms.",
3793 randomized_delay
3794 );
3795 self.executor
3796 .sleep(Duration::from_millis(randomized_delay as u64))
3797 .await;
3798 true
3799 } else {
3800 false
3801 }
3802 }
3803}
3804
3805fn is_serialization_error(error: &Error) -> bool {
3806 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3807 match error {
3808 Error::Database(
3809 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3810 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3811 ) if error
3812 .as_database_error()
3813 .and_then(|error| error.code())
3814 .as_deref()
3815 == Some(SERIALIZATION_FAILURE_CODE) =>
3816 {
3817 true
3818 }
3819 _ => false,
3820 }
3821}
3822
3823struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3824
3825impl Deref for TransactionHandle {
3826 type Target = DatabaseTransaction;
3827
3828 fn deref(&self) -> &Self::Target {
3829 self.0.as_ref().as_ref().unwrap()
3830 }
3831}
3832
/// A value of type `T` bundled with an owned lock guard.
///
/// The guard is released when the `RoomGuard` is dropped. The wrapped value is
/// reachable through `Deref` / `DerefMut`.
pub struct RoomGuard<T> {
    // The guarded value.
    data: T,
    // Held only for its lifetime; never read.
    _guard: OwnedMutexGuard<()>,
    // `Rc` is not `Send`, so this marker makes `RoomGuard` not `Send` either.
    _not_send: PhantomData<Rc<()>>,
}

impl<T> Deref for RoomGuard<T> {
    type Target = T;

    /// Borrows the guarded value.
    fn deref(&self) -> &T {
        &self.data
    }
}

impl<T> DerefMut for RoomGuard<T> {
    /// Mutably borrows the guarded value.
    fn deref_mut(&mut self) -> &mut T {
        &mut self.data
    }
}
3852
/// Serializable set of fields describing a new user.
// NOTE(review): presumably the input of the user-creation query defined elsewhere in
// this file; confirm against the caller.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    /// GitHub login name.
    pub github_login: String,
    /// Numeric GitHub user id.
    pub github_user_id: i32,
    /// Invite count for the user.
    // NOTE(review): presumably the number of invites granted at creation; confirm.
    pub invite_count: i32,
}
3859
/// Identifiers associated with a newly created user.
// NOTE(review): presumably the output of the user-creation query defined elsewhere
// in this file; confirm against the producer.
#[derive(Debug)]
pub struct NewUserResult {
    /// Database id of the user.
    pub user_id: UserId,
    /// Metrics identifier of the user.
    pub metrics_id: String,
    /// Id of the user that invited this one, if any.
    pub inviting_user_id: Option<UserId>,
    /// Device id recorded with the signup, if any.
    pub signup_device_id: Option<String>,
}
3867
/// A channel as exposed by the channel queries of `Database`.
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct Channel {
    /// Database id of the channel.
    pub id: ChannelId,
    /// Name of the channel.
    pub name: String,
    /// Id of the channel's parent, if one was resolved. Several queries in this file
    /// do not load parent links and always leave this as `None`.
    pub parent_id: Option<ChannelId>,
}
3874
/// Generates a random 16-character invite code using `nanoid`.
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
3878
/// Generates a random 64-character email confirmation code using `nanoid`.
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
3882
// Defines `$name` as a `Copy` newtype around `i32` for use as a typed database id.
//
// Besides the derived traits, the generated type gets:
//   * `MAX`, `from_proto` and `to_proto` helpers for converting to and from the `u64`
//     ids used by the protobuf messages,
//   * a `Display` impl that prints the inner integer,
//   * the sea-query / sea-orm conversions (`From<$name> for Value`, `TryGetable`,
//     `ValueType`, `TryFromU64`, `Nullable`) needed to use it as a column value.
//
// The expansion refers to `Value` and `DbErr` unqualified, so both must be in scope
// where the macro is invoked.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            // NOTE: `as` truncates; a `value` that does not fit into `i32` wraps
            // around silently.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            // NOTE: `as` sign-extends; a negative id becomes a very large `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer variant whose value fits into `i32`;
            // null values, out-of-range values and non-integer variants are rejected.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            // NOTE(review): the message reads "to u64" although the failing
            // conversion is *from* `u64`; left untouched because it is a runtime
            // string.
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(concat!(
                        "error converting ",
                        stringify!($name),
                        " to u64"
                    ))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
4001
// Typed `i32` id newtypes generated by `id_type!`, one per kind of id.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
4015
/// Room state paired with information about the channel the room belongs to.
// NOTE(review): presumably returned by the room-joining queries; confirm against the
// producers elsewhere in this file.
#[derive(Clone)]
pub struct JoinRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Channel the room is attached to, if any.
    pub channel_id: Option<ChannelId>,
    /// Ids of the members of that channel.
    pub channel_members: Vec<UserId>,
}
4022
/// Room state plus the projects that were rejoined or reshared along with it.
// NOTE(review): presumably returned when a connection rejoins a room; confirm
// against the producer elsewhere in this file.
pub struct RejoinedRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Projects that were rejoined.
    pub rejoined_projects: Vec<RejoinedProject>,
    /// Projects that were reshared.
    pub reshared_projects: Vec<ResharedProject>,
    /// Channel the room is attached to, if any.
    pub channel_id: Option<ChannelId>,
    /// Ids of the members of that channel.
    pub channel_members: Vec<UserId>,
}
4030
/// A project listed in `RejoinedRoom::reshared_projects`.
pub struct ResharedProject {
    /// Database id of the project.
    pub id: ProjectId,
    /// Connection id recorded before the reshare.
    // NOTE(review): presumably the host's previous connection; confirm.
    pub old_connection_id: ConnectionId,
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Protobuf metadata of the project's worktrees.
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
4037
/// A project listed in `RejoinedRoom::rejoined_projects`.
pub struct RejoinedProject {
    /// Database id of the project.
    pub id: ProjectId,
    /// Connection id recorded before the rejoin.
    // NOTE(review): presumably the rejoining collaborator's previous connection;
    // confirm.
    pub old_connection_id: ConnectionId,
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Per-worktree state for the rejoined project.
    pub worktrees: Vec<RejoinedWorktree>,
    /// Protobuf descriptions of the project's language servers.
    pub language_servers: Vec<proto::LanguageServer>,
}
4045
/// Worktree state carried by a `RejoinedProject`, expressed as updated and removed
/// items.
// NOTE(review): presumably a delta relative to what the rejoining client last saw;
// confirm against the producer.
#[derive(Debug)]
pub struct RejoinedWorktree {
    /// Id of the worktree.
    pub id: u64,
    /// Absolute path of the worktree.
    pub abs_path: String,
    /// Root name of the worktree.
    pub root_name: String,
    /// Visibility flag of the worktree.
    pub visible: bool,
    /// Entries that were added or changed.
    pub updated_entries: Vec<proto::Entry>,
    /// Ids of entries that were removed.
    pub removed_entries: Vec<u64>,
    /// Repository entries that were added or changed.
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    /// Ids of repository entries that were removed.
    pub removed_repositories: Vec<u64>,
    /// Diagnostic summaries of the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Settings files of the worktree.
    pub settings_files: Vec<WorktreeSettingsFile>,
    /// Scan id of the worktree.
    pub scan_id: u64,
    /// Id of the last completed scan.
    pub completed_scan_id: u64,
}
4061
/// Room state after a participant left, plus the side effects of leaving.
// NOTE(review): presumably returned by the room-leaving queries; confirm against the
// producers elsewhere in this file.
pub struct LeftRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Channel the room is attached to, if any.
    pub channel_id: Option<ChannelId>,
    /// Ids of the members of that channel.
    pub channel_members: Vec<UserId>,
    /// Projects that were left, keyed by project id.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    /// Ids of users whose pending calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
    /// Whether the room was deleted.
    pub deleted: bool,
}
4070
/// Room state after a refresh, plus the users affected by it.
// NOTE(review): presumably produced when stale participants are cleaned out of a
// room; confirm against the producer elsewhere in this file.
pub struct RefreshedRoom {
    /// Protobuf representation of the room.
    pub room: proto::Room,
    /// Channel the room is attached to, if any.
    pub channel_id: Option<ChannelId>,
    /// Ids of the members of that channel.
    pub channel_members: Vec<UserId>,
    /// Ids of participants that were found to be stale.
    pub stale_participant_user_ids: Vec<UserId>,
    /// Ids of users whose pending calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
4078
/// State of a shared project: its collaborators, worktrees and language servers.
pub struct Project {
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Worktrees of the project, keyed (and therefore ordered) by worktree id.
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Protobuf descriptions of the project's language servers.
    pub language_servers: Vec<proto::LanguageServer>,
}
4084
/// A participant of a shared project.
pub struct ProjectCollaborator {
    /// Connection through which the collaborator takes part.
    pub connection_id: ConnectionId,
    /// Id of the collaborating user.
    pub user_id: UserId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Whether this collaborator is the host of the project.
    pub is_host: bool,
}

impl ProjectCollaborator {
    /// Converts the collaborator into its protobuf representation.
    ///
    /// `is_host` is not part of the protobuf message and is dropped.
    pub fn to_proto(&self) -> proto::Collaborator {
        proto::Collaborator {
            peer_id: Some(self.connection_id.into()),
            // The `i32` replica id is reinterpreted as `u32` by the cast.
            replica_id: self.replica_id.0 as u32,
            user_id: self.user_id.to_proto(),
        }
    }
}
4101
/// A project that a participant left, as stored in `LeftRoom::left_projects`.
#[derive(Debug)]
pub struct LeftProject {
    /// Database id of the project.
    pub id: ProjectId,
    /// Id of the user hosting the project.
    pub host_user_id: UserId,
    /// Connection id of the project's host.
    pub host_connection_id: ConnectionId,
    /// Connection ids associated with the project.
    // NOTE(review): presumably the remaining collaborators to notify; confirm.
    pub connection_ids: Vec<ConnectionId>,
}
4109
/// Full state of one worktree of a `Project`.
pub struct Worktree {
    /// Id of the worktree.
    pub id: u64,
    /// Absolute path of the worktree.
    pub abs_path: String,
    /// Root name of the worktree.
    pub root_name: String,
    /// Visibility flag of the worktree.
    pub visible: bool,
    /// Entries of the worktree.
    pub entries: Vec<proto::Entry>,
    /// Repository entries of the worktree, keyed (and therefore ordered) by a `u64`
    /// id.
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    /// Diagnostic summaries of the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Settings files of the worktree.
    pub settings_files: Vec<WorktreeSettingsFile>,
    /// Scan id of the worktree.
    pub scan_id: u64,
    /// Id of the last completed scan.
    pub completed_scan_id: u64,
}
4122
/// A settings file belonging to a worktree.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    /// Path of the settings file.
    pub path: String,
    /// Content of the settings file.
    pub content: String,
}
4128
/// Column selector for `into_values` queries that return a single channel id
/// column.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryChannelIds {
    ChannelId,
}
4133
/// Column selector for `into_values` queries that return a single user id column.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}
4138
4139#[cfg(test)]
4140pub use test::*;
4141
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A throwaway [`Database`] for tests, backed by either SQLite or Postgres.
    ///
    /// Dropping a Postgres-backed `TestDb` terminates the other connections to the
    /// test database and drops the database itself.
    pub struct TestDb {
        /// The database under test; only `None` while the value is being dropped.
        pub db: Option<Arc<Database>>,
        /// Never populated by the constructors in this module.
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Creates an in-memory SQLite database initialized from the bundled test
        /// schema.
        ///
        /// The database runs on its own single-threaded tokio runtime, which is
        /// stored on the `Database` so later queries can be driven by it.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = "sqlite::memory:".to_string();
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            let mut db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a fresh, randomly named Postgres database on `localhost` and
        /// runs the migrations from the crate's `migrations` directory against it.
        ///
        /// Setup is serialized across callers through a process-wide lock. The
        /// database runs on its own single-threaded tokio runtime, which is stored
        /// on the `Database` so later queries can be driven by it.
        pub fn postgres(background: Arc<Background>) -> Self {
            static LOCK: Mutex<()> = Mutex::new(());

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            let mut db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Returns the database under test.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        /// For Postgres, terminates every other backend connected to the test
        /// database and then drops the database. Failures are only logged. SQLite
        /// databases need no cleanup.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}