1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .ok_or_else(|| anyhow!("could not find participant"))?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 let project = project::ActiveModel {
65 room_id: ActiveValue::set(Some(participant.room_id)),
66 host_user_id: ActiveValue::set(Some(participant.user_id)),
67 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
68 host_connection_server_id: ActiveValue::set(Some(ServerId(
69 connection.owner_id as i32,
70 ))),
71 id: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if !worktrees.is_empty() {
77 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
78 worktree::ActiveModel {
79 id: ActiveValue::set(worktree.id as i64),
80 project_id: ActiveValue::set(project.id),
81 abs_path: ActiveValue::set(worktree.abs_path.clone()),
82 root_name: ActiveValue::set(worktree.root_name.clone()),
83 visible: ActiveValue::set(worktree.visible),
84 scan_id: ActiveValue::set(0),
85 completed_scan_id: ActiveValue::set(0),
86 }
87 }))
88 .exec(&*tx)
89 .await?;
90 }
91
92 let replica_id = if is_ssh_project { 1 } else { 0 };
93
94 project_collaborator::ActiveModel {
95 project_id: ActiveValue::set(project.id),
96 connection_id: ActiveValue::set(connection.id as i32),
97 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
98 user_id: ActiveValue::set(participant.user_id),
99 replica_id: ActiveValue::set(ReplicaId(replica_id)),
100 is_host: ActiveValue::set(true),
101 ..Default::default()
102 }
103 .insert(&*tx)
104 .await?;
105
106 let room = self.get_room(room_id, &tx).await?;
107 Ok((project.id, room))
108 })
109 .await
110 }
111
112 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
113 self.weak_transaction(|tx| async move {
114 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
115 Ok(())
116 })
117 .await
118 }
119
120 /// Unshares the given project.
121 pub async fn unshare_project(
122 &self,
123 project_id: ProjectId,
124 connection: ConnectionId,
125 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
126 self.project_transaction(project_id, |tx| async move {
127 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
128 let project = project::Entity::find_by_id(project_id)
129 .one(&*tx)
130 .await?
131 .ok_or_else(|| anyhow!("project not found"))?;
132 let room = if let Some(room_id) = project.room_id {
133 Some(self.get_room(room_id, &tx).await?)
134 } else {
135 None
136 };
137 if project.host_connection()? == connection {
138 return Ok((true, room, guest_connection_ids));
139 }
140 Err(anyhow!("cannot unshare a project hosted by another user"))?
141 })
142 .await
143 }
144
145 /// Updates the worktrees associated with the given project.
146 pub async fn update_project(
147 &self,
148 project_id: ProjectId,
149 connection: ConnectionId,
150 worktrees: &[proto::WorktreeMetadata],
151 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
152 self.project_transaction(project_id, |tx| async move {
153 let project = project::Entity::find_by_id(project_id)
154 .filter(
155 Condition::all()
156 .add(project::Column::HostConnectionId.eq(connection.id as i32))
157 .add(
158 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
159 ),
160 )
161 .one(&*tx)
162 .await?
163 .ok_or_else(|| anyhow!("no such project"))?;
164
165 self.update_project_worktrees(project.id, worktrees, &tx)
166 .await?;
167
168 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
169
170 let room = if let Some(room_id) = project.room_id {
171 Some(self.get_room(room_id, &tx).await?)
172 } else {
173 None
174 };
175
176 Ok((room, guest_connection_ids))
177 })
178 .await
179 }
180
181 pub(in crate::db) async fn update_project_worktrees(
182 &self,
183 project_id: ProjectId,
184 worktrees: &[proto::WorktreeMetadata],
185 tx: &DatabaseTransaction,
186 ) -> Result<()> {
187 if !worktrees.is_empty() {
188 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
189 id: ActiveValue::set(worktree.id as i64),
190 project_id: ActiveValue::set(project_id),
191 abs_path: ActiveValue::set(worktree.abs_path.clone()),
192 root_name: ActiveValue::set(worktree.root_name.clone()),
193 visible: ActiveValue::set(worktree.visible),
194 scan_id: ActiveValue::set(0),
195 completed_scan_id: ActiveValue::set(0),
196 }))
197 .on_conflict(
198 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
199 .update_column(worktree::Column::RootName)
200 .to_owned(),
201 )
202 .exec(tx)
203 .await?;
204 }
205
206 worktree::Entity::delete_many()
207 .filter(worktree::Column::ProjectId.eq(project_id).and(
208 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
209 ))
210 .exec(tx)
211 .await?;
212
213 Ok(())
214 }
215
216 pub async fn update_worktree(
217 &self,
218 update: &proto::UpdateWorktree,
219 connection: ConnectionId,
220 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
221 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
222 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
223 {
224 return Err(anyhow!(
225 "invalid worktree update. removed entries: {}, updated entries: {}",
226 update.removed_entries.len(),
227 update.updated_entries.len()
228 ))?;
229 }
230
231 let project_id = ProjectId::from_proto(update.project_id);
232 let worktree_id = update.worktree_id as i64;
233 self.project_transaction(project_id, |tx| async move {
234 // Ensure the update comes from the host.
235 let _project = project::Entity::find_by_id(project_id)
236 .filter(
237 Condition::all()
238 .add(project::Column::HostConnectionId.eq(connection.id as i32))
239 .add(
240 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
241 ),
242 )
243 .one(&*tx)
244 .await?
245 .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
246
247 // Update metadata.
248 worktree::Entity::update(worktree::ActiveModel {
249 id: ActiveValue::set(worktree_id),
250 project_id: ActiveValue::set(project_id),
251 root_name: ActiveValue::set(update.root_name.clone()),
252 scan_id: ActiveValue::set(update.scan_id as i64),
253 completed_scan_id: if update.is_last_update {
254 ActiveValue::set(update.scan_id as i64)
255 } else {
256 ActiveValue::default()
257 },
258 abs_path: ActiveValue::set(update.abs_path.clone()),
259 ..Default::default()
260 })
261 .exec(&*tx)
262 .await?;
263
264 if !update.updated_entries.is_empty() {
265 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
266 let mtime = entry.mtime.clone().unwrap_or_default();
267 worktree_entry::ActiveModel {
268 project_id: ActiveValue::set(project_id),
269 worktree_id: ActiveValue::set(worktree_id),
270 id: ActiveValue::set(entry.id as i64),
271 is_dir: ActiveValue::set(entry.is_dir),
272 path: ActiveValue::set(entry.path.clone()),
273 inode: ActiveValue::set(entry.inode as i64),
274 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
275 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
276 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
277 is_ignored: ActiveValue::set(entry.is_ignored),
278 git_status: ActiveValue::set(None),
279 is_external: ActiveValue::set(entry.is_external),
280 is_deleted: ActiveValue::set(false),
281 scan_id: ActiveValue::set(update.scan_id as i64),
282 is_fifo: ActiveValue::set(entry.is_fifo),
283 }
284 }))
285 .on_conflict(
286 OnConflict::columns([
287 worktree_entry::Column::ProjectId,
288 worktree_entry::Column::WorktreeId,
289 worktree_entry::Column::Id,
290 ])
291 .update_columns([
292 worktree_entry::Column::IsDir,
293 worktree_entry::Column::Path,
294 worktree_entry::Column::Inode,
295 worktree_entry::Column::MtimeSeconds,
296 worktree_entry::Column::MtimeNanos,
297 worktree_entry::Column::CanonicalPath,
298 worktree_entry::Column::IsIgnored,
299 worktree_entry::Column::ScanId,
300 ])
301 .to_owned(),
302 )
303 .exec(&*tx)
304 .await?;
305 }
306
307 if !update.removed_entries.is_empty() {
308 worktree_entry::Entity::update_many()
309 .filter(
310 worktree_entry::Column::ProjectId
311 .eq(project_id)
312 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
313 .and(
314 worktree_entry::Column::Id
315 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
316 ),
317 )
318 .set(worktree_entry::ActiveModel {
319 is_deleted: ActiveValue::Set(true),
320 scan_id: ActiveValue::Set(update.scan_id as i64),
321 ..Default::default()
322 })
323 .exec(&*tx)
324 .await?;
325 }
326
327 // Backward-compatibility for old Zed clients.
328 //
329 // Remove this block when Zed 1.80 stable has been out for a week.
330 {
331 if !update.updated_repositories.is_empty() {
332 project_repository::Entity::insert_many(
333 update.updated_repositories.iter().map(|repository| {
334 project_repository::ActiveModel {
335 project_id: ActiveValue::set(project_id),
336 legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
337 id: ActiveValue::set(repository.repository_id as i64),
338 scan_id: ActiveValue::set(update.scan_id as i64),
339 is_deleted: ActiveValue::set(false),
340 branch_summary: ActiveValue::Set(
341 repository
342 .branch_summary
343 .as_ref()
344 .map(|summary| serde_json::to_string(summary).unwrap()),
345 ),
346 current_merge_conflicts: ActiveValue::Set(Some(
347 serde_json::to_string(&repository.current_merge_conflicts)
348 .unwrap(),
349 )),
350
351 // Old clients do not use abs path or entry ids.
352 abs_path: ActiveValue::set(String::new()),
353 entry_ids: ActiveValue::set("[]".into()),
354 }
355 }),
356 )
357 .on_conflict(
358 OnConflict::columns([
359 project_repository::Column::ProjectId,
360 project_repository::Column::Id,
361 ])
362 .update_columns([
363 project_repository::Column::ScanId,
364 project_repository::Column::BranchSummary,
365 project_repository::Column::CurrentMergeConflicts,
366 ])
367 .to_owned(),
368 )
369 .exec(&*tx)
370 .await?;
371
372 let has_any_statuses = update
373 .updated_repositories
374 .iter()
375 .any(|repository| !repository.updated_statuses.is_empty());
376
377 if has_any_statuses {
378 project_repository_statuses::Entity::insert_many(
379 update.updated_repositories.iter().flat_map(
380 |repository: &proto::RepositoryEntry| {
381 repository.updated_statuses.iter().map(|status_entry| {
382 let (repo_path, status_kind, first_status, second_status) =
383 proto_status_to_db(status_entry.clone());
384 project_repository_statuses::ActiveModel {
385 project_id: ActiveValue::set(project_id),
386 repository_id: ActiveValue::set(
387 repository.repository_id as i64,
388 ),
389 scan_id: ActiveValue::set(update.scan_id as i64),
390 is_deleted: ActiveValue::set(false),
391 repo_path: ActiveValue::set(repo_path),
392 status: ActiveValue::set(0),
393 status_kind: ActiveValue::set(status_kind),
394 first_status: ActiveValue::set(first_status),
395 second_status: ActiveValue::set(second_status),
396 }
397 })
398 },
399 ),
400 )
401 .on_conflict(
402 OnConflict::columns([
403 project_repository_statuses::Column::ProjectId,
404 project_repository_statuses::Column::RepositoryId,
405 project_repository_statuses::Column::RepoPath,
406 ])
407 .update_columns([
408 project_repository_statuses::Column::ScanId,
409 project_repository_statuses::Column::StatusKind,
410 project_repository_statuses::Column::FirstStatus,
411 project_repository_statuses::Column::SecondStatus,
412 ])
413 .to_owned(),
414 )
415 .exec(&*tx)
416 .await?;
417 }
418
419 for repo in &update.updated_repositories {
420 if !repo.removed_statuses.is_empty() {
421 project_repository_statuses::Entity::update_many()
422 .filter(
423 project_repository_statuses::Column::ProjectId
424 .eq(project_id)
425 .and(
426 project_repository_statuses::Column::RepositoryId
427 .eq(repo.repository_id),
428 )
429 .and(
430 project_repository_statuses::Column::RepoPath
431 .is_in(repo.removed_statuses.iter()),
432 ),
433 )
434 .set(project_repository_statuses::ActiveModel {
435 is_deleted: ActiveValue::Set(true),
436 scan_id: ActiveValue::Set(update.scan_id as i64),
437 ..Default::default()
438 })
439 .exec(&*tx)
440 .await?;
441 }
442 }
443 }
444
445 if !update.removed_repositories.is_empty() {
446 project_repository::Entity::update_many()
447 .filter(
448 project_repository::Column::ProjectId
449 .eq(project_id)
450 .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
451 .and(project_repository::Column::Id.is_in(
452 update.removed_repositories.iter().map(|id| *id as i64),
453 )),
454 )
455 .set(project_repository::ActiveModel {
456 is_deleted: ActiveValue::Set(true),
457 scan_id: ActiveValue::Set(update.scan_id as i64),
458 ..Default::default()
459 })
460 .exec(&*tx)
461 .await?;
462 }
463 }
464
465 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
466 Ok(connection_ids)
467 })
468 .await
469 }
470
471 pub async fn update_repository(
472 &self,
473 update: &proto::UpdateRepository,
474 _connection: ConnectionId,
475 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
476 let project_id = ProjectId::from_proto(update.project_id);
477 let repository_id = update.id as i64;
478 self.project_transaction(project_id, |tx| async move {
479 project_repository::Entity::insert(project_repository::ActiveModel {
480 project_id: ActiveValue::set(project_id),
481 id: ActiveValue::set(repository_id),
482 legacy_worktree_id: ActiveValue::set(None),
483 abs_path: ActiveValue::set(update.abs_path.clone()),
484 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
485 scan_id: ActiveValue::set(update.scan_id as i64),
486 is_deleted: ActiveValue::set(false),
487 branch_summary: ActiveValue::Set(
488 update
489 .branch_summary
490 .as_ref()
491 .map(|summary| serde_json::to_string(summary).unwrap()),
492 ),
493 current_merge_conflicts: ActiveValue::Set(Some(
494 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
495 )),
496 })
497 .on_conflict(
498 OnConflict::columns([
499 project_repository::Column::ProjectId,
500 project_repository::Column::Id,
501 ])
502 .update_columns([
503 project_repository::Column::ScanId,
504 project_repository::Column::BranchSummary,
505 project_repository::Column::EntryIds,
506 project_repository::Column::AbsPath,
507 project_repository::Column::CurrentMergeConflicts,
508 ])
509 .to_owned(),
510 )
511 .exec(&*tx)
512 .await?;
513
514 let has_any_statuses = !update.updated_statuses.is_empty();
515
516 if has_any_statuses {
517 project_repository_statuses::Entity::insert_many(
518 update.updated_statuses.iter().map(|status_entry| {
519 let (repo_path, status_kind, first_status, second_status) =
520 proto_status_to_db(status_entry.clone());
521 project_repository_statuses::ActiveModel {
522 project_id: ActiveValue::set(project_id),
523 repository_id: ActiveValue::set(repository_id),
524 scan_id: ActiveValue::set(update.scan_id as i64),
525 is_deleted: ActiveValue::set(false),
526 repo_path: ActiveValue::set(repo_path),
527 status: ActiveValue::set(0),
528 status_kind: ActiveValue::set(status_kind),
529 first_status: ActiveValue::set(first_status),
530 second_status: ActiveValue::set(second_status),
531 }
532 }),
533 )
534 .on_conflict(
535 OnConflict::columns([
536 project_repository_statuses::Column::ProjectId,
537 project_repository_statuses::Column::RepositoryId,
538 project_repository_statuses::Column::RepoPath,
539 ])
540 .update_columns([
541 project_repository_statuses::Column::ScanId,
542 project_repository_statuses::Column::StatusKind,
543 project_repository_statuses::Column::FirstStatus,
544 project_repository_statuses::Column::SecondStatus,
545 ])
546 .to_owned(),
547 )
548 .exec(&*tx)
549 .await?;
550 }
551
552 let has_any_removed_statuses = !update.removed_statuses.is_empty();
553
554 if has_any_removed_statuses {
555 project_repository_statuses::Entity::update_many()
556 .filter(
557 project_repository_statuses::Column::ProjectId
558 .eq(project_id)
559 .and(
560 project_repository_statuses::Column::RepositoryId.eq(repository_id),
561 )
562 .and(
563 project_repository_statuses::Column::RepoPath
564 .is_in(update.removed_statuses.iter()),
565 ),
566 )
567 .set(project_repository_statuses::ActiveModel {
568 is_deleted: ActiveValue::Set(true),
569 scan_id: ActiveValue::Set(update.scan_id as i64),
570 ..Default::default()
571 })
572 .exec(&*tx)
573 .await?;
574 }
575
576 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
577 Ok(connection_ids)
578 })
579 .await
580 }
581
582 pub async fn remove_repository(
583 &self,
584 remove: &proto::RemoveRepository,
585 _connection: ConnectionId,
586 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
587 let project_id = ProjectId::from_proto(remove.project_id);
588 let repository_id = remove.id as i64;
589 self.project_transaction(project_id, |tx| async move {
590 project_repository::Entity::update_many()
591 .filter(
592 project_repository::Column::ProjectId
593 .eq(project_id)
594 .and(project_repository::Column::Id.eq(repository_id)),
595 )
596 .set(project_repository::ActiveModel {
597 is_deleted: ActiveValue::Set(true),
598 // scan_id: ActiveValue::Set(update.scan_id as i64),
599 ..Default::default()
600 })
601 .exec(&*tx)
602 .await?;
603
604 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
605 Ok(connection_ids)
606 })
607 .await
608 }
609
610 /// Updates the diagnostic summary for the given connection.
611 pub async fn update_diagnostic_summary(
612 &self,
613 update: &proto::UpdateDiagnosticSummary,
614 connection: ConnectionId,
615 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
616 let project_id = ProjectId::from_proto(update.project_id);
617 let worktree_id = update.worktree_id as i64;
618 self.project_transaction(project_id, |tx| async move {
619 let summary = update
620 .summary
621 .as_ref()
622 .ok_or_else(|| anyhow!("invalid summary"))?;
623
624 // Ensure the update comes from the host.
625 let project = project::Entity::find_by_id(project_id)
626 .one(&*tx)
627 .await?
628 .ok_or_else(|| anyhow!("no such project"))?;
629 if project.host_connection()? != connection {
630 return Err(anyhow!("can't update a project hosted by someone else"))?;
631 }
632
633 // Update summary.
634 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
635 project_id: ActiveValue::set(project_id),
636 worktree_id: ActiveValue::set(worktree_id),
637 path: ActiveValue::set(summary.path.clone()),
638 language_server_id: ActiveValue::set(summary.language_server_id as i64),
639 error_count: ActiveValue::set(summary.error_count as i32),
640 warning_count: ActiveValue::set(summary.warning_count as i32),
641 })
642 .on_conflict(
643 OnConflict::columns([
644 worktree_diagnostic_summary::Column::ProjectId,
645 worktree_diagnostic_summary::Column::WorktreeId,
646 worktree_diagnostic_summary::Column::Path,
647 ])
648 .update_columns([
649 worktree_diagnostic_summary::Column::LanguageServerId,
650 worktree_diagnostic_summary::Column::ErrorCount,
651 worktree_diagnostic_summary::Column::WarningCount,
652 ])
653 .to_owned(),
654 )
655 .exec(&*tx)
656 .await?;
657
658 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
659 Ok(connection_ids)
660 })
661 .await
662 }
663
664 /// Starts the language server for the given connection.
665 pub async fn start_language_server(
666 &self,
667 update: &proto::StartLanguageServer,
668 connection: ConnectionId,
669 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
670 let project_id = ProjectId::from_proto(update.project_id);
671 self.project_transaction(project_id, |tx| async move {
672 let server = update
673 .server
674 .as_ref()
675 .ok_or_else(|| anyhow!("invalid language server"))?;
676
677 // Ensure the update comes from the host.
678 let project = project::Entity::find_by_id(project_id)
679 .one(&*tx)
680 .await?
681 .ok_or_else(|| anyhow!("no such project"))?;
682 if project.host_connection()? != connection {
683 return Err(anyhow!("can't update a project hosted by someone else"))?;
684 }
685
686 // Add the newly-started language server.
687 language_server::Entity::insert(language_server::ActiveModel {
688 project_id: ActiveValue::set(project_id),
689 id: ActiveValue::set(server.id as i64),
690 name: ActiveValue::set(server.name.clone()),
691 })
692 .on_conflict(
693 OnConflict::columns([
694 language_server::Column::ProjectId,
695 language_server::Column::Id,
696 ])
697 .update_column(language_server::Column::Name)
698 .to_owned(),
699 )
700 .exec(&*tx)
701 .await?;
702
703 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
704 Ok(connection_ids)
705 })
706 .await
707 }
708
709 /// Updates the worktree settings for the given connection.
710 pub async fn update_worktree_settings(
711 &self,
712 update: &proto::UpdateWorktreeSettings,
713 connection: ConnectionId,
714 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
715 let project_id = ProjectId::from_proto(update.project_id);
716 let kind = match update.kind {
717 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
718 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
719 None => proto::LocalSettingsKind::Settings,
720 };
721 let kind = LocalSettingsKind::from_proto(kind);
722 self.project_transaction(project_id, |tx| async move {
723 // Ensure the update comes from the host.
724 let project = project::Entity::find_by_id(project_id)
725 .one(&*tx)
726 .await?
727 .ok_or_else(|| anyhow!("no such project"))?;
728 if project.host_connection()? != connection {
729 return Err(anyhow!("can't update a project hosted by someone else"))?;
730 }
731
732 if let Some(content) = &update.content {
733 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
734 project_id: ActiveValue::Set(project_id),
735 worktree_id: ActiveValue::Set(update.worktree_id as i64),
736 path: ActiveValue::Set(update.path.clone()),
737 content: ActiveValue::Set(content.clone()),
738 kind: ActiveValue::Set(kind),
739 })
740 .on_conflict(
741 OnConflict::columns([
742 worktree_settings_file::Column::ProjectId,
743 worktree_settings_file::Column::WorktreeId,
744 worktree_settings_file::Column::Path,
745 ])
746 .update_column(worktree_settings_file::Column::Content)
747 .to_owned(),
748 )
749 .exec(&*tx)
750 .await?;
751 } else {
752 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
753 project_id: ActiveValue::Set(project_id),
754 worktree_id: ActiveValue::Set(update.worktree_id as i64),
755 path: ActiveValue::Set(update.path.clone()),
756 ..Default::default()
757 })
758 .exec(&*tx)
759 .await?;
760 }
761
762 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
763 Ok(connection_ids)
764 })
765 .await
766 }
767
768 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
769 self.transaction(|tx| async move {
770 Ok(project::Entity::find_by_id(id)
771 .one(&*tx)
772 .await?
773 .ok_or_else(|| anyhow!("no such project"))?)
774 })
775 .await
776 }
777
778 /// Adds the given connection to the specified project
779 /// in the current room.
780 pub async fn join_project(
781 &self,
782 project_id: ProjectId,
783 connection: ConnectionId,
784 user_id: UserId,
785 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
786 self.project_transaction(project_id, |tx| async move {
787 let (project, role) = self
788 .access_project(project_id, connection, Capability::ReadOnly, &tx)
789 .await?;
790 self.join_project_internal(project, user_id, connection, role, &tx)
791 .await
792 })
793 .await
794 }
795
796 async fn join_project_internal(
797 &self,
798 project: project::Model,
799 user_id: UserId,
800 connection: ConnectionId,
801 role: ChannelRole,
802 tx: &DatabaseTransaction,
803 ) -> Result<(Project, ReplicaId)> {
804 let mut collaborators = project
805 .find_related(project_collaborator::Entity)
806 .all(tx)
807 .await?;
808 let replica_ids = collaborators
809 .iter()
810 .map(|c| c.replica_id)
811 .collect::<HashSet<_>>();
812 let mut replica_id = ReplicaId(1);
813 while replica_ids.contains(&replica_id) {
814 replica_id.0 += 1;
815 }
816 let new_collaborator = project_collaborator::ActiveModel {
817 project_id: ActiveValue::set(project.id),
818 connection_id: ActiveValue::set(connection.id as i32),
819 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
820 user_id: ActiveValue::set(user_id),
821 replica_id: ActiveValue::set(replica_id),
822 is_host: ActiveValue::set(false),
823 ..Default::default()
824 }
825 .insert(tx)
826 .await?;
827 collaborators.push(new_collaborator);
828
829 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
830 let mut worktrees = db_worktrees
831 .into_iter()
832 .map(|db_worktree| {
833 (
834 db_worktree.id as u64,
835 Worktree {
836 id: db_worktree.id as u64,
837 abs_path: db_worktree.abs_path,
838 root_name: db_worktree.root_name,
839 visible: db_worktree.visible,
840 entries: Default::default(),
841 diagnostic_summaries: Default::default(),
842 settings_files: Default::default(),
843 scan_id: db_worktree.scan_id as u64,
844 completed_scan_id: db_worktree.completed_scan_id as u64,
845 legacy_repository_entries: Default::default(),
846 },
847 )
848 })
849 .collect::<BTreeMap<_, _>>();
850
851 // Populate worktree entries.
852 {
853 let mut db_entries = worktree_entry::Entity::find()
854 .filter(
855 Condition::all()
856 .add(worktree_entry::Column::ProjectId.eq(project.id))
857 .add(worktree_entry::Column::IsDeleted.eq(false)),
858 )
859 .stream(tx)
860 .await?;
861 while let Some(db_entry) = db_entries.next().await {
862 let db_entry = db_entry?;
863 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
864 worktree.entries.push(proto::Entry {
865 id: db_entry.id as u64,
866 is_dir: db_entry.is_dir,
867 path: db_entry.path,
868 inode: db_entry.inode as u64,
869 mtime: Some(proto::Timestamp {
870 seconds: db_entry.mtime_seconds as u64,
871 nanos: db_entry.mtime_nanos as u32,
872 }),
873 canonical_path: db_entry.canonical_path,
874 is_ignored: db_entry.is_ignored,
875 is_external: db_entry.is_external,
876 // This is only used in the summarization backlog, so if it's None,
877 // that just means we won't be able to detect when to resummarize
878 // based on total number of backlogged bytes - instead, we'd go
879 // on number of files only. That shouldn't be a huge deal in practice.
880 size: None,
881 is_fifo: db_entry.is_fifo,
882 });
883 }
884 }
885 }
886
887 // Populate repository entries.
888 let mut repositories = Vec::new();
889 {
890 let db_repository_entries = project_repository::Entity::find()
891 .filter(
892 Condition::all()
893 .add(project_repository::Column::ProjectId.eq(project.id))
894 .add(project_repository::Column::IsDeleted.eq(false)),
895 )
896 .all(tx)
897 .await?;
898 for db_repository_entry in db_repository_entries {
899 let mut repository_statuses = project_repository_statuses::Entity::find()
900 .filter(
901 Condition::all()
902 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
903 .add(
904 project_repository_statuses::Column::RepositoryId
905 .eq(db_repository_entry.id),
906 )
907 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
908 )
909 .stream(tx)
910 .await?;
911 let mut updated_statuses = Vec::new();
912 while let Some(status_entry) = repository_statuses.next().await {
913 let status_entry = status_entry?;
914 updated_statuses.push(db_status_to_proto(status_entry)?);
915 }
916
917 let current_merge_conflicts = db_repository_entry
918 .current_merge_conflicts
919 .as_ref()
920 .map(|conflicts| serde_json::from_str(&conflicts))
921 .transpose()?
922 .unwrap_or_default();
923
924 let branch_summary = db_repository_entry
925 .branch_summary
926 .as_ref()
927 .map(|branch_summary| serde_json::from_str(&branch_summary))
928 .transpose()?
929 .unwrap_or_default();
930
931 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
932 .context("failed to deserialize repository's entry ids")?;
933
934 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
935 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
936 worktree.legacy_repository_entries.insert(
937 db_repository_entry.id as u64,
938 proto::RepositoryEntry {
939 repository_id: db_repository_entry.id as u64,
940 updated_statuses,
941 removed_statuses: Vec::new(),
942 current_merge_conflicts,
943 branch_summary,
944 },
945 );
946 }
947 } else {
948 repositories.push(proto::UpdateRepository {
949 project_id: db_repository_entry.project_id.0 as u64,
950 id: db_repository_entry.id as u64,
951 abs_path: db_repository_entry.abs_path,
952 entry_ids,
953 updated_statuses,
954 removed_statuses: Vec::new(),
955 current_merge_conflicts,
956 branch_summary,
957 scan_id: db_repository_entry.scan_id as u64,
958 is_last_update: true,
959 });
960 }
961 }
962 }
963
964 // Populate worktree diagnostic summaries.
965 {
966 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
967 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
968 .stream(tx)
969 .await?;
970 while let Some(db_summary) = db_summaries.next().await {
971 let db_summary = db_summary?;
972 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
973 worktree
974 .diagnostic_summaries
975 .push(proto::DiagnosticSummary {
976 path: db_summary.path,
977 language_server_id: db_summary.language_server_id as u64,
978 error_count: db_summary.error_count as u32,
979 warning_count: db_summary.warning_count as u32,
980 });
981 }
982 }
983 }
984
985 // Populate worktree settings files
986 {
987 let mut db_settings_files = worktree_settings_file::Entity::find()
988 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
989 .stream(tx)
990 .await?;
991 while let Some(db_settings_file) = db_settings_files.next().await {
992 let db_settings_file = db_settings_file?;
993 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
994 worktree.settings_files.push(WorktreeSettingsFile {
995 path: db_settings_file.path,
996 content: db_settings_file.content,
997 kind: db_settings_file.kind,
998 });
999 }
1000 }
1001 }
1002
1003 // Populate language servers.
1004 let language_servers = project
1005 .find_related(language_server::Entity)
1006 .all(tx)
1007 .await?;
1008
1009 let project = Project {
1010 id: project.id,
1011 role,
1012 collaborators: collaborators
1013 .into_iter()
1014 .map(|collaborator| ProjectCollaborator {
1015 connection_id: collaborator.connection(),
1016 user_id: collaborator.user_id,
1017 replica_id: collaborator.replica_id,
1018 is_host: collaborator.is_host,
1019 })
1020 .collect(),
1021 worktrees,
1022 repositories,
1023 language_servers: language_servers
1024 .into_iter()
1025 .map(|language_server| proto::LanguageServer {
1026 id: language_server.id as u64,
1027 name: language_server.name,
1028 worktree_id: None,
1029 })
1030 .collect(),
1031 };
1032 Ok((project, replica_id as ReplicaId))
1033 }
1034
1035 /// Removes the given connection from the specified project.
1036 pub async fn leave_project(
1037 &self,
1038 project_id: ProjectId,
1039 connection: ConnectionId,
1040 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1041 self.project_transaction(project_id, |tx| async move {
1042 let result = project_collaborator::Entity::delete_many()
1043 .filter(
1044 Condition::all()
1045 .add(project_collaborator::Column::ProjectId.eq(project_id))
1046 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1047 .add(
1048 project_collaborator::Column::ConnectionServerId
1049 .eq(connection.owner_id as i32),
1050 ),
1051 )
1052 .exec(&*tx)
1053 .await?;
1054 if result.rows_affected == 0 {
1055 Err(anyhow!("not a collaborator on this project"))?;
1056 }
1057
1058 let project = project::Entity::find_by_id(project_id)
1059 .one(&*tx)
1060 .await?
1061 .ok_or_else(|| anyhow!("no such project"))?;
1062 let collaborators = project
1063 .find_related(project_collaborator::Entity)
1064 .all(&*tx)
1065 .await?;
1066 let connection_ids: Vec<ConnectionId> = collaborators
1067 .into_iter()
1068 .map(|collaborator| collaborator.connection())
1069 .collect();
1070
1071 follower::Entity::delete_many()
1072 .filter(
1073 Condition::any()
1074 .add(
1075 Condition::all()
1076 .add(follower::Column::ProjectId.eq(Some(project_id)))
1077 .add(
1078 follower::Column::LeaderConnectionServerId
1079 .eq(connection.owner_id),
1080 )
1081 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1082 )
1083 .add(
1084 Condition::all()
1085 .add(follower::Column::ProjectId.eq(Some(project_id)))
1086 .add(
1087 follower::Column::FollowerConnectionServerId
1088 .eq(connection.owner_id),
1089 )
1090 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1091 ),
1092 )
1093 .exec(&*tx)
1094 .await?;
1095
1096 let room = if let Some(room_id) = project.room_id {
1097 Some(self.get_room(room_id, &tx).await?)
1098 } else {
1099 None
1100 };
1101
1102 let left_project = LeftProject {
1103 id: project_id,
1104 should_unshare: connection == project.host_connection()?,
1105 connection_ids,
1106 };
1107 Ok((room, left_project))
1108 })
1109 .await
1110 }
1111
1112 pub async fn check_user_is_project_host(
1113 &self,
1114 project_id: ProjectId,
1115 connection_id: ConnectionId,
1116 ) -> Result<()> {
1117 self.project_transaction(project_id, |tx| async move {
1118 project::Entity::find()
1119 .filter(
1120 Condition::all()
1121 .add(project::Column::Id.eq(project_id))
1122 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1123 .add(
1124 project::Column::HostConnectionServerId
1125 .eq(Some(connection_id.owner_id as i32)),
1126 ),
1127 )
1128 .one(&*tx)
1129 .await?
1130 .ok_or_else(|| anyhow!("failed to read project host"))?;
1131
1132 Ok(())
1133 })
1134 .await
1135 .map(|guard| guard.into_inner())
1136 }
1137
1138 /// Returns the current project if the given user is authorized to access it with the specified capability.
1139 pub async fn access_project(
1140 &self,
1141 project_id: ProjectId,
1142 connection_id: ConnectionId,
1143 capability: Capability,
1144 tx: &DatabaseTransaction,
1145 ) -> Result<(project::Model, ChannelRole)> {
1146 let project = project::Entity::find_by_id(project_id)
1147 .one(tx)
1148 .await?
1149 .ok_or_else(|| anyhow!("no such project"))?;
1150
1151 let role_from_room = if let Some(room_id) = project.room_id {
1152 room_participant::Entity::find()
1153 .filter(room_participant::Column::RoomId.eq(room_id))
1154 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1155 .one(tx)
1156 .await?
1157 .and_then(|participant| participant.role)
1158 } else {
1159 None
1160 };
1161
1162 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1163
1164 match capability {
1165 Capability::ReadWrite => {
1166 if !role.can_edit_projects() {
1167 return Err(anyhow!("not authorized to edit projects"))?;
1168 }
1169 }
1170 Capability::ReadOnly => {
1171 if !role.can_read_projects() {
1172 return Err(anyhow!("not authorized to read projects"))?;
1173 }
1174 }
1175 }
1176
1177 Ok((project, role))
1178 }
1179
1180 /// Returns the host connection for a read-only request to join a shared project.
1181 pub async fn host_for_read_only_project_request(
1182 &self,
1183 project_id: ProjectId,
1184 connection_id: ConnectionId,
1185 ) -> Result<ConnectionId> {
1186 self.project_transaction(project_id, |tx| async move {
1187 let (project, _) = self
1188 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1189 .await?;
1190 project.host_connection()
1191 })
1192 .await
1193 .map(|guard| guard.into_inner())
1194 }
1195
1196 /// Returns the host connection for a request to join a shared project.
1197 pub async fn host_for_mutating_project_request(
1198 &self,
1199 project_id: ProjectId,
1200 connection_id: ConnectionId,
1201 ) -> Result<ConnectionId> {
1202 self.project_transaction(project_id, |tx| async move {
1203 let (project, _) = self
1204 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1205 .await?;
1206 project.host_connection()
1207 })
1208 .await
1209 .map(|guard| guard.into_inner())
1210 }
1211
1212 pub async fn connections_for_buffer_update(
1213 &self,
1214 project_id: ProjectId,
1215 connection_id: ConnectionId,
1216 capability: Capability,
1217 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1218 self.project_transaction(project_id, |tx| async move {
1219 // Authorize
1220 let (project, _) = self
1221 .access_project(project_id, connection_id, capability, &tx)
1222 .await?;
1223
1224 let host_connection_id = project.host_connection()?;
1225
1226 let collaborators = project_collaborator::Entity::find()
1227 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1228 .all(&*tx)
1229 .await?;
1230
1231 let guest_connection_ids = collaborators
1232 .into_iter()
1233 .filter_map(|collaborator| {
1234 if collaborator.is_host {
1235 None
1236 } else {
1237 Some(collaborator.connection())
1238 }
1239 })
1240 .collect();
1241
1242 Ok((host_connection_id, guest_connection_ids))
1243 })
1244 .await
1245 }
1246
1247 /// Returns the connection IDs in the given project.
1248 ///
1249 /// The provided `connection_id` must also be a collaborator in the project,
1250 /// otherwise an error will be returned.
1251 pub async fn project_connection_ids(
1252 &self,
1253 project_id: ProjectId,
1254 connection_id: ConnectionId,
1255 exclude_dev_server: bool,
1256 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1257 self.project_transaction(project_id, |tx| async move {
1258 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1259 .await
1260 })
1261 .await
1262 }
1263
1264 async fn internal_project_connection_ids(
1265 &self,
1266 project_id: ProjectId,
1267 connection_id: ConnectionId,
1268 exclude_dev_server: bool,
1269 tx: &DatabaseTransaction,
1270 ) -> Result<HashSet<ConnectionId>> {
1271 let project = project::Entity::find_by_id(project_id)
1272 .one(tx)
1273 .await?
1274 .ok_or_else(|| anyhow!("no such project"))?;
1275
1276 let mut collaborators = project_collaborator::Entity::find()
1277 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1278 .stream(tx)
1279 .await?;
1280
1281 let mut connection_ids = HashSet::default();
1282 if let Some(host_connection) = project.host_connection().log_err() {
1283 if !exclude_dev_server {
1284 connection_ids.insert(host_connection);
1285 }
1286 }
1287
1288 while let Some(collaborator) = collaborators.next().await {
1289 let collaborator = collaborator?;
1290 connection_ids.insert(collaborator.connection());
1291 }
1292
1293 if connection_ids.contains(&connection_id)
1294 || Some(connection_id) == project.host_connection().ok()
1295 {
1296 Ok(connection_ids)
1297 } else {
1298 Err(anyhow!(
1299 "can only send project updates to a project you're in"
1300 ))?
1301 }
1302 }
1303
1304 async fn project_guest_connection_ids(
1305 &self,
1306 project_id: ProjectId,
1307 tx: &DatabaseTransaction,
1308 ) -> Result<Vec<ConnectionId>> {
1309 let mut collaborators = project_collaborator::Entity::find()
1310 .filter(
1311 project_collaborator::Column::ProjectId
1312 .eq(project_id)
1313 .and(project_collaborator::Column::IsHost.eq(false)),
1314 )
1315 .stream(tx)
1316 .await?;
1317
1318 let mut guest_connection_ids = Vec::new();
1319 while let Some(collaborator) = collaborators.next().await {
1320 let collaborator = collaborator?;
1321 guest_connection_ids.push(collaborator.connection());
1322 }
1323 Ok(guest_connection_ids)
1324 }
1325
1326 /// Returns the [`RoomId`] for the given project.
1327 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1328 self.transaction(|tx| async move {
1329 Ok(project::Entity::find_by_id(project_id)
1330 .one(&*tx)
1331 .await?
1332 .and_then(|project| project.room_id))
1333 })
1334 .await
1335 }
1336
1337 pub async fn check_room_participants(
1338 &self,
1339 room_id: RoomId,
1340 leader_id: ConnectionId,
1341 follower_id: ConnectionId,
1342 ) -> Result<()> {
1343 self.transaction(|tx| async move {
1344 use room_participant::Column;
1345
1346 let count = room_participant::Entity::find()
1347 .filter(
1348 Condition::all().add(Column::RoomId.eq(room_id)).add(
1349 Condition::any()
1350 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1351 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1352 ))
1353 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1354 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1355 )),
1356 ),
1357 )
1358 .count(&*tx)
1359 .await?;
1360
1361 if count < 2 {
1362 Err(anyhow!("not room participants"))?;
1363 }
1364
1365 Ok(())
1366 })
1367 .await
1368 }
1369
1370 /// Adds the given follower connection as a follower of the given leader connection.
1371 pub async fn follow(
1372 &self,
1373 room_id: RoomId,
1374 project_id: ProjectId,
1375 leader_connection: ConnectionId,
1376 follower_connection: ConnectionId,
1377 ) -> Result<TransactionGuard<proto::Room>> {
1378 self.room_transaction(room_id, |tx| async move {
1379 follower::ActiveModel {
1380 room_id: ActiveValue::set(room_id),
1381 project_id: ActiveValue::set(project_id),
1382 leader_connection_server_id: ActiveValue::set(ServerId(
1383 leader_connection.owner_id as i32,
1384 )),
1385 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1386 follower_connection_server_id: ActiveValue::set(ServerId(
1387 follower_connection.owner_id as i32,
1388 )),
1389 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1390 ..Default::default()
1391 }
1392 .insert(&*tx)
1393 .await?;
1394
1395 let room = self.get_room(room_id, &tx).await?;
1396 Ok(room)
1397 })
1398 .await
1399 }
1400
1401 /// Removes the given follower connection as a follower of the given leader connection.
1402 pub async fn unfollow(
1403 &self,
1404 room_id: RoomId,
1405 project_id: ProjectId,
1406 leader_connection: ConnectionId,
1407 follower_connection: ConnectionId,
1408 ) -> Result<TransactionGuard<proto::Room>> {
1409 self.room_transaction(room_id, |tx| async move {
1410 follower::Entity::delete_many()
1411 .filter(
1412 Condition::all()
1413 .add(follower::Column::RoomId.eq(room_id))
1414 .add(follower::Column::ProjectId.eq(project_id))
1415 .add(
1416 follower::Column::LeaderConnectionServerId
1417 .eq(leader_connection.owner_id),
1418 )
1419 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1420 .add(
1421 follower::Column::FollowerConnectionServerId
1422 .eq(follower_connection.owner_id),
1423 )
1424 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1425 )
1426 .exec(&*tx)
1427 .await?;
1428
1429 let room = self.get_room(room_id, &tx).await?;
1430 Ok(room)
1431 })
1432 .await
1433 }
1434}