1mod access_token;
2mod contact;
3mod language_server;
4mod project;
5mod project_collaborator;
6mod room;
7mod room_participant;
8mod server;
9mod signup;
10#[cfg(test)]
11mod tests;
12mod user;
13mod worktree;
14mod worktree_diagnostic_summary;
15mod worktree_entry;
16
17use crate::{Error, Result};
18use anyhow::anyhow;
19use collections::{BTreeMap, HashMap, HashSet};
20pub use contact::Contact;
21use dashmap::DashMap;
22use futures::StreamExt;
23use hyper::StatusCode;
24use rpc::{proto, ConnectionId};
25use sea_orm::Condition;
26pub use sea_orm::ConnectOptions;
27use sea_orm::{
28 entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
29 DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect,
30 Statement, TransactionTrait,
31};
32use sea_query::{Alias, Expr, OnConflict, Query};
33use serde::{Deserialize, Serialize};
34pub use signup::{Invite, NewSignup, WaitlistSummary};
35use sqlx::migrate::{Migrate, Migration, MigrationSource};
36use sqlx::Connection;
37use std::ops::{Deref, DerefMut};
38use std::path::Path;
39use std::time::Duration;
40use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
41use tokio::sync::{Mutex, OwnedMutexGuard};
42pub use user::Model as User;
43
44pub struct Database {
45 options: ConnectOptions,
46 pool: DatabaseConnection,
47 rooms: DashMap<RoomId, Arc<Mutex<()>>>,
48 #[cfg(test)]
49 background: Option<std::sync::Arc<gpui::executor::Background>>,
50 #[cfg(test)]
51 runtime: Option<tokio::runtime::Runtime>,
52}
53
54impl Database {
55 pub async fn new(options: ConnectOptions) -> Result<Self> {
56 Ok(Self {
57 options: options.clone(),
58 pool: sea_orm::Database::connect(options).await?,
59 rooms: DashMap::with_capacity(16384),
60 #[cfg(test)]
61 background: None,
62 #[cfg(test)]
63 runtime: None,
64 })
65 }
66
/// Removes every entry from the room-lock map (test builds only).
#[cfg(test)]
pub fn reset(&self) {
    DashMap::clear(&self.rooms);
}
71
72 pub async fn migrate(
73 &self,
74 migrations_path: &Path,
75 ignore_checksum_mismatch: bool,
76 ) -> anyhow::Result<Vec<(Migration, Duration)>> {
77 let migrations = MigrationSource::resolve(migrations_path)
78 .await
79 .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
80
81 let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
82
83 connection.ensure_migrations_table().await?;
84 let applied_migrations: HashMap<_, _> = connection
85 .list_applied_migrations()
86 .await?
87 .into_iter()
88 .map(|m| (m.version, m))
89 .collect();
90
91 let mut new_migrations = Vec::new();
92 for migration in migrations {
93 match applied_migrations.get(&migration.version) {
94 Some(applied_migration) => {
95 if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
96 {
97 Err(anyhow!(
98 "checksum mismatch for applied migration {}",
99 migration.description
100 ))?;
101 }
102 }
103 None => {
104 let elapsed = connection.apply(&migration).await?;
105 new_migrations.push((migration, elapsed));
106 }
107 }
108 }
109
110 Ok(new_migrations)
111 }
112
113 pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
114 self.transaction(|tx| async move {
115 let server = server::ActiveModel {
116 environment: ActiveValue::set(environment.into()),
117 ..Default::default()
118 }
119 .insert(&*tx)
120 .await?;
121 Ok(server.id)
122 })
123 .await
124 }
125
126 pub async fn delete_stale_projects(
127 &self,
128 new_epoch: ServerId,
129 environment: &str,
130 ) -> Result<()> {
131 self.transaction(|tx| async move {
132 let stale_server_epochs = self.stale_server_ids(environment, new_epoch, &tx).await?;
133 project_collaborator::Entity::delete_many()
134 .filter(
135 project_collaborator::Column::ConnectionServerId
136 .is_in(stale_server_epochs.iter().copied()),
137 )
138 .exec(&*tx)
139 .await?;
140 project::Entity::delete_many()
141 .filter(
142 project::Column::HostConnectionServerId
143 .is_in(stale_server_epochs.iter().copied()),
144 )
145 .exec(&*tx)
146 .await?;
147 Ok(())
148 })
149 .await
150 }
151
152 pub async fn stale_room_ids(
153 &self,
154 new_epoch: ServerId,
155 environment: &str,
156 ) -> Result<Vec<RoomId>> {
157 self.transaction(|tx| async move {
158 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
159 enum QueryAs {
160 RoomId,
161 }
162
163 let stale_server_epochs = self.stale_server_ids(environment, new_epoch, &tx).await?;
164 Ok(room_participant::Entity::find()
165 .select_only()
166 .column(room_participant::Column::RoomId)
167 .distinct()
168 .filter(
169 room_participant::Column::AnsweringConnectionServerId
170 .is_in(stale_server_epochs),
171 )
172 .into_values::<_, QueryAs>()
173 .all(&*tx)
174 .await?)
175 })
176 .await
177 }
178
179 pub async fn refresh_room(
180 &self,
181 room_id: RoomId,
182 new_epoch: ServerId,
183 ) -> Result<RoomGuard<RefreshedRoom>> {
184 self.room_transaction(|tx| async move {
185 let stale_participant_filter = Condition::all()
186 .add(room_participant::Column::RoomId.eq(room_id))
187 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
188 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_epoch));
189
190 let stale_participant_user_ids = room_participant::Entity::find()
191 .filter(stale_participant_filter.clone())
192 .all(&*tx)
193 .await?
194 .into_iter()
195 .map(|participant| participant.user_id)
196 .collect::<Vec<_>>();
197
198 // Delete participants who failed to reconnect.
199 room_participant::Entity::delete_many()
200 .filter(stale_participant_filter)
201 .exec(&*tx)
202 .await?;
203
204 let room = self.get_room(room_id, &tx).await?;
205 let mut canceled_calls_to_user_ids = Vec::new();
206 // Delete the room if it becomes empty and cancel pending calls.
207 if room.participants.is_empty() {
208 canceled_calls_to_user_ids.extend(
209 room.pending_participants
210 .iter()
211 .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
212 );
213 room_participant::Entity::delete_many()
214 .filter(room_participant::Column::RoomId.eq(room_id))
215 .exec(&*tx)
216 .await?;
217 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
218 }
219
220 Ok((
221 room_id,
222 RefreshedRoom {
223 room,
224 stale_participant_user_ids,
225 canceled_calls_to_user_ids,
226 },
227 ))
228 })
229 .await
230 }
231
232 pub async fn delete_stale_servers(&self, new_epoch: ServerId, environment: &str) -> Result<()> {
233 self.transaction(|tx| async move {
234 server::Entity::delete_many()
235 .filter(
236 Condition::all()
237 .add(server::Column::Environment.eq(environment))
238 .add(server::Column::Id.ne(new_epoch)),
239 )
240 .exec(&*tx)
241 .await?;
242 Ok(())
243 })
244 .await
245 }
246
247 async fn stale_server_ids(
248 &self,
249 environment: &str,
250 new_epoch: ServerId,
251 tx: &DatabaseTransaction,
252 ) -> Result<Vec<ServerId>> {
253 let stale_servers = server::Entity::find()
254 .filter(
255 Condition::all()
256 .add(server::Column::Environment.eq(environment))
257 .add(server::Column::Id.ne(new_epoch)),
258 )
259 .all(&*tx)
260 .await?;
261 Ok(stale_servers.into_iter().map(|server| server.id).collect())
262 }
263
264 // users
265
266 pub async fn create_user(
267 &self,
268 email_address: &str,
269 admin: bool,
270 params: NewUserParams,
271 ) -> Result<NewUserResult> {
272 self.transaction(|tx| async {
273 let tx = tx;
274 let user = user::Entity::insert(user::ActiveModel {
275 email_address: ActiveValue::set(Some(email_address.into())),
276 github_login: ActiveValue::set(params.github_login.clone()),
277 github_user_id: ActiveValue::set(Some(params.github_user_id)),
278 admin: ActiveValue::set(admin),
279 metrics_id: ActiveValue::set(Uuid::new_v4()),
280 ..Default::default()
281 })
282 .on_conflict(
283 OnConflict::column(user::Column::GithubLogin)
284 .update_column(user::Column::GithubLogin)
285 .to_owned(),
286 )
287 .exec_with_returning(&*tx)
288 .await?;
289
290 Ok(NewUserResult {
291 user_id: user.id,
292 metrics_id: user.metrics_id.to_string(),
293 signup_device_id: None,
294 inviting_user_id: None,
295 })
296 })
297 .await
298 }
299
300 pub async fn get_user_by_id(&self, id: UserId) -> Result<Option<user::Model>> {
301 self.transaction(|tx| async move { Ok(user::Entity::find_by_id(id).one(&*tx).await?) })
302 .await
303 }
304
305 pub async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<user::Model>> {
306 self.transaction(|tx| async {
307 let tx = tx;
308 Ok(user::Entity::find()
309 .filter(user::Column::Id.is_in(ids.iter().copied()))
310 .all(&*tx)
311 .await?)
312 })
313 .await
314 }
315
316 pub async fn get_user_by_github_account(
317 &self,
318 github_login: &str,
319 github_user_id: Option<i32>,
320 ) -> Result<Option<User>> {
321 self.transaction(|tx| async move {
322 let tx = &*tx;
323 if let Some(github_user_id) = github_user_id {
324 if let Some(user_by_github_user_id) = user::Entity::find()
325 .filter(user::Column::GithubUserId.eq(github_user_id))
326 .one(tx)
327 .await?
328 {
329 let mut user_by_github_user_id = user_by_github_user_id.into_active_model();
330 user_by_github_user_id.github_login = ActiveValue::set(github_login.into());
331 Ok(Some(user_by_github_user_id.update(tx).await?))
332 } else if let Some(user_by_github_login) = user::Entity::find()
333 .filter(user::Column::GithubLogin.eq(github_login))
334 .one(tx)
335 .await?
336 {
337 let mut user_by_github_login = user_by_github_login.into_active_model();
338 user_by_github_login.github_user_id = ActiveValue::set(Some(github_user_id));
339 Ok(Some(user_by_github_login.update(tx).await?))
340 } else {
341 Ok(None)
342 }
343 } else {
344 Ok(user::Entity::find()
345 .filter(user::Column::GithubLogin.eq(github_login))
346 .one(tx)
347 .await?)
348 }
349 })
350 .await
351 }
352
353 pub async fn get_all_users(&self, page: u32, limit: u32) -> Result<Vec<User>> {
354 self.transaction(|tx| async move {
355 Ok(user::Entity::find()
356 .order_by_asc(user::Column::GithubLogin)
357 .limit(limit as u64)
358 .offset(page as u64 * limit as u64)
359 .all(&*tx)
360 .await?)
361 })
362 .await
363 }
364
365 pub async fn get_users_with_no_invites(
366 &self,
367 invited_by_another_user: bool,
368 ) -> Result<Vec<User>> {
369 self.transaction(|tx| async move {
370 Ok(user::Entity::find()
371 .filter(
372 user::Column::InviteCount
373 .eq(0)
374 .and(if invited_by_another_user {
375 user::Column::InviterId.is_not_null()
376 } else {
377 user::Column::InviterId.is_null()
378 }),
379 )
380 .all(&*tx)
381 .await?)
382 })
383 .await
384 }
385
386 pub async fn get_user_metrics_id(&self, id: UserId) -> Result<String> {
387 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
388 enum QueryAs {
389 MetricsId,
390 }
391
392 self.transaction(|tx| async move {
393 let metrics_id: Uuid = user::Entity::find_by_id(id)
394 .select_only()
395 .column(user::Column::MetricsId)
396 .into_values::<_, QueryAs>()
397 .one(&*tx)
398 .await?
399 .ok_or_else(|| anyhow!("could not find user"))?;
400 Ok(metrics_id.to_string())
401 })
402 .await
403 }
404
405 pub async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
406 self.transaction(|tx| async move {
407 user::Entity::update_many()
408 .filter(user::Column::Id.eq(id))
409 .set(user::ActiveModel {
410 admin: ActiveValue::set(is_admin),
411 ..Default::default()
412 })
413 .exec(&*tx)
414 .await?;
415 Ok(())
416 })
417 .await
418 }
419
420 pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
421 self.transaction(|tx| async move {
422 user::Entity::update_many()
423 .filter(user::Column::Id.eq(id))
424 .set(user::ActiveModel {
425 connected_once: ActiveValue::set(connected_once),
426 ..Default::default()
427 })
428 .exec(&*tx)
429 .await?;
430 Ok(())
431 })
432 .await
433 }
434
435 pub async fn destroy_user(&self, id: UserId) -> Result<()> {
436 self.transaction(|tx| async move {
437 access_token::Entity::delete_many()
438 .filter(access_token::Column::UserId.eq(id))
439 .exec(&*tx)
440 .await?;
441 user::Entity::delete_by_id(id).exec(&*tx).await?;
442 Ok(())
443 })
444 .await
445 }
446
447 // contacts
448
449 pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
450 #[derive(Debug, FromQueryResult)]
451 struct ContactWithUserBusyStatuses {
452 user_id_a: UserId,
453 user_id_b: UserId,
454 a_to_b: bool,
455 accepted: bool,
456 should_notify: bool,
457 user_a_busy: bool,
458 user_b_busy: bool,
459 }
460
461 self.transaction(|tx| async move {
462 let user_a_participant = Alias::new("user_a_participant");
463 let user_b_participant = Alias::new("user_b_participant");
464 let mut db_contacts = contact::Entity::find()
465 .column_as(
466 Expr::tbl(user_a_participant.clone(), room_participant::Column::Id)
467 .is_not_null(),
468 "user_a_busy",
469 )
470 .column_as(
471 Expr::tbl(user_b_participant.clone(), room_participant::Column::Id)
472 .is_not_null(),
473 "user_b_busy",
474 )
475 .filter(
476 contact::Column::UserIdA
477 .eq(user_id)
478 .or(contact::Column::UserIdB.eq(user_id)),
479 )
480 .join_as(
481 JoinType::LeftJoin,
482 contact::Relation::UserARoomParticipant.def(),
483 user_a_participant,
484 )
485 .join_as(
486 JoinType::LeftJoin,
487 contact::Relation::UserBRoomParticipant.def(),
488 user_b_participant,
489 )
490 .into_model::<ContactWithUserBusyStatuses>()
491 .stream(&*tx)
492 .await?;
493
494 let mut contacts = Vec::new();
495 while let Some(db_contact) = db_contacts.next().await {
496 let db_contact = db_contact?;
497 if db_contact.user_id_a == user_id {
498 if db_contact.accepted {
499 contacts.push(Contact::Accepted {
500 user_id: db_contact.user_id_b,
501 should_notify: db_contact.should_notify && db_contact.a_to_b,
502 busy: db_contact.user_b_busy,
503 });
504 } else if db_contact.a_to_b {
505 contacts.push(Contact::Outgoing {
506 user_id: db_contact.user_id_b,
507 })
508 } else {
509 contacts.push(Contact::Incoming {
510 user_id: db_contact.user_id_b,
511 should_notify: db_contact.should_notify,
512 });
513 }
514 } else if db_contact.accepted {
515 contacts.push(Contact::Accepted {
516 user_id: db_contact.user_id_a,
517 should_notify: db_contact.should_notify && !db_contact.a_to_b,
518 busy: db_contact.user_a_busy,
519 });
520 } else if db_contact.a_to_b {
521 contacts.push(Contact::Incoming {
522 user_id: db_contact.user_id_a,
523 should_notify: db_contact.should_notify,
524 });
525 } else {
526 contacts.push(Contact::Outgoing {
527 user_id: db_contact.user_id_a,
528 });
529 }
530 }
531
532 contacts.sort_unstable_by_key(|contact| contact.user_id());
533
534 Ok(contacts)
535 })
536 .await
537 }
538
539 pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
540 self.transaction(|tx| async move {
541 let participant = room_participant::Entity::find()
542 .filter(room_participant::Column::UserId.eq(user_id))
543 .one(&*tx)
544 .await?;
545 Ok(participant.is_some())
546 })
547 .await
548 }
549
550 pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
551 self.transaction(|tx| async move {
552 let (id_a, id_b) = if user_id_1 < user_id_2 {
553 (user_id_1, user_id_2)
554 } else {
555 (user_id_2, user_id_1)
556 };
557
558 Ok(contact::Entity::find()
559 .filter(
560 contact::Column::UserIdA
561 .eq(id_a)
562 .and(contact::Column::UserIdB.eq(id_b))
563 .and(contact::Column::Accepted.eq(true)),
564 )
565 .one(&*tx)
566 .await?
567 .is_some())
568 })
569 .await
570 }
571
572 pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
573 self.transaction(|tx| async move {
574 let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
575 (sender_id, receiver_id, true)
576 } else {
577 (receiver_id, sender_id, false)
578 };
579
580 let rows_affected = contact::Entity::insert(contact::ActiveModel {
581 user_id_a: ActiveValue::set(id_a),
582 user_id_b: ActiveValue::set(id_b),
583 a_to_b: ActiveValue::set(a_to_b),
584 accepted: ActiveValue::set(false),
585 should_notify: ActiveValue::set(true),
586 ..Default::default()
587 })
588 .on_conflict(
589 OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB])
590 .values([
591 (contact::Column::Accepted, true.into()),
592 (contact::Column::ShouldNotify, false.into()),
593 ])
594 .action_and_where(
595 contact::Column::Accepted.eq(false).and(
596 contact::Column::AToB
597 .eq(a_to_b)
598 .and(contact::Column::UserIdA.eq(id_b))
599 .or(contact::Column::AToB
600 .ne(a_to_b)
601 .and(contact::Column::UserIdA.eq(id_a))),
602 ),
603 )
604 .to_owned(),
605 )
606 .exec_without_returning(&*tx)
607 .await?;
608
609 if rows_affected == 1 {
610 Ok(())
611 } else {
612 Err(anyhow!("contact already requested"))?
613 }
614 })
615 .await
616 }
617
618 pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
619 self.transaction(|tx| async move {
620 let (id_a, id_b) = if responder_id < requester_id {
621 (responder_id, requester_id)
622 } else {
623 (requester_id, responder_id)
624 };
625
626 let result = contact::Entity::delete_many()
627 .filter(
628 contact::Column::UserIdA
629 .eq(id_a)
630 .and(contact::Column::UserIdB.eq(id_b)),
631 )
632 .exec(&*tx)
633 .await?;
634
635 if result.rows_affected == 1 {
636 Ok(())
637 } else {
638 Err(anyhow!("no such contact"))?
639 }
640 })
641 .await
642 }
643
644 pub async fn dismiss_contact_notification(
645 &self,
646 user_id: UserId,
647 contact_user_id: UserId,
648 ) -> Result<()> {
649 self.transaction(|tx| async move {
650 let (id_a, id_b, a_to_b) = if user_id < contact_user_id {
651 (user_id, contact_user_id, true)
652 } else {
653 (contact_user_id, user_id, false)
654 };
655
656 let result = contact::Entity::update_many()
657 .set(contact::ActiveModel {
658 should_notify: ActiveValue::set(false),
659 ..Default::default()
660 })
661 .filter(
662 contact::Column::UserIdA
663 .eq(id_a)
664 .and(contact::Column::UserIdB.eq(id_b))
665 .and(
666 contact::Column::AToB
667 .eq(a_to_b)
668 .and(contact::Column::Accepted.eq(true))
669 .or(contact::Column::AToB
670 .ne(a_to_b)
671 .and(contact::Column::Accepted.eq(false))),
672 ),
673 )
674 .exec(&*tx)
675 .await?;
676 if result.rows_affected == 0 {
677 Err(anyhow!("no such contact request"))?
678 } else {
679 Ok(())
680 }
681 })
682 .await
683 }
684
685 pub async fn respond_to_contact_request(
686 &self,
687 responder_id: UserId,
688 requester_id: UserId,
689 accept: bool,
690 ) -> Result<()> {
691 self.transaction(|tx| async move {
692 let (id_a, id_b, a_to_b) = if responder_id < requester_id {
693 (responder_id, requester_id, false)
694 } else {
695 (requester_id, responder_id, true)
696 };
697 let rows_affected = if accept {
698 let result = contact::Entity::update_many()
699 .set(contact::ActiveModel {
700 accepted: ActiveValue::set(true),
701 should_notify: ActiveValue::set(true),
702 ..Default::default()
703 })
704 .filter(
705 contact::Column::UserIdA
706 .eq(id_a)
707 .and(contact::Column::UserIdB.eq(id_b))
708 .and(contact::Column::AToB.eq(a_to_b)),
709 )
710 .exec(&*tx)
711 .await?;
712 result.rows_affected
713 } else {
714 let result = contact::Entity::delete_many()
715 .filter(
716 contact::Column::UserIdA
717 .eq(id_a)
718 .and(contact::Column::UserIdB.eq(id_b))
719 .and(contact::Column::AToB.eq(a_to_b))
720 .and(contact::Column::Accepted.eq(false)),
721 )
722 .exec(&*tx)
723 .await?;
724
725 result.rows_affected
726 };
727
728 if rows_affected == 1 {
729 Ok(())
730 } else {
731 Err(anyhow!("no such contact request"))?
732 }
733 })
734 .await
735 }
736
/// Builds a `LIKE` pattern that matches the alphanumeric characters of
/// `string` in order, with anything in between.
///
/// Each alphanumeric character is preceded by `%`, non-alphanumeric
/// characters are dropped, and a trailing `%` is appended; an input without
/// alphanumeric characters therefore yields just `"%"`.
pub fn fuzzy_like_string(string: &str) -> String {
    // Worst case: every byte is kept and gets a '%' prefix, plus the final '%'.
    let capacity = string.len() * 2 + 1;

    let mut pattern = string
        .chars()
        .filter(|ch| ch.is_alphanumeric())
        .fold(String::with_capacity(capacity), |mut acc, ch| {
            acc.push('%');
            acc.push(ch);
            acc
        });
    pattern.push('%');
    pattern
}
748
749 pub async fn fuzzy_search_users(&self, name_query: &str, limit: u32) -> Result<Vec<User>> {
750 self.transaction(|tx| async {
751 let tx = tx;
752 let like_string = Self::fuzzy_like_string(name_query);
753 let query = "
754 SELECT users.*
755 FROM users
756 WHERE github_login ILIKE $1
757 ORDER BY github_login <-> $2
758 LIMIT $3
759 ";
760
761 Ok(user::Entity::find()
762 .from_raw_sql(Statement::from_sql_and_values(
763 self.pool.get_database_backend(),
764 query.into(),
765 vec![like_string.into(), name_query.into(), limit.into()],
766 ))
767 .all(&*tx)
768 .await?)
769 })
770 .await
771 }
772
773 // signups
774
775 pub async fn create_signup(&self, signup: &NewSignup) -> Result<()> {
776 self.transaction(|tx| async move {
777 signup::Entity::insert(signup::ActiveModel {
778 email_address: ActiveValue::set(signup.email_address.clone()),
779 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
780 email_confirmation_sent: ActiveValue::set(false),
781 platform_mac: ActiveValue::set(signup.platform_mac),
782 platform_windows: ActiveValue::set(signup.platform_windows),
783 platform_linux: ActiveValue::set(signup.platform_linux),
784 platform_unknown: ActiveValue::set(false),
785 editor_features: ActiveValue::set(Some(signup.editor_features.clone())),
786 programming_languages: ActiveValue::set(Some(signup.programming_languages.clone())),
787 device_id: ActiveValue::set(signup.device_id.clone()),
788 added_to_mailing_list: ActiveValue::set(signup.added_to_mailing_list),
789 ..Default::default()
790 })
791 .on_conflict(
792 OnConflict::column(signup::Column::EmailAddress)
793 .update_columns([
794 signup::Column::PlatformMac,
795 signup::Column::PlatformWindows,
796 signup::Column::PlatformLinux,
797 signup::Column::EditorFeatures,
798 signup::Column::ProgrammingLanguages,
799 signup::Column::DeviceId,
800 signup::Column::AddedToMailingList,
801 ])
802 .to_owned(),
803 )
804 .exec(&*tx)
805 .await?;
806 Ok(())
807 })
808 .await
809 }
810
811 pub async fn get_signup(&self, email_address: &str) -> Result<signup::Model> {
812 self.transaction(|tx| async move {
813 let signup = signup::Entity::find()
814 .filter(signup::Column::EmailAddress.eq(email_address))
815 .one(&*tx)
816 .await?
817 .ok_or_else(|| {
818 anyhow!("signup with email address {} doesn't exist", email_address)
819 })?;
820
821 Ok(signup)
822 })
823 .await
824 }
825
826 pub async fn get_waitlist_summary(&self) -> Result<WaitlistSummary> {
827 self.transaction(|tx| async move {
828 let query = "
829 SELECT
830 COUNT(*) as count,
831 COALESCE(SUM(CASE WHEN platform_linux THEN 1 ELSE 0 END), 0) as linux_count,
832 COALESCE(SUM(CASE WHEN platform_mac THEN 1 ELSE 0 END), 0) as mac_count,
833 COALESCE(SUM(CASE WHEN platform_windows THEN 1 ELSE 0 END), 0) as windows_count,
834 COALESCE(SUM(CASE WHEN platform_unknown THEN 1 ELSE 0 END), 0) as unknown_count
835 FROM (
836 SELECT *
837 FROM signups
838 WHERE
839 NOT email_confirmation_sent
840 ) AS unsent
841 ";
842 Ok(
843 WaitlistSummary::find_by_statement(Statement::from_sql_and_values(
844 self.pool.get_database_backend(),
845 query.into(),
846 vec![],
847 ))
848 .one(&*tx)
849 .await?
850 .ok_or_else(|| anyhow!("invalid result"))?,
851 )
852 })
853 .await
854 }
855
856 pub async fn record_sent_invites(&self, invites: &[Invite]) -> Result<()> {
857 let emails = invites
858 .iter()
859 .map(|s| s.email_address.as_str())
860 .collect::<Vec<_>>();
861 self.transaction(|tx| async {
862 let tx = tx;
863 signup::Entity::update_many()
864 .filter(signup::Column::EmailAddress.is_in(emails.iter().copied()))
865 .set(signup::ActiveModel {
866 email_confirmation_sent: ActiveValue::set(true),
867 ..Default::default()
868 })
869 .exec(&*tx)
870 .await?;
871 Ok(())
872 })
873 .await
874 }
875
876 pub async fn get_unsent_invites(&self, count: usize) -> Result<Vec<Invite>> {
877 self.transaction(|tx| async move {
878 Ok(signup::Entity::find()
879 .select_only()
880 .column(signup::Column::EmailAddress)
881 .column(signup::Column::EmailConfirmationCode)
882 .filter(
883 signup::Column::EmailConfirmationSent.eq(false).and(
884 signup::Column::PlatformMac
885 .eq(true)
886 .or(signup::Column::PlatformUnknown.eq(true)),
887 ),
888 )
889 .order_by_asc(signup::Column::CreatedAt)
890 .limit(count as u64)
891 .into_model()
892 .all(&*tx)
893 .await?)
894 })
895 .await
896 }
897
898 // invite codes
899
900 pub async fn create_invite_from_code(
901 &self,
902 code: &str,
903 email_address: &str,
904 device_id: Option<&str>,
905 ) -> Result<Invite> {
906 self.transaction(|tx| async move {
907 let existing_user = user::Entity::find()
908 .filter(user::Column::EmailAddress.eq(email_address))
909 .one(&*tx)
910 .await?;
911
912 if existing_user.is_some() {
913 Err(anyhow!("email address is already in use"))?;
914 }
915
916 let inviting_user_with_invites = match user::Entity::find()
917 .filter(
918 user::Column::InviteCode
919 .eq(code)
920 .and(user::Column::InviteCount.gt(0)),
921 )
922 .one(&*tx)
923 .await?
924 {
925 Some(inviting_user) => inviting_user,
926 None => {
927 return Err(Error::Http(
928 StatusCode::UNAUTHORIZED,
929 "unable to find an invite code with invites remaining".to_string(),
930 ))?
931 }
932 };
933 user::Entity::update_many()
934 .filter(
935 user::Column::Id
936 .eq(inviting_user_with_invites.id)
937 .and(user::Column::InviteCount.gt(0)),
938 )
939 .col_expr(
940 user::Column::InviteCount,
941 Expr::col(user::Column::InviteCount).sub(1),
942 )
943 .exec(&*tx)
944 .await?;
945
946 let signup = signup::Entity::insert(signup::ActiveModel {
947 email_address: ActiveValue::set(email_address.into()),
948 email_confirmation_code: ActiveValue::set(random_email_confirmation_code()),
949 email_confirmation_sent: ActiveValue::set(false),
950 inviting_user_id: ActiveValue::set(Some(inviting_user_with_invites.id)),
951 platform_linux: ActiveValue::set(false),
952 platform_mac: ActiveValue::set(false),
953 platform_windows: ActiveValue::set(false),
954 platform_unknown: ActiveValue::set(true),
955 device_id: ActiveValue::set(device_id.map(|device_id| device_id.into())),
956 ..Default::default()
957 })
958 .on_conflict(
959 OnConflict::column(signup::Column::EmailAddress)
960 .update_column(signup::Column::InvitingUserId)
961 .to_owned(),
962 )
963 .exec_with_returning(&*tx)
964 .await?;
965
966 Ok(Invite {
967 email_address: signup.email_address,
968 email_confirmation_code: signup.email_confirmation_code,
969 })
970 })
971 .await
972 }
973
974 pub async fn create_user_from_invite(
975 &self,
976 invite: &Invite,
977 user: NewUserParams,
978 ) -> Result<Option<NewUserResult>> {
979 self.transaction(|tx| async {
980 let tx = tx;
981 let signup = signup::Entity::find()
982 .filter(
983 signup::Column::EmailAddress
984 .eq(invite.email_address.as_str())
985 .and(
986 signup::Column::EmailConfirmationCode
987 .eq(invite.email_confirmation_code.as_str()),
988 ),
989 )
990 .one(&*tx)
991 .await?
992 .ok_or_else(|| Error::Http(StatusCode::NOT_FOUND, "no such invite".to_string()))?;
993
994 if signup.user_id.is_some() {
995 return Ok(None);
996 }
997
998 let user = user::Entity::insert(user::ActiveModel {
999 email_address: ActiveValue::set(Some(invite.email_address.clone())),
1000 github_login: ActiveValue::set(user.github_login.clone()),
1001 github_user_id: ActiveValue::set(Some(user.github_user_id)),
1002 admin: ActiveValue::set(false),
1003 invite_count: ActiveValue::set(user.invite_count),
1004 invite_code: ActiveValue::set(Some(random_invite_code())),
1005 metrics_id: ActiveValue::set(Uuid::new_v4()),
1006 ..Default::default()
1007 })
1008 .on_conflict(
1009 OnConflict::column(user::Column::GithubLogin)
1010 .update_columns([
1011 user::Column::EmailAddress,
1012 user::Column::GithubUserId,
1013 user::Column::Admin,
1014 ])
1015 .to_owned(),
1016 )
1017 .exec_with_returning(&*tx)
1018 .await?;
1019
1020 let mut signup = signup.into_active_model();
1021 signup.user_id = ActiveValue::set(Some(user.id));
1022 let signup = signup.update(&*tx).await?;
1023
1024 if let Some(inviting_user_id) = signup.inviting_user_id {
1025 let (user_id_a, user_id_b, a_to_b) = if inviting_user_id < user.id {
1026 (inviting_user_id, user.id, true)
1027 } else {
1028 (user.id, inviting_user_id, false)
1029 };
1030
1031 contact::Entity::insert(contact::ActiveModel {
1032 user_id_a: ActiveValue::set(user_id_a),
1033 user_id_b: ActiveValue::set(user_id_b),
1034 a_to_b: ActiveValue::set(a_to_b),
1035 should_notify: ActiveValue::set(true),
1036 accepted: ActiveValue::set(true),
1037 ..Default::default()
1038 })
1039 .on_conflict(OnConflict::new().do_nothing().to_owned())
1040 .exec_without_returning(&*tx)
1041 .await?;
1042 }
1043
1044 Ok(Some(NewUserResult {
1045 user_id: user.id,
1046 metrics_id: user.metrics_id.to_string(),
1047 inviting_user_id: signup.inviting_user_id,
1048 signup_device_id: signup.device_id,
1049 }))
1050 })
1051 .await
1052 }
1053
1054 pub async fn set_invite_count_for_user(&self, id: UserId, count: i32) -> Result<()> {
1055 self.transaction(|tx| async move {
1056 if count > 0 {
1057 user::Entity::update_many()
1058 .filter(
1059 user::Column::Id
1060 .eq(id)
1061 .and(user::Column::InviteCode.is_null()),
1062 )
1063 .set(user::ActiveModel {
1064 invite_code: ActiveValue::set(Some(random_invite_code())),
1065 ..Default::default()
1066 })
1067 .exec(&*tx)
1068 .await?;
1069 }
1070
1071 user::Entity::update_many()
1072 .filter(user::Column::Id.eq(id))
1073 .set(user::ActiveModel {
1074 invite_count: ActiveValue::set(count),
1075 ..Default::default()
1076 })
1077 .exec(&*tx)
1078 .await?;
1079 Ok(())
1080 })
1081 .await
1082 }
1083
1084 pub async fn get_invite_code_for_user(&self, id: UserId) -> Result<Option<(String, i32)>> {
1085 self.transaction(|tx| async move {
1086 match user::Entity::find_by_id(id).one(&*tx).await? {
1087 Some(user) if user.invite_code.is_some() => {
1088 Ok(Some((user.invite_code.unwrap(), user.invite_count)))
1089 }
1090 _ => Ok(None),
1091 }
1092 })
1093 .await
1094 }
1095
1096 pub async fn get_user_for_invite_code(&self, code: &str) -> Result<User> {
1097 self.transaction(|tx| async move {
1098 user::Entity::find()
1099 .filter(user::Column::InviteCode.eq(code))
1100 .one(&*tx)
1101 .await?
1102 .ok_or_else(|| {
1103 Error::Http(
1104 StatusCode::NOT_FOUND,
1105 "that invite code does not exist".to_string(),
1106 )
1107 })
1108 })
1109 .await
1110 }
1111
1112 // rooms
1113
1114 pub async fn incoming_call_for_user(
1115 &self,
1116 user_id: UserId,
1117 ) -> Result<Option<proto::IncomingCall>> {
1118 self.transaction(|tx| async move {
1119 let pending_participant = room_participant::Entity::find()
1120 .filter(
1121 room_participant::Column::UserId
1122 .eq(user_id)
1123 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1124 )
1125 .one(&*tx)
1126 .await?;
1127
1128 if let Some(pending_participant) = pending_participant {
1129 let room = self.get_room(pending_participant.room_id, &tx).await?;
1130 Ok(Self::build_incoming_call(&room, user_id))
1131 } else {
1132 Ok(None)
1133 }
1134 })
1135 .await
1136 }
1137
1138 pub async fn create_room(
1139 &self,
1140 user_id: UserId,
1141 connection: ConnectionId,
1142 live_kit_room: &str,
1143 ) -> Result<RoomGuard<proto::Room>> {
1144 self.room_transaction(|tx| async move {
1145 let room = room::ActiveModel {
1146 live_kit_room: ActiveValue::set(live_kit_room.into()),
1147 ..Default::default()
1148 }
1149 .insert(&*tx)
1150 .await?;
1151 let room_id = room.id;
1152
1153 room_participant::ActiveModel {
1154 room_id: ActiveValue::set(room_id),
1155 user_id: ActiveValue::set(user_id),
1156 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1157 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1158 connection.epoch as i32,
1159 ))),
1160 answering_connection_lost: ActiveValue::set(false),
1161 calling_user_id: ActiveValue::set(user_id),
1162 calling_connection_id: ActiveValue::set(connection.id as i32),
1163 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1164 connection.epoch as i32,
1165 ))),
1166 ..Default::default()
1167 }
1168 .insert(&*tx)
1169 .await?;
1170
1171 let room = self.get_room(room_id, &tx).await?;
1172 Ok((room_id, room))
1173 })
1174 .await
1175 }
1176
1177 pub async fn call(
1178 &self,
1179 room_id: RoomId,
1180 calling_user_id: UserId,
1181 calling_connection: ConnectionId,
1182 called_user_id: UserId,
1183 initial_project_id: Option<ProjectId>,
1184 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
1185 self.room_transaction(|tx| async move {
1186 room_participant::ActiveModel {
1187 room_id: ActiveValue::set(room_id),
1188 user_id: ActiveValue::set(called_user_id),
1189 answering_connection_lost: ActiveValue::set(false),
1190 calling_user_id: ActiveValue::set(calling_user_id),
1191 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
1192 calling_connection_server_id: ActiveValue::set(Some(ServerId(
1193 calling_connection.epoch as i32,
1194 ))),
1195 initial_project_id: ActiveValue::set(initial_project_id),
1196 ..Default::default()
1197 }
1198 .insert(&*tx)
1199 .await?;
1200
1201 let room = self.get_room(room_id, &tx).await?;
1202 let incoming_call = Self::build_incoming_call(&room, called_user_id)
1203 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
1204 Ok((room_id, (room, incoming_call)))
1205 })
1206 .await
1207 }
1208
1209 pub async fn call_failed(
1210 &self,
1211 room_id: RoomId,
1212 called_user_id: UserId,
1213 ) -> Result<RoomGuard<proto::Room>> {
1214 self.room_transaction(|tx| async move {
1215 room_participant::Entity::delete_many()
1216 .filter(
1217 room_participant::Column::RoomId
1218 .eq(room_id)
1219 .and(room_participant::Column::UserId.eq(called_user_id)),
1220 )
1221 .exec(&*tx)
1222 .await?;
1223 let room = self.get_room(room_id, &tx).await?;
1224 Ok((room_id, room))
1225 })
1226 .await
1227 }
1228
1229 pub async fn decline_call(
1230 &self,
1231 expected_room_id: Option<RoomId>,
1232 user_id: UserId,
1233 ) -> Result<RoomGuard<proto::Room>> {
1234 self.room_transaction(|tx| async move {
1235 let participant = room_participant::Entity::find()
1236 .filter(
1237 room_participant::Column::UserId
1238 .eq(user_id)
1239 .and(room_participant::Column::AnsweringConnectionId.is_null()),
1240 )
1241 .one(&*tx)
1242 .await?
1243 .ok_or_else(|| anyhow!("could not decline call"))?;
1244 let room_id = participant.room_id;
1245
1246 if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1247 return Err(anyhow!("declining call on unexpected room"))?;
1248 }
1249
1250 room_participant::Entity::delete(participant.into_active_model())
1251 .exec(&*tx)
1252 .await?;
1253
1254 let room = self.get_room(room_id, &tx).await?;
1255 Ok((room_id, room))
1256 })
1257 .await
1258 }
1259
1260 pub async fn cancel_call(
1261 &self,
1262 expected_room_id: Option<RoomId>,
1263 calling_connection: ConnectionId,
1264 called_user_id: UserId,
1265 ) -> Result<RoomGuard<proto::Room>> {
1266 self.room_transaction(|tx| async move {
1267 let participant = room_participant::Entity::find()
1268 .filter(
1269 Condition::all()
1270 .add(room_participant::Column::UserId.eq(called_user_id))
1271 .add(
1272 room_participant::Column::CallingConnectionId
1273 .eq(calling_connection.id as i32),
1274 )
1275 .add(
1276 room_participant::Column::CallingConnectionServerId
1277 .eq(calling_connection.epoch as i32),
1278 )
1279 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1280 )
1281 .one(&*tx)
1282 .await?
1283 .ok_or_else(|| anyhow!("could not cancel call"))?;
1284 let room_id = participant.room_id;
1285 if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
1286 return Err(anyhow!("canceling call on unexpected room"))?;
1287 }
1288
1289 room_participant::Entity::delete(participant.into_active_model())
1290 .exec(&*tx)
1291 .await?;
1292
1293 let room = self.get_room(room_id, &tx).await?;
1294 Ok((room_id, room))
1295 })
1296 .await
1297 }
1298
1299 pub async fn join_room(
1300 &self,
1301 room_id: RoomId,
1302 user_id: UserId,
1303 connection: ConnectionId,
1304 ) -> Result<RoomGuard<proto::Room>> {
1305 self.room_transaction(|tx| async move {
1306 let result = room_participant::Entity::update_many()
1307 .filter(
1308 Condition::all()
1309 .add(room_participant::Column::RoomId.eq(room_id))
1310 .add(room_participant::Column::UserId.eq(user_id))
1311 .add(
1312 Condition::any()
1313 .add(room_participant::Column::AnsweringConnectionId.is_null())
1314 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
1315 .add(
1316 room_participant::Column::AnsweringConnectionServerId
1317 .ne(connection.epoch as i32),
1318 ),
1319 ),
1320 )
1321 .set(room_participant::ActiveModel {
1322 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
1323 answering_connection_server_id: ActiveValue::set(Some(ServerId(
1324 connection.epoch as i32,
1325 ))),
1326 answering_connection_lost: ActiveValue::set(false),
1327 ..Default::default()
1328 })
1329 .exec(&*tx)
1330 .await?;
1331 if result.rows_affected == 0 {
1332 Err(anyhow!("room does not exist or was already joined"))?
1333 } else {
1334 let room = self.get_room(room_id, &tx).await?;
1335 Ok((room_id, room))
1336 }
1337 })
1338 .await
1339 }
1340
1341 pub async fn leave_room(&self, connection: ConnectionId) -> Result<RoomGuard<LeftRoom>> {
1342 self.room_transaction(|tx| async move {
1343 let leaving_participant = room_participant::Entity::find()
1344 .filter(
1345 Condition::all()
1346 .add(
1347 room_participant::Column::AnsweringConnectionId
1348 .eq(connection.id as i32),
1349 )
1350 .add(
1351 room_participant::Column::AnsweringConnectionServerId
1352 .eq(connection.epoch as i32),
1353 ),
1354 )
1355 .one(&*tx)
1356 .await?;
1357
1358 if let Some(leaving_participant) = leaving_participant {
1359 // Leave room.
1360 let room_id = leaving_participant.room_id;
1361 room_participant::Entity::delete_by_id(leaving_participant.id)
1362 .exec(&*tx)
1363 .await?;
1364
1365 // Cancel pending calls initiated by the leaving user.
1366 let called_participants = room_participant::Entity::find()
1367 .filter(
1368 Condition::all()
1369 .add(
1370 room_participant::Column::CallingConnectionId
1371 .eq(connection.id as i32),
1372 )
1373 .add(
1374 room_participant::Column::CallingConnectionServerId
1375 .eq(connection.epoch as i32),
1376 )
1377 .add(room_participant::Column::AnsweringConnectionId.is_null()),
1378 )
1379 .all(&*tx)
1380 .await?;
1381 room_participant::Entity::delete_many()
1382 .filter(
1383 room_participant::Column::Id
1384 .is_in(called_participants.iter().map(|participant| participant.id)),
1385 )
1386 .exec(&*tx)
1387 .await?;
1388 let canceled_calls_to_user_ids = called_participants
1389 .into_iter()
1390 .map(|participant| participant.user_id)
1391 .collect();
1392
1393 // Detect left projects.
1394 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1395 enum QueryProjectIds {
1396 ProjectId,
1397 }
1398 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
1399 .select_only()
1400 .column_as(
1401 project_collaborator::Column::ProjectId,
1402 QueryProjectIds::ProjectId,
1403 )
1404 .filter(
1405 Condition::all()
1406 .add(
1407 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1408 )
1409 .add(
1410 project_collaborator::Column::ConnectionServerId
1411 .eq(connection.epoch as i32),
1412 ),
1413 )
1414 .into_values::<_, QueryProjectIds>()
1415 .all(&*tx)
1416 .await?;
1417 let mut left_projects = HashMap::default();
1418 let mut collaborators = project_collaborator::Entity::find()
1419 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
1420 .stream(&*tx)
1421 .await?;
1422 while let Some(collaborator) = collaborators.next().await {
1423 let collaborator = collaborator?;
1424 let left_project =
1425 left_projects
1426 .entry(collaborator.project_id)
1427 .or_insert(LeftProject {
1428 id: collaborator.project_id,
1429 host_user_id: Default::default(),
1430 connection_ids: Default::default(),
1431 host_connection_id: Default::default(),
1432 });
1433
1434 let collaborator_connection_id = ConnectionId {
1435 epoch: collaborator.connection_server_id.0 as u32,
1436 id: collaborator.connection_id as u32,
1437 };
1438 if collaborator_connection_id != connection {
1439 left_project.connection_ids.push(collaborator_connection_id);
1440 }
1441
1442 if collaborator.is_host {
1443 left_project.host_user_id = collaborator.user_id;
1444 left_project.host_connection_id = collaborator_connection_id;
1445 }
1446 }
1447 drop(collaborators);
1448
1449 // Leave projects.
1450 project_collaborator::Entity::delete_many()
1451 .filter(
1452 Condition::all()
1453 .add(
1454 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1455 )
1456 .add(
1457 project_collaborator::Column::ConnectionServerId
1458 .eq(connection.epoch as i32),
1459 ),
1460 )
1461 .exec(&*tx)
1462 .await?;
1463
1464 // Unshare projects.
1465 project::Entity::delete_many()
1466 .filter(
1467 Condition::all()
1468 .add(project::Column::RoomId.eq(room_id))
1469 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1470 .add(
1471 project::Column::HostConnectionServerId.eq(connection.epoch as i32),
1472 ),
1473 )
1474 .exec(&*tx)
1475 .await?;
1476
1477 let room = self.get_room(room_id, &tx).await?;
1478 if room.participants.is_empty() {
1479 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1480 }
1481
1482 let left_room = LeftRoom {
1483 room,
1484 left_projects,
1485 canceled_calls_to_user_ids,
1486 };
1487
1488 if left_room.room.participants.is_empty() {
1489 self.rooms.remove(&room_id);
1490 }
1491
1492 Ok((room_id, left_room))
1493 } else {
1494 Err(anyhow!("could not leave room"))?
1495 }
1496 })
1497 .await
1498 }
1499
1500 pub async fn update_room_participant_location(
1501 &self,
1502 room_id: RoomId,
1503 connection: ConnectionId,
1504 location: proto::ParticipantLocation,
1505 ) -> Result<RoomGuard<proto::Room>> {
1506 self.room_transaction(|tx| async {
1507 let tx = tx;
1508 let location_kind;
1509 let location_project_id;
1510 match location
1511 .variant
1512 .as_ref()
1513 .ok_or_else(|| anyhow!("invalid location"))?
1514 {
1515 proto::participant_location::Variant::SharedProject(project) => {
1516 location_kind = 0;
1517 location_project_id = Some(ProjectId::from_proto(project.id));
1518 }
1519 proto::participant_location::Variant::UnsharedProject(_) => {
1520 location_kind = 1;
1521 location_project_id = None;
1522 }
1523 proto::participant_location::Variant::External(_) => {
1524 location_kind = 2;
1525 location_project_id = None;
1526 }
1527 }
1528
1529 let result = room_participant::Entity::update_many()
1530 .filter(
1531 Condition::all()
1532 .add(room_participant::Column::RoomId.eq(room_id))
1533 .add(
1534 room_participant::Column::AnsweringConnectionId
1535 .eq(connection.id as i32),
1536 )
1537 .add(
1538 room_participant::Column::AnsweringConnectionServerId
1539 .eq(connection.epoch as i32),
1540 ),
1541 )
1542 .set(room_participant::ActiveModel {
1543 location_kind: ActiveValue::set(Some(location_kind)),
1544 location_project_id: ActiveValue::set(location_project_id),
1545 ..Default::default()
1546 })
1547 .exec(&*tx)
1548 .await?;
1549
1550 if result.rows_affected == 1 {
1551 let room = self.get_room(room_id, &tx).await?;
1552 Ok((room_id, room))
1553 } else {
1554 Err(anyhow!("could not update room participant location"))?
1555 }
1556 })
1557 .await
1558 }
1559
1560 pub async fn connection_lost(
1561 &self,
1562 connection: ConnectionId,
1563 ) -> Result<RoomGuard<Vec<LeftProject>>> {
1564 self.room_transaction(|tx| async move {
1565 let participant = room_participant::Entity::find()
1566 .filter(
1567 Condition::all()
1568 .add(
1569 room_participant::Column::AnsweringConnectionId
1570 .eq(connection.id as i32),
1571 )
1572 .add(
1573 room_participant::Column::AnsweringConnectionServerId
1574 .eq(connection.epoch as i32),
1575 ),
1576 )
1577 .one(&*tx)
1578 .await?
1579 .ok_or_else(|| anyhow!("not a participant in any room"))?;
1580 let room_id = participant.room_id;
1581
1582 room_participant::Entity::update(room_participant::ActiveModel {
1583 answering_connection_lost: ActiveValue::set(true),
1584 ..participant.into_active_model()
1585 })
1586 .exec(&*tx)
1587 .await?;
1588
1589 let collaborator_on_projects = project_collaborator::Entity::find()
1590 .find_also_related(project::Entity)
1591 .filter(
1592 Condition::all()
1593 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1594 .add(
1595 project_collaborator::Column::ConnectionServerId
1596 .eq(connection.epoch as i32),
1597 ),
1598 )
1599 .all(&*tx)
1600 .await?;
1601 project_collaborator::Entity::delete_many()
1602 .filter(
1603 Condition::all()
1604 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1605 .add(
1606 project_collaborator::Column::ConnectionServerId
1607 .eq(connection.epoch as i32),
1608 ),
1609 )
1610 .exec(&*tx)
1611 .await?;
1612
1613 let mut left_projects = Vec::new();
1614 for (_, project) in collaborator_on_projects {
1615 if let Some(project) = project {
1616 let collaborators = project
1617 .find_related(project_collaborator::Entity)
1618 .all(&*tx)
1619 .await?;
1620 let connection_ids = collaborators
1621 .into_iter()
1622 .map(|collaborator| ConnectionId {
1623 id: collaborator.connection_id as u32,
1624 epoch: collaborator.connection_server_id.0 as u32,
1625 })
1626 .collect();
1627
1628 left_projects.push(LeftProject {
1629 id: project.id,
1630 host_user_id: project.host_user_id,
1631 host_connection_id: ConnectionId {
1632 id: project.host_connection_id as u32,
1633 epoch: project.host_connection_server_id.0 as u32,
1634 },
1635 connection_ids,
1636 });
1637 }
1638 }
1639
1640 project::Entity::delete_many()
1641 .filter(
1642 Condition::all()
1643 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1644 .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
1645 )
1646 .exec(&*tx)
1647 .await?;
1648
1649 Ok((room_id, left_projects))
1650 })
1651 .await
1652 }
1653
1654 fn build_incoming_call(
1655 room: &proto::Room,
1656 called_user_id: UserId,
1657 ) -> Option<proto::IncomingCall> {
1658 let pending_participant = room
1659 .pending_participants
1660 .iter()
1661 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1662
1663 Some(proto::IncomingCall {
1664 room_id: room.id,
1665 calling_user_id: pending_participant.calling_user_id,
1666 participant_user_ids: room
1667 .participants
1668 .iter()
1669 .map(|participant| participant.user_id)
1670 .collect(),
1671 initial_project: room.participants.iter().find_map(|participant| {
1672 let initial_project_id = pending_participant.initial_project_id?;
1673 participant
1674 .projects
1675 .iter()
1676 .find(|project| project.id == initial_project_id)
1677 .cloned()
1678 }),
1679 })
1680 }
1681
1682 async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1683 let db_room = room::Entity::find_by_id(room_id)
1684 .one(tx)
1685 .await?
1686 .ok_or_else(|| anyhow!("could not find room"))?;
1687
1688 let mut db_participants = db_room
1689 .find_related(room_participant::Entity)
1690 .stream(tx)
1691 .await?;
1692 let mut participants = HashMap::default();
1693 let mut pending_participants = Vec::new();
1694 while let Some(db_participant) = db_participants.next().await {
1695 let db_participant = db_participant?;
1696 if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
1697 .answering_connection_id
1698 .zip(db_participant.answering_connection_server_id)
1699 {
1700 let location = match (
1701 db_participant.location_kind,
1702 db_participant.location_project_id,
1703 ) {
1704 (Some(0), Some(project_id)) => {
1705 Some(proto::participant_location::Variant::SharedProject(
1706 proto::participant_location::SharedProject {
1707 id: project_id.to_proto(),
1708 },
1709 ))
1710 }
1711 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1712 Default::default(),
1713 )),
1714 _ => Some(proto::participant_location::Variant::External(
1715 Default::default(),
1716 )),
1717 };
1718
1719 let answering_connection = ConnectionId {
1720 epoch: answering_connection_server_id.0 as u32,
1721 id: answering_connection_id as u32,
1722 };
1723 participants.insert(
1724 answering_connection,
1725 proto::Participant {
1726 user_id: db_participant.user_id.to_proto(),
1727 peer_id: Some(answering_connection.into()),
1728 projects: Default::default(),
1729 location: Some(proto::ParticipantLocation { variant: location }),
1730 },
1731 );
1732 } else {
1733 pending_participants.push(proto::PendingParticipant {
1734 user_id: db_participant.user_id.to_proto(),
1735 calling_user_id: db_participant.calling_user_id.to_proto(),
1736 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1737 });
1738 }
1739 }
1740 drop(db_participants);
1741
1742 let mut db_projects = db_room
1743 .find_related(project::Entity)
1744 .find_with_related(worktree::Entity)
1745 .stream(tx)
1746 .await?;
1747
1748 while let Some(row) = db_projects.next().await {
1749 let (db_project, db_worktree) = row?;
1750 let host_connection = ConnectionId {
1751 epoch: db_project.host_connection_server_id.0 as u32,
1752 id: db_project.host_connection_id as u32,
1753 };
1754 if let Some(participant) = participants.get_mut(&host_connection) {
1755 let project = if let Some(project) = participant
1756 .projects
1757 .iter_mut()
1758 .find(|project| project.id == db_project.id.to_proto())
1759 {
1760 project
1761 } else {
1762 participant.projects.push(proto::ParticipantProject {
1763 id: db_project.id.to_proto(),
1764 worktree_root_names: Default::default(),
1765 });
1766 participant.projects.last_mut().unwrap()
1767 };
1768
1769 if let Some(db_worktree) = db_worktree {
1770 project.worktree_root_names.push(db_worktree.root_name);
1771 }
1772 }
1773 }
1774
1775 Ok(proto::Room {
1776 id: db_room.id.to_proto(),
1777 live_kit_room: db_room.live_kit_room,
1778 participants: participants.into_values().collect(),
1779 pending_participants,
1780 })
1781 }
1782
1783 // projects
1784
1785 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
1786 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
1787 enum QueryAs {
1788 Count,
1789 }
1790
1791 self.transaction(|tx| async move {
1792 Ok(project::Entity::find()
1793 .select_only()
1794 .column_as(project::Column::Id.count(), QueryAs::Count)
1795 .inner_join(user::Entity)
1796 .filter(user::Column::Admin.eq(false))
1797 .into_values::<_, QueryAs>()
1798 .one(&*tx)
1799 .await?
1800 .unwrap_or(0i64) as usize)
1801 })
1802 .await
1803 }
1804
1805 pub async fn share_project(
1806 &self,
1807 room_id: RoomId,
1808 connection: ConnectionId,
1809 worktrees: &[proto::WorktreeMetadata],
1810 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
1811 self.room_transaction(|tx| async move {
1812 let participant = room_participant::Entity::find()
1813 .filter(
1814 Condition::all()
1815 .add(
1816 room_participant::Column::AnsweringConnectionId
1817 .eq(connection.id as i32),
1818 )
1819 .add(
1820 room_participant::Column::AnsweringConnectionServerId
1821 .eq(connection.epoch as i32),
1822 ),
1823 )
1824 .one(&*tx)
1825 .await?
1826 .ok_or_else(|| anyhow!("could not find participant"))?;
1827 if participant.room_id != room_id {
1828 return Err(anyhow!("shared project on unexpected room"))?;
1829 }
1830
1831 let project = project::ActiveModel {
1832 room_id: ActiveValue::set(participant.room_id),
1833 host_user_id: ActiveValue::set(participant.user_id),
1834 host_connection_id: ActiveValue::set(connection.id as i32),
1835 host_connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
1836 ..Default::default()
1837 }
1838 .insert(&*tx)
1839 .await?;
1840
1841 if !worktrees.is_empty() {
1842 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1843 worktree::ActiveModel {
1844 id: ActiveValue::set(worktree.id as i64),
1845 project_id: ActiveValue::set(project.id),
1846 abs_path: ActiveValue::set(worktree.abs_path.clone()),
1847 root_name: ActiveValue::set(worktree.root_name.clone()),
1848 visible: ActiveValue::set(worktree.visible),
1849 scan_id: ActiveValue::set(0),
1850 is_complete: ActiveValue::set(false),
1851 }
1852 }))
1853 .exec(&*tx)
1854 .await?;
1855 }
1856
1857 project_collaborator::ActiveModel {
1858 project_id: ActiveValue::set(project.id),
1859 connection_id: ActiveValue::set(connection.id as i32),
1860 connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
1861 user_id: ActiveValue::set(participant.user_id),
1862 replica_id: ActiveValue::set(ReplicaId(0)),
1863 is_host: ActiveValue::set(true),
1864 ..Default::default()
1865 }
1866 .insert(&*tx)
1867 .await?;
1868
1869 let room = self.get_room(room_id, &tx).await?;
1870 Ok((room_id, (project.id, room)))
1871 })
1872 .await
1873 }
1874
1875 pub async fn unshare_project(
1876 &self,
1877 project_id: ProjectId,
1878 connection: ConnectionId,
1879 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1880 self.room_transaction(|tx| async move {
1881 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
1882
1883 let project = project::Entity::find_by_id(project_id)
1884 .one(&*tx)
1885 .await?
1886 .ok_or_else(|| anyhow!("project not found"))?;
1887 let host_connection = ConnectionId {
1888 epoch: project.host_connection_server_id.0 as u32,
1889 id: project.host_connection_id as u32,
1890 };
1891 if host_connection == connection {
1892 let room_id = project.room_id;
1893 project::Entity::delete(project.into_active_model())
1894 .exec(&*tx)
1895 .await?;
1896 let room = self.get_room(room_id, &tx).await?;
1897 Ok((room_id, (room, guest_connection_ids)))
1898 } else {
1899 Err(anyhow!("cannot unshare a project hosted by another user"))?
1900 }
1901 })
1902 .await
1903 }
1904
1905 pub async fn update_project(
1906 &self,
1907 project_id: ProjectId,
1908 connection: ConnectionId,
1909 worktrees: &[proto::WorktreeMetadata],
1910 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
1911 self.room_transaction(|tx| async move {
1912 let project = project::Entity::find_by_id(project_id)
1913 .filter(
1914 Condition::all()
1915 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1916 .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
1917 )
1918 .one(&*tx)
1919 .await?
1920 .ok_or_else(|| anyhow!("no such project"))?;
1921
1922 if !worktrees.is_empty() {
1923 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
1924 worktree::ActiveModel {
1925 id: ActiveValue::set(worktree.id as i64),
1926 project_id: ActiveValue::set(project.id),
1927 abs_path: ActiveValue::set(worktree.abs_path.clone()),
1928 root_name: ActiveValue::set(worktree.root_name.clone()),
1929 visible: ActiveValue::set(worktree.visible),
1930 scan_id: ActiveValue::set(0),
1931 is_complete: ActiveValue::set(false),
1932 }
1933 }))
1934 .on_conflict(
1935 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
1936 .update_column(worktree::Column::RootName)
1937 .to_owned(),
1938 )
1939 .exec(&*tx)
1940 .await?;
1941 }
1942
1943 worktree::Entity::delete_many()
1944 .filter(
1945 worktree::Column::ProjectId.eq(project.id).and(
1946 worktree::Column::Id
1947 .is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
1948 ),
1949 )
1950 .exec(&*tx)
1951 .await?;
1952
1953 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
1954 let room = self.get_room(project.room_id, &tx).await?;
1955 Ok((project.room_id, (room, guest_connection_ids)))
1956 })
1957 .await
1958 }
1959
1960 pub async fn update_worktree(
1961 &self,
1962 update: &proto::UpdateWorktree,
1963 connection: ConnectionId,
1964 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
1965 self.room_transaction(|tx| async move {
1966 let project_id = ProjectId::from_proto(update.project_id);
1967 let worktree_id = update.worktree_id as i64;
1968
1969 // Ensure the update comes from the host.
1970 let project = project::Entity::find_by_id(project_id)
1971 .filter(
1972 Condition::all()
1973 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1974 .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
1975 )
1976 .one(&*tx)
1977 .await?
1978 .ok_or_else(|| anyhow!("no such project"))?;
1979 let room_id = project.room_id;
1980
1981 // Update metadata.
1982 worktree::Entity::update(worktree::ActiveModel {
1983 id: ActiveValue::set(worktree_id),
1984 project_id: ActiveValue::set(project_id),
1985 root_name: ActiveValue::set(update.root_name.clone()),
1986 scan_id: ActiveValue::set(update.scan_id as i64),
1987 is_complete: ActiveValue::set(update.is_last_update),
1988 abs_path: ActiveValue::set(update.abs_path.clone()),
1989 ..Default::default()
1990 })
1991 .exec(&*tx)
1992 .await?;
1993
1994 if !update.updated_entries.is_empty() {
1995 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
1996 let mtime = entry.mtime.clone().unwrap_or_default();
1997 worktree_entry::ActiveModel {
1998 project_id: ActiveValue::set(project_id),
1999 worktree_id: ActiveValue::set(worktree_id),
2000 id: ActiveValue::set(entry.id as i64),
2001 is_dir: ActiveValue::set(entry.is_dir),
2002 path: ActiveValue::set(entry.path.clone()),
2003 inode: ActiveValue::set(entry.inode as i64),
2004 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
2005 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
2006 is_symlink: ActiveValue::set(entry.is_symlink),
2007 is_ignored: ActiveValue::set(entry.is_ignored),
2008 }
2009 }))
2010 .on_conflict(
2011 OnConflict::columns([
2012 worktree_entry::Column::ProjectId,
2013 worktree_entry::Column::WorktreeId,
2014 worktree_entry::Column::Id,
2015 ])
2016 .update_columns([
2017 worktree_entry::Column::IsDir,
2018 worktree_entry::Column::Path,
2019 worktree_entry::Column::Inode,
2020 worktree_entry::Column::MtimeSeconds,
2021 worktree_entry::Column::MtimeNanos,
2022 worktree_entry::Column::IsSymlink,
2023 worktree_entry::Column::IsIgnored,
2024 ])
2025 .to_owned(),
2026 )
2027 .exec(&*tx)
2028 .await?;
2029 }
2030
2031 if !update.removed_entries.is_empty() {
2032 worktree_entry::Entity::delete_many()
2033 .filter(
2034 worktree_entry::Column::ProjectId
2035 .eq(project_id)
2036 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
2037 .and(
2038 worktree_entry::Column::Id
2039 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
2040 ),
2041 )
2042 .exec(&*tx)
2043 .await?;
2044 }
2045
2046 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2047 Ok((room_id, connection_ids))
2048 })
2049 .await
2050 }
2051
2052 pub async fn update_diagnostic_summary(
2053 &self,
2054 update: &proto::UpdateDiagnosticSummary,
2055 connection: ConnectionId,
2056 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2057 self.room_transaction(|tx| async move {
2058 let project_id = ProjectId::from_proto(update.project_id);
2059 let worktree_id = update.worktree_id as i64;
2060 let summary = update
2061 .summary
2062 .as_ref()
2063 .ok_or_else(|| anyhow!("invalid summary"))?;
2064
2065 // Ensure the update comes from the host.
2066 let project = project::Entity::find_by_id(project_id)
2067 .one(&*tx)
2068 .await?
2069 .ok_or_else(|| anyhow!("no such project"))?;
2070 let host_connection = ConnectionId {
2071 epoch: project.host_connection_server_id.0 as u32,
2072 id: project.host_connection_id as u32,
2073 };
2074 if host_connection != connection {
2075 return Err(anyhow!("can't update a project hosted by someone else"))?;
2076 }
2077
2078 // Update summary.
2079 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
2080 project_id: ActiveValue::set(project_id),
2081 worktree_id: ActiveValue::set(worktree_id),
2082 path: ActiveValue::set(summary.path.clone()),
2083 language_server_id: ActiveValue::set(summary.language_server_id as i64),
2084 error_count: ActiveValue::set(summary.error_count as i32),
2085 warning_count: ActiveValue::set(summary.warning_count as i32),
2086 ..Default::default()
2087 })
2088 .on_conflict(
2089 OnConflict::columns([
2090 worktree_diagnostic_summary::Column::ProjectId,
2091 worktree_diagnostic_summary::Column::WorktreeId,
2092 worktree_diagnostic_summary::Column::Path,
2093 ])
2094 .update_columns([
2095 worktree_diagnostic_summary::Column::LanguageServerId,
2096 worktree_diagnostic_summary::Column::ErrorCount,
2097 worktree_diagnostic_summary::Column::WarningCount,
2098 ])
2099 .to_owned(),
2100 )
2101 .exec(&*tx)
2102 .await?;
2103
2104 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2105 Ok((project.room_id, connection_ids))
2106 })
2107 .await
2108 }
2109
2110 pub async fn start_language_server(
2111 &self,
2112 update: &proto::StartLanguageServer,
2113 connection: ConnectionId,
2114 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
2115 self.room_transaction(|tx| async move {
2116 let project_id = ProjectId::from_proto(update.project_id);
2117 let server = update
2118 .server
2119 .as_ref()
2120 .ok_or_else(|| anyhow!("invalid language server"))?;
2121
2122 // Ensure the update comes from the host.
2123 let project = project::Entity::find_by_id(project_id)
2124 .one(&*tx)
2125 .await?
2126 .ok_or_else(|| anyhow!("no such project"))?;
2127 let host_connection = ConnectionId {
2128 epoch: project.host_connection_server_id.0 as u32,
2129 id: project.host_connection_id as u32,
2130 };
2131 if host_connection != connection {
2132 return Err(anyhow!("can't update a project hosted by someone else"))?;
2133 }
2134
2135 // Add the newly-started language server.
2136 language_server::Entity::insert(language_server::ActiveModel {
2137 project_id: ActiveValue::set(project_id),
2138 id: ActiveValue::set(server.id as i64),
2139 name: ActiveValue::set(server.name.clone()),
2140 ..Default::default()
2141 })
2142 .on_conflict(
2143 OnConflict::columns([
2144 language_server::Column::ProjectId,
2145 language_server::Column::Id,
2146 ])
2147 .update_column(language_server::Column::Name)
2148 .to_owned(),
2149 )
2150 .exec(&*tx)
2151 .await?;
2152
2153 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
2154 Ok((project.room_id, connection_ids))
2155 })
2156 .await
2157 }
2158
2159 pub async fn join_project(
2160 &self,
2161 project_id: ProjectId,
2162 connection: ConnectionId,
2163 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
2164 self.room_transaction(|tx| async move {
2165 let participant = room_participant::Entity::find()
2166 .filter(
2167 Condition::all()
2168 .add(
2169 room_participant::Column::AnsweringConnectionId
2170 .eq(connection.id as i32),
2171 )
2172 .add(
2173 room_participant::Column::AnsweringConnectionServerId
2174 .eq(connection.epoch as i32),
2175 ),
2176 )
2177 .one(&*tx)
2178 .await?
2179 .ok_or_else(|| anyhow!("must join a room first"))?;
2180
2181 let project = project::Entity::find_by_id(project_id)
2182 .one(&*tx)
2183 .await?
2184 .ok_or_else(|| anyhow!("no such project"))?;
2185 if project.room_id != participant.room_id {
2186 return Err(anyhow!("no such project"))?;
2187 }
2188
2189 let mut collaborators = project
2190 .find_related(project_collaborator::Entity)
2191 .all(&*tx)
2192 .await?;
2193 let replica_ids = collaborators
2194 .iter()
2195 .map(|c| c.replica_id)
2196 .collect::<HashSet<_>>();
2197 let mut replica_id = ReplicaId(1);
2198 while replica_ids.contains(&replica_id) {
2199 replica_id.0 += 1;
2200 }
2201 let new_collaborator = project_collaborator::ActiveModel {
2202 project_id: ActiveValue::set(project_id),
2203 connection_id: ActiveValue::set(connection.id as i32),
2204 connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
2205 user_id: ActiveValue::set(participant.user_id),
2206 replica_id: ActiveValue::set(replica_id),
2207 is_host: ActiveValue::set(false),
2208 ..Default::default()
2209 }
2210 .insert(&*tx)
2211 .await?;
2212 collaborators.push(new_collaborator);
2213
2214 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
2215 let mut worktrees = db_worktrees
2216 .into_iter()
2217 .map(|db_worktree| {
2218 (
2219 db_worktree.id as u64,
2220 Worktree {
2221 id: db_worktree.id as u64,
2222 abs_path: db_worktree.abs_path,
2223 root_name: db_worktree.root_name,
2224 visible: db_worktree.visible,
2225 entries: Default::default(),
2226 diagnostic_summaries: Default::default(),
2227 scan_id: db_worktree.scan_id as u64,
2228 is_complete: db_worktree.is_complete,
2229 },
2230 )
2231 })
2232 .collect::<BTreeMap<_, _>>();
2233
2234 // Populate worktree entries.
2235 {
2236 let mut db_entries = worktree_entry::Entity::find()
2237 .filter(worktree_entry::Column::ProjectId.eq(project_id))
2238 .stream(&*tx)
2239 .await?;
2240 while let Some(db_entry) = db_entries.next().await {
2241 let db_entry = db_entry?;
2242 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
2243 worktree.entries.push(proto::Entry {
2244 id: db_entry.id as u64,
2245 is_dir: db_entry.is_dir,
2246 path: db_entry.path,
2247 inode: db_entry.inode as u64,
2248 mtime: Some(proto::Timestamp {
2249 seconds: db_entry.mtime_seconds as u64,
2250 nanos: db_entry.mtime_nanos as u32,
2251 }),
2252 is_symlink: db_entry.is_symlink,
2253 is_ignored: db_entry.is_ignored,
2254 });
2255 }
2256 }
2257 }
2258
2259 // Populate worktree diagnostic summaries.
2260 {
2261 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
2262 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
2263 .stream(&*tx)
2264 .await?;
2265 while let Some(db_summary) = db_summaries.next().await {
2266 let db_summary = db_summary?;
2267 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
2268 worktree
2269 .diagnostic_summaries
2270 .push(proto::DiagnosticSummary {
2271 path: db_summary.path,
2272 language_server_id: db_summary.language_server_id as u64,
2273 error_count: db_summary.error_count as u32,
2274 warning_count: db_summary.warning_count as u32,
2275 });
2276 }
2277 }
2278 }
2279
2280 // Populate language servers.
2281 let language_servers = project
2282 .find_related(language_server::Entity)
2283 .all(&*tx)
2284 .await?;
2285
2286 let room_id = project.room_id;
2287 let project = Project {
2288 collaborators,
2289 worktrees,
2290 language_servers: language_servers
2291 .into_iter()
2292 .map(|language_server| proto::LanguageServer {
2293 id: language_server.id as u64,
2294 name: language_server.name,
2295 })
2296 .collect(),
2297 };
2298 Ok((room_id, (project, replica_id as ReplicaId)))
2299 })
2300 .await
2301 }
2302
2303 pub async fn leave_project(
2304 &self,
2305 project_id: ProjectId,
2306 connection: ConnectionId,
2307 ) -> Result<RoomGuard<LeftProject>> {
2308 self.room_transaction(|tx| async move {
2309 let result = project_collaborator::Entity::delete_many()
2310 .filter(
2311 Condition::all()
2312 .add(project_collaborator::Column::ProjectId.eq(project_id))
2313 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
2314 .add(
2315 project_collaborator::Column::ConnectionServerId
2316 .eq(connection.epoch as i32),
2317 ),
2318 )
2319 .exec(&*tx)
2320 .await?;
2321 if result.rows_affected == 0 {
2322 Err(anyhow!("not a collaborator on this project"))?;
2323 }
2324
2325 let project = project::Entity::find_by_id(project_id)
2326 .one(&*tx)
2327 .await?
2328 .ok_or_else(|| anyhow!("no such project"))?;
2329 let collaborators = project
2330 .find_related(project_collaborator::Entity)
2331 .all(&*tx)
2332 .await?;
2333 let connection_ids = collaborators
2334 .into_iter()
2335 .map(|collaborator| ConnectionId {
2336 epoch: collaborator.connection_server_id.0 as u32,
2337 id: collaborator.connection_id as u32,
2338 })
2339 .collect();
2340
2341 let left_project = LeftProject {
2342 id: project_id,
2343 host_user_id: project.host_user_id,
2344 host_connection_id: ConnectionId {
2345 epoch: project.host_connection_server_id.0 as u32,
2346 id: project.host_connection_id as u32,
2347 },
2348 connection_ids,
2349 };
2350 Ok((project.room_id, left_project))
2351 })
2352 .await
2353 }
2354
2355 pub async fn project_collaborators(
2356 &self,
2357 project_id: ProjectId,
2358 connection: ConnectionId,
2359 ) -> Result<RoomGuard<Vec<project_collaborator::Model>>> {
2360 self.room_transaction(|tx| async move {
2361 let project = project::Entity::find_by_id(project_id)
2362 .one(&*tx)
2363 .await?
2364 .ok_or_else(|| anyhow!("no such project"))?;
2365 let collaborators = project_collaborator::Entity::find()
2366 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2367 .all(&*tx)
2368 .await?;
2369
2370 if collaborators.iter().any(|collaborator| {
2371 let collaborator_connection = ConnectionId {
2372 epoch: collaborator.connection_server_id.0 as u32,
2373 id: collaborator.connection_id as u32,
2374 };
2375 collaborator_connection == connection
2376 }) {
2377 Ok((project.room_id, collaborators))
2378 } else {
2379 Err(anyhow!("no such project"))?
2380 }
2381 })
2382 .await
2383 }
2384
2385 pub async fn project_connection_ids(
2386 &self,
2387 project_id: ProjectId,
2388 connection_id: ConnectionId,
2389 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
2390 self.room_transaction(|tx| async move {
2391 let project = project::Entity::find_by_id(project_id)
2392 .one(&*tx)
2393 .await?
2394 .ok_or_else(|| anyhow!("no such project"))?;
2395 let mut participants = project_collaborator::Entity::find()
2396 .filter(project_collaborator::Column::ProjectId.eq(project_id))
2397 .stream(&*tx)
2398 .await?;
2399
2400 let mut connection_ids = HashSet::default();
2401 while let Some(participant) = participants.next().await {
2402 let participant = participant?;
2403 connection_ids.insert(ConnectionId {
2404 epoch: participant.connection_server_id.0 as u32,
2405 id: participant.connection_id as u32,
2406 });
2407 }
2408
2409 if connection_ids.contains(&connection_id) {
2410 Ok((project.room_id, connection_ids))
2411 } else {
2412 Err(anyhow!("no such project"))?
2413 }
2414 })
2415 .await
2416 }
2417
2418 async fn project_guest_connection_ids(
2419 &self,
2420 project_id: ProjectId,
2421 tx: &DatabaseTransaction,
2422 ) -> Result<Vec<ConnectionId>> {
2423 let mut participants = project_collaborator::Entity::find()
2424 .filter(
2425 project_collaborator::Column::ProjectId
2426 .eq(project_id)
2427 .and(project_collaborator::Column::IsHost.eq(false)),
2428 )
2429 .stream(tx)
2430 .await?;
2431
2432 let mut guest_connection_ids = Vec::new();
2433 while let Some(participant) = participants.next().await {
2434 let participant = participant?;
2435 guest_connection_ids.push(ConnectionId {
2436 epoch: participant.connection_server_id.0 as u32,
2437 id: participant.connection_id as u32,
2438 });
2439 }
2440 Ok(guest_connection_ids)
2441 }
2442
2443 // access tokens
2444
2445 pub async fn create_access_token_hash(
2446 &self,
2447 user_id: UserId,
2448 access_token_hash: &str,
2449 max_access_token_count: usize,
2450 ) -> Result<()> {
2451 self.transaction(|tx| async {
2452 let tx = tx;
2453
2454 access_token::ActiveModel {
2455 user_id: ActiveValue::set(user_id),
2456 hash: ActiveValue::set(access_token_hash.into()),
2457 ..Default::default()
2458 }
2459 .insert(&*tx)
2460 .await?;
2461
2462 access_token::Entity::delete_many()
2463 .filter(
2464 access_token::Column::Id.in_subquery(
2465 Query::select()
2466 .column(access_token::Column::Id)
2467 .from(access_token::Entity)
2468 .and_where(access_token::Column::UserId.eq(user_id))
2469 .order_by(access_token::Column::Id, sea_orm::Order::Desc)
2470 .limit(10000)
2471 .offset(max_access_token_count as u64)
2472 .to_owned(),
2473 ),
2474 )
2475 .exec(&*tx)
2476 .await?;
2477 Ok(())
2478 })
2479 .await
2480 }
2481
2482 pub async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
2483 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
2484 enum QueryAs {
2485 Hash,
2486 }
2487
2488 self.transaction(|tx| async move {
2489 Ok(access_token::Entity::find()
2490 .select_only()
2491 .column(access_token::Column::Hash)
2492 .filter(access_token::Column::UserId.eq(user_id))
2493 .order_by_desc(access_token::Column::Id)
2494 .into_values::<_, QueryAs>()
2495 .all(&*tx)
2496 .await?)
2497 })
2498 .await
2499 }
2500
2501 async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
2502 where
2503 F: Send + Fn(TransactionHandle) -> Fut,
2504 Fut: Send + Future<Output = Result<T>>,
2505 {
2506 let body = async {
2507 loop {
2508 let (tx, result) = self.with_transaction(&f).await?;
2509 match result {
2510 Ok(result) => {
2511 match tx.commit().await.map_err(Into::into) {
2512 Ok(()) => return Ok(result),
2513 Err(error) => {
2514 if is_serialization_error(&error) {
2515 // Retry (don't break the loop)
2516 } else {
2517 return Err(error);
2518 }
2519 }
2520 }
2521 }
2522 Err(error) => {
2523 tx.rollback().await?;
2524 if is_serialization_error(&error) {
2525 // Retry (don't break the loop)
2526 } else {
2527 return Err(error);
2528 }
2529 }
2530 }
2531 }
2532 };
2533
2534 self.run(body).await
2535 }
2536
2537 async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
2538 where
2539 F: Send + Fn(TransactionHandle) -> Fut,
2540 Fut: Send + Future<Output = Result<(RoomId, T)>>,
2541 {
2542 let body = async {
2543 loop {
2544 let (tx, result) = self.with_transaction(&f).await?;
2545 match result {
2546 Ok((room_id, data)) => {
2547 let lock = self.rooms.entry(room_id).or_default().clone();
2548 let _guard = lock.lock_owned().await;
2549 match tx.commit().await.map_err(Into::into) {
2550 Ok(()) => {
2551 return Ok(RoomGuard {
2552 data,
2553 _guard,
2554 _not_send: PhantomData,
2555 });
2556 }
2557 Err(error) => {
2558 if is_serialization_error(&error) {
2559 // Retry (don't break the loop)
2560 } else {
2561 return Err(error);
2562 }
2563 }
2564 }
2565 }
2566 Err(error) => {
2567 tx.rollback().await?;
2568 if is_serialization_error(&error) {
2569 // Retry (don't break the loop)
2570 } else {
2571 return Err(error);
2572 }
2573 }
2574 }
2575 }
2576 };
2577
2578 self.run(body).await
2579 }
2580
2581 async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
2582 where
2583 F: Send + Fn(TransactionHandle) -> Fut,
2584 Fut: Send + Future<Output = Result<T>>,
2585 {
2586 let tx = self
2587 .pool
2588 .begin_with_config(Some(IsolationLevel::Serializable), None)
2589 .await?;
2590
2591 let mut tx = Arc::new(Some(tx));
2592 let result = f(TransactionHandle(tx.clone())).await;
2593 let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
2594 return Err(anyhow!("couldn't complete transaction because it's still in use"))?;
2595 };
2596
2597 Ok((tx, result))
2598 }
2599
2600 async fn run<F, T>(&self, future: F) -> T
2601 where
2602 F: Future<Output = T>,
2603 {
2604 #[cfg(test)]
2605 {
2606 if let Some(background) = self.background.as_ref() {
2607 background.simulate_random_delay().await;
2608 }
2609
2610 self.runtime.as_ref().unwrap().block_on(future)
2611 }
2612
2613 #[cfg(not(test))]
2614 {
2615 future.await
2616 }
2617 }
2618}
2619
2620fn is_serialization_error(error: &Error) -> bool {
2621 const SERIALIZATION_FAILURE_CODE: &'static str = "40001";
2622 match error {
2623 Error::Database(
2624 DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
2625 | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
2626 ) if error
2627 .as_database_error()
2628 .and_then(|error| error.code())
2629 .as_deref()
2630 == Some(SERIALIZATION_FAILURE_CODE) =>
2631 {
2632 true
2633 }
2634 _ => false,
2635 }
2636}
2637
2638struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
2639
2640impl Deref for TransactionHandle {
2641 type Target = DatabaseTransaction;
2642
2643 fn deref(&self) -> &Self::Target {
2644 self.0.as_ref().as_ref().unwrap()
2645 }
2646}
2647
2648pub struct RoomGuard<T> {
2649 data: T,
2650 _guard: OwnedMutexGuard<()>,
2651 _not_send: PhantomData<Rc<()>>,
2652}
2653
2654impl<T> Deref for RoomGuard<T> {
2655 type Target = T;
2656
2657 fn deref(&self) -> &T {
2658 &self.data
2659 }
2660}
2661
2662impl<T> DerefMut for RoomGuard<T> {
2663 fn deref_mut(&mut self) -> &mut T {
2664 &mut self.data
2665 }
2666}
2667
/// Serializable parameters describing a user to be created.
// NOTE(review): the code consuming this struct is outside this section; the field
// notes below are inferred from the field names — confirm against the caller.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    /// GitHub login name of the user.
    pub github_login: String,
    /// Numeric GitHub id of the user.
    pub github_user_id: i32,
    /// Presumably the number of invites the new user starts with — TODO confirm.
    pub invite_count: i32,
}
2674
/// Data reported back after creating a user.
// NOTE(review): the code producing this struct is outside this section; the field
// notes below are inferred from the field names — confirm against the producer.
#[derive(Debug)]
pub struct NewUserResult {
    /// Id of the user.
    pub user_id: UserId,
    /// Metrics identifier associated with the user.
    pub metrics_id: String,
    /// The user who invited this one, if any.
    pub inviting_user_id: Option<UserId>,
    /// Device id recorded at signup, if any.
    pub signup_device_id: Option<String>,
}
2682
/// Generates a random invite code: a nanoid requested with size 16.
fn random_invite_code() -> String {
    nanoid::nanoid!(16)
}
2686
/// Generates a random email confirmation code: a nanoid requested with size 64.
fn random_email_confirmation_code() -> String {
    nanoid::nanoid!(64)
}
2690
// Declares a strongly typed database id: a `pub struct $name(pub i32)` newtype
// together with the conversions needed to use it with protobuf messages
// (`from_proto` / `to_proto`), with `Display`, and as a sea-orm / sea-query
// column value (`Value`, `TryGetable`, `ValueType`, `TryFromU64`, `Nullable`).
macro_rules! id_type {
    ($name:ident) => {
        #[derive(
            Clone,
            Copy,
            Debug,
            Default,
            PartialEq,
            Eq,
            PartialOrd,
            Ord,
            Hash,
            Serialize,
            Deserialize,
        )]
        #[serde(transparent)]
        pub struct $name(pub i32);

        impl $name {
            /// The largest representable id.
            #[allow(unused)]
            pub const MAX: Self = Self(i32::MAX);

            /// Builds an id from its protobuf representation.
            ///
            /// The value is converted with an `as` cast, so anything that does
            /// not fit in an `i32` is truncated rather than rejected.
            #[allow(unused)]
            pub fn from_proto(value: u64) -> Self {
                Self(value as i32)
            }

            /// Converts the id to its protobuf representation with an `as` cast.
            #[allow(unused)]
            pub fn to_proto(self) -> u64 {
                self.0 as u64
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                self.0.fmt(f)
            }
        }

        impl From<$name> for sea_query::Value {
            fn from(value: $name) -> Self {
                sea_query::Value::Int(Some(value.0))
            }
        }

        impl sea_orm::TryGetable for $name {
            fn try_get(
                res: &sea_orm::QueryResult,
                pre: &str,
                col: &str,
            ) -> Result<Self, sea_orm::TryGetError> {
                Ok(Self(i32::try_get(res, pre, col)?))
            }
        }

        impl sea_query::ValueType for $name {
            // Accepts any non-null integer value that fits in an `i32`; nulls,
            // out-of-range integers and non-integer values are type errors.
            fn try_from(v: Value) -> Result<Self, sea_query::ValueTypeErr> {
                match v {
                    Value::TinyInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Int(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigInt(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::TinyUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::SmallUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::Unsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    Value::BigUnsigned(Some(int)) => {
                        Ok(Self(int.try_into().map_err(|_| sea_query::ValueTypeErr)?))
                    }
                    _ => Err(sea_query::ValueTypeErr),
                }
            }

            fn type_name() -> String {
                stringify!($name).into()
            }

            fn array_type() -> sea_query::ArrayType {
                sea_query::ArrayType::Int
            }

            fn column_type() -> sea_query::ColumnType {
                sea_query::ColumnType::Integer(None)
            }
        }

        impl sea_orm::TryFromU64 for $name {
            fn try_from_u64(n: u64) -> Result<Self, DbErr> {
                // `ConvertFromU64` carries the name of the type that could not be
                // built from a u64, so pass the type name rather than a sentence.
                Ok(Self(n.try_into().map_err(|_| {
                    DbErr::ConvertFromU64(stringify!($name))
                })?))
            }
        }

        impl sea_query::Nullable for $name {
            fn null() -> Value {
                Value::Int(None)
            }
        }
    };
}
2809
// The id newtypes used by this module, one `id_type!` expansion each.
id_type!(AccessTokenId);
id_type!(ContactId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(ProjectCollaboratorId);
id_type!(ReplicaId);
id_type!(ServerId);
id_type!(SignupId);
id_type!(UserId);
2820
/// Outcome of a participant leaving a room.
// NOTE(review): this struct is built outside this section; the field notes are
// inferred from names and types — confirm against the code that constructs it.
pub struct LeftRoom {
    /// Protobuf description of the room.
    pub room: proto::Room,
    /// Projects that were left as part of leaving the room, keyed by project id.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    /// Users whose pending calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
2826
/// Outcome of refreshing a room.
// NOTE(review): this struct is built outside this section; the field notes are
// inferred from names and types — confirm against the code that constructs it.
pub struct RefreshedRoom {
    /// Protobuf description of the room.
    pub room: proto::Room,
    /// Users whose participation was found to be stale.
    pub stale_participant_user_ids: Vec<UserId>,
    /// Users whose pending calls were canceled.
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
2832
/// Snapshot of a shared project, as assembled by `Database::join_project`.
pub struct Project {
    /// All collaborator rows of the project, including the one that just joined.
    pub collaborators: Vec<project_collaborator::Model>,
    /// The project's worktrees, keyed by worktree id.
    pub worktrees: BTreeMap<u64, Worktree>,
    /// The language servers recorded for the project.
    pub language_servers: Vec<proto::LanguageServer>,
}
2838
/// Describes a project that a collaborator has left.
///
/// The field notes describe how `Database::leave_project` fills the struct; other
/// producers (not visible in this section) may differ.
pub struct LeftProject {
    /// Id of the project that was left.
    pub id: ProjectId,
    /// User id of the project's host.
    pub host_user_id: UserId,
    /// Connection of the project's host.
    pub host_connection_id: ConnectionId,
    /// Connections of the collaborators still on the project after the departure.
    pub connection_ids: Vec<ConnectionId>,
}
2845
/// A worktree of a shared project, as assembled by `Database::join_project` from
/// the worktree row plus its entry and diagnostic-summary rows.
pub struct Worktree {
    /// Worktree id (the database id widened to `u64`).
    pub id: u64,
    /// Copied from the worktree row's `abs_path` column.
    pub abs_path: String,
    /// Copied from the worktree row's `root_name` column.
    pub root_name: String,
    /// Copied from the worktree row's `visible` column.
    pub visible: bool,
    /// Entries of this worktree, one per matching `worktree_entry` row.
    pub entries: Vec<proto::Entry>,
    /// Diagnostic summaries of this worktree, one per matching summary row.
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    /// Copied from the worktree row's `scan_id` column.
    pub scan_id: u64,
    /// Copied from the worktree row's `is_complete` column.
    pub is_complete: bool,
}
2856
2857#[cfg(test)]
2858pub use test::*;
2859
#[cfg(test)]
mod test {
    use super::*;
    use gpui::executor::Background;
    use lazy_static::lazy_static;
    use parking_lot::Mutex;
    use rand::prelude::*;
    use sea_orm::ConnectionTrait;
    use sqlx::migrate::MigrateDatabase;
    use std::sync::Arc;

    /// A throwaway database for tests, backed by either in-memory SQLite or a
    /// freshly created Postgres database that is dropped again on `Drop`.
    pub struct TestDb {
        pub db: Option<Arc<Database>>,
        pub connection: Option<sqlx::AnyConnection>,
    }

    impl TestDb {
        /// Creates an in-memory SQLite database and loads the test schema into it.
        pub fn sqlite(background: Arc<Background>) -> Self {
            let runtime = Self::build_runtime();

            let db = runtime.block_on(async {
                let mut options = ConnectOptions::new("sqlite::memory:".to_string());
                options.max_connections(5);
                let db = Database::new(options).await.unwrap();

                let schema = include_str!(concat!(
                    env!("CARGO_MANIFEST_DIR"),
                    "/migrations.sqlite/20221109000000_test_schema.sql"
                ));
                let create_schema = sea_orm::Statement::from_string(
                    db.pool.get_database_backend(),
                    schema.into(),
                );
                db.pool.execute(create_schema).await.unwrap();
                db
            });

            Self::assemble(db, background, runtime)
        }

        /// Creates a Postgres database with a random name on localhost and runs
        /// the migrations against it. Setup is serialized through a global lock.
        pub fn postgres(background: Arc<Background>) -> Self {
            lazy_static! {
                static ref LOCK: Mutex<()> = Mutex::new(());
            }

            let _guard = LOCK.lock();
            let mut rng = StdRng::from_entropy();
            let url = format!(
                "postgres://postgres@localhost/zed-test-{}",
                rng.gen::<u128>()
            );
            let runtime = Self::build_runtime();

            let db = runtime.block_on(async {
                sqlx::Postgres::create_database(&url)
                    .await
                    .expect("failed to create test db");

                let mut options = ConnectOptions::new(url);
                options
                    .max_connections(5)
                    .idle_timeout(Duration::from_secs(0));
                let db = Database::new(options).await.unwrap();

                let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
                db.migrate(Path::new(migrations_path), false).await.unwrap();
                db
            });

            Self::assemble(db, background, runtime)
        }

        /// Returns the wrapped database.
        pub fn db(&self) -> &Arc<Database> {
            self.db.as_ref().unwrap()
        }

        /// Builds the single-threaded Tokio runtime (I/O and time enabled) that
        /// the test database runs its queries on.
        fn build_runtime() -> tokio::runtime::Runtime {
            tokio::runtime::Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .unwrap()
        }

        /// Attaches the executor and runtime to `db` and wraps it in a `TestDb`.
        fn assemble(
            mut db: Database,
            background: Arc<Background>,
            runtime: tokio::runtime::Runtime,
        ) -> Self {
            db.background = Some(background);
            db.runtime = Some(runtime);

            Self {
                db: Some(Arc::new(db)),
                connection: None,
            }
        }
    }

    impl Drop for TestDb {
        /// For Postgres, terminates the other backends connected to the test
        /// database and then drops it; failures are only logged. Other backends
        /// need no cleanup.
        fn drop(&mut self) {
            let db = self.db.take().unwrap();
            let backend = db.pool.get_database_backend();
            if !matches!(backend, sea_orm::DatabaseBackend::Postgres) {
                return;
            }

            db.runtime.as_ref().unwrap().block_on(async {
                use util::ResultExt;
                let query = "
                    SELECT pg_terminate_backend(pg_stat_activity.pid)
                    FROM pg_stat_activity
                    WHERE
                        pg_stat_activity.datname = current_database() AND
                        pid <> pg_backend_pid();
                ";
                db.pool
                    .execute(sea_orm::Statement::from_string(backend, query.into()))
                    .await
                    .log_err();
                sqlx::Postgres::drop_database(db.options.get_url())
                    .await
                    .log_err();
            });
        }
    }
}