1mod access_token;
2mod channel;
3mod channel_member;
4mod channel_parent;
5mod contact;
6mod follower;
7mod language_server;
8mod project;
9mod project_collaborator;
10mod room;
11mod room_participant;
12mod server;
13mod signup;
14#[cfg(test)]
15mod tests;
16mod user;
17mod worktree;
18mod worktree_diagnostic_summary;
19mod worktree_entry;
20mod worktree_repository;
21mod worktree_repository_statuses;
22mod worktree_settings_file;
23
24use crate::executor::Executor;
25use crate::{Error, Result};
26use anyhow::anyhow;
27use collections::{BTreeMap, HashMap, HashSet};
28pub use contact::Contact;
29use dashmap::DashMap;
30use futures::StreamExt;
31use hyper::StatusCode;
32use rand::prelude::StdRng;
33use rand::{Rng, SeedableRng};
34use rpc::{proto, ConnectionId};
35use sea_orm::Condition;
36pub use sea_orm::ConnectOptions;
37use sea_orm::{
38 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
39 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
40 Statement, TransactionTrait,
41};
42use sea_query::{Alias, Expr, OnConflict, Query};
43use serde::{Deserialize, Serialize};
44pub use signup::{Invite, NewSignup, WaitlistSummary};
45use sqlx::migrate::{Migrate, Migration, MigrationSource};
46use sqlx::Connection;
47use std::fmt::Write as _;
48use std::ops::{Deref, DerefMut};
49use std::path::Path;
50use std::time::Duration;
51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
52use tokio::sync::{Mutex, OwnedMutexGuard};
53pub use user::Model as User;
54
55pub struct Database {
56 options: ConnectOptions,
57 pool: DatabaseConnection,
58 rooms: DashMap<RoomId, Arc<Mutex<()>>>,
59 rng: Mutex<StdRng>,
60 executor: Executor,
61 #[cfg(test)]
62 runtime: Option<tokio::runtime::Runtime>,
63}
64
65impl Database {
66 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
67 Ok(Self {
68 options: options.clone(),
69 pool: sea_orm::Database::connect(options).await?,
70 rooms: DashMap::with_capacity(16384),
71 rng: Mutex::new(StdRng::seed_from_u64(0)),
72 executor,
73 #[cfg(test)]
74 runtime: None,
75 })
76 }
77
/// Empties the per-room lock map. Only compiled into test builds.
#[cfg(test)]
pub fn reset(&self) {
    self.rooms.clear()
}
82
83 pub async fn migrate(
84 &self,
85 migrations_path: &Path,
86 ignore_checksum_mismatch: bool,
87 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
88 let migrations = MigrationSource::resolve(migrations_path)
89 .await
90 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
91
92 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
93
94 connection.ensure_migrations_table().await?;
95 let applied_migrations: HashMap<_, _> = connection
96 .list_applied_migrations()
97 .await?
98 .into_iter()
99 .map(|m| (m.version, m))
100 .collect();
101
102 let mut new_migrations = Vec::new();
103 for migration in migrations {
104 match applied_migrations.get(&migration.version) {
105 Some(applied_migration) => {
106 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
107 {
108 Err(anyhow!(
109 "checksum mismatch for applied migration {}",
110 migration.description
111 ))?;
112 }
113 }
114 None => {
115 let elapsed = connection.apply(&migration).await?;
116 new_migrations.push((migration, elapsed));
117 }
118 }
119 }
120
121 Ok(new_migrations)
122 }
123
124 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
125 self.transaction(|tx| async move {
126 let server = server::ActiveModel {
127 environment: ActiveValue::set(environment.into()),
128 ..Default::default()
129 }
130 .insert(&*tx)
131 .await?;
132 Ok(server.id)
133 })
134 .await
135 }
136
137 pub async fn stale_room_ids(
138 &self,
139 environment: &str,
140 new_server_id: ServerId,
141 ) -> Result<Vec<RoomId>> {
142 self.transaction(|tx| async move {
143 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
144 enum QueryAs {
145 RoomId,
146 }
147
148 let stale_server_epochs = self
149 .stale_server_ids(environment, new_server_id, &tx)
150 .await?;
151 Ok(room_participant::Entity::find()
152 .select_only()
153 .column(room_participant::Column::RoomId)
154 .distinct()
155 .filter(
156 room_participant::Column::AnsweringConnectionServerId
157 .is_in(stale_server_epochs),
158 )
159 .into_values::<_, QueryAs>()
160 .all(&*tx)
161 .await?)
162 })
163 .await
164 }
165
166 pub async fn refresh_room(
167 &self,
168 room_id: RoomId,
169 new_server_id: ServerId,
170 ) -> Result<RoomGuard<RefreshedRoom>> {
171 self.room_transaction(room_id, |tx| async move {
172 let stale_participant_filter = Condition::all()
173 .add(room_participant::Column::RoomId.eq(room_id))
174 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
175 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
176
177 let stale_participant_user_ids = room_participant::Entity::find()
178 .filter(stale_participant_filter.clone())
179 .all(&*tx)
180 .await?
181 .into_iter()
182 .map(|participant| participant.user_id)
183 .collect::<Vec<_>>();
184
185 // Delete participants who failed to reconnect and cancel their calls.
186 let mut canceled_calls_to_user_ids = Vec::new();
187 room_participant::Entity::delete_many()
188 .filter(stale_participant_filter)
189 .exec(&*tx)
190 .await?;
191 let called_participants = room_participant::Entity::find()
192 .filter(
193 Condition::all()
194 .add(
195 room_participant::Column::CallingUserId
196 .is_in(stale_participant_user_ids.iter().copied()),
197 )
198 .add(room_participant::Column::AnsweringConnectionId.is_null()),
199 )
200 .all(&*tx)
201 .await?;
202 room_participant::Entity::delete_many()
203 .filter(
204 room_participant::Column::Id
205 .is_in(called_participants.iter().map(|participant| participant.id)),
206 )
207 .exec(&*tx)
208 .await?;
209 canceled_calls_to_user_ids.extend(
210 called_participants
211 .into_iter()
212 .map(|participant| participant.user_id),
213 );
214
215 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
216 let channel_members = if let Some(channel_id) = channel_id {
217 self.get_channel_members_internal(channel_id, &tx).await?
218 } else {
219 Vec::new()
220 };
221
222 // Delete the room if it becomes empty.
223 if room.participants.is_empty() {
224 project::Entity::delete_many()
225 .filter(project::Column::RoomId.eq(room_id))
226 .exec(&*tx)
227 .await?;
228 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
229 }
230
231 Ok(RefreshedRoom {
232 room,
233 channel_id,
234 channel_members,
235 stale_participant_user_ids,
236 canceled_calls_to_user_ids,
237 })
238 })
239 .await
240 }
241
242 pub async fn delete_stale_servers(
243 &self,
244 environment: &str,
245 new_server_id: ServerId,
246 ) -> Result<()> {
247 self.transaction(|tx| async move {
248 server::Entity::delete_many()
249 .filter(
250 Condition::all()
251 .add(server::Column::Environment.eq(environment))
252 .add(server::Column::Id.ne(new_server_id)),
253 )
254 .exec(&*tx)
255 .await?;
256 Ok(())
257 })
258 .await
259 }
260
261 async fn stale_server_ids(
262 &self,
263 environment: &str,
264 new_server_id: ServerId,
265 tx: &DatabaseTransaction,
266 ) -> Result<Vec<ServerId>> {
267 let stale_servers = server::Entity::find()
268 .filter(
269 Condition::all()
270 .add(server::Column::Environment.eq(environment))
271 .add(server::Column::Id.ne(new_server_id)),
272 )
273 .all(&*tx)
274 .await?;
275 Ok(stale_servers.into_iter().map(|server| server.id).collect())
276 }
277
278 // users
279
280 pub async fn create_user(
281 &self,
282 email_address: &str,
283 admin: bool,
284 params: NewUserParams,
285 ) -> Result<NewUserResult> {
286 self.transaction(|tx| async {
287 let tx = tx;
288 let user = user::Entity::insert(user::ActiveModel {
289 email_address: ActiveValue::set(Some(email_address.into())),
290 github_login: ActiveValue::set(params.github_login.clone()),
291 github_user_id: ActiveValue::set(Some(params.github_user_id)),
292 admin: ActiveValue::set(admin),
293 metrics_id: ActiveValue::set(Uuid::new_v4()),
294 ..Default::default()
295 })
296 .on_conflict(
297 OnConflict::column(user::Column::GithubLogin)
298 .update_column(user::Column::GithubLogin)
299 .to_owned(),
300 )
301 .exec_with_returning(&*tx)
302 .await?;
303
304 Ok(NewUserResult {
305 user_id: user.id,
306 metrics_id: user.metrics_id.to_string(),
307 signup_device_id: None,
308 inviting_user_id: None,
309 })
310 })
311 .await
312 }
313
314 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
315 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
316 .await
317 }
318
319 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
320 self.transaction(|tx| async {
321 let tx = tx;
322 Ok(user::Entity::find()
323 .filter(user::Column::Id.is_in(ids.iter().copied()))
324 .all(&*tx)
325 .await?)
326 })
327 .await
328 }
329
330 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
331 self.transaction(|tx| async move {
332 Ok(user::Entity::find()
333 .filter(user::Column::GithubLogin.eq(github_login))
334 .one(&*tx)
335 .await?)
336 })
337 .await
338 }
339
340 pub async fn get_or_create_user_by_github_account(
341 &self,
342 github_login: &str,
343 github_user_id: Option<i32>,
344 github_email: Option<&str>,
345 ) -> Result<Option<User>> {
346 self.transaction(|tx| async move {
347 let tx = &*tx;
348 if let Some(github_user_id) = github_user_id {
349 if let Some(user_by_github_user_id) = user::Entity::find()
350 .filter(user::Column::GithubUserId.eq(github_user_id))
351 .one(tx)
352 .await?
353 {
354 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
355 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
356 Ok(Some(user_by_github_user_id.update(tx).await?))
357 } else if let Some(user_by_github_login) = user::Entity::find()
358 .filter(user::Column::GithubLogin.eq(github_login))
359 .one(tx)
360 .await?
361 {
362 let mut user_by_github_login = user_by_github_login.into_active_model();
363 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
364 Ok(Some(user_by_github_login.update(tx).await?))
365 } else {
366 let user = user::Entity::insert(user::ActiveModel {
367 email_address: ActiveValue::set(github_email.map(|email| email.into())),
368 github_login: ActiveValue::set(github_login.into()),
369 github_user_id: ActiveValue::set(Some(github_user_id)),
370 admin: ActiveValue::set(false),
371 invite_count: ActiveValue::set(0),
372 invite_code: ActiveValue::set(None),
373 metrics_id: ActiveValue::set(Uuid::new_v4()),
374 ..Default::default()
375 })
376 .exec_with_returning(&*tx)
377 .await?;
378 Ok(Some(user))
379 }
380 } else {
381 Ok(user::Entity::find()
382 .filter(user::Column::GithubLogin.eq(github_login))
383 .one(tx)
384 .await?)
385 }
386 })
387 .await
388 }
389
390 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
391 self.transaction(|tx| async move {
392 Ok(user::Entity::find()
393 .order_by_asc(user::Column::GithubLogin)
394 .limit(limit as u64)
395 .offset(page as u64 * limit as u64)
396 .all(&*tx)
397 .await?)
398 })
399 .await
400 }
401
402 pub async fn get_users_with_no_invites(
403 &self,
404 invited_by_another_user: bool,
405 ) -> Result<Vec<User>> {
406 self.transaction(|tx| async move {
407 Ok(user::Entity::find()
408 .filter(
409 user::Column::InviteCount
410 .eq(0)
411 .and(if invited_by_another_user {
412 user::Column::InviterId.is_not_null()
413 } else {
414 user::Column::InviterId.is_null()
415 }),
416 )
417 .all(&*tx)
418 .await?)
419 })
420 .await
421 }
422
423 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
424 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
425 enum QueryAs {
426 MetricsId,
427 }
428
429 self.transaction(|tx| async move {
430 let metrics_id: Uuid = user::Entity::find_by_id(id)
431 .select_only()
432 .column(user::Column::MetricsId)
433 .into_values::<_, QueryAs>()
434 .one(&*tx)
435 .await?
436 .ok_or_else(|| anyhow!("could not find user"))?;
437 Ok(metrics_id.to_string())
438 })
439 .await
440 }
441
442 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
443 self.transaction(|tx| async move {
444 user::Entity::update_many()
445 .filter(user::Column::Id.eq(id))
446 .set(user::ActiveModel {
447 admin: ActiveValue::set(is_admin),
448 ..Default::default()
449 })
450 .exec(&*tx)
451 .await?;
452 Ok(())
453 })
454 .await
455 }
456
457 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
458 self.transaction(|tx| async move {
459 user::Entity::update_many()
460 .filter(user::Column::Id.eq(id))
461 .set(user::ActiveModel {
462 connected_once: ActiveValue::set(connected_once),
463 ..Default::default()
464 })
465 .exec(&*tx)
466 .await?;
467 Ok(())
468 })
469 .await
470 }
471
472 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
473 self.transaction(|tx| async move {
474 access_token::Entity::delete_many()
475 .filter(access_token::Column::UserId.eq(id))
476 .exec(&*tx)
477 .await?;
478 user::Entity::delete_by_id(id).exec(&*tx).await?;
479 Ok(())
480 })
481 .await
482 }
483
484 // contacts
485
486 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
487 #[derive(Debug, FromQueryResult)]
488 struct ContactWithUserBusyStatuses {
489 user_id_a: UserId,
490 user_id_b: UserId,
491 a_to_b: bool,
492 accepted: bool,
493 should_notify: bool,
494 user_a_busy: bool,
495 user_b_busy: bool,
496 }
497
498 self.transaction(|tx| async move {
499 let user_a_participant = Alias::new("user_a_participant");
500 let user_b_participant = Alias::new("user_b_participant");
501 let mut db_contacts = contact::Entity::find()
502 .column_as(
503 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
504 .is_not_null(),
505 "user_a_busy",
506 )
507 .column_as(
508 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
509 .is_not_null(),
510 "user_b_busy",
511 )
512 .filter(
513 contact::Column::UserIdA
514 .eq(user_id)
515 .or(contact::Column::UserIdB.eq(user_id)),
516 )
517 .join_as(
518 JoinType::LeftJoin,
519 contact::Relation::UserARoomParticipant.def(),
520 user_a_participant,
521 )
522 .join_as(
523 JoinType::LeftJoin,
524 contact::Relation::UserBRoomParticipant.def(),
525 user_b_participant,
526 )
527 .into_model::<ContactWithUserBusyStatuses>()
528 .stream(&*tx)
529 .await?;
530
531 let mut contacts = Vec::new();
532 while let Some(db_contact) = db_contacts.next().await {
533 let db_contact = db_contact?;
534 if db_contact.user_id_a == user_id {
535 if db_contact.accepted {
536 contacts.push(Contact::Accepted {
537 user_id: db_contact.user_id_b,
538 should_notify: db_contact.should_notify && db_contact.a_to_b,
539 busy: db_contact.user_b_busy,
540 });
541 } else if db_contact.a_to_b {
542 contacts.push(Contact::Outgoing {
543 user_id: db_contact.user_id_b,
544 })
545 } else {
546 contacts.push(Contact::Incoming {
547 user_id: db_contact.user_id_b,
548 should_notify: db_contact.should_notify,
549 });
550 }
551 } else if db_contact.accepted {
552 contacts.push(Contact::Accepted {
553 user_id: db_contact.user_id_a,
554 should_notify: db_contact.should_notify && !db_contact.a_to_b,
555 busy: db_contact.user_a_busy,
556 });
557 } else if db_contact.a_to_b {
558 contacts.push(Contact::Incoming {
559 user_id: db_contact.user_id_a,
560 should_notify: db_contact.should_notify,
561 });
562 } else {
563 contacts.push(Contact::Outgoing {
564 user_id: db_contact.user_id_a,
565 });
566 }
567 }
568
569 contacts.sort_unstable_by_key(|contact| contact.user_id());
570
571 Ok(contacts)
572 })
573 .await
574 }
575
576 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
577 self.transaction(|tx| async move {
578 let participant = room_participant::Entity::find()
579 .filter(room_participant::Column::UserId.eq(user_id))
580 .one(&*tx)
581 .await?;
582 Ok(participant.is_some())
583 })
584 .await
585 }
586
587 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
588 self.transaction(|tx| async move {
589 let (id_a, id_b) = if user_id_1 < user_id_2 {
590 (user_id_1, user_id_2)
591 } else {
592 (user_id_2, user_id_1)
593 };
594
595 Ok(contact::Entity::find()
596 .filter(
597 contact::Column::UserIdA
598 .eq(id_a)
599 .and(contact::Column::UserIdB.eq(id_b))
600 .and(contact::Column::Accepted.eq(true)),
601 )
602 .one(&*tx)
603 .await?
604 .is_some())
605 })
606 .await
607 }
608
609 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
610 self.transaction(|tx| async move {
611 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
612 (sender_id, receiver_id, true)
613 } else {
614 (receiver_id, sender_id, false)
615 };
616
617 let rows_affected = contact::Entity::insert(contact::ActiveModel {
618 user_id_a: ActiveValue::set(id_a),
619 user_id_b: ActiveValue::set(id_b),
620 a_to_b: ActiveValue::set(a_to_b),
621 accepted: ActiveValue::set(false),
622 should_notify: ActiveValue::set(true),
623 ..Default::default()
624 })
625 .on_conflict(
626 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
627 .values([
628 (contact::Column::Accepted, true.into()),
629 (contact::Column::ShouldNotify, false.into()),
630 ])
631 .action_and_where(
632 contact::Column::Accepted.eq(false).and(
633 contact::Column::AToB
634 .eq(a_to_b)
635 .and(contact::Column::UserIdA.eq(id_b))
636 .or(contact::Column::AToB
637 .ne(a_to_b)
638 .and(contact::Column::UserIdA.eq(id_a))),
639 ),
640 )
641 .to_owned(),
642 )
643 .exec_without_returning(&*tx)
644 .await?;
645
646 if rows_affected == 1 {
647 Ok(())
648 } else {
649 Err(anyhow!("contact already requested"))?
650 }
651 })
652 .await
653 }
654
655 /// Returns a bool indicating whether the removed contact had originally accepted or not
656 ///
657 /// Deletes the contact identified by the requester and responder ids, and then returns
658 /// whether the deleted contact had originally accepted or was a pending contact request.
659 ///
660 /// # Arguments
661 ///
662 /// * `requester_id` - The user that initiates this request
663 /// * `responder_id` - The user that will be removed
664 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
665 self.transaction(|tx| async move {
666 let (id_a, id_b) = if responder_id < requester_id {
667 (responder_id, requester_id)
668 } else {
669 (requester_id, responder_id)
670 };
671
672 let contact = contact::Entity::find()
673 .filter(
674 contact::Column::UserIdA
675 .eq(id_a)
676 .and(contact::Column::UserIdB.eq(id_b)),
677 )
678 .one(&*tx)
679 .await?
680 .ok_or_else(|| anyhow!("no such contact"))?;
681
682 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
683 Ok(contact.accepted)
684 })
685 .await
686 }
687
688 pub async fn dismiss_contact_notification(
689 &self,
690 user_id: UserId,
691 contact_user_id: UserId,
692 ) -> Result<()> {
693 self.transaction(|tx| async move {
694 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
695 (user_id, contact_user_id, true)
696 } else {
697 (contact_user_id, user_id, false)
698 };
699
700 let result = contact::Entity::update_many()
701 .set(contact::ActiveModel {
702 should_notify: ActiveValue::set(false),
703 ..Default::default()
704 })
705 .filter(
706 contact::Column::UserIdA
707 .eq(id_a)
708 .and(contact::Column::UserIdB.eq(id_b))
709 .and(
710 contact::Column::AToB
711 .eq(a_to_b)
712 .and(contact::Column::Accepted.eq(true))
713 .or(contact::Column::AToB
714 .ne(a_to_b)
715 .and(contact::Column::Accepted.eq(false))),
716 ),
717 )
718 .exec(&*tx)
719 .await?;
720 if result.rows_affected == 0 {
721 Err(anyhow!("no such contact request"))?
722 } else {
723 Ok(())
724 }
725 })
726 .await
727 }
728
729 pub async fn respond_to_contact_request(
730 &self,
731 responder_id: UserId,
732 requester_id: UserId,
733 accept: bool,
734 ) -> Result<()> {
735 self.transaction(|tx| async move {
736 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
737 (responder_id, requester_id, false)
738 } else {
739 (requester_id, responder_id, true)
740 };
741 let rows_affected = if accept {
742 let result = contact::Entity::update_many()
743 .set(contact::ActiveModel {
744 accepted: ActiveValue::set(true),
745 should_notify: ActiveValue::set(true),
746 ..Default::default()
747 })
748 .filter(
749 contact::Column::UserIdA
750 .eq(id_a)
751 .and(contact::Column::UserIdB.eq(id_b))
752 .and(contact::Column::AToB.eq(a_to_b)),
753 )
754 .exec(&*tx)
755 .await?;
756 result.rows_affected
757 } else {
758 let result = contact::Entity::delete_many()
759 .filter(
760 contact::Column::UserIdA
761 .eq(id_a)
762 .and(contact::Column::UserIdB.eq(id_b))
763 .and(contact::Column::AToB.eq(a_to_b))
764 .and(contact::Column::Accepted.eq(false)),
765 )
766 .exec(&*tx)
767 .await?;
768
769 result.rows_affected
770 };
771
772 if rows_affected == 1 {
773 Ok(())
774 } else {
775 Err(anyhow!("no such contact request"))?
776 }
777 })
778 .await
779 }
780
/// Builds a `LIKE` pattern that matches the alphanumeric characters of
/// `string` in order, with anything in between.
///
/// Every alphanumeric character is prefixed with `%`, all other characters are
/// dropped, and a trailing `%` is appended; `"a-b"` becomes `"%a%b%"` and the
/// empty string becomes `"%"`.
pub fn fuzzy_like_string(string: &str) -> String {
    let buffer = String::with_capacity(string.len() * 2 + 1);
    let mut pattern = string
        .chars()
        .filter(|ch| ch.is_alphanumeric())
        .fold(buffer, |mut acc, ch| {
            acc.push('%');
            acc.push(ch);
            acc
        });
    pattern.push('%');
    pattern
}
792
793 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
794 self.transaction(|tx| async {
795 let tx = tx;
796 let like_string = Self::fuzzy_like_string(name_query);
797 let query = "
798 SELECT users.*
799 FROM users
800 WHERE github_login ILIKE $1
801 ORDER BY github_login <-> $2
802 LIMIT $3
803 ";
804
805 Ok(user::Entity::find()
806 .from_raw_sql(Statement::from_sql_and_values(
807 self.pool.get_database_backend(),
808 query.into(),
809 vec![like_string.into(), name_query.into(), limit.into()],
810 ))
811 .all(&*tx)
812 .await?)
813 })
814 .await
815 }
816
817 // signups
818
819 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
820 self.transaction(|tx| async move {
821 signup::Entity::insert(signup::ActiveModel {
822 email_address: ActiveValue::set(signup.email_address.clone()),
823 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
824 email_confirmation_sent: ActiveValue::set(false),
825 platform_mac: ActiveValue::set(signup.platform_mac),
826 platform_windows: ActiveValue::set(signup.platform_windows),
827 platform_linux: ActiveValue::set(signup.platform_linux),
828 platform_unknown: ActiveValue::set(false),
829 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
830 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
831 device_id: ActiveValue::set(signup.device_id.clone()),
832 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
833 ..Default::default()
834 })
835 .on_conflict(
836 OnConflict::column(signup::Column::EmailAddress)
837 .update_columns([
838 signup::Column::PlatformMac,
839 signup::Column::PlatformWindows,
840 signup::Column::PlatformLinux,
841 signup::Column::EditorFeatures,
842 signup::Column::ProgrammingLanguages,
843 signup::Column::DeviceId,
844 signup::Column::AddedToMailingList,
845 ])
846 .to_owned(),
847 )
848 .exec(&*tx)
849 .await?;
850 Ok(())
851 })
852 .await
853 }
854
855 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
856 self.transaction(|tx| async move {
857 let signup = signup::Entity::find()
858 .filter(signup::Column::EmailAddress.eq(email_address))
859 .one(&*tx)
860 .await?
861 .ok_or_else(|| {
862 anyhow!("signup with email address {} doesn't exist", email_address)
863 })?;
864
865 Ok(signup)
866 })
867 .await
868 }
869
870 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
871 self.transaction(|tx| async move {
872 let query = "
873 SELECT
874 COUNT(*) as count,
875 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
876 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
877 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
878 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
879 FROM (
880 SELECT *
881 FROM signups
882 WHERE
883 NOT email_confirmation_sent
884 ) AS unsent
885 ";
886 Ok(
887 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
888 self.pool.get_database_backend(),
889 query.into(),
890 vec![],
891 ))
892 .one(&*tx)
893 .await?
894 .ok_or_else(|| anyhow!("invalid result"))?,
895 )
896 })
897 .await
898 }
899
900 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
901 let emails = invites
902 .iter()
903 .map(|s| s.email_address.as_str())
904 .collect::<Vec<_>>();
905 self.transaction(|tx| async {
906 let tx = tx;
907 signup::Entity::update_many()
908 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
909 .set(signup::ActiveModel {
910 email_confirmation_sent: ActiveValue::set(true),
911 ..Default::default()
912 })
913 .exec(&*tx)
914 .await?;
915 Ok(())
916 })
917 .await
918 }
919
920 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
921 self.transaction(|tx| async move {
922 Ok(signup::Entity::find()
923 .select_only()
924 .column(signup::Column::EmailAddress)
925 .column(signup::Column::EmailConfirmationCode)
926 .filter(
927 signup::Column::EmailConfirmationSent.eq(false).and(
928 signup::Column::PlatformMac
929 .eq(true)
930 .or(signup::Column::PlatformUnknown.eq(true)),
931 ),
932 )
933 .order_by_asc(signup::Column::CreatedAt)
934 .limit(count as u64)
935 .into_model()
936 .all(&*tx)
937 .await?)
938 })
939 .await
940 }
941
942 // invite codes
943
944 pub async fn create_invite_from_code(
945 &self,
946 code: &str,
947 email_address: &str,
948 device_id: Option<&str>,
949 added_to_mailing_list: bool,
950 ) -> Result<Invite> {
951 self.transaction(|tx| async move {
952 let existing_user = user::Entity::find()
953 .filter(user::Column::EmailAddress.eq(email_address))
954 .one(&*tx)
955 .await?;
956
957 if existing_user.is_some() {
958 Err(anyhow!("email address is already in use"))?;
959 }
960
961 let inviting_user_with_invites = match user::Entity::find()
962 .filter(
963 user::Column::InviteCode
964 .eq(code)
965 .and(user::Column::InviteCount.gt(0)),
966 )
967 .one(&*tx)
968 .await?
969 {
970 Some(inviting_user) => inviting_user,
971 None => {
972 return Err(Error::Http(
973 StatusCode::UNAUTHORIZED,
974 "unable to find an invite code with invites remaining".to_string(),
975 ))?
976 }
977 };
978 user::Entity::update_many()
979 .filter(
980 user::Column::Id
981 .eq(inviting_user_with_invites.id)
982 .and(user::Column::InviteCount.gt(0)),
983 )
984 .col_expr(
985 user::Column::InviteCount,
986 Expr::col(user::Column::InviteCount).sub(1),
987 )
988 .exec(&*tx)
989 .await?;
990
991 let signup = signup::Entity::insert(signup::ActiveModel {
992 email_address: ActiveValue::set(email_address.into()),
993 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
994 email_confirmation_sent: ActiveValue::set(false),
995 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
996 platform_linux: ActiveValue::set(false),
997 platform_mac: ActiveValue::set(false),
998 platform_windows: ActiveValue::set(false),
999 platform_unknown: ActiveValue::set(true),
1000 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
1001 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
1002 ..Default::default()
1003 })
1004 .on_conflict(
1005 OnConflict::column(signup::Column::EmailAddress)
1006 .update_column(signup::Column::InvitingUserId)
1007 .to_owned(),
1008 )
1009 .exec_with_returning(&*tx)
1010 .await?;
1011
1012 Ok(Invite {
1013 email_address: signup.email_address,
1014 email_confirmation_code: signup.email_confirmation_code,
1015 })
1016 })
1017 .await
1018 }
1019
1020 pub async fn create_user_from_invite(
1021 &self,
1022 invite: &Invite,
1023 user: NewUserParams,
1024 ) -> Result<Option<NewUserResult>> {
1025 self.transaction(|tx| async {
1026 let tx = tx;
1027 let signup = signup::Entity::find()
1028 .filter(
1029 signup::Column::EmailAddress
1030 .eq(invite.email_address.as_str())
1031 .and(
1032 signup::Column::EmailConfirmationCode
1033 .eq(invite.email_confirmation_code.as_str()),
1034 ),
1035 )
1036 .one(&*tx)
1037 .await?
1038 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1039
1040 if signup.user_id.is_some() {
1041 return Ok(None);
1042 }
1043
1044 let user = user::Entity::insert(user::ActiveModel {
1045 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1046 github_login: ActiveValue::set(user.github_login.clone()),
1047 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1048 admin: ActiveValue::set(false),
1049 invite_count: ActiveValue::set(user.invite_count),
1050 invite_code: ActiveValue::set(Some(random_invite_code())),
1051 metrics_id: ActiveValue::set(Uuid::new_v4()),
1052 ..Default::default()
1053 })
1054 .on_conflict(
1055 OnConflict::column(user::Column::GithubLogin)
1056 .update_columns([
1057 user::Column::EmailAddress,
1058 user::Column::GithubUserId,
1059 user::Column::Admin,
1060 ])
1061 .to_owned(),
1062 )
1063 .exec_with_returning(&*tx)
1064 .await?;
1065
1066 let mut signup = signup.into_active_model();
1067 signup.user_id = ActiveValue::set(Some(user.id));
1068 let signup = signup.update(&*tx).await?;
1069
1070 if let Some(inviting_user_id) = signup.inviting_user_id {
1071 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1072 (inviting_user_id, user.id, true)
1073 } else {
1074 (user.id, inviting_user_id, false)
1075 };
1076
1077 contact::Entity::insert(contact::ActiveModel {
1078 user_id_a: ActiveValue::set(user_id_a),
1079 user_id_b: ActiveValue::set(user_id_b),
1080 a_to_b: ActiveValue::set(a_to_b),
1081 should_notify: ActiveValue::set(true),
1082 accepted: ActiveValue::set(true),
1083 ..Default::default()
1084 })
1085 .on_conflict(OnConflict::new().do_nothing().to_owned())
1086 .exec_without_returning(&*tx)
1087 .await?;
1088 }
1089
1090 Ok(Some(NewUserResult {
1091 user_id: user.id,
1092 metrics_id: user.metrics_id.to_string(),
1093 inviting_user_id: signup.inviting_user_id,
1094 signup_device_id: signup.device_id,
1095 }))
1096 })
1097 .await
1098 }
1099
1100 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1101 self.transaction(|tx| async move {
1102 if count > 0 {
1103 user::Entity::update_many()
1104 .filter(
1105 user::Column::Id
1106 .eq(id)
1107 .and(user::Column::InviteCode.is_null()),
1108 )
1109 .set(user::ActiveModel {
1110 invite_code: ActiveValue::set(Some(random_invite_code())),
1111 ..Default::default()
1112 })
1113 .exec(&*tx)
1114 .await?;
1115 }
1116
1117 user::Entity::update_many()
1118 .filter(user::Column::Id.eq(id))
1119 .set(user::ActiveModel {
1120 invite_count: ActiveValue::set(count),
1121 ..Default::default()
1122 })
1123 .exec(&*tx)
1124 .await?;
1125 Ok(())
1126 })
1127 .await
1128 }
1129
1130 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1131 self.transaction(|tx| async move {
1132 match user::Entity::find_by_id(id).one(&*tx).await? {
1133 Some(user) if user.invite_code.is_some() => {
1134 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1135 }
1136 _ => Ok(None),
1137 }
1138 })
1139 .await
1140 }
1141
1142 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1143 self.transaction(|tx| async move {
1144 user::Entity::find()
1145 .filter(user::Column::InviteCode.eq(code))
1146 .one(&*tx)
1147 .await?
1148 .ok_or_else(|| {
1149 Error::Http(
1150 StatusCode::NOT_FOUND,
1151 "that invite code does not exist".to_string(),
1152 )
1153 })
1154 })
1155 .await
1156 }
1157
1158 // rooms
1159
1160 pub async fn incoming_call_for_user(
1161 &self,
1162 user_id: UserId,
1163 ) -> Result<Option<proto::IncomingCall>> {
1164 self.transaction(|tx| async move {
1165 let pending_participant = room_participant::Entity::find()
1166 .filter(
1167 room_participant::Column::UserId
1168 .eq(user_id)
1169 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1170 )
1171 .one(&*tx)
1172 .await?;
1173
1174 if let Some(pending_participant) = pending_participant {
1175 let room = self.get_room(pending_participant.room_id, &tx).await?;
1176 Ok(Self::build_incoming_call(&room, user_id))
1177 } else {
1178 Ok(None)
1179 }
1180 })
1181 .await
1182 }
1183
1184 pub async fn create_room(
1185 &self,
1186 user_id: UserId,
1187 connection: ConnectionId,
1188 live_kit_room: &str,
1189 ) -> Result<proto::Room> {
1190 self.transaction(|tx| async move {
1191 let room = room::ActiveModel {
1192 live_kit_room: ActiveValue::set(live_kit_room.into()),
1193 ..Default::default()
1194 }
1195 .insert(&*tx)
1196 .await?;
1197 room_participant::ActiveModel {
1198 room_id: ActiveValue::set(room.id),
1199 user_id: ActiveValue::set(user_id),
1200 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1201 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1202 connection.owner_id as i32,
1203 ))),
1204 answering_connection_lost: ActiveValue::set(false),
1205 calling_user_id: ActiveValue::set(user_id),
1206 calling_connection_id: ActiveValue::set(connection.id as i32),
1207 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1208 connection.owner_id as i32,
1209 ))),
1210 ..Default::default()
1211 }
1212 .insert(&*tx)
1213 .await?;
1214
1215 let room = self.get_room(room.id, &tx).await?;
1216 Ok(room)
1217 })
1218 .await
1219 }
1220
1221 pub async fn call(
1222 &self,
1223 room_id: RoomId,
1224 calling_user_id: UserId,
1225 calling_connection: ConnectionId,
1226 called_user_id: UserId,
1227 initial_project_id: Option<ProjectId>,
1228 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1229 self.room_transaction(room_id, |tx| async move {
1230 room_participant::ActiveModel {
1231 room_id: ActiveValue::set(room_id),
1232 user_id: ActiveValue::set(called_user_id),
1233 answering_connection_lost: ActiveValue::set(false),
1234 calling_user_id: ActiveValue::set(calling_user_id),
1235 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1236 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1237 calling_connection.owner_id as i32,
1238 ))),
1239 initial_project_id: ActiveValue::set(initial_project_id),
1240 ..Default::default()
1241 }
1242 .insert(&*tx)
1243 .await?;
1244
1245 let room = self.get_room(room_id, &tx).await?;
1246 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1247 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1248 Ok((room, incoming_call))
1249 })
1250 .await
1251 }
1252
1253 pub async fn call_failed(
1254 &self,
1255 room_id: RoomId,
1256 called_user_id: UserId,
1257 ) -> Result<RoomGuard<proto::Room>> {
1258 self.room_transaction(room_id, |tx| async move {
1259 room_participant::Entity::delete_many()
1260 .filter(
1261 room_participant::Column::RoomId
1262 .eq(room_id)
1263 .and(room_participant::Column::UserId.eq(called_user_id)),
1264 )
1265 .exec(&*tx)
1266 .await?;
1267 let room = self.get_room(room_id, &tx).await?;
1268 Ok(room)
1269 })
1270 .await
1271 }
1272
1273 pub async fn decline_call(
1274 &self,
1275 expected_room_id: Option<RoomId>,
1276 user_id: UserId,
1277 ) -> Result<Option<RoomGuard<proto::Room>>> {
1278 self.optional_room_transaction(|tx| async move {
1279 let mut filter = Condition::all()
1280 .add(room_participant::Column::UserId.eq(user_id))
1281 .add(room_participant::Column::AnsweringConnectionId.is_null());
1282 if let Some(room_id) = expected_room_id {
1283 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1284 }
1285 let participant = room_participant::Entity::find()
1286 .filter(filter)
1287 .one(&*tx)
1288 .await?;
1289
1290 let participant = if let Some(participant) = participant {
1291 participant
1292 } else if expected_room_id.is_some() {
1293 return Err(anyhow!("could not find call to decline"))?;
1294 } else {
1295 return Ok(None);
1296 };
1297
1298 let room_id = participant.room_id;
1299 room_participant::Entity::delete(participant.into_active_model())
1300 .exec(&*tx)
1301 .await?;
1302
1303 let room = self.get_room(room_id, &tx).await?;
1304 Ok(Some((room_id, room)))
1305 })
1306 .await
1307 }
1308
1309 pub async fn cancel_call(
1310 &self,
1311 room_id: RoomId,
1312 calling_connection: ConnectionId,
1313 called_user_id: UserId,
1314 ) -> Result<RoomGuard<proto::Room>> {
1315 self.room_transaction(room_id, |tx| async move {
1316 let participant = room_participant::Entity::find()
1317 .filter(
1318 Condition::all()
1319 .add(room_participant::Column::UserId.eq(called_user_id))
1320 .add(room_participant::Column::RoomId.eq(room_id))
1321 .add(
1322 room_participant::Column::CallingConnectionId
1323 .eq(calling_connection.id as i32),
1324 )
1325 .add(
1326 room_participant::Column::CallingConnectionServerId
1327 .eq(calling_connection.owner_id as i32),
1328 )
1329 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1330 )
1331 .one(&*tx)
1332 .await?
1333 .ok_or_else(|| anyhow!("no call to cancel"))?;
1334
1335 room_participant::Entity::delete(participant.into_active_model())
1336 .exec(&*tx)
1337 .await?;
1338
1339 let room = self.get_room(room_id, &tx).await?;
1340 Ok(room)
1341 })
1342 .await
1343 }
1344
1345 pub async fn is_current_room_different_channel(
1346 &self,
1347 user_id: UserId,
1348 channel_id: ChannelId,
1349 ) -> Result<bool> {
1350 self.transaction(|tx| async move {
1351 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1352 enum QueryAs {
1353 ChannelId,
1354 }
1355
1356 let channel_id_model: Option<ChannelId> = room_participant::Entity::find()
1357 .select_only()
1358 .column_as(room::Column::ChannelId, QueryAs::ChannelId)
1359 .inner_join(room::Entity)
1360 .filter(room_participant::Column::UserId.eq(user_id))
1361 .into_values::<_, QueryAs>()
1362 .one(&*tx)
1363 .await?;
1364
1365 let result = channel_id_model
1366 .map(|channel_id_model| channel_id_model != channel_id)
1367 .unwrap_or(false);
1368
1369 Ok(result)
1370 })
1371 .await
1372 }
1373
1374 pub async fn join_room(
1375 &self,
1376 room_id: RoomId,
1377 user_id: UserId,
1378 channel_id: Option<ChannelId>,
1379 connection: ConnectionId,
1380 ) -> Result<RoomGuard<JoinRoom>> {
1381 self.room_transaction(room_id, |tx| async move {
1382 if let Some(channel_id) = channel_id {
1383 channel_member::Entity::find()
1384 .filter(
1385 channel_member::Column::ChannelId
1386 .eq(channel_id)
1387 .and(channel_member::Column::UserId.eq(user_id))
1388 .and(channel_member::Column::Accepted.eq(true)),
1389 )
1390 .one(&*tx)
1391 .await?
1392 .ok_or_else(|| anyhow!("no such channel membership"))?;
1393
1394 room_participant::ActiveModel {
1395 room_id: ActiveValue::set(room_id),
1396 user_id: ActiveValue::set(user_id),
1397 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1398 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1399 connection.owner_id as i32,
1400 ))),
1401 answering_connection_lost: ActiveValue::set(false),
1402 // Redundant for the channel join use case, used for channel and call invitations
1403 calling_user_id: ActiveValue::set(user_id),
1404 calling_connection_id: ActiveValue::set(connection.id as i32),
1405 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1406 connection.owner_id as i32,
1407 ))),
1408 ..Default::default()
1409 }
1410 .insert(&*tx)
1411 .await?;
1412 } else {
1413 let result = room_participant::Entity::update_many()
1414 .filter(
1415 Condition::all()
1416 .add(room_participant::Column::RoomId.eq(room_id))
1417 .add(room_participant::Column::UserId.eq(user_id))
1418 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1419 )
1420 .set(room_participant::ActiveModel {
1421 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1422 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1423 connection.owner_id as i32,
1424 ))),
1425 answering_connection_lost: ActiveValue::set(false),
1426 ..Default::default()
1427 })
1428 .exec(&*tx)
1429 .await?;
1430 if result.rows_affected == 0 {
1431 Err(anyhow!("room does not exist or was already joined"))?;
1432 }
1433 }
1434
1435 let room = self.get_room(room_id, &tx).await?;
1436 let channel_members = if let Some(channel_id) = channel_id {
1437 self.get_channel_members_internal(channel_id, &tx).await?
1438 } else {
1439 Vec::new()
1440 };
1441 Ok(JoinRoom {
1442 room,
1443 channel_id,
1444 channel_members,
1445 })
1446 })
1447 .await
1448 }
1449
    /// Re-attaches `user_id` to the room named in `rejoin_room` over a new `connection`.
    ///
    /// Within one room transaction this:
    /// 1. points the user's participant row at `connection` (only if the row was
    ///    already answered and either its connection was marked lost or it lives
    ///    on a different server id than `connection.owner_id`),
    /// 2. re-hosts every project listed in `rejoin_room.reshared_projects`,
    /// 3. deletes the user's other hosted projects in this room,
    /// 4. collects the state for every project in `rejoin_room.rejoined_projects`
    ///    and moves the user's collaborator row to `connection`.
    ///
    /// # Errors
    /// * "room does not exist or was already joined" when no participant row was updated.
    /// * "project does not exist" / "no such project" when a reshared project is
    ///   missing or not hosted by `user_id`.
    /// * "host not found among collaborators" when the reshared project has no
    ///   host collaborator row for `user_id`.
    pub async fn rejoin_room(
        &self,
        rejoin_room: proto::RejoinRoom,
        user_id: UserId,
        connection: ConnectionId,
    ) -> Result<RoomGuard<RejoinedRoom>> {
        let room_id = RoomId::from_proto(rejoin_room.id);
        self.room_transaction(room_id, |tx| async {
            // The async block is not `move`; rebinding moves `tx` into it.
            let tx = tx;
            let participant_update = room_participant::Entity::update_many()
                .filter(
                    Condition::all()
                        .add(room_participant::Column::RoomId.eq(room_id))
                        .add(room_participant::Column::UserId.eq(user_id))
                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                        .add(
                            Condition::any()
                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                                .add(
                                    room_participant::Column::AnsweringConnectionServerId
                                        .ne(connection.owner_id as i32),
                                ),
                        ),
                )
                .set(room_participant::ActiveModel {
                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    answering_connection_lost: ActiveValue::set(false),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
            if participant_update.rows_affected == 0 {
                return Err(anyhow!("room does not exist or was already joined"))?;
            }

            // Projects the user hosts and shares again from the new connection.
            let mut reshared_projects = Vec::new();
            for reshared_project in &rejoin_room.reshared_projects {
                let project_id = ProjectId::from_proto(reshared_project.project_id);
                let project = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await?
                    .ok_or_else(|| anyhow!("project does not exist"))?;
                if project.host_user_id != user_id {
                    return Err(anyhow!("no such project"))?;
                }

                // Split the host's own collaborator row from the remaining collaborators.
                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                let host_ix = collaborators
                    .iter()
                    .position(|collaborator| {
                        collaborator.user_id == user_id && collaborator.is_host
                    })
                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
                let host = collaborators.swap_remove(host_ix);
                let old_connection_id = host.connection();

                // Point both the project and the host collaborator row at the new connection.
                project::Entity::update(project::ActiveModel {
                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    host_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    ..project.into_active_model()
                })
                .exec(&*tx)
                .await?;
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..host.into_active_model()
                })
                .exec(&*tx)
                .await?;

                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                    .await?;

                reshared_projects.push(ResharedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators: collaborators
                        .iter()
                        .map(|collaborator| ProjectCollaborator {
                            connection_id: collaborator.connection(),
                            user_id: collaborator.user_id,
                            replica_id: collaborator.replica_id,
                            is_host: collaborator.is_host,
                        })
                        .collect(),
                    worktrees: reshared_project.worktrees.clone(),
                });
            }

            // Delete the projects this user hosts in the room that were not reshared above.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostUserId.eq(user_id))
                        .add(
                            project::Column::Id
                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Projects the user had joined as a guest; ones that no longer exist are skipped.
            let mut rejoined_projects = Vec::new();
            for rejoined_project in &rejoin_room.rejoined_projects {
                let project_id = ProjectId::from_proto(rejoined_project.id);
                let Some(project) = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await? else { continue };

                let mut worktrees = Vec::new();
                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
                for db_worktree in db_worktrees {
                    let mut worktree = RejoinedWorktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        updated_entries: Default::default(),
                        removed_entries: Default::default(),
                        updated_repositories: Default::default(),
                        removed_repositories: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    };

                    // The client's own record of this worktree, if it reported one.
                    let rejoined_worktree = rejoined_project
                        .worktrees
                        .iter()
                        .find(|worktree| worktree.id == db_worktree.id as u64);

                    // File entries
                    {
                        // If the client reported this worktree, fetch only rows with a scan id
                        // greater than the one it reported (deleted rows included); otherwise
                        // fetch every non-deleted row.
                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_entry::Column::IsDeleted.eq(false)
                        };

                        let mut db_entries = worktree_entry::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                    .add(entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_entry) = db_entries.next().await {
                            let db_entry = db_entry?;
                            if db_entry.is_deleted {
                                worktree.removed_entries.push(db_entry.id as u64);
                            } else {
                                worktree.updated_entries.push(proto::Entry {
                                    id: db_entry.id as u64,
                                    is_dir: db_entry.is_dir,
                                    path: db_entry.path,
                                    inode: db_entry.inode as u64,
                                    mtime: Some(proto::Timestamp {
                                        seconds: db_entry.mtime_seconds as u64,
                                        nanos: db_entry.mtime_nanos as u32,
                                    }),
                                    is_symlink: db_entry.is_symlink,
                                    is_ignored: db_entry.is_ignored,
                                    is_external: db_entry.is_external,
                                    git_status: db_entry.git_status.map(|status| status as i32),
                                });
                            }
                        }
                    }

                    // Repository Entries
                    {
                        // Same selection rule as for file entries above.
                        let repository_entry_filter =
                            if let Some(rejoined_worktree) = rejoined_worktree {
                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                            } else {
                                worktree_repository::Column::IsDeleted.eq(false)
                            };

                        let mut db_repositories = worktree_repository::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                    .add(repository_entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_repository) = db_repositories.next().await {
                            let db_repository = db_repository?;
                            if db_repository.is_deleted {
                                worktree
                                    .removed_repositories
                                    .push(db_repository.work_directory_id as u64);
                            } else {
                                worktree.updated_repositories.push(proto::RepositoryEntry {
                                    work_directory_id: db_repository.work_directory_id as u64,
                                    branch: db_repository.branch,
                                });
                            }
                        }
                    }

                    worktrees.push(worktree);
                }

                let language_servers = project
                    .find_related(language_server::Entity)
                    .all(&*tx)
                    .await?
                    .into_iter()
                    .map(|language_server| proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                    })
                    .collect::<Vec<_>>();

                // Attach each settings file of the project to the worktree it belongs to;
                // files whose worktree is not in `worktrees` are ignored.
                {
                    let mut db_settings_files = worktree_settings_file::Entity::find()
                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                        .stream(&*tx)
                        .await?;
                    while let Some(db_settings_file) = db_settings_files.next().await {
                        let db_settings_file = db_settings_file?;
                        if let Some(worktree) = worktrees
                            .iter_mut()
                            .find(|w| w.id == db_settings_file.worktree_id as u64)
                        {
                            worktree.settings_files.push(WorktreeSettingsFile {
                                path: db_settings_file.path,
                                content: db_settings_file.content,
                            });
                        }
                    }
                }

                // Skip the project entirely if the user is not one of its collaborators.
                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                    .iter()
                    .position(|collaborator| collaborator.user_id == user_id)
                {
                    collaborators.swap_remove(self_collaborator_ix)
                } else {
                    continue;
                };
                let old_connection_id = self_collaborator.connection();
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..self_collaborator.into_active_model()
                })
                .exec(&*tx)
                .await?;

                let collaborators = collaborators
                    .into_iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect::<Vec<_>>();

                rejoined_projects.push(RejoinedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators,
                    worktrees,
                    language_servers,
                });
            }

            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;

            let channel_members = if let Some(channel_id) = channel_id {
                self.get_channel_members_internal(channel_id, &tx).await?
            } else {
                Vec::new()
            };

            Ok(RejoinedRoom {
                room,
                channel_id,
                channel_members,
                rejoined_projects,
                reshared_projects,
            })
        })
        .await
    }
1757
    /// Removes the participant answering on `connection` from its room.
    ///
    /// Returns `Ok(None)` when `connection` is not the answering connection of
    /// any participant. Otherwise the participant row is deleted, unanswered
    /// calls placed by that user are cancelled, the connection's collaborator
    /// rows and the projects it hosts in the room are deleted, and the
    /// resulting `LeftRoom` is returned together with the room id. A room left
    /// without participants is deleted from the database unless it has a
    /// channel id.
    pub async fn leave_room(
        &self,
        connection: ConnectionId,
    ) -> Result<Option<RoomGuard<LeftRoom>>> {
        self.optional_room_transaction(|tx| async move {
            let leaving_participant = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::AnsweringConnectionId
                                .eq(connection.id as i32),
                        )
                        .add(
                            room_participant::Column::AnsweringConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?;

            if let Some(leaving_participant) = leaving_participant {
                // Leave room.
                let room_id = leaving_participant.room_id;
                room_participant::Entity::delete_by_id(leaving_participant.id)
                    .exec(&*tx)
                    .await?;

                // Cancel pending calls initiated by the leaving user.
                let called_participants = room_participant::Entity::find()
                    .filter(
                        Condition::all()
                            .add(
                                room_participant::Column::CallingUserId
                                    .eq(leaving_participant.user_id),
                            )
                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
                    )
                    .all(&*tx)
                    .await?;
                room_participant::Entity::delete_many()
                    .filter(
                        room_participant::Column::Id
                            .is_in(called_participants.iter().map(|participant| participant.id)),
                    )
                    .exec(&*tx)
                    .await?;
                let canceled_calls_to_user_ids = called_participants
                    .into_iter()
                    .map(|participant| participant.user_id)
                    .collect();

                // Detect left projects.
                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
                enum QueryProjectIds {
                    ProjectId,
                }
                // Ids of every project this connection has a collaborator row in.
                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                    .select_only()
                    .column_as(
                        project_collaborator::Column::ProjectId,
                        QueryProjectIds::ProjectId,
                    )
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .into_values::<_, QueryProjectIds>()
                    .all(&*tx)
                    .await?;
                // Group all collaborators of those projects by project, recording the
                // other connections and the host of each.
                let mut left_projects = HashMap::default();
                let mut collaborators = project_collaborator::Entity::find()
                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                    .stream(&*tx)
                    .await?;
                while let Some(collaborator) = collaborators.next().await {
                    let collaborator = collaborator?;
                    let left_project =
                        left_projects
                            .entry(collaborator.project_id)
                            .or_insert(LeftProject {
                                id: collaborator.project_id,
                                host_user_id: Default::default(),
                                connection_ids: Default::default(),
                                host_connection_id: Default::default(),
                            });

                    // The leaving connection itself is excluded from `connection_ids`.
                    let collaborator_connection_id = collaborator.connection();
                    if collaborator_connection_id != connection {
                        left_project.connection_ids.push(collaborator_connection_id);
                    }

                    if collaborator.is_host {
                        left_project.host_user_id = collaborator.user_id;
                        left_project.host_connection_id = collaborator_connection_id;
                    }
                }
                drop(collaborators);

                // Leave projects.
                project_collaborator::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // Unshare projects.
                project::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(project::Column::RoomId.eq(room_id))
                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
                            .add(
                                project::Column::HostConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // Reload the room after the deletions above; if nobody is left, delete
                // the room row, but only when it has no channel id.
                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
                let deleted = if room.participants.is_empty() {
                    let result = room::Entity::delete_by_id(room_id)
                        .filter(room::Column::ChannelId.is_null())
                        .exec(&*tx)
                        .await?;
                    result.rows_affected > 0
                } else {
                    false
                };

                let channel_members = if let Some(channel_id) = channel_id {
                    self.get_channel_members_internal(channel_id, &tx).await?
                } else {
                    Vec::new()
                };
                let left_room = LeftRoom {
                    room,
                    channel_id,
                    channel_members,
                    left_projects,
                    canceled_calls_to_user_ids,
                    deleted,
                };

                // Drop this room's entry from the in-memory `rooms` map once it is empty.
                if left_room.room.participants.is_empty() {
                    self.rooms.remove(&room_id);
                }

                Ok(Some((room_id, left_room)))
            } else {
                Ok(None)
            }
        })
        .await
    }
1927
1928 pub async fn follow(
1929 &self,
1930 project_id: ProjectId,
1931 leader_connection: ConnectionId,
1932 follower_connection: ConnectionId,
1933 ) -> Result<RoomGuard<proto::Room>> {
1934 let room_id = self.room_id_for_project(project_id).await?;
1935 self.room_transaction(room_id, |tx| async move {
1936 follower::ActiveModel {
1937 room_id: ActiveValue::set(room_id),
1938 project_id: ActiveValue::set(project_id),
1939 leader_connection_server_id: ActiveValue::set(ServerId(
1940 leader_connection.owner_id as i32,
1941 )),
1942 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1943 follower_connection_server_id: ActiveValue::set(ServerId(
1944 follower_connection.owner_id as i32,
1945 )),
1946 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1947 ..Default::default()
1948 }
1949 .insert(&*tx)
1950 .await?;
1951
1952 let room = self.get_room(room_id, &*tx).await?;
1953 Ok(room)
1954 })
1955 .await
1956 }
1957
1958 pub async fn unfollow(
1959 &self,
1960 project_id: ProjectId,
1961 leader_connection: ConnectionId,
1962 follower_connection: ConnectionId,
1963 ) -> Result<RoomGuard<proto::Room>> {
1964 let room_id = self.room_id_for_project(project_id).await?;
1965 self.room_transaction(room_id, |tx| async move {
1966 follower::Entity::delete_many()
1967 .filter(
1968 Condition::all()
1969 .add(follower::Column::ProjectId.eq(project_id))
1970 .add(
1971 follower::Column::LeaderConnectionServerId
1972 .eq(leader_connection.owner_id),
1973 )
1974 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1975 .add(
1976 follower::Column::FollowerConnectionServerId
1977 .eq(follower_connection.owner_id),
1978 )
1979 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1980 )
1981 .exec(&*tx)
1982 .await?;
1983
1984 let room = self.get_room(room_id, &*tx).await?;
1985 Ok(room)
1986 })
1987 .await
1988 }
1989
1990 pub async fn update_room_participant_location(
1991 &self,
1992 room_id: RoomId,
1993 connection: ConnectionId,
1994 location: proto::ParticipantLocation,
1995 ) -> Result<RoomGuard<proto::Room>> {
1996 self.room_transaction(room_id, |tx| async {
1997 let tx = tx;
1998 let location_kind;
1999 let location_project_id;
2000 match location
2001 .variant
2002 .as_ref()
2003 .ok_or_else(|| anyhow!("invalid location"))?
2004 {
2005 proto::participant_location::Variant::SharedProject(project) => {
2006 location_kind = 0;
2007 location_project_id = Some(ProjectId::from_proto(project.id));
2008 }
2009 proto::participant_location::Variant::UnsharedProject(_) => {
2010 location_kind = 1;
2011 location_project_id = None;
2012 }
2013 proto::participant_location::Variant::External(_) => {
2014 location_kind = 2;
2015 location_project_id = None;
2016 }
2017 }
2018
2019 let result = room_participant::Entity::update_many()
2020 .filter(
2021 Condition::all()
2022 .add(room_participant::Column::RoomId.eq(room_id))
2023 .add(
2024 room_participant::Column::AnsweringConnectionId
2025 .eq(connection.id as i32),
2026 )
2027 .add(
2028 room_participant::Column::AnsweringConnectionServerId
2029 .eq(connection.owner_id as i32),
2030 ),
2031 )
2032 .set(room_participant::ActiveModel {
2033 location_kind: ActiveValue::set(Some(location_kind)),
2034 location_project_id: ActiveValue::set(location_project_id),
2035 ..Default::default()
2036 })
2037 .exec(&*tx)
2038 .await?;
2039
2040 if result.rows_affected == 1 {
2041 let room = self.get_room(room_id, &tx).await?;
2042 Ok(room)
2043 } else {
2044 Err(anyhow!("could not update room participant location"))?
2045 }
2046 })
2047 .await
2048 }
2049
2050 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2051 self.transaction(|tx| async move {
2052 let participant = room_participant::Entity::find()
2053 .filter(
2054 Condition::all()
2055 .add(
2056 room_participant::Column::AnsweringConnectionId
2057 .eq(connection.id as i32),
2058 )
2059 .add(
2060 room_participant::Column::AnsweringConnectionServerId
2061 .eq(connection.owner_id as i32),
2062 ),
2063 )
2064 .one(&*tx)
2065 .await?
2066 .ok_or_else(|| anyhow!("not a participant in any room"))?;
2067
2068 room_participant::Entity::update(room_participant::ActiveModel {
2069 answering_connection_lost: ActiveValue::set(true),
2070 ..participant.into_active_model()
2071 })
2072 .exec(&*tx)
2073 .await?;
2074
2075 Ok(())
2076 })
2077 .await
2078 }
2079
2080 fn build_incoming_call(
2081 room: &proto::Room,
2082 called_user_id: UserId,
2083 ) -> Option<proto::IncomingCall> {
2084 let pending_participant = room
2085 .pending_participants
2086 .iter()
2087 .find(|participant| participant.user_id == called_user_id.to_proto())?;
2088
2089 Some(proto::IncomingCall {
2090 room_id: room.id,
2091 calling_user_id: pending_participant.calling_user_id,
2092 participant_user_ids: room
2093 .participants
2094 .iter()
2095 .map(|participant| participant.user_id)
2096 .collect(),
2097 initial_project: room.participants.iter().find_map(|participant| {
2098 let initial_project_id = pending_participant.initial_project_id?;
2099 participant
2100 .projects
2101 .iter()
2102 .find(|project| project.id == initial_project_id)
2103 .cloned()
2104 }),
2105 })
2106 }
2107 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2108 let (_, room) = self.get_channel_room(room_id, tx).await?;
2109 Ok(room)
2110 }
2111
    /// Loads `room_id` and assembles its `proto::Room` representation.
    ///
    /// Returns the room's channel id (if any) alongside the room. The room
    /// contains its joined participants (with their location and the projects
    /// they host), its pending participants and its followers.
    ///
    /// Fails with "could not find room" when the room row does not exist, and
    /// propagates any error from `host_connection()` on a project row.
    async fn get_channel_room(
        &self,
        room_id: RoomId,
        tx: &DatabaseTransaction,
    ) -> Result<(Option<ChannelId>, proto::Room)> {
        let db_room = room::Entity::find_by_id(room_id)
            .one(tx)
            .await?
            .ok_or_else(|| anyhow!("could not find room"))?;

        let mut db_participants = db_room
            .find_related(room_participant::Entity)
            .stream(tx)
            .await?;
        // Joined participants are keyed by their answering connection so that
        // projects can be attached to their host below.
        let mut participants = HashMap::default();
        let mut pending_participants = Vec::new();
        while let Some(db_participant) = db_participants.next().await {
            let db_participant = db_participant?;
            // A participant counts as joined only when both the answering connection
            // id and its server id are present; otherwise it is still pending.
            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
                .answering_connection_id
                .zip(db_participant.answering_connection_server_id)
            {
                // Kind 0 with a project id is a shared project, kind 1 an unshared
                // project; every other combination is reported as external.
                let location = match (
                    db_participant.location_kind,
                    db_participant.location_project_id,
                ) {
                    (Some(0), Some(project_id)) => {
                        Some(proto::participant_location::Variant::SharedProject(
                            proto::participant_location::SharedProject {
                                id: project_id.to_proto(),
                            },
                        ))
                    }
                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
                        Default::default(),
                    )),
                    _ => Some(proto::participant_location::Variant::External(
                        Default::default(),
                    )),
                };

                let answering_connection = ConnectionId {
                    owner_id: answering_connection_server_id.0 as u32,
                    id: answering_connection_id as u32,
                };
                participants.insert(
                    answering_connection,
                    proto::Participant {
                        user_id: db_participant.user_id.to_proto(),
                        peer_id: Some(answering_connection.into()),
                        projects: Default::default(),
                        location: Some(proto::ParticipantLocation { variant: location }),
                    },
                );
            } else {
                pending_participants.push(proto::PendingParticipant {
                    user_id: db_participant.user_id.to_proto(),
                    calling_user_id: db_participant.calling_user_id.to_proto(),
                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
                });
            }
        }
        drop(db_participants);

        // Each streamed row pairs a project with at most one of its worktrees.
        let mut db_projects = db_room
            .find_related(project::Entity)
            .find_with_related(worktree::Entity)
            .stream(tx)
            .await?;

        while let Some(row) = db_projects.next().await {
            let (db_project, db_worktree) = row?;
            let host_connection = db_project.host_connection()?;
            // Projects whose host connection is not a joined participant are left out.
            if let Some(participant) = participants.get_mut(&host_connection) {
                // Reuse the project entry if an earlier row already created it.
                let project = if let Some(project) = participant
                    .projects
                    .iter_mut()
                    .find(|project| project.id == db_project.id.to_proto())
                {
                    project
                } else {
                    participant.projects.push(proto::ParticipantProject {
                        id: db_project.id.to_proto(),
                        worktree_root_names: Default::default(),
                    });
                    participant.projects.last_mut().unwrap()
                };

                // Only visible worktrees contribute a root name.
                if let Some(db_worktree) = db_worktree {
                    if db_worktree.visible {
                        project.worktree_root_names.push(db_worktree.root_name);
                    }
                }
            }
        }
        drop(db_projects);

        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
        let mut followers = Vec::new();
        while let Some(db_follower) = db_followers.next().await {
            let db_follower = db_follower?;
            followers.push(proto::Follower {
                leader_id: Some(db_follower.leader_connection().into()),
                follower_id: Some(db_follower.follower_connection().into()),
                project_id: db_follower.project_id.to_proto(),
            });
        }

        Ok((
            db_room.channel_id,
            proto::Room {
                id: db_room.id.to_proto(),
                live_kit_room: db_room.live_kit_room,
                participants: participants.into_values().collect(),
                pending_participants,
                followers,
            },
        ))
    }
2231
2232 // projects
2233
2234 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2235 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2236 enum QueryAs {
2237 Count,
2238 }
2239
2240 self.transaction(|tx| async move {
2241 Ok(project::Entity::find()
2242 .select_only()
2243 .column_as(project::Column::Id.count(), QueryAs::Count)
2244 .inner_join(user::Entity)
2245 .filter(user::Column::Admin.eq(false))
2246 .into_values::<_, QueryAs>()
2247 .one(&*tx)
2248 .await?
2249 .unwrap_or(0i64) as usize)
2250 })
2251 .await
2252 }
2253
2254 pub async fn share_project(
2255 &self,
2256 room_id: RoomId,
2257 connection: ConnectionId,
2258 worktrees: &[proto::WorktreeMetadata],
2259 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2260 self.room_transaction(room_id, |tx| async move {
2261 let participant = room_participant::Entity::find()
2262 .filter(
2263 Condition::all()
2264 .add(
2265 room_participant::Column::AnsweringConnectionId
2266 .eq(connection.id as i32),
2267 )
2268 .add(
2269 room_participant::Column::AnsweringConnectionServerId
2270 .eq(connection.owner_id as i32),
2271 ),
2272 )
2273 .one(&*tx)
2274 .await?
2275 .ok_or_else(|| anyhow!("could not find participant"))?;
2276 if participant.room_id != room_id {
2277 return Err(anyhow!("shared project on unexpected room"))?;
2278 }
2279
2280 let project = project::ActiveModel {
2281 room_id: ActiveValue::set(participant.room_id),
2282 host_user_id: ActiveValue::set(participant.user_id),
2283 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2284 host_connection_server_id: ActiveValue::set(Some(ServerId(
2285 connection.owner_id as i32,
2286 ))),
2287 ..Default::default()
2288 }
2289 .insert(&*tx)
2290 .await?;
2291
2292 if !worktrees.is_empty() {
2293 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2294 worktree::ActiveModel {
2295 id: ActiveValue::set(worktree.id as i64),
2296 project_id: ActiveValue::set(project.id),
2297 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2298 root_name: ActiveValue::set(worktree.root_name.clone()),
2299 visible: ActiveValue::set(worktree.visible),
2300 scan_id: ActiveValue::set(0),
2301 completed_scan_id: ActiveValue::set(0),
2302 }
2303 }))
2304 .exec(&*tx)
2305 .await?;
2306 }
2307
2308 project_collaborator::ActiveModel {
2309 project_id: ActiveValue::set(project.id),
2310 connection_id: ActiveValue::set(connection.id as i32),
2311 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2312 user_id: ActiveValue::set(participant.user_id),
2313 replica_id: ActiveValue::set(ReplicaId(0)),
2314 is_host: ActiveValue::set(true),
2315 ..Default::default()
2316 }
2317 .insert(&*tx)
2318 .await?;
2319
2320 let room = self.get_room(room_id, &tx).await?;
2321 Ok((project.id, room))
2322 })
2323 .await
2324 }
2325
2326 pub async fn unshare_project(
2327 &self,
2328 project_id: ProjectId,
2329 connection: ConnectionId,
2330 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2331 let room_id = self.room_id_for_project(project_id).await?;
2332 self.room_transaction(room_id, |tx| async move {
2333 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2334
2335 let project = project::Entity::find_by_id(project_id)
2336 .one(&*tx)
2337 .await?
2338 .ok_or_else(|| anyhow!("project not found"))?;
2339 if project.host_connection()? == connection {
2340 project::Entity::delete(project.into_active_model())
2341 .exec(&*tx)
2342 .await?;
2343 let room = self.get_room(room_id, &tx).await?;
2344 Ok((room, guest_connection_ids))
2345 } else {
2346 Err(anyhow!("cannot unshare a project hosted by another user"))?
2347 }
2348 })
2349 .await
2350 }
2351
2352 pub async fn update_project(
2353 &self,
2354 project_id: ProjectId,
2355 connection: ConnectionId,
2356 worktrees: &[proto::WorktreeMetadata],
2357 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2358 let room_id = self.room_id_for_project(project_id).await?;
2359 self.room_transaction(room_id, |tx| async move {
2360 let project = project::Entity::find_by_id(project_id)
2361 .filter(
2362 Condition::all()
2363 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2364 .add(
2365 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2366 ),
2367 )
2368 .one(&*tx)
2369 .await?
2370 .ok_or_else(|| anyhow!("no such project"))?;
2371
2372 self.update_project_worktrees(project.id, worktrees, &tx)
2373 .await?;
2374
2375 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2376 let room = self.get_room(project.room_id, &tx).await?;
2377 Ok((room, guest_connection_ids))
2378 })
2379 .await
2380 }
2381
2382 async fn update_project_worktrees(
2383 &self,
2384 project_id: ProjectId,
2385 worktrees: &[proto::WorktreeMetadata],
2386 tx: &DatabaseTransaction,
2387 ) -> Result<()> {
2388 if !worktrees.is_empty() {
2389 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2390 id: ActiveValue::set(worktree.id as i64),
2391 project_id: ActiveValue::set(project_id),
2392 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2393 root_name: ActiveValue::set(worktree.root_name.clone()),
2394 visible: ActiveValue::set(worktree.visible),
2395 scan_id: ActiveValue::set(0),
2396 completed_scan_id: ActiveValue::set(0),
2397 }))
2398 .on_conflict(
2399 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2400 .update_column(worktree::Column::RootName)
2401 .to_owned(),
2402 )
2403 .exec(&*tx)
2404 .await?;
2405 }
2406
2407 worktree::Entity::delete_many()
2408 .filter(worktree::Column::ProjectId.eq(project_id).and(
2409 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2410 ))
2411 .exec(&*tx)
2412 .await?;
2413
2414 Ok(())
2415 }
2416
    /// Applies a worktree update sent by the project's host.
    ///
    /// Inside the room transaction for the project's room this:
    /// 1. verifies that `connection` is the project's host connection,
    /// 2. updates the worktree's metadata row,
    /// 3. upserts `update.updated_entries` and soft-deletes `update.removed_entries`,
    /// 4. upserts `update.updated_repositories` and soft-deletes
    ///    `update.removed_repositories`.
    ///
    /// Returns the connection ids of the project's guests. Fails with
    /// "no such project" when no project with this id is hosted by `connection`.
    pub async fn update_worktree(
        &self,
        update: &proto::UpdateWorktree,
        connection: ConnectionId,
    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
        let project_id = ProjectId::from_proto(update.project_id);
        let worktree_id = update.worktree_id as i64;
        let room_id = self.room_id_for_project(project_id).await?;
        self.room_transaction(room_id, |tx| async move {
            // Ensure the update comes from the host.
            let _project = project::Entity::find_by_id(project_id)
                .filter(
                    Condition::all()
                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
                        .add(
                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("no such project"))?;

            // Update metadata.
            worktree::Entity::update(worktree::ActiveModel {
                id: ActiveValue::set(worktree_id),
                project_id: ActiveValue::set(project_id),
                root_name: ActiveValue::set(update.root_name.clone()),
                scan_id: ActiveValue::set(update.scan_id as i64),
                // `completed_scan_id` is only set when this is the last update of a scan;
                // otherwise it is left at `ActiveValue::default()`.
                completed_scan_id: if update.is_last_update {
                    ActiveValue::set(update.scan_id as i64)
                } else {
                    ActiveValue::default()
                },
                abs_path: ActiveValue::set(update.abs_path.clone()),
                ..Default::default()
            })
            .exec(&*tx)
            .await?;

            // Upsert the entries that were added or changed. New rows are inserted with
            // `is_deleted = false`; on a key conflict only the columns listed in
            // `update_columns` below are overwritten.
            if !update.updated_entries.is_empty() {
                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
                    // A missing mtime falls back to the default timestamp.
                    let mtime = entry.mtime.clone().unwrap_or_default();
                    worktree_entry::ActiveModel {
                        project_id: ActiveValue::set(project_id),
                        worktree_id: ActiveValue::set(worktree_id),
                        id: ActiveValue::set(entry.id as i64),
                        is_dir: ActiveValue::set(entry.is_dir),
                        path: ActiveValue::set(entry.path.clone()),
                        inode: ActiveValue::set(entry.inode as i64),
                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
                        is_symlink: ActiveValue::set(entry.is_symlink),
                        is_ignored: ActiveValue::set(entry.is_ignored),
                        is_external: ActiveValue::set(entry.is_external),
                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
                        is_deleted: ActiveValue::set(false),
                        scan_id: ActiveValue::set(update.scan_id as i64),
                    }
                }))
                .on_conflict(
                    OnConflict::columns([
                        worktree_entry::Column::ProjectId,
                        worktree_entry::Column::WorktreeId,
                        worktree_entry::Column::Id,
                    ])
                    .update_columns([
                        worktree_entry::Column::IsDir,
                        worktree_entry::Column::Path,
                        worktree_entry::Column::Inode,
                        worktree_entry::Column::MtimeSeconds,
                        worktree_entry::Column::MtimeNanos,
                        worktree_entry::Column::IsSymlink,
                        worktree_entry::Column::IsIgnored,
                        worktree_entry::Column::GitStatus,
                        worktree_entry::Column::ScanId,
                    ])
                    .to_owned(),
                )
                .exec(&*tx)
                .await?;
            }

            // Removed entries are not deleted; they are flagged with `is_deleted = true`
            // and stamped with the scan id of this update.
            if !update.removed_entries.is_empty() {
                worktree_entry::Entity::update_many()
                    .filter(
                        worktree_entry::Column::ProjectId
                            .eq(project_id)
                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
                            .and(
                                worktree_entry::Column::Id
                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
                            ),
                    )
                    .set(worktree_entry::ActiveModel {
                        is_deleted: ActiveValue::Set(true),
                        scan_id: ActiveValue::Set(update.scan_id as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?;
            }

            // Upsert repositories; on a key conflict only `scan_id` and `branch` are
            // overwritten.
            if !update.updated_repositories.is_empty() {
                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
                    |repository| worktree_repository::ActiveModel {
                        project_id: ActiveValue::set(project_id),
                        worktree_id: ActiveValue::set(worktree_id),
                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
                        scan_id: ActiveValue::set(update.scan_id as i64),
                        branch: ActiveValue::set(repository.branch.clone()),
                        is_deleted: ActiveValue::set(false),
                    },
                ))
                .on_conflict(
                    OnConflict::columns([
                        worktree_repository::Column::ProjectId,
                        worktree_repository::Column::WorktreeId,
                        worktree_repository::Column::WorkDirectoryId,
                    ])
                    .update_columns([
                        worktree_repository::Column::ScanId,
                        worktree_repository::Column::Branch,
                    ])
                    .to_owned(),
                )
                .exec(&*tx)
                .await?;
            }

            // Removed repositories are flagged as deleted the same way as removed entries.
            if !update.removed_repositories.is_empty() {
                worktree_repository::Entity::update_many()
                    .filter(
                        worktree_repository::Column::ProjectId
                            .eq(project_id)
                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
                            .and(
                                worktree_repository::Column::WorkDirectoryId
                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
                            ),
                    )
                    .set(worktree_repository::ActiveModel {
                        is_deleted: ActiveValue::Set(true),
                        scan_id: ActiveValue::Set(update.scan_id as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?;
            }

            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
            Ok(connection_ids)
        })
        .await
    }
2571
2572 pub async fn update_diagnostic_summary(
2573 &self,
2574 update: &proto::UpdateDiagnosticSummary,
2575 connection: ConnectionId,
2576 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2577 let project_id = ProjectId::from_proto(update.project_id);
2578 let worktree_id = update.worktree_id as i64;
2579 let room_id = self.room_id_for_project(project_id).await?;
2580 self.room_transaction(room_id, |tx| async move {
2581 let summary = update
2582 .summary
2583 .as_ref()
2584 .ok_or_else(|| anyhow!("invalid summary"))?;
2585
2586 // Ensure the update comes from the host.
2587 let project = project::Entity::find_by_id(project_id)
2588 .one(&*tx)
2589 .await?
2590 .ok_or_else(|| anyhow!("no such project"))?;
2591 if project.host_connection()? != connection {
2592 return Err(anyhow!("can't update a project hosted by someone else"))?;
2593 }
2594
2595 // Update summary.
2596 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2597 project_id: ActiveValue::set(project_id),
2598 worktree_id: ActiveValue::set(worktree_id),
2599 path: ActiveValue::set(summary.path.clone()),
2600 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2601 error_count: ActiveValue::set(summary.error_count as i32),
2602 warning_count: ActiveValue::set(summary.warning_count as i32),
2603 ..Default::default()
2604 })
2605 .on_conflict(
2606 OnConflict::columns([
2607 worktree_diagnostic_summary::Column::ProjectId,
2608 worktree_diagnostic_summary::Column::WorktreeId,
2609 worktree_diagnostic_summary::Column::Path,
2610 ])
2611 .update_columns([
2612 worktree_diagnostic_summary::Column::LanguageServerId,
2613 worktree_diagnostic_summary::Column::ErrorCount,
2614 worktree_diagnostic_summary::Column::WarningCount,
2615 ])
2616 .to_owned(),
2617 )
2618 .exec(&*tx)
2619 .await?;
2620
2621 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2622 Ok(connection_ids)
2623 })
2624 .await
2625 }
2626
2627 pub async fn start_language_server(
2628 &self,
2629 update: &proto::StartLanguageServer,
2630 connection: ConnectionId,
2631 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2632 let project_id = ProjectId::from_proto(update.project_id);
2633 let room_id = self.room_id_for_project(project_id).await?;
2634 self.room_transaction(room_id, |tx| async move {
2635 let server = update
2636 .server
2637 .as_ref()
2638 .ok_or_else(|| anyhow!("invalid language server"))?;
2639
2640 // Ensure the update comes from the host.
2641 let project = project::Entity::find_by_id(project_id)
2642 .one(&*tx)
2643 .await?
2644 .ok_or_else(|| anyhow!("no such project"))?;
2645 if project.host_connection()? != connection {
2646 return Err(anyhow!("can't update a project hosted by someone else"))?;
2647 }
2648
2649 // Add the newly-started language server.
2650 language_server::Entity::insert(language_server::ActiveModel {
2651 project_id: ActiveValue::set(project_id),
2652 id: ActiveValue::set(server.id as i64),
2653 name: ActiveValue::set(server.name.clone()),
2654 ..Default::default()
2655 })
2656 .on_conflict(
2657 OnConflict::columns([
2658 language_server::Column::ProjectId,
2659 language_server::Column::Id,
2660 ])
2661 .update_column(language_server::Column::Name)
2662 .to_owned(),
2663 )
2664 .exec(&*tx)
2665 .await?;
2666
2667 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2668 Ok(connection_ids)
2669 })
2670 .await
2671 }
2672
2673 pub async fn update_worktree_settings(
2674 &self,
2675 update: &proto::UpdateWorktreeSettings,
2676 connection: ConnectionId,
2677 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2678 let project_id = ProjectId::from_proto(update.project_id);
2679 let room_id = self.room_id_for_project(project_id).await?;
2680 self.room_transaction(room_id, |tx| async move {
2681 // Ensure the update comes from the host.
2682 let project = project::Entity::find_by_id(project_id)
2683 .one(&*tx)
2684 .await?
2685 .ok_or_else(|| anyhow!("no such project"))?;
2686 if project.host_connection()? != connection {
2687 return Err(anyhow!("can't update a project hosted by someone else"))?;
2688 }
2689
2690 if let Some(content) = &update.content {
2691 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2692 project_id: ActiveValue::Set(project_id),
2693 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2694 path: ActiveValue::Set(update.path.clone()),
2695 content: ActiveValue::Set(content.clone()),
2696 })
2697 .on_conflict(
2698 OnConflict::columns([
2699 worktree_settings_file::Column::ProjectId,
2700 worktree_settings_file::Column::WorktreeId,
2701 worktree_settings_file::Column::Path,
2702 ])
2703 .update_column(worktree_settings_file::Column::Content)
2704 .to_owned(),
2705 )
2706 .exec(&*tx)
2707 .await?;
2708 } else {
2709 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2710 project_id: ActiveValue::Set(project_id),
2711 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2712 path: ActiveValue::Set(update.path.clone()),
2713 ..Default::default()
2714 })
2715 .exec(&*tx)
2716 .await?;
2717 }
2718
2719 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2720 Ok(connection_ids)
2721 })
2722 .await
2723 }
2724
    /// Adds `connection` as a guest collaborator of `project_id` and returns a
    /// snapshot of the project.
    ///
    /// The connection must be the answering connection of a room participant
    /// ("must join a room first" otherwise), and that participant's room must be
    /// the project's room ("no such project" otherwise).
    ///
    /// The returned `Project` contains all collaborators (including the new
    /// one), the project's worktrees populated with their non-deleted entries,
    /// non-deleted repository entries, diagnostic summaries and settings files,
    /// and the project's language servers. The second tuple element is the
    /// replica id assigned to the new collaborator.
    pub async fn join_project(
        &self,
        project_id: ProjectId,
        connection: ConnectionId,
    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
        let room_id = self.room_id_for_project(project_id).await?;
        self.room_transaction(room_id, |tx| async move {
            // Find the room participant that answered with this connection.
            let participant = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::AnsweringConnectionId
                                .eq(connection.id as i32),
                        )
                        .add(
                            room_participant::Column::AnsweringConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("must join a room first"))?;

            // A project in a different room is reported the same way as a missing one.
            let project = project::Entity::find_by_id(project_id)
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("no such project"))?;
            if project.room_id != participant.room_id {
                return Err(anyhow!("no such project"))?;
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            // Assign the smallest replica id, starting from 1, that no existing
            // collaborator is using.
            let replica_ids = collaborators
                .iter()
                .map(|c| c.replica_id)
                .collect::<HashSet<_>>();
            let mut replica_id = ReplicaId(1);
            while replica_ids.contains(&replica_id) {
                replica_id.0 += 1;
            }
            let new_collaborator = project_collaborator::ActiveModel {
                project_id: ActiveValue::set(project_id),
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                user_id: ActiveValue::set(participant.user_id),
                replica_id: ActiveValue::set(replica_id),
                is_host: ActiveValue::set(false),
                ..Default::default()
            }
            .insert(&*tx)
            .await?;
            collaborators.push(new_collaborator);

            // Build the worktree map keyed by worktree id; the per-worktree collections
            // start out empty and are filled by the queries below.
            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
            let mut worktrees = db_worktrees
                .into_iter()
                .map(|db_worktree| {
                    (
                        db_worktree.id as u64,
                        Worktree {
                            id: db_worktree.id as u64,
                            abs_path: db_worktree.abs_path,
                            root_name: db_worktree.root_name,
                            visible: db_worktree.visible,
                            entries: Default::default(),
                            repository_entries: Default::default(),
                            diagnostic_summaries: Default::default(),
                            settings_files: Default::default(),
                            scan_id: db_worktree.scan_id as u64,
                            completed_scan_id: db_worktree.completed_scan_id as u64,
                        },
                    )
                })
                .collect::<BTreeMap<_, _>>();

            // Populate worktree entries.
            // Rows whose worktree id is not in the map are skipped.
            {
                let mut db_entries = worktree_entry::Entity::find()
                    .filter(
                        Condition::all()
                            .add(worktree_entry::Column::ProjectId.eq(project_id))
                            .add(worktree_entry::Column::IsDeleted.eq(false)),
                    )
                    .stream(&*tx)
                    .await?;
                while let Some(db_entry) = db_entries.next().await {
                    let db_entry = db_entry?;
                    if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                        worktree.entries.push(proto::Entry {
                            id: db_entry.id as u64,
                            is_dir: db_entry.is_dir,
                            path: db_entry.path,
                            inode: db_entry.inode as u64,
                            mtime: Some(proto::Timestamp {
                                seconds: db_entry.mtime_seconds as u64,
                                nanos: db_entry.mtime_nanos as u32,
                            }),
                            is_symlink: db_entry.is_symlink,
                            is_ignored: db_entry.is_ignored,
                            is_external: db_entry.is_external,
                            git_status: db_entry.git_status.map(|status| status as i32),
                        });
                    }
                }
            }

            // Populate repository entries.
            // Keyed by work directory id within each worktree.
            {
                let mut db_repository_entries = worktree_repository::Entity::find()
                    .filter(
                        Condition::all()
                            .add(worktree_repository::Column::ProjectId.eq(project_id))
                            .add(worktree_repository::Column::IsDeleted.eq(false)),
                    )
                    .stream(&*tx)
                    .await?;
                while let Some(db_repository_entry) = db_repository_entries.next().await {
                    let db_repository_entry = db_repository_entry?;
                    if let Some(worktree) =
                        worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
                    {
                        worktree.repository_entries.insert(
                            db_repository_entry.work_directory_id as u64,
                            proto::RepositoryEntry {
                                work_directory_id: db_repository_entry.work_directory_id as u64,
                                branch: db_repository_entry.branch,
                            },
                        );
                    }
                }
            }

            // Populate worktree diagnostic summaries.
            {
                let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                    .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
                    .stream(&*tx)
                    .await?;
                while let Some(db_summary) = db_summaries.next().await {
                    let db_summary = db_summary?;
                    if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                        worktree
                            .diagnostic_summaries
                            .push(proto::DiagnosticSummary {
                                path: db_summary.path,
                                language_server_id: db_summary.language_server_id as u64,
                                error_count: db_summary.error_count as u32,
                                warning_count: db_summary.warning_count as u32,
                            });
                    }
                }
            }

            // Populate worktree settings files
            {
                let mut db_settings_files = worktree_settings_file::Entity::find()
                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                    .stream(&*tx)
                    .await?;
                while let Some(db_settings_file) = db_settings_files.next().await {
                    let db_settings_file = db_settings_file?;
                    if let Some(worktree) =
                        worktrees.get_mut(&(db_settings_file.worktree_id as u64))
                    {
                        worktree.settings_files.push(WorktreeSettingsFile {
                            path: db_settings_file.path,
                            content: db_settings_file.content,
                        });
                    }
                }
            }

            // Populate language servers.
            let language_servers = project
                .find_related(language_server::Entity)
                .all(&*tx)
                .await?;

            // Assemble the snapshot handed back to the joining connection.
            let project = Project {
                collaborators: collaborators
                    .into_iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect(),
                worktrees,
                language_servers: language_servers
                    .into_iter()
                    .map(|language_server| proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                    })
                    .collect(),
            };
            Ok((project, replica_id as ReplicaId))
        })
        .await
    }
2929
2930 pub async fn leave_project(
2931 &self,
2932 project_id: ProjectId,
2933 connection: ConnectionId,
2934 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2935 let room_id = self.room_id_for_project(project_id).await?;
2936 self.room_transaction(room_id, |tx| async move {
2937 let result = project_collaborator::Entity::delete_many()
2938 .filter(
2939 Condition::all()
2940 .add(project_collaborator::Column::ProjectId.eq(project_id))
2941 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2942 .add(
2943 project_collaborator::Column::ConnectionServerId
2944 .eq(connection.owner_id as i32),
2945 ),
2946 )
2947 .exec(&*tx)
2948 .await?;
2949 if result.rows_affected == 0 {
2950 Err(anyhow!("not a collaborator on this project"))?;
2951 }
2952
2953 let project = project::Entity::find_by_id(project_id)
2954 .one(&*tx)
2955 .await?
2956 .ok_or_else(|| anyhow!("no such project"))?;
2957 let collaborators = project
2958 .find_related(project_collaborator::Entity)
2959 .all(&*tx)
2960 .await?;
2961 let connection_ids = collaborators
2962 .into_iter()
2963 .map(|collaborator| collaborator.connection())
2964 .collect();
2965
2966 follower::Entity::delete_many()
2967 .filter(
2968 Condition::any()
2969 .add(
2970 Condition::all()
2971 .add(follower::Column::ProjectId.eq(project_id))
2972 .add(
2973 follower::Column::LeaderConnectionServerId
2974 .eq(connection.owner_id),
2975 )
2976 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2977 )
2978 .add(
2979 Condition::all()
2980 .add(follower::Column::ProjectId.eq(project_id))
2981 .add(
2982 follower::Column::FollowerConnectionServerId
2983 .eq(connection.owner_id),
2984 )
2985 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2986 ),
2987 )
2988 .exec(&*tx)
2989 .await?;
2990
2991 let room = self.get_room(project.room_id, &tx).await?;
2992 let left_project = LeftProject {
2993 id: project_id,
2994 host_user_id: project.host_user_id,
2995 host_connection_id: project.host_connection()?,
2996 connection_ids,
2997 };
2998 Ok((room, left_project))
2999 })
3000 .await
3001 }
3002
3003 pub async fn project_collaborators(
3004 &self,
3005 project_id: ProjectId,
3006 connection_id: ConnectionId,
3007 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
3008 let room_id = self.room_id_for_project(project_id).await?;
3009 self.room_transaction(room_id, |tx| async move {
3010 let collaborators = project_collaborator::Entity::find()
3011 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3012 .all(&*tx)
3013 .await?
3014 .into_iter()
3015 .map(|collaborator| ProjectCollaborator {
3016 connection_id: collaborator.connection(),
3017 user_id: collaborator.user_id,
3018 replica_id: collaborator.replica_id,
3019 is_host: collaborator.is_host,
3020 })
3021 .collect::<Vec<_>>();
3022
3023 if collaborators
3024 .iter()
3025 .any(|collaborator| collaborator.connection_id == connection_id)
3026 {
3027 Ok(collaborators)
3028 } else {
3029 Err(anyhow!("no such project"))?
3030 }
3031 })
3032 .await
3033 }
3034
3035 pub async fn project_connection_ids(
3036 &self,
3037 project_id: ProjectId,
3038 connection_id: ConnectionId,
3039 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3040 let room_id = self.room_id_for_project(project_id).await?;
3041 self.room_transaction(room_id, |tx| async move {
3042 let mut collaborators = project_collaborator::Entity::find()
3043 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3044 .stream(&*tx)
3045 .await?;
3046
3047 let mut connection_ids = HashSet::default();
3048 while let Some(collaborator) = collaborators.next().await {
3049 let collaborator = collaborator?;
3050 connection_ids.insert(collaborator.connection());
3051 }
3052
3053 if connection_ids.contains(&connection_id) {
3054 Ok(connection_ids)
3055 } else {
3056 Err(anyhow!("no such project"))?
3057 }
3058 })
3059 .await
3060 }
3061
3062 async fn project_guest_connection_ids(
3063 &self,
3064 project_id: ProjectId,
3065 tx: &DatabaseTransaction,
3066 ) -> Result<Vec<ConnectionId>> {
3067 let mut collaborators = project_collaborator::Entity::find()
3068 .filter(
3069 project_collaborator::Column::ProjectId
3070 .eq(project_id)
3071 .and(project_collaborator::Column::IsHost.eq(false)),
3072 )
3073 .stream(tx)
3074 .await?;
3075
3076 let mut guest_connection_ids = Vec::new();
3077 while let Some(collaborator) = collaborators.next().await {
3078 let collaborator = collaborator?;
3079 guest_connection_ids.push(collaborator.connection());
3080 }
3081 Ok(guest_connection_ids)
3082 }
3083
3084 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3085 self.transaction(|tx| async move {
3086 let project = project::Entity::find_by_id(project_id)
3087 .one(&*tx)
3088 .await?
3089 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3090 Ok(project.room_id)
3091 })
3092 .await
3093 }
3094
3095 // access tokens
3096
3097 pub async fn create_access_token(
3098 &self,
3099 user_id: UserId,
3100 access_token_hash: &str,
3101 max_access_token_count: usize,
3102 ) -> Result<AccessTokenId> {
3103 self.transaction(|tx| async {
3104 let tx = tx;
3105
3106 let token = access_token::ActiveModel {
3107 user_id: ActiveValue::set(user_id),
3108 hash: ActiveValue::set(access_token_hash.into()),
3109 ..Default::default()
3110 }
3111 .insert(&*tx)
3112 .await?;
3113
3114 access_token::Entity::delete_many()
3115 .filter(
3116 access_token::Column::Id.in_subquery(
3117 Query::select()
3118 .column(access_token::Column::Id)
3119 .from(access_token::Entity)
3120 .and_where(access_token::Column::UserId.eq(user_id))
3121 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3122 .limit(10000)
3123 .offset(max_access_token_count as u64)
3124 .to_owned(),
3125 ),
3126 )
3127 .exec(&*tx)
3128 .await?;
3129 Ok(token.id)
3130 })
3131 .await
3132 }
3133
3134 pub async fn get_access_token(
3135 &self,
3136 access_token_id: AccessTokenId,
3137 ) -> Result<access_token::Model> {
3138 self.transaction(|tx| async move {
3139 Ok(access_token::Entity::find_by_id(access_token_id)
3140 .one(&*tx)
3141 .await?
3142 .ok_or_else(|| anyhow!("no such access token"))?)
3143 })
3144 .await
3145 }
3146
3147 // channels
3148
3149 pub async fn create_root_channel(
3150 &self,
3151 name: &str,
3152 live_kit_room: &str,
3153 creator_id: UserId,
3154 ) -> Result<ChannelId> {
3155 self.create_channel(name, None, live_kit_room, creator_id)
3156 .await
3157 }
3158
3159 pub async fn create_channel(
3160 &self,
3161 name: &str,
3162 parent: Option<ChannelId>,
3163 live_kit_room: &str,
3164 creator_id: UserId,
3165 ) -> Result<ChannelId> {
3166 self.transaction(move |tx| async move {
3167 let tx = tx;
3168
3169 if let Some(parent) = parent {
3170 let channels = self.get_channel_ancestors(parent, &*tx).await?;
3171 channel_member::Entity::find()
3172 .filter(channel_member::Column::ChannelId.is_in(channels.iter().copied()))
3173 .filter(
3174 channel_member::Column::UserId
3175 .eq(creator_id)
3176 .and(channel_member::Column::Accepted.eq(true)),
3177 )
3178 .one(&*tx)
3179 .await?
3180 .ok_or_else(|| {
3181 anyhow!("User does not have the permissions to create this channel")
3182 })?;
3183 }
3184
3185 let channel = channel::ActiveModel {
3186 name: ActiveValue::Set(name.to_string()),
3187 ..Default::default()
3188 };
3189
3190 let channel = channel.insert(&*tx).await?;
3191
3192 if let Some(parent) = parent {
3193 channel_parent::ActiveModel {
3194 child_id: ActiveValue::Set(channel.id),
3195 parent_id: ActiveValue::Set(parent),
3196 }
3197 .insert(&*tx)
3198 .await?;
3199 }
3200
3201 channel_member::ActiveModel {
3202 channel_id: ActiveValue::Set(channel.id),
3203 user_id: ActiveValue::Set(creator_id),
3204 accepted: ActiveValue::Set(true),
3205 admin: ActiveValue::Set(true),
3206 ..Default::default()
3207 }
3208 .insert(&*tx)
3209 .await?;
3210
3211 room::ActiveModel {
3212 channel_id: ActiveValue::Set(Some(channel.id)),
3213 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3214 ..Default::default()
3215 }
3216 .insert(&*tx)
3217 .await?;
3218
3219 Ok(channel.id)
3220 })
3221 .await
3222 }
3223
3224 pub async fn remove_channel(
3225 &self,
3226 channel_id: ChannelId,
3227 user_id: UserId,
3228 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3229 self.transaction(move |tx| async move {
3230 let tx = tx;
3231
3232 // Check if user is an admin
3233 channel_member::Entity::find()
3234 .filter(
3235 channel_member::Column::ChannelId
3236 .eq(channel_id)
3237 .and(channel_member::Column::UserId.eq(user_id))
3238 .and(channel_member::Column::Admin.eq(true)),
3239 )
3240 .one(&*tx)
3241 .await?
3242 .ok_or_else(|| anyhow!("user is not allowed to remove this channel"))?;
3243
3244 let mut descendants = self.get_channel_descendants([channel_id], &*tx).await?;
3245
3246 // Keep channels which have another active
3247 let mut channels_to_keep = channel_parent::Entity::find()
3248 .filter(
3249 channel_parent::Column::ChildId
3250 .is_in(descendants.keys().copied().filter(|&id| id != channel_id))
3251 .and(
3252 channel_parent::Column::ParentId.is_not_in(descendants.keys().copied()),
3253 ),
3254 )
3255 .stream(&*tx)
3256 .await?;
3257
3258 while let Some(row) = channels_to_keep.next().await {
3259 let row = row?;
3260 descendants.remove(&row.child_id);
3261 }
3262
3263 drop(channels_to_keep);
3264
3265 let channels_to_remove = descendants.keys().copied().collect::<Vec<_>>();
3266
3267 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3268 .filter(channel_member::Column::ChannelId.is_in(channels_to_remove.iter().copied()))
3269 .select_only()
3270 .column(channel_member::Column::UserId)
3271 .distinct()
3272 .into_values::<_, QueryUserIds>()
3273 .all(&*tx)
3274 .await?;
3275
3276 // Channel members and parents should delete via cascade
3277 channel::Entity::delete_many()
3278 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
3279 .exec(&*tx)
3280 .await?;
3281
3282 Ok((channels_to_remove, members_to_notify))
3283 })
3284 .await
3285 }
3286
3287 pub async fn invite_channel_member(
3288 &self,
3289 channel_id: ChannelId,
3290 invitee_id: UserId,
3291 inviter_id: UserId,
3292 is_admin: bool,
3293 ) -> Result<()> {
3294 self.transaction(move |tx| async move {
3295 let tx = tx;
3296
3297 // Check if inviter is a member
3298 channel_member::Entity::find()
3299 .filter(
3300 channel_member::Column::ChannelId
3301 .eq(channel_id)
3302 .and(channel_member::Column::UserId.eq(inviter_id))
3303 .and(channel_member::Column::Admin.eq(true)),
3304 )
3305 .one(&*tx)
3306 .await?
3307 .ok_or_else(|| {
3308 anyhow!("Inviter does not have permissions to invite the invitee")
3309 })?;
3310
3311 let channel_membership = channel_member::ActiveModel {
3312 channel_id: ActiveValue::Set(channel_id),
3313 user_id: ActiveValue::Set(invitee_id),
3314 accepted: ActiveValue::Set(false),
3315 admin: ActiveValue::Set(is_admin),
3316 ..Default::default()
3317 };
3318
3319 channel_membership.insert(&*tx).await?;
3320
3321 Ok(())
3322 })
3323 .await
3324 }
3325
3326 pub async fn respond_to_channel_invite(
3327 &self,
3328 channel_id: ChannelId,
3329 user_id: UserId,
3330 accept: bool,
3331 ) -> Result<()> {
3332 self.transaction(move |tx| async move {
3333 let tx = tx;
3334
3335 let rows_affected = if accept {
3336 channel_member::Entity::update_many()
3337 .set(channel_member::ActiveModel {
3338 accepted: ActiveValue::Set(accept),
3339 ..Default::default()
3340 })
3341 .filter(
3342 channel_member::Column::ChannelId
3343 .eq(channel_id)
3344 .and(channel_member::Column::UserId.eq(user_id))
3345 .and(channel_member::Column::Accepted.eq(false)),
3346 )
3347 .exec(&*tx)
3348 .await?
3349 .rows_affected
3350 } else {
3351 channel_member::ActiveModel {
3352 channel_id: ActiveValue::Unchanged(channel_id),
3353 user_id: ActiveValue::Unchanged(user_id),
3354 ..Default::default()
3355 }
3356 .delete(&*tx)
3357 .await?
3358 .rows_affected
3359 };
3360
3361 if rows_affected == 0 {
3362 Err(anyhow!("no such invitation"))?;
3363 }
3364
3365 Ok(())
3366 })
3367 .await
3368 }
3369
3370 pub async fn get_channel_invites(&self, user_id: UserId) -> Result<Vec<Channel>> {
3371 self.transaction(|tx| async move {
3372 let tx = tx;
3373
3374 let channel_invites = channel_member::Entity::find()
3375 .filter(
3376 channel_member::Column::UserId
3377 .eq(user_id)
3378 .and(channel_member::Column::Accepted.eq(false)),
3379 )
3380 .all(&*tx)
3381 .await?;
3382
3383 let channels = channel::Entity::find()
3384 .filter(
3385 channel::Column::Id.is_in(
3386 channel_invites
3387 .into_iter()
3388 .map(|channel_member| channel_member.channel_id),
3389 ),
3390 )
3391 .all(&*tx)
3392 .await?;
3393
3394 let channels = channels
3395 .into_iter()
3396 .map(|channel| Channel {
3397 id: channel.id,
3398 name: channel.name,
3399 parent_id: None,
3400 })
3401 .collect();
3402
3403 Ok(channels)
3404 })
3405 .await
3406 }
3407
3408 pub async fn get_channels(
3409 &self,
3410 user_id: UserId,
3411 ) -> Result<(Vec<Channel>, HashMap<ChannelId, Vec<UserId>>)> {
3412 self.transaction(|tx| async move {
3413 let tx = tx;
3414
3415 let starting_channel_ids: Vec<ChannelId> = channel_member::Entity::find()
3416 .filter(
3417 channel_member::Column::UserId
3418 .eq(user_id)
3419 .and(channel_member::Column::Accepted.eq(true)),
3420 )
3421 .select_only()
3422 .column(channel_member::Column::ChannelId)
3423 .into_values::<_, QueryChannelIds>()
3424 .all(&*tx)
3425 .await?;
3426
3427 let parents_by_child_id = self
3428 .get_channel_descendants(starting_channel_ids, &*tx)
3429 .await?;
3430
3431 let mut channels = Vec::with_capacity(parents_by_child_id.len());
3432 let mut rows = channel::Entity::find()
3433 .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3434 .stream(&*tx)
3435 .await?;
3436
3437 while let Some(row) = rows.next().await {
3438 let row = row?;
3439 channels.push(Channel {
3440 id: row.id,
3441 name: row.name,
3442 parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3443 });
3444 }
3445
3446 drop(rows);
3447
3448 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3449 enum QueryUserIdsAndChannelIds {
3450 ChannelId,
3451 UserId,
3452 }
3453
3454 let mut participants = room_participant::Entity::find()
3455 .inner_join(room::Entity)
3456 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3457 .select_only()
3458 .column(room::Column::ChannelId)
3459 .column(room_participant::Column::UserId)
3460 .into_values::<_, QueryUserIdsAndChannelIds>()
3461 .stream(&*tx)
3462 .await?;
3463
3464 let mut participant_map: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3465 while let Some(row) = participants.next().await {
3466 let row: (ChannelId, UserId) = row?;
3467 participant_map.entry(row.0).or_default().push(row.1)
3468 }
3469
3470 drop(participants);
3471
3472 Ok((channels, participant_map))
3473 })
3474 .await
3475 }
3476
3477 pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3478 self.transaction(|tx| async move {
3479 let tx = tx;
3480 let user_ids = self.get_channel_members_internal(id, &*tx).await?;
3481 Ok(user_ids)
3482 })
3483 .await
3484 }
3485
3486 pub async fn get_channel_members_internal(
3487 &self,
3488 id: ChannelId,
3489 tx: &DatabaseTransaction,
3490 ) -> Result<Vec<UserId>> {
3491 let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3492 let user_ids = channel_member::Entity::find()
3493 .distinct()
3494 .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3495 .select_only()
3496 .column(channel_member::Column::UserId)
3497 .into_values::<_, QueryUserIds>()
3498 .all(&*tx)
3499 .await?;
3500 Ok(user_ids)
3501 }
3502
3503 async fn get_channel_ancestors(
3504 &self,
3505 id: ChannelId,
3506 tx: &DatabaseTransaction,
3507 ) -> Result<Vec<ChannelId>> {
3508 let sql = format!(
3509 r#"
3510 WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3511 SELECT CAST(NULL as INTEGER) as child_id, root_ids.column1 as parent_id
3512 FROM (VALUES ({})) as root_ids
3513 UNION
3514 SELECT channel_parents.child_id, channel_parents.parent_id
3515 FROM channel_parents, channel_tree
3516 WHERE channel_parents.child_id = channel_tree.parent_id
3517 )
3518 SELECT DISTINCT channel_tree.parent_id
3519 FROM channel_tree
3520 "#,
3521 id
3522 );
3523
3524 #[derive(FromQueryResult, Debug, PartialEq)]
3525 pub struct ChannelParent {
3526 pub parent_id: ChannelId,
3527 }
3528
3529 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3530
3531 let mut channel_ids_stream = channel_parent::Entity::find()
3532 .from_raw_sql(stmt)
3533 .into_model::<ChannelParent>()
3534 .stream(&*tx)
3535 .await?;
3536
3537 let mut channel_ids = vec![];
3538 while let Some(channel_id) = channel_ids_stream.next().await {
3539 channel_ids.push(channel_id?.parent_id);
3540 }
3541
3542 Ok(channel_ids)
3543 }
3544
3545 async fn get_channel_descendants(
3546 &self,
3547 channel_ids: impl IntoIterator<Item = ChannelId>,
3548 tx: &DatabaseTransaction,
3549 ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3550 let mut values = String::new();
3551 for id in channel_ids {
3552 if !values.is_empty() {
3553 values.push_str(", ");
3554 }
3555 write!(&mut values, "({})", id).unwrap();
3556 }
3557
3558 if values.is_empty() {
3559 return Ok(HashMap::default());
3560 }
3561
3562 let sql = format!(
3563 r#"
3564 WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3565 SELECT root_ids.column1 as child_id, CAST(NULL as INTEGER) as parent_id
3566 FROM (VALUES {}) as root_ids
3567 UNION
3568 SELECT channel_parents.child_id, channel_parents.parent_id
3569 FROM channel_parents, channel_tree
3570 WHERE channel_parents.parent_id = channel_tree.child_id
3571 )
3572 SELECT channel_tree.child_id, channel_tree.parent_id
3573 FROM channel_tree
3574 ORDER BY child_id, parent_id IS NOT NULL
3575 "#,
3576 values
3577 );
3578
3579 #[derive(FromQueryResult, Debug, PartialEq)]
3580 pub struct ChannelParent {
3581 pub child_id: ChannelId,
3582 pub parent_id: Option<ChannelId>,
3583 }
3584
3585 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3586
3587 let mut parents_by_child_id = HashMap::default();
3588 let mut parents = channel_parent::Entity::find()
3589 .from_raw_sql(stmt)
3590 .into_model::<ChannelParent>()
3591 .stream(tx)
3592 .await?;
3593
3594 while let Some(parent) = parents.next().await {
3595 let parent = parent?;
3596 parents_by_child_id.insert(parent.child_id, parent.parent_id);
3597 }
3598
3599 Ok(parents_by_child_id)
3600 }
3601
3602 pub async fn get_channel(&self, channel_id: ChannelId) -> Result<Option<Channel>> {
3603 self.transaction(|tx| async move {
3604 let tx = tx;
3605 let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3606
3607 Ok(channel.map(|channel| Channel {
3608 id: channel.id,
3609 name: channel.name,
3610 parent_id: None,
3611 }))
3612 })
3613 .await
3614 }
3615
3616 pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3617 self.transaction(|tx| async move {
3618 let tx = tx;
3619 let room = channel::Model {
3620 id: channel_id,
3621 ..Default::default()
3622 }
3623 .find_related(room::Entity)
3624 .one(&*tx)
3625 .await?
3626 .ok_or_else(|| anyhow!("invalid channel"))?;
3627 Ok(room.id)
3628 })
3629 .await
3630 }
3631
3632 async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3633 where
3634 F: Send + Fn(TransactionHandle) -> Fut,
3635 Fut: Send + Future<Output = Result<T>>,
3636 {
3637 let body = async {
3638 let mut i = 0;
3639 loop {
3640 let (tx, result) = self.with_transaction(&f).await?;
3641 match result {
3642 Ok(result) => match tx.commit().await.map_err(Into::into) {
3643 Ok(()) => return Ok(result),
3644 Err(error) => {
3645 if !self.retry_on_serialization_error(&error, i).await {
3646 return Err(error);
3647 }
3648 }
3649 },
3650 Err(error) => {
3651 tx.rollback().await?;
3652 if !self.retry_on_serialization_error(&error, i).await {
3653 return Err(error);
3654 }
3655 }
3656 }
3657 i += 1;
3658 }
3659 };
3660
3661 self.run(body).await
3662 }
3663
3664 async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
3665 where
3666 F: Send + Fn(TransactionHandle) -> Fut,
3667 Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
3668 {
3669 let body = async {
3670 let mut i = 0;
3671 loop {
3672 let (tx, result) = self.with_transaction(&f).await?;
3673 match result {
3674 Ok(Some((room_id, data))) => {
3675 let lock = self.rooms.entry(room_id).or_default().clone();
3676 let _guard = lock.lock_owned().await;
3677 match tx.commit().await.map_err(Into::into) {
3678 Ok(()) => {
3679 return Ok(Some(RoomGuard {
3680 data,
3681 _guard,
3682 _not_send: PhantomData,
3683 }));
3684 }
3685 Err(error) => {
3686 if !self.retry_on_serialization_error(&error, i).await {
3687 return Err(error);
3688 }
3689 }
3690 }
3691 }
3692 Ok(None) => match tx.commit().await.map_err(Into::into) {
3693 Ok(()) => return Ok(None),
3694 Err(error) => {
3695 if !self.retry_on_serialization_error(&error, i).await {
3696 return Err(error);
3697 }
3698 }
3699 },
3700 Err(error) => {
3701 tx.rollback().await?;
3702 if !self.retry_on_serialization_error(&error, i).await {
3703 return Err(error);
3704 }
3705 }
3706 }
3707 i += 1;
3708 }
3709 };
3710
3711 self.run(body).await
3712 }
3713
3714 async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3715 where
3716 F: Send + Fn(TransactionHandle) -> Fut,
3717 Fut: Send + Future<Output = Result<T>>,
3718 {
3719 let body = async {
3720 let mut i = 0;
3721 loop {
3722 let lock = self.rooms.entry(room_id).or_default().clone();
3723 let _guard = lock.lock_owned().await;
3724 let (tx, result) = self.with_transaction(&f).await?;
3725 match result {
3726 Ok(data) => match tx.commit().await.map_err(Into::into) {
3727 Ok(()) => {
3728 return Ok(RoomGuard {
3729 data,
3730 _guard,
3731 _not_send: PhantomData,
3732 });
3733 }
3734 Err(error) => {
3735 if !self.retry_on_serialization_error(&error, i).await {
3736 return Err(error);
3737 }
3738 }
3739 },
3740 Err(error) => {
3741 tx.rollback().await?;
3742 if !self.retry_on_serialization_error(&error, i).await {
3743 return Err(error);
3744 }
3745 }
3746 }
3747 i += 1;
3748 }
3749 };
3750
3751 self.run(body).await
3752 }
3753
3754 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3755 where
3756 F: Send + Fn(TransactionHandle) -> Fut,
3757 Fut: Send + Future<Output = Result<T>>,
3758 {
3759 let tx = self
3760 .pool
3761 .begin_with_config(Some(IsolationLevel::Serializable), None)
3762 .await?;
3763
3764 let mut tx = Arc::new(Some(tx));
3765 let result = f(TransactionHandle(tx.clone())).await;
3766 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3767 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3768 };
3769
3770 Ok((tx, result))
3771 }
3772
3773 async fn run<F, T>(&self, future: F) -> Result<T>
3774 where
3775 F: Future<Output = Result<T>>,
3776 {
3777 #[cfg(test)]
3778 {
3779 if let Executor::Deterministic(executor) = &self.executor {
3780 executor.simulate_random_delay().await;
3781 }
3782
3783 self.runtime.as_ref().unwrap().block_on(future)
3784 }
3785
3786 #[cfg(not(test))]
3787 {
3788 future.await
3789 }
3790 }
3791
3792 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3793 // If the error is due to a failure to serialize concurrent transactions, then retry
3794 // this transaction after a delay. With each subsequent retry, double the delay duration.
3795 // Also vary the delay randomly in order to ensure different database connections retry
3796 // at different times.
3797 if is_serialization_error(error) {
3798 let base_delay = 4_u64 << prev_attempt_count.min(16);
3799 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3800 log::info!(
3801 "retrying transaction after serialization error. delay: {} ms.",
3802 randomized_delay
3803 );
3804 self.executor
3805 .sleep(Duration::from_millis(randomized_delay as u64))
3806 .await;
3807 true
3808 } else {
3809 false
3810 }
3811 }
3812}
3813
3814fn is_serialization_error(error: &Error) -> bool {
3815 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3816 match error {
3817 Error::Database(
3818 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3819 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3820 ) if error
3821 .as_database_error()
3822 .and_then(|error| error.code())
3823 .as_deref()
3824 == Some(SERIALIZATION_FAILURE_CODE) =>
3825 {
3826 true
3827 }
3828 _ => false,
3829 }
3830}
3831
3832struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3833
3834impl Deref for TransactionHandle {
3835 type Target = DatabaseTransaction;
3836
3837 fn deref(&self) -> &Self::Target {
3838 self.0.as_ref().as_ref().unwrap()
3839 }
3840}
3841
3842pub struct RoomGuard<T> {
3843 data: T,
3844 _guard: OwnedMutexGuard<()>,
3845 _not_send: PhantomData<Rc<()>>,
3846}
3847
3848impl<T> Deref for RoomGuard<T> {
3849 type Target = T;
3850
3851 fn deref(&self) -> &T {
3852 &self.data
3853 }
3854}
3855
3856impl<T> DerefMut for RoomGuard<T> {
3857 fn deref_mut(&mut self) -> &mut T {
3858 &mut self.data
3859 }
3860}
3861
/// Serializable parameters describing a user to be created: the GitHub login, the numeric
/// GitHub user id and an invite count.
// NOTE(review): presumably `invite_count` is the number of invites the new user may send;
// the consumer of this struct is outside this part of the file — confirm.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    pub github_login: String,
    pub github_user_id: i32,
    pub invite_count: i32,
}
3868
/// Identifiers describing a newly created user: the user id, a metrics id, and — when
/// present — the id of the inviting user and the device id recorded at signup.
// NOTE(review): the producer of this struct is outside this part of the file; the field
// meanings above are read off the field names — confirm.
#[derive(Debug)]
pub struct NewUserResult {
    pub user_id: UserId,
    pub metrics_id: String,
    pub inviting_user_id: Option<UserId>,
    pub signup_device_id: Option<String>,
}
3876
/// A channel as handed out by the database layer: its id, its name and an optional parent.
///
/// `Database::get_channels` fills `parent_id` from the descendant lookup, while
/// `Database::get_channel` and `Database::get_channel_invites` always set it to `None`.
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    pub parent_id: Option<ChannelId>,
}
3883
3884fn random_invite_code() -> String {
3885 nanoid::nanoid!(16)
3886}
3887
3888fn random_email_confirmation_code() -> String {
3889 nanoid::nanoid!(64)
3890}
3891
// Declares a strongly typed database id named `$name`: a public newtype around `i32`
// together with the conversions needed to use it in protobuf messages (`from_proto` /
// `to_proto`), in formatted output (`Display`) and as a sea-orm / sea-query value.
macro_rules! id_type {
    ($name:ident) => {
        // `serde(transparent)` makes the id serialize exactly like the wrapped `i32`.
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            // Largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            // Converts a protobuf `u64` id. The `as` cast truncates: values that do not fit
            // into an `i32` wrap around silently.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            // Converts to the protobuf `u64` representation. The `as` cast sign-extends, so
            // a negative id becomes a very large `u64`.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        // Formats exactly like the wrapped integer. `Database::get_channel_ancestors` and
        // `Database::get_channel_descendants` rely on this when building raw SQL.
        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        // Ids are bound in queries as (non-null) `Int` values.
        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        // Reads the id from a query result by reading the column as an `i32`.
        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer variant whose value fits into an `i32`; null
            // values, out-of-range values and non-integer variants are rejected.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            // The type name is the name of the generated id type itself.
            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        // Checked conversion from `u64`: fails when the value does not fit into an `i32`.
        // NOTE(review): the message reads "converting <name> to u64" although the conversion
        // goes from `u64` to the id type — confirm whether the wording is intentional.
        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(concat!(
                        "error converting ",
                        stringify!($name),
                        " to u64"
                    ))
                })?))
            }
        }

        // The null value of an id is a null `Int`.
        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
4010
// One `i32`-backed id newtype per kind of identifier used by the database layer.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
4024
/// A room's protobuf state together with the id of its channel (if any) and a list of
/// channel member user ids.
// NOTE(review): presumably the result of joining a room; the producing method is outside
// this part of the file — confirm.
#[derive(Clone)]
pub struct JoinRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
}
4031
/// A room's protobuf state plus the projects that were rejoined and reshared, the id of the
/// room's channel (if any) and a list of channel member user ids.
// NOTE(review): presumably the result of rejoining a room after a reconnect; the producing
// method is outside this part of the file — confirm.
pub struct RejoinedRoom {
    pub room: proto::Room,
    pub rejoined_projects: Vec<RejoinedProject>,
    pub reshared_projects: Vec<ResharedProject>,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
}
4039
/// A reshared project: its id, the connection id it was previously associated with, its
/// collaborators and the metadata of its worktrees.
pub struct ResharedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
4046
/// A rejoined project: its id, the connection id it was previously associated with, its
/// collaborators, the state of its worktrees and its language servers.
pub struct RejoinedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<RejoinedWorktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
4054
/// Worktree state belonging to a `RejoinedProject`, expressed as updated and removed
/// entries and repositories plus diagnostic summaries, settings files and scan ids.
// NOTE(review): presumably the updated/removed lists are a delta relative to what the
// rejoining client already knows; the producer is outside this part of the file — confirm.
#[derive(Debug)]
pub struct RejoinedWorktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub updated_entries: Vec<proto::Entry>,
    pub removed_entries: Vec<u64>,
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    pub removed_repositories: Vec<u64>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
4070
/// A room's protobuf state together with its channel information, the projects that were
/// left (keyed by project id), the user ids whose calls were canceled and a `deleted` flag.
// NOTE(review): presumably the result of a participant leaving a room, with `deleted`
// telling whether the room itself was removed; the producer is outside this part of the
// file — confirm.
pub struct LeftRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
    pub left_projects: HashMap<ProjectId, LeftProject>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
    pub deleted: bool,
}
4079
/// A room's protobuf state together with its channel information, the user ids of stale
/// participants and the user ids whose calls were canceled.
// NOTE(review): presumably the result of refreshing a room after stale connections were
// cleaned up; the producer is outside this part of the file — confirm.
pub struct RefreshedRoom {
    pub room: proto::Room,
    pub channel_id: Option<ChannelId>,
    pub channel_members: Vec<UserId>,
    pub stale_participant_user_ids: Vec<UserId>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
4087
/// A project's collaborators, its worktrees (in a `BTreeMap` keyed by a `u64`, presumably
/// the worktree id) and its language servers.
pub struct Project {
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: BTreeMap<u64, Worktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
4093
/// A collaborator on a project: the connection and user it belongs to, its replica id and
/// whether it is the host.
///
/// `to_proto` converts it into a `proto::Collaborator`; `is_host` is not part of that
/// conversion.
pub struct ProjectCollaborator {
    pub connection_id: ConnectionId,
    pub user_id: UserId,
    pub replica_id: ReplicaId,
    pub is_host: bool,
}
4100
4101impl ProjectCollaborator {
4102 pub fn to_proto(&self) -> proto::Collaborator {
4103 proto::Collaborator {
4104 peer_id: Some(self.connection_id.into()),
4105 replica_id: self.replica_id.0 as u32,
4106 user_id: self.user_id.to_proto(),
4107 }
4108 }
4109}
4110
/// A project that was left: its id, the host's user id and connection id, and a list of
/// connection ids.
// NOTE(review): presumably `connection_ids` are the connections still attached to the
// project; the producer is outside this part of the file — confirm.
#[derive(Debug)]
pub struct LeftProject {
    pub id: ProjectId,
    pub host_user_id: UserId,
    pub host_connection_id: ConnectionId,
    pub connection_ids: Vec<ConnectionId>,
}
4118
/// The state of a worktree within a `Project`: identity and paths, its entries, its
/// repository entries (in a `BTreeMap` keyed by a `u64`), diagnostic summaries, settings
/// files and scan ids.
pub struct Worktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
4131
/// A settings file belonging to a worktree: its path and its content.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    pub path: String,
    pub content: String,
}
4137
/// Column selector for queries that return only channel ids; used with `into_values` in
/// `Database::get_channels`.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryChannelIds {
    ChannelId,
}
4142
/// Column selector for queries that return only user ids; used with `into_values` in
/// `Database::remove_channel` and `Database::get_channel_members_internal`.
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}
4147
4148#[cfg(test)]
4149pub use test::*;
4150
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A `Database` backed by a throwaway SQLite or Postgres database for use in tests.
    ///
    /// `db` is only `None` while the value is being dropped.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    /// Builds the single-threaded tokio runtime (with I/O and time enabled) that a test
    /// database uses to run its queries.
    fn new_test_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build()
            .unwrap()
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and loads the test schema from
        /// `migrations.sqlite/20221109000000_test_schema.sql` into it.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = String::from("sqlite::memory:");
            let runtime = new_test_runtime();

            let mut db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            // The database keeps the runtime so that `Database::run` can block on it.
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a fresh, randomly named Postgres database on `localhost` and runs the
        /// migrations from the `migrations` directory against it.
        ///
        /// A process-wide lock is held for the duration of the setup.
        pub fn postgres(background: Arc<Background>) -> Self {
            static LOCK: Mutex<()> = Mutex::new(());

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = new_test_runtime();

            let mut db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            // The database keeps the runtime so that `Database::run` can block on it.
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Returns the wrapped database.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        /// For Postgres-backed test databases: terminates the other backends connected to
        /// the database and then drops the database. Errors from either step are handed to
        /// `log_err` instead of being propagated. Other backends need no cleanup here.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}