1mod access_token;
2mod channel;
3mod channel_member;
4mod channel_parent;
5mod contact;
6mod follower;
7mod language_server;
8mod project;
9mod project_collaborator;
10mod room;
11mod room_participant;
12mod server;
13mod signup;
14#[cfg(test)]
15mod tests;
16mod user;
17mod worktree;
18mod worktree_diagnostic_summary;
19mod worktree_entry;
20mod worktree_repository;
21mod worktree_repository_statuses;
22mod worktree_settings_file;
23
24use crate::executor::Executor;
25use crate::{Error, Result};
26use anyhow::anyhow;
27use collections::{BTreeMap, HashMap, HashSet};
28pub use contact::Contact;
29use dashmap::DashMap;
30use futures::StreamExt;
31use hyper::StatusCode;
32use rand::prelude::StdRng;
33use rand::{Rng, SeedableRng};
34use rpc::{proto, ConnectionId};
35use sea_orm::Condition;
36pub use sea_orm::ConnectOptions;
37use sea_orm::{
38 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
39 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
40 Statement, TransactionTrait,
41};
42use sea_query::{Alias, Expr, OnConflict, Query};
43use serde::{Deserialize, Serialize};
44pub use signup::{Invite, NewSignup, WaitlistSummary};
45use sqlx::migrate::{Migrate, Migration, MigrationSource};
46use sqlx::Connection;
47use std::fmt::Write as _;
48use std::ops::{Deref, DerefMut};
49use std::path::Path;
50use std::time::Duration;
51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
52use tokio::sync::{Mutex, OwnedMutexGuard};
53pub use user::Model as User;
54
/// Handle to the application's database: a connection pool plus per-process bookkeeping.
pub struct Database {
    /// Options this database was opened with; `migrate` reads the connection URL from here.
    options: ConnectOptions,
    /// sea-orm connection used to run queries.
    pool: DatabaseConnection,
    /// One async mutex per room id.
    /// NOTE(review): presumably used to serialize transactions that touch the same room —
    /// confirm against `room_transaction`.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Random number generator; `new` seeds it with a fixed value.
    rng: Mutex<StdRng>,
    /// Executor handed in by the caller of `new`.
    executor: Executor,
    /// Tokio runtime that only exists in test builds; `new` leaves it unset.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
64
65impl Database {
66 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
67 Ok(Self {
68 options: options.clone(),
69 pool: sea_orm::Database::connect(options).await?,
70 rooms: DashMap::with_capacity(16384),
71 rng: Mutex::new(StdRng::seed_from_u64(0)),
72 executor,
73 #[cfg(test)]
74 runtime: None,
75 })
76 }
77
/// Test-only helper that drops every entry from the per-room map.
#[cfg(test)]
pub fn reset(&self) {
    self.rooms.clear();
}
82
83 pub async fn migrate(
84 &self,
85 migrations_path: &Path,
86 ignore_checksum_mismatch: bool,
87 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
88 let migrations = MigrationSource::resolve(migrations_path)
89 .await
90 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
91
92 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
93
94 connection.ensure_migrations_table().await?;
95 let applied_migrations: HashMap<_, _> = connection
96 .list_applied_migrations()
97 .await?
98 .into_iter()
99 .map(|m| (m.version, m))
100 .collect();
101
102 let mut new_migrations = Vec::new();
103 for migration in migrations {
104 match applied_migrations.get(&migration.version) {
105 Some(applied_migration) => {
106 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
107 {
108 Err(anyhow!(
109 "checksum mismatch for applied migration {}",
110 migration.description
111 ))?;
112 }
113 }
114 None => {
115 let elapsed = connection.apply(&migration).await?;
116 new_migrations.push((migration, elapsed));
117 }
118 }
119 }
120
121 Ok(new_migrations)
122 }
123
124 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
125 self.transaction(|tx| async move {
126 let server = server::ActiveModel {
127 environment: ActiveValue::set(environment.into()),
128 ..Default::default()
129 }
130 .insert(&*tx)
131 .await?;
132 Ok(server.id)
133 })
134 .await
135 }
136
137 pub async fn stale_room_ids(
138 &self,
139 environment: &str,
140 new_server_id: ServerId,
141 ) -> Result<Vec<RoomId>> {
142 self.transaction(|tx| async move {
143 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
144 enum QueryAs {
145 RoomId,
146 }
147
148 let stale_server_epochs = self
149 .stale_server_ids(environment, new_server_id, &tx)
150 .await?;
151 Ok(room_participant::Entity::find()
152 .select_only()
153 .column(room_participant::Column::RoomId)
154 .distinct()
155 .filter(
156 room_participant::Column::AnsweringConnectionServerId
157 .is_in(stale_server_epochs),
158 )
159 .into_values::<_, QueryAs>()
160 .all(&*tx)
161 .await?)
162 })
163 .await
164 }
165
166 pub async fn refresh_room(
167 &self,
168 room_id: RoomId,
169 new_server_id: ServerId,
170 ) -> Result<RoomGuard<RefreshedRoom>> {
171 self.room_transaction(room_id, |tx| async move {
172 let stale_participant_filter = Condition::all()
173 .add(room_participant::Column::RoomId.eq(room_id))
174 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
175 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
176
177 let stale_participant_user_ids = room_participant::Entity::find()
178 .filter(stale_participant_filter.clone())
179 .all(&*tx)
180 .await?
181 .into_iter()
182 .map(|participant| participant.user_id)
183 .collect::<Vec<_>>();
184
185 // Delete participants who failed to reconnect and cancel their calls.
186 let mut canceled_calls_to_user_ids = Vec::new();
187 room_participant::Entity::delete_many()
188 .filter(stale_participant_filter)
189 .exec(&*tx)
190 .await?;
191 let called_participants = room_participant::Entity::find()
192 .filter(
193 Condition::all()
194 .add(
195 room_participant::Column::CallingUserId
196 .is_in(stale_participant_user_ids.iter().copied()),
197 )
198 .add(room_participant::Column::AnsweringConnectionId.is_null()),
199 )
200 .all(&*tx)
201 .await?;
202 room_participant::Entity::delete_many()
203 .filter(
204 room_participant::Column::Id
205 .is_in(called_participants.iter().map(|participant| participant.id)),
206 )
207 .exec(&*tx)
208 .await?;
209 canceled_calls_to_user_ids.extend(
210 called_participants
211 .into_iter()
212 .map(|participant| participant.user_id),
213 );
214
215 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
216 let channel_members;
217 if let Some(channel_id) = channel_id {
218 channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
219 } else {
220 channel_members = Vec::new();
221
222 // Delete the room if it becomes empty.
223 if room.participants.is_empty() {
224 project::Entity::delete_many()
225 .filter(project::Column::RoomId.eq(room_id))
226 .exec(&*tx)
227 .await?;
228 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
229 }
230 };
231
232 Ok(RefreshedRoom {
233 room,
234 channel_id,
235 channel_members,
236 stale_participant_user_ids,
237 canceled_calls_to_user_ids,
238 })
239 })
240 .await
241 }
242
243 pub async fn delete_stale_servers(
244 &self,
245 environment: &str,
246 new_server_id: ServerId,
247 ) -> Result<()> {
248 self.transaction(|tx| async move {
249 server::Entity::delete_many()
250 .filter(
251 Condition::all()
252 .add(server::Column::Environment.eq(environment))
253 .add(server::Column::Id.ne(new_server_id)),
254 )
255 .exec(&*tx)
256 .await?;
257 Ok(())
258 })
259 .await
260 }
261
262 async fn stale_server_ids(
263 &self,
264 environment: &str,
265 new_server_id: ServerId,
266 tx: &DatabaseTransaction,
267 ) -> Result<Vec<ServerId>> {
268 let stale_servers = server::Entity::find()
269 .filter(
270 Condition::all()
271 .add(server::Column::Environment.eq(environment))
272 .add(server::Column::Id.ne(new_server_id)),
273 )
274 .all(&*tx)
275 .await?;
276 Ok(stale_servers.into_iter().map(|server| server.id).collect())
277 }
278
279 // users
280
281 pub async fn create_user(
282 &self,
283 email_address: &str,
284 admin: bool,
285 params: NewUserParams,
286 ) -> Result<NewUserResult> {
287 self.transaction(|tx| async {
288 let tx = tx;
289 let user = user::Entity::insert(user::ActiveModel {
290 email_address: ActiveValue::set(Some(email_address.into())),
291 github_login: ActiveValue::set(params.github_login.clone()),
292 github_user_id: ActiveValue::set(Some(params.github_user_id)),
293 admin: ActiveValue::set(admin),
294 metrics_id: ActiveValue::set(Uuid::new_v4()),
295 ..Default::default()
296 })
297 .on_conflict(
298 OnConflict::column(user::Column::GithubLogin)
299 .update_column(user::Column::GithubLogin)
300 .to_owned(),
301 )
302 .exec_with_returning(&*tx)
303 .await?;
304
305 Ok(NewUserResult {
306 user_id: user.id,
307 metrics_id: user.metrics_id.to_string(),
308 signup_device_id: None,
309 inviting_user_id: None,
310 })
311 })
312 .await
313 }
314
315 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
316 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
317 .await
318 }
319
320 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
321 self.transaction(|tx| async {
322 let tx = tx;
323 Ok(user::Entity::find()
324 .filter(user::Column::Id.is_in(ids.iter().copied()))
325 .all(&*tx)
326 .await?)
327 })
328 .await
329 }
330
331 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
332 self.transaction(|tx| async move {
333 Ok(user::Entity::find()
334 .filter(user::Column::GithubLogin.eq(github_login))
335 .one(&*tx)
336 .await?)
337 })
338 .await
339 }
340
341 pub async fn get_or_create_user_by_github_account(
342 &self,
343 github_login: &str,
344 github_user_id: Option<i32>,
345 github_email: Option<&str>,
346 ) -> Result<Option<User>> {
347 self.transaction(|tx| async move {
348 let tx = &*tx;
349 if let Some(github_user_id) = github_user_id {
350 if let Some(user_by_github_user_id) = user::Entity::find()
351 .filter(user::Column::GithubUserId.eq(github_user_id))
352 .one(tx)
353 .await?
354 {
355 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
356 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
357 Ok(Some(user_by_github_user_id.update(tx).await?))
358 } else if let Some(user_by_github_login) = user::Entity::find()
359 .filter(user::Column::GithubLogin.eq(github_login))
360 .one(tx)
361 .await?
362 {
363 let mut user_by_github_login = user_by_github_login.into_active_model();
364 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
365 Ok(Some(user_by_github_login.update(tx).await?))
366 } else {
367 let user = user::Entity::insert(user::ActiveModel {
368 email_address: ActiveValue::set(github_email.map(|email| email.into())),
369 github_login: ActiveValue::set(github_login.into()),
370 github_user_id: ActiveValue::set(Some(github_user_id)),
371 admin: ActiveValue::set(false),
372 invite_count: ActiveValue::set(0),
373 invite_code: ActiveValue::set(None),
374 metrics_id: ActiveValue::set(Uuid::new_v4()),
375 ..Default::default()
376 })
377 .exec_with_returning(&*tx)
378 .await?;
379 Ok(Some(user))
380 }
381 } else {
382 Ok(user::Entity::find()
383 .filter(user::Column::GithubLogin.eq(github_login))
384 .one(tx)
385 .await?)
386 }
387 })
388 .await
389 }
390
391 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
392 self.transaction(|tx| async move {
393 Ok(user::Entity::find()
394 .order_by_asc(user::Column::GithubLogin)
395 .limit(limit as u64)
396 .offset(page as u64 * limit as u64)
397 .all(&*tx)
398 .await?)
399 })
400 .await
401 }
402
403 pub async fn get_users_with_no_invites(
404 &self,
405 invited_by_another_user: bool,
406 ) -> Result<Vec<User>> {
407 self.transaction(|tx| async move {
408 Ok(user::Entity::find()
409 .filter(
410 user::Column::InviteCount
411 .eq(0)
412 .and(if invited_by_another_user {
413 user::Column::InviterId.is_not_null()
414 } else {
415 user::Column::InviterId.is_null()
416 }),
417 )
418 .all(&*tx)
419 .await?)
420 })
421 .await
422 }
423
424 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
425 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
426 enum QueryAs {
427 MetricsId,
428 }
429
430 self.transaction(|tx| async move {
431 let metrics_id: Uuid = user::Entity::find_by_id(id)
432 .select_only()
433 .column(user::Column::MetricsId)
434 .into_values::<_, QueryAs>()
435 .one(&*tx)
436 .await?
437 .ok_or_else(|| anyhow!("could not find user"))?;
438 Ok(metrics_id.to_string())
439 })
440 .await
441 }
442
443 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
444 self.transaction(|tx| async move {
445 user::Entity::update_many()
446 .filter(user::Column::Id.eq(id))
447 .set(user::ActiveModel {
448 admin: ActiveValue::set(is_admin),
449 ..Default::default()
450 })
451 .exec(&*tx)
452 .await?;
453 Ok(())
454 })
455 .await
456 }
457
458 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
459 self.transaction(|tx| async move {
460 user::Entity::update_many()
461 .filter(user::Column::Id.eq(id))
462 .set(user::ActiveModel {
463 connected_once: ActiveValue::set(connected_once),
464 ..Default::default()
465 })
466 .exec(&*tx)
467 .await?;
468 Ok(())
469 })
470 .await
471 }
472
473 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
474 self.transaction(|tx| async move {
475 access_token::Entity::delete_many()
476 .filter(access_token::Column::UserId.eq(id))
477 .exec(&*tx)
478 .await?;
479 user::Entity::delete_by_id(id).exec(&*tx).await?;
480 Ok(())
481 })
482 .await
483 }
484
485 // contacts
486
487 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
488 #[derive(Debug, FromQueryResult)]
489 struct ContactWithUserBusyStatuses {
490 user_id_a: UserId,
491 user_id_b: UserId,
492 a_to_b: bool,
493 accepted: bool,
494 should_notify: bool,
495 user_a_busy: bool,
496 user_b_busy: bool,
497 }
498
499 self.transaction(|tx| async move {
500 let user_a_participant = Alias::new("user_a_participant");
501 let user_b_participant = Alias::new("user_b_participant");
502 let mut db_contacts = contact::Entity::find()
503 .column_as(
504 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
505 .is_not_null(),
506 "user_a_busy",
507 )
508 .column_as(
509 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
510 .is_not_null(),
511 "user_b_busy",
512 )
513 .filter(
514 contact::Column::UserIdA
515 .eq(user_id)
516 .or(contact::Column::UserIdB.eq(user_id)),
517 )
518 .join_as(
519 JoinType::LeftJoin,
520 contact::Relation::UserARoomParticipant.def(),
521 user_a_participant,
522 )
523 .join_as(
524 JoinType::LeftJoin,
525 contact::Relation::UserBRoomParticipant.def(),
526 user_b_participant,
527 )
528 .into_model::<ContactWithUserBusyStatuses>()
529 .stream(&*tx)
530 .await?;
531
532 let mut contacts = Vec::new();
533 while let Some(db_contact) = db_contacts.next().await {
534 let db_contact = db_contact?;
535 if db_contact.user_id_a == user_id {
536 if db_contact.accepted {
537 contacts.push(Contact::Accepted {
538 user_id: db_contact.user_id_b,
539 should_notify: db_contact.should_notify && db_contact.a_to_b,
540 busy: db_contact.user_b_busy,
541 });
542 } else if db_contact.a_to_b {
543 contacts.push(Contact::Outgoing {
544 user_id: db_contact.user_id_b,
545 })
546 } else {
547 contacts.push(Contact::Incoming {
548 user_id: db_contact.user_id_b,
549 should_notify: db_contact.should_notify,
550 });
551 }
552 } else if db_contact.accepted {
553 contacts.push(Contact::Accepted {
554 user_id: db_contact.user_id_a,
555 should_notify: db_contact.should_notify && !db_contact.a_to_b,
556 busy: db_contact.user_a_busy,
557 });
558 } else if db_contact.a_to_b {
559 contacts.push(Contact::Incoming {
560 user_id: db_contact.user_id_a,
561 should_notify: db_contact.should_notify,
562 });
563 } else {
564 contacts.push(Contact::Outgoing {
565 user_id: db_contact.user_id_a,
566 });
567 }
568 }
569
570 contacts.sort_unstable_by_key(|contact| contact.user_id());
571
572 Ok(contacts)
573 })
574 .await
575 }
576
577 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
578 self.transaction(|tx| async move {
579 let participant = room_participant::Entity::find()
580 .filter(room_participant::Column::UserId.eq(user_id))
581 .one(&*tx)
582 .await?;
583 Ok(participant.is_some())
584 })
585 .await
586 }
587
588 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
589 self.transaction(|tx| async move {
590 let (id_a, id_b) = if user_id_1 < user_id_2 {
591 (user_id_1, user_id_2)
592 } else {
593 (user_id_2, user_id_1)
594 };
595
596 Ok(contact::Entity::find()
597 .filter(
598 contact::Column::UserIdA
599 .eq(id_a)
600 .and(contact::Column::UserIdB.eq(id_b))
601 .and(contact::Column::Accepted.eq(true)),
602 )
603 .one(&*tx)
604 .await?
605 .is_some())
606 })
607 .await
608 }
609
610 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
611 self.transaction(|tx| async move {
612 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
613 (sender_id, receiver_id, true)
614 } else {
615 (receiver_id, sender_id, false)
616 };
617
618 let rows_affected = contact::Entity::insert(contact::ActiveModel {
619 user_id_a: ActiveValue::set(id_a),
620 user_id_b: ActiveValue::set(id_b),
621 a_to_b: ActiveValue::set(a_to_b),
622 accepted: ActiveValue::set(false),
623 should_notify: ActiveValue::set(true),
624 ..Default::default()
625 })
626 .on_conflict(
627 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
628 .values([
629 (contact::Column::Accepted, true.into()),
630 (contact::Column::ShouldNotify, false.into()),
631 ])
632 .action_and_where(
633 contact::Column::Accepted.eq(false).and(
634 contact::Column::AToB
635 .eq(a_to_b)
636 .and(contact::Column::UserIdA.eq(id_b))
637 .or(contact::Column::AToB
638 .ne(a_to_b)
639 .and(contact::Column::UserIdA.eq(id_a))),
640 ),
641 )
642 .to_owned(),
643 )
644 .exec_without_returning(&*tx)
645 .await?;
646
647 if rows_affected == 1 {
648 Ok(())
649 } else {
650 Err(anyhow!("contact already requested"))?
651 }
652 })
653 .await
654 }
655
656 /// Returns a bool indicating whether the removed contact had originally accepted or not
657 ///
658 /// Deletes the contact identified by the requester and responder ids, and then returns
659 /// whether the deleted contact had originally accepted or was a pending contact request.
660 ///
661 /// # Arguments
662 ///
663 /// * `requester_id` - The user that initiates this request
664 /// * `responder_id` - The user that will be removed
665 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
666 self.transaction(|tx| async move {
667 let (id_a, id_b) = if responder_id < requester_id {
668 (responder_id, requester_id)
669 } else {
670 (requester_id, responder_id)
671 };
672
673 let contact = contact::Entity::find()
674 .filter(
675 contact::Column::UserIdA
676 .eq(id_a)
677 .and(contact::Column::UserIdB.eq(id_b)),
678 )
679 .one(&*tx)
680 .await?
681 .ok_or_else(|| anyhow!("no such contact"))?;
682
683 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
684 Ok(contact.accepted)
685 })
686 .await
687 }
688
689 pub async fn dismiss_contact_notification(
690 &self,
691 user_id: UserId,
692 contact_user_id: UserId,
693 ) -> Result<()> {
694 self.transaction(|tx| async move {
695 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
696 (user_id, contact_user_id, true)
697 } else {
698 (contact_user_id, user_id, false)
699 };
700
701 let result = contact::Entity::update_many()
702 .set(contact::ActiveModel {
703 should_notify: ActiveValue::set(false),
704 ..Default::default()
705 })
706 .filter(
707 contact::Column::UserIdA
708 .eq(id_a)
709 .and(contact::Column::UserIdB.eq(id_b))
710 .and(
711 contact::Column::AToB
712 .eq(a_to_b)
713 .and(contact::Column::Accepted.eq(true))
714 .or(contact::Column::AToB
715 .ne(a_to_b)
716 .and(contact::Column::Accepted.eq(false))),
717 ),
718 )
719 .exec(&*tx)
720 .await?;
721 if result.rows_affected == 0 {
722 Err(anyhow!("no such contact request"))?
723 } else {
724 Ok(())
725 }
726 })
727 .await
728 }
729
730 pub async fn respond_to_contact_request(
731 &self,
732 responder_id: UserId,
733 requester_id: UserId,
734 accept: bool,
735 ) -> Result<()> {
736 self.transaction(|tx| async move {
737 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
738 (responder_id, requester_id, false)
739 } else {
740 (requester_id, responder_id, true)
741 };
742 let rows_affected = if accept {
743 let result = contact::Entity::update_many()
744 .set(contact::ActiveModel {
745 accepted: ActiveValue::set(true),
746 should_notify: ActiveValue::set(true),
747 ..Default::default()
748 })
749 .filter(
750 contact::Column::UserIdA
751 .eq(id_a)
752 .and(contact::Column::UserIdB.eq(id_b))
753 .and(contact::Column::AToB.eq(a_to_b)),
754 )
755 .exec(&*tx)
756 .await?;
757 result.rows_affected
758 } else {
759 let result = contact::Entity::delete_many()
760 .filter(
761 contact::Column::UserIdA
762 .eq(id_a)
763 .and(contact::Column::UserIdB.eq(id_b))
764 .and(contact::Column::AToB.eq(a_to_b))
765 .and(contact::Column::Accepted.eq(false)),
766 )
767 .exec(&*tx)
768 .await?;
769
770 result.rows_affected
771 };
772
773 if rows_affected == 1 {
774 Ok(())
775 } else {
776 Err(anyhow!("no such contact request"))?
777 }
778 })
779 .await
780 }
781
/// Builds a SQL `LIKE` pattern that matches the alphanumeric characters of `string`
/// in order, with anything in between.
///
/// Every alphanumeric character is prefixed with `%`, all other characters are
/// dropped, and a final `%` is appended, e.g. `"x y"` becomes `"%x%y%"`.
pub fn fuzzy_like_string(string: &str) -> String {
    let mut pattern = String::with_capacity(string.len() * 2 + 1);
    for ch in string.chars().filter(|ch| ch.is_alphanumeric()) {
        pattern.push('%');
        pattern.push(ch);
    }
    pattern.push('%');
    pattern
}
793
794 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
795 self.transaction(|tx| async {
796 let tx = tx;
797 let like_string = Self::fuzzy_like_string(name_query);
798 let query = "
799 SELECT users.*
800 FROM users
801 WHERE github_login ILIKE $1
802 ORDER BY github_login <-> $2
803 LIMIT $3
804 ";
805
806 Ok(user::Entity::find()
807 .from_raw_sql(Statement::from_sql_and_values(
808 self.pool.get_database_backend(),
809 query.into(),
810 vec![like_string.into(), name_query.into(), limit.into()],
811 ))
812 .all(&*tx)
813 .await?)
814 })
815 .await
816 }
817
818 // signups
819
820 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
821 self.transaction(|tx| async move {
822 signup::Entity::insert(signup::ActiveModel {
823 email_address: ActiveValue::set(signup.email_address.clone()),
824 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
825 email_confirmation_sent: ActiveValue::set(false),
826 platform_mac: ActiveValue::set(signup.platform_mac),
827 platform_windows: ActiveValue::set(signup.platform_windows),
828 platform_linux: ActiveValue::set(signup.platform_linux),
829 platform_unknown: ActiveValue::set(false),
830 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
831 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
832 device_id: ActiveValue::set(signup.device_id.clone()),
833 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
834 ..Default::default()
835 })
836 .on_conflict(
837 OnConflict::column(signup::Column::EmailAddress)
838 .update_columns([
839 signup::Column::PlatformMac,
840 signup::Column::PlatformWindows,
841 signup::Column::PlatformLinux,
842 signup::Column::EditorFeatures,
843 signup::Column::ProgrammingLanguages,
844 signup::Column::DeviceId,
845 signup::Column::AddedToMailingList,
846 ])
847 .to_owned(),
848 )
849 .exec(&*tx)
850 .await?;
851 Ok(())
852 })
853 .await
854 }
855
856 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
857 self.transaction(|tx| async move {
858 let signup = signup::Entity::find()
859 .filter(signup::Column::EmailAddress.eq(email_address))
860 .one(&*tx)
861 .await?
862 .ok_or_else(|| {
863 anyhow!("signup with email address {} doesn't exist", email_address)
864 })?;
865
866 Ok(signup)
867 })
868 .await
869 }
870
871 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
872 self.transaction(|tx| async move {
873 let query = "
874 SELECT
875 COUNT(*) as count,
876 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
877 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
878 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
879 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
880 FROM (
881 SELECT *
882 FROM signups
883 WHERE
884 NOT email_confirmation_sent
885 ) AS unsent
886 ";
887 Ok(
888 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
889 self.pool.get_database_backend(),
890 query.into(),
891 vec![],
892 ))
893 .one(&*tx)
894 .await?
895 .ok_or_else(|| anyhow!("invalid result"))?,
896 )
897 })
898 .await
899 }
900
901 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
902 let emails = invites
903 .iter()
904 .map(|s| s.email_address.as_str())
905 .collect::<Vec<_>>();
906 self.transaction(|tx| async {
907 let tx = tx;
908 signup::Entity::update_many()
909 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
910 .set(signup::ActiveModel {
911 email_confirmation_sent: ActiveValue::set(true),
912 ..Default::default()
913 })
914 .exec(&*tx)
915 .await?;
916 Ok(())
917 })
918 .await
919 }
920
921 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
922 self.transaction(|tx| async move {
923 Ok(signup::Entity::find()
924 .select_only()
925 .column(signup::Column::EmailAddress)
926 .column(signup::Column::EmailConfirmationCode)
927 .filter(
928 signup::Column::EmailConfirmationSent.eq(false).and(
929 signup::Column::PlatformMac
930 .eq(true)
931 .or(signup::Column::PlatformUnknown.eq(true)),
932 ),
933 )
934 .order_by_asc(signup::Column::CreatedAt)
935 .limit(count as u64)
936 .into_model()
937 .all(&*tx)
938 .await?)
939 })
940 .await
941 }
942
943 // invite codes
944
945 pub async fn create_invite_from_code(
946 &self,
947 code: &str,
948 email_address: &str,
949 device_id: Option<&str>,
950 added_to_mailing_list: bool,
951 ) -> Result<Invite> {
952 self.transaction(|tx| async move {
953 let existing_user = user::Entity::find()
954 .filter(user::Column::EmailAddress.eq(email_address))
955 .one(&*tx)
956 .await?;
957
958 if existing_user.is_some() {
959 Err(anyhow!("email address is already in use"))?;
960 }
961
962 let inviting_user_with_invites = match user::Entity::find()
963 .filter(
964 user::Column::InviteCode
965 .eq(code)
966 .and(user::Column::InviteCount.gt(0)),
967 )
968 .one(&*tx)
969 .await?
970 {
971 Some(inviting_user) => inviting_user,
972 None => {
973 return Err(Error::Http(
974 StatusCode::UNAUTHORIZED,
975 "unable to find an invite code with invites remaining".to_string(),
976 ))?
977 }
978 };
979 user::Entity::update_many()
980 .filter(
981 user::Column::Id
982 .eq(inviting_user_with_invites.id)
983 .and(user::Column::InviteCount.gt(0)),
984 )
985 .col_expr(
986 user::Column::InviteCount,
987 Expr::col(user::Column::InviteCount).sub(1),
988 )
989 .exec(&*tx)
990 .await?;
991
992 let signup = signup::Entity::insert(signup::ActiveModel {
993 email_address: ActiveValue::set(email_address.into()),
994 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
995 email_confirmation_sent: ActiveValue::set(false),
996 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
997 platform_linux: ActiveValue::set(false),
998 platform_mac: ActiveValue::set(false),
999 platform_windows: ActiveValue::set(false),
1000 platform_unknown: ActiveValue::set(true),
1001 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
1002 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
1003 ..Default::default()
1004 })
1005 .on_conflict(
1006 OnConflict::column(signup::Column::EmailAddress)
1007 .update_column(signup::Column::InvitingUserId)
1008 .to_owned(),
1009 )
1010 .exec_with_returning(&*tx)
1011 .await?;
1012
1013 Ok(Invite {
1014 email_address: signup.email_address,
1015 email_confirmation_code: signup.email_confirmation_code,
1016 })
1017 })
1018 .await
1019 }
1020
1021 pub async fn create_user_from_invite(
1022 &self,
1023 invite: &Invite,
1024 user: NewUserParams,
1025 ) -> Result<Option<NewUserResult>> {
1026 self.transaction(|tx| async {
1027 let tx = tx;
1028 let signup = signup::Entity::find()
1029 .filter(
1030 signup::Column::EmailAddress
1031 .eq(invite.email_address.as_str())
1032 .and(
1033 signup::Column::EmailConfirmationCode
1034 .eq(invite.email_confirmation_code.as_str()),
1035 ),
1036 )
1037 .one(&*tx)
1038 .await?
1039 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1040
1041 if signup.user_id.is_some() {
1042 return Ok(None);
1043 }
1044
1045 let user = user::Entity::insert(user::ActiveModel {
1046 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1047 github_login: ActiveValue::set(user.github_login.clone()),
1048 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1049 admin: ActiveValue::set(false),
1050 invite_count: ActiveValue::set(user.invite_count),
1051 invite_code: ActiveValue::set(Some(random_invite_code())),
1052 metrics_id: ActiveValue::set(Uuid::new_v4()),
1053 ..Default::default()
1054 })
1055 .on_conflict(
1056 OnConflict::column(user::Column::GithubLogin)
1057 .update_columns([
1058 user::Column::EmailAddress,
1059 user::Column::GithubUserId,
1060 user::Column::Admin,
1061 ])
1062 .to_owned(),
1063 )
1064 .exec_with_returning(&*tx)
1065 .await?;
1066
1067 let mut signup = signup.into_active_model();
1068 signup.user_id = ActiveValue::set(Some(user.id));
1069 let signup = signup.update(&*tx).await?;
1070
1071 if let Some(inviting_user_id) = signup.inviting_user_id {
1072 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1073 (inviting_user_id, user.id, true)
1074 } else {
1075 (user.id, inviting_user_id, false)
1076 };
1077
1078 contact::Entity::insert(contact::ActiveModel {
1079 user_id_a: ActiveValue::set(user_id_a),
1080 user_id_b: ActiveValue::set(user_id_b),
1081 a_to_b: ActiveValue::set(a_to_b),
1082 should_notify: ActiveValue::set(true),
1083 accepted: ActiveValue::set(true),
1084 ..Default::default()
1085 })
1086 .on_conflict(OnConflict::new().do_nothing().to_owned())
1087 .exec_without_returning(&*tx)
1088 .await?;
1089 }
1090
1091 Ok(Some(NewUserResult {
1092 user_id: user.id,
1093 metrics_id: user.metrics_id.to_string(),
1094 inviting_user_id: signup.inviting_user_id,
1095 signup_device_id: signup.device_id,
1096 }))
1097 })
1098 .await
1099 }
1100
1101 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1102 self.transaction(|tx| async move {
1103 if count > 0 {
1104 user::Entity::update_many()
1105 .filter(
1106 user::Column::Id
1107 .eq(id)
1108 .and(user::Column::InviteCode.is_null()),
1109 )
1110 .set(user::ActiveModel {
1111 invite_code: ActiveValue::set(Some(random_invite_code())),
1112 ..Default::default()
1113 })
1114 .exec(&*tx)
1115 .await?;
1116 }
1117
1118 user::Entity::update_many()
1119 .filter(user::Column::Id.eq(id))
1120 .set(user::ActiveModel {
1121 invite_count: ActiveValue::set(count),
1122 ..Default::default()
1123 })
1124 .exec(&*tx)
1125 .await?;
1126 Ok(())
1127 })
1128 .await
1129 }
1130
1131 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1132 self.transaction(|tx| async move {
1133 match user::Entity::find_by_id(id).one(&*tx).await? {
1134 Some(user) if user.invite_code.is_some() => {
1135 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1136 }
1137 _ => Ok(None),
1138 }
1139 })
1140 .await
1141 }
1142
1143 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1144 self.transaction(|tx| async move {
1145 user::Entity::find()
1146 .filter(user::Column::InviteCode.eq(code))
1147 .one(&*tx)
1148 .await?
1149 .ok_or_else(|| {
1150 Error::Http(
1151 StatusCode::NOT_FOUND,
1152 "that invite code does not exist".to_string(),
1153 )
1154 })
1155 })
1156 .await
1157 }
1158
1159 // rooms
1160
1161 pub async fn incoming_call_for_user(
1162 &self,
1163 user_id: UserId,
1164 ) -> Result<Option<proto::IncomingCall>> {
1165 self.transaction(|tx| async move {
1166 let pending_participant = room_participant::Entity::find()
1167 .filter(
1168 room_participant::Column::UserId
1169 .eq(user_id)
1170 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1171 )
1172 .one(&*tx)
1173 .await?;
1174
1175 if let Some(pending_participant) = pending_participant {
1176 let room = self.get_room(pending_participant.room_id, &tx).await?;
1177 Ok(Self::build_incoming_call(&room, user_id))
1178 } else {
1179 Ok(None)
1180 }
1181 })
1182 .await
1183 }
1184
1185 pub async fn create_room(
1186 &self,
1187 user_id: UserId,
1188 connection: ConnectionId,
1189 live_kit_room: &str,
1190 ) -> Result<proto::Room> {
1191 self.transaction(|tx| async move {
1192 let room = room::ActiveModel {
1193 live_kit_room: ActiveValue::set(live_kit_room.into()),
1194 ..Default::default()
1195 }
1196 .insert(&*tx)
1197 .await?;
1198 room_participant::ActiveModel {
1199 room_id: ActiveValue::set(room.id),
1200 user_id: ActiveValue::set(user_id),
1201 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1202 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1203 connection.owner_id as i32,
1204 ))),
1205 answering_connection_lost: ActiveValue::set(false),
1206 calling_user_id: ActiveValue::set(user_id),
1207 calling_connection_id: ActiveValue::set(connection.id as i32),
1208 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1209 connection.owner_id as i32,
1210 ))),
1211 ..Default::default()
1212 }
1213 .insert(&*tx)
1214 .await?;
1215
1216 let room = self.get_room(room.id, &tx).await?;
1217 Ok(room)
1218 })
1219 .await
1220 }
1221
1222 pub async fn call(
1223 &self,
1224 room_id: RoomId,
1225 calling_user_id: UserId,
1226 calling_connection: ConnectionId,
1227 called_user_id: UserId,
1228 initial_project_id: Option<ProjectId>,
1229 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1230 self.room_transaction(room_id, |tx| async move {
1231 room_participant::ActiveModel {
1232 room_id: ActiveValue::set(room_id),
1233 user_id: ActiveValue::set(called_user_id),
1234 answering_connection_lost: ActiveValue::set(false),
1235 calling_user_id: ActiveValue::set(calling_user_id),
1236 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1237 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1238 calling_connection.owner_id as i32,
1239 ))),
1240 initial_project_id: ActiveValue::set(initial_project_id),
1241 ..Default::default()
1242 }
1243 .insert(&*tx)
1244 .await?;
1245
1246 let room = self.get_room(room_id, &tx).await?;
1247 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1248 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1249 Ok((room, incoming_call))
1250 })
1251 .await
1252 }
1253
1254 pub async fn call_failed(
1255 &self,
1256 room_id: RoomId,
1257 called_user_id: UserId,
1258 ) -> Result<RoomGuard<proto::Room>> {
1259 self.room_transaction(room_id, |tx| async move {
1260 room_participant::Entity::delete_many()
1261 .filter(
1262 room_participant::Column::RoomId
1263 .eq(room_id)
1264 .and(room_participant::Column::UserId.eq(called_user_id)),
1265 )
1266 .exec(&*tx)
1267 .await?;
1268 let room = self.get_room(room_id, &tx).await?;
1269 Ok(room)
1270 })
1271 .await
1272 }
1273
1274 pub async fn decline_call(
1275 &self,
1276 expected_room_id: Option<RoomId>,
1277 user_id: UserId,
1278 ) -> Result<Option<RoomGuard<proto::Room>>> {
1279 self.optional_room_transaction(|tx| async move {
1280 let mut filter = Condition::all()
1281 .add(room_participant::Column::UserId.eq(user_id))
1282 .add(room_participant::Column::AnsweringConnectionId.is_null());
1283 if let Some(room_id) = expected_room_id {
1284 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1285 }
1286 let participant = room_participant::Entity::find()
1287 .filter(filter)
1288 .one(&*tx)
1289 .await?;
1290
1291 let participant = if let Some(participant) = participant {
1292 participant
1293 } else if expected_room_id.is_some() {
1294 return Err(anyhow!("could not find call to decline"))?;
1295 } else {
1296 return Ok(None);
1297 };
1298
1299 let room_id = participant.room_id;
1300 room_participant::Entity::delete(participant.into_active_model())
1301 .exec(&*tx)
1302 .await?;
1303
1304 let room = self.get_room(room_id, &tx).await?;
1305 Ok(Some((room_id, room)))
1306 })
1307 .await
1308 }
1309
1310 pub async fn cancel_call(
1311 &self,
1312 room_id: RoomId,
1313 calling_connection: ConnectionId,
1314 called_user_id: UserId,
1315 ) -> Result<RoomGuard<proto::Room>> {
1316 self.room_transaction(room_id, |tx| async move {
1317 let participant = room_participant::Entity::find()
1318 .filter(
1319 Condition::all()
1320 .add(room_participant::Column::UserId.eq(called_user_id))
1321 .add(room_participant::Column::RoomId.eq(room_id))
1322 .add(
1323 room_participant::Column::CallingConnectionId
1324 .eq(calling_connection.id as i32),
1325 )
1326 .add(
1327 room_participant::Column::CallingConnectionServerId
1328 .eq(calling_connection.owner_id as i32),
1329 )
1330 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1331 )
1332 .one(&*tx)
1333 .await?
1334 .ok_or_else(|| anyhow!("no call to cancel"))?;
1335
1336 room_participant::Entity::delete(participant.into_active_model())
1337 .exec(&*tx)
1338 .await?;
1339
1340 let room = self.get_room(room_id, &tx).await?;
1341 Ok(room)
1342 })
1343 .await
1344 }
1345
1346 pub async fn is_current_room_different_channel(
1347 &self,
1348 user_id: UserId,
1349 channel_id: ChannelId,
1350 ) -> Result<bool> {
1351 self.transaction(|tx| async move {
1352 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1353 enum QueryAs {
1354 ChannelId,
1355 }
1356
1357 let channel_id_model: Option<ChannelId> = room_participant::Entity::find()
1358 .select_only()
1359 .column_as(room::Column::ChannelId, QueryAs::ChannelId)
1360 .inner_join(room::Entity)
1361 .filter(room_participant::Column::UserId.eq(user_id))
1362 .into_values::<_, QueryAs>()
1363 .one(&*tx)
1364 .await?;
1365
1366 let result = channel_id_model
1367 .map(|channel_id_model| channel_id_model != channel_id)
1368 .unwrap_or(false);
1369
1370 Ok(result)
1371 })
1372 .await
1373 }
1374
1375 pub async fn join_room(
1376 &self,
1377 room_id: RoomId,
1378 user_id: UserId,
1379 channel_id: Option<ChannelId>,
1380 connection: ConnectionId,
1381 ) -> Result<RoomGuard<JoinRoom>> {
1382 self.room_transaction(room_id, |tx| async move {
1383 if let Some(channel_id) = channel_id {
1384 channel_member::Entity::find()
1385 .filter(
1386 channel_member::Column::ChannelId
1387 .eq(channel_id)
1388 .and(channel_member::Column::UserId.eq(user_id))
1389 .and(channel_member::Column::Accepted.eq(true)),
1390 )
1391 .one(&*tx)
1392 .await?
1393 .ok_or_else(|| anyhow!("no such channel membership"))?;
1394
1395 room_participant::ActiveModel {
1396 room_id: ActiveValue::set(room_id),
1397 user_id: ActiveValue::set(user_id),
1398 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1399 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1400 connection.owner_id as i32,
1401 ))),
1402 answering_connection_lost: ActiveValue::set(false),
1403 // Redundant for the channel join use case, used for channel and call invitations
1404 calling_user_id: ActiveValue::set(user_id),
1405 calling_connection_id: ActiveValue::set(connection.id as i32),
1406 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1407 connection.owner_id as i32,
1408 ))),
1409 ..Default::default()
1410 }
1411 .insert(&*tx)
1412 .await?;
1413 } else {
1414 let result = room_participant::Entity::update_many()
1415 .filter(
1416 Condition::all()
1417 .add(room_participant::Column::RoomId.eq(room_id))
1418 .add(room_participant::Column::UserId.eq(user_id))
1419 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1420 )
1421 .set(room_participant::ActiveModel {
1422 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1423 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1424 connection.owner_id as i32,
1425 ))),
1426 answering_connection_lost: ActiveValue::set(false),
1427 ..Default::default()
1428 })
1429 .exec(&*tx)
1430 .await?;
1431 if result.rows_affected == 0 {
1432 Err(anyhow!("room does not exist or was already joined"))?;
1433 }
1434 }
1435
1436 let room = self.get_room(room_id, &tx).await?;
1437 let channel_members = if let Some(channel_id) = channel_id {
1438 self.get_channel_members_internal(channel_id, &tx).await?
1439 } else {
1440 Vec::new()
1441 };
1442 Ok(JoinRoom {
1443 room,
1444 channel_id,
1445 channel_members,
1446 })
1447 })
1448 .await
1449 }
1450
1451 pub async fn rejoin_room(
1452 &self,
1453 rejoin_room: proto::RejoinRoom,
1454 user_id: UserId,
1455 connection: ConnectionId,
1456 ) -> Result<RoomGuard<RejoinedRoom>> {
1457 let room_id = RoomId::from_proto(rejoin_room.id);
1458 self.room_transaction(room_id, |tx| async {
1459 let tx = tx;
1460 let participant_update = room_participant::Entity::update_many()
1461 .filter(
1462 Condition::all()
1463 .add(room_participant::Column::RoomId.eq(room_id))
1464 .add(room_participant::Column::UserId.eq(user_id))
1465 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1466 .add(
1467 Condition::any()
1468 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1469 .add(
1470 room_participant::Column::AnsweringConnectionServerId
1471 .ne(connection.owner_id as i32),
1472 ),
1473 ),
1474 )
1475 .set(room_participant::ActiveModel {
1476 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1477 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1478 connection.owner_id as i32,
1479 ))),
1480 answering_connection_lost: ActiveValue::set(false),
1481 ..Default::default()
1482 })
1483 .exec(&*tx)
1484 .await?;
1485 if participant_update.rows_affected == 0 {
1486 return Err(anyhow!("room does not exist or was already joined"))?;
1487 }
1488
1489 let mut reshared_projects = Vec::new();
1490 for reshared_project in &rejoin_room.reshared_projects {
1491 let project_id = ProjectId::from_proto(reshared_project.project_id);
1492 let project = project::Entity::find_by_id(project_id)
1493 .one(&*tx)
1494 .await?
1495 .ok_or_else(|| anyhow!("project does not exist"))?;
1496 if project.host_user_id != user_id {
1497 return Err(anyhow!("no such project"))?;
1498 }
1499
1500 let mut collaborators = project
1501 .find_related(project_collaborator::Entity)
1502 .all(&*tx)
1503 .await?;
1504 let host_ix = collaborators
1505 .iter()
1506 .position(|collaborator| {
1507 collaborator.user_id == user_id && collaborator.is_host
1508 })
1509 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1510 let host = collaborators.swap_remove(host_ix);
1511 let old_connection_id = host.connection();
1512
1513 project::Entity::update(project::ActiveModel {
1514 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1515 host_connection_server_id: ActiveValue::set(Some(ServerId(
1516 connection.owner_id as i32,
1517 ))),
1518 ..project.into_active_model()
1519 })
1520 .exec(&*tx)
1521 .await?;
1522 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1523 connection_id: ActiveValue::set(connection.id as i32),
1524 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1525 ..host.into_active_model()
1526 })
1527 .exec(&*tx)
1528 .await?;
1529
1530 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1531 .await?;
1532
1533 reshared_projects.push(ResharedProject {
1534 id: project_id,
1535 old_connection_id,
1536 collaborators: collaborators
1537 .iter()
1538 .map(|collaborator| ProjectCollaborator {
1539 connection_id: collaborator.connection(),
1540 user_id: collaborator.user_id,
1541 replica_id: collaborator.replica_id,
1542 is_host: collaborator.is_host,
1543 })
1544 .collect(),
1545 worktrees: reshared_project.worktrees.clone(),
1546 });
1547 }
1548
1549 project::Entity::delete_many()
1550 .filter(
1551 Condition::all()
1552 .add(project::Column::RoomId.eq(room_id))
1553 .add(project::Column::HostUserId.eq(user_id))
1554 .add(
1555 project::Column::Id
1556 .is_not_in(reshared_projects.iter().map(|project| project.id)),
1557 ),
1558 )
1559 .exec(&*tx)
1560 .await?;
1561
1562 let mut rejoined_projects = Vec::new();
1563 for rejoined_project in &rejoin_room.rejoined_projects {
1564 let project_id = ProjectId::from_proto(rejoined_project.id);
1565 let Some(project) = project::Entity::find_by_id(project_id)
1566 .one(&*tx)
1567 .await? else { continue };
1568
1569 let mut worktrees = Vec::new();
1570 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1571 for db_worktree in db_worktrees {
1572 let mut worktree = RejoinedWorktree {
1573 id: db_worktree.id as u64,
1574 abs_path: db_worktree.abs_path,
1575 root_name: db_worktree.root_name,
1576 visible: db_worktree.visible,
1577 updated_entries: Default::default(),
1578 removed_entries: Default::default(),
1579 updated_repositories: Default::default(),
1580 removed_repositories: Default::default(),
1581 diagnostic_summaries: Default::default(),
1582 settings_files: Default::default(),
1583 scan_id: db_worktree.scan_id as u64,
1584 completed_scan_id: db_worktree.completed_scan_id as u64,
1585 };
1586
1587 let rejoined_worktree = rejoined_project
1588 .worktrees
1589 .iter()
1590 .find(|worktree| worktree.id == db_worktree.id as u64);
1591
1592 // File entries
1593 {
1594 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1595 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1596 } else {
1597 worktree_entry::Column::IsDeleted.eq(false)
1598 };
1599
1600 let mut db_entries = worktree_entry::Entity::find()
1601 .filter(
1602 Condition::all()
1603 .add(worktree_entry::Column::ProjectId.eq(project.id))
1604 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1605 .add(entry_filter),
1606 )
1607 .stream(&*tx)
1608 .await?;
1609
1610 while let Some(db_entry) = db_entries.next().await {
1611 let db_entry = db_entry?;
1612 if db_entry.is_deleted {
1613 worktree.removed_entries.push(db_entry.id as u64);
1614 } else {
1615 worktree.updated_entries.push(proto::Entry {
1616 id: db_entry.id as u64,
1617 is_dir: db_entry.is_dir,
1618 path: db_entry.path,
1619 inode: db_entry.inode as u64,
1620 mtime: Some(proto::Timestamp {
1621 seconds: db_entry.mtime_seconds as u64,
1622 nanos: db_entry.mtime_nanos as u32,
1623 }),
1624 is_symlink: db_entry.is_symlink,
1625 is_ignored: db_entry.is_ignored,
1626 is_external: db_entry.is_external,
1627 git_status: db_entry.git_status.map(|status| status as i32),
1628 });
1629 }
1630 }
1631 }
1632
1633 // Repository Entries
1634 {
1635 let repository_entry_filter =
1636 if let Some(rejoined_worktree) = rejoined_worktree {
1637 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1638 } else {
1639 worktree_repository::Column::IsDeleted.eq(false)
1640 };
1641
1642 let mut db_repositories = worktree_repository::Entity::find()
1643 .filter(
1644 Condition::all()
1645 .add(worktree_repository::Column::ProjectId.eq(project.id))
1646 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1647 .add(repository_entry_filter),
1648 )
1649 .stream(&*tx)
1650 .await?;
1651
1652 while let Some(db_repository) = db_repositories.next().await {
1653 let db_repository = db_repository?;
1654 if db_repository.is_deleted {
1655 worktree
1656 .removed_repositories
1657 .push(db_repository.work_directory_id as u64);
1658 } else {
1659 worktree.updated_repositories.push(proto::RepositoryEntry {
1660 work_directory_id: db_repository.work_directory_id as u64,
1661 branch: db_repository.branch,
1662 });
1663 }
1664 }
1665 }
1666
1667 worktrees.push(worktree);
1668 }
1669
1670 let language_servers = project
1671 .find_related(language_server::Entity)
1672 .all(&*tx)
1673 .await?
1674 .into_iter()
1675 .map(|language_server| proto::LanguageServer {
1676 id: language_server.id as u64,
1677 name: language_server.name,
1678 })
1679 .collect::<Vec<_>>();
1680
1681 {
1682 let mut db_settings_files = worktree_settings_file::Entity::find()
1683 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1684 .stream(&*tx)
1685 .await?;
1686 while let Some(db_settings_file) = db_settings_files.next().await {
1687 let db_settings_file = db_settings_file?;
1688 if let Some(worktree) = worktrees
1689 .iter_mut()
1690 .find(|w| w.id == db_settings_file.worktree_id as u64)
1691 {
1692 worktree.settings_files.push(WorktreeSettingsFile {
1693 path: db_settings_file.path,
1694 content: db_settings_file.content,
1695 });
1696 }
1697 }
1698 }
1699
1700 let mut collaborators = project
1701 .find_related(project_collaborator::Entity)
1702 .all(&*tx)
1703 .await?;
1704 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1705 .iter()
1706 .position(|collaborator| collaborator.user_id == user_id)
1707 {
1708 collaborators.swap_remove(self_collaborator_ix)
1709 } else {
1710 continue;
1711 };
1712 let old_connection_id = self_collaborator.connection();
1713 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1714 connection_id: ActiveValue::set(connection.id as i32),
1715 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1716 ..self_collaborator.into_active_model()
1717 })
1718 .exec(&*tx)
1719 .await?;
1720
1721 let collaborators = collaborators
1722 .into_iter()
1723 .map(|collaborator| ProjectCollaborator {
1724 connection_id: collaborator.connection(),
1725 user_id: collaborator.user_id,
1726 replica_id: collaborator.replica_id,
1727 is_host: collaborator.is_host,
1728 })
1729 .collect::<Vec<_>>();
1730
1731 rejoined_projects.push(RejoinedProject {
1732 id: project_id,
1733 old_connection_id,
1734 collaborators,
1735 worktrees,
1736 language_servers,
1737 });
1738 }
1739
1740 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1741
1742 let channel_members = if let Some(channel_id) = channel_id {
1743 self.get_channel_members_internal(channel_id, &tx).await?
1744 } else {
1745 Vec::new()
1746 };
1747
1748 Ok(RejoinedRoom {
1749 room,
1750 channel_id,
1751 channel_members,
1752 rejoined_projects,
1753 reshared_projects,
1754 })
1755 })
1756 .await
1757 }
1758
1759 pub async fn leave_room(
1760 &self,
1761 connection: ConnectionId,
1762 ) -> Result<Option<RoomGuard<LeftRoom>>> {
1763 self.optional_room_transaction(|tx| async move {
1764 let leaving_participant = room_participant::Entity::find()
1765 .filter(
1766 Condition::all()
1767 .add(
1768 room_participant::Column::AnsweringConnectionId
1769 .eq(connection.id as i32),
1770 )
1771 .add(
1772 room_participant::Column::AnsweringConnectionServerId
1773 .eq(connection.owner_id as i32),
1774 ),
1775 )
1776 .one(&*tx)
1777 .await?;
1778
1779 if let Some(leaving_participant) = leaving_participant {
1780 // Leave room.
1781 let room_id = leaving_participant.room_id;
1782 room_participant::Entity::delete_by_id(leaving_participant.id)
1783 .exec(&*tx)
1784 .await?;
1785
1786 // Cancel pending calls initiated by the leaving user.
1787 let called_participants = room_participant::Entity::find()
1788 .filter(
1789 Condition::all()
1790 .add(
1791 room_participant::Column::CallingUserId
1792 .eq(leaving_participant.user_id),
1793 )
1794 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1795 )
1796 .all(&*tx)
1797 .await?;
1798 room_participant::Entity::delete_many()
1799 .filter(
1800 room_participant::Column::Id
1801 .is_in(called_participants.iter().map(|participant| participant.id)),
1802 )
1803 .exec(&*tx)
1804 .await?;
1805 let canceled_calls_to_user_ids = called_participants
1806 .into_iter()
1807 .map(|participant| participant.user_id)
1808 .collect();
1809
1810 // Detect left projects.
1811 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1812 enum QueryProjectIds {
1813 ProjectId,
1814 }
1815 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1816 .select_only()
1817 .column_as(
1818 project_collaborator::Column::ProjectId,
1819 QueryProjectIds::ProjectId,
1820 )
1821 .filter(
1822 Condition::all()
1823 .add(
1824 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1825 )
1826 .add(
1827 project_collaborator::Column::ConnectionServerId
1828 .eq(connection.owner_id as i32),
1829 ),
1830 )
1831 .into_values::<_, QueryProjectIds>()
1832 .all(&*tx)
1833 .await?;
1834 let mut left_projects = HashMap::default();
1835 let mut collaborators = project_collaborator::Entity::find()
1836 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1837 .stream(&*tx)
1838 .await?;
1839 while let Some(collaborator) = collaborators.next().await {
1840 let collaborator = collaborator?;
1841 let left_project =
1842 left_projects
1843 .entry(collaborator.project_id)
1844 .or_insert(LeftProject {
1845 id: collaborator.project_id,
1846 host_user_id: Default::default(),
1847 connection_ids: Default::default(),
1848 host_connection_id: Default::default(),
1849 });
1850
1851 let collaborator_connection_id = collaborator.connection();
1852 if collaborator_connection_id != connection {
1853 left_project.connection_ids.push(collaborator_connection_id);
1854 }
1855
1856 if collaborator.is_host {
1857 left_project.host_user_id = collaborator.user_id;
1858 left_project.host_connection_id = collaborator_connection_id;
1859 }
1860 }
1861 drop(collaborators);
1862
1863 // Leave projects.
1864 project_collaborator::Entity::delete_many()
1865 .filter(
1866 Condition::all()
1867 .add(
1868 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1869 )
1870 .add(
1871 project_collaborator::Column::ConnectionServerId
1872 .eq(connection.owner_id as i32),
1873 ),
1874 )
1875 .exec(&*tx)
1876 .await?;
1877
1878 // Unshare projects.
1879 project::Entity::delete_many()
1880 .filter(
1881 Condition::all()
1882 .add(project::Column::RoomId.eq(room_id))
1883 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1884 .add(
1885 project::Column::HostConnectionServerId
1886 .eq(connection.owner_id as i32),
1887 ),
1888 )
1889 .exec(&*tx)
1890 .await?;
1891
1892 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1893 let deleted = if room.participants.is_empty() {
1894 let result = room::Entity::delete_by_id(room_id)
1895 .filter(room::Column::ChannelId.is_null())
1896 .exec(&*tx)
1897 .await?;
1898 result.rows_affected > 0
1899 } else {
1900 false
1901 };
1902
1903 let channel_members = if let Some(channel_id) = channel_id {
1904 self.get_channel_members_internal(channel_id, &tx).await?
1905 } else {
1906 Vec::new()
1907 };
1908 let left_room = LeftRoom {
1909 room,
1910 channel_id,
1911 channel_members,
1912 left_projects,
1913 canceled_calls_to_user_ids,
1914 deleted,
1915 };
1916
1917 if left_room.room.participants.is_empty() {
1918 self.rooms.remove(&room_id);
1919 }
1920
1921 Ok(Some((room_id, left_room)))
1922 } else {
1923 Ok(None)
1924 }
1925 })
1926 .await
1927 }
1928
1929 pub async fn follow(
1930 &self,
1931 project_id: ProjectId,
1932 leader_connection: ConnectionId,
1933 follower_connection: ConnectionId,
1934 ) -> Result<RoomGuard<proto::Room>> {
1935 let room_id = self.room_id_for_project(project_id).await?;
1936 self.room_transaction(room_id, |tx| async move {
1937 follower::ActiveModel {
1938 room_id: ActiveValue::set(room_id),
1939 project_id: ActiveValue::set(project_id),
1940 leader_connection_server_id: ActiveValue::set(ServerId(
1941 leader_connection.owner_id as i32,
1942 )),
1943 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1944 follower_connection_server_id: ActiveValue::set(ServerId(
1945 follower_connection.owner_id as i32,
1946 )),
1947 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1948 ..Default::default()
1949 }
1950 .insert(&*tx)
1951 .await?;
1952
1953 let room = self.get_room(room_id, &*tx).await?;
1954 Ok(room)
1955 })
1956 .await
1957 }
1958
1959 pub async fn unfollow(
1960 &self,
1961 project_id: ProjectId,
1962 leader_connection: ConnectionId,
1963 follower_connection: ConnectionId,
1964 ) -> Result<RoomGuard<proto::Room>> {
1965 let room_id = self.room_id_for_project(project_id).await?;
1966 self.room_transaction(room_id, |tx| async move {
1967 follower::Entity::delete_many()
1968 .filter(
1969 Condition::all()
1970 .add(follower::Column::ProjectId.eq(project_id))
1971 .add(
1972 follower::Column::LeaderConnectionServerId
1973 .eq(leader_connection.owner_id),
1974 )
1975 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1976 .add(
1977 follower::Column::FollowerConnectionServerId
1978 .eq(follower_connection.owner_id),
1979 )
1980 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1981 )
1982 .exec(&*tx)
1983 .await?;
1984
1985 let room = self.get_room(room_id, &*tx).await?;
1986 Ok(room)
1987 })
1988 .await
1989 }
1990
1991 pub async fn update_room_participant_location(
1992 &self,
1993 room_id: RoomId,
1994 connection: ConnectionId,
1995 location: proto::ParticipantLocation,
1996 ) -> Result<RoomGuard<proto::Room>> {
1997 self.room_transaction(room_id, |tx| async {
1998 let tx = tx;
1999 let location_kind;
2000 let location_project_id;
2001 match location
2002 .variant
2003 .as_ref()
2004 .ok_or_else(|| anyhow!("invalid location"))?
2005 {
2006 proto::participant_location::Variant::SharedProject(project) => {
2007 location_kind = 0;
2008 location_project_id = Some(ProjectId::from_proto(project.id));
2009 }
2010 proto::participant_location::Variant::UnsharedProject(_) => {
2011 location_kind = 1;
2012 location_project_id = None;
2013 }
2014 proto::participant_location::Variant::External(_) => {
2015 location_kind = 2;
2016 location_project_id = None;
2017 }
2018 }
2019
2020 let result = room_participant::Entity::update_many()
2021 .filter(
2022 Condition::all()
2023 .add(room_participant::Column::RoomId.eq(room_id))
2024 .add(
2025 room_participant::Column::AnsweringConnectionId
2026 .eq(connection.id as i32),
2027 )
2028 .add(
2029 room_participant::Column::AnsweringConnectionServerId
2030 .eq(connection.owner_id as i32),
2031 ),
2032 )
2033 .set(room_participant::ActiveModel {
2034 location_kind: ActiveValue::set(Some(location_kind)),
2035 location_project_id: ActiveValue::set(location_project_id),
2036 ..Default::default()
2037 })
2038 .exec(&*tx)
2039 .await?;
2040
2041 if result.rows_affected == 1 {
2042 let room = self.get_room(room_id, &tx).await?;
2043 Ok(room)
2044 } else {
2045 Err(anyhow!("could not update room participant location"))?
2046 }
2047 })
2048 .await
2049 }
2050
2051 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2052 self.transaction(|tx| async move {
2053 let participant = room_participant::Entity::find()
2054 .filter(
2055 Condition::all()
2056 .add(
2057 room_participant::Column::AnsweringConnectionId
2058 .eq(connection.id as i32),
2059 )
2060 .add(
2061 room_participant::Column::AnsweringConnectionServerId
2062 .eq(connection.owner_id as i32),
2063 ),
2064 )
2065 .one(&*tx)
2066 .await?
2067 .ok_or_else(|| anyhow!("not a participant in any room"))?;
2068
2069 room_participant::Entity::update(room_participant::ActiveModel {
2070 answering_connection_lost: ActiveValue::set(true),
2071 ..participant.into_active_model()
2072 })
2073 .exec(&*tx)
2074 .await?;
2075
2076 Ok(())
2077 })
2078 .await
2079 }
2080
2081 fn build_incoming_call(
2082 room: &proto::Room,
2083 called_user_id: UserId,
2084 ) -> Option<proto::IncomingCall> {
2085 let pending_participant = room
2086 .pending_participants
2087 .iter()
2088 .find(|participant| participant.user_id == called_user_id.to_proto())?;
2089
2090 Some(proto::IncomingCall {
2091 room_id: room.id,
2092 calling_user_id: pending_participant.calling_user_id,
2093 participant_user_ids: room
2094 .participants
2095 .iter()
2096 .map(|participant| participant.user_id)
2097 .collect(),
2098 initial_project: room.participants.iter().find_map(|participant| {
2099 let initial_project_id = pending_participant.initial_project_id?;
2100 participant
2101 .projects
2102 .iter()
2103 .find(|project| project.id == initial_project_id)
2104 .cloned()
2105 }),
2106 })
2107 }
2108 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2109 let (_, room) = self.get_channel_room(room_id, tx).await?;
2110 Ok(room)
2111 }
2112
2113 async fn get_channel_room(
2114 &self,
2115 room_id: RoomId,
2116 tx: &DatabaseTransaction,
2117 ) -> Result<(Option<ChannelId>, proto::Room)> {
2118 let db_room = room::Entity::find_by_id(room_id)
2119 .one(tx)
2120 .await?
2121 .ok_or_else(|| anyhow!("could not find room"))?;
2122
2123 let mut db_participants = db_room
2124 .find_related(room_participant::Entity)
2125 .stream(tx)
2126 .await?;
2127 let mut participants = HashMap::default();
2128 let mut pending_participants = Vec::new();
2129 while let Some(db_participant) = db_participants.next().await {
2130 let db_participant = db_participant?;
2131 if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2132 .answering_connection_id
2133 .zip(db_participant.answering_connection_server_id)
2134 {
2135 let location = match (
2136 db_participant.location_kind,
2137 db_participant.location_project_id,
2138 ) {
2139 (Some(0), Some(project_id)) => {
2140 Some(proto::participant_location::Variant::SharedProject(
2141 proto::participant_location::SharedProject {
2142 id: project_id.to_proto(),
2143 },
2144 ))
2145 }
2146 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2147 Default::default(),
2148 )),
2149 _ => Some(proto::participant_location::Variant::External(
2150 Default::default(),
2151 )),
2152 };
2153
2154 let answering_connection = ConnectionId {
2155 owner_id: answering_connection_server_id.0 as u32,
2156 id: answering_connection_id as u32,
2157 };
2158 participants.insert(
2159 answering_connection,
2160 proto::Participant {
2161 user_id: db_participant.user_id.to_proto(),
2162 peer_id: Some(answering_connection.into()),
2163 projects: Default::default(),
2164 location: Some(proto::ParticipantLocation { variant: location }),
2165 },
2166 );
2167 } else {
2168 pending_participants.push(proto::PendingParticipant {
2169 user_id: db_participant.user_id.to_proto(),
2170 calling_user_id: db_participant.calling_user_id.to_proto(),
2171 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2172 });
2173 }
2174 }
2175 drop(db_participants);
2176
2177 let mut db_projects = db_room
2178 .find_related(project::Entity)
2179 .find_with_related(worktree::Entity)
2180 .stream(tx)
2181 .await?;
2182
2183 while let Some(row) = db_projects.next().await {
2184 let (db_project, db_worktree) = row?;
2185 let host_connection = db_project.host_connection()?;
2186 if let Some(participant) = participants.get_mut(&host_connection) {
2187 let project = if let Some(project) = participant
2188 .projects
2189 .iter_mut()
2190 .find(|project| project.id == db_project.id.to_proto())
2191 {
2192 project
2193 } else {
2194 participant.projects.push(proto::ParticipantProject {
2195 id: db_project.id.to_proto(),
2196 worktree_root_names: Default::default(),
2197 });
2198 participant.projects.last_mut().unwrap()
2199 };
2200
2201 if let Some(db_worktree) = db_worktree {
2202 if db_worktree.visible {
2203 project.worktree_root_names.push(db_worktree.root_name);
2204 }
2205 }
2206 }
2207 }
2208 drop(db_projects);
2209
2210 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2211 let mut followers = Vec::new();
2212 while let Some(db_follower) = db_followers.next().await {
2213 let db_follower = db_follower?;
2214 followers.push(proto::Follower {
2215 leader_id: Some(db_follower.leader_connection().into()),
2216 follower_id: Some(db_follower.follower_connection().into()),
2217 project_id: db_follower.project_id.to_proto(),
2218 });
2219 }
2220
2221 Ok((
2222 db_room.channel_id,
2223 proto::Room {
2224 id: db_room.id.to_proto(),
2225 live_kit_room: db_room.live_kit_room,
2226 participants: participants.into_values().collect(),
2227 pending_participants,
2228 followers,
2229 },
2230 ))
2231 }
2232
2233 // projects
2234
2235 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2236 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2237 enum QueryAs {
2238 Count,
2239 }
2240
2241 self.transaction(|tx| async move {
2242 Ok(project::Entity::find()
2243 .select_only()
2244 .column_as(project::Column::Id.count(), QueryAs::Count)
2245 .inner_join(user::Entity)
2246 .filter(user::Column::Admin.eq(false))
2247 .into_values::<_, QueryAs>()
2248 .one(&*tx)
2249 .await?
2250 .unwrap_or(0i64) as usize)
2251 })
2252 .await
2253 }
2254
2255 pub async fn share_project(
2256 &self,
2257 room_id: RoomId,
2258 connection: ConnectionId,
2259 worktrees: &[proto::WorktreeMetadata],
2260 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2261 self.room_transaction(room_id, |tx| async move {
2262 let participant = room_participant::Entity::find()
2263 .filter(
2264 Condition::all()
2265 .add(
2266 room_participant::Column::AnsweringConnectionId
2267 .eq(connection.id as i32),
2268 )
2269 .add(
2270 room_participant::Column::AnsweringConnectionServerId
2271 .eq(connection.owner_id as i32),
2272 ),
2273 )
2274 .one(&*tx)
2275 .await?
2276 .ok_or_else(|| anyhow!("could not find participant"))?;
2277 if participant.room_id != room_id {
2278 return Err(anyhow!("shared project on unexpected room"))?;
2279 }
2280
2281 let project = project::ActiveModel {
2282 room_id: ActiveValue::set(participant.room_id),
2283 host_user_id: ActiveValue::set(participant.user_id),
2284 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2285 host_connection_server_id: ActiveValue::set(Some(ServerId(
2286 connection.owner_id as i32,
2287 ))),
2288 ..Default::default()
2289 }
2290 .insert(&*tx)
2291 .await?;
2292
2293 if !worktrees.is_empty() {
2294 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2295 worktree::ActiveModel {
2296 id: ActiveValue::set(worktree.id as i64),
2297 project_id: ActiveValue::set(project.id),
2298 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2299 root_name: ActiveValue::set(worktree.root_name.clone()),
2300 visible: ActiveValue::set(worktree.visible),
2301 scan_id: ActiveValue::set(0),
2302 completed_scan_id: ActiveValue::set(0),
2303 }
2304 }))
2305 .exec(&*tx)
2306 .await?;
2307 }
2308
2309 project_collaborator::ActiveModel {
2310 project_id: ActiveValue::set(project.id),
2311 connection_id: ActiveValue::set(connection.id as i32),
2312 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2313 user_id: ActiveValue::set(participant.user_id),
2314 replica_id: ActiveValue::set(ReplicaId(0)),
2315 is_host: ActiveValue::set(true),
2316 ..Default::default()
2317 }
2318 .insert(&*tx)
2319 .await?;
2320
2321 let room = self.get_room(room_id, &tx).await?;
2322 Ok((project.id, room))
2323 })
2324 .await
2325 }
2326
2327 pub async fn unshare_project(
2328 &self,
2329 project_id: ProjectId,
2330 connection: ConnectionId,
2331 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2332 let room_id = self.room_id_for_project(project_id).await?;
2333 self.room_transaction(room_id, |tx| async move {
2334 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2335
2336 let project = project::Entity::find_by_id(project_id)
2337 .one(&*tx)
2338 .await?
2339 .ok_or_else(|| anyhow!("project not found"))?;
2340 if project.host_connection()? == connection {
2341 project::Entity::delete(project.into_active_model())
2342 .exec(&*tx)
2343 .await?;
2344 let room = self.get_room(room_id, &tx).await?;
2345 Ok((room, guest_connection_ids))
2346 } else {
2347 Err(anyhow!("cannot unshare a project hosted by another user"))?
2348 }
2349 })
2350 .await
2351 }
2352
2353 pub async fn update_project(
2354 &self,
2355 project_id: ProjectId,
2356 connection: ConnectionId,
2357 worktrees: &[proto::WorktreeMetadata],
2358 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2359 let room_id = self.room_id_for_project(project_id).await?;
2360 self.room_transaction(room_id, |tx| async move {
2361 let project = project::Entity::find_by_id(project_id)
2362 .filter(
2363 Condition::all()
2364 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2365 .add(
2366 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2367 ),
2368 )
2369 .one(&*tx)
2370 .await?
2371 .ok_or_else(|| anyhow!("no such project"))?;
2372
2373 self.update_project_worktrees(project.id, worktrees, &tx)
2374 .await?;
2375
2376 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2377 let room = self.get_room(project.room_id, &tx).await?;
2378 Ok((room, guest_connection_ids))
2379 })
2380 .await
2381 }
2382
2383 async fn update_project_worktrees(
2384 &self,
2385 project_id: ProjectId,
2386 worktrees: &[proto::WorktreeMetadata],
2387 tx: &DatabaseTransaction,
2388 ) -> Result<()> {
2389 if !worktrees.is_empty() {
2390 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2391 id: ActiveValue::set(worktree.id as i64),
2392 project_id: ActiveValue::set(project_id),
2393 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2394 root_name: ActiveValue::set(worktree.root_name.clone()),
2395 visible: ActiveValue::set(worktree.visible),
2396 scan_id: ActiveValue::set(0),
2397 completed_scan_id: ActiveValue::set(0),
2398 }))
2399 .on_conflict(
2400 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2401 .update_column(worktree::Column::RootName)
2402 .to_owned(),
2403 )
2404 .exec(&*tx)
2405 .await?;
2406 }
2407
2408 worktree::Entity::delete_many()
2409 .filter(worktree::Column::ProjectId.eq(project_id).and(
2410 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2411 ))
2412 .exec(&*tx)
2413 .await?;
2414
2415 Ok(())
2416 }
2417
2418 pub async fn update_worktree(
2419 &self,
2420 update: &proto::UpdateWorktree,
2421 connection: ConnectionId,
2422 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2423 let project_id = ProjectId::from_proto(update.project_id);
2424 let worktree_id = update.worktree_id as i64;
2425 let room_id = self.room_id_for_project(project_id).await?;
2426 self.room_transaction(room_id, |tx| async move {
2427 // Ensure the update comes from the host.
2428 let _project = project::Entity::find_by_id(project_id)
2429 .filter(
2430 Condition::all()
2431 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2432 .add(
2433 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2434 ),
2435 )
2436 .one(&*tx)
2437 .await?
2438 .ok_or_else(|| anyhow!("no such project"))?;
2439
2440 // Update metadata.
2441 worktree::Entity::update(worktree::ActiveModel {
2442 id: ActiveValue::set(worktree_id),
2443 project_id: ActiveValue::set(project_id),
2444 root_name: ActiveValue::set(update.root_name.clone()),
2445 scan_id: ActiveValue::set(update.scan_id as i64),
2446 completed_scan_id: if update.is_last_update {
2447 ActiveValue::set(update.scan_id as i64)
2448 } else {
2449 ActiveValue::default()
2450 },
2451 abs_path: ActiveValue::set(update.abs_path.clone()),
2452 ..Default::default()
2453 })
2454 .exec(&*tx)
2455 .await?;
2456
2457 if !update.updated_entries.is_empty() {
2458 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
2459 let mtime = entry.mtime.clone().unwrap_or_default();
2460 worktree_entry::ActiveModel {
2461 project_id: ActiveValue::set(project_id),
2462 worktree_id: ActiveValue::set(worktree_id),
2463 id: ActiveValue::set(entry.id as i64),
2464 is_dir: ActiveValue::set(entry.is_dir),
2465 path: ActiveValue::set(entry.path.clone()),
2466 inode: ActiveValue::set(entry.inode as i64),
2467 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2468 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2469 is_symlink: ActiveValue::set(entry.is_symlink),
2470 is_ignored: ActiveValue::set(entry.is_ignored),
2471 is_external: ActiveValue::set(entry.is_external),
2472 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
2473 is_deleted: ActiveValue::set(false),
2474 scan_id: ActiveValue::set(update.scan_id as i64),
2475 }
2476 }))
2477 .on_conflict(
2478 OnConflict::columns([
2479 worktree_entry::Column::ProjectId,
2480 worktree_entry::Column::WorktreeId,
2481 worktree_entry::Column::Id,
2482 ])
2483 .update_columns([
2484 worktree_entry::Column::IsDir,
2485 worktree_entry::Column::Path,
2486 worktree_entry::Column::Inode,
2487 worktree_entry::Column::MtimeSeconds,
2488 worktree_entry::Column::MtimeNanos,
2489 worktree_entry::Column::IsSymlink,
2490 worktree_entry::Column::IsIgnored,
2491 worktree_entry::Column::GitStatus,
2492 worktree_entry::Column::ScanId,
2493 ])
2494 .to_owned(),
2495 )
2496 .exec(&*tx)
2497 .await?;
2498 }
2499
2500 if !update.removed_entries.is_empty() {
2501 worktree_entry::Entity::update_many()
2502 .filter(
2503 worktree_entry::Column::ProjectId
2504 .eq(project_id)
2505 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2506 .and(
2507 worktree_entry::Column::Id
2508 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2509 ),
2510 )
2511 .set(worktree_entry::ActiveModel {
2512 is_deleted: ActiveValue::Set(true),
2513 scan_id: ActiveValue::Set(update.scan_id as i64),
2514 ..Default::default()
2515 })
2516 .exec(&*tx)
2517 .await?;
2518 }
2519
2520 if !update.updated_repositories.is_empty() {
2521 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
2522 |repository| worktree_repository::ActiveModel {
2523 project_id: ActiveValue::set(project_id),
2524 worktree_id: ActiveValue::set(worktree_id),
2525 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
2526 scan_id: ActiveValue::set(update.scan_id as i64),
2527 branch: ActiveValue::set(repository.branch.clone()),
2528 is_deleted: ActiveValue::set(false),
2529 },
2530 ))
2531 .on_conflict(
2532 OnConflict::columns([
2533 worktree_repository::Column::ProjectId,
2534 worktree_repository::Column::WorktreeId,
2535 worktree_repository::Column::WorkDirectoryId,
2536 ])
2537 .update_columns([
2538 worktree_repository::Column::ScanId,
2539 worktree_repository::Column::Branch,
2540 ])
2541 .to_owned(),
2542 )
2543 .exec(&*tx)
2544 .await?;
2545 }
2546
2547 if !update.removed_repositories.is_empty() {
2548 worktree_repository::Entity::update_many()
2549 .filter(
2550 worktree_repository::Column::ProjectId
2551 .eq(project_id)
2552 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
2553 .and(
2554 worktree_repository::Column::WorkDirectoryId
2555 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
2556 ),
2557 )
2558 .set(worktree_repository::ActiveModel {
2559 is_deleted: ActiveValue::Set(true),
2560 scan_id: ActiveValue::Set(update.scan_id as i64),
2561 ..Default::default()
2562 })
2563 .exec(&*tx)
2564 .await?;
2565 }
2566
2567 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2568 Ok(connection_ids)
2569 })
2570 .await
2571 }
2572
2573 pub async fn update_diagnostic_summary(
2574 &self,
2575 update: &proto::UpdateDiagnosticSummary,
2576 connection: ConnectionId,
2577 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2578 let project_id = ProjectId::from_proto(update.project_id);
2579 let worktree_id = update.worktree_id as i64;
2580 let room_id = self.room_id_for_project(project_id).await?;
2581 self.room_transaction(room_id, |tx| async move {
2582 let summary = update
2583 .summary
2584 .as_ref()
2585 .ok_or_else(|| anyhow!("invalid summary"))?;
2586
2587 // Ensure the update comes from the host.
2588 let project = project::Entity::find_by_id(project_id)
2589 .one(&*tx)
2590 .await?
2591 .ok_or_else(|| anyhow!("no such project"))?;
2592 if project.host_connection()? != connection {
2593 return Err(anyhow!("can't update a project hosted by someone else"))?;
2594 }
2595
2596 // Update summary.
2597 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2598 project_id: ActiveValue::set(project_id),
2599 worktree_id: ActiveValue::set(worktree_id),
2600 path: ActiveValue::set(summary.path.clone()),
2601 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2602 error_count: ActiveValue::set(summary.error_count as i32),
2603 warning_count: ActiveValue::set(summary.warning_count as i32),
2604 ..Default::default()
2605 })
2606 .on_conflict(
2607 OnConflict::columns([
2608 worktree_diagnostic_summary::Column::ProjectId,
2609 worktree_diagnostic_summary::Column::WorktreeId,
2610 worktree_diagnostic_summary::Column::Path,
2611 ])
2612 .update_columns([
2613 worktree_diagnostic_summary::Column::LanguageServerId,
2614 worktree_diagnostic_summary::Column::ErrorCount,
2615 worktree_diagnostic_summary::Column::WarningCount,
2616 ])
2617 .to_owned(),
2618 )
2619 .exec(&*tx)
2620 .await?;
2621
2622 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2623 Ok(connection_ids)
2624 })
2625 .await
2626 }
2627
2628 pub async fn start_language_server(
2629 &self,
2630 update: &proto::StartLanguageServer,
2631 connection: ConnectionId,
2632 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2633 let project_id = ProjectId::from_proto(update.project_id);
2634 let room_id = self.room_id_for_project(project_id).await?;
2635 self.room_transaction(room_id, |tx| async move {
2636 let server = update
2637 .server
2638 .as_ref()
2639 .ok_or_else(|| anyhow!("invalid language server"))?;
2640
2641 // Ensure the update comes from the host.
2642 let project = project::Entity::find_by_id(project_id)
2643 .one(&*tx)
2644 .await?
2645 .ok_or_else(|| anyhow!("no such project"))?;
2646 if project.host_connection()? != connection {
2647 return Err(anyhow!("can't update a project hosted by someone else"))?;
2648 }
2649
2650 // Add the newly-started language server.
2651 language_server::Entity::insert(language_server::ActiveModel {
2652 project_id: ActiveValue::set(project_id),
2653 id: ActiveValue::set(server.id as i64),
2654 name: ActiveValue::set(server.name.clone()),
2655 ..Default::default()
2656 })
2657 .on_conflict(
2658 OnConflict::columns([
2659 language_server::Column::ProjectId,
2660 language_server::Column::Id,
2661 ])
2662 .update_column(language_server::Column::Name)
2663 .to_owned(),
2664 )
2665 .exec(&*tx)
2666 .await?;
2667
2668 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2669 Ok(connection_ids)
2670 })
2671 .await
2672 }
2673
2674 pub async fn update_worktree_settings(
2675 &self,
2676 update: &proto::UpdateWorktreeSettings,
2677 connection: ConnectionId,
2678 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2679 let project_id = ProjectId::from_proto(update.project_id);
2680 let room_id = self.room_id_for_project(project_id).await?;
2681 self.room_transaction(room_id, |tx| async move {
2682 // Ensure the update comes from the host.
2683 let project = project::Entity::find_by_id(project_id)
2684 .one(&*tx)
2685 .await?
2686 .ok_or_else(|| anyhow!("no such project"))?;
2687 if project.host_connection()? != connection {
2688 return Err(anyhow!("can't update a project hosted by someone else"))?;
2689 }
2690
2691 if let Some(content) = &update.content {
2692 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2693 project_id: ActiveValue::Set(project_id),
2694 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2695 path: ActiveValue::Set(update.path.clone()),
2696 content: ActiveValue::Set(content.clone()),
2697 })
2698 .on_conflict(
2699 OnConflict::columns([
2700 worktree_settings_file::Column::ProjectId,
2701 worktree_settings_file::Column::WorktreeId,
2702 worktree_settings_file::Column::Path,
2703 ])
2704 .update_column(worktree_settings_file::Column::Content)
2705 .to_owned(),
2706 )
2707 .exec(&*tx)
2708 .await?;
2709 } else {
2710 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2711 project_id: ActiveValue::Set(project_id),
2712 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2713 path: ActiveValue::Set(update.path.clone()),
2714 ..Default::default()
2715 })
2716 .exec(&*tx)
2717 .await?;
2718 }
2719
2720 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2721 Ok(connection_ids)
2722 })
2723 .await
2724 }
2725
2726 pub async fn join_project(
2727 &self,
2728 project_id: ProjectId,
2729 connection: ConnectionId,
2730 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2731 let room_id = self.room_id_for_project(project_id).await?;
2732 self.room_transaction(room_id, |tx| async move {
2733 let participant = room_participant::Entity::find()
2734 .filter(
2735 Condition::all()
2736 .add(
2737 room_participant::Column::AnsweringConnectionId
2738 .eq(connection.id as i32),
2739 )
2740 .add(
2741 room_participant::Column::AnsweringConnectionServerId
2742 .eq(connection.owner_id as i32),
2743 ),
2744 )
2745 .one(&*tx)
2746 .await?
2747 .ok_or_else(|| anyhow!("must join a room first"))?;
2748
2749 let project = project::Entity::find_by_id(project_id)
2750 .one(&*tx)
2751 .await?
2752 .ok_or_else(|| anyhow!("no such project"))?;
2753 if project.room_id != participant.room_id {
2754 return Err(anyhow!("no such project"))?;
2755 }
2756
2757 let mut collaborators = project
2758 .find_related(project_collaborator::Entity)
2759 .all(&*tx)
2760 .await?;
2761 let replica_ids = collaborators
2762 .iter()
2763 .map(|c| c.replica_id)
2764 .collect::<HashSet<_>>();
2765 let mut replica_id = ReplicaId(1);
2766 while replica_ids.contains(&replica_id) {
2767 replica_id.0 += 1;
2768 }
2769 let new_collaborator = project_collaborator::ActiveModel {
2770 project_id: ActiveValue::set(project_id),
2771 connection_id: ActiveValue::set(connection.id as i32),
2772 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2773 user_id: ActiveValue::set(participant.user_id),
2774 replica_id: ActiveValue::set(replica_id),
2775 is_host: ActiveValue::set(false),
2776 ..Default::default()
2777 }
2778 .insert(&*tx)
2779 .await?;
2780 collaborators.push(new_collaborator);
2781
2782 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2783 let mut worktrees = db_worktrees
2784 .into_iter()
2785 .map(|db_worktree| {
2786 (
2787 db_worktree.id as u64,
2788 Worktree {
2789 id: db_worktree.id as u64,
2790 abs_path: db_worktree.abs_path,
2791 root_name: db_worktree.root_name,
2792 visible: db_worktree.visible,
2793 entries: Default::default(),
2794 repository_entries: Default::default(),
2795 diagnostic_summaries: Default::default(),
2796 settings_files: Default::default(),
2797 scan_id: db_worktree.scan_id as u64,
2798 completed_scan_id: db_worktree.completed_scan_id as u64,
2799 },
2800 )
2801 })
2802 .collect::<BTreeMap<_, _>>();
2803
2804 // Populate worktree entries.
2805 {
2806 let mut db_entries = worktree_entry::Entity::find()
2807 .filter(
2808 Condition::all()
2809 .add(worktree_entry::Column::ProjectId.eq(project_id))
2810 .add(worktree_entry::Column::IsDeleted.eq(false)),
2811 )
2812 .stream(&*tx)
2813 .await?;
2814 while let Some(db_entry) = db_entries.next().await {
2815 let db_entry = db_entry?;
2816 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2817 worktree.entries.push(proto::Entry {
2818 id: db_entry.id as u64,
2819 is_dir: db_entry.is_dir,
2820 path: db_entry.path,
2821 inode: db_entry.inode as u64,
2822 mtime: Some(proto::Timestamp {
2823 seconds: db_entry.mtime_seconds as u64,
2824 nanos: db_entry.mtime_nanos as u32,
2825 }),
2826 is_symlink: db_entry.is_symlink,
2827 is_ignored: db_entry.is_ignored,
2828 is_external: db_entry.is_external,
2829 git_status: db_entry.git_status.map(|status| status as i32),
2830 });
2831 }
2832 }
2833 }
2834
2835 // Populate repository entries.
2836 {
2837 let mut db_repository_entries = worktree_repository::Entity::find()
2838 .filter(
2839 Condition::all()
2840 .add(worktree_repository::Column::ProjectId.eq(project_id))
2841 .add(worktree_repository::Column::IsDeleted.eq(false)),
2842 )
2843 .stream(&*tx)
2844 .await?;
2845 while let Some(db_repository_entry) = db_repository_entries.next().await {
2846 let db_repository_entry = db_repository_entry?;
2847 if let Some(worktree) =
2848 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
2849 {
2850 worktree.repository_entries.insert(
2851 db_repository_entry.work_directory_id as u64,
2852 proto::RepositoryEntry {
2853 work_directory_id: db_repository_entry.work_directory_id as u64,
2854 branch: db_repository_entry.branch,
2855 },
2856 );
2857 }
2858 }
2859 }
2860
2861 // Populate worktree diagnostic summaries.
2862 {
2863 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2864 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2865 .stream(&*tx)
2866 .await?;
2867 while let Some(db_summary) = db_summaries.next().await {
2868 let db_summary = db_summary?;
2869 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2870 worktree
2871 .diagnostic_summaries
2872 .push(proto::DiagnosticSummary {
2873 path: db_summary.path,
2874 language_server_id: db_summary.language_server_id as u64,
2875 error_count: db_summary.error_count as u32,
2876 warning_count: db_summary.warning_count as u32,
2877 });
2878 }
2879 }
2880 }
2881
2882 // Populate worktree settings files
2883 {
2884 let mut db_settings_files = worktree_settings_file::Entity::find()
2885 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
2886 .stream(&*tx)
2887 .await?;
2888 while let Some(db_settings_file) = db_settings_files.next().await {
2889 let db_settings_file = db_settings_file?;
2890 if let Some(worktree) =
2891 worktrees.get_mut(&(db_settings_file.worktree_id as u64))
2892 {
2893 worktree.settings_files.push(WorktreeSettingsFile {
2894 path: db_settings_file.path,
2895 content: db_settings_file.content,
2896 });
2897 }
2898 }
2899 }
2900
2901 // Populate language servers.
2902 let language_servers = project
2903 .find_related(language_server::Entity)
2904 .all(&*tx)
2905 .await?;
2906
2907 let project = Project {
2908 collaborators: collaborators
2909 .into_iter()
2910 .map(|collaborator| ProjectCollaborator {
2911 connection_id: collaborator.connection(),
2912 user_id: collaborator.user_id,
2913 replica_id: collaborator.replica_id,
2914 is_host: collaborator.is_host,
2915 })
2916 .collect(),
2917 worktrees,
2918 language_servers: language_servers
2919 .into_iter()
2920 .map(|language_server| proto::LanguageServer {
2921 id: language_server.id as u64,
2922 name: language_server.name,
2923 })
2924 .collect(),
2925 };
2926 Ok((project, replica_id as ReplicaId))
2927 })
2928 .await
2929 }
2930
2931 pub async fn leave_project(
2932 &self,
2933 project_id: ProjectId,
2934 connection: ConnectionId,
2935 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2936 let room_id = self.room_id_for_project(project_id).await?;
2937 self.room_transaction(room_id, |tx| async move {
2938 let result = project_collaborator::Entity::delete_many()
2939 .filter(
2940 Condition::all()
2941 .add(project_collaborator::Column::ProjectId.eq(project_id))
2942 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2943 .add(
2944 project_collaborator::Column::ConnectionServerId
2945 .eq(connection.owner_id as i32),
2946 ),
2947 )
2948 .exec(&*tx)
2949 .await?;
2950 if result.rows_affected == 0 {
2951 Err(anyhow!("not a collaborator on this project"))?;
2952 }
2953
2954 let project = project::Entity::find_by_id(project_id)
2955 .one(&*tx)
2956 .await?
2957 .ok_or_else(|| anyhow!("no such project"))?;
2958 let collaborators = project
2959 .find_related(project_collaborator::Entity)
2960 .all(&*tx)
2961 .await?;
2962 let connection_ids = collaborators
2963 .into_iter()
2964 .map(|collaborator| collaborator.connection())
2965 .collect();
2966
2967 follower::Entity::delete_many()
2968 .filter(
2969 Condition::any()
2970 .add(
2971 Condition::all()
2972 .add(follower::Column::ProjectId.eq(project_id))
2973 .add(
2974 follower::Column::LeaderConnectionServerId
2975 .eq(connection.owner_id),
2976 )
2977 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2978 )
2979 .add(
2980 Condition::all()
2981 .add(follower::Column::ProjectId.eq(project_id))
2982 .add(
2983 follower::Column::FollowerConnectionServerId
2984 .eq(connection.owner_id),
2985 )
2986 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2987 ),
2988 )
2989 .exec(&*tx)
2990 .await?;
2991
2992 let room = self.get_room(project.room_id, &tx).await?;
2993 let left_project = LeftProject {
2994 id: project_id,
2995 host_user_id: project.host_user_id,
2996 host_connection_id: project.host_connection()?,
2997 connection_ids,
2998 };
2999 Ok((room, left_project))
3000 })
3001 .await
3002 }
3003
3004 pub async fn project_collaborators(
3005 &self,
3006 project_id: ProjectId,
3007 connection_id: ConnectionId,
3008 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
3009 let room_id = self.room_id_for_project(project_id).await?;
3010 self.room_transaction(room_id, |tx| async move {
3011 let collaborators = project_collaborator::Entity::find()
3012 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3013 .all(&*tx)
3014 .await?
3015 .into_iter()
3016 .map(|collaborator| ProjectCollaborator {
3017 connection_id: collaborator.connection(),
3018 user_id: collaborator.user_id,
3019 replica_id: collaborator.replica_id,
3020 is_host: collaborator.is_host,
3021 })
3022 .collect::<Vec<_>>();
3023
3024 if collaborators
3025 .iter()
3026 .any(|collaborator| collaborator.connection_id == connection_id)
3027 {
3028 Ok(collaborators)
3029 } else {
3030 Err(anyhow!("no such project"))?
3031 }
3032 })
3033 .await
3034 }
3035
3036 pub async fn project_connection_ids(
3037 &self,
3038 project_id: ProjectId,
3039 connection_id: ConnectionId,
3040 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3041 let room_id = self.room_id_for_project(project_id).await?;
3042 self.room_transaction(room_id, |tx| async move {
3043 let mut collaborators = project_collaborator::Entity::find()
3044 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3045 .stream(&*tx)
3046 .await?;
3047
3048 let mut connection_ids = HashSet::default();
3049 while let Some(collaborator) = collaborators.next().await {
3050 let collaborator = collaborator?;
3051 connection_ids.insert(collaborator.connection());
3052 }
3053
3054 if connection_ids.contains(&connection_id) {
3055 Ok(connection_ids)
3056 } else {
3057 Err(anyhow!("no such project"))?
3058 }
3059 })
3060 .await
3061 }
3062
3063 async fn project_guest_connection_ids(
3064 &self,
3065 project_id: ProjectId,
3066 tx: &DatabaseTransaction,
3067 ) -> Result<Vec<ConnectionId>> {
3068 let mut collaborators = project_collaborator::Entity::find()
3069 .filter(
3070 project_collaborator::Column::ProjectId
3071 .eq(project_id)
3072 .and(project_collaborator::Column::IsHost.eq(false)),
3073 )
3074 .stream(tx)
3075 .await?;
3076
3077 let mut guest_connection_ids = Vec::new();
3078 while let Some(collaborator) = collaborators.next().await {
3079 let collaborator = collaborator?;
3080 guest_connection_ids.push(collaborator.connection());
3081 }
3082 Ok(guest_connection_ids)
3083 }
3084
3085 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3086 self.transaction(|tx| async move {
3087 let project = project::Entity::find_by_id(project_id)
3088 .one(&*tx)
3089 .await?
3090 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3091 Ok(project.room_id)
3092 })
3093 .await
3094 }
3095
3096 // access tokens
3097
3098 pub async fn create_access_token(
3099 &self,
3100 user_id: UserId,
3101 access_token_hash: &str,
3102 max_access_token_count: usize,
3103 ) -> Result<AccessTokenId> {
3104 self.transaction(|tx| async {
3105 let tx = tx;
3106
3107 let token = access_token::ActiveModel {
3108 user_id: ActiveValue::set(user_id),
3109 hash: ActiveValue::set(access_token_hash.into()),
3110 ..Default::default()
3111 }
3112 .insert(&*tx)
3113 .await?;
3114
3115 access_token::Entity::delete_many()
3116 .filter(
3117 access_token::Column::Id.in_subquery(
3118 Query::select()
3119 .column(access_token::Column::Id)
3120 .from(access_token::Entity)
3121 .and_where(access_token::Column::UserId.eq(user_id))
3122 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3123 .limit(10000)
3124 .offset(max_access_token_count as u64)
3125 .to_owned(),
3126 ),
3127 )
3128 .exec(&*tx)
3129 .await?;
3130 Ok(token.id)
3131 })
3132 .await
3133 }
3134
3135 pub async fn get_access_token(
3136 &self,
3137 access_token_id: AccessTokenId,
3138 ) -> Result<access_token::Model> {
3139 self.transaction(|tx| async move {
3140 Ok(access_token::Entity::find_by_id(access_token_id)
3141 .one(&*tx)
3142 .await?
3143 .ok_or_else(|| anyhow!("no such access token"))?)
3144 })
3145 .await
3146 }
3147
3148 // channels
3149
3150 pub async fn create_root_channel(
3151 &self,
3152 name: &str,
3153 live_kit_room: &str,
3154 creator_id: UserId,
3155 ) -> Result<ChannelId> {
3156 self.create_channel(name, None, live_kit_room, creator_id)
3157 .await
3158 }
3159
3160 pub async fn create_channel(
3161 &self,
3162 name: &str,
3163 parent: Option<ChannelId>,
3164 live_kit_room: &str,
3165 creator_id: UserId,
3166 ) -> Result<ChannelId> {
3167 self.transaction(move |tx| async move {
3168 if let Some(parent) = parent {
3169 self.check_user_is_channel_admin(parent, creator_id, &*tx)
3170 .await?;
3171 }
3172
3173 let channel = channel::ActiveModel {
3174 name: ActiveValue::Set(name.to_string()),
3175 ..Default::default()
3176 }
3177 .insert(&*tx)
3178 .await?;
3179
3180 if let Some(parent) = parent {
3181 channel_parent::ActiveModel {
3182 child_id: ActiveValue::Set(channel.id),
3183 parent_id: ActiveValue::Set(parent),
3184 }
3185 .insert(&*tx)
3186 .await?;
3187 }
3188
3189 channel_member::ActiveModel {
3190 channel_id: ActiveValue::Set(channel.id),
3191 user_id: ActiveValue::Set(creator_id),
3192 accepted: ActiveValue::Set(true),
3193 admin: ActiveValue::Set(true),
3194 ..Default::default()
3195 }
3196 .insert(&*tx)
3197 .await?;
3198
3199 room::ActiveModel {
3200 channel_id: ActiveValue::Set(Some(channel.id)),
3201 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3202 ..Default::default()
3203 }
3204 .insert(&*tx)
3205 .await?;
3206
3207 Ok(channel.id)
3208 })
3209 .await
3210 }
3211
3212 pub async fn remove_channel(
3213 &self,
3214 channel_id: ChannelId,
3215 user_id: UserId,
3216 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3217 self.transaction(move |tx| async move {
3218 self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3219 .await?;
3220
3221 // Don't remove descendant channels that have additional parents.
3222 let mut channels_to_remove = self.get_channel_descendants([channel_id], &*tx).await?;
3223 {
3224 let mut channels_to_keep = channel_parent::Entity::find()
3225 .filter(
3226 channel_parent::Column::ChildId
3227 .is_in(
3228 channels_to_remove
3229 .keys()
3230 .copied()
3231 .filter(|&id| id != channel_id),
3232 )
3233 .and(
3234 channel_parent::Column::ParentId
3235 .is_not_in(channels_to_remove.keys().copied()),
3236 ),
3237 )
3238 .stream(&*tx)
3239 .await?;
3240 while let Some(row) = channels_to_keep.next().await {
3241 let row = row?;
3242 channels_to_remove.remove(&row.child_id);
3243 }
3244 }
3245
3246 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3247 .filter(channel_member::Column::ChannelId.is_in(channels_to_remove.keys().copied()))
3248 .select_only()
3249 .column(channel_member::Column::UserId)
3250 .distinct()
3251 .into_values::<_, QueryUserIds>()
3252 .all(&*tx)
3253 .await?;
3254
3255 channel::Entity::delete_many()
3256 .filter(channel::Column::Id.is_in(channels_to_remove.keys().copied()))
3257 .exec(&*tx)
3258 .await?;
3259
3260 Ok((channels_to_remove.into_keys().collect(), members_to_notify))
3261 })
3262 .await
3263 }
3264
3265 pub async fn invite_channel_member(
3266 &self,
3267 channel_id: ChannelId,
3268 invitee_id: UserId,
3269 inviter_id: UserId,
3270 is_admin: bool,
3271 ) -> Result<()> {
3272 self.transaction(move |tx| async move {
3273 self.check_user_is_channel_admin(channel_id, inviter_id, &*tx)
3274 .await?;
3275
3276 channel_member::ActiveModel {
3277 channel_id: ActiveValue::Set(channel_id),
3278 user_id: ActiveValue::Set(invitee_id),
3279 accepted: ActiveValue::Set(false),
3280 admin: ActiveValue::Set(is_admin),
3281 ..Default::default()
3282 }
3283 .insert(&*tx)
3284 .await?;
3285
3286 Ok(())
3287 })
3288 .await
3289 }
3290
3291 pub async fn respond_to_channel_invite(
3292 &self,
3293 channel_id: ChannelId,
3294 user_id: UserId,
3295 accept: bool,
3296 ) -> Result<()> {
3297 self.transaction(move |tx| async move {
3298 let rows_affected = if accept {
3299 channel_member::Entity::update_many()
3300 .set(channel_member::ActiveModel {
3301 accepted: ActiveValue::Set(accept),
3302 ..Default::default()
3303 })
3304 .filter(
3305 channel_member::Column::ChannelId
3306 .eq(channel_id)
3307 .and(channel_member::Column::UserId.eq(user_id))
3308 .and(channel_member::Column::Accepted.eq(false)),
3309 )
3310 .exec(&*tx)
3311 .await?
3312 .rows_affected
3313 } else {
3314 channel_member::ActiveModel {
3315 channel_id: ActiveValue::Unchanged(channel_id),
3316 user_id: ActiveValue::Unchanged(user_id),
3317 ..Default::default()
3318 }
3319 .delete(&*tx)
3320 .await?
3321 .rows_affected
3322 };
3323
3324 if rows_affected == 0 {
3325 Err(anyhow!("no such invitation"))?;
3326 }
3327
3328 Ok(())
3329 })
3330 .await
3331 }
3332
3333 pub async fn remove_channel_member(
3334 &self,
3335 channel_id: ChannelId,
3336 member_id: UserId,
3337 remover_id: UserId,
3338 ) -> Result<()> {
3339 self.transaction(|tx| async move {
3340 self.check_user_is_channel_admin(channel_id, remover_id, &*tx)
3341 .await?;
3342
3343 let result = channel_member::Entity::delete_many()
3344 .filter(
3345 channel_member::Column::ChannelId
3346 .eq(channel_id)
3347 .and(channel_member::Column::UserId.eq(member_id)),
3348 )
3349 .exec(&*tx)
3350 .await?;
3351
3352 if result.rows_affected == 0 {
3353 Err(anyhow!("no such member"))?;
3354 }
3355
3356 Ok(())
3357 })
3358 .await
3359 }
3360
3361 pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
3362 self.transaction(|tx| async move {
3363 let channel_invites = channel_member::Entity::find()
3364 .filter(
3365 channel_member::Column::UserId
3366 .eq(user_id)
3367 .and(channel_member::Column::Accepted.eq(false)),
3368 )
3369 .all(&*tx)
3370 .await?;
3371
3372 let channels = channel::Entity::find()
3373 .filter(
3374 channel::Column::Id.is_in(
3375 channel_invites
3376 .into_iter()
3377 .map(|channel_member| channel_member.channel_id),
3378 ),
3379 )
3380 .all(&*tx)
3381 .await?;
3382
3383 let channels = channels
3384 .into_iter()
3385 .map(|channel| Channel {
3386 id: channel.id,
3387 name: channel.name,
3388 parent_id: None,
3389 })
3390 .collect();
3391
3392 Ok(channels)
3393 })
3394 .await
3395 }
3396
3397 pub async fn get_channels_for_user(
3398 &self,
3399 user_id: UserId,
3400 ) -> Result<(Vec<Channel>, HashMap<ChannelId, Vec<UserId>>)> {
3401 self.transaction(|tx| async move {
3402 let tx = tx;
3403
3404 let starting_channel_ids: Vec<ChannelId> = channel_member::Entity::find()
3405 .filter(
3406 channel_member::Column::UserId
3407 .eq(user_id)
3408 .and(channel_member::Column::Accepted.eq(true)),
3409 )
3410 .select_only()
3411 .column(channel_member::Column::ChannelId)
3412 .into_values::<_, QueryChannelIds>()
3413 .all(&*tx)
3414 .await?;
3415
3416 let parents_by_child_id = self
3417 .get_channel_descendants(starting_channel_ids, &*tx)
3418 .await?;
3419
3420 let mut channels = Vec::with_capacity(parents_by_child_id.len());
3421 {
3422 let mut rows = channel::Entity::find()
3423 .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3424 .stream(&*tx)
3425 .await?;
3426 while let Some(row) = rows.next().await {
3427 let row = row?;
3428 channels.push(Channel {
3429 id: row.id,
3430 name: row.name,
3431 parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3432 });
3433 }
3434 }
3435
3436 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3437 enum QueryUserIdsAndChannelIds {
3438 ChannelId,
3439 UserId,
3440 }
3441
3442 let mut participants_by_channel: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3443 {
3444 let mut rows = room_participant::Entity::find()
3445 .inner_join(room::Entity)
3446 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3447 .select_only()
3448 .column(room::Column::ChannelId)
3449 .column(room_participant::Column::UserId)
3450 .into_values::<_, QueryUserIdsAndChannelIds>()
3451 .stream(&*tx)
3452 .await?;
3453 while let Some(row) = rows.next().await {
3454 let row: (ChannelId, UserId) = row?;
3455 participants_by_channel
3456 .entry(row.0)
3457 .or_default()
3458 .push(row.1)
3459 }
3460 }
3461
3462 Ok((channels, participants_by_channel))
3463 })
3464 .await
3465 }
3466
3467 pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3468 self.transaction(|tx| async move { self.get_channel_members_internal(id, &*tx).await })
3469 .await
3470 }
3471
3472 pub async fn get_channel_member_details(
3473 &self,
3474 channel_id: ChannelId,
3475 user_id: UserId,
3476 ) -> Result<Vec<proto::ChannelMember>> {
3477 self.transaction(|tx| async move {
3478 self.check_user_is_channel_admin(channel_id, user_id, &*tx)
3479 .await?;
3480
3481 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3482 enum QueryMemberDetails {
3483 UserId,
3484 IsDirectMember,
3485 Accepted,
3486 }
3487
3488 let tx = tx;
3489 let ancestor_ids = self.get_channel_ancestors(channel_id, &*tx).await?;
3490 let mut stream = channel_member::Entity::find()
3491 .distinct()
3492 .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3493 .select_only()
3494 .column(channel_member::Column::UserId)
3495 .column_as(
3496 channel_member::Column::ChannelId.eq(channel_id),
3497 QueryMemberDetails::IsDirectMember,
3498 )
3499 .column(channel_member::Column::Accepted)
3500 .order_by_asc(channel_member::Column::UserId)
3501 .into_values::<_, QueryMemberDetails>()
3502 .stream(&*tx)
3503 .await?;
3504
3505 let mut rows = Vec::<proto::ChannelMember>::new();
3506 while let Some(row) = stream.next().await {
3507 let (user_id, is_direct_member, is_invite_accepted): (UserId, bool, bool) = row?;
3508 let kind = match (is_direct_member, is_invite_accepted) {
3509 (true, true) => proto::channel_member::Kind::Member,
3510 (true, false) => proto::channel_member::Kind::Invitee,
3511 (false, true) => proto::channel_member::Kind::AncestorMember,
3512 (false, false) => continue,
3513 };
3514 let user_id = user_id.to_proto();
3515 let kind = kind.into();
3516 if let Some(last_row) = rows.last_mut() {
3517 if last_row.user_id == user_id {
3518 last_row.kind = last_row.kind.min(kind);
3519 continue;
3520 }
3521 }
3522 rows.push(proto::ChannelMember { user_id, kind });
3523 }
3524
3525 Ok(rows)
3526 })
3527 .await
3528 }
3529
3530 pub async fn get_channel_members_internal(
3531 &self,
3532 id: ChannelId,
3533 tx: &DatabaseTransaction,
3534 ) -> Result<Vec<UserId>> {
3535 let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3536 let user_ids = channel_member::Entity::find()
3537 .distinct()
3538 .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3539 .select_only()
3540 .column(channel_member::Column::UserId)
3541 .into_values::<_, QueryUserIds>()
3542 .all(&*tx)
3543 .await?;
3544 Ok(user_ids)
3545 }
3546
3547 async fn check_user_is_channel_admin(
3548 &self,
3549 channel_id: ChannelId,
3550 user_id: UserId,
3551 tx: &DatabaseTransaction,
3552 ) -> Result<()> {
3553 let channel_ids = self.get_channel_ancestors(channel_id, tx).await?;
3554 channel_member::Entity::find()
3555 .filter(
3556 channel_member::Column::ChannelId
3557 .is_in(channel_ids)
3558 .and(channel_member::Column::UserId.eq(user_id))
3559 .and(channel_member::Column::Admin.eq(true)),
3560 )
3561 .one(&*tx)
3562 .await?
3563 .ok_or_else(|| anyhow!("user is not allowed to remove this channel"))?;
3564 Ok(())
3565 }
3566
3567 async fn get_channel_ancestors(
3568 &self,
3569 channel_id: ChannelId,
3570 tx: &DatabaseTransaction,
3571 ) -> Result<Vec<ChannelId>> {
3572 let sql = format!(
3573 r#"
3574 WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3575 SELECT CAST(NULL as INTEGER) as child_id, root_ids.column1 as parent_id
3576 FROM (VALUES ({})) as root_ids
3577 UNION
3578 SELECT channel_parents.child_id, channel_parents.parent_id
3579 FROM channel_parents, channel_tree
3580 WHERE channel_parents.child_id = channel_tree.parent_id
3581 )
3582 SELECT DISTINCT channel_tree.parent_id
3583 FROM channel_tree
3584 "#,
3585 channel_id
3586 );
3587
3588 #[derive(FromQueryResult, Debug, PartialEq)]
3589 pub struct ChannelParent {
3590 pub parent_id: ChannelId,
3591 }
3592
3593 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3594
3595 let mut channel_ids_stream = channel_parent::Entity::find()
3596 .from_raw_sql(stmt)
3597 .into_model::<ChannelParent>()
3598 .stream(&*tx)
3599 .await?;
3600
3601 let mut channel_ids = vec![];
3602 while let Some(channel_id) = channel_ids_stream.next().await {
3603 channel_ids.push(channel_id?.parent_id);
3604 }
3605
3606 Ok(channel_ids)
3607 }
3608
3609 async fn get_channel_descendants(
3610 &self,
3611 channel_ids: impl IntoIterator<Item = ChannelId>,
3612 tx: &DatabaseTransaction,
3613 ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3614 let mut values = String::new();
3615 for id in channel_ids {
3616 if !values.is_empty() {
3617 values.push_str(", ");
3618 }
3619 write!(&mut values, "({})", id).unwrap();
3620 }
3621
3622 if values.is_empty() {
3623 return Ok(HashMap::default());
3624 }
3625
3626 let sql = format!(
3627 r#"
3628 WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3629 SELECT root_ids.column1 as child_id, CAST(NULL as INTEGER) as parent_id
3630 FROM (VALUES {}) as root_ids
3631 UNION
3632 SELECT channel_parents.child_id, channel_parents.parent_id
3633 FROM channel_parents, channel_tree
3634 WHERE channel_parents.parent_id = channel_tree.child_id
3635 )
3636 SELECT channel_tree.child_id, channel_tree.parent_id
3637 FROM channel_tree
3638 ORDER BY child_id, parent_id IS NOT NULL
3639 "#,
3640 values
3641 );
3642
3643 #[derive(FromQueryResult, Debug, PartialEq)]
3644 pub struct ChannelParent {
3645 pub child_id: ChannelId,
3646 pub parent_id: Option<ChannelId>,
3647 }
3648
3649 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3650
3651 let mut parents_by_child_id = HashMap::default();
3652 let mut parents = channel_parent::Entity::find()
3653 .from_raw_sql(stmt)
3654 .into_model::<ChannelParent>()
3655 .stream(tx)
3656 .await?;
3657
3658 while let Some(parent) = parents.next().await {
3659 let parent = parent?;
3660 parents_by_child_id.insert(parent.child_id, parent.parent_id);
3661 }
3662
3663 Ok(parents_by_child_id)
3664 }
3665
3666 pub async fn get_channel(&self, channel_id: ChannelId) -> Result<Option<Channel>> {
3667 self.transaction(|tx| async move {
3668 let tx = tx;
3669 let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3670
3671 Ok(channel.map(|channel| Channel {
3672 id: channel.id,
3673 name: channel.name,
3674 parent_id: None,
3675 }))
3676 })
3677 .await
3678 }
3679
3680 pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3681 self.transaction(|tx| async move {
3682 let tx = tx;
3683 let room = channel::Model {
3684 id: channel_id,
3685 ..Default::default()
3686 }
3687 .find_related(room::Entity)
3688 .one(&*tx)
3689 .await?
3690 .ok_or_else(|| anyhow!("invalid channel"))?;
3691 Ok(room.id)
3692 })
3693 .await
3694 }
3695
3696 async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3697 where
3698 F: Send + Fn(TransactionHandle) -> Fut,
3699 Fut: Send + Future<Output = Result<T>>,
3700 {
3701 let body = async {
3702 let mut i = 0;
3703 loop {
3704 let (tx, result) = self.with_transaction(&f).await?;
3705 match result {
3706 Ok(result) => match tx.commit().await.map_err(Into::into) {
3707 Ok(()) => return Ok(result),
3708 Err(error) => {
3709 if !self.retry_on_serialization_error(&error, i).await {
3710 return Err(error);
3711 }
3712 }
3713 },
3714 Err(error) => {
3715 tx.rollback().await?;
3716 if !self.retry_on_serialization_error(&error, i).await {
3717 return Err(error);
3718 }
3719 }
3720 }
3721 i += 1;
3722 }
3723 };
3724
3725 self.run(body).await
3726 }
3727
3728 async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
3729 where
3730 F: Send + Fn(TransactionHandle) -> Fut,
3731 Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
3732 {
3733 let body = async {
3734 let mut i = 0;
3735 loop {
3736 let (tx, result) = self.with_transaction(&f).await?;
3737 match result {
3738 Ok(Some((room_id, data))) => {
3739 let lock = self.rooms.entry(room_id).or_default().clone();
3740 let _guard = lock.lock_owned().await;
3741 match tx.commit().await.map_err(Into::into) {
3742 Ok(()) => {
3743 return Ok(Some(RoomGuard {
3744 data,
3745 _guard,
3746 _not_send: PhantomData,
3747 }));
3748 }
3749 Err(error) => {
3750 if !self.retry_on_serialization_error(&error, i).await {
3751 return Err(error);
3752 }
3753 }
3754 }
3755 }
3756 Ok(None) => match tx.commit().await.map_err(Into::into) {
3757 Ok(()) => return Ok(None),
3758 Err(error) => {
3759 if !self.retry_on_serialization_error(&error, i).await {
3760 return Err(error);
3761 }
3762 }
3763 },
3764 Err(error) => {
3765 tx.rollback().await?;
3766 if !self.retry_on_serialization_error(&error, i).await {
3767 return Err(error);
3768 }
3769 }
3770 }
3771 i += 1;
3772 }
3773 };
3774
3775 self.run(body).await
3776 }
3777
3778 async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
3779 where
3780 F: Send + Fn(TransactionHandle) -> Fut,
3781 Fut: Send + Future<Output = Result<T>>,
3782 {
3783 let body = async {
3784 let mut i = 0;
3785 loop {
3786 let lock = self.rooms.entry(room_id).or_default().clone();
3787 let _guard = lock.lock_owned().await;
3788 let (tx, result) = self.with_transaction(&f).await?;
3789 match result {
3790 Ok(data) => match tx.commit().await.map_err(Into::into) {
3791 Ok(()) => {
3792 return Ok(RoomGuard {
3793 data,
3794 _guard,
3795 _not_send: PhantomData,
3796 });
3797 }
3798 Err(error) => {
3799 if !self.retry_on_serialization_error(&error, i).await {
3800 return Err(error);
3801 }
3802 }
3803 },
3804 Err(error) => {
3805 tx.rollback().await?;
3806 if !self.retry_on_serialization_error(&error, i).await {
3807 return Err(error);
3808 }
3809 }
3810 }
3811 i += 1;
3812 }
3813 };
3814
3815 self.run(body).await
3816 }
3817
3818 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3819 where
3820 F: Send + Fn(TransactionHandle) -> Fut,
3821 Fut: Send + Future<Output = Result<T>>,
3822 {
3823 let tx = self
3824 .pool
3825 .begin_with_config(Some(IsolationLevel::Serializable), None)
3826 .await?;
3827
3828 let mut tx = Arc::new(Some(tx));
3829 let result = f(TransactionHandle(tx.clone())).await;
3830 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3831 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3832 };
3833
3834 Ok((tx, result))
3835 }
3836
// Drives `future` to completion and returns its result.
//
// In test builds: when the executor is `Executor::Deterministic`, a simulated
// random delay is awaited first; the future is then run to completion with
// `block_on` on `self.runtime`. The `unwrap` panics if no runtime has been
// installed. In non-test builds the future is simply awaited.
3837 async fn run<F, T>(&self, future: F) -> Result<T>
3838 where
3839 F: Future<Output = Result<T>>,
3840 {
3841 #[cfg(test)]
3842 {
3843 if let Executor::Deterministic(executor) = &self.executor {
3844 executor.simulate_random_delay().await;
3845 }
3846
// Blocks the current thread on the test runtime rather than awaiting.
3847 self.runtime.as_ref().unwrap().block_on(future)
3848 }
3849
3850 #[cfg(not(test))]
3851 {
3852 future.await
3853 }
3854 }
3855
3856 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3857 // If the error is due to a failure to serialize concurrent transactions, then retry
3858 // this transaction after a delay. With each subsequent retry, double the delay duration.
3859 // Also vary the delay randomly in order to ensure different database connections retry
3860 // at different times.
3861 if is_serialization_error(error) {
3862 let base_delay = 4_u64 << prev_attempt_count.min(16);
3863 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3864 log::info!(
3865 "retrying transaction after serialization error. delay: {} ms.",
3866 randomized_delay
3867 );
3868 self.executor
3869 .sleep(Duration::from_millis(randomized_delay as u64))
3870 .await;
3871 true
3872 } else {
3873 false
3874 }
3875 }
3876}
3877
3878fn is_serialization_error(error: &Error) -> bool {
3879 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3880 match error {
3881 Error::Database(
3882 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3883 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3884 ) if error
3885 .as_database_error()
3886 .and_then(|error| error.code())
3887 .as_deref()
3888 == Some(SERIALIZATION_FAILURE_CODE) =>
3889 {
3890 true
3891 }
3892 _ => false,
3893 }
3894}
3895
3896struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3897
3898impl Deref for TransactionHandle {
3899 type Target = DatabaseTransaction;
3900
3901 fn deref(&self) -> &Self::Target {
3902 self.0.as_ref().as_ref().unwrap()
3903 }
3904}
3905
3906pub struct RoomGuard<T> {
3907 data: T,
3908 _guard: OwnedMutexGuard<()>,
3909 _not_send: PhantomData<Rc<()>>,
3910}
3911
3912impl<T> Deref for RoomGuard<T> {
3913 type Target = T;
3914
3915 fn deref(&self) -> &T {
3916 &self.data
3917 }
3918}
3919
3920impl<T> DerefMut for RoomGuard<T> {
3921 fn deref_mut(&mut self) -> &mut T {
3922 &mut self.data
3923 }
3924}
3925
/// Serializable bundle of GitHub identity fields plus an invite count.
///
/// NOTE(review): presumably the input for creating a user record — the
/// consuming query is outside this chunk; confirm against callers.
3926#[derive(Debug, Serialize, Deserialize)]
3927pub struct NewUserParams {
/// GitHub login name.
3928 pub github_login: String,
/// Numeric GitHub user id.
3929 pub github_user_id: i32,
/// NOTE(review): presumably the number of invites granted to the user — confirm.
3930 pub invite_count: i32,
3931}
3932
/// Identifiers associated with a user record.
///
/// NOTE(review): presumably the output of the user-creation query that takes
/// `NewUserParams` — that query is outside this chunk; confirm.
3933#[derive(Debug)]
3934pub struct NewUserResult {
/// Database id of the user.
3935 pub user_id: UserId,
/// Metrics identifier of the user, as a string.
3936 pub metrics_id: String,
/// Id of the inviting user, if there is one.
3937 pub inviting_user_id: Option<UserId>,
/// Device id recorded at signup, if there is one.
3938 pub signup_device_id: Option<String>,
3939}
3940
/// A channel as returned by the channel queries of `Database`.
3941#[derive(FromQueryResult, Debug, PartialEq)]
3942pub struct Channel {
/// Database id of the channel.
3943 pub id: ChannelId,
/// Name of the channel.
3944 pub name: String,
/// Parent channel, when the query that produced this value resolved one.
/// `get_channel` and `get_channel_invites_for_user` always set `None`;
/// `get_channels_for_user` fills it from the descendants lookup.
3945 pub parent_id: Option<ChannelId>,
3946}
3947
3948fn random_invite_code() -> String {
3949 nanoid::nanoid!(16)
3950}
3951
3952fn random_email_confirmation_code() -> String {
3953 nanoid::nanoid!(64)
3954}
3955
/// Declares a strongly typed database id: a `Copy` newtype around `i32`.
///
/// Besides the derived traits, the generated type gets:
///
/// * `MAX`, `from_proto` and `to_proto` helpers. Both proto conversions are
///   plain `as` casts, so `from_proto` silently truncates values that do not
///   fit into an `i32`;
/// * `Display`, which prints the inner integer;
/// * the sea-query / sea-orm conversions (`From<$name> for Value`,
///   `TryGetable`, `ValueType`, `TryFromU64`, `Nullable`) needed to use the
///   id as a column value.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer variant whose value fits in `i32`.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // The conversion goes from `u64` to the id type, and the
                // message says so.
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(concat!(
                        "error converting u64 to ",
                        stringify!($name)
                    ))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
4074
// Typed id newtypes generated by `id_type!`; each one wraps an `i32`.
4075id_type!(AccessTokenId);
4076id_type!(ChannelId);
4077id_type!(ChannelMemberId);
4078id_type!(ContactId);
4079id_type!(FollowerId);
4080id_type!(RoomId);
4081id_type!(RoomParticipantId);
4082id_type!(ProjectId);
4083id_type!(ProjectCollaboratorId);
4084id_type!(ReplicaId);
4085id_type!(ServerId);
4086id_type!(SignupId);
4087id_type!(UserId);
4088
/// A `proto::Room` bundled with its optional channel and channel member ids.
///
/// NOTE(review): presumably returned by the room-joining queries — they are
/// outside this chunk; confirm against callers.
4089#[derive(Clone)]
4090pub struct JoinRoom {
4091 pub room: proto::Room,
/// Channel associated with the room, if any.
4092 pub channel_id: Option<ChannelId>,
/// NOTE(review): presumably the members of that channel — confirm.
4093 pub channel_members: Vec<UserId>,
4094}
4095
/// A `proto::Room` bundled with rejoined and reshared projects and channel data.
///
/// NOTE(review): presumably returned by the room-rejoin query — it is outside
/// this chunk; confirm against callers.
4096pub struct RejoinedRoom {
4097 pub room: proto::Room,
4098 pub rejoined_projects: Vec<RejoinedProject>,
4099 pub reshared_projects: Vec<ResharedProject>,
/// Channel associated with the room, if any.
4100 pub channel_id: Option<ChannelId>,
/// NOTE(review): presumably the members of that channel — confirm.
4101 pub channel_members: Vec<UserId>,
4102}
4103
/// Project data carried inside `RejoinedRoom::reshared_projects`.
4104pub struct ResharedProject {
4105 pub id: ProjectId,
/// NOTE(review): presumably the connection used before reconnecting — confirm.
4106 pub old_connection_id: ConnectionId,
4107 pub collaborators: Vec<ProjectCollaborator>,
4108 pub worktrees: Vec<proto::WorktreeMetadata>,
4109}
4110
/// Project data carried inside `RejoinedRoom::rejoined_projects`.
4111pub struct RejoinedProject {
4112 pub id: ProjectId,
/// NOTE(review): presumably the connection used before reconnecting — confirm.
4113 pub old_connection_id: ConnectionId,
4114 pub collaborators: Vec<ProjectCollaborator>,
4115 pub worktrees: Vec<RejoinedWorktree>,
4116 pub language_servers: Vec<proto::LanguageServer>,
4117}
4118
/// Worktree data carried inside `RejoinedProject::worktrees`.
///
/// Holds updated and removed entries and repositories rather than a full
/// listing. NOTE(review): presumably a delta relative to what the rejoining
/// client already has — confirm against the rejoin query.
4119#[derive(Debug)]
4120pub struct RejoinedWorktree {
4121 pub id: u64,
4122 pub abs_path: String,
4123 pub root_name: String,
4124 pub visible: bool,
4125 pub updated_entries: Vec<proto::Entry>,
4126 pub removed_entries: Vec<u64>,
4127 pub updated_repositories: Vec<proto::RepositoryEntry>,
4128 pub removed_repositories: Vec<u64>,
4129 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
4130 pub settings_files: Vec<WorktreeSettingsFile>,
4131 pub scan_id: u64,
4132 pub completed_scan_id: u64,
4133}
4134
/// A `proto::Room` bundled with channel data, left projects and canceled calls.
///
/// NOTE(review): presumably returned by the room-leaving queries — they are
/// outside this chunk; confirm against callers.
4135pub struct LeftRoom {
4136 pub room: proto::Room,
/// Channel associated with the room, if any.
4137 pub channel_id: Option<ChannelId>,
4138 pub channel_members: Vec<UserId>,
/// Left projects keyed by project id.
4139 pub left_projects: HashMap<ProjectId, LeftProject>,
4140 pub canceled_calls_to_user_ids: Vec<UserId>,
/// NOTE(review): presumably whether the room itself was deleted — confirm.
4141 pub deleted: bool,
4142}
4143
/// A `proto::Room` bundled with channel data, stale participants and canceled calls.
///
/// NOTE(review): presumably returned by a room-refresh query — it is outside
/// this chunk; confirm against callers.
4144pub struct RefreshedRoom {
4145 pub room: proto::Room,
/// Channel associated with the room, if any.
4146 pub channel_id: Option<ChannelId>,
4147 pub channel_members: Vec<UserId>,
4148 pub stale_participant_user_ids: Vec<UserId>,
4149 pub canceled_calls_to_user_ids: Vec<UserId>,
4150}
4151
/// A project's collaborators, worktrees and language servers.
4152pub struct Project {
4153 pub collaborators: Vec<ProjectCollaborator>,
/// Worktrees keyed by a `u64`. NOTE(review): presumably the worktree id — confirm.
4154 pub worktrees: BTreeMap<u64, Worktree>,
4155 pub language_servers: Vec<proto::LanguageServer>,
4156}
4157
/// One collaborator of a project; convertible to `proto::Collaborator` via
/// `to_proto`.
4158pub struct ProjectCollaborator {
4159 pub connection_id: ConnectionId,
4160 pub user_id: UserId,
4161 pub replica_id: ReplicaId,
/// Not included in the `proto::Collaborator` built by `to_proto`.
4162 pub is_host: bool,
4163}
4164
4165impl ProjectCollaborator {
4166 pub fn to_proto(&self) -> proto::Collaborator {
4167 proto::Collaborator {
4168 peer_id: Some(self.connection_id.into()),
4169 replica_id: self.replica_id.0 as u32,
4170 user_id: self.user_id.to_proto(),
4171 }
4172 }
4173}
4174
/// Project data stored as the values of `LeftRoom::left_projects`.
4175#[derive(Debug)]
4176pub struct LeftProject {
4177 pub id: ProjectId,
4178 pub host_user_id: UserId,
4179 pub host_connection_id: ConnectionId,
/// NOTE(review): presumably the connections still attached to the project — confirm.
4180 pub connection_ids: Vec<ConnectionId>,
4181}
4182
/// Worktree data stored as the values of `Project::worktrees`.
4183pub struct Worktree {
4184 pub id: u64,
4185 pub abs_path: String,
4186 pub root_name: String,
4187 pub visible: bool,
4188 pub entries: Vec<proto::Entry>,
/// Repository entries keyed by a `u64`. NOTE(review): presumably the id of
/// the repository's work directory entry — confirm.
4189 pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
4190 pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
4191 pub settings_files: Vec<WorktreeSettingsFile>,
4192 pub scan_id: u64,
4193 pub completed_scan_id: u64,
4194}
4195
/// A settings file attached to a worktree: its path and its full content.
4196#[derive(Debug)]
4197pub struct WorktreeSettingsFile {
4198 pub path: String,
4199 pub content: String,
4200}
4201
/// Column selector for `into_values` queries that return a single channel id
/// per row.
4202#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
4203enum QueryChannelIds {
4204 ChannelId,
4205}
4206
/// Column selector for `into_values` queries that return a single user id
/// per row.
4207#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
4208enum QueryUserIds {
4209 UserId,
4210}
4211
4212#[cfg(test)]
4213pub use test::*;
4214
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A [`Database`] created for a single test.
    ///
    /// Dropping a Postgres-backed `TestDb` terminates the other connections
    /// to its database and then drops the database itself.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    /// Builds the current-thread Tokio runtime (I/O and time drivers enabled)
    /// that a test database runs its queries on.
    fn new_test_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build()
            .unwrap()
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and loads the test schema
        /// into it.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = String::from("sqlite::memory:");
            let runtime = new_test_runtime();

            let mut db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a uniquely named Postgres database on localhost and runs
        /// the migrations against it.
        ///
        /// A process-wide lock is held for the duration of the call.
        pub fn postgres(background: Arc<Background>) -> Self {
            static LOCK: Mutex<()> = Mutex::new(());

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = new_test_runtime();

            let mut db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Returns the wrapped database; panics once it has been taken by `drop`.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    // Terminate the other backends connected to this database
                    // before the database is dropped below.
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}