1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .context("could not find participant")?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 let project = project::ActiveModel {
65 room_id: ActiveValue::set(Some(participant.room_id)),
66 host_user_id: ActiveValue::set(Some(participant.user_id)),
67 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
68 host_connection_server_id: ActiveValue::set(Some(ServerId(
69 connection.owner_id as i32,
70 ))),
71 id: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if !worktrees.is_empty() {
77 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
78 worktree::ActiveModel {
79 id: ActiveValue::set(worktree.id as i64),
80 project_id: ActiveValue::set(project.id),
81 abs_path: ActiveValue::set(worktree.abs_path.clone()),
82 root_name: ActiveValue::set(worktree.root_name.clone()),
83 visible: ActiveValue::set(worktree.visible),
84 scan_id: ActiveValue::set(0),
85 completed_scan_id: ActiveValue::set(0),
86 }
87 }))
88 .exec(&*tx)
89 .await?;
90 }
91
92 let replica_id = if is_ssh_project { 1 } else { 0 };
93
94 project_collaborator::ActiveModel {
95 project_id: ActiveValue::set(project.id),
96 connection_id: ActiveValue::set(connection.id as i32),
97 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
98 user_id: ActiveValue::set(participant.user_id),
99 replica_id: ActiveValue::set(ReplicaId(replica_id)),
100 is_host: ActiveValue::set(true),
101 ..Default::default()
102 }
103 .insert(&*tx)
104 .await?;
105
106 let room = self.get_room(room_id, &tx).await?;
107 Ok((project.id, room))
108 })
109 .await
110 }
111
112 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
113 self.weak_transaction(|tx| async move {
114 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
115 Ok(())
116 })
117 .await
118 }
119
120 /// Unshares the given project.
121 pub async fn unshare_project(
122 &self,
123 project_id: ProjectId,
124 connection: ConnectionId,
125 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
126 self.project_transaction(project_id, |tx| async move {
127 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
128 let project = project::Entity::find_by_id(project_id)
129 .one(&*tx)
130 .await?
131 .context("project not found")?;
132 let room = if let Some(room_id) = project.room_id {
133 Some(self.get_room(room_id, &tx).await?)
134 } else {
135 None
136 };
137 if project.host_connection()? == connection {
138 return Ok((true, room, guest_connection_ids));
139 }
140 Err(anyhow!("cannot unshare a project hosted by another user"))?
141 })
142 .await
143 }
144
145 /// Updates the worktrees associated with the given project.
146 pub async fn update_project(
147 &self,
148 project_id: ProjectId,
149 connection: ConnectionId,
150 worktrees: &[proto::WorktreeMetadata],
151 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
152 self.project_transaction(project_id, |tx| async move {
153 let project = project::Entity::find_by_id(project_id)
154 .filter(
155 Condition::all()
156 .add(project::Column::HostConnectionId.eq(connection.id as i32))
157 .add(
158 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
159 ),
160 )
161 .one(&*tx)
162 .await?
163 .context("no such project")?;
164
165 self.update_project_worktrees(project.id, worktrees, &tx)
166 .await?;
167
168 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
169
170 let room = if let Some(room_id) = project.room_id {
171 Some(self.get_room(room_id, &tx).await?)
172 } else {
173 None
174 };
175
176 Ok((room, guest_connection_ids))
177 })
178 .await
179 }
180
181 pub(in crate::db) async fn update_project_worktrees(
182 &self,
183 project_id: ProjectId,
184 worktrees: &[proto::WorktreeMetadata],
185 tx: &DatabaseTransaction,
186 ) -> Result<()> {
187 if !worktrees.is_empty() {
188 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
189 id: ActiveValue::set(worktree.id as i64),
190 project_id: ActiveValue::set(project_id),
191 abs_path: ActiveValue::set(worktree.abs_path.clone()),
192 root_name: ActiveValue::set(worktree.root_name.clone()),
193 visible: ActiveValue::set(worktree.visible),
194 scan_id: ActiveValue::set(0),
195 completed_scan_id: ActiveValue::set(0),
196 }))
197 .on_conflict(
198 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
199 .update_column(worktree::Column::RootName)
200 .to_owned(),
201 )
202 .exec(tx)
203 .await?;
204 }
205
206 worktree::Entity::delete_many()
207 .filter(worktree::Column::ProjectId.eq(project_id).and(
208 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
209 ))
210 .exec(tx)
211 .await?;
212
213 Ok(())
214 }
215
216 pub async fn update_worktree(
217 &self,
218 update: &proto::UpdateWorktree,
219 connection: ConnectionId,
220 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
221 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
222 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
223 {
224 return Err(anyhow!(
225 "invalid worktree update. removed entries: {}, updated entries: {}",
226 update.removed_entries.len(),
227 update.updated_entries.len()
228 ))?;
229 }
230
231 let project_id = ProjectId::from_proto(update.project_id);
232 let worktree_id = update.worktree_id as i64;
233 self.project_transaction(project_id, |tx| async move {
234 // Ensure the update comes from the host.
235 let _project = project::Entity::find_by_id(project_id)
236 .filter(
237 Condition::all()
238 .add(project::Column::HostConnectionId.eq(connection.id as i32))
239 .add(
240 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
241 ),
242 )
243 .one(&*tx)
244 .await?
245 .with_context(|| format!("no such project: {project_id}"))?;
246
247 // Update metadata.
248 worktree::Entity::update(worktree::ActiveModel {
249 id: ActiveValue::set(worktree_id),
250 project_id: ActiveValue::set(project_id),
251 root_name: ActiveValue::set(update.root_name.clone()),
252 scan_id: ActiveValue::set(update.scan_id as i64),
253 completed_scan_id: if update.is_last_update {
254 ActiveValue::set(update.scan_id as i64)
255 } else {
256 ActiveValue::default()
257 },
258 abs_path: ActiveValue::set(update.abs_path.clone()),
259 ..Default::default()
260 })
261 .exec(&*tx)
262 .await?;
263
264 if !update.updated_entries.is_empty() {
265 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
266 let mtime = entry.mtime.clone().unwrap_or_default();
267 worktree_entry::ActiveModel {
268 project_id: ActiveValue::set(project_id),
269 worktree_id: ActiveValue::set(worktree_id),
270 id: ActiveValue::set(entry.id as i64),
271 is_dir: ActiveValue::set(entry.is_dir),
272 path: ActiveValue::set(entry.path.clone()),
273 inode: ActiveValue::set(entry.inode as i64),
274 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
275 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
276 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
277 is_ignored: ActiveValue::set(entry.is_ignored),
278 git_status: ActiveValue::set(None),
279 is_external: ActiveValue::set(entry.is_external),
280 is_deleted: ActiveValue::set(false),
281 scan_id: ActiveValue::set(update.scan_id as i64),
282 is_fifo: ActiveValue::set(entry.is_fifo),
283 }
284 }))
285 .on_conflict(
286 OnConflict::columns([
287 worktree_entry::Column::ProjectId,
288 worktree_entry::Column::WorktreeId,
289 worktree_entry::Column::Id,
290 ])
291 .update_columns([
292 worktree_entry::Column::IsDir,
293 worktree_entry::Column::Path,
294 worktree_entry::Column::Inode,
295 worktree_entry::Column::MtimeSeconds,
296 worktree_entry::Column::MtimeNanos,
297 worktree_entry::Column::CanonicalPath,
298 worktree_entry::Column::IsIgnored,
299 worktree_entry::Column::ScanId,
300 ])
301 .to_owned(),
302 )
303 .exec(&*tx)
304 .await?;
305 }
306
307 if !update.removed_entries.is_empty() {
308 worktree_entry::Entity::update_many()
309 .filter(
310 worktree_entry::Column::ProjectId
311 .eq(project_id)
312 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
313 .and(
314 worktree_entry::Column::Id
315 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
316 ),
317 )
318 .set(worktree_entry::ActiveModel {
319 is_deleted: ActiveValue::Set(true),
320 scan_id: ActiveValue::Set(update.scan_id as i64),
321 ..Default::default()
322 })
323 .exec(&*tx)
324 .await?;
325 }
326
327 // Backward-compatibility for old Zed clients.
328 //
329 // Remove this block when Zed 1.80 stable has been out for a week.
330 {
331 if !update.updated_repositories.is_empty() {
332 project_repository::Entity::insert_many(
333 update.updated_repositories.iter().map(|repository| {
334 project_repository::ActiveModel {
335 project_id: ActiveValue::set(project_id),
336 legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
337 id: ActiveValue::set(repository.repository_id as i64),
338 scan_id: ActiveValue::set(update.scan_id as i64),
339 is_deleted: ActiveValue::set(false),
340 branch_summary: ActiveValue::Set(
341 repository
342 .branch_summary
343 .as_ref()
344 .map(|summary| serde_json::to_string(summary).unwrap()),
345 ),
346 current_merge_conflicts: ActiveValue::Set(Some(
347 serde_json::to_string(&repository.current_merge_conflicts)
348 .unwrap(),
349 )),
350
351 // Old clients do not use abs path, entry ids or head_commit_details.
352 abs_path: ActiveValue::set(String::new()),
353 entry_ids: ActiveValue::set("[]".into()),
354 head_commit_details: ActiveValue::set(None),
355 }
356 }),
357 )
358 .on_conflict(
359 OnConflict::columns([
360 project_repository::Column::ProjectId,
361 project_repository::Column::Id,
362 ])
363 .update_columns([
364 project_repository::Column::ScanId,
365 project_repository::Column::BranchSummary,
366 project_repository::Column::CurrentMergeConflicts,
367 ])
368 .to_owned(),
369 )
370 .exec(&*tx)
371 .await?;
372
373 let has_any_statuses = update
374 .updated_repositories
375 .iter()
376 .any(|repository| !repository.updated_statuses.is_empty());
377
378 if has_any_statuses {
379 project_repository_statuses::Entity::insert_many(
380 update.updated_repositories.iter().flat_map(
381 |repository: &proto::RepositoryEntry| {
382 repository.updated_statuses.iter().map(|status_entry| {
383 let (repo_path, status_kind, first_status, second_status) =
384 proto_status_to_db(status_entry.clone());
385 project_repository_statuses::ActiveModel {
386 project_id: ActiveValue::set(project_id),
387 repository_id: ActiveValue::set(
388 repository.repository_id as i64,
389 ),
390 scan_id: ActiveValue::set(update.scan_id as i64),
391 is_deleted: ActiveValue::set(false),
392 repo_path: ActiveValue::set(repo_path),
393 status: ActiveValue::set(0),
394 status_kind: ActiveValue::set(status_kind),
395 first_status: ActiveValue::set(first_status),
396 second_status: ActiveValue::set(second_status),
397 }
398 })
399 },
400 ),
401 )
402 .on_conflict(
403 OnConflict::columns([
404 project_repository_statuses::Column::ProjectId,
405 project_repository_statuses::Column::RepositoryId,
406 project_repository_statuses::Column::RepoPath,
407 ])
408 .update_columns([
409 project_repository_statuses::Column::ScanId,
410 project_repository_statuses::Column::StatusKind,
411 project_repository_statuses::Column::FirstStatus,
412 project_repository_statuses::Column::SecondStatus,
413 ])
414 .to_owned(),
415 )
416 .exec(&*tx)
417 .await?;
418 }
419
420 for repo in &update.updated_repositories {
421 if !repo.removed_statuses.is_empty() {
422 project_repository_statuses::Entity::update_many()
423 .filter(
424 project_repository_statuses::Column::ProjectId
425 .eq(project_id)
426 .and(
427 project_repository_statuses::Column::RepositoryId
428 .eq(repo.repository_id),
429 )
430 .and(
431 project_repository_statuses::Column::RepoPath
432 .is_in(repo.removed_statuses.iter()),
433 ),
434 )
435 .set(project_repository_statuses::ActiveModel {
436 is_deleted: ActiveValue::Set(true),
437 scan_id: ActiveValue::Set(update.scan_id as i64),
438 ..Default::default()
439 })
440 .exec(&*tx)
441 .await?;
442 }
443 }
444 }
445
446 if !update.removed_repositories.is_empty() {
447 project_repository::Entity::update_many()
448 .filter(
449 project_repository::Column::ProjectId
450 .eq(project_id)
451 .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
452 .and(project_repository::Column::Id.is_in(
453 update.removed_repositories.iter().map(|id| *id as i64),
454 )),
455 )
456 .set(project_repository::ActiveModel {
457 is_deleted: ActiveValue::Set(true),
458 scan_id: ActiveValue::Set(update.scan_id as i64),
459 ..Default::default()
460 })
461 .exec(&*tx)
462 .await?;
463 }
464 }
465
466 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
467 Ok(connection_ids)
468 })
469 .await
470 }
471
472 pub async fn update_repository(
473 &self,
474 update: &proto::UpdateRepository,
475 _connection: ConnectionId,
476 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
477 let project_id = ProjectId::from_proto(update.project_id);
478 let repository_id = update.id as i64;
479 self.project_transaction(project_id, |tx| async move {
480 project_repository::Entity::insert(project_repository::ActiveModel {
481 project_id: ActiveValue::set(project_id),
482 id: ActiveValue::set(repository_id),
483 legacy_worktree_id: ActiveValue::set(None),
484 abs_path: ActiveValue::set(update.abs_path.clone()),
485 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
486 scan_id: ActiveValue::set(update.scan_id as i64),
487 is_deleted: ActiveValue::set(false),
488 branch_summary: ActiveValue::Set(
489 update
490 .branch_summary
491 .as_ref()
492 .map(|summary| serde_json::to_string(summary).unwrap()),
493 ),
494 head_commit_details: ActiveValue::Set(
495 update
496 .head_commit_details
497 .as_ref()
498 .map(|details| serde_json::to_string(details).unwrap()),
499 ),
500 current_merge_conflicts: ActiveValue::Set(Some(
501 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
502 )),
503 })
504 .on_conflict(
505 OnConflict::columns([
506 project_repository::Column::ProjectId,
507 project_repository::Column::Id,
508 ])
509 .update_columns([
510 project_repository::Column::ScanId,
511 project_repository::Column::BranchSummary,
512 project_repository::Column::EntryIds,
513 project_repository::Column::AbsPath,
514 project_repository::Column::CurrentMergeConflicts,
515 project_repository::Column::HeadCommitDetails,
516 ])
517 .to_owned(),
518 )
519 .exec(&*tx)
520 .await?;
521
522 let has_any_statuses = !update.updated_statuses.is_empty();
523
524 if has_any_statuses {
525 project_repository_statuses::Entity::insert_many(
526 update.updated_statuses.iter().map(|status_entry| {
527 let (repo_path, status_kind, first_status, second_status) =
528 proto_status_to_db(status_entry.clone());
529 project_repository_statuses::ActiveModel {
530 project_id: ActiveValue::set(project_id),
531 repository_id: ActiveValue::set(repository_id),
532 scan_id: ActiveValue::set(update.scan_id as i64),
533 is_deleted: ActiveValue::set(false),
534 repo_path: ActiveValue::set(repo_path),
535 status: ActiveValue::set(0),
536 status_kind: ActiveValue::set(status_kind),
537 first_status: ActiveValue::set(first_status),
538 second_status: ActiveValue::set(second_status),
539 }
540 }),
541 )
542 .on_conflict(
543 OnConflict::columns([
544 project_repository_statuses::Column::ProjectId,
545 project_repository_statuses::Column::RepositoryId,
546 project_repository_statuses::Column::RepoPath,
547 ])
548 .update_columns([
549 project_repository_statuses::Column::ScanId,
550 project_repository_statuses::Column::StatusKind,
551 project_repository_statuses::Column::FirstStatus,
552 project_repository_statuses::Column::SecondStatus,
553 ])
554 .to_owned(),
555 )
556 .exec(&*tx)
557 .await?;
558 }
559
560 let has_any_removed_statuses = !update.removed_statuses.is_empty();
561
562 if has_any_removed_statuses {
563 project_repository_statuses::Entity::update_many()
564 .filter(
565 project_repository_statuses::Column::ProjectId
566 .eq(project_id)
567 .and(
568 project_repository_statuses::Column::RepositoryId.eq(repository_id),
569 )
570 .and(
571 project_repository_statuses::Column::RepoPath
572 .is_in(update.removed_statuses.iter()),
573 ),
574 )
575 .set(project_repository_statuses::ActiveModel {
576 is_deleted: ActiveValue::Set(true),
577 scan_id: ActiveValue::Set(update.scan_id as i64),
578 ..Default::default()
579 })
580 .exec(&*tx)
581 .await?;
582 }
583
584 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
585 Ok(connection_ids)
586 })
587 .await
588 }
589
590 pub async fn remove_repository(
591 &self,
592 remove: &proto::RemoveRepository,
593 _connection: ConnectionId,
594 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
595 let project_id = ProjectId::from_proto(remove.project_id);
596 let repository_id = remove.id as i64;
597 self.project_transaction(project_id, |tx| async move {
598 project_repository::Entity::update_many()
599 .filter(
600 project_repository::Column::ProjectId
601 .eq(project_id)
602 .and(project_repository::Column::Id.eq(repository_id)),
603 )
604 .set(project_repository::ActiveModel {
605 is_deleted: ActiveValue::Set(true),
606 // scan_id: ActiveValue::Set(update.scan_id as i64),
607 ..Default::default()
608 })
609 .exec(&*tx)
610 .await?;
611
612 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
613 Ok(connection_ids)
614 })
615 .await
616 }
617
618 /// Updates the diagnostic summary for the given connection.
619 pub async fn update_diagnostic_summary(
620 &self,
621 update: &proto::UpdateDiagnosticSummary,
622 connection: ConnectionId,
623 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
624 let project_id = ProjectId::from_proto(update.project_id);
625 let worktree_id = update.worktree_id as i64;
626 self.project_transaction(project_id, |tx| async move {
627 let summary = update.summary.as_ref().context("invalid summary")?;
628
629 // Ensure the update comes from the host.
630 let project = project::Entity::find_by_id(project_id)
631 .one(&*tx)
632 .await?
633 .context("no such project")?;
634 if project.host_connection()? != connection {
635 return Err(anyhow!("can't update a project hosted by someone else"))?;
636 }
637
638 // Update summary.
639 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
640 project_id: ActiveValue::set(project_id),
641 worktree_id: ActiveValue::set(worktree_id),
642 path: ActiveValue::set(summary.path.clone()),
643 language_server_id: ActiveValue::set(summary.language_server_id as i64),
644 error_count: ActiveValue::set(summary.error_count as i32),
645 warning_count: ActiveValue::set(summary.warning_count as i32),
646 })
647 .on_conflict(
648 OnConflict::columns([
649 worktree_diagnostic_summary::Column::ProjectId,
650 worktree_diagnostic_summary::Column::WorktreeId,
651 worktree_diagnostic_summary::Column::Path,
652 ])
653 .update_columns([
654 worktree_diagnostic_summary::Column::LanguageServerId,
655 worktree_diagnostic_summary::Column::ErrorCount,
656 worktree_diagnostic_summary::Column::WarningCount,
657 ])
658 .to_owned(),
659 )
660 .exec(&*tx)
661 .await?;
662
663 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
664 Ok(connection_ids)
665 })
666 .await
667 }
668
669 /// Starts the language server for the given connection.
670 pub async fn start_language_server(
671 &self,
672 update: &proto::StartLanguageServer,
673 connection: ConnectionId,
674 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
675 let project_id = ProjectId::from_proto(update.project_id);
676 self.project_transaction(project_id, |tx| async move {
677 let server = update.server.as_ref().context("invalid language server")?;
678
679 // Ensure the update comes from the host.
680 let project = project::Entity::find_by_id(project_id)
681 .one(&*tx)
682 .await?
683 .context("no such project")?;
684 if project.host_connection()? != connection {
685 return Err(anyhow!("can't update a project hosted by someone else"))?;
686 }
687
688 // Add the newly-started language server.
689 language_server::Entity::insert(language_server::ActiveModel {
690 project_id: ActiveValue::set(project_id),
691 id: ActiveValue::set(server.id as i64),
692 name: ActiveValue::set(server.name.clone()),
693 })
694 .on_conflict(
695 OnConflict::columns([
696 language_server::Column::ProjectId,
697 language_server::Column::Id,
698 ])
699 .update_column(language_server::Column::Name)
700 .to_owned(),
701 )
702 .exec(&*tx)
703 .await?;
704
705 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
706 Ok(connection_ids)
707 })
708 .await
709 }
710
711 /// Updates the worktree settings for the given connection.
712 pub async fn update_worktree_settings(
713 &self,
714 update: &proto::UpdateWorktreeSettings,
715 connection: ConnectionId,
716 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
717 let project_id = ProjectId::from_proto(update.project_id);
718 let kind = match update.kind {
719 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
720 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
721 None => proto::LocalSettingsKind::Settings,
722 };
723 let kind = LocalSettingsKind::from_proto(kind);
724 self.project_transaction(project_id, |tx| async move {
725 // Ensure the update comes from the host.
726 let project = project::Entity::find_by_id(project_id)
727 .one(&*tx)
728 .await?
729 .context("no such project")?;
730 if project.host_connection()? != connection {
731 return Err(anyhow!("can't update a project hosted by someone else"))?;
732 }
733
734 if let Some(content) = &update.content {
735 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
736 project_id: ActiveValue::Set(project_id),
737 worktree_id: ActiveValue::Set(update.worktree_id as i64),
738 path: ActiveValue::Set(update.path.clone()),
739 content: ActiveValue::Set(content.clone()),
740 kind: ActiveValue::Set(kind),
741 })
742 .on_conflict(
743 OnConflict::columns([
744 worktree_settings_file::Column::ProjectId,
745 worktree_settings_file::Column::WorktreeId,
746 worktree_settings_file::Column::Path,
747 ])
748 .update_column(worktree_settings_file::Column::Content)
749 .to_owned(),
750 )
751 .exec(&*tx)
752 .await?;
753 } else {
754 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
755 project_id: ActiveValue::Set(project_id),
756 worktree_id: ActiveValue::Set(update.worktree_id as i64),
757 path: ActiveValue::Set(update.path.clone()),
758 ..Default::default()
759 })
760 .exec(&*tx)
761 .await?;
762 }
763
764 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
765 Ok(connection_ids)
766 })
767 .await
768 }
769
770 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
771 self.transaction(|tx| async move {
772 Ok(project::Entity::find_by_id(id)
773 .one(&*tx)
774 .await?
775 .context("no such project")?)
776 })
777 .await
778 }
779
780 /// Adds the given connection to the specified project
781 /// in the current room.
782 pub async fn join_project(
783 &self,
784 project_id: ProjectId,
785 connection: ConnectionId,
786 user_id: UserId,
787 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
788 self.project_transaction(project_id, |tx| async move {
789 let (project, role) = self
790 .access_project(project_id, connection, Capability::ReadOnly, &tx)
791 .await?;
792 self.join_project_internal(project, user_id, connection, role, &tx)
793 .await
794 })
795 .await
796 }
797
798 async fn join_project_internal(
799 &self,
800 project: project::Model,
801 user_id: UserId,
802 connection: ConnectionId,
803 role: ChannelRole,
804 tx: &DatabaseTransaction,
805 ) -> Result<(Project, ReplicaId)> {
806 let mut collaborators = project
807 .find_related(project_collaborator::Entity)
808 .all(tx)
809 .await?;
810 let replica_ids = collaborators
811 .iter()
812 .map(|c| c.replica_id)
813 .collect::<HashSet<_>>();
814 let mut replica_id = ReplicaId(1);
815 while replica_ids.contains(&replica_id) {
816 replica_id.0 += 1;
817 }
818 let new_collaborator = project_collaborator::ActiveModel {
819 project_id: ActiveValue::set(project.id),
820 connection_id: ActiveValue::set(connection.id as i32),
821 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
822 user_id: ActiveValue::set(user_id),
823 replica_id: ActiveValue::set(replica_id),
824 is_host: ActiveValue::set(false),
825 ..Default::default()
826 }
827 .insert(tx)
828 .await?;
829 collaborators.push(new_collaborator);
830
831 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
832 let mut worktrees = db_worktrees
833 .into_iter()
834 .map(|db_worktree| {
835 (
836 db_worktree.id as u64,
837 Worktree {
838 id: db_worktree.id as u64,
839 abs_path: db_worktree.abs_path,
840 root_name: db_worktree.root_name,
841 visible: db_worktree.visible,
842 entries: Default::default(),
843 diagnostic_summaries: Default::default(),
844 settings_files: Default::default(),
845 scan_id: db_worktree.scan_id as u64,
846 completed_scan_id: db_worktree.completed_scan_id as u64,
847 legacy_repository_entries: Default::default(),
848 },
849 )
850 })
851 .collect::<BTreeMap<_, _>>();
852
853 // Populate worktree entries.
854 {
855 let mut db_entries = worktree_entry::Entity::find()
856 .filter(
857 Condition::all()
858 .add(worktree_entry::Column::ProjectId.eq(project.id))
859 .add(worktree_entry::Column::IsDeleted.eq(false)),
860 )
861 .stream(tx)
862 .await?;
863 while let Some(db_entry) = db_entries.next().await {
864 let db_entry = db_entry?;
865 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
866 worktree.entries.push(proto::Entry {
867 id: db_entry.id as u64,
868 is_dir: db_entry.is_dir,
869 path: db_entry.path,
870 inode: db_entry.inode as u64,
871 mtime: Some(proto::Timestamp {
872 seconds: db_entry.mtime_seconds as u64,
873 nanos: db_entry.mtime_nanos as u32,
874 }),
875 canonical_path: db_entry.canonical_path,
876 is_ignored: db_entry.is_ignored,
877 is_external: db_entry.is_external,
878 // This is only used in the summarization backlog, so if it's None,
879 // that just means we won't be able to detect when to resummarize
880 // based on total number of backlogged bytes - instead, we'd go
881 // on number of files only. That shouldn't be a huge deal in practice.
882 size: None,
883 is_fifo: db_entry.is_fifo,
884 });
885 }
886 }
887 }
888
889 // Populate repository entries.
890 let mut repositories = Vec::new();
891 {
892 let db_repository_entries = project_repository::Entity::find()
893 .filter(
894 Condition::all()
895 .add(project_repository::Column::ProjectId.eq(project.id))
896 .add(project_repository::Column::IsDeleted.eq(false)),
897 )
898 .all(tx)
899 .await?;
900 for db_repository_entry in db_repository_entries {
901 let mut repository_statuses = project_repository_statuses::Entity::find()
902 .filter(
903 Condition::all()
904 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
905 .add(
906 project_repository_statuses::Column::RepositoryId
907 .eq(db_repository_entry.id),
908 )
909 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
910 )
911 .stream(tx)
912 .await?;
913 let mut updated_statuses = Vec::new();
914 while let Some(status_entry) = repository_statuses.next().await {
915 let status_entry = status_entry?;
916 updated_statuses.push(db_status_to_proto(status_entry)?);
917 }
918
919 let current_merge_conflicts = db_repository_entry
920 .current_merge_conflicts
921 .as_ref()
922 .map(|conflicts| serde_json::from_str(&conflicts))
923 .transpose()?
924 .unwrap_or_default();
925
926 let branch_summary = db_repository_entry
927 .branch_summary
928 .as_ref()
929 .map(|branch_summary| serde_json::from_str(&branch_summary))
930 .transpose()?
931 .unwrap_or_default();
932
933 let head_commit_details = db_repository_entry
934 .head_commit_details
935 .as_ref()
936 .map(|head_commit_details| serde_json::from_str(&head_commit_details))
937 .transpose()?
938 .unwrap_or_default();
939
940 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
941 .context("failed to deserialize repository's entry ids")?;
942
943 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
944 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
945 worktree.legacy_repository_entries.insert(
946 db_repository_entry.id as u64,
947 proto::RepositoryEntry {
948 repository_id: db_repository_entry.id as u64,
949 updated_statuses,
950 removed_statuses: Vec::new(),
951 current_merge_conflicts,
952 branch_summary,
953 },
954 );
955 }
956 } else {
957 repositories.push(proto::UpdateRepository {
958 project_id: db_repository_entry.project_id.0 as u64,
959 id: db_repository_entry.id as u64,
960 abs_path: db_repository_entry.abs_path,
961 entry_ids,
962 updated_statuses,
963 removed_statuses: Vec::new(),
964 current_merge_conflicts,
965 branch_summary,
966 head_commit_details,
967 scan_id: db_repository_entry.scan_id as u64,
968 is_last_update: true,
969 });
970 }
971 }
972 }
973
974 // Populate worktree diagnostic summaries.
975 {
976 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
977 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
978 .stream(tx)
979 .await?;
980 while let Some(db_summary) = db_summaries.next().await {
981 let db_summary = db_summary?;
982 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
983 worktree
984 .diagnostic_summaries
985 .push(proto::DiagnosticSummary {
986 path: db_summary.path,
987 language_server_id: db_summary.language_server_id as u64,
988 error_count: db_summary.error_count as u32,
989 warning_count: db_summary.warning_count as u32,
990 });
991 }
992 }
993 }
994
995 // Populate worktree settings files
996 {
997 let mut db_settings_files = worktree_settings_file::Entity::find()
998 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
999 .stream(tx)
1000 .await?;
1001 while let Some(db_settings_file) = db_settings_files.next().await {
1002 let db_settings_file = db_settings_file?;
1003 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1004 worktree.settings_files.push(WorktreeSettingsFile {
1005 path: db_settings_file.path,
1006 content: db_settings_file.content,
1007 kind: db_settings_file.kind,
1008 });
1009 }
1010 }
1011 }
1012
1013 // Populate language servers.
1014 let language_servers = project
1015 .find_related(language_server::Entity)
1016 .all(tx)
1017 .await?;
1018
1019 let project = Project {
1020 id: project.id,
1021 role,
1022 collaborators: collaborators
1023 .into_iter()
1024 .map(|collaborator| ProjectCollaborator {
1025 connection_id: collaborator.connection(),
1026 user_id: collaborator.user_id,
1027 replica_id: collaborator.replica_id,
1028 is_host: collaborator.is_host,
1029 })
1030 .collect(),
1031 worktrees,
1032 repositories,
1033 language_servers: language_servers
1034 .into_iter()
1035 .map(|language_server| proto::LanguageServer {
1036 id: language_server.id as u64,
1037 name: language_server.name,
1038 worktree_id: None,
1039 })
1040 .collect(),
1041 };
1042 Ok((project, replica_id as ReplicaId))
1043 }
1044
1045 /// Removes the given connection from the specified project.
1046 pub async fn leave_project(
1047 &self,
1048 project_id: ProjectId,
1049 connection: ConnectionId,
1050 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1051 self.project_transaction(project_id, |tx| async move {
1052 let result = project_collaborator::Entity::delete_many()
1053 .filter(
1054 Condition::all()
1055 .add(project_collaborator::Column::ProjectId.eq(project_id))
1056 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1057 .add(
1058 project_collaborator::Column::ConnectionServerId
1059 .eq(connection.owner_id as i32),
1060 ),
1061 )
1062 .exec(&*tx)
1063 .await?;
1064 if result.rows_affected == 0 {
1065 Err(anyhow!("not a collaborator on this project"))?;
1066 }
1067
1068 let project = project::Entity::find_by_id(project_id)
1069 .one(&*tx)
1070 .await?
1071 .context("no such project")?;
1072 let collaborators = project
1073 .find_related(project_collaborator::Entity)
1074 .all(&*tx)
1075 .await?;
1076 let connection_ids: Vec<ConnectionId> = collaborators
1077 .into_iter()
1078 .map(|collaborator| collaborator.connection())
1079 .collect();
1080
1081 follower::Entity::delete_many()
1082 .filter(
1083 Condition::any()
1084 .add(
1085 Condition::all()
1086 .add(follower::Column::ProjectId.eq(Some(project_id)))
1087 .add(
1088 follower::Column::LeaderConnectionServerId
1089 .eq(connection.owner_id),
1090 )
1091 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1092 )
1093 .add(
1094 Condition::all()
1095 .add(follower::Column::ProjectId.eq(Some(project_id)))
1096 .add(
1097 follower::Column::FollowerConnectionServerId
1098 .eq(connection.owner_id),
1099 )
1100 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1101 ),
1102 )
1103 .exec(&*tx)
1104 .await?;
1105
1106 let room = if let Some(room_id) = project.room_id {
1107 Some(self.get_room(room_id, &tx).await?)
1108 } else {
1109 None
1110 };
1111
1112 let left_project = LeftProject {
1113 id: project_id,
1114 should_unshare: connection == project.host_connection()?,
1115 connection_ids,
1116 };
1117 Ok((room, left_project))
1118 })
1119 .await
1120 }
1121
1122 pub async fn check_user_is_project_host(
1123 &self,
1124 project_id: ProjectId,
1125 connection_id: ConnectionId,
1126 ) -> Result<()> {
1127 self.project_transaction(project_id, |tx| async move {
1128 project::Entity::find()
1129 .filter(
1130 Condition::all()
1131 .add(project::Column::Id.eq(project_id))
1132 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1133 .add(
1134 project::Column::HostConnectionServerId
1135 .eq(Some(connection_id.owner_id as i32)),
1136 ),
1137 )
1138 .one(&*tx)
1139 .await?
1140 .context("failed to read project host")?;
1141
1142 Ok(())
1143 })
1144 .await
1145 .map(|guard| guard.into_inner())
1146 }
1147
1148 /// Returns the current project if the given user is authorized to access it with the specified capability.
1149 pub async fn access_project(
1150 &self,
1151 project_id: ProjectId,
1152 connection_id: ConnectionId,
1153 capability: Capability,
1154 tx: &DatabaseTransaction,
1155 ) -> Result<(project::Model, ChannelRole)> {
1156 let project = project::Entity::find_by_id(project_id)
1157 .one(tx)
1158 .await?
1159 .context("no such project")?;
1160
1161 let role_from_room = if let Some(room_id) = project.room_id {
1162 room_participant::Entity::find()
1163 .filter(room_participant::Column::RoomId.eq(room_id))
1164 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1165 .one(tx)
1166 .await?
1167 .and_then(|participant| participant.role)
1168 } else {
1169 None
1170 };
1171
1172 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1173
1174 match capability {
1175 Capability::ReadWrite => {
1176 if !role.can_edit_projects() {
1177 return Err(anyhow!("not authorized to edit projects"))?;
1178 }
1179 }
1180 Capability::ReadOnly => {
1181 if !role.can_read_projects() {
1182 return Err(anyhow!("not authorized to read projects"))?;
1183 }
1184 }
1185 }
1186
1187 Ok((project, role))
1188 }
1189
1190 /// Returns the host connection for a read-only request to join a shared project.
1191 pub async fn host_for_read_only_project_request(
1192 &self,
1193 project_id: ProjectId,
1194 connection_id: ConnectionId,
1195 ) -> Result<ConnectionId> {
1196 self.project_transaction(project_id, |tx| async move {
1197 let (project, _) = self
1198 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1199 .await?;
1200 project.host_connection()
1201 })
1202 .await
1203 .map(|guard| guard.into_inner())
1204 }
1205
1206 /// Returns the host connection for a request to join a shared project.
1207 pub async fn host_for_mutating_project_request(
1208 &self,
1209 project_id: ProjectId,
1210 connection_id: ConnectionId,
1211 ) -> Result<ConnectionId> {
1212 self.project_transaction(project_id, |tx| async move {
1213 let (project, _) = self
1214 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1215 .await?;
1216 project.host_connection()
1217 })
1218 .await
1219 .map(|guard| guard.into_inner())
1220 }
1221
1222 pub async fn connections_for_buffer_update(
1223 &self,
1224 project_id: ProjectId,
1225 connection_id: ConnectionId,
1226 capability: Capability,
1227 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1228 self.project_transaction(project_id, |tx| async move {
1229 // Authorize
1230 let (project, _) = self
1231 .access_project(project_id, connection_id, capability, &tx)
1232 .await?;
1233
1234 let host_connection_id = project.host_connection()?;
1235
1236 let collaborators = project_collaborator::Entity::find()
1237 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1238 .all(&*tx)
1239 .await?;
1240
1241 let guest_connection_ids = collaborators
1242 .into_iter()
1243 .filter_map(|collaborator| {
1244 if collaborator.is_host {
1245 None
1246 } else {
1247 Some(collaborator.connection())
1248 }
1249 })
1250 .collect();
1251
1252 Ok((host_connection_id, guest_connection_ids))
1253 })
1254 .await
1255 }
1256
1257 /// Returns the connection IDs in the given project.
1258 ///
1259 /// The provided `connection_id` must also be a collaborator in the project,
1260 /// otherwise an error will be returned.
1261 pub async fn project_connection_ids(
1262 &self,
1263 project_id: ProjectId,
1264 connection_id: ConnectionId,
1265 exclude_dev_server: bool,
1266 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1267 self.project_transaction(project_id, |tx| async move {
1268 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1269 .await
1270 })
1271 .await
1272 }
1273
1274 async fn internal_project_connection_ids(
1275 &self,
1276 project_id: ProjectId,
1277 connection_id: ConnectionId,
1278 exclude_dev_server: bool,
1279 tx: &DatabaseTransaction,
1280 ) -> Result<HashSet<ConnectionId>> {
1281 let project = project::Entity::find_by_id(project_id)
1282 .one(tx)
1283 .await?
1284 .context("no such project")?;
1285
1286 let mut collaborators = project_collaborator::Entity::find()
1287 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1288 .stream(tx)
1289 .await?;
1290
1291 let mut connection_ids = HashSet::default();
1292 if let Some(host_connection) = project.host_connection().log_err() {
1293 if !exclude_dev_server {
1294 connection_ids.insert(host_connection);
1295 }
1296 }
1297
1298 while let Some(collaborator) = collaborators.next().await {
1299 let collaborator = collaborator?;
1300 connection_ids.insert(collaborator.connection());
1301 }
1302
1303 if connection_ids.contains(&connection_id)
1304 || Some(connection_id) == project.host_connection().ok()
1305 {
1306 Ok(connection_ids)
1307 } else {
1308 Err(anyhow!(
1309 "can only send project updates to a project you're in"
1310 ))?
1311 }
1312 }
1313
1314 async fn project_guest_connection_ids(
1315 &self,
1316 project_id: ProjectId,
1317 tx: &DatabaseTransaction,
1318 ) -> Result<Vec<ConnectionId>> {
1319 let mut collaborators = project_collaborator::Entity::find()
1320 .filter(
1321 project_collaborator::Column::ProjectId
1322 .eq(project_id)
1323 .and(project_collaborator::Column::IsHost.eq(false)),
1324 )
1325 .stream(tx)
1326 .await?;
1327
1328 let mut guest_connection_ids = Vec::new();
1329 while let Some(collaborator) = collaborators.next().await {
1330 let collaborator = collaborator?;
1331 guest_connection_ids.push(collaborator.connection());
1332 }
1333 Ok(guest_connection_ids)
1334 }
1335
1336 /// Returns the [`RoomId`] for the given project.
1337 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1338 self.transaction(|tx| async move {
1339 Ok(project::Entity::find_by_id(project_id)
1340 .one(&*tx)
1341 .await?
1342 .and_then(|project| project.room_id))
1343 })
1344 .await
1345 }
1346
1347 pub async fn check_room_participants(
1348 &self,
1349 room_id: RoomId,
1350 leader_id: ConnectionId,
1351 follower_id: ConnectionId,
1352 ) -> Result<()> {
1353 self.transaction(|tx| async move {
1354 use room_participant::Column;
1355
1356 let count = room_participant::Entity::find()
1357 .filter(
1358 Condition::all().add(Column::RoomId.eq(room_id)).add(
1359 Condition::any()
1360 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1361 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1362 ))
1363 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1364 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1365 )),
1366 ),
1367 )
1368 .count(&*tx)
1369 .await?;
1370
1371 if count < 2 {
1372 Err(anyhow!("not room participants"))?;
1373 }
1374
1375 Ok(())
1376 })
1377 .await
1378 }
1379
1380 /// Adds the given follower connection as a follower of the given leader connection.
1381 pub async fn follow(
1382 &self,
1383 room_id: RoomId,
1384 project_id: ProjectId,
1385 leader_connection: ConnectionId,
1386 follower_connection: ConnectionId,
1387 ) -> Result<TransactionGuard<proto::Room>> {
1388 self.room_transaction(room_id, |tx| async move {
1389 follower::ActiveModel {
1390 room_id: ActiveValue::set(room_id),
1391 project_id: ActiveValue::set(project_id),
1392 leader_connection_server_id: ActiveValue::set(ServerId(
1393 leader_connection.owner_id as i32,
1394 )),
1395 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1396 follower_connection_server_id: ActiveValue::set(ServerId(
1397 follower_connection.owner_id as i32,
1398 )),
1399 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1400 ..Default::default()
1401 }
1402 .insert(&*tx)
1403 .await?;
1404
1405 let room = self.get_room(room_id, &tx).await?;
1406 Ok(room)
1407 })
1408 .await
1409 }
1410
1411 /// Removes the given follower connection as a follower of the given leader connection.
1412 pub async fn unfollow(
1413 &self,
1414 room_id: RoomId,
1415 project_id: ProjectId,
1416 leader_connection: ConnectionId,
1417 follower_connection: ConnectionId,
1418 ) -> Result<TransactionGuard<proto::Room>> {
1419 self.room_transaction(room_id, |tx| async move {
1420 follower::Entity::delete_many()
1421 .filter(
1422 Condition::all()
1423 .add(follower::Column::RoomId.eq(room_id))
1424 .add(follower::Column::ProjectId.eq(project_id))
1425 .add(
1426 follower::Column::LeaderConnectionServerId
1427 .eq(leader_connection.owner_id),
1428 )
1429 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1430 .add(
1431 follower::Column::FollowerConnectionServerId
1432 .eq(follower_connection.owner_id),
1433 )
1434 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1435 )
1436 .exec(&*tx)
1437 .await?;
1438
1439 let room = self.get_room(room_id, &tx).await?;
1440 Ok(room)
1441 })
1442 .await
1443 }
1444}