1mod access_token;
2mod channel;
3mod channel_member;
4mod channel_parent;
5mod contact;
6mod follower;
7mod language_server;
8mod project;
9mod project_collaborator;
10mod room;
11mod room_participant;
12mod server;
13mod signup;
14#[cfg(test)]
15mod tests;
16mod user;
17mod worktree;
18mod worktree_diagnostic_summary;
19mod worktree_entry;
20mod worktree_repository;
21mod worktree_repository_statuses;
22mod worktree_settings_file;
23
24use crate::executor::Executor;
25use crate::{Error, Result};
26use anyhow::anyhow;
27use collections::{BTreeMap, HashMap, HashSet};
28pub use contact::Contact;
29use dashmap::DashMap;
30use futures::StreamExt;
31use hyper::StatusCode;
32use rand::prelude::StdRng;
33use rand::{Rng, SeedableRng};
34use rpc::{proto, ConnectionId};
35use sea_orm::Condition;
36pub use sea_orm::ConnectOptions;
37use sea_orm::{
38 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
39 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
40 Statement, TransactionTrait,
41};
42use sea_query::{Alias, Expr, OnConflict, Query};
43use serde::{Deserialize, Serialize};
44pub use signup::{Invite, NewSignup, WaitlistSummary};
45use sqlx::migrate::{Migrate, Migration, MigrationSource};
46use sqlx::Connection;
47use std::fmt::Write as _;
48use std::ops::{Deref, DerefMut};
49use std::path::Path;
50use std::time::Duration;
51use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
52use tokio::sync::{Mutex, OwnedMutexGuard};
53pub use user::Model as User;
54
55pub struct Database {
56 options: ConnectOptions,
57 pool: DatabaseConnection,
58 rooms: DashMap<RoomId, Arc<Mutex<()>>>,
59 rng: Mutex<StdRng>,
60 executor: Executor,
61 #[cfg(test)]
62 runtime: Option<tokio::runtime::Runtime>,
63}
64
65impl Database {
66 pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
67 Ok(Self {
68 options: options.clone(),
69 pool: sea_orm::Database::connect(options).await?,
70 rooms: DashMap::with_capacity(16384),
71 rng: Mutex::new(StdRng::seed_from_u64(0)),
72 executor,
73 #[cfg(test)]
74 runtime: None,
75 })
76 }
77
/// Test-only: forgets every entry in the room-lock map.
#[cfg(test)]
pub fn reset(&self) {
    self.rooms.clear();
}
82
83 pub async fn migrate(
84 &self,
85 migrations_path: &Path,
86 ignore_checksum_mismatch: bool,
87 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
88 let migrations = MigrationSource::resolve(migrations_path)
89 .await
90 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
91
92 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
93
94 connection.ensure_migrations_table().await?;
95 let applied_migrations: HashMap<_, _> = connection
96 .list_applied_migrations()
97 .await?
98 .into_iter()
99 .map(|m| (m.version, m))
100 .collect();
101
102 let mut new_migrations = Vec::new();
103 for migration in migrations {
104 match applied_migrations.get(&migration.version) {
105 Some(applied_migration) => {
106 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
107 {
108 Err(anyhow!(
109 "checksum mismatch for applied migration {}",
110 migration.description
111 ))?;
112 }
113 }
114 None => {
115 let elapsed = connection.apply(&migration).await?;
116 new_migrations.push((migration, elapsed));
117 }
118 }
119 }
120
121 Ok(new_migrations)
122 }
123
124 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
125 self.transaction(|tx| async move {
126 let server = server::ActiveModel {
127 environment: ActiveValue::set(environment.into()),
128 ..Default::default()
129 }
130 .insert(&*tx)
131 .await?;
132 Ok(server.id)
133 })
134 .await
135 }
136
137 pub async fn stale_room_ids(
138 &self,
139 environment: &str,
140 new_server_id: ServerId,
141 ) -> Result<Vec<RoomId>> {
142 self.transaction(|tx| async move {
143 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
144 enum QueryAs {
145 RoomId,
146 }
147
148 let stale_server_epochs = self
149 .stale_server_ids(environment, new_server_id, &tx)
150 .await?;
151 Ok(room_participant::Entity::find()
152 .select_only()
153 .column(room_participant::Column::RoomId)
154 .distinct()
155 .filter(
156 room_participant::Column::AnsweringConnectionServerId
157 .is_in(stale_server_epochs),
158 )
159 .into_values::<_, QueryAs>()
160 .all(&*tx)
161 .await?)
162 })
163 .await
164 }
165
166 pub async fn refresh_room(
167 &self,
168 room_id: RoomId,
169 new_server_id: ServerId,
170 ) -> Result<RoomGuard<RefreshedRoom>> {
171 self.room_transaction(room_id, |tx| async move {
172 let stale_participant_filter = Condition::all()
173 .add(room_participant::Column::RoomId.eq(room_id))
174 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
175 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
176
177 let stale_participant_user_ids = room_participant::Entity::find()
178 .filter(stale_participant_filter.clone())
179 .all(&*tx)
180 .await?
181 .into_iter()
182 .map(|participant| participant.user_id)
183 .collect::<Vec<_>>();
184
185 // Delete participants who failed to reconnect and cancel their calls.
186 let mut canceled_calls_to_user_ids = Vec::new();
187 room_participant::Entity::delete_many()
188 .filter(stale_participant_filter)
189 .exec(&*tx)
190 .await?;
191 let called_participants = room_participant::Entity::find()
192 .filter(
193 Condition::all()
194 .add(
195 room_participant::Column::CallingUserId
196 .is_in(stale_participant_user_ids.iter().copied()),
197 )
198 .add(room_participant::Column::AnsweringConnectionId.is_null()),
199 )
200 .all(&*tx)
201 .await?;
202 room_participant::Entity::delete_many()
203 .filter(
204 room_participant::Column::Id
205 .is_in(called_participants.iter().map(|participant| participant.id)),
206 )
207 .exec(&*tx)
208 .await?;
209 canceled_calls_to_user_ids.extend(
210 called_participants
211 .into_iter()
212 .map(|participant| participant.user_id),
213 );
214
215 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
216 let channel_members = if let Some(channel_id) = channel_id {
217 self.get_channel_members_internal(channel_id, &tx).await?
218 } else {
219 Vec::new()
220 };
221
222 // Delete the room if it becomes empty.
223 if room.participants.is_empty() {
224 project::Entity::delete_many()
225 .filter(project::Column::RoomId.eq(room_id))
226 .exec(&*tx)
227 .await?;
228 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
229 }
230
231 Ok(RefreshedRoom {
232 room,
233 channel_id,
234 channel_members,
235 stale_participant_user_ids,
236 canceled_calls_to_user_ids,
237 })
238 })
239 .await
240 }
241
242 pub async fn delete_stale_servers(
243 &self,
244 environment: &str,
245 new_server_id: ServerId,
246 ) -> Result<()> {
247 self.transaction(|tx| async move {
248 server::Entity::delete_many()
249 .filter(
250 Condition::all()
251 .add(server::Column::Environment.eq(environment))
252 .add(server::Column::Id.ne(new_server_id)),
253 )
254 .exec(&*tx)
255 .await?;
256 Ok(())
257 })
258 .await
259 }
260
261 async fn stale_server_ids(
262 &self,
263 environment: &str,
264 new_server_id: ServerId,
265 tx: &DatabaseTransaction,
266 ) -> Result<Vec<ServerId>> {
267 let stale_servers = server::Entity::find()
268 .filter(
269 Condition::all()
270 .add(server::Column::Environment.eq(environment))
271 .add(server::Column::Id.ne(new_server_id)),
272 )
273 .all(&*tx)
274 .await?;
275 Ok(stale_servers.into_iter().map(|server| server.id).collect())
276 }
277
278 // users
279
280 pub async fn create_user(
281 &self,
282 email_address: &str,
283 admin: bool,
284 params: NewUserParams,
285 ) -> Result<NewUserResult> {
286 self.transaction(|tx| async {
287 let tx = tx;
288 let user = user::Entity::insert(user::ActiveModel {
289 email_address: ActiveValue::set(Some(email_address.into())),
290 github_login: ActiveValue::set(params.github_login.clone()),
291 github_user_id: ActiveValue::set(Some(params.github_user_id)),
292 admin: ActiveValue::set(admin),
293 metrics_id: ActiveValue::set(Uuid::new_v4()),
294 ..Default::default()
295 })
296 .on_conflict(
297 OnConflict::column(user::Column::GithubLogin)
298 .update_column(user::Column::GithubLogin)
299 .to_owned(),
300 )
301 .exec_with_returning(&*tx)
302 .await?;
303
304 Ok(NewUserResult {
305 user_id: user.id,
306 metrics_id: user.metrics_id.to_string(),
307 signup_device_id: None,
308 inviting_user_id: None,
309 })
310 })
311 .await
312 }
313
314 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
315 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
316 .await
317 }
318
319 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
320 self.transaction(|tx| async {
321 let tx = tx;
322 Ok(user::Entity::find()
323 .filter(user::Column::Id.is_in(ids.iter().copied()))
324 .all(&*tx)
325 .await?)
326 })
327 .await
328 }
329
330 pub async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
331 self.transaction(|tx| async move {
332 Ok(user::Entity::find()
333 .filter(user::Column::GithubLogin.eq(github_login))
334 .one(&*tx)
335 .await?)
336 })
337 .await
338 }
339
340 pub async fn get_or_create_user_by_github_account(
341 &self,
342 github_login: &str,
343 github_user_id: Option<i32>,
344 github_email: Option<&str>,
345 ) -> Result<Option<User>> {
346 self.transaction(|tx| async move {
347 let tx = &*tx;
348 if let Some(github_user_id) = github_user_id {
349 if let Some(user_by_github_user_id) = user::Entity::find()
350 .filter(user::Column::GithubUserId.eq(github_user_id))
351 .one(tx)
352 .await?
353 {
354 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
355 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
356 Ok(Some(user_by_github_user_id.update(tx).await?))
357 } else if let Some(user_by_github_login) = user::Entity::find()
358 .filter(user::Column::GithubLogin.eq(github_login))
359 .one(tx)
360 .await?
361 {
362 let mut user_by_github_login = user_by_github_login.into_active_model();
363 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
364 Ok(Some(user_by_github_login.update(tx).await?))
365 } else {
366 let user = user::Entity::insert(user::ActiveModel {
367 email_address: ActiveValue::set(github_email.map(|email| email.into())),
368 github_login: ActiveValue::set(github_login.into()),
369 github_user_id: ActiveValue::set(Some(github_user_id)),
370 admin: ActiveValue::set(false),
371 invite_count: ActiveValue::set(0),
372 invite_code: ActiveValue::set(None),
373 metrics_id: ActiveValue::set(Uuid::new_v4()),
374 ..Default::default()
375 })
376 .exec_with_returning(&*tx)
377 .await?;
378 Ok(Some(user))
379 }
380 } else {
381 Ok(user::Entity::find()
382 .filter(user::Column::GithubLogin.eq(github_login))
383 .one(tx)
384 .await?)
385 }
386 })
387 .await
388 }
389
390 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
391 self.transaction(|tx| async move {
392 Ok(user::Entity::find()
393 .order_by_asc(user::Column::GithubLogin)
394 .limit(limit as u64)
395 .offset(page as u64 * limit as u64)
396 .all(&*tx)
397 .await?)
398 })
399 .await
400 }
401
402 pub async fn get_users_with_no_invites(
403 &self,
404 invited_by_another_user: bool,
405 ) -> Result<Vec<User>> {
406 self.transaction(|tx| async move {
407 Ok(user::Entity::find()
408 .filter(
409 user::Column::InviteCount
410 .eq(0)
411 .and(if invited_by_another_user {
412 user::Column::InviterId.is_not_null()
413 } else {
414 user::Column::InviterId.is_null()
415 }),
416 )
417 .all(&*tx)
418 .await?)
419 })
420 .await
421 }
422
423 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
424 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
425 enum QueryAs {
426 MetricsId,
427 }
428
429 self.transaction(|tx| async move {
430 let metrics_id: Uuid = user::Entity::find_by_id(id)
431 .select_only()
432 .column(user::Column::MetricsId)
433 .into_values::<_, QueryAs>()
434 .one(&*tx)
435 .await?
436 .ok_or_else(|| anyhow!("could not find user"))?;
437 Ok(metrics_id.to_string())
438 })
439 .await
440 }
441
442 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
443 self.transaction(|tx| async move {
444 user::Entity::update_many()
445 .filter(user::Column::Id.eq(id))
446 .set(user::ActiveModel {
447 admin: ActiveValue::set(is_admin),
448 ..Default::default()
449 })
450 .exec(&*tx)
451 .await?;
452 Ok(())
453 })
454 .await
455 }
456
457 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
458 self.transaction(|tx| async move {
459 user::Entity::update_many()
460 .filter(user::Column::Id.eq(id))
461 .set(user::ActiveModel {
462 connected_once: ActiveValue::set(connected_once),
463 ..Default::default()
464 })
465 .exec(&*tx)
466 .await?;
467 Ok(())
468 })
469 .await
470 }
471
472 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
473 self.transaction(|tx| async move {
474 access_token::Entity::delete_many()
475 .filter(access_token::Column::UserId.eq(id))
476 .exec(&*tx)
477 .await?;
478 user::Entity::delete_by_id(id).exec(&*tx).await?;
479 Ok(())
480 })
481 .await
482 }
483
484 // contacts
485
486 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
487 #[derive(Debug, FromQueryResult)]
488 struct ContactWithUserBusyStatuses {
489 user_id_a: UserId,
490 user_id_b: UserId,
491 a_to_b: bool,
492 accepted: bool,
493 should_notify: bool,
494 user_a_busy: bool,
495 user_b_busy: bool,
496 }
497
498 self.transaction(|tx| async move {
499 let user_a_participant = Alias::new("user_a_participant");
500 let user_b_participant = Alias::new("user_b_participant");
501 let mut db_contacts = contact::Entity::find()
502 .column_as(
503 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
504 .is_not_null(),
505 "user_a_busy",
506 )
507 .column_as(
508 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
509 .is_not_null(),
510 "user_b_busy",
511 )
512 .filter(
513 contact::Column::UserIdA
514 .eq(user_id)
515 .or(contact::Column::UserIdB.eq(user_id)),
516 )
517 .join_as(
518 JoinType::LeftJoin,
519 contact::Relation::UserARoomParticipant.def(),
520 user_a_participant,
521 )
522 .join_as(
523 JoinType::LeftJoin,
524 contact::Relation::UserBRoomParticipant.def(),
525 user_b_participant,
526 )
527 .into_model::<ContactWithUserBusyStatuses>()
528 .stream(&*tx)
529 .await?;
530
531 let mut contacts = Vec::new();
532 while let Some(db_contact) = db_contacts.next().await {
533 let db_contact = db_contact?;
534 if db_contact.user_id_a == user_id {
535 if db_contact.accepted {
536 contacts.push(Contact::Accepted {
537 user_id: db_contact.user_id_b,
538 should_notify: db_contact.should_notify && db_contact.a_to_b,
539 busy: db_contact.user_b_busy,
540 });
541 } else if db_contact.a_to_b {
542 contacts.push(Contact::Outgoing {
543 user_id: db_contact.user_id_b,
544 })
545 } else {
546 contacts.push(Contact::Incoming {
547 user_id: db_contact.user_id_b,
548 should_notify: db_contact.should_notify,
549 });
550 }
551 } else if db_contact.accepted {
552 contacts.push(Contact::Accepted {
553 user_id: db_contact.user_id_a,
554 should_notify: db_contact.should_notify && !db_contact.a_to_b,
555 busy: db_contact.user_a_busy,
556 });
557 } else if db_contact.a_to_b {
558 contacts.push(Contact::Incoming {
559 user_id: db_contact.user_id_a,
560 should_notify: db_contact.should_notify,
561 });
562 } else {
563 contacts.push(Contact::Outgoing {
564 user_id: db_contact.user_id_a,
565 });
566 }
567 }
568
569 contacts.sort_unstable_by_key(|contact| contact.user_id());
570
571 Ok(contacts)
572 })
573 .await
574 }
575
576 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
577 self.transaction(|tx| async move {
578 let participant = room_participant::Entity::find()
579 .filter(room_participant::Column::UserId.eq(user_id))
580 .one(&*tx)
581 .await?;
582 Ok(participant.is_some())
583 })
584 .await
585 }
586
587 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
588 self.transaction(|tx| async move {
589 let (id_a, id_b) = if user_id_1 < user_id_2 {
590 (user_id_1, user_id_2)
591 } else {
592 (user_id_2, user_id_1)
593 };
594
595 Ok(contact::Entity::find()
596 .filter(
597 contact::Column::UserIdA
598 .eq(id_a)
599 .and(contact::Column::UserIdB.eq(id_b))
600 .and(contact::Column::Accepted.eq(true)),
601 )
602 .one(&*tx)
603 .await?
604 .is_some())
605 })
606 .await
607 }
608
609 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
610 self.transaction(|tx| async move {
611 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
612 (sender_id, receiver_id, true)
613 } else {
614 (receiver_id, sender_id, false)
615 };
616
617 let rows_affected = contact::Entity::insert(contact::ActiveModel {
618 user_id_a: ActiveValue::set(id_a),
619 user_id_b: ActiveValue::set(id_b),
620 a_to_b: ActiveValue::set(a_to_b),
621 accepted: ActiveValue::set(false),
622 should_notify: ActiveValue::set(true),
623 ..Default::default()
624 })
625 .on_conflict(
626 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
627 .values([
628 (contact::Column::Accepted, true.into()),
629 (contact::Column::ShouldNotify, false.into()),
630 ])
631 .action_and_where(
632 contact::Column::Accepted.eq(false).and(
633 contact::Column::AToB
634 .eq(a_to_b)
635 .and(contact::Column::UserIdA.eq(id_b))
636 .or(contact::Column::AToB
637 .ne(a_to_b)
638 .and(contact::Column::UserIdA.eq(id_a))),
639 ),
640 )
641 .to_owned(),
642 )
643 .exec_without_returning(&*tx)
644 .await?;
645
646 if rows_affected == 1 {
647 Ok(())
648 } else {
649 Err(anyhow!("contact already requested"))?
650 }
651 })
652 .await
653 }
654
655 /// Returns a bool indicating whether the removed contact had originally accepted or not
656 ///
657 /// Deletes the contact identified by the requester and responder ids, and then returns
658 /// whether the deleted contact had originally accepted or was a pending contact request.
659 ///
660 /// # Arguments
661 ///
662 /// * `requester_id` - The user that initiates this request
663 /// * `responder_id` - The user that will be removed
664 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<bool> {
665 self.transaction(|tx| async move {
666 let (id_a, id_b) = if responder_id < requester_id {
667 (responder_id, requester_id)
668 } else {
669 (requester_id, responder_id)
670 };
671
672 let contact = contact::Entity::find()
673 .filter(
674 contact::Column::UserIdA
675 .eq(id_a)
676 .and(contact::Column::UserIdB.eq(id_b)),
677 )
678 .one(&*tx)
679 .await?
680 .ok_or_else(|| anyhow!("no such contact"))?;
681
682 contact::Entity::delete_by_id(contact.id).exec(&*tx).await?;
683 Ok(contact.accepted)
684 })
685 .await
686 }
687
688 pub async fn dismiss_contact_notification(
689 &self,
690 user_id: UserId,
691 contact_user_id: UserId,
692 ) -> Result<()> {
693 self.transaction(|tx| async move {
694 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
695 (user_id, contact_user_id, true)
696 } else {
697 (contact_user_id, user_id, false)
698 };
699
700 let result = contact::Entity::update_many()
701 .set(contact::ActiveModel {
702 should_notify: ActiveValue::set(false),
703 ..Default::default()
704 })
705 .filter(
706 contact::Column::UserIdA
707 .eq(id_a)
708 .and(contact::Column::UserIdB.eq(id_b))
709 .and(
710 contact::Column::AToB
711 .eq(a_to_b)
712 .and(contact::Column::Accepted.eq(true))
713 .or(contact::Column::AToB
714 .ne(a_to_b)
715 .and(contact::Column::Accepted.eq(false))),
716 ),
717 )
718 .exec(&*tx)
719 .await?;
720 if result.rows_affected == 0 {
721 Err(anyhow!("no such contact request"))?
722 } else {
723 Ok(())
724 }
725 })
726 .await
727 }
728
729 pub async fn respond_to_contact_request(
730 &self,
731 responder_id: UserId,
732 requester_id: UserId,
733 accept: bool,
734 ) -> Result<()> {
735 self.transaction(|tx| async move {
736 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
737 (responder_id, requester_id, false)
738 } else {
739 (requester_id, responder_id, true)
740 };
741 let rows_affected = if accept {
742 let result = contact::Entity::update_many()
743 .set(contact::ActiveModel {
744 accepted: ActiveValue::set(true),
745 should_notify: ActiveValue::set(true),
746 ..Default::default()
747 })
748 .filter(
749 contact::Column::UserIdA
750 .eq(id_a)
751 .and(contact::Column::UserIdB.eq(id_b))
752 .and(contact::Column::AToB.eq(a_to_b)),
753 )
754 .exec(&*tx)
755 .await?;
756 result.rows_affected
757 } else {
758 let result = contact::Entity::delete_many()
759 .filter(
760 contact::Column::UserIdA
761 .eq(id_a)
762 .and(contact::Column::UserIdB.eq(id_b))
763 .and(contact::Column::AToB.eq(a_to_b))
764 .and(contact::Column::Accepted.eq(false)),
765 )
766 .exec(&*tx)
767 .await?;
768
769 result.rows_affected
770 };
771
772 if rows_affected == 1 {
773 Ok(())
774 } else {
775 Err(anyhow!("no such contact request"))?
776 }
777 })
778 .await
779 }
780
/// Builds a `LIKE` pattern that matches the alphanumeric characters of `string`
/// in order, with anything in between.
///
/// Every alphanumeric character is emitted with a `%` in front of it, all other
/// characters are dropped, and a final `%` is appended. For example `"a-b"`
/// becomes `"%a%b%"` and the empty string becomes `"%"`.
pub fn fuzzy_like_string(string: &str) -> String {
    let mut pattern = string.chars().filter(|c| c.is_alphanumeric()).fold(
        String::with_capacity(string.len() * 2 + 1),
        |mut pattern, c| {
            pattern.push('%');
            pattern.push(c);
            pattern
        },
    );
    pattern.push('%');
    pattern
}
792
793 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
794 self.transaction(|tx| async {
795 let tx = tx;
796 let like_string = Self::fuzzy_like_string(name_query);
797 let query = "
798 SELECT users.*
799 FROM users
800 WHERE github_login ILIKE $1
801 ORDER BY github_login <-> $2
802 LIMIT $3
803 ";
804
805 Ok(user::Entity::find()
806 .from_raw_sql(Statement::from_sql_and_values(
807 self.pool.get_database_backend(),
808 query.into(),
809 vec![like_string.into(), name_query.into(), limit.into()],
810 ))
811 .all(&*tx)
812 .await?)
813 })
814 .await
815 }
816
817 // signups
818
819 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
820 self.transaction(|tx| async move {
821 signup::Entity::insert(signup::ActiveModel {
822 email_address: ActiveValue::set(signup.email_address.clone()),
823 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
824 email_confirmation_sent: ActiveValue::set(false),
825 platform_mac: ActiveValue::set(signup.platform_mac),
826 platform_windows: ActiveValue::set(signup.platform_windows),
827 platform_linux: ActiveValue::set(signup.platform_linux),
828 platform_unknown: ActiveValue::set(false),
829 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
830 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
831 device_id: ActiveValue::set(signup.device_id.clone()),
832 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
833 ..Default::default()
834 })
835 .on_conflict(
836 OnConflict::column(signup::Column::EmailAddress)
837 .update_columns([
838 signup::Column::PlatformMac,
839 signup::Column::PlatformWindows,
840 signup::Column::PlatformLinux,
841 signup::Column::EditorFeatures,
842 signup::Column::ProgrammingLanguages,
843 signup::Column::DeviceId,
844 signup::Column::AddedToMailingList,
845 ])
846 .to_owned(),
847 )
848 .exec(&*tx)
849 .await?;
850 Ok(())
851 })
852 .await
853 }
854
855 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
856 self.transaction(|tx| async move {
857 let signup = signup::Entity::find()
858 .filter(signup::Column::EmailAddress.eq(email_address))
859 .one(&*tx)
860 .await?
861 .ok_or_else(|| {
862 anyhow!("signup with email address {} doesn't exist", email_address)
863 })?;
864
865 Ok(signup)
866 })
867 .await
868 }
869
870 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
871 self.transaction(|tx| async move {
872 let query = "
873 SELECT
874 COUNT(*) as count,
875 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
876 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
877 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
878 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
879 FROM (
880 SELECT *
881 FROM signups
882 WHERE
883 NOT email_confirmation_sent
884 ) AS unsent
885 ";
886 Ok(
887 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
888 self.pool.get_database_backend(),
889 query.into(),
890 vec![],
891 ))
892 .one(&*tx)
893 .await?
894 .ok_or_else(|| anyhow!("invalid result"))?,
895 )
896 })
897 .await
898 }
899
900 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
901 let emails = invites
902 .iter()
903 .map(|s| s.email_address.as_str())
904 .collect::<Vec<_>>();
905 self.transaction(|tx| async {
906 let tx = tx;
907 signup::Entity::update_many()
908 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
909 .set(signup::ActiveModel {
910 email_confirmation_sent: ActiveValue::set(true),
911 ..Default::default()
912 })
913 .exec(&*tx)
914 .await?;
915 Ok(())
916 })
917 .await
918 }
919
920 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
921 self.transaction(|tx| async move {
922 Ok(signup::Entity::find()
923 .select_only()
924 .column(signup::Column::EmailAddress)
925 .column(signup::Column::EmailConfirmationCode)
926 .filter(
927 signup::Column::EmailConfirmationSent.eq(false).and(
928 signup::Column::PlatformMac
929 .eq(true)
930 .or(signup::Column::PlatformUnknown.eq(true)),
931 ),
932 )
933 .order_by_asc(signup::Column::CreatedAt)
934 .limit(count as u64)
935 .into_model()
936 .all(&*tx)
937 .await?)
938 })
939 .await
940 }
941
942 // invite codes
943
944 pub async fn create_invite_from_code(
945 &self,
946 code: &str,
947 email_address: &str,
948 device_id: Option<&str>,
949 added_to_mailing_list: bool,
950 ) -> Result<Invite> {
951 self.transaction(|tx| async move {
952 let existing_user = user::Entity::find()
953 .filter(user::Column::EmailAddress.eq(email_address))
954 .one(&*tx)
955 .await?;
956
957 if existing_user.is_some() {
958 Err(anyhow!("email address is already in use"))?;
959 }
960
961 let inviting_user_with_invites = match user::Entity::find()
962 .filter(
963 user::Column::InviteCode
964 .eq(code)
965 .and(user::Column::InviteCount.gt(0)),
966 )
967 .one(&*tx)
968 .await?
969 {
970 Some(inviting_user) => inviting_user,
971 None => {
972 return Err(Error::Http(
973 StatusCode::UNAUTHORIZED,
974 "unable to find an invite code with invites remaining".to_string(),
975 ))?
976 }
977 };
978 user::Entity::update_many()
979 .filter(
980 user::Column::Id
981 .eq(inviting_user_with_invites.id)
982 .and(user::Column::InviteCount.gt(0)),
983 )
984 .col_expr(
985 user::Column::InviteCount,
986 Expr::col(user::Column::InviteCount).sub(1),
987 )
988 .exec(&*tx)
989 .await?;
990
991 let signup = signup::Entity::insert(signup::ActiveModel {
992 email_address: ActiveValue::set(email_address.into()),
993 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
994 email_confirmation_sent: ActiveValue::set(false),
995 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
996 platform_linux: ActiveValue::set(false),
997 platform_mac: ActiveValue::set(false),
998 platform_windows: ActiveValue::set(false),
999 platform_unknown: ActiveValue::set(true),
1000 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
1001 added_to_mailing_list: ActiveValue::set(added_to_mailing_list),
1002 ..Default::default()
1003 })
1004 .on_conflict(
1005 OnConflict::column(signup::Column::EmailAddress)
1006 .update_column(signup::Column::InvitingUserId)
1007 .to_owned(),
1008 )
1009 .exec_with_returning(&*tx)
1010 .await?;
1011
1012 Ok(Invite {
1013 email_address: signup.email_address,
1014 email_confirmation_code: signup.email_confirmation_code,
1015 })
1016 })
1017 .await
1018 }
1019
1020 pub async fn create_user_from_invite(
1021 &self,
1022 invite: &Invite,
1023 user: NewUserParams,
1024 ) -> Result<Option<NewUserResult>> {
1025 self.transaction(|tx| async {
1026 let tx = tx;
1027 let signup = signup::Entity::find()
1028 .filter(
1029 signup::Column::EmailAddress
1030 .eq(invite.email_address.as_str())
1031 .and(
1032 signup::Column::EmailConfirmationCode
1033 .eq(invite.email_confirmation_code.as_str()),
1034 ),
1035 )
1036 .one(&*tx)
1037 .await?
1038 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
1039
1040 if signup.user_id.is_some() {
1041 return Ok(None);
1042 }
1043
1044 let user = user::Entity::insert(user::ActiveModel {
1045 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1046 github_login: ActiveValue::set(user.github_login.clone()),
1047 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1048 admin: ActiveValue::set(false),
1049 invite_count: ActiveValue::set(user.invite_count),
1050 invite_code: ActiveValue::set(Some(random_invite_code())),
1051 metrics_id: ActiveValue::set(Uuid::new_v4()),
1052 ..Default::default()
1053 })
1054 .on_conflict(
1055 OnConflict::column(user::Column::GithubLogin)
1056 .update_columns([
1057 user::Column::EmailAddress,
1058 user::Column::GithubUserId,
1059 user::Column::Admin,
1060 ])
1061 .to_owned(),
1062 )
1063 .exec_with_returning(&*tx)
1064 .await?;
1065
1066 let mut signup = signup.into_active_model();
1067 signup.user_id = ActiveValue::set(Some(user.id));
1068 let signup = signup.update(&*tx).await?;
1069
1070 if let Some(inviting_user_id) = signup.inviting_user_id {
1071 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1072 (inviting_user_id, user.id, true)
1073 } else {
1074 (user.id, inviting_user_id, false)
1075 };
1076
1077 contact::Entity::insert(contact::ActiveModel {
1078 user_id_a: ActiveValue::set(user_id_a),
1079 user_id_b: ActiveValue::set(user_id_b),
1080 a_to_b: ActiveValue::set(a_to_b),
1081 should_notify: ActiveValue::set(true),
1082 accepted: ActiveValue::set(true),
1083 ..Default::default()
1084 })
1085 .on_conflict(OnConflict::new().do_nothing().to_owned())
1086 .exec_without_returning(&*tx)
1087 .await?;
1088 }
1089
1090 Ok(Some(NewUserResult {
1091 user_id: user.id,
1092 metrics_id: user.metrics_id.to_string(),
1093 inviting_user_id: signup.inviting_user_id,
1094 signup_device_id: signup.device_id,
1095 }))
1096 })
1097 .await
1098 }
1099
1100 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1101 self.transaction(|tx| async move {
1102 if count > 0 {
1103 user::Entity::update_many()
1104 .filter(
1105 user::Column::Id
1106 .eq(id)
1107 .and(user::Column::InviteCode.is_null()),
1108 )
1109 .set(user::ActiveModel {
1110 invite_code: ActiveValue::set(Some(random_invite_code())),
1111 ..Default::default()
1112 })
1113 .exec(&*tx)
1114 .await?;
1115 }
1116
1117 user::Entity::update_many()
1118 .filter(user::Column::Id.eq(id))
1119 .set(user::ActiveModel {
1120 invite_count: ActiveValue::set(count),
1121 ..Default::default()
1122 })
1123 .exec(&*tx)
1124 .await?;
1125 Ok(())
1126 })
1127 .await
1128 }
1129
1130 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1131 self.transaction(|tx| async move {
1132 match user::Entity::find_by_id(id).one(&*tx).await? {
1133 Some(user) if user.invite_code.is_some() => {
1134 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1135 }
1136 _ => Ok(None),
1137 }
1138 })
1139 .await
1140 }
1141
1142 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1143 self.transaction(|tx| async move {
1144 user::Entity::find()
1145 .filter(user::Column::InviteCode.eq(code))
1146 .one(&*tx)
1147 .await?
1148 .ok_or_else(|| {
1149 Error::Http(
1150 StatusCode::NOT_FOUND,
1151 "that invite code does not exist".to_string(),
1152 )
1153 })
1154 })
1155 .await
1156 }
1157
1158 // rooms
1159
1160 pub async fn incoming_call_for_user(
1161 &self,
1162 user_id: UserId,
1163 ) -> Result<Option<proto::IncomingCall>> {
1164 self.transaction(|tx| async move {
1165 let pending_participant = room_participant::Entity::find()
1166 .filter(
1167 room_participant::Column::UserId
1168 .eq(user_id)
1169 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1170 )
1171 .one(&*tx)
1172 .await?;
1173
1174 if let Some(pending_participant) = pending_participant {
1175 let room = self.get_room(pending_participant.room_id, &tx).await?;
1176 Ok(Self::build_incoming_call(&room, user_id))
1177 } else {
1178 Ok(None)
1179 }
1180 })
1181 .await
1182 }
1183
1184 pub async fn create_room(
1185 &self,
1186 user_id: UserId,
1187 connection: ConnectionId,
1188 live_kit_room: &str,
1189 ) -> Result<proto::Room> {
1190 self.transaction(|tx| async move {
1191 let room = room::ActiveModel {
1192 live_kit_room: ActiveValue::set(live_kit_room.into()),
1193 ..Default::default()
1194 }
1195 .insert(&*tx)
1196 .await?;
1197 room_participant::ActiveModel {
1198 room_id: ActiveValue::set(room.id),
1199 user_id: ActiveValue::set(user_id),
1200 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1201 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1202 connection.owner_id as i32,
1203 ))),
1204 answering_connection_lost: ActiveValue::set(false),
1205 calling_user_id: ActiveValue::set(user_id),
1206 calling_connection_id: ActiveValue::set(connection.id as i32),
1207 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1208 connection.owner_id as i32,
1209 ))),
1210 ..Default::default()
1211 }
1212 .insert(&*tx)
1213 .await?;
1214
1215 let room = self.get_room(room.id, &tx).await?;
1216 Ok(room)
1217 })
1218 .await
1219 }
1220
1221 pub async fn call(
1222 &self,
1223 room_id: RoomId,
1224 calling_user_id: UserId,
1225 calling_connection: ConnectionId,
1226 called_user_id: UserId,
1227 initial_project_id: Option<ProjectId>,
1228 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1229 self.room_transaction(room_id, |tx| async move {
1230 room_participant::ActiveModel {
1231 room_id: ActiveValue::set(room_id),
1232 user_id: ActiveValue::set(called_user_id),
1233 answering_connection_lost: ActiveValue::set(false),
1234 calling_user_id: ActiveValue::set(calling_user_id),
1235 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1236 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1237 calling_connection.owner_id as i32,
1238 ))),
1239 initial_project_id: ActiveValue::set(initial_project_id),
1240 ..Default::default()
1241 }
1242 .insert(&*tx)
1243 .await?;
1244
1245 let room = self.get_room(room_id, &tx).await?;
1246 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1247 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1248 Ok((room, incoming_call))
1249 })
1250 .await
1251 }
1252
1253 pub async fn call_failed(
1254 &self,
1255 room_id: RoomId,
1256 called_user_id: UserId,
1257 ) -> Result<RoomGuard<proto::Room>> {
1258 self.room_transaction(room_id, |tx| async move {
1259 room_participant::Entity::delete_many()
1260 .filter(
1261 room_participant::Column::RoomId
1262 .eq(room_id)
1263 .and(room_participant::Column::UserId.eq(called_user_id)),
1264 )
1265 .exec(&*tx)
1266 .await?;
1267 let room = self.get_room(room_id, &tx).await?;
1268 Ok(room)
1269 })
1270 .await
1271 }
1272
1273 pub async fn decline_call(
1274 &self,
1275 expected_room_id: Option<RoomId>,
1276 user_id: UserId,
1277 ) -> Result<Option<RoomGuard<proto::Room>>> {
1278 self.optional_room_transaction(|tx| async move {
1279 let mut filter = Condition::all()
1280 .add(room_participant::Column::UserId.eq(user_id))
1281 .add(room_participant::Column::AnsweringConnectionId.is_null());
1282 if let Some(room_id) = expected_room_id {
1283 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
1284 }
1285 let participant = room_participant::Entity::find()
1286 .filter(filter)
1287 .one(&*tx)
1288 .await?;
1289
1290 let participant = if let Some(participant) = participant {
1291 participant
1292 } else if expected_room_id.is_some() {
1293 return Err(anyhow!("could not find call to decline"))?;
1294 } else {
1295 return Ok(None);
1296 };
1297
1298 let room_id = participant.room_id;
1299 room_participant::Entity::delete(participant.into_active_model())
1300 .exec(&*tx)
1301 .await?;
1302
1303 let room = self.get_room(room_id, &tx).await?;
1304 Ok(Some((room_id, room)))
1305 })
1306 .await
1307 }
1308
1309 pub async fn cancel_call(
1310 &self,
1311 room_id: RoomId,
1312 calling_connection: ConnectionId,
1313 called_user_id: UserId,
1314 ) -> Result<RoomGuard<proto::Room>> {
1315 self.room_transaction(room_id, |tx| async move {
1316 let participant = room_participant::Entity::find()
1317 .filter(
1318 Condition::all()
1319 .add(room_participant::Column::UserId.eq(called_user_id))
1320 .add(room_participant::Column::RoomId.eq(room_id))
1321 .add(
1322 room_participant::Column::CallingConnectionId
1323 .eq(calling_connection.id as i32),
1324 )
1325 .add(
1326 room_participant::Column::CallingConnectionServerId
1327 .eq(calling_connection.owner_id as i32),
1328 )
1329 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1330 )
1331 .one(&*tx)
1332 .await?
1333 .ok_or_else(|| anyhow!("no call to cancel"))?;
1334
1335 room_participant::Entity::delete(participant.into_active_model())
1336 .exec(&*tx)
1337 .await?;
1338
1339 let room = self.get_room(room_id, &tx).await?;
1340 Ok(room)
1341 })
1342 .await
1343 }
1344
1345 pub async fn join_room(
1346 &self,
1347 room_id: RoomId,
1348 user_id: UserId,
1349 channel_id: Option<ChannelId>,
1350 connection: ConnectionId,
1351 ) -> Result<RoomGuard<JoinRoom>> {
1352 self.room_transaction(room_id, |tx| async move {
1353 if let Some(channel_id) = channel_id {
1354 channel_member::Entity::find()
1355 .filter(
1356 channel_member::Column::ChannelId
1357 .eq(channel_id)
1358 .and(channel_member::Column::UserId.eq(user_id))
1359 .and(channel_member::Column::Accepted.eq(true)),
1360 )
1361 .one(&*tx)
1362 .await?
1363 .ok_or_else(|| anyhow!("no such channel membership"))?;
1364
1365 room_participant::ActiveModel {
1366 room_id: ActiveValue::set(room_id),
1367 user_id: ActiveValue::set(user_id),
1368 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1369 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1370 connection.owner_id as i32,
1371 ))),
1372 answering_connection_lost: ActiveValue::set(false),
1373 // Redundant for the channel join use case, used for channel and call invitations
1374 calling_user_id: ActiveValue::set(user_id),
1375 calling_connection_id: ActiveValue::set(connection.id as i32),
1376 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1377 connection.owner_id as i32,
1378 ))),
1379 ..Default::default()
1380 }
1381 .insert(&*tx)
1382 .await?;
1383 } else {
1384 let result = room_participant::Entity::update_many()
1385 .filter(
1386 Condition::all()
1387 .add(room_participant::Column::RoomId.eq(room_id))
1388 .add(room_participant::Column::UserId.eq(user_id))
1389 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1390 )
1391 .set(room_participant::ActiveModel {
1392 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1393 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1394 connection.owner_id as i32,
1395 ))),
1396 answering_connection_lost: ActiveValue::set(false),
1397 ..Default::default()
1398 })
1399 .exec(&*tx)
1400 .await?;
1401 if result.rows_affected == 0 {
1402 Err(anyhow!("room does not exist or was already joined"))?;
1403 }
1404 }
1405
1406 let room = self.get_room(room_id, &tx).await?;
1407 let channel_members = if let Some(channel_id) = channel_id {
1408 self.get_channel_members_internal(channel_id, &tx).await?
1409 } else {
1410 Vec::new()
1411 };
1412 Ok(JoinRoom {
1413 room,
1414 channel_id,
1415 channel_members,
1416 })
1417 })
1418 .await
1419 }
1420
1421 pub async fn rejoin_room(
1422 &self,
1423 rejoin_room: proto::RejoinRoom,
1424 user_id: UserId,
1425 connection: ConnectionId,
1426 ) -> Result<RoomGuard<RejoinedRoom>> {
1427 let room_id = RoomId::from_proto(rejoin_room.id);
1428 self.room_transaction(room_id, |tx| async {
1429 let tx = tx;
1430 let participant_update = room_participant::Entity::update_many()
1431 .filter(
1432 Condition::all()
1433 .add(room_participant::Column::RoomId.eq(room_id))
1434 .add(room_participant::Column::UserId.eq(user_id))
1435 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
1436 .add(
1437 Condition::any()
1438 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1439 .add(
1440 room_participant::Column::AnsweringConnectionServerId
1441 .ne(connection.owner_id as i32),
1442 ),
1443 ),
1444 )
1445 .set(room_participant::ActiveModel {
1446 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1447 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1448 connection.owner_id as i32,
1449 ))),
1450 answering_connection_lost: ActiveValue::set(false),
1451 ..Default::default()
1452 })
1453 .exec(&*tx)
1454 .await?;
1455 if participant_update.rows_affected == 0 {
1456 return Err(anyhow!("room does not exist or was already joined"))?;
1457 }
1458
1459 let mut reshared_projects = Vec::new();
1460 for reshared_project in &rejoin_room.reshared_projects {
1461 let project_id = ProjectId::from_proto(reshared_project.project_id);
1462 let project = project::Entity::find_by_id(project_id)
1463 .one(&*tx)
1464 .await?
1465 .ok_or_else(|| anyhow!("project does not exist"))?;
1466 if project.host_user_id != user_id {
1467 return Err(anyhow!("no such project"))?;
1468 }
1469
1470 let mut collaborators = project
1471 .find_related(project_collaborator::Entity)
1472 .all(&*tx)
1473 .await?;
1474 let host_ix = collaborators
1475 .iter()
1476 .position(|collaborator| {
1477 collaborator.user_id == user_id && collaborator.is_host
1478 })
1479 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
1480 let host = collaborators.swap_remove(host_ix);
1481 let old_connection_id = host.connection();
1482
1483 project::Entity::update(project::ActiveModel {
1484 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
1485 host_connection_server_id: ActiveValue::set(Some(ServerId(
1486 connection.owner_id as i32,
1487 ))),
1488 ..project.into_active_model()
1489 })
1490 .exec(&*tx)
1491 .await?;
1492 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1493 connection_id: ActiveValue::set(connection.id as i32),
1494 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1495 ..host.into_active_model()
1496 })
1497 .exec(&*tx)
1498 .await?;
1499
1500 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
1501 .await?;
1502
1503 reshared_projects.push(ResharedProject {
1504 id: project_id,
1505 old_connection_id,
1506 collaborators: collaborators
1507 .iter()
1508 .map(|collaborator| ProjectCollaborator {
1509 connection_id: collaborator.connection(),
1510 user_id: collaborator.user_id,
1511 replica_id: collaborator.replica_id,
1512 is_host: collaborator.is_host,
1513 })
1514 .collect(),
1515 worktrees: reshared_project.worktrees.clone(),
1516 });
1517 }
1518
1519 project::Entity::delete_many()
1520 .filter(
1521 Condition::all()
1522 .add(project::Column::RoomId.eq(room_id))
1523 .add(project::Column::HostUserId.eq(user_id))
1524 .add(
1525 project::Column::Id
1526 .is_not_in(reshared_projects.iter().map(|project| project.id)),
1527 ),
1528 )
1529 .exec(&*tx)
1530 .await?;
1531
1532 let mut rejoined_projects = Vec::new();
1533 for rejoined_project in &rejoin_room.rejoined_projects {
1534 let project_id = ProjectId::from_proto(rejoined_project.id);
1535 let Some(project) = project::Entity::find_by_id(project_id)
1536 .one(&*tx)
1537 .await? else { continue };
1538
1539 let mut worktrees = Vec::new();
1540 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
1541 for db_worktree in db_worktrees {
1542 let mut worktree = RejoinedWorktree {
1543 id: db_worktree.id as u64,
1544 abs_path: db_worktree.abs_path,
1545 root_name: db_worktree.root_name,
1546 visible: db_worktree.visible,
1547 updated_entries: Default::default(),
1548 removed_entries: Default::default(),
1549 updated_repositories: Default::default(),
1550 removed_repositories: Default::default(),
1551 diagnostic_summaries: Default::default(),
1552 settings_files: Default::default(),
1553 scan_id: db_worktree.scan_id as u64,
1554 completed_scan_id: db_worktree.completed_scan_id as u64,
1555 };
1556
1557 let rejoined_worktree = rejoined_project
1558 .worktrees
1559 .iter()
1560 .find(|worktree| worktree.id == db_worktree.id as u64);
1561
1562 // File entries
1563 {
1564 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
1565 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
1566 } else {
1567 worktree_entry::Column::IsDeleted.eq(false)
1568 };
1569
1570 let mut db_entries = worktree_entry::Entity::find()
1571 .filter(
1572 Condition::all()
1573 .add(worktree_entry::Column::ProjectId.eq(project.id))
1574 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
1575 .add(entry_filter),
1576 )
1577 .stream(&*tx)
1578 .await?;
1579
1580 while let Some(db_entry) = db_entries.next().await {
1581 let db_entry = db_entry?;
1582 if db_entry.is_deleted {
1583 worktree.removed_entries.push(db_entry.id as u64);
1584 } else {
1585 worktree.updated_entries.push(proto::Entry {
1586 id: db_entry.id as u64,
1587 is_dir: db_entry.is_dir,
1588 path: db_entry.path,
1589 inode: db_entry.inode as u64,
1590 mtime: Some(proto::Timestamp {
1591 seconds: db_entry.mtime_seconds as u64,
1592 nanos: db_entry.mtime_nanos as u32,
1593 }),
1594 is_symlink: db_entry.is_symlink,
1595 is_ignored: db_entry.is_ignored,
1596 is_external: db_entry.is_external,
1597 git_status: db_entry.git_status.map(|status| status as i32),
1598 });
1599 }
1600 }
1601 }
1602
1603 // Repository Entries
1604 {
1605 let repository_entry_filter =
1606 if let Some(rejoined_worktree) = rejoined_worktree {
1607 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
1608 } else {
1609 worktree_repository::Column::IsDeleted.eq(false)
1610 };
1611
1612 let mut db_repositories = worktree_repository::Entity::find()
1613 .filter(
1614 Condition::all()
1615 .add(worktree_repository::Column::ProjectId.eq(project.id))
1616 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
1617 .add(repository_entry_filter),
1618 )
1619 .stream(&*tx)
1620 .await?;
1621
1622 while let Some(db_repository) = db_repositories.next().await {
1623 let db_repository = db_repository?;
1624 if db_repository.is_deleted {
1625 worktree
1626 .removed_repositories
1627 .push(db_repository.work_directory_id as u64);
1628 } else {
1629 worktree.updated_repositories.push(proto::RepositoryEntry {
1630 work_directory_id: db_repository.work_directory_id as u64,
1631 branch: db_repository.branch,
1632 });
1633 }
1634 }
1635 }
1636
1637 worktrees.push(worktree);
1638 }
1639
1640 let language_servers = project
1641 .find_related(language_server::Entity)
1642 .all(&*tx)
1643 .await?
1644 .into_iter()
1645 .map(|language_server| proto::LanguageServer {
1646 id: language_server.id as u64,
1647 name: language_server.name,
1648 })
1649 .collect::<Vec<_>>();
1650
1651 {
1652 let mut db_settings_files = worktree_settings_file::Entity::find()
1653 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
1654 .stream(&*tx)
1655 .await?;
1656 while let Some(db_settings_file) = db_settings_files.next().await {
1657 let db_settings_file = db_settings_file?;
1658 if let Some(worktree) = worktrees
1659 .iter_mut()
1660 .find(|w| w.id == db_settings_file.worktree_id as u64)
1661 {
1662 worktree.settings_files.push(WorktreeSettingsFile {
1663 path: db_settings_file.path,
1664 content: db_settings_file.content,
1665 });
1666 }
1667 }
1668 }
1669
1670 let mut collaborators = project
1671 .find_related(project_collaborator::Entity)
1672 .all(&*tx)
1673 .await?;
1674 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
1675 .iter()
1676 .position(|collaborator| collaborator.user_id == user_id)
1677 {
1678 collaborators.swap_remove(self_collaborator_ix)
1679 } else {
1680 continue;
1681 };
1682 let old_connection_id = self_collaborator.connection();
1683 project_collaborator::Entity::update(project_collaborator::ActiveModel {
1684 connection_id: ActiveValue::set(connection.id as i32),
1685 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
1686 ..self_collaborator.into_active_model()
1687 })
1688 .exec(&*tx)
1689 .await?;
1690
1691 let collaborators = collaborators
1692 .into_iter()
1693 .map(|collaborator| ProjectCollaborator {
1694 connection_id: collaborator.connection(),
1695 user_id: collaborator.user_id,
1696 replica_id: collaborator.replica_id,
1697 is_host: collaborator.is_host,
1698 })
1699 .collect::<Vec<_>>();
1700
1701 rejoined_projects.push(RejoinedProject {
1702 id: project_id,
1703 old_connection_id,
1704 collaborators,
1705 worktrees,
1706 language_servers,
1707 });
1708 }
1709
1710 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1711
1712 let channel_members = if let Some(channel_id) = channel_id {
1713 self.get_channel_members_internal(channel_id, &tx).await?
1714 } else {
1715 Vec::new()
1716 };
1717
1718 Ok(RejoinedRoom {
1719 room,
1720 channel_id,
1721 channel_members,
1722 rejoined_projects,
1723 reshared_projects,
1724 })
1725 })
1726 .await
1727 }
1728
1729 pub async fn leave_room(
1730 &self,
1731 connection: ConnectionId,
1732 ) -> Result<Option<RoomGuard<LeftRoom>>> {
1733 self.optional_room_transaction(|tx| async move {
1734 let leaving_participant = room_participant::Entity::find()
1735 .filter(
1736 Condition::all()
1737 .add(
1738 room_participant::Column::AnsweringConnectionId
1739 .eq(connection.id as i32),
1740 )
1741 .add(
1742 room_participant::Column::AnsweringConnectionServerId
1743 .eq(connection.owner_id as i32),
1744 ),
1745 )
1746 .one(&*tx)
1747 .await?;
1748
1749 if let Some(leaving_participant) = leaving_participant {
1750 // Leave room.
1751 let room_id = leaving_participant.room_id;
1752 room_participant::Entity::delete_by_id(leaving_participant.id)
1753 .exec(&*tx)
1754 .await?;
1755
1756 // Cancel pending calls initiated by the leaving user.
1757 let called_participants = room_participant::Entity::find()
1758 .filter(
1759 Condition::all()
1760 .add(
1761 room_participant::Column::CallingUserId
1762 .eq(leaving_participant.user_id),
1763 )
1764 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1765 )
1766 .all(&*tx)
1767 .await?;
1768 room_participant::Entity::delete_many()
1769 .filter(
1770 room_participant::Column::Id
1771 .is_in(called_participants.iter().map(|participant| participant.id)),
1772 )
1773 .exec(&*tx)
1774 .await?;
1775 let canceled_calls_to_user_ids = called_participants
1776 .into_iter()
1777 .map(|participant| participant.user_id)
1778 .collect();
1779
1780 // Detect left projects.
1781 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1782 enum QueryProjectIds {
1783 ProjectId,
1784 }
1785 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1786 .select_only()
1787 .column_as(
1788 project_collaborator::Column::ProjectId,
1789 QueryProjectIds::ProjectId,
1790 )
1791 .filter(
1792 Condition::all()
1793 .add(
1794 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1795 )
1796 .add(
1797 project_collaborator::Column::ConnectionServerId
1798 .eq(connection.owner_id as i32),
1799 ),
1800 )
1801 .into_values::<_, QueryProjectIds>()
1802 .all(&*tx)
1803 .await?;
1804 let mut left_projects = HashMap::default();
1805 let mut collaborators = project_collaborator::Entity::find()
1806 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1807 .stream(&*tx)
1808 .await?;
1809 while let Some(collaborator) = collaborators.next().await {
1810 let collaborator = collaborator?;
1811 let left_project =
1812 left_projects
1813 .entry(collaborator.project_id)
1814 .or_insert(LeftProject {
1815 id: collaborator.project_id,
1816 host_user_id: Default::default(),
1817 connection_ids: Default::default(),
1818 host_connection_id: Default::default(),
1819 });
1820
1821 let collaborator_connection_id = collaborator.connection();
1822 if collaborator_connection_id != connection {
1823 left_project.connection_ids.push(collaborator_connection_id);
1824 }
1825
1826 if collaborator.is_host {
1827 left_project.host_user_id = collaborator.user_id;
1828 left_project.host_connection_id = collaborator_connection_id;
1829 }
1830 }
1831 drop(collaborators);
1832
1833 // Leave projects.
1834 project_collaborator::Entity::delete_many()
1835 .filter(
1836 Condition::all()
1837 .add(
1838 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1839 )
1840 .add(
1841 project_collaborator::Column::ConnectionServerId
1842 .eq(connection.owner_id as i32),
1843 ),
1844 )
1845 .exec(&*tx)
1846 .await?;
1847
1848 // Unshare projects.
1849 project::Entity::delete_many()
1850 .filter(
1851 Condition::all()
1852 .add(project::Column::RoomId.eq(room_id))
1853 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1854 .add(
1855 project::Column::HostConnectionServerId
1856 .eq(connection.owner_id as i32),
1857 ),
1858 )
1859 .exec(&*tx)
1860 .await?;
1861
1862 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
1863 let deleted = if room.participants.is_empty() {
1864 let result = room::Entity::delete_by_id(room_id)
1865 .filter(room::Column::ChannelId.is_null())
1866 .exec(&*tx)
1867 .await?;
1868 result.rows_affected > 0
1869 } else {
1870 false
1871 };
1872
1873 let channel_members = if let Some(channel_id) = channel_id {
1874 self.get_channel_members_internal(channel_id, &tx).await?
1875 } else {
1876 Vec::new()
1877 };
1878 let left_room = LeftRoom {
1879 room,
1880 channel_id,
1881 channel_members,
1882 left_projects,
1883 canceled_calls_to_user_ids,
1884 deleted,
1885 };
1886
1887 if left_room.room.participants.is_empty() {
1888 self.rooms.remove(&room_id);
1889 }
1890
1891 Ok(Some((room_id, left_room)))
1892 } else {
1893 Ok(None)
1894 }
1895 })
1896 .await
1897 }
1898
1899 pub async fn follow(
1900 &self,
1901 project_id: ProjectId,
1902 leader_connection: ConnectionId,
1903 follower_connection: ConnectionId,
1904 ) -> Result<RoomGuard<proto::Room>> {
1905 let room_id = self.room_id_for_project(project_id).await?;
1906 self.room_transaction(room_id, |tx| async move {
1907 follower::ActiveModel {
1908 room_id: ActiveValue::set(room_id),
1909 project_id: ActiveValue::set(project_id),
1910 leader_connection_server_id: ActiveValue::set(ServerId(
1911 leader_connection.owner_id as i32,
1912 )),
1913 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1914 follower_connection_server_id: ActiveValue::set(ServerId(
1915 follower_connection.owner_id as i32,
1916 )),
1917 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1918 ..Default::default()
1919 }
1920 .insert(&*tx)
1921 .await?;
1922
1923 let room = self.get_room(room_id, &*tx).await?;
1924 Ok(room)
1925 })
1926 .await
1927 }
1928
1929 pub async fn unfollow(
1930 &self,
1931 project_id: ProjectId,
1932 leader_connection: ConnectionId,
1933 follower_connection: ConnectionId,
1934 ) -> Result<RoomGuard<proto::Room>> {
1935 let room_id = self.room_id_for_project(project_id).await?;
1936 self.room_transaction(room_id, |tx| async move {
1937 follower::Entity::delete_many()
1938 .filter(
1939 Condition::all()
1940 .add(follower::Column::ProjectId.eq(project_id))
1941 .add(
1942 follower::Column::LeaderConnectionServerId
1943 .eq(leader_connection.owner_id),
1944 )
1945 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1946 .add(
1947 follower::Column::FollowerConnectionServerId
1948 .eq(follower_connection.owner_id),
1949 )
1950 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1951 )
1952 .exec(&*tx)
1953 .await?;
1954
1955 let room = self.get_room(room_id, &*tx).await?;
1956 Ok(room)
1957 })
1958 .await
1959 }
1960
1961 pub async fn update_room_participant_location(
1962 &self,
1963 room_id: RoomId,
1964 connection: ConnectionId,
1965 location: proto::ParticipantLocation,
1966 ) -> Result<RoomGuard<proto::Room>> {
1967 self.room_transaction(room_id, |tx| async {
1968 let tx = tx;
1969 let location_kind;
1970 let location_project_id;
1971 match location
1972 .variant
1973 .as_ref()
1974 .ok_or_else(|| anyhow!("invalid location"))?
1975 {
1976 proto::participant_location::Variant::SharedProject(project) => {
1977 location_kind = 0;
1978 location_project_id = Some(ProjectId::from_proto(project.id));
1979 }
1980 proto::participant_location::Variant::UnsharedProject(_) => {
1981 location_kind = 1;
1982 location_project_id = None;
1983 }
1984 proto::participant_location::Variant::External(_) => {
1985 location_kind = 2;
1986 location_project_id = None;
1987 }
1988 }
1989
1990 let result = room_participant::Entity::update_many()
1991 .filter(
1992 Condition::all()
1993 .add(room_participant::Column::RoomId.eq(room_id))
1994 .add(
1995 room_participant::Column::AnsweringConnectionId
1996 .eq(connection.id as i32),
1997 )
1998 .add(
1999 room_participant::Column::AnsweringConnectionServerId
2000 .eq(connection.owner_id as i32),
2001 ),
2002 )
2003 .set(room_participant::ActiveModel {
2004 location_kind: ActiveValue::set(Some(location_kind)),
2005 location_project_id: ActiveValue::set(location_project_id),
2006 ..Default::default()
2007 })
2008 .exec(&*tx)
2009 .await?;
2010
2011 if result.rows_affected == 1 {
2012 let room = self.get_room(room_id, &tx).await?;
2013 Ok(room)
2014 } else {
2015 Err(anyhow!("could not update room participant location"))?
2016 }
2017 })
2018 .await
2019 }
2020
2021 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
2022 self.transaction(|tx| async move {
2023 let participant = room_participant::Entity::find()
2024 .filter(
2025 Condition::all()
2026 .add(
2027 room_participant::Column::AnsweringConnectionId
2028 .eq(connection.id as i32),
2029 )
2030 .add(
2031 room_participant::Column::AnsweringConnectionServerId
2032 .eq(connection.owner_id as i32),
2033 ),
2034 )
2035 .one(&*tx)
2036 .await?
2037 .ok_or_else(|| anyhow!("not a participant in any room"))?;
2038
2039 room_participant::Entity::update(room_participant::ActiveModel {
2040 answering_connection_lost: ActiveValue::set(true),
2041 ..participant.into_active_model()
2042 })
2043 .exec(&*tx)
2044 .await?;
2045
2046 Ok(())
2047 })
2048 .await
2049 }
2050
2051 fn build_incoming_call(
2052 room: &proto::Room,
2053 called_user_id: UserId,
2054 ) -> Option<proto::IncomingCall> {
2055 let pending_participant = room
2056 .pending_participants
2057 .iter()
2058 .find(|participant| participant.user_id == called_user_id.to_proto())?;
2059
2060 Some(proto::IncomingCall {
2061 room_id: room.id,
2062 calling_user_id: pending_participant.calling_user_id,
2063 participant_user_ids: room
2064 .participants
2065 .iter()
2066 .map(|participant| participant.user_id)
2067 .collect(),
2068 initial_project: room.participants.iter().find_map(|participant| {
2069 let initial_project_id = pending_participant.initial_project_id?;
2070 participant
2071 .projects
2072 .iter()
2073 .find(|project| project.id == initial_project_id)
2074 .cloned()
2075 }),
2076 })
2077 }
2078 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
2079 let (_, room) = self.get_channel_room(room_id, tx).await?;
2080 Ok(room)
2081 }
2082
2083 async fn get_channel_room(
2084 &self,
2085 room_id: RoomId,
2086 tx: &DatabaseTransaction,
2087 ) -> Result<(Option<ChannelId>, proto::Room)> {
2088 let db_room = room::Entity::find_by_id(room_id)
2089 .one(tx)
2090 .await?
2091 .ok_or_else(|| anyhow!("could not find room"))?;
2092
2093 let mut db_participants = db_room
2094 .find_related(room_participant::Entity)
2095 .stream(tx)
2096 .await?;
2097 let mut participants = HashMap::default();
2098 let mut pending_participants = Vec::new();
2099 while let Some(db_participant) = db_participants.next().await {
2100 let db_participant = db_participant?;
2101 if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
2102 .answering_connection_id
2103 .zip(db_participant.answering_connection_server_id)
2104 {
2105 let location = match (
2106 db_participant.location_kind,
2107 db_participant.location_project_id,
2108 ) {
2109 (Some(0), Some(project_id)) => {
2110 Some(proto::participant_location::Variant::SharedProject(
2111 proto::participant_location::SharedProject {
2112 id: project_id.to_proto(),
2113 },
2114 ))
2115 }
2116 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
2117 Default::default(),
2118 )),
2119 _ => Some(proto::participant_location::Variant::External(
2120 Default::default(),
2121 )),
2122 };
2123
2124 let answering_connection = ConnectionId {
2125 owner_id: answering_connection_server_id.0 as u32,
2126 id: answering_connection_id as u32,
2127 };
2128 participants.insert(
2129 answering_connection,
2130 proto::Participant {
2131 user_id: db_participant.user_id.to_proto(),
2132 peer_id: Some(answering_connection.into()),
2133 projects: Default::default(),
2134 location: Some(proto::ParticipantLocation { variant: location }),
2135 },
2136 );
2137 } else {
2138 pending_participants.push(proto::PendingParticipant {
2139 user_id: db_participant.user_id.to_proto(),
2140 calling_user_id: db_participant.calling_user_id.to_proto(),
2141 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
2142 });
2143 }
2144 }
2145 drop(db_participants);
2146
2147 let mut db_projects = db_room
2148 .find_related(project::Entity)
2149 .find_with_related(worktree::Entity)
2150 .stream(tx)
2151 .await?;
2152
2153 while let Some(row) = db_projects.next().await {
2154 let (db_project, db_worktree) = row?;
2155 let host_connection = db_project.host_connection()?;
2156 if let Some(participant) = participants.get_mut(&host_connection) {
2157 let project = if let Some(project) = participant
2158 .projects
2159 .iter_mut()
2160 .find(|project| project.id == db_project.id.to_proto())
2161 {
2162 project
2163 } else {
2164 participant.projects.push(proto::ParticipantProject {
2165 id: db_project.id.to_proto(),
2166 worktree_root_names: Default::default(),
2167 });
2168 participant.projects.last_mut().unwrap()
2169 };
2170
2171 if let Some(db_worktree) = db_worktree {
2172 if db_worktree.visible {
2173 project.worktree_root_names.push(db_worktree.root_name);
2174 }
2175 }
2176 }
2177 }
2178 drop(db_projects);
2179
2180 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
2181 let mut followers = Vec::new();
2182 while let Some(db_follower) = db_followers.next().await {
2183 let db_follower = db_follower?;
2184 followers.push(proto::Follower {
2185 leader_id: Some(db_follower.leader_connection().into()),
2186 follower_id: Some(db_follower.follower_connection().into()),
2187 project_id: db_follower.project_id.to_proto(),
2188 });
2189 }
2190
2191 Ok((
2192 db_room.channel_id,
2193 proto::Room {
2194 id: db_room.id.to_proto(),
2195 live_kit_room: db_room.live_kit_room,
2196 participants: participants.into_values().collect(),
2197 pending_participants,
2198 followers,
2199 },
2200 ))
2201 }
2202
2203 // projects
2204
2205 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
2206 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2207 enum QueryAs {
2208 Count,
2209 }
2210
2211 self.transaction(|tx| async move {
2212 Ok(project::Entity::find()
2213 .select_only()
2214 .column_as(project::Column::Id.count(), QueryAs::Count)
2215 .inner_join(user::Entity)
2216 .filter(user::Column::Admin.eq(false))
2217 .into_values::<_, QueryAs>()
2218 .one(&*tx)
2219 .await?
2220 .unwrap_or(0i64) as usize)
2221 })
2222 .await
2223 }
2224
2225 pub async fn share_project(
2226 &self,
2227 room_id: RoomId,
2228 connection: ConnectionId,
2229 worktrees: &[proto::WorktreeMetadata],
2230 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
2231 self.room_transaction(room_id, |tx| async move {
2232 let participant = room_participant::Entity::find()
2233 .filter(
2234 Condition::all()
2235 .add(
2236 room_participant::Column::AnsweringConnectionId
2237 .eq(connection.id as i32),
2238 )
2239 .add(
2240 room_participant::Column::AnsweringConnectionServerId
2241 .eq(connection.owner_id as i32),
2242 ),
2243 )
2244 .one(&*tx)
2245 .await?
2246 .ok_or_else(|| anyhow!("could not find participant"))?;
2247 if participant.room_id != room_id {
2248 return Err(anyhow!("shared project on unexpected room"))?;
2249 }
2250
2251 let project = project::ActiveModel {
2252 room_id: ActiveValue::set(participant.room_id),
2253 host_user_id: ActiveValue::set(participant.user_id),
2254 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
2255 host_connection_server_id: ActiveValue::set(Some(ServerId(
2256 connection.owner_id as i32,
2257 ))),
2258 ..Default::default()
2259 }
2260 .insert(&*tx)
2261 .await?;
2262
2263 if !worktrees.is_empty() {
2264 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
2265 worktree::ActiveModel {
2266 id: ActiveValue::set(worktree.id as i64),
2267 project_id: ActiveValue::set(project.id),
2268 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2269 root_name: ActiveValue::set(worktree.root_name.clone()),
2270 visible: ActiveValue::set(worktree.visible),
2271 scan_id: ActiveValue::set(0),
2272 completed_scan_id: ActiveValue::set(0),
2273 }
2274 }))
2275 .exec(&*tx)
2276 .await?;
2277 }
2278
2279 project_collaborator::ActiveModel {
2280 project_id: ActiveValue::set(project.id),
2281 connection_id: ActiveValue::set(connection.id as i32),
2282 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
2283 user_id: ActiveValue::set(participant.user_id),
2284 replica_id: ActiveValue::set(ReplicaId(0)),
2285 is_host: ActiveValue::set(true),
2286 ..Default::default()
2287 }
2288 .insert(&*tx)
2289 .await?;
2290
2291 let room = self.get_room(room_id, &tx).await?;
2292 Ok((project.id, room))
2293 })
2294 .await
2295 }
2296
2297 pub async fn unshare_project(
2298 &self,
2299 project_id: ProjectId,
2300 connection: ConnectionId,
2301 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2302 let room_id = self.room_id_for_project(project_id).await?;
2303 self.room_transaction(room_id, |tx| async move {
2304 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2305
2306 let project = project::Entity::find_by_id(project_id)
2307 .one(&*tx)
2308 .await?
2309 .ok_or_else(|| anyhow!("project not found"))?;
2310 if project.host_connection()? == connection {
2311 project::Entity::delete(project.into_active_model())
2312 .exec(&*tx)
2313 .await?;
2314 let room = self.get_room(room_id, &tx).await?;
2315 Ok((room, guest_connection_ids))
2316 } else {
2317 Err(anyhow!("cannot unshare a project hosted by another user"))?
2318 }
2319 })
2320 .await
2321 }
2322
2323 pub async fn update_project(
2324 &self,
2325 project_id: ProjectId,
2326 connection: ConnectionId,
2327 worktrees: &[proto::WorktreeMetadata],
2328 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
2329 let room_id = self.room_id_for_project(project_id).await?;
2330 self.room_transaction(room_id, |tx| async move {
2331 let project = project::Entity::find_by_id(project_id)
2332 .filter(
2333 Condition::all()
2334 .add(project::Column::HostConnectionId.eq(connection.id as i32))
2335 .add(
2336 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
2337 ),
2338 )
2339 .one(&*tx)
2340 .await?
2341 .ok_or_else(|| anyhow!("no such project"))?;
2342
2343 self.update_project_worktrees(project.id, worktrees, &tx)
2344 .await?;
2345
2346 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
2347 let room = self.get_room(project.room_id, &tx).await?;
2348 Ok((room, guest_connection_ids))
2349 })
2350 .await
2351 }
2352
2353 async fn update_project_worktrees(
2354 &self,
2355 project_id: ProjectId,
2356 worktrees: &[proto::WorktreeMetadata],
2357 tx: &DatabaseTransaction,
2358 ) -> Result<()> {
2359 if !worktrees.is_empty() {
2360 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
2361 id: ActiveValue::set(worktree.id as i64),
2362 project_id: ActiveValue::set(project_id),
2363 abs_path: ActiveValue::set(worktree.abs_path.clone()),
2364 root_name: ActiveValue::set(worktree.root_name.clone()),
2365 visible: ActiveValue::set(worktree.visible),
2366 scan_id: ActiveValue::set(0),
2367 completed_scan_id: ActiveValue::set(0),
2368 }))
2369 .on_conflict(
2370 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
2371 .update_column(worktree::Column::RootName)
2372 .to_owned(),
2373 )
2374 .exec(&*tx)
2375 .await?;
2376 }
2377
2378 worktree::Entity::delete_many()
2379 .filter(worktree::Column::ProjectId.eq(project_id).and(
2380 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
2381 ))
2382 .exec(&*tx)
2383 .await?;
2384
2385 Ok(())
2386 }
2387
/// Applies a worktree update sent by the project's host and returns the
/// connection ids of the project's guests.
///
/// Everything runs inside the room transaction of the project's room. In
/// order, this updates the worktree's metadata, upserts the updated entries,
/// flags removed entries as deleted, upserts the updated repositories and
/// flags removed repositories as deleted.
///
/// # Errors
/// Fails with "no such project" unless a project with the update's id is
/// hosted by `connection`.
pub async fn update_worktree(
    &self,
    update: &proto::UpdateWorktree,
    connection: ConnectionId,
) -> Result<RoomGuard<Vec<ConnectionId>>> {
    let project_id = ProjectId::from_proto(update.project_id);
    let worktree_id = update.worktree_id as i64;
    let room_id = self.room_id_for_project(project_id).await?;
    self.room_transaction(room_id, |tx| async move {
        // Ensure the update comes from the host.
        let _project = project::Entity::find_by_id(project_id)
            .filter(
                Condition::all()
                    .add(project::Column::HostConnectionId.eq(connection.id as i32))
                    .add(
                        project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
                    ),
            )
            .one(&*tx)
            .await?
            .ok_or_else(|| anyhow!("no such project"))?;

        // Update metadata.
        worktree::Entity::update(worktree::ActiveModel {
            id: ActiveValue::set(worktree_id),
            project_id: ActiveValue::set(project_id),
            root_name: ActiveValue::set(update.root_name.clone()),
            scan_id: ActiveValue::set(update.scan_id as i64),
            // `completed_scan_id` only advances on the last update of a scan;
            // otherwise it is left at the ActiveValue default and not written.
            completed_scan_id: if update.is_last_update {
                ActiveValue::set(update.scan_id as i64)
            } else {
                ActiveValue::default()
            },
            abs_path: ActiveValue::set(update.abs_path.clone()),
            ..Default::default()
        })
        .exec(&*tx)
        .await?;

        // Upsert the entries that were added or changed by this update.
        if !update.updated_entries.is_empty() {
            worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
                // A missing mtime falls back to the timestamp's default value.
                let mtime = entry.mtime.clone().unwrap_or_default();
                worktree_entry::ActiveModel {
                    project_id: ActiveValue::set(project_id),
                    worktree_id: ActiveValue::set(worktree_id),
                    id: ActiveValue::set(entry.id as i64),
                    is_dir: ActiveValue::set(entry.is_dir),
                    path: ActiveValue::set(entry.path.clone()),
                    inode: ActiveValue::set(entry.inode as i64),
                    mtime_seconds: ActiveValue::set(mtime.seconds as i64),
                    mtime_nanos: ActiveValue::set(mtime.nanos as i32),
                    is_symlink: ActiveValue::set(entry.is_symlink),
                    is_ignored: ActiveValue::set(entry.is_ignored),
                    is_external: ActiveValue::set(entry.is_external),
                    git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
                    is_deleted: ActiveValue::set(false),
                    scan_id: ActiveValue::set(update.scan_id as i64),
                }
            }))
            .on_conflict(
                // NOTE(review): `IsExternal` and `IsDeleted` are not in the
                // update list below, so a conflicting row keeps its previous
                // values for those two columns; confirm this is intended.
                OnConflict::columns([
                    worktree_entry::Column::ProjectId,
                    worktree_entry::Column::WorktreeId,
                    worktree_entry::Column::Id,
                ])
                .update_columns([
                    worktree_entry::Column::IsDir,
                    worktree_entry::Column::Path,
                    worktree_entry::Column::Inode,
                    worktree_entry::Column::MtimeSeconds,
                    worktree_entry::Column::MtimeNanos,
                    worktree_entry::Column::IsSymlink,
                    worktree_entry::Column::IsIgnored,
                    worktree_entry::Column::GitStatus,
                    worktree_entry::Column::ScanId,
                ])
                .to_owned(),
            )
            .exec(&*tx)
            .await?;
        }

        // Removed entries are flagged as deleted (and stamped with this
        // update's scan id) rather than physically deleted.
        if !update.removed_entries.is_empty() {
            worktree_entry::Entity::update_many()
                .filter(
                    worktree_entry::Column::ProjectId
                        .eq(project_id)
                        .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
                        .and(
                            worktree_entry::Column::Id
                                .is_in(update.removed_entries.iter().map(|id| *id as i64)),
                        ),
                )
                .set(worktree_entry::ActiveModel {
                    is_deleted: ActiveValue::Set(true),
                    scan_id: ActiveValue::Set(update.scan_id as i64),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
        }

        // Upsert repositories; on conflict only the scan id and branch change.
        if !update.updated_repositories.is_empty() {
            worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
                |repository| worktree_repository::ActiveModel {
                    project_id: ActiveValue::set(project_id),
                    worktree_id: ActiveValue::set(worktree_id),
                    work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
                    scan_id: ActiveValue::set(update.scan_id as i64),
                    branch: ActiveValue::set(repository.branch.clone()),
                    is_deleted: ActiveValue::set(false),
                },
            ))
            .on_conflict(
                OnConflict::columns([
                    worktree_repository::Column::ProjectId,
                    worktree_repository::Column::WorktreeId,
                    worktree_repository::Column::WorkDirectoryId,
                ])
                .update_columns([
                    worktree_repository::Column::ScanId,
                    worktree_repository::Column::Branch,
                ])
                .to_owned(),
            )
            .exec(&*tx)
            .await?;
        }

        // Removed repositories are flagged as deleted, like removed entries.
        if !update.removed_repositories.is_empty() {
            worktree_repository::Entity::update_many()
                .filter(
                    worktree_repository::Column::ProjectId
                        .eq(project_id)
                        .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
                        .and(
                            worktree_repository::Column::WorkDirectoryId
                                .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
                        ),
                )
                .set(worktree_repository::ActiveModel {
                    is_deleted: ActiveValue::Set(true),
                    scan_id: ActiveValue::Set(update.scan_id as i64),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
        }

        let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
        Ok(connection_ids)
    })
    .await
}
2542
2543 pub async fn update_diagnostic_summary(
2544 &self,
2545 update: &proto::UpdateDiagnosticSummary,
2546 connection: ConnectionId,
2547 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2548 let project_id = ProjectId::from_proto(update.project_id);
2549 let worktree_id = update.worktree_id as i64;
2550 let room_id = self.room_id_for_project(project_id).await?;
2551 self.room_transaction(room_id, |tx| async move {
2552 let summary = update
2553 .summary
2554 .as_ref()
2555 .ok_or_else(|| anyhow!("invalid summary"))?;
2556
2557 // Ensure the update comes from the host.
2558 let project = project::Entity::find_by_id(project_id)
2559 .one(&*tx)
2560 .await?
2561 .ok_or_else(|| anyhow!("no such project"))?;
2562 if project.host_connection()? != connection {
2563 return Err(anyhow!("can't update a project hosted by someone else"))?;
2564 }
2565
2566 // Update summary.
2567 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2568 project_id: ActiveValue::set(project_id),
2569 worktree_id: ActiveValue::set(worktree_id),
2570 path: ActiveValue::set(summary.path.clone()),
2571 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2572 error_count: ActiveValue::set(summary.error_count as i32),
2573 warning_count: ActiveValue::set(summary.warning_count as i32),
2574 ..Default::default()
2575 })
2576 .on_conflict(
2577 OnConflict::columns([
2578 worktree_diagnostic_summary::Column::ProjectId,
2579 worktree_diagnostic_summary::Column::WorktreeId,
2580 worktree_diagnostic_summary::Column::Path,
2581 ])
2582 .update_columns([
2583 worktree_diagnostic_summary::Column::LanguageServerId,
2584 worktree_diagnostic_summary::Column::ErrorCount,
2585 worktree_diagnostic_summary::Column::WarningCount,
2586 ])
2587 .to_owned(),
2588 )
2589 .exec(&*tx)
2590 .await?;
2591
2592 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2593 Ok(connection_ids)
2594 })
2595 .await
2596 }
2597
2598 pub async fn start_language_server(
2599 &self,
2600 update: &proto::StartLanguageServer,
2601 connection: ConnectionId,
2602 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2603 let project_id = ProjectId::from_proto(update.project_id);
2604 let room_id = self.room_id_for_project(project_id).await?;
2605 self.room_transaction(room_id, |tx| async move {
2606 let server = update
2607 .server
2608 .as_ref()
2609 .ok_or_else(|| anyhow!("invalid language server"))?;
2610
2611 // Ensure the update comes from the host.
2612 let project = project::Entity::find_by_id(project_id)
2613 .one(&*tx)
2614 .await?
2615 .ok_or_else(|| anyhow!("no such project"))?;
2616 if project.host_connection()? != connection {
2617 return Err(anyhow!("can't update a project hosted by someone else"))?;
2618 }
2619
2620 // Add the newly-started language server.
2621 language_server::Entity::insert(language_server::ActiveModel {
2622 project_id: ActiveValue::set(project_id),
2623 id: ActiveValue::set(server.id as i64),
2624 name: ActiveValue::set(server.name.clone()),
2625 ..Default::default()
2626 })
2627 .on_conflict(
2628 OnConflict::columns([
2629 language_server::Column::ProjectId,
2630 language_server::Column::Id,
2631 ])
2632 .update_column(language_server::Column::Name)
2633 .to_owned(),
2634 )
2635 .exec(&*tx)
2636 .await?;
2637
2638 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2639 Ok(connection_ids)
2640 })
2641 .await
2642 }
2643
2644 pub async fn update_worktree_settings(
2645 &self,
2646 update: &proto::UpdateWorktreeSettings,
2647 connection: ConnectionId,
2648 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2649 let project_id = ProjectId::from_proto(update.project_id);
2650 let room_id = self.room_id_for_project(project_id).await?;
2651 self.room_transaction(room_id, |tx| async move {
2652 // Ensure the update comes from the host.
2653 let project = project::Entity::find_by_id(project_id)
2654 .one(&*tx)
2655 .await?
2656 .ok_or_else(|| anyhow!("no such project"))?;
2657 if project.host_connection()? != connection {
2658 return Err(anyhow!("can't update a project hosted by someone else"))?;
2659 }
2660
2661 if let Some(content) = &update.content {
2662 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
2663 project_id: ActiveValue::Set(project_id),
2664 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2665 path: ActiveValue::Set(update.path.clone()),
2666 content: ActiveValue::Set(content.clone()),
2667 })
2668 .on_conflict(
2669 OnConflict::columns([
2670 worktree_settings_file::Column::ProjectId,
2671 worktree_settings_file::Column::WorktreeId,
2672 worktree_settings_file::Column::Path,
2673 ])
2674 .update_column(worktree_settings_file::Column::Content)
2675 .to_owned(),
2676 )
2677 .exec(&*tx)
2678 .await?;
2679 } else {
2680 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
2681 project_id: ActiveValue::Set(project_id),
2682 worktree_id: ActiveValue::Set(update.worktree_id as i64),
2683 path: ActiveValue::Set(update.path.clone()),
2684 ..Default::default()
2685 })
2686 .exec(&*tx)
2687 .await?;
2688 }
2689
2690 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2691 Ok(connection_ids)
2692 })
2693 .await
2694 }
2695
/// Adds `connection` as a guest collaborator of `project_id` and returns a
/// snapshot of the project together with the replica id assigned to the guest.
///
/// The snapshot contains all collaborators (including the new one), every
/// worktree with its non-deleted entries, non-deleted repository entries,
/// diagnostic summaries and settings files, and the project's language
/// servers.
///
/// # Errors
/// Fails with "must join a room first" when no room participant answers on
/// `connection`, and with "no such project" when the project does not exist
/// or belongs to a different room than that participant.
pub async fn join_project(
    &self,
    project_id: ProjectId,
    connection: ConnectionId,
) -> Result<RoomGuard<(Project, ReplicaId)>> {
    let room_id = self.room_id_for_project(project_id).await?;
    self.room_transaction(room_id, |tx| async move {
        // Find the room participant that is connected through `connection`.
        let participant = room_participant::Entity::find()
            .filter(
                Condition::all()
                    .add(
                        room_participant::Column::AnsweringConnectionId
                            .eq(connection.id as i32),
                    )
                    .add(
                        room_participant::Column::AnsweringConnectionServerId
                            .eq(connection.owner_id as i32),
                    ),
            )
            .one(&*tx)
            .await?
            .ok_or_else(|| anyhow!("must join a room first"))?;

        // A project in another room is reported the same way as a missing one.
        let project = project::Entity::find_by_id(project_id)
            .one(&*tx)
            .await?
            .ok_or_else(|| anyhow!("no such project"))?;
        if project.room_id != participant.room_id {
            return Err(anyhow!("no such project"))?;
        }

        let mut collaborators = project
            .find_related(project_collaborator::Entity)
            .all(&*tx)
            .await?;
        // Assign the smallest replica id >= 1 that no existing collaborator uses.
        let replica_ids = collaborators
            .iter()
            .map(|c| c.replica_id)
            .collect::<HashSet<_>>();
        let mut replica_id = ReplicaId(1);
        while replica_ids.contains(&replica_id) {
            replica_id.0 += 1;
        }
        let new_collaborator = project_collaborator::ActiveModel {
            project_id: ActiveValue::set(project_id),
            connection_id: ActiveValue::set(connection.id as i32),
            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
            user_id: ActiveValue::set(participant.user_id),
            replica_id: ActiveValue::set(replica_id),
            is_host: ActiveValue::set(false),
            ..Default::default()
        }
        .insert(&*tx)
        .await?;
        collaborators.push(new_collaborator);

        // Build the worktree map keyed by worktree id; the per-worktree
        // collections start empty and are filled by the queries below.
        let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
        let mut worktrees = db_worktrees
            .into_iter()
            .map(|db_worktree| {
                (
                    db_worktree.id as u64,
                    Worktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        entries: Default::default(),
                        repository_entries: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    },
                )
            })
            .collect::<BTreeMap<_, _>>();

        // Populate worktree entries.
        // Rows whose worktree id is not in the map are skipped, here and in
        // the blocks that follow.
        {
            let mut db_entries = worktree_entry::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_entry::Column::ProjectId.eq(project_id))
                        .add(worktree_entry::Column::IsDeleted.eq(false)),
                )
                .stream(&*tx)
                .await?;
            while let Some(db_entry) = db_entries.next().await {
                let db_entry = db_entry?;
                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                    worktree.entries.push(proto::Entry {
                        id: db_entry.id as u64,
                        is_dir: db_entry.is_dir,
                        path: db_entry.path,
                        inode: db_entry.inode as u64,
                        mtime: Some(proto::Timestamp {
                            seconds: db_entry.mtime_seconds as u64,
                            nanos: db_entry.mtime_nanos as u32,
                        }),
                        is_symlink: db_entry.is_symlink,
                        is_ignored: db_entry.is_ignored,
                        is_external: db_entry.is_external,
                        git_status: db_entry.git_status.map(|status| status as i32),
                    });
                }
            }
        }

        // Populate repository entries.
        {
            let mut db_repository_entries = worktree_repository::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_repository::Column::ProjectId.eq(project_id))
                        .add(worktree_repository::Column::IsDeleted.eq(false)),
                )
                .stream(&*tx)
                .await?;
            while let Some(db_repository_entry) = db_repository_entries.next().await {
                let db_repository_entry = db_repository_entry?;
                if let Some(worktree) =
                    worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
                {
                    worktree.repository_entries.insert(
                        db_repository_entry.work_directory_id as u64,
                        proto::RepositoryEntry {
                            work_directory_id: db_repository_entry.work_directory_id as u64,
                            branch: db_repository_entry.branch,
                        },
                    );
                }
            }
        }

        // Populate worktree diagnostic summaries.
        {
            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
                .stream(&*tx)
                .await?;
            while let Some(db_summary) = db_summaries.next().await {
                let db_summary = db_summary?;
                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                    worktree
                        .diagnostic_summaries
                        .push(proto::DiagnosticSummary {
                            path: db_summary.path,
                            language_server_id: db_summary.language_server_id as u64,
                            error_count: db_summary.error_count as u32,
                            warning_count: db_summary.warning_count as u32,
                        });
                }
            }
        }

        // Populate worktree settings files
        {
            let mut db_settings_files = worktree_settings_file::Entity::find()
                .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                .stream(&*tx)
                .await?;
            while let Some(db_settings_file) = db_settings_files.next().await {
                let db_settings_file = db_settings_file?;
                if let Some(worktree) =
                    worktrees.get_mut(&(db_settings_file.worktree_id as u64))
                {
                    worktree.settings_files.push(WorktreeSettingsFile {
                        path: db_settings_file.path,
                        content: db_settings_file.content,
                    });
                }
            }
        }

        // Populate language servers.
        let language_servers = project
            .find_related(language_server::Entity)
            .all(&*tx)
            .await?;

        // Assemble the snapshot returned to the joining guest.
        let project = Project {
            collaborators: collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                })
                .collect(),
            worktrees,
            language_servers: language_servers
                .into_iter()
                .map(|language_server| proto::LanguageServer {
                    id: language_server.id as u64,
                    name: language_server.name,
                })
                .collect(),
        };
        Ok((project, replica_id as ReplicaId))
    })
    .await
}
2900
2901 pub async fn leave_project(
2902 &self,
2903 project_id: ProjectId,
2904 connection: ConnectionId,
2905 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
2906 let room_id = self.room_id_for_project(project_id).await?;
2907 self.room_transaction(room_id, |tx| async move {
2908 let result = project_collaborator::Entity::delete_many()
2909 .filter(
2910 Condition::all()
2911 .add(project_collaborator::Column::ProjectId.eq(project_id))
2912 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2913 .add(
2914 project_collaborator::Column::ConnectionServerId
2915 .eq(connection.owner_id as i32),
2916 ),
2917 )
2918 .exec(&*tx)
2919 .await?;
2920 if result.rows_affected == 0 {
2921 Err(anyhow!("not a collaborator on this project"))?;
2922 }
2923
2924 let project = project::Entity::find_by_id(project_id)
2925 .one(&*tx)
2926 .await?
2927 .ok_or_else(|| anyhow!("no such project"))?;
2928 let collaborators = project
2929 .find_related(project_collaborator::Entity)
2930 .all(&*tx)
2931 .await?;
2932 let connection_ids = collaborators
2933 .into_iter()
2934 .map(|collaborator| collaborator.connection())
2935 .collect();
2936
2937 follower::Entity::delete_many()
2938 .filter(
2939 Condition::any()
2940 .add(
2941 Condition::all()
2942 .add(follower::Column::ProjectId.eq(project_id))
2943 .add(
2944 follower::Column::LeaderConnectionServerId
2945 .eq(connection.owner_id),
2946 )
2947 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
2948 )
2949 .add(
2950 Condition::all()
2951 .add(follower::Column::ProjectId.eq(project_id))
2952 .add(
2953 follower::Column::FollowerConnectionServerId
2954 .eq(connection.owner_id),
2955 )
2956 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
2957 ),
2958 )
2959 .exec(&*tx)
2960 .await?;
2961
2962 let room = self.get_room(project.room_id, &tx).await?;
2963 let left_project = LeftProject {
2964 id: project_id,
2965 host_user_id: project.host_user_id,
2966 host_connection_id: project.host_connection()?,
2967 connection_ids,
2968 };
2969 Ok((room, left_project))
2970 })
2971 .await
2972 }
2973
2974 pub async fn project_collaborators(
2975 &self,
2976 project_id: ProjectId,
2977 connection_id: ConnectionId,
2978 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
2979 let room_id = self.room_id_for_project(project_id).await?;
2980 self.room_transaction(room_id, |tx| async move {
2981 let collaborators = project_collaborator::Entity::find()
2982 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2983 .all(&*tx)
2984 .await?
2985 .into_iter()
2986 .map(|collaborator| ProjectCollaborator {
2987 connection_id: collaborator.connection(),
2988 user_id: collaborator.user_id,
2989 replica_id: collaborator.replica_id,
2990 is_host: collaborator.is_host,
2991 })
2992 .collect::<Vec<_>>();
2993
2994 if collaborators
2995 .iter()
2996 .any(|collaborator| collaborator.connection_id == connection_id)
2997 {
2998 Ok(collaborators)
2999 } else {
3000 Err(anyhow!("no such project"))?
3001 }
3002 })
3003 .await
3004 }
3005
3006 pub async fn project_connection_ids(
3007 &self,
3008 project_id: ProjectId,
3009 connection_id: ConnectionId,
3010 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
3011 let room_id = self.room_id_for_project(project_id).await?;
3012 self.room_transaction(room_id, |tx| async move {
3013 let mut collaborators = project_collaborator::Entity::find()
3014 .filter(project_collaborator::Column::ProjectId.eq(project_id))
3015 .stream(&*tx)
3016 .await?;
3017
3018 let mut connection_ids = HashSet::default();
3019 while let Some(collaborator) = collaborators.next().await {
3020 let collaborator = collaborator?;
3021 connection_ids.insert(collaborator.connection());
3022 }
3023
3024 if connection_ids.contains(&connection_id) {
3025 Ok(connection_ids)
3026 } else {
3027 Err(anyhow!("no such project"))?
3028 }
3029 })
3030 .await
3031 }
3032
3033 async fn project_guest_connection_ids(
3034 &self,
3035 project_id: ProjectId,
3036 tx: &DatabaseTransaction,
3037 ) -> Result<Vec<ConnectionId>> {
3038 let mut collaborators = project_collaborator::Entity::find()
3039 .filter(
3040 project_collaborator::Column::ProjectId
3041 .eq(project_id)
3042 .and(project_collaborator::Column::IsHost.eq(false)),
3043 )
3044 .stream(tx)
3045 .await?;
3046
3047 let mut guest_connection_ids = Vec::new();
3048 while let Some(collaborator) = collaborators.next().await {
3049 let collaborator = collaborator?;
3050 guest_connection_ids.push(collaborator.connection());
3051 }
3052 Ok(guest_connection_ids)
3053 }
3054
3055 async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
3056 self.transaction(|tx| async move {
3057 let project = project::Entity::find_by_id(project_id)
3058 .one(&*tx)
3059 .await?
3060 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
3061 Ok(project.room_id)
3062 })
3063 .await
3064 }
3065
3066 // access tokens
3067
3068 pub async fn create_access_token(
3069 &self,
3070 user_id: UserId,
3071 access_token_hash: &str,
3072 max_access_token_count: usize,
3073 ) -> Result<AccessTokenId> {
3074 self.transaction(|tx| async {
3075 let tx = tx;
3076
3077 let token = access_token::ActiveModel {
3078 user_id: ActiveValue::set(user_id),
3079 hash: ActiveValue::set(access_token_hash.into()),
3080 ..Default::default()
3081 }
3082 .insert(&*tx)
3083 .await?;
3084
3085 access_token::Entity::delete_many()
3086 .filter(
3087 access_token::Column::Id.in_subquery(
3088 Query::select()
3089 .column(access_token::Column::Id)
3090 .from(access_token::Entity)
3091 .and_where(access_token::Column::UserId.eq(user_id))
3092 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
3093 .limit(10000)
3094 .offset(max_access_token_count as u64)
3095 .to_owned(),
3096 ),
3097 )
3098 .exec(&*tx)
3099 .await?;
3100 Ok(token.id)
3101 })
3102 .await
3103 }
3104
3105 pub async fn get_access_token(
3106 &self,
3107 access_token_id: AccessTokenId,
3108 ) -> Result<access_token::Model> {
3109 self.transaction(|tx| async move {
3110 Ok(access_token::Entity::find_by_id(access_token_id)
3111 .one(&*tx)
3112 .await?
3113 .ok_or_else(|| anyhow!("no such access token"))?)
3114 })
3115 .await
3116 }
3117
3118 // channels
3119
3120 pub async fn create_root_channel(
3121 &self,
3122 name: &str,
3123 live_kit_room: &str,
3124 creator_id: UserId,
3125 ) -> Result<ChannelId> {
3126 self.create_channel(name, None, live_kit_room, creator_id)
3127 .await
3128 }
3129
3130 pub async fn create_channel(
3131 &self,
3132 name: &str,
3133 parent: Option<ChannelId>,
3134 live_kit_room: &str,
3135 creator_id: UserId,
3136 ) -> Result<ChannelId> {
3137 self.transaction(move |tx| async move {
3138 let tx = tx;
3139
3140 if let Some(parent) = parent {
3141 let channels = self.get_channel_ancestors(parent, &*tx).await?;
3142 channel_member::Entity::find()
3143 .filter(channel_member::Column::ChannelId.is_in(channels.iter().copied()))
3144 .filter(
3145 channel_member::Column::UserId
3146 .eq(creator_id)
3147 .and(channel_member::Column::Accepted.eq(true)),
3148 )
3149 .one(&*tx)
3150 .await?
3151 .ok_or_else(|| {
3152 anyhow!("User does not have the permissions to create this channel")
3153 })?;
3154 }
3155
3156 let channel = channel::ActiveModel {
3157 name: ActiveValue::Set(name.to_string()),
3158 ..Default::default()
3159 };
3160
3161 let channel = channel.insert(&*tx).await?;
3162
3163 if let Some(parent) = parent {
3164 channel_parent::ActiveModel {
3165 child_id: ActiveValue::Set(channel.id),
3166 parent_id: ActiveValue::Set(parent),
3167 }
3168 .insert(&*tx)
3169 .await?;
3170 }
3171
3172 channel_member::ActiveModel {
3173 channel_id: ActiveValue::Set(channel.id),
3174 user_id: ActiveValue::Set(creator_id),
3175 accepted: ActiveValue::Set(true),
3176 admin: ActiveValue::Set(true),
3177 ..Default::default()
3178 }
3179 .insert(&*tx)
3180 .await?;
3181
3182 room::ActiveModel {
3183 channel_id: ActiveValue::Set(Some(channel.id)),
3184 live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
3185 ..Default::default()
3186 }
3187 .insert(&*tx)
3188 .await?;
3189
3190 Ok(channel.id)
3191 })
3192 .await
3193 }
3194
3195 pub async fn remove_channel(
3196 &self,
3197 channel_id: ChannelId,
3198 user_id: UserId,
3199 ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
3200 self.transaction(move |tx| async move {
3201 let tx = tx;
3202
3203 // Check if user is an admin
3204 channel_member::Entity::find()
3205 .filter(
3206 channel_member::Column::ChannelId
3207 .eq(channel_id)
3208 .and(channel_member::Column::UserId.eq(user_id))
3209 .and(channel_member::Column::Admin.eq(true)),
3210 )
3211 .one(&*tx)
3212 .await?
3213 .ok_or_else(|| anyhow!("user is not allowed to remove this channel"))?;
3214
3215 let mut descendants = self.get_channel_descendants([channel_id], &*tx).await?;
3216
3217 // Keep channels which have another active
3218 let mut channels_to_keep = channel_parent::Entity::find()
3219 .filter(
3220 channel_parent::Column::ChildId
3221 .is_in(descendants.keys().copied().filter(|&id| id != channel_id))
3222 .and(
3223 channel_parent::Column::ParentId.is_not_in(descendants.keys().copied()),
3224 ),
3225 )
3226 .stream(&*tx)
3227 .await?;
3228
3229 while let Some(row) = channels_to_keep.next().await {
3230 let row = row?;
3231 descendants.remove(&row.child_id);
3232 }
3233
3234 drop(channels_to_keep);
3235
3236 let channels_to_remove = descendants.keys().copied().collect::<Vec<_>>();
3237
3238 let members_to_notify: Vec<UserId> = channel_member::Entity::find()
3239 .filter(channel_member::Column::ChannelId.is_in(channels_to_remove.iter().copied()))
3240 .select_only()
3241 .column(channel_member::Column::UserId)
3242 .distinct()
3243 .into_values::<_, QueryUserIds>()
3244 .all(&*tx)
3245 .await?;
3246
3247 // Channel members and parents should delete via cascade
3248 channel::Entity::delete_many()
3249 .filter(channel::Column::Id.is_in(channels_to_remove.iter().copied()))
3250 .exec(&*tx)
3251 .await?;
3252
3253 Ok((channels_to_remove, members_to_notify))
3254 })
3255 .await
3256 }
3257
3258 pub async fn invite_channel_member(
3259 &self,
3260 channel_id: ChannelId,
3261 invitee_id: UserId,
3262 inviter_id: UserId,
3263 is_admin: bool,
3264 ) -> Result<()> {
3265 self.transaction(move |tx| async move {
3266 let tx = tx;
3267
3268 // Check if inviter is a member
3269 channel_member::Entity::find()
3270 .filter(
3271 channel_member::Column::ChannelId
3272 .eq(channel_id)
3273 .and(channel_member::Column::UserId.eq(inviter_id))
3274 .and(channel_member::Column::Admin.eq(true)),
3275 )
3276 .one(&*tx)
3277 .await?
3278 .ok_or_else(|| {
3279 anyhow!("Inviter does not have permissions to invite the invitee")
3280 })?;
3281
3282 let channel_membership = channel_member::ActiveModel {
3283 channel_id: ActiveValue::Set(channel_id),
3284 user_id: ActiveValue::Set(invitee_id),
3285 accepted: ActiveValue::Set(false),
3286 admin: ActiveValue::Set(is_admin),
3287 ..Default::default()
3288 };
3289
3290 channel_membership.insert(&*tx).await?;
3291
3292 Ok(())
3293 })
3294 .await
3295 }
3296
3297 pub async fn respond_to_channel_invite(
3298 &self,
3299 channel_id: ChannelId,
3300 user_id: UserId,
3301 accept: bool,
3302 ) -> Result<()> {
3303 self.transaction(move |tx| async move {
3304 let tx = tx;
3305
3306 let rows_affected = if accept {
3307 channel_member::Entity::update_many()
3308 .set(channel_member::ActiveModel {
3309 accepted: ActiveValue::Set(accept),
3310 ..Default::default()
3311 })
3312 .filter(
3313 channel_member::Column::ChannelId
3314 .eq(channel_id)
3315 .and(channel_member::Column::UserId.eq(user_id))
3316 .and(channel_member::Column::Accepted.eq(false)),
3317 )
3318 .exec(&*tx)
3319 .await?
3320 .rows_affected
3321 } else {
3322 channel_member::ActiveModel {
3323 channel_id: ActiveValue::Unchanged(channel_id),
3324 user_id: ActiveValue::Unchanged(user_id),
3325 ..Default::default()
3326 }
3327 .delete(&*tx)
3328 .await?
3329 .rows_affected
3330 };
3331
3332 if rows_affected == 0 {
3333 Err(anyhow!("no such invitation"))?;
3334 }
3335
3336 Ok(())
3337 })
3338 .await
3339 }
3340
3341 pub async fn get_channel_invites(&self, user_id: UserId) -> Result<Vec<Channel>> {
3342 self.transaction(|tx| async move {
3343 let tx = tx;
3344
3345 let channel_invites = channel_member::Entity::find()
3346 .filter(
3347 channel_member::Column::UserId
3348 .eq(user_id)
3349 .and(channel_member::Column::Accepted.eq(false)),
3350 )
3351 .all(&*tx)
3352 .await?;
3353
3354 let channels = channel::Entity::find()
3355 .filter(
3356 channel::Column::Id.is_in(
3357 channel_invites
3358 .into_iter()
3359 .map(|channel_member| channel_member.channel_id),
3360 ),
3361 )
3362 .all(&*tx)
3363 .await?;
3364
3365 let channels = channels
3366 .into_iter()
3367 .map(|channel| Channel {
3368 id: channel.id,
3369 name: channel.name,
3370 parent_id: None,
3371 })
3372 .collect();
3373
3374 Ok(channels)
3375 })
3376 .await
3377 }
3378
3379 pub async fn get_channels(
3380 &self,
3381 user_id: UserId,
3382 ) -> Result<(Vec<Channel>, HashMap<ChannelId, Vec<UserId>>)> {
3383 self.transaction(|tx| async move {
3384 let tx = tx;
3385
3386 let starting_channel_ids: Vec<ChannelId> = channel_member::Entity::find()
3387 .filter(
3388 channel_member::Column::UserId
3389 .eq(user_id)
3390 .and(channel_member::Column::Accepted.eq(true)),
3391 )
3392 .select_only()
3393 .column(channel_member::Column::ChannelId)
3394 .into_values::<_, QueryChannelIds>()
3395 .all(&*tx)
3396 .await?;
3397
3398 let parents_by_child_id = self
3399 .get_channel_descendants(starting_channel_ids, &*tx)
3400 .await?;
3401
3402 let mut channels = Vec::with_capacity(parents_by_child_id.len());
3403 let mut rows = channel::Entity::find()
3404 .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
3405 .stream(&*tx)
3406 .await?;
3407
3408 while let Some(row) = rows.next().await {
3409 let row = row?;
3410 channels.push(Channel {
3411 id: row.id,
3412 name: row.name,
3413 parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
3414 });
3415 }
3416
3417 drop(rows);
3418
3419 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
3420 enum QueryUserIdsAndChannelIds {
3421 ChannelId,
3422 UserId,
3423 }
3424
3425 let mut participants = room_participant::Entity::find()
3426 .inner_join(room::Entity)
3427 .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
3428 .select_only()
3429 .column(room::Column::ChannelId)
3430 .column(room_participant::Column::UserId)
3431 .into_values::<_, QueryUserIdsAndChannelIds>()
3432 .stream(&*tx)
3433 .await?;
3434
3435 let mut participant_map: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
3436 while let Some(row) = participants.next().await {
3437 let row: (ChannelId, UserId) = row?;
3438 participant_map.entry(row.0).or_default().push(row.1)
3439 }
3440
3441 drop(participants);
3442
3443 Ok((channels, participant_map))
3444 })
3445 .await
3446 }
3447
3448 pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
3449 self.transaction(|tx| async move {
3450 let tx = tx;
3451 let user_ids = self.get_channel_members_internal(id, &*tx).await?;
3452 Ok(user_ids)
3453 })
3454 .await
3455 }
3456
3457 pub async fn get_channel_members_internal(
3458 &self,
3459 id: ChannelId,
3460 tx: &DatabaseTransaction,
3461 ) -> Result<Vec<UserId>> {
3462 let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
3463 let user_ids = channel_member::Entity::find()
3464 .distinct()
3465 .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
3466 .select_only()
3467 .column(channel_member::Column::UserId)
3468 .into_values::<_, QueryUserIds>()
3469 .all(&*tx)
3470 .await?;
3471 Ok(user_ids)
3472 }
3473
3474 async fn get_channel_ancestors(
3475 &self,
3476 id: ChannelId,
3477 tx: &DatabaseTransaction,
3478 ) -> Result<Vec<ChannelId>> {
3479 let sql = format!(
3480 r#"
3481 WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3482 SELECT CAST(NULL as INTEGER) as child_id, root_ids.column1 as parent_id
3483 FROM (VALUES ({})) as root_ids
3484 UNION
3485 SELECT channel_parents.child_id, channel_parents.parent_id
3486 FROM channel_parents, channel_tree
3487 WHERE channel_parents.child_id = channel_tree.parent_id
3488 )
3489 SELECT DISTINCT channel_tree.parent_id
3490 FROM channel_tree
3491 "#,
3492 id
3493 );
3494
3495 #[derive(FromQueryResult, Debug, PartialEq)]
3496 pub struct ChannelParent {
3497 pub parent_id: ChannelId,
3498 }
3499
3500 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3501
3502 let mut channel_ids_stream = channel_parent::Entity::find()
3503 .from_raw_sql(stmt)
3504 .into_model::<ChannelParent>()
3505 .stream(&*tx)
3506 .await?;
3507
3508 let mut channel_ids = vec![];
3509 while let Some(channel_id) = channel_ids_stream.next().await {
3510 channel_ids.push(channel_id?.parent_id);
3511 }
3512
3513 Ok(channel_ids)
3514 }
3515
3516 async fn get_channel_descendants(
3517 &self,
3518 channel_ids: impl IntoIterator<Item = ChannelId>,
3519 tx: &DatabaseTransaction,
3520 ) -> Result<HashMap<ChannelId, Option<ChannelId>>> {
3521 let mut values = String::new();
3522 for id in channel_ids {
3523 if !values.is_empty() {
3524 values.push_str(", ");
3525 }
3526 write!(&mut values, "({})", id).unwrap();
3527 }
3528
3529 if values.is_empty() {
3530 return Ok(HashMap::default());
3531 }
3532
3533 let sql = format!(
3534 r#"
3535 WITH RECURSIVE channel_tree(child_id, parent_id) AS (
3536 SELECT root_ids.column1 as child_id, CAST(NULL as INTEGER) as parent_id
3537 FROM (VALUES {}) as root_ids
3538 UNION
3539 SELECT channel_parents.child_id, channel_parents.parent_id
3540 FROM channel_parents, channel_tree
3541 WHERE channel_parents.parent_id = channel_tree.child_id
3542 )
3543 SELECT channel_tree.child_id, channel_tree.parent_id
3544 FROM channel_tree
3545 ORDER BY child_id, parent_id IS NOT NULL
3546 "#,
3547 values
3548 );
3549
3550 #[derive(FromQueryResult, Debug, PartialEq)]
3551 pub struct ChannelParent {
3552 pub child_id: ChannelId,
3553 pub parent_id: Option<ChannelId>,
3554 }
3555
3556 let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
3557
3558 let mut parents_by_child_id = HashMap::default();
3559 let mut parents = channel_parent::Entity::find()
3560 .from_raw_sql(stmt)
3561 .into_model::<ChannelParent>()
3562 .stream(tx)
3563 .await?;
3564
3565 while let Some(parent) = parents.next().await {
3566 let parent = parent?;
3567 parents_by_child_id.insert(parent.child_id, parent.parent_id);
3568 }
3569
3570 Ok(parents_by_child_id)
3571 }
3572
3573 pub async fn get_channel(&self, channel_id: ChannelId) -> Result<Option<Channel>> {
3574 self.transaction(|tx| async move {
3575 let tx = tx;
3576 let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
3577
3578 Ok(channel.map(|channel| Channel {
3579 id: channel.id,
3580 name: channel.name,
3581 parent_id: None,
3582 }))
3583 })
3584 .await
3585 }
3586
3587 pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
3588 self.transaction(|tx| async move {
3589 let tx = tx;
3590 let room = channel::Model {
3591 id: channel_id,
3592 ..Default::default()
3593 }
3594 .find_related(room::Entity)
3595 .one(&*tx)
3596 .await?
3597 .ok_or_else(|| anyhow!("invalid channel"))?;
3598 Ok(room.id)
3599 })
3600 .await
3601 }
3602
3603 async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
3604 where
3605 F: Send + Fn(TransactionHandle) -> Fut,
3606 Fut: Send + Future<Output = Result<T>>,
3607 {
3608 let body = async {
3609 let mut i = 0;
3610 loop {
3611 let (tx, result) = self.with_transaction(&f).await?;
3612 match result {
3613 Ok(result) => match tx.commit().await.map_err(Into::into) {
3614 Ok(()) => return Ok(result),
3615 Err(error) => {
3616 if !self.retry_on_serialization_error(&error, i).await {
3617 return Err(error);
3618 }
3619 }
3620 },
3621 Err(error) => {
3622 tx.rollback().await?;
3623 if !self.retry_on_serialization_error(&error, i).await {
3624 return Err(error);
3625 }
3626 }
3627 }
3628 i += 1;
3629 }
3630 };
3631
3632 self.run(body).await
3633 }
3634
    /// Runs `f` inside a transaction whose result may identify a room.
    ///
    /// When `f` yields `Some((room_id, data))`, the per-room mutex from
    /// `self.rooms` is acquired before committing, and the returned
    /// [`RoomGuard`] carries both `data` and that lock guard. When `f` yields
    /// `None`, the transaction is committed and `None` is returned.
    ///
    /// Failures of `f` roll the transaction back. Serialization errors, from
    /// `f` or from the commit, cause the whole attempt to be retried via
    /// `retry_on_serialization_error`; other errors are returned.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; feeds the retry backoff.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // The room is only known once `f` has run, so its lock
                        // is taken here, just before the commit.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    // No room involved: commit without taking any room lock.
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3684
    /// Runs `f` inside a transaction while holding the mutex for `room_id`.
    ///
    /// The per-room lock from `self.rooms` is acquired before the transaction
    /// begins on every attempt. On success the returned [`RoomGuard`] carries
    /// the result of `f` together with the lock guard.
    ///
    /// Failures of `f` roll the transaction back. Serialization errors, from
    /// `f` or from the commit, cause the whole attempt (including re-taking
    /// the lock) to be retried via `retry_on_serialization_error`; other
    /// errors are returned.
    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<T>>,
    {
        let body = async {
            // Number of attempts made so far; feeds the retry backoff.
            let mut i = 0;
            loop {
                // Take the room lock first so the transaction body runs with
                // the lock held.
                let lock = self.rooms.entry(room_id).or_default().clone();
                let _guard = lock.lock_owned().await;
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(data) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => {
                            return Ok(RoomGuard {
                                data,
                                _guard,
                                _not_send: PhantomData,
                            });
                        }
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
3724
3725 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
3726 where
3727 F: Send + Fn(TransactionHandle) -> Fut,
3728 Fut: Send + Future<Output = Result<T>>,
3729 {
3730 let tx = self
3731 .pool
3732 .begin_with_config(Some(IsolationLevel::Serializable), None)
3733 .await?;
3734
3735 let mut tx = Arc::new(Some(tx));
3736 let result = f(TransactionHandle(tx.clone())).await;
3737 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
3738 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
3739 };
3740
3741 Ok((tx, result))
3742 }
3743
    /// Drives `future` to completion and returns its result.
    ///
    /// Outside of tests the future is simply awaited. In test builds a random
    /// delay is simulated first when the executor is deterministic, and the
    /// future is then run with `block_on` on the runtime stored in
    /// `self.runtime`, which must have been set (the `unwrap` panics
    /// otherwise).
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        #[cfg(test)]
        {
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            // Blocks the current thread until the future completes.
            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
3762
3763 async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool {
3764 // If the error is due to a failure to serialize concurrent transactions, then retry
3765 // this transaction after a delay. With each subsequent retry, double the delay duration.
3766 // Also vary the delay randomly in order to ensure different database connections retry
3767 // at different times.
3768 if is_serialization_error(error) {
3769 let base_delay = 4_u64 << prev_attempt_count.min(16);
3770 let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0);
3771 log::info!(
3772 "retrying transaction after serialization error. delay: {} ms.",
3773 randomized_delay
3774 );
3775 self.executor
3776 .sleep(Duration::from_millis(randomized_delay as u64))
3777 .await;
3778 true
3779 } else {
3780 false
3781 }
3782 }
3783}
3784
3785fn is_serialization_error(error: &Error) -> bool {
3786 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
3787 match error {
3788 Error::Database(
3789 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
3790 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
3791 ) if error
3792 .as_database_error()
3793 .and_then(|error| error.code())
3794 .as_deref()
3795 == Some(SERIALIZATION_FAILURE_CODE) =>
3796 {
3797 true
3798 }
3799 _ => false,
3800 }
3801}
3802
3803struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
3804
3805impl Deref for TransactionHandle {
3806 type Target = DatabaseTransaction;
3807
3808 fn deref(&self) -> &Self::Target {
3809 self.0.as_ref().as_ref().unwrap()
3810 }
3811}
3812
/// Result of a room transaction, bundled with the lock guard of the room it
/// belongs to. The room's mutex stays locked for as long as this value is
/// alive; the payload is reachable through `Deref`/`DerefMut`.
pub struct RoomGuard<T> {
    /// The value produced by the transaction.
    data: T,
    /// Keeps the per-room mutex locked until the guard is dropped.
    _guard: OwnedMutexGuard<()>,
    /// `Rc` is not `Send`, so this marker makes the guard non-`Send` as well.
    _not_send: PhantomData<Rc<()>>,
}

impl<T> Deref for RoomGuard<T> {
    type Target = T;

    /// Borrows the wrapped transaction result.
    fn deref(&self) -> &T {
        &self.data
    }
}

impl<T> DerefMut for RoomGuard<T> {
    /// Mutably borrows the wrapped transaction result.
    fn deref_mut(&mut self) -> &mut T {
        &mut self.data
    }
}
3832
/// Serializable description of a user to create.
// NOTE(review): the consumer of this struct is outside this chunk; presumably
// a user-creation method on `Database` — confirm.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    /// GitHub login name of the user.
    pub github_login: String,
    /// Numeric GitHub user id.
    pub github_user_id: i32,
    /// Invite count associated with the new user.
    pub invite_count: i32,
}
3839
/// Identifiers describing a newly created user.
// NOTE(review): produced outside this chunk; field meanings below are taken
// from their names and types only — confirm against the producer.
#[derive(Debug)]
pub struct NewUserResult {
    /// Id of the user.
    pub user_id: UserId,
    /// Metrics identifier of the user.
    pub metrics_id: String,
    /// Id of the user who invited this one, if any.
    pub inviting_user_id: Option<UserId>,
    /// Device id recorded at signup, if any.
    pub signup_device_id: Option<String>,
}
3847
/// A channel as returned by the channel queries on `Database`.
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct Channel {
    /// Id of the channel.
    pub id: ChannelId,
    /// Name of the channel.
    pub name: String,
    /// Parent of the channel. `get_channels` fills this from the descendant
    /// lookup; `get_channel` and `get_channel_invites` always set it to `None`.
    pub parent_id: Option<ChannelId>,
}
3854
/// Generates a random invite code via `nanoid!` with a requested length of 16.
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
3858
/// Generates a random email confirmation code via `nanoid!` with a requested
/// length of 64.
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
3862
/// Declares a public `i32` newtype named `$name` for use as a database id.
///
/// The generated type derives the usual value traits plus transparent serde
/// support, converts to and from the `u64` ids used by the protocol, displays
/// as its inner integer, and implements the sea-orm / sea-query traits needed
/// to use it as a column value.
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// Largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds the id from a protocol value. The `as` cast truncates
            /// values that do not fit into an `i32`.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts the id into its protocol representation with an `as`
            /// cast (negative ids wrap around).
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            /// Accepts any non-null integer variant whose value fits into an
            /// `i32`; everything else is a type error.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            /// Fails when `n` does not fit into an `i32`.
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // The conversion goes from u64 to the id type, and the message
                // says so.
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(concat!(
                        "error converting u64 to ",
                        stringify!($name)
                    ))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
3981
// Id newtypes generated by `id_type!`; each wraps an `i32`.
id_type!(AccessTokenId);
id_type!(ChannelId);
id_type!(ChannelMemberId);
id_type!(ContactId);
id_type!(FollowerId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
3995
/// Room data bundled with its channel information.
// NOTE(review): produced outside this chunk; presumably the result of joining
// a room — confirm against the producer.
#[derive(Clone)]
pub struct JoinRoom {
    /// Protocol representation of the room.
    pub room: proto::Room,
    /// Channel associated with the room, if any.
    pub channel_id: Option<ChannelId>,
    /// User ids of the channel's members.
    pub channel_members: Vec<UserId>,
}
4002
/// Room data plus the projects that were rejoined or reshared.
// NOTE(review): produced outside this chunk; presumably the result of
// rejoining a room — confirm against the producer.
pub struct RejoinedRoom {
    /// Protocol representation of the room.
    pub room: proto::Room,
    /// Projects that were rejoined.
    pub rejoined_projects: Vec<RejoinedProject>,
    /// Projects that were reshared.
    pub reshared_projects: Vec<ResharedProject>,
    /// Channel associated with the room, if any.
    pub channel_id: Option<ChannelId>,
    /// User ids of the channel's members.
    pub channel_members: Vec<UserId>,
}
4010
/// A project entry carried in [`RejoinedRoom::reshared_projects`].
pub struct ResharedProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Connection id recorded for the project before it was reshared.
    pub old_connection_id: ConnectionId,
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Protocol metadata of the project's worktrees.
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
4017
/// A project entry carried in [`RejoinedRoom::rejoined_projects`].
pub struct RejoinedProject {
    /// Id of the project.
    pub id: ProjectId,
    /// Connection id recorded for the project before it was rejoined.
    pub old_connection_id: ConnectionId,
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Per-worktree state for the rejoined project.
    pub worktrees: Vec<RejoinedWorktree>,
    /// Language servers of the project, in protocol form.
    pub language_servers: Vec<proto::LanguageServer>,
}
4025
/// Worktree state carried in [`RejoinedProject::worktrees`]: updated and
/// removed entries and repositories plus the worktree's diagnostics and
/// settings files.
// NOTE(review): field semantics are taken from names and types only; the code
// that fills this struct is outside this chunk.
#[derive(Debug)]
pub struct RejoinedWorktree {
    /// Id of the worktree.
    pub id: u64,
    /// Absolute path of the worktree.
    pub abs_path: String,
    /// Name of the worktree's root.
    pub root_name: String,
    /// Visibility flag of the worktree.
    pub visible: bool,
    /// Entries that were updated.
    pub updated_entries: Vec<proto::Entry>,
    /// Ids of entries that were removed.
    pub removed_entries: Vec<u64>,
    /// Repository entries that were updated.
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    /// Ids of repositories that were removed.
    pub removed_repositories: Vec<u64>,
    /// Diagnostic summaries of the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Settings files of the worktree.
    pub settings_files: Vec<WorktreeSettingsFile>,
    /// Scan id of the worktree.
    pub scan_id: u64,
    /// Id of the last completed scan.
    pub completed_scan_id: u64,
}
4041
/// Room data plus the projects and calls affected by a departure.
// NOTE(review): produced outside this chunk; presumably the result of leaving
// a room — confirm against the producer.
pub struct LeftRoom {
    /// Protocol representation of the room.
    pub room: proto::Room,
    /// Channel associated with the room, if any.
    pub channel_id: Option<ChannelId>,
    /// User ids of the channel's members.
    pub channel_members: Vec<UserId>,
    /// Projects that were left, keyed by project id.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    /// Users whose incoming calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
    /// Whether the room was deleted.
    pub deleted: bool,
}
4050
/// Room data plus the participants and calls affected by a refresh.
// NOTE(review): produced outside this chunk; presumably the result of
// refreshing a room — confirm against the producer.
pub struct RefreshedRoom {
    /// Protocol representation of the room.
    pub room: proto::Room,
    /// Channel associated with the room, if any.
    pub channel_id: Option<ChannelId>,
    /// User ids of the channel's members.
    pub channel_members: Vec<UserId>,
    /// Users whose participation was found to be stale.
    pub stale_participant_user_ids: Vec<UserId>,
    /// Users whose incoming calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
4058
/// A project's collaborators, worktrees and language servers.
pub struct Project {
    /// Collaborators of the project.
    pub collaborators: Vec<ProjectCollaborator>,
    /// Worktrees of the project, keyed by a `u64` (presumably the worktree
    /// id — confirm).
    pub worktrees: BTreeMap<u64, Worktree>,
    /// Language servers of the project, in protocol form.
    pub language_servers: Vec<proto::LanguageServer>,
}
4064
/// A user taking part in a project through a particular connection.
pub struct ProjectCollaborator {
    /// Connection the collaborator is using.
    pub connection_id: ConnectionId,
    /// Id of the collaborating user.
    pub user_id: UserId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Whether this collaborator is the project's host.
    pub is_host: bool,
}

impl ProjectCollaborator {
    /// Converts the collaborator into its protocol representation. `is_host`
    /// is not part of the protocol message, and the replica id is converted
    /// with an `as u32` cast.
    pub fn to_proto(&self) -> proto::Collaborator {
        proto::Collaborator {
            peer_id: Some(self.connection_id.into()),
            replica_id: self.replica_id.0 as u32,
            user_id: self.user_id.to_proto(),
        }
    }
}
4081
/// A project entry carried in [`LeftRoom::left_projects`].
#[derive(Debug)]
pub struct LeftProject {
    /// Id of the project.
    pub id: ProjectId,
    /// User id of the project's host.
    pub host_user_id: UserId,
    /// Connection id of the project's host.
    pub host_connection_id: ConnectionId,
    /// Connection ids associated with the project (presumably the remaining
    /// collaborators to notify — confirm).
    pub connection_ids: Vec<ConnectionId>,
}
4089
/// Full state of a worktree as carried in [`Project::worktrees`].
// NOTE(review): field semantics are taken from names and types only; the code
// that fills this struct is outside this chunk.
pub struct Worktree {
    /// Id of the worktree.
    pub id: u64,
    /// Absolute path of the worktree.
    pub abs_path: String,
    /// Name of the worktree's root.
    pub root_name: String,
    /// Visibility flag of the worktree.
    pub visible: bool,
    /// Entries of the worktree, in protocol form.
    pub entries: Vec<proto::Entry>,
    /// Repository entries keyed by a `u64` (presumably an entry id — confirm).
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    /// Diagnostic summaries of the worktree.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Settings files of the worktree.
    pub settings_files: Vec<WorktreeSettingsFile>,
    /// Scan id of the worktree.
    pub scan_id: u64,
    /// Id of the last completed scan.
    pub completed_scan_id: u64,
}
4102
/// A settings file belonging to a worktree: its path and its content.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    /// Path of the settings file.
    pub path: String,
    /// Content of the settings file.
    pub content: String,
}
4108
/// Column selector passed to `into_values` by queries that read back a single
/// channel id column (see `get_channels`).
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryChannelIds {
    ChannelId,
}
4113
/// Column selector passed to `into_values` by queries that read back a single
/// user id column (see `remove_channel` and `get_channel_members_internal`).
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIds {
    UserId,
}
4118
4119#[cfg(test)]
4120pub use test::*;
4121
/// Test-only helpers for creating throwaway databases.
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use parking_lot::Mutex;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// Owns a [`Database`] created for a test. Dropping it tears down the
    /// Postgres database it created, if any.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and loads the test schema
        /// into it. The database runs on its own current-thread tokio runtime,
        /// which is stored on the `Database` for later use.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let url = String::from("sqlite::memory:");
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            let mut db = runtime.block_on(async {
                let mut options = ConnectOptions::new(url);
                options.max_connections(5);
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let sql = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                db.pool
                    .execute(sea_orm::Statement::from_string(
                        db.pool.get_database_backend(),
                        sql.into(),
                    ))
                    .await
                    .unwrap();
                db
            });

            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Creates a randomly named Postgres database on localhost and runs
        /// the migrations against it. A process-wide lock is held for the
        /// duration of the call, so concurrent tests set up one at a time.
        pub fn postgres(background: Arc<Background>) -> Self {
            static LOCK: Mutex<()> = Mutex::new(());

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap();

            let mut db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");
                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options, Executor::Deterministic(background))
                    .await
                    .unwrap();
                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }

        /// Returns the wrapped database; panics once it has been taken by
        /// `drop`.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }
    }

    impl Drop for TestDb {
        /// For Postgres-backed instances, terminates the other backends
        /// connected to the test database and then drops the database.
        /// Failures of either step go through `log_err` instead of being
        /// propagated.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            if let sea_orm::DatabaseBackend::Postgres = db.pool.get_database_backend() {
                db.runtime.as_ref().unwrap().block_on(async {
                    use util::ResultExt;
                    let query = "
                        SELECT pg_terminate_backend(pg_stat_activity.pid)
                        FROM pg_stat_activity
                        WHERE
                            pg_stat_activity.datname = current_database() AND
                            pid <> pg_backend_pid();
                    ";
                    db.pool
                        .execute(sea_orm::Statement::from_string(
                            db.pool.get_database_backend(),
                            query.into(),
                        ))
                        .await
                        .log_err();
                    sqlx::Postgres::drop_database(db.options.get_url())
                        .await
                        .log_err();
                })
            }
        }
    }
}