1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .ok_or_else(|| anyhow!("could not find participant"))?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 let project = project::ActiveModel {
65 room_id: ActiveValue::set(Some(participant.room_id)),
66 host_user_id: ActiveValue::set(Some(participant.user_id)),
67 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
68 host_connection_server_id: ActiveValue::set(Some(ServerId(
69 connection.owner_id as i32,
70 ))),
71 id: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if !worktrees.is_empty() {
77 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
78 worktree::ActiveModel {
79 id: ActiveValue::set(worktree.id as i64),
80 project_id: ActiveValue::set(project.id),
81 abs_path: ActiveValue::set(worktree.abs_path.clone()),
82 root_name: ActiveValue::set(worktree.root_name.clone()),
83 visible: ActiveValue::set(worktree.visible),
84 scan_id: ActiveValue::set(0),
85 completed_scan_id: ActiveValue::set(0),
86 }
87 }))
88 .exec(&*tx)
89 .await?;
90 }
91
92 let replica_id = if is_ssh_project { 1 } else { 0 };
93
94 project_collaborator::ActiveModel {
95 project_id: ActiveValue::set(project.id),
96 connection_id: ActiveValue::set(connection.id as i32),
97 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
98 user_id: ActiveValue::set(participant.user_id),
99 replica_id: ActiveValue::set(ReplicaId(replica_id)),
100 is_host: ActiveValue::set(true),
101 ..Default::default()
102 }
103 .insert(&*tx)
104 .await?;
105
106 let room = self.get_room(room_id, &tx).await?;
107 Ok((project.id, room))
108 })
109 .await
110 }
111
112 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
113 self.weak_transaction(|tx| async move {
114 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
115 Ok(())
116 })
117 .await
118 }
119
120 /// Unshares the given project.
121 pub async fn unshare_project(
122 &self,
123 project_id: ProjectId,
124 connection: ConnectionId,
125 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
126 self.project_transaction(project_id, |tx| async move {
127 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
128 let project = project::Entity::find_by_id(project_id)
129 .one(&*tx)
130 .await?
131 .ok_or_else(|| anyhow!("project not found"))?;
132 let room = if let Some(room_id) = project.room_id {
133 Some(self.get_room(room_id, &tx).await?)
134 } else {
135 None
136 };
137 if project.host_connection()? == connection {
138 return Ok((true, room, guest_connection_ids));
139 }
140 Err(anyhow!("cannot unshare a project hosted by another user"))?
141 })
142 .await
143 }
144
145 /// Updates the worktrees associated with the given project.
146 pub async fn update_project(
147 &self,
148 project_id: ProjectId,
149 connection: ConnectionId,
150 worktrees: &[proto::WorktreeMetadata],
151 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
152 self.project_transaction(project_id, |tx| async move {
153 let project = project::Entity::find_by_id(project_id)
154 .filter(
155 Condition::all()
156 .add(project::Column::HostConnectionId.eq(connection.id as i32))
157 .add(
158 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
159 ),
160 )
161 .one(&*tx)
162 .await?
163 .ok_or_else(|| anyhow!("no such project"))?;
164
165 self.update_project_worktrees(project.id, worktrees, &tx)
166 .await?;
167
168 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
169
170 let room = if let Some(room_id) = project.room_id {
171 Some(self.get_room(room_id, &tx).await?)
172 } else {
173 None
174 };
175
176 Ok((room, guest_connection_ids))
177 })
178 .await
179 }
180
181 pub(in crate::db) async fn update_project_worktrees(
182 &self,
183 project_id: ProjectId,
184 worktrees: &[proto::WorktreeMetadata],
185 tx: &DatabaseTransaction,
186 ) -> Result<()> {
187 if !worktrees.is_empty() {
188 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
189 id: ActiveValue::set(worktree.id as i64),
190 project_id: ActiveValue::set(project_id),
191 abs_path: ActiveValue::set(worktree.abs_path.clone()),
192 root_name: ActiveValue::set(worktree.root_name.clone()),
193 visible: ActiveValue::set(worktree.visible),
194 scan_id: ActiveValue::set(0),
195 completed_scan_id: ActiveValue::set(0),
196 }))
197 .on_conflict(
198 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
199 .update_column(worktree::Column::RootName)
200 .to_owned(),
201 )
202 .exec(tx)
203 .await?;
204 }
205
206 worktree::Entity::delete_many()
207 .filter(worktree::Column::ProjectId.eq(project_id).and(
208 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
209 ))
210 .exec(tx)
211 .await?;
212
213 Ok(())
214 }
215
216 pub async fn update_worktree(
217 &self,
218 update: &proto::UpdateWorktree,
219 connection: ConnectionId,
220 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
221 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
222 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
223 {
224 return Err(anyhow!(
225 "invalid worktree update. removed entries: {}, updated entries: {}",
226 update.removed_entries.len(),
227 update.updated_entries.len()
228 ))?;
229 }
230
231 let project_id = ProjectId::from_proto(update.project_id);
232 let worktree_id = update.worktree_id as i64;
233 self.project_transaction(project_id, |tx| async move {
234 // Ensure the update comes from the host.
235 let _project = project::Entity::find_by_id(project_id)
236 .filter(
237 Condition::all()
238 .add(project::Column::HostConnectionId.eq(connection.id as i32))
239 .add(
240 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
241 ),
242 )
243 .one(&*tx)
244 .await?
245 .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
246
247 // Update metadata.
248 worktree::Entity::update(worktree::ActiveModel {
249 id: ActiveValue::set(worktree_id),
250 project_id: ActiveValue::set(project_id),
251 root_name: ActiveValue::set(update.root_name.clone()),
252 scan_id: ActiveValue::set(update.scan_id as i64),
253 completed_scan_id: if update.is_last_update {
254 ActiveValue::set(update.scan_id as i64)
255 } else {
256 ActiveValue::default()
257 },
258 abs_path: ActiveValue::set(update.abs_path.clone()),
259 ..Default::default()
260 })
261 .exec(&*tx)
262 .await?;
263
264 if !update.updated_entries.is_empty() {
265 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
266 let mtime = entry.mtime.clone().unwrap_or_default();
267 worktree_entry::ActiveModel {
268 project_id: ActiveValue::set(project_id),
269 worktree_id: ActiveValue::set(worktree_id),
270 id: ActiveValue::set(entry.id as i64),
271 is_dir: ActiveValue::set(entry.is_dir),
272 path: ActiveValue::set(entry.path.clone()),
273 inode: ActiveValue::set(entry.inode as i64),
274 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
275 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
276 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
277 is_ignored: ActiveValue::set(entry.is_ignored),
278 git_status: ActiveValue::set(None),
279 is_external: ActiveValue::set(entry.is_external),
280 is_deleted: ActiveValue::set(false),
281 scan_id: ActiveValue::set(update.scan_id as i64),
282 is_fifo: ActiveValue::set(entry.is_fifo),
283 }
284 }))
285 .on_conflict(
286 OnConflict::columns([
287 worktree_entry::Column::ProjectId,
288 worktree_entry::Column::WorktreeId,
289 worktree_entry::Column::Id,
290 ])
291 .update_columns([
292 worktree_entry::Column::IsDir,
293 worktree_entry::Column::Path,
294 worktree_entry::Column::Inode,
295 worktree_entry::Column::MtimeSeconds,
296 worktree_entry::Column::MtimeNanos,
297 worktree_entry::Column::CanonicalPath,
298 worktree_entry::Column::IsIgnored,
299 worktree_entry::Column::ScanId,
300 ])
301 .to_owned(),
302 )
303 .exec(&*tx)
304 .await?;
305 }
306
307 if !update.removed_entries.is_empty() {
308 worktree_entry::Entity::update_many()
309 .filter(
310 worktree_entry::Column::ProjectId
311 .eq(project_id)
312 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
313 .and(
314 worktree_entry::Column::Id
315 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
316 ),
317 )
318 .set(worktree_entry::ActiveModel {
319 is_deleted: ActiveValue::Set(true),
320 scan_id: ActiveValue::Set(update.scan_id as i64),
321 ..Default::default()
322 })
323 .exec(&*tx)
324 .await?;
325 }
326
327 // Backward-compatibility for old Zed clients.
328 //
329 // Remove this block when Zed 1.80 stable has been out for a week.
330 {
331 if !update.updated_repositories.is_empty() {
332 project_repository::Entity::insert_many(
333 update.updated_repositories.iter().map(|repository| {
334 project_repository::ActiveModel {
335 project_id: ActiveValue::set(project_id),
336 legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
337 id: ActiveValue::set(repository.repository_id as i64),
338 scan_id: ActiveValue::set(update.scan_id as i64),
339 is_deleted: ActiveValue::set(false),
340 branch_summary: ActiveValue::Set(
341 repository
342 .branch_summary
343 .as_ref()
344 .map(|summary| serde_json::to_string(summary).unwrap()),
345 ),
346 current_merge_conflicts: ActiveValue::Set(Some(
347 serde_json::to_string(&repository.current_merge_conflicts)
348 .unwrap(),
349 )),
350
351 // Old clients do not use abs path, entry ids or head_commit_details.
352 abs_path: ActiveValue::set(String::new()),
353 entry_ids: ActiveValue::set("[]".into()),
354 head_commit_details: ActiveValue::set(None),
355 }
356 }),
357 )
358 .on_conflict(
359 OnConflict::columns([
360 project_repository::Column::ProjectId,
361 project_repository::Column::Id,
362 ])
363 .update_columns([
364 project_repository::Column::ScanId,
365 project_repository::Column::BranchSummary,
366 project_repository::Column::CurrentMergeConflicts,
367 ])
368 .to_owned(),
369 )
370 .exec(&*tx)
371 .await?;
372
373 let has_any_statuses = update
374 .updated_repositories
375 .iter()
376 .any(|repository| !repository.updated_statuses.is_empty());
377
378 if has_any_statuses {
379 project_repository_statuses::Entity::insert_many(
380 update.updated_repositories.iter().flat_map(
381 |repository: &proto::RepositoryEntry| {
382 repository.updated_statuses.iter().map(|status_entry| {
383 let (repo_path, status_kind, first_status, second_status) =
384 proto_status_to_db(status_entry.clone());
385 project_repository_statuses::ActiveModel {
386 project_id: ActiveValue::set(project_id),
387 repository_id: ActiveValue::set(
388 repository.repository_id as i64,
389 ),
390 scan_id: ActiveValue::set(update.scan_id as i64),
391 is_deleted: ActiveValue::set(false),
392 repo_path: ActiveValue::set(repo_path),
393 status: ActiveValue::set(0),
394 status_kind: ActiveValue::set(status_kind),
395 first_status: ActiveValue::set(first_status),
396 second_status: ActiveValue::set(second_status),
397 }
398 })
399 },
400 ),
401 )
402 .on_conflict(
403 OnConflict::columns([
404 project_repository_statuses::Column::ProjectId,
405 project_repository_statuses::Column::RepositoryId,
406 project_repository_statuses::Column::RepoPath,
407 ])
408 .update_columns([
409 project_repository_statuses::Column::ScanId,
410 project_repository_statuses::Column::StatusKind,
411 project_repository_statuses::Column::FirstStatus,
412 project_repository_statuses::Column::SecondStatus,
413 ])
414 .to_owned(),
415 )
416 .exec(&*tx)
417 .await?;
418 }
419
420 for repo in &update.updated_repositories {
421 if !repo.removed_statuses.is_empty() {
422 project_repository_statuses::Entity::update_many()
423 .filter(
424 project_repository_statuses::Column::ProjectId
425 .eq(project_id)
426 .and(
427 project_repository_statuses::Column::RepositoryId
428 .eq(repo.repository_id),
429 )
430 .and(
431 project_repository_statuses::Column::RepoPath
432 .is_in(repo.removed_statuses.iter()),
433 ),
434 )
435 .set(project_repository_statuses::ActiveModel {
436 is_deleted: ActiveValue::Set(true),
437 scan_id: ActiveValue::Set(update.scan_id as i64),
438 ..Default::default()
439 })
440 .exec(&*tx)
441 .await?;
442 }
443 }
444 }
445
446 if !update.removed_repositories.is_empty() {
447 project_repository::Entity::update_many()
448 .filter(
449 project_repository::Column::ProjectId
450 .eq(project_id)
451 .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
452 .and(project_repository::Column::Id.is_in(
453 update.removed_repositories.iter().map(|id| *id as i64),
454 )),
455 )
456 .set(project_repository::ActiveModel {
457 is_deleted: ActiveValue::Set(true),
458 scan_id: ActiveValue::Set(update.scan_id as i64),
459 ..Default::default()
460 })
461 .exec(&*tx)
462 .await?;
463 }
464 }
465
466 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
467 Ok(connection_ids)
468 })
469 .await
470 }
471
472 pub async fn update_repository(
473 &self,
474 update: &proto::UpdateRepository,
475 _connection: ConnectionId,
476 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
477 let project_id = ProjectId::from_proto(update.project_id);
478 let repository_id = update.id as i64;
479 self.project_transaction(project_id, |tx| async move {
480 project_repository::Entity::insert(project_repository::ActiveModel {
481 project_id: ActiveValue::set(project_id),
482 id: ActiveValue::set(repository_id),
483 legacy_worktree_id: ActiveValue::set(None),
484 abs_path: ActiveValue::set(update.abs_path.clone()),
485 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
486 scan_id: ActiveValue::set(update.scan_id as i64),
487 is_deleted: ActiveValue::set(false),
488 branch_summary: ActiveValue::Set(
489 update
490 .branch_summary
491 .as_ref()
492 .map(|summary| serde_json::to_string(summary).unwrap()),
493 ),
494 head_commit_details: ActiveValue::Set(
495 update
496 .head_commit_details
497 .as_ref()
498 .map(|details| serde_json::to_string(details).unwrap()),
499 ),
500 current_merge_conflicts: ActiveValue::Set(Some(
501 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
502 )),
503 })
504 .on_conflict(
505 OnConflict::columns([
506 project_repository::Column::ProjectId,
507 project_repository::Column::Id,
508 ])
509 .update_columns([
510 project_repository::Column::ScanId,
511 project_repository::Column::BranchSummary,
512 project_repository::Column::EntryIds,
513 project_repository::Column::AbsPath,
514 project_repository::Column::CurrentMergeConflicts,
515 project_repository::Column::HeadCommitDetails,
516 ])
517 .to_owned(),
518 )
519 .exec(&*tx)
520 .await?;
521
522 let has_any_statuses = !update.updated_statuses.is_empty();
523
524 if has_any_statuses {
525 project_repository_statuses::Entity::insert_many(
526 update.updated_statuses.iter().map(|status_entry| {
527 let (repo_path, status_kind, first_status, second_status) =
528 proto_status_to_db(status_entry.clone());
529 project_repository_statuses::ActiveModel {
530 project_id: ActiveValue::set(project_id),
531 repository_id: ActiveValue::set(repository_id),
532 scan_id: ActiveValue::set(update.scan_id as i64),
533 is_deleted: ActiveValue::set(false),
534 repo_path: ActiveValue::set(repo_path),
535 status: ActiveValue::set(0),
536 status_kind: ActiveValue::set(status_kind),
537 first_status: ActiveValue::set(first_status),
538 second_status: ActiveValue::set(second_status),
539 }
540 }),
541 )
542 .on_conflict(
543 OnConflict::columns([
544 project_repository_statuses::Column::ProjectId,
545 project_repository_statuses::Column::RepositoryId,
546 project_repository_statuses::Column::RepoPath,
547 ])
548 .update_columns([
549 project_repository_statuses::Column::ScanId,
550 project_repository_statuses::Column::StatusKind,
551 project_repository_statuses::Column::FirstStatus,
552 project_repository_statuses::Column::SecondStatus,
553 ])
554 .to_owned(),
555 )
556 .exec(&*tx)
557 .await?;
558 }
559
560 let has_any_removed_statuses = !update.removed_statuses.is_empty();
561
562 if has_any_removed_statuses {
563 project_repository_statuses::Entity::update_many()
564 .filter(
565 project_repository_statuses::Column::ProjectId
566 .eq(project_id)
567 .and(
568 project_repository_statuses::Column::RepositoryId.eq(repository_id),
569 )
570 .and(
571 project_repository_statuses::Column::RepoPath
572 .is_in(update.removed_statuses.iter()),
573 ),
574 )
575 .set(project_repository_statuses::ActiveModel {
576 is_deleted: ActiveValue::Set(true),
577 scan_id: ActiveValue::Set(update.scan_id as i64),
578 ..Default::default()
579 })
580 .exec(&*tx)
581 .await?;
582 }
583
584 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
585 Ok(connection_ids)
586 })
587 .await
588 }
589
590 pub async fn remove_repository(
591 &self,
592 remove: &proto::RemoveRepository,
593 _connection: ConnectionId,
594 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
595 let project_id = ProjectId::from_proto(remove.project_id);
596 let repository_id = remove.id as i64;
597 self.project_transaction(project_id, |tx| async move {
598 project_repository::Entity::update_many()
599 .filter(
600 project_repository::Column::ProjectId
601 .eq(project_id)
602 .and(project_repository::Column::Id.eq(repository_id)),
603 )
604 .set(project_repository::ActiveModel {
605 is_deleted: ActiveValue::Set(true),
606 // scan_id: ActiveValue::Set(update.scan_id as i64),
607 ..Default::default()
608 })
609 .exec(&*tx)
610 .await?;
611
612 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
613 Ok(connection_ids)
614 })
615 .await
616 }
617
618 /// Updates the diagnostic summary for the given connection.
619 pub async fn update_diagnostic_summary(
620 &self,
621 update: &proto::UpdateDiagnosticSummary,
622 connection: ConnectionId,
623 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
624 let project_id = ProjectId::from_proto(update.project_id);
625 let worktree_id = update.worktree_id as i64;
626 self.project_transaction(project_id, |tx| async move {
627 let summary = update
628 .summary
629 .as_ref()
630 .ok_or_else(|| anyhow!("invalid summary"))?;
631
632 // Ensure the update comes from the host.
633 let project = project::Entity::find_by_id(project_id)
634 .one(&*tx)
635 .await?
636 .ok_or_else(|| anyhow!("no such project"))?;
637 if project.host_connection()? != connection {
638 return Err(anyhow!("can't update a project hosted by someone else"))?;
639 }
640
641 // Update summary.
642 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
643 project_id: ActiveValue::set(project_id),
644 worktree_id: ActiveValue::set(worktree_id),
645 path: ActiveValue::set(summary.path.clone()),
646 language_server_id: ActiveValue::set(summary.language_server_id as i64),
647 error_count: ActiveValue::set(summary.error_count as i32),
648 warning_count: ActiveValue::set(summary.warning_count as i32),
649 })
650 .on_conflict(
651 OnConflict::columns([
652 worktree_diagnostic_summary::Column::ProjectId,
653 worktree_diagnostic_summary::Column::WorktreeId,
654 worktree_diagnostic_summary::Column::Path,
655 ])
656 .update_columns([
657 worktree_diagnostic_summary::Column::LanguageServerId,
658 worktree_diagnostic_summary::Column::ErrorCount,
659 worktree_diagnostic_summary::Column::WarningCount,
660 ])
661 .to_owned(),
662 )
663 .exec(&*tx)
664 .await?;
665
666 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
667 Ok(connection_ids)
668 })
669 .await
670 }
671
672 /// Starts the language server for the given connection.
673 pub async fn start_language_server(
674 &self,
675 update: &proto::StartLanguageServer,
676 connection: ConnectionId,
677 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
678 let project_id = ProjectId::from_proto(update.project_id);
679 self.project_transaction(project_id, |tx| async move {
680 let server = update
681 .server
682 .as_ref()
683 .ok_or_else(|| anyhow!("invalid language server"))?;
684
685 // Ensure the update comes from the host.
686 let project = project::Entity::find_by_id(project_id)
687 .one(&*tx)
688 .await?
689 .ok_or_else(|| anyhow!("no such project"))?;
690 if project.host_connection()? != connection {
691 return Err(anyhow!("can't update a project hosted by someone else"))?;
692 }
693
694 // Add the newly-started language server.
695 language_server::Entity::insert(language_server::ActiveModel {
696 project_id: ActiveValue::set(project_id),
697 id: ActiveValue::set(server.id as i64),
698 name: ActiveValue::set(server.name.clone()),
699 })
700 .on_conflict(
701 OnConflict::columns([
702 language_server::Column::ProjectId,
703 language_server::Column::Id,
704 ])
705 .update_column(language_server::Column::Name)
706 .to_owned(),
707 )
708 .exec(&*tx)
709 .await?;
710
711 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
712 Ok(connection_ids)
713 })
714 .await
715 }
716
717 /// Updates the worktree settings for the given connection.
718 pub async fn update_worktree_settings(
719 &self,
720 update: &proto::UpdateWorktreeSettings,
721 connection: ConnectionId,
722 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
723 let project_id = ProjectId::from_proto(update.project_id);
724 let kind = match update.kind {
725 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
726 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
727 None => proto::LocalSettingsKind::Settings,
728 };
729 let kind = LocalSettingsKind::from_proto(kind);
730 self.project_transaction(project_id, |tx| async move {
731 // Ensure the update comes from the host.
732 let project = project::Entity::find_by_id(project_id)
733 .one(&*tx)
734 .await?
735 .ok_or_else(|| anyhow!("no such project"))?;
736 if project.host_connection()? != connection {
737 return Err(anyhow!("can't update a project hosted by someone else"))?;
738 }
739
740 if let Some(content) = &update.content {
741 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
742 project_id: ActiveValue::Set(project_id),
743 worktree_id: ActiveValue::Set(update.worktree_id as i64),
744 path: ActiveValue::Set(update.path.clone()),
745 content: ActiveValue::Set(content.clone()),
746 kind: ActiveValue::Set(kind),
747 })
748 .on_conflict(
749 OnConflict::columns([
750 worktree_settings_file::Column::ProjectId,
751 worktree_settings_file::Column::WorktreeId,
752 worktree_settings_file::Column::Path,
753 ])
754 .update_column(worktree_settings_file::Column::Content)
755 .to_owned(),
756 )
757 .exec(&*tx)
758 .await?;
759 } else {
760 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
761 project_id: ActiveValue::Set(project_id),
762 worktree_id: ActiveValue::Set(update.worktree_id as i64),
763 path: ActiveValue::Set(update.path.clone()),
764 ..Default::default()
765 })
766 .exec(&*tx)
767 .await?;
768 }
769
770 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
771 Ok(connection_ids)
772 })
773 .await
774 }
775
776 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
777 self.transaction(|tx| async move {
778 Ok(project::Entity::find_by_id(id)
779 .one(&*tx)
780 .await?
781 .ok_or_else(|| anyhow!("no such project"))?)
782 })
783 .await
784 }
785
786 /// Adds the given connection to the specified project
787 /// in the current room.
788 pub async fn join_project(
789 &self,
790 project_id: ProjectId,
791 connection: ConnectionId,
792 user_id: UserId,
793 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
794 self.project_transaction(project_id, |tx| async move {
795 let (project, role) = self
796 .access_project(project_id, connection, Capability::ReadOnly, &tx)
797 .await?;
798 self.join_project_internal(project, user_id, connection, role, &tx)
799 .await
800 })
801 .await
802 }
803
804 async fn join_project_internal(
805 &self,
806 project: project::Model,
807 user_id: UserId,
808 connection: ConnectionId,
809 role: ChannelRole,
810 tx: &DatabaseTransaction,
811 ) -> Result<(Project, ReplicaId)> {
812 let mut collaborators = project
813 .find_related(project_collaborator::Entity)
814 .all(tx)
815 .await?;
816 let replica_ids = collaborators
817 .iter()
818 .map(|c| c.replica_id)
819 .collect::<HashSet<_>>();
820 let mut replica_id = ReplicaId(1);
821 while replica_ids.contains(&replica_id) {
822 replica_id.0 += 1;
823 }
824 let new_collaborator = project_collaborator::ActiveModel {
825 project_id: ActiveValue::set(project.id),
826 connection_id: ActiveValue::set(connection.id as i32),
827 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
828 user_id: ActiveValue::set(user_id),
829 replica_id: ActiveValue::set(replica_id),
830 is_host: ActiveValue::set(false),
831 ..Default::default()
832 }
833 .insert(tx)
834 .await?;
835 collaborators.push(new_collaborator);
836
837 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
838 let mut worktrees = db_worktrees
839 .into_iter()
840 .map(|db_worktree| {
841 (
842 db_worktree.id as u64,
843 Worktree {
844 id: db_worktree.id as u64,
845 abs_path: db_worktree.abs_path,
846 root_name: db_worktree.root_name,
847 visible: db_worktree.visible,
848 entries: Default::default(),
849 diagnostic_summaries: Default::default(),
850 settings_files: Default::default(),
851 scan_id: db_worktree.scan_id as u64,
852 completed_scan_id: db_worktree.completed_scan_id as u64,
853 legacy_repository_entries: Default::default(),
854 },
855 )
856 })
857 .collect::<BTreeMap<_, _>>();
858
859 // Populate worktree entries.
860 {
861 let mut db_entries = worktree_entry::Entity::find()
862 .filter(
863 Condition::all()
864 .add(worktree_entry::Column::ProjectId.eq(project.id))
865 .add(worktree_entry::Column::IsDeleted.eq(false)),
866 )
867 .stream(tx)
868 .await?;
869 while let Some(db_entry) = db_entries.next().await {
870 let db_entry = db_entry?;
871 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
872 worktree.entries.push(proto::Entry {
873 id: db_entry.id as u64,
874 is_dir: db_entry.is_dir,
875 path: db_entry.path,
876 inode: db_entry.inode as u64,
877 mtime: Some(proto::Timestamp {
878 seconds: db_entry.mtime_seconds as u64,
879 nanos: db_entry.mtime_nanos as u32,
880 }),
881 canonical_path: db_entry.canonical_path,
882 is_ignored: db_entry.is_ignored,
883 is_external: db_entry.is_external,
884 // This is only used in the summarization backlog, so if it's None,
885 // that just means we won't be able to detect when to resummarize
886 // based on total number of backlogged bytes - instead, we'd go
887 // on number of files only. That shouldn't be a huge deal in practice.
888 size: None,
889 is_fifo: db_entry.is_fifo,
890 });
891 }
892 }
893 }
894
895 // Populate repository entries.
896 let mut repositories = Vec::new();
897 {
898 let db_repository_entries = project_repository::Entity::find()
899 .filter(
900 Condition::all()
901 .add(project_repository::Column::ProjectId.eq(project.id))
902 .add(project_repository::Column::IsDeleted.eq(false)),
903 )
904 .all(tx)
905 .await?;
906 for db_repository_entry in db_repository_entries {
907 let mut repository_statuses = project_repository_statuses::Entity::find()
908 .filter(
909 Condition::all()
910 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
911 .add(
912 project_repository_statuses::Column::RepositoryId
913 .eq(db_repository_entry.id),
914 )
915 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
916 )
917 .stream(tx)
918 .await?;
919 let mut updated_statuses = Vec::new();
920 while let Some(status_entry) = repository_statuses.next().await {
921 let status_entry = status_entry?;
922 updated_statuses.push(db_status_to_proto(status_entry)?);
923 }
924
925 let current_merge_conflicts = db_repository_entry
926 .current_merge_conflicts
927 .as_ref()
928 .map(|conflicts| serde_json::from_str(&conflicts))
929 .transpose()?
930 .unwrap_or_default();
931
932 let branch_summary = db_repository_entry
933 .branch_summary
934 .as_ref()
935 .map(|branch_summary| serde_json::from_str(&branch_summary))
936 .transpose()?
937 .unwrap_or_default();
938
939 let head_commit_details = db_repository_entry
940 .head_commit_details
941 .as_ref()
942 .map(|head_commit_details| serde_json::from_str(&head_commit_details))
943 .transpose()?
944 .unwrap_or_default();
945
946 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
947 .context("failed to deserialize repository's entry ids")?;
948
949 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
950 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
951 worktree.legacy_repository_entries.insert(
952 db_repository_entry.id as u64,
953 proto::RepositoryEntry {
954 repository_id: db_repository_entry.id as u64,
955 updated_statuses,
956 removed_statuses: Vec::new(),
957 current_merge_conflicts,
958 branch_summary,
959 },
960 );
961 }
962 } else {
963 repositories.push(proto::UpdateRepository {
964 project_id: db_repository_entry.project_id.0 as u64,
965 id: db_repository_entry.id as u64,
966 abs_path: db_repository_entry.abs_path,
967 entry_ids,
968 updated_statuses,
969 removed_statuses: Vec::new(),
970 current_merge_conflicts,
971 branch_summary,
972 head_commit_details,
973 scan_id: db_repository_entry.scan_id as u64,
974 is_last_update: true,
975 });
976 }
977 }
978 }
979
980 // Populate worktree diagnostic summaries.
981 {
982 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
983 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
984 .stream(tx)
985 .await?;
986 while let Some(db_summary) = db_summaries.next().await {
987 let db_summary = db_summary?;
988 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
989 worktree
990 .diagnostic_summaries
991 .push(proto::DiagnosticSummary {
992 path: db_summary.path,
993 language_server_id: db_summary.language_server_id as u64,
994 error_count: db_summary.error_count as u32,
995 warning_count: db_summary.warning_count as u32,
996 });
997 }
998 }
999 }
1000
1001 // Populate worktree settings files
1002 {
1003 let mut db_settings_files = worktree_settings_file::Entity::find()
1004 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1005 .stream(tx)
1006 .await?;
1007 while let Some(db_settings_file) = db_settings_files.next().await {
1008 let db_settings_file = db_settings_file?;
1009 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1010 worktree.settings_files.push(WorktreeSettingsFile {
1011 path: db_settings_file.path,
1012 content: db_settings_file.content,
1013 kind: db_settings_file.kind,
1014 });
1015 }
1016 }
1017 }
1018
1019 // Populate language servers.
1020 let language_servers = project
1021 .find_related(language_server::Entity)
1022 .all(tx)
1023 .await?;
1024
1025 let project = Project {
1026 id: project.id,
1027 role,
1028 collaborators: collaborators
1029 .into_iter()
1030 .map(|collaborator| ProjectCollaborator {
1031 connection_id: collaborator.connection(),
1032 user_id: collaborator.user_id,
1033 replica_id: collaborator.replica_id,
1034 is_host: collaborator.is_host,
1035 })
1036 .collect(),
1037 worktrees,
1038 repositories,
1039 language_servers: language_servers
1040 .into_iter()
1041 .map(|language_server| proto::LanguageServer {
1042 id: language_server.id as u64,
1043 name: language_server.name,
1044 worktree_id: None,
1045 })
1046 .collect(),
1047 };
1048 Ok((project, replica_id as ReplicaId))
1049 }
1050
1051 /// Removes the given connection from the specified project.
1052 pub async fn leave_project(
1053 &self,
1054 project_id: ProjectId,
1055 connection: ConnectionId,
1056 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1057 self.project_transaction(project_id, |tx| async move {
1058 let result = project_collaborator::Entity::delete_many()
1059 .filter(
1060 Condition::all()
1061 .add(project_collaborator::Column::ProjectId.eq(project_id))
1062 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1063 .add(
1064 project_collaborator::Column::ConnectionServerId
1065 .eq(connection.owner_id as i32),
1066 ),
1067 )
1068 .exec(&*tx)
1069 .await?;
1070 if result.rows_affected == 0 {
1071 Err(anyhow!("not a collaborator on this project"))?;
1072 }
1073
1074 let project = project::Entity::find_by_id(project_id)
1075 .one(&*tx)
1076 .await?
1077 .ok_or_else(|| anyhow!("no such project"))?;
1078 let collaborators = project
1079 .find_related(project_collaborator::Entity)
1080 .all(&*tx)
1081 .await?;
1082 let connection_ids: Vec<ConnectionId> = collaborators
1083 .into_iter()
1084 .map(|collaborator| collaborator.connection())
1085 .collect();
1086
1087 follower::Entity::delete_many()
1088 .filter(
1089 Condition::any()
1090 .add(
1091 Condition::all()
1092 .add(follower::Column::ProjectId.eq(Some(project_id)))
1093 .add(
1094 follower::Column::LeaderConnectionServerId
1095 .eq(connection.owner_id),
1096 )
1097 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1098 )
1099 .add(
1100 Condition::all()
1101 .add(follower::Column::ProjectId.eq(Some(project_id)))
1102 .add(
1103 follower::Column::FollowerConnectionServerId
1104 .eq(connection.owner_id),
1105 )
1106 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1107 ),
1108 )
1109 .exec(&*tx)
1110 .await?;
1111
1112 let room = if let Some(room_id) = project.room_id {
1113 Some(self.get_room(room_id, &tx).await?)
1114 } else {
1115 None
1116 };
1117
1118 let left_project = LeftProject {
1119 id: project_id,
1120 should_unshare: connection == project.host_connection()?,
1121 connection_ids,
1122 };
1123 Ok((room, left_project))
1124 })
1125 .await
1126 }
1127
1128 pub async fn check_user_is_project_host(
1129 &self,
1130 project_id: ProjectId,
1131 connection_id: ConnectionId,
1132 ) -> Result<()> {
1133 self.project_transaction(project_id, |tx| async move {
1134 project::Entity::find()
1135 .filter(
1136 Condition::all()
1137 .add(project::Column::Id.eq(project_id))
1138 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1139 .add(
1140 project::Column::HostConnectionServerId
1141 .eq(Some(connection_id.owner_id as i32)),
1142 ),
1143 )
1144 .one(&*tx)
1145 .await?
1146 .ok_or_else(|| anyhow!("failed to read project host"))?;
1147
1148 Ok(())
1149 })
1150 .await
1151 .map(|guard| guard.into_inner())
1152 }
1153
1154 /// Returns the current project if the given user is authorized to access it with the specified capability.
1155 pub async fn access_project(
1156 &self,
1157 project_id: ProjectId,
1158 connection_id: ConnectionId,
1159 capability: Capability,
1160 tx: &DatabaseTransaction,
1161 ) -> Result<(project::Model, ChannelRole)> {
1162 let project = project::Entity::find_by_id(project_id)
1163 .one(tx)
1164 .await?
1165 .ok_or_else(|| anyhow!("no such project"))?;
1166
1167 let role_from_room = if let Some(room_id) = project.room_id {
1168 room_participant::Entity::find()
1169 .filter(room_participant::Column::RoomId.eq(room_id))
1170 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1171 .one(tx)
1172 .await?
1173 .and_then(|participant| participant.role)
1174 } else {
1175 None
1176 };
1177
1178 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1179
1180 match capability {
1181 Capability::ReadWrite => {
1182 if !role.can_edit_projects() {
1183 return Err(anyhow!("not authorized to edit projects"))?;
1184 }
1185 }
1186 Capability::ReadOnly => {
1187 if !role.can_read_projects() {
1188 return Err(anyhow!("not authorized to read projects"))?;
1189 }
1190 }
1191 }
1192
1193 Ok((project, role))
1194 }
1195
1196 /// Returns the host connection for a read-only request to join a shared project.
1197 pub async fn host_for_read_only_project_request(
1198 &self,
1199 project_id: ProjectId,
1200 connection_id: ConnectionId,
1201 ) -> Result<ConnectionId> {
1202 self.project_transaction(project_id, |tx| async move {
1203 let (project, _) = self
1204 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1205 .await?;
1206 project.host_connection()
1207 })
1208 .await
1209 .map(|guard| guard.into_inner())
1210 }
1211
1212 /// Returns the host connection for a request to join a shared project.
1213 pub async fn host_for_mutating_project_request(
1214 &self,
1215 project_id: ProjectId,
1216 connection_id: ConnectionId,
1217 ) -> Result<ConnectionId> {
1218 self.project_transaction(project_id, |tx| async move {
1219 let (project, _) = self
1220 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1221 .await?;
1222 project.host_connection()
1223 })
1224 .await
1225 .map(|guard| guard.into_inner())
1226 }
1227
1228 pub async fn connections_for_buffer_update(
1229 &self,
1230 project_id: ProjectId,
1231 connection_id: ConnectionId,
1232 capability: Capability,
1233 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1234 self.project_transaction(project_id, |tx| async move {
1235 // Authorize
1236 let (project, _) = self
1237 .access_project(project_id, connection_id, capability, &tx)
1238 .await?;
1239
1240 let host_connection_id = project.host_connection()?;
1241
1242 let collaborators = project_collaborator::Entity::find()
1243 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1244 .all(&*tx)
1245 .await?;
1246
1247 let guest_connection_ids = collaborators
1248 .into_iter()
1249 .filter_map(|collaborator| {
1250 if collaborator.is_host {
1251 None
1252 } else {
1253 Some(collaborator.connection())
1254 }
1255 })
1256 .collect();
1257
1258 Ok((host_connection_id, guest_connection_ids))
1259 })
1260 .await
1261 }
1262
1263 /// Returns the connection IDs in the given project.
1264 ///
1265 /// The provided `connection_id` must also be a collaborator in the project,
1266 /// otherwise an error will be returned.
1267 pub async fn project_connection_ids(
1268 &self,
1269 project_id: ProjectId,
1270 connection_id: ConnectionId,
1271 exclude_dev_server: bool,
1272 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1273 self.project_transaction(project_id, |tx| async move {
1274 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1275 .await
1276 })
1277 .await
1278 }
1279
1280 async fn internal_project_connection_ids(
1281 &self,
1282 project_id: ProjectId,
1283 connection_id: ConnectionId,
1284 exclude_dev_server: bool,
1285 tx: &DatabaseTransaction,
1286 ) -> Result<HashSet<ConnectionId>> {
1287 let project = project::Entity::find_by_id(project_id)
1288 .one(tx)
1289 .await?
1290 .ok_or_else(|| anyhow!("no such project"))?;
1291
1292 let mut collaborators = project_collaborator::Entity::find()
1293 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1294 .stream(tx)
1295 .await?;
1296
1297 let mut connection_ids = HashSet::default();
1298 if let Some(host_connection) = project.host_connection().log_err() {
1299 if !exclude_dev_server {
1300 connection_ids.insert(host_connection);
1301 }
1302 }
1303
1304 while let Some(collaborator) = collaborators.next().await {
1305 let collaborator = collaborator?;
1306 connection_ids.insert(collaborator.connection());
1307 }
1308
1309 if connection_ids.contains(&connection_id)
1310 || Some(connection_id) == project.host_connection().ok()
1311 {
1312 Ok(connection_ids)
1313 } else {
1314 Err(anyhow!(
1315 "can only send project updates to a project you're in"
1316 ))?
1317 }
1318 }
1319
1320 async fn project_guest_connection_ids(
1321 &self,
1322 project_id: ProjectId,
1323 tx: &DatabaseTransaction,
1324 ) -> Result<Vec<ConnectionId>> {
1325 let mut collaborators = project_collaborator::Entity::find()
1326 .filter(
1327 project_collaborator::Column::ProjectId
1328 .eq(project_id)
1329 .and(project_collaborator::Column::IsHost.eq(false)),
1330 )
1331 .stream(tx)
1332 .await?;
1333
1334 let mut guest_connection_ids = Vec::new();
1335 while let Some(collaborator) = collaborators.next().await {
1336 let collaborator = collaborator?;
1337 guest_connection_ids.push(collaborator.connection());
1338 }
1339 Ok(guest_connection_ids)
1340 }
1341
1342 /// Returns the [`RoomId`] for the given project.
1343 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1344 self.transaction(|tx| async move {
1345 Ok(project::Entity::find_by_id(project_id)
1346 .one(&*tx)
1347 .await?
1348 .and_then(|project| project.room_id))
1349 })
1350 .await
1351 }
1352
1353 pub async fn check_room_participants(
1354 &self,
1355 room_id: RoomId,
1356 leader_id: ConnectionId,
1357 follower_id: ConnectionId,
1358 ) -> Result<()> {
1359 self.transaction(|tx| async move {
1360 use room_participant::Column;
1361
1362 let count = room_participant::Entity::find()
1363 .filter(
1364 Condition::all().add(Column::RoomId.eq(room_id)).add(
1365 Condition::any()
1366 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1367 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1368 ))
1369 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1370 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1371 )),
1372 ),
1373 )
1374 .count(&*tx)
1375 .await?;
1376
1377 if count < 2 {
1378 Err(anyhow!("not room participants"))?;
1379 }
1380
1381 Ok(())
1382 })
1383 .await
1384 }
1385
1386 /// Adds the given follower connection as a follower of the given leader connection.
1387 pub async fn follow(
1388 &self,
1389 room_id: RoomId,
1390 project_id: ProjectId,
1391 leader_connection: ConnectionId,
1392 follower_connection: ConnectionId,
1393 ) -> Result<TransactionGuard<proto::Room>> {
1394 self.room_transaction(room_id, |tx| async move {
1395 follower::ActiveModel {
1396 room_id: ActiveValue::set(room_id),
1397 project_id: ActiveValue::set(project_id),
1398 leader_connection_server_id: ActiveValue::set(ServerId(
1399 leader_connection.owner_id as i32,
1400 )),
1401 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1402 follower_connection_server_id: ActiveValue::set(ServerId(
1403 follower_connection.owner_id as i32,
1404 )),
1405 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1406 ..Default::default()
1407 }
1408 .insert(&*tx)
1409 .await?;
1410
1411 let room = self.get_room(room_id, &tx).await?;
1412 Ok(room)
1413 })
1414 .await
1415 }
1416
1417 /// Removes the given follower connection as a follower of the given leader connection.
1418 pub async fn unfollow(
1419 &self,
1420 room_id: RoomId,
1421 project_id: ProjectId,
1422 leader_connection: ConnectionId,
1423 follower_connection: ConnectionId,
1424 ) -> Result<TransactionGuard<proto::Room>> {
1425 self.room_transaction(room_id, |tx| async move {
1426 follower::Entity::delete_many()
1427 .filter(
1428 Condition::all()
1429 .add(follower::Column::RoomId.eq(room_id))
1430 .add(follower::Column::ProjectId.eq(project_id))
1431 .add(
1432 follower::Column::LeaderConnectionServerId
1433 .eq(leader_connection.owner_id),
1434 )
1435 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1436 .add(
1437 follower::Column::FollowerConnectionServerId
1438 .eq(follower_connection.owner_id),
1439 )
1440 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1441 )
1442 .exec(&*tx)
1443 .await?;
1444
1445 let room = self.get_room(room_id, &tx).await?;
1446 Ok(room)
1447 })
1448 .await
1449 }
1450}