1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .context("could not find participant")?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 let project = project::ActiveModel {
65 room_id: ActiveValue::set(Some(participant.room_id)),
66 host_user_id: ActiveValue::set(Some(participant.user_id)),
67 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
68 host_connection_server_id: ActiveValue::set(Some(ServerId(
69 connection.owner_id as i32,
70 ))),
71 id: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if !worktrees.is_empty() {
77 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
78 worktree::ActiveModel {
79 id: ActiveValue::set(worktree.id as i64),
80 project_id: ActiveValue::set(project.id),
81 abs_path: ActiveValue::set(worktree.abs_path.clone()),
82 root_name: ActiveValue::set(worktree.root_name.clone()),
83 visible: ActiveValue::set(worktree.visible),
84 scan_id: ActiveValue::set(0),
85 completed_scan_id: ActiveValue::set(0),
86 }
87 }))
88 .exec(&*tx)
89 .await?;
90 }
91
92 let replica_id = if is_ssh_project { 1 } else { 0 };
93
94 project_collaborator::ActiveModel {
95 project_id: ActiveValue::set(project.id),
96 connection_id: ActiveValue::set(connection.id as i32),
97 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
98 user_id: ActiveValue::set(participant.user_id),
99 replica_id: ActiveValue::set(ReplicaId(replica_id)),
100 is_host: ActiveValue::set(true),
101 id: ActiveValue::NotSet,
102 committer_name: ActiveValue::Set(None),
103 committer_email: ActiveValue::Set(None),
104 }
105 .insert(&*tx)
106 .await?;
107
108 let room = self.get_room(room_id, &tx).await?;
109 Ok((project.id, room))
110 })
111 .await
112 }
113
114 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
115 self.transaction(|tx| async move {
116 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
117 Ok(())
118 })
119 .await
120 }
121
122 /// Unshares the given project.
123 pub async fn unshare_project(
124 &self,
125 project_id: ProjectId,
126 connection: ConnectionId,
127 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
128 self.project_transaction(project_id, |tx| async move {
129 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
130 let project = project::Entity::find_by_id(project_id)
131 .one(&*tx)
132 .await?
133 .context("project not found")?;
134 let room = if let Some(room_id) = project.room_id {
135 Some(self.get_room(room_id, &tx).await?)
136 } else {
137 None
138 };
139 if project.host_connection()? == connection {
140 return Ok((true, room, guest_connection_ids));
141 }
142 Err(anyhow!("cannot unshare a project hosted by another user"))?
143 })
144 .await
145 }
146
147 /// Updates the worktrees associated with the given project.
148 pub async fn update_project(
149 &self,
150 project_id: ProjectId,
151 connection: ConnectionId,
152 worktrees: &[proto::WorktreeMetadata],
153 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
154 self.project_transaction(project_id, |tx| async move {
155 let project = project::Entity::find_by_id(project_id)
156 .filter(
157 Condition::all()
158 .add(project::Column::HostConnectionId.eq(connection.id as i32))
159 .add(
160 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
161 ),
162 )
163 .one(&*tx)
164 .await?
165 .context("no such project")?;
166
167 self.update_project_worktrees(project.id, worktrees, &tx)
168 .await?;
169
170 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
171
172 let room = if let Some(room_id) = project.room_id {
173 Some(self.get_room(room_id, &tx).await?)
174 } else {
175 None
176 };
177
178 Ok((room, guest_connection_ids))
179 })
180 .await
181 }
182
183 pub(in crate::db) async fn update_project_worktrees(
184 &self,
185 project_id: ProjectId,
186 worktrees: &[proto::WorktreeMetadata],
187 tx: &DatabaseTransaction,
188 ) -> Result<()> {
189 if !worktrees.is_empty() {
190 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
191 id: ActiveValue::set(worktree.id as i64),
192 project_id: ActiveValue::set(project_id),
193 abs_path: ActiveValue::set(worktree.abs_path.clone()),
194 root_name: ActiveValue::set(worktree.root_name.clone()),
195 visible: ActiveValue::set(worktree.visible),
196 scan_id: ActiveValue::set(0),
197 completed_scan_id: ActiveValue::set(0),
198 }))
199 .on_conflict(
200 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
201 .update_column(worktree::Column::RootName)
202 .to_owned(),
203 )
204 .exec(tx)
205 .await?;
206 }
207
208 worktree::Entity::delete_many()
209 .filter(worktree::Column::ProjectId.eq(project_id).and(
210 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
211 ))
212 .exec(tx)
213 .await?;
214
215 Ok(())
216 }
217
218 pub async fn update_worktree(
219 &self,
220 update: &proto::UpdateWorktree,
221 connection: ConnectionId,
222 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
223 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
224 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
225 {
226 return Err(anyhow!(
227 "invalid worktree update. removed entries: {}, updated entries: {}",
228 update.removed_entries.len(),
229 update.updated_entries.len()
230 ))?;
231 }
232
233 let project_id = ProjectId::from_proto(update.project_id);
234 let worktree_id = update.worktree_id as i64;
235 self.project_transaction(project_id, |tx| async move {
236 // Ensure the update comes from the host.
237 let _project = project::Entity::find_by_id(project_id)
238 .filter(
239 Condition::all()
240 .add(project::Column::HostConnectionId.eq(connection.id as i32))
241 .add(
242 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
243 ),
244 )
245 .one(&*tx)
246 .await?
247 .with_context(|| format!("no such project: {project_id}"))?;
248
249 // Update metadata.
250 worktree::Entity::update(worktree::ActiveModel {
251 id: ActiveValue::set(worktree_id),
252 project_id: ActiveValue::set(project_id),
253 root_name: ActiveValue::set(update.root_name.clone()),
254 scan_id: ActiveValue::set(update.scan_id as i64),
255 completed_scan_id: if update.is_last_update {
256 ActiveValue::set(update.scan_id as i64)
257 } else {
258 ActiveValue::default()
259 },
260 abs_path: ActiveValue::set(update.abs_path.clone()),
261 ..Default::default()
262 })
263 .exec(&*tx)
264 .await?;
265
266 if !update.updated_entries.is_empty() {
267 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
268 let mtime = entry.mtime.clone().unwrap_or_default();
269 worktree_entry::ActiveModel {
270 project_id: ActiveValue::set(project_id),
271 worktree_id: ActiveValue::set(worktree_id),
272 id: ActiveValue::set(entry.id as i64),
273 is_dir: ActiveValue::set(entry.is_dir),
274 path: ActiveValue::set(entry.path.clone()),
275 inode: ActiveValue::set(entry.inode as i64),
276 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
277 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
278 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
279 is_ignored: ActiveValue::set(entry.is_ignored),
280 git_status: ActiveValue::set(None),
281 is_external: ActiveValue::set(entry.is_external),
282 is_deleted: ActiveValue::set(false),
283 scan_id: ActiveValue::set(update.scan_id as i64),
284 is_fifo: ActiveValue::set(entry.is_fifo),
285 }
286 }))
287 .on_conflict(
288 OnConflict::columns([
289 worktree_entry::Column::ProjectId,
290 worktree_entry::Column::WorktreeId,
291 worktree_entry::Column::Id,
292 ])
293 .update_columns([
294 worktree_entry::Column::IsDir,
295 worktree_entry::Column::Path,
296 worktree_entry::Column::Inode,
297 worktree_entry::Column::MtimeSeconds,
298 worktree_entry::Column::MtimeNanos,
299 worktree_entry::Column::CanonicalPath,
300 worktree_entry::Column::IsIgnored,
301 worktree_entry::Column::ScanId,
302 ])
303 .to_owned(),
304 )
305 .exec(&*tx)
306 .await?;
307 }
308
309 if !update.removed_entries.is_empty() {
310 worktree_entry::Entity::update_many()
311 .filter(
312 worktree_entry::Column::ProjectId
313 .eq(project_id)
314 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
315 .and(
316 worktree_entry::Column::Id
317 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
318 ),
319 )
320 .set(worktree_entry::ActiveModel {
321 is_deleted: ActiveValue::Set(true),
322 scan_id: ActiveValue::Set(update.scan_id as i64),
323 ..Default::default()
324 })
325 .exec(&*tx)
326 .await?;
327 }
328
329 // Backward-compatibility for old Zed clients.
330 //
331 // Remove this block when Zed 1.80 stable has been out for a week.
332 {
333 if !update.updated_repositories.is_empty() {
334 project_repository::Entity::insert_many(
335 update.updated_repositories.iter().map(|repository| {
336 project_repository::ActiveModel {
337 project_id: ActiveValue::set(project_id),
338 legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
339 id: ActiveValue::set(repository.repository_id as i64),
340 scan_id: ActiveValue::set(update.scan_id as i64),
341 is_deleted: ActiveValue::set(false),
342 branch_summary: ActiveValue::Set(
343 repository
344 .branch_summary
345 .as_ref()
346 .map(|summary| serde_json::to_string(summary).unwrap()),
347 ),
348 current_merge_conflicts: ActiveValue::Set(Some(
349 serde_json::to_string(&repository.current_merge_conflicts)
350 .unwrap(),
351 )),
352
353 // Old clients do not use abs path, entry ids or head_commit_details.
354 abs_path: ActiveValue::set(String::new()),
355 entry_ids: ActiveValue::set("[]".into()),
356 head_commit_details: ActiveValue::set(None),
357 }
358 }),
359 )
360 .on_conflict(
361 OnConflict::columns([
362 project_repository::Column::ProjectId,
363 project_repository::Column::Id,
364 ])
365 .update_columns([
366 project_repository::Column::ScanId,
367 project_repository::Column::BranchSummary,
368 project_repository::Column::CurrentMergeConflicts,
369 ])
370 .to_owned(),
371 )
372 .exec(&*tx)
373 .await?;
374
375 let has_any_statuses = update
376 .updated_repositories
377 .iter()
378 .any(|repository| !repository.updated_statuses.is_empty());
379
380 if has_any_statuses {
381 project_repository_statuses::Entity::insert_many(
382 update.updated_repositories.iter().flat_map(
383 |repository: &proto::RepositoryEntry| {
384 repository.updated_statuses.iter().map(|status_entry| {
385 let (repo_path, status_kind, first_status, second_status) =
386 proto_status_to_db(status_entry.clone());
387 project_repository_statuses::ActiveModel {
388 project_id: ActiveValue::set(project_id),
389 repository_id: ActiveValue::set(
390 repository.repository_id as i64,
391 ),
392 scan_id: ActiveValue::set(update.scan_id as i64),
393 is_deleted: ActiveValue::set(false),
394 repo_path: ActiveValue::set(repo_path),
395 status: ActiveValue::set(0),
396 status_kind: ActiveValue::set(status_kind),
397 first_status: ActiveValue::set(first_status),
398 second_status: ActiveValue::set(second_status),
399 }
400 })
401 },
402 ),
403 )
404 .on_conflict(
405 OnConflict::columns([
406 project_repository_statuses::Column::ProjectId,
407 project_repository_statuses::Column::RepositoryId,
408 project_repository_statuses::Column::RepoPath,
409 ])
410 .update_columns([
411 project_repository_statuses::Column::ScanId,
412 project_repository_statuses::Column::StatusKind,
413 project_repository_statuses::Column::FirstStatus,
414 project_repository_statuses::Column::SecondStatus,
415 ])
416 .to_owned(),
417 )
418 .exec(&*tx)
419 .await?;
420 }
421
422 for repo in &update.updated_repositories {
423 if !repo.removed_statuses.is_empty() {
424 project_repository_statuses::Entity::update_many()
425 .filter(
426 project_repository_statuses::Column::ProjectId
427 .eq(project_id)
428 .and(
429 project_repository_statuses::Column::RepositoryId
430 .eq(repo.repository_id),
431 )
432 .and(
433 project_repository_statuses::Column::RepoPath
434 .is_in(repo.removed_statuses.iter()),
435 ),
436 )
437 .set(project_repository_statuses::ActiveModel {
438 is_deleted: ActiveValue::Set(true),
439 scan_id: ActiveValue::Set(update.scan_id as i64),
440 ..Default::default()
441 })
442 .exec(&*tx)
443 .await?;
444 }
445 }
446 }
447
448 if !update.removed_repositories.is_empty() {
449 project_repository::Entity::update_many()
450 .filter(
451 project_repository::Column::ProjectId
452 .eq(project_id)
453 .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
454 .and(project_repository::Column::Id.is_in(
455 update.removed_repositories.iter().map(|id| *id as i64),
456 )),
457 )
458 .set(project_repository::ActiveModel {
459 is_deleted: ActiveValue::Set(true),
460 scan_id: ActiveValue::Set(update.scan_id as i64),
461 ..Default::default()
462 })
463 .exec(&*tx)
464 .await?;
465 }
466 }
467
468 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
469 Ok(connection_ids)
470 })
471 .await
472 }
473
474 pub async fn update_repository(
475 &self,
476 update: &proto::UpdateRepository,
477 _connection: ConnectionId,
478 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
479 let project_id = ProjectId::from_proto(update.project_id);
480 let repository_id = update.id as i64;
481 self.project_transaction(project_id, |tx| async move {
482 project_repository::Entity::insert(project_repository::ActiveModel {
483 project_id: ActiveValue::set(project_id),
484 id: ActiveValue::set(repository_id),
485 legacy_worktree_id: ActiveValue::set(None),
486 abs_path: ActiveValue::set(update.abs_path.clone()),
487 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
488 scan_id: ActiveValue::set(update.scan_id as i64),
489 is_deleted: ActiveValue::set(false),
490 branch_summary: ActiveValue::Set(
491 update
492 .branch_summary
493 .as_ref()
494 .map(|summary| serde_json::to_string(summary).unwrap()),
495 ),
496 head_commit_details: ActiveValue::Set(
497 update
498 .head_commit_details
499 .as_ref()
500 .map(|details| serde_json::to_string(details).unwrap()),
501 ),
502 current_merge_conflicts: ActiveValue::Set(Some(
503 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
504 )),
505 })
506 .on_conflict(
507 OnConflict::columns([
508 project_repository::Column::ProjectId,
509 project_repository::Column::Id,
510 ])
511 .update_columns([
512 project_repository::Column::ScanId,
513 project_repository::Column::BranchSummary,
514 project_repository::Column::EntryIds,
515 project_repository::Column::AbsPath,
516 project_repository::Column::CurrentMergeConflicts,
517 project_repository::Column::HeadCommitDetails,
518 ])
519 .to_owned(),
520 )
521 .exec(&*tx)
522 .await?;
523
524 let has_any_statuses = !update.updated_statuses.is_empty();
525
526 if has_any_statuses {
527 project_repository_statuses::Entity::insert_many(
528 update.updated_statuses.iter().map(|status_entry| {
529 let (repo_path, status_kind, first_status, second_status) =
530 proto_status_to_db(status_entry.clone());
531 project_repository_statuses::ActiveModel {
532 project_id: ActiveValue::set(project_id),
533 repository_id: ActiveValue::set(repository_id),
534 scan_id: ActiveValue::set(update.scan_id as i64),
535 is_deleted: ActiveValue::set(false),
536 repo_path: ActiveValue::set(repo_path),
537 status: ActiveValue::set(0),
538 status_kind: ActiveValue::set(status_kind),
539 first_status: ActiveValue::set(first_status),
540 second_status: ActiveValue::set(second_status),
541 }
542 }),
543 )
544 .on_conflict(
545 OnConflict::columns([
546 project_repository_statuses::Column::ProjectId,
547 project_repository_statuses::Column::RepositoryId,
548 project_repository_statuses::Column::RepoPath,
549 ])
550 .update_columns([
551 project_repository_statuses::Column::ScanId,
552 project_repository_statuses::Column::StatusKind,
553 project_repository_statuses::Column::FirstStatus,
554 project_repository_statuses::Column::SecondStatus,
555 ])
556 .to_owned(),
557 )
558 .exec(&*tx)
559 .await?;
560 }
561
562 let has_any_removed_statuses = !update.removed_statuses.is_empty();
563
564 if has_any_removed_statuses {
565 project_repository_statuses::Entity::update_many()
566 .filter(
567 project_repository_statuses::Column::ProjectId
568 .eq(project_id)
569 .and(
570 project_repository_statuses::Column::RepositoryId.eq(repository_id),
571 )
572 .and(
573 project_repository_statuses::Column::RepoPath
574 .is_in(update.removed_statuses.iter()),
575 ),
576 )
577 .set(project_repository_statuses::ActiveModel {
578 is_deleted: ActiveValue::Set(true),
579 scan_id: ActiveValue::Set(update.scan_id as i64),
580 ..Default::default()
581 })
582 .exec(&*tx)
583 .await?;
584 }
585
586 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
587 Ok(connection_ids)
588 })
589 .await
590 }
591
592 pub async fn remove_repository(
593 &self,
594 remove: &proto::RemoveRepository,
595 _connection: ConnectionId,
596 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
597 let project_id = ProjectId::from_proto(remove.project_id);
598 let repository_id = remove.id as i64;
599 self.project_transaction(project_id, |tx| async move {
600 project_repository::Entity::update_many()
601 .filter(
602 project_repository::Column::ProjectId
603 .eq(project_id)
604 .and(project_repository::Column::Id.eq(repository_id)),
605 )
606 .set(project_repository::ActiveModel {
607 is_deleted: ActiveValue::Set(true),
608 // scan_id: ActiveValue::Set(update.scan_id as i64),
609 ..Default::default()
610 })
611 .exec(&*tx)
612 .await?;
613
614 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
615 Ok(connection_ids)
616 })
617 .await
618 }
619
620 /// Updates the diagnostic summary for the given connection.
621 pub async fn update_diagnostic_summary(
622 &self,
623 update: &proto::UpdateDiagnosticSummary,
624 connection: ConnectionId,
625 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
626 let project_id = ProjectId::from_proto(update.project_id);
627 let worktree_id = update.worktree_id as i64;
628 self.project_transaction(project_id, |tx| async move {
629 let summary = update.summary.as_ref().context("invalid summary")?;
630
631 // Ensure the update comes from the host.
632 let project = project::Entity::find_by_id(project_id)
633 .one(&*tx)
634 .await?
635 .context("no such project")?;
636 if project.host_connection()? != connection {
637 return Err(anyhow!("can't update a project hosted by someone else"))?;
638 }
639
640 // Update summary.
641 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
642 project_id: ActiveValue::set(project_id),
643 worktree_id: ActiveValue::set(worktree_id),
644 path: ActiveValue::set(summary.path.clone()),
645 language_server_id: ActiveValue::set(summary.language_server_id as i64),
646 error_count: ActiveValue::set(summary.error_count as i32),
647 warning_count: ActiveValue::set(summary.warning_count as i32),
648 })
649 .on_conflict(
650 OnConflict::columns([
651 worktree_diagnostic_summary::Column::ProjectId,
652 worktree_diagnostic_summary::Column::WorktreeId,
653 worktree_diagnostic_summary::Column::Path,
654 ])
655 .update_columns([
656 worktree_diagnostic_summary::Column::LanguageServerId,
657 worktree_diagnostic_summary::Column::ErrorCount,
658 worktree_diagnostic_summary::Column::WarningCount,
659 ])
660 .to_owned(),
661 )
662 .exec(&*tx)
663 .await?;
664
665 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
666 Ok(connection_ids)
667 })
668 .await
669 }
670
671 /// Starts the language server for the given connection.
672 pub async fn start_language_server(
673 &self,
674 update: &proto::StartLanguageServer,
675 connection: ConnectionId,
676 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
677 let project_id = ProjectId::from_proto(update.project_id);
678 self.project_transaction(project_id, |tx| async move {
679 let server = update.server.as_ref().context("invalid language server")?;
680
681 // Ensure the update comes from the host.
682 let project = project::Entity::find_by_id(project_id)
683 .one(&*tx)
684 .await?
685 .context("no such project")?;
686 if project.host_connection()? != connection {
687 return Err(anyhow!("can't update a project hosted by someone else"))?;
688 }
689
690 // Add the newly-started language server.
691 language_server::Entity::insert(language_server::ActiveModel {
692 project_id: ActiveValue::set(project_id),
693 id: ActiveValue::set(server.id as i64),
694 name: ActiveValue::set(server.name.clone()),
695 })
696 .on_conflict(
697 OnConflict::columns([
698 language_server::Column::ProjectId,
699 language_server::Column::Id,
700 ])
701 .update_column(language_server::Column::Name)
702 .to_owned(),
703 )
704 .exec(&*tx)
705 .await?;
706
707 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
708 Ok(connection_ids)
709 })
710 .await
711 }
712
713 /// Updates the worktree settings for the given connection.
714 pub async fn update_worktree_settings(
715 &self,
716 update: &proto::UpdateWorktreeSettings,
717 connection: ConnectionId,
718 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
719 let project_id = ProjectId::from_proto(update.project_id);
720 let kind = match update.kind {
721 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
722 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
723 None => proto::LocalSettingsKind::Settings,
724 };
725 let kind = LocalSettingsKind::from_proto(kind);
726 self.project_transaction(project_id, |tx| async move {
727 // Ensure the update comes from the host.
728 let project = project::Entity::find_by_id(project_id)
729 .one(&*tx)
730 .await?
731 .context("no such project")?;
732 if project.host_connection()? != connection {
733 return Err(anyhow!("can't update a project hosted by someone else"))?;
734 }
735
736 if let Some(content) = &update.content {
737 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
738 project_id: ActiveValue::Set(project_id),
739 worktree_id: ActiveValue::Set(update.worktree_id as i64),
740 path: ActiveValue::Set(update.path.clone()),
741 content: ActiveValue::Set(content.clone()),
742 kind: ActiveValue::Set(kind),
743 })
744 .on_conflict(
745 OnConflict::columns([
746 worktree_settings_file::Column::ProjectId,
747 worktree_settings_file::Column::WorktreeId,
748 worktree_settings_file::Column::Path,
749 ])
750 .update_column(worktree_settings_file::Column::Content)
751 .to_owned(),
752 )
753 .exec(&*tx)
754 .await?;
755 } else {
756 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
757 project_id: ActiveValue::Set(project_id),
758 worktree_id: ActiveValue::Set(update.worktree_id as i64),
759 path: ActiveValue::Set(update.path.clone()),
760 ..Default::default()
761 })
762 .exec(&*tx)
763 .await?;
764 }
765
766 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
767 Ok(connection_ids)
768 })
769 .await
770 }
771
772 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
773 self.transaction(|tx| async move {
774 Ok(project::Entity::find_by_id(id)
775 .one(&*tx)
776 .await?
777 .context("no such project")?)
778 })
779 .await
780 }
781
782 /// Adds the given connection to the specified project
783 /// in the current room.
784 pub async fn join_project(
785 &self,
786 project_id: ProjectId,
787 connection: ConnectionId,
788 user_id: UserId,
789 committer_name: Option<String>,
790 committer_email: Option<String>,
791 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
792 self.project_transaction(project_id, move |tx| {
793 let committer_name = committer_name.clone();
794 let committer_email = committer_email.clone();
795 async move {
796 let (project, role) = self
797 .access_project(project_id, connection, Capability::ReadOnly, &tx)
798 .await?;
799 self.join_project_internal(
800 project,
801 user_id,
802 committer_name,
803 committer_email,
804 connection,
805 role,
806 &tx,
807 )
808 .await
809 }
810 })
811 .await
812 }
813
814 async fn join_project_internal(
815 &self,
816 project: project::Model,
817 user_id: UserId,
818 committer_name: Option<String>,
819 committer_email: Option<String>,
820 connection: ConnectionId,
821 role: ChannelRole,
822 tx: &DatabaseTransaction,
823 ) -> Result<(Project, ReplicaId)> {
824 let mut collaborators = project
825 .find_related(project_collaborator::Entity)
826 .all(tx)
827 .await?;
828 let replica_ids = collaborators
829 .iter()
830 .map(|c| c.replica_id)
831 .collect::<HashSet<_>>();
832 let mut replica_id = ReplicaId(1);
833 while replica_ids.contains(&replica_id) {
834 replica_id.0 += 1;
835 }
836 let new_collaborator = project_collaborator::ActiveModel {
837 project_id: ActiveValue::set(project.id),
838 connection_id: ActiveValue::set(connection.id as i32),
839 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
840 user_id: ActiveValue::set(user_id),
841 replica_id: ActiveValue::set(replica_id),
842 is_host: ActiveValue::set(false),
843 id: ActiveValue::NotSet,
844 committer_name: ActiveValue::set(committer_name),
845 committer_email: ActiveValue::set(committer_email),
846 }
847 .insert(tx)
848 .await?;
849 collaborators.push(new_collaborator);
850
851 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
852 let mut worktrees = db_worktrees
853 .into_iter()
854 .map(|db_worktree| {
855 (
856 db_worktree.id as u64,
857 Worktree {
858 id: db_worktree.id as u64,
859 abs_path: db_worktree.abs_path,
860 root_name: db_worktree.root_name,
861 visible: db_worktree.visible,
862 entries: Default::default(),
863 diagnostic_summaries: Default::default(),
864 settings_files: Default::default(),
865 scan_id: db_worktree.scan_id as u64,
866 completed_scan_id: db_worktree.completed_scan_id as u64,
867 legacy_repository_entries: Default::default(),
868 },
869 )
870 })
871 .collect::<BTreeMap<_, _>>();
872
873 // Populate worktree entries.
874 {
875 let mut db_entries = worktree_entry::Entity::find()
876 .filter(
877 Condition::all()
878 .add(worktree_entry::Column::ProjectId.eq(project.id))
879 .add(worktree_entry::Column::IsDeleted.eq(false)),
880 )
881 .stream(tx)
882 .await?;
883 while let Some(db_entry) = db_entries.next().await {
884 let db_entry = db_entry?;
885 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
886 worktree.entries.push(proto::Entry {
887 id: db_entry.id as u64,
888 is_dir: db_entry.is_dir,
889 path: db_entry.path,
890 inode: db_entry.inode as u64,
891 mtime: Some(proto::Timestamp {
892 seconds: db_entry.mtime_seconds as u64,
893 nanos: db_entry.mtime_nanos as u32,
894 }),
895 canonical_path: db_entry.canonical_path,
896 is_ignored: db_entry.is_ignored,
897 is_external: db_entry.is_external,
898 // This is only used in the summarization backlog, so if it's None,
899 // that just means we won't be able to detect when to resummarize
900 // based on total number of backlogged bytes - instead, we'd go
901 // on number of files only. That shouldn't be a huge deal in practice.
902 size: None,
903 is_fifo: db_entry.is_fifo,
904 });
905 }
906 }
907 }
908
909 // Populate repository entries.
910 let mut repositories = Vec::new();
911 {
912 let db_repository_entries = project_repository::Entity::find()
913 .filter(
914 Condition::all()
915 .add(project_repository::Column::ProjectId.eq(project.id))
916 .add(project_repository::Column::IsDeleted.eq(false)),
917 )
918 .all(tx)
919 .await?;
920 for db_repository_entry in db_repository_entries {
921 let mut repository_statuses = project_repository_statuses::Entity::find()
922 .filter(
923 Condition::all()
924 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
925 .add(
926 project_repository_statuses::Column::RepositoryId
927 .eq(db_repository_entry.id),
928 )
929 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
930 )
931 .stream(tx)
932 .await?;
933 let mut updated_statuses = Vec::new();
934 while let Some(status_entry) = repository_statuses.next().await {
935 let status_entry = status_entry?;
936 updated_statuses.push(db_status_to_proto(status_entry)?);
937 }
938
939 let current_merge_conflicts = db_repository_entry
940 .current_merge_conflicts
941 .as_ref()
942 .map(|conflicts| serde_json::from_str(&conflicts))
943 .transpose()?
944 .unwrap_or_default();
945
946 let branch_summary = db_repository_entry
947 .branch_summary
948 .as_ref()
949 .map(|branch_summary| serde_json::from_str(&branch_summary))
950 .transpose()?
951 .unwrap_or_default();
952
953 let head_commit_details = db_repository_entry
954 .head_commit_details
955 .as_ref()
956 .map(|head_commit_details| serde_json::from_str(&head_commit_details))
957 .transpose()?
958 .unwrap_or_default();
959
960 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
961 .context("failed to deserialize repository's entry ids")?;
962
963 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
964 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
965 worktree.legacy_repository_entries.insert(
966 db_repository_entry.id as u64,
967 proto::RepositoryEntry {
968 repository_id: db_repository_entry.id as u64,
969 updated_statuses,
970 removed_statuses: Vec::new(),
971 current_merge_conflicts,
972 branch_summary,
973 },
974 );
975 }
976 } else {
977 repositories.push(proto::UpdateRepository {
978 project_id: db_repository_entry.project_id.0 as u64,
979 id: db_repository_entry.id as u64,
980 abs_path: db_repository_entry.abs_path,
981 entry_ids,
982 updated_statuses,
983 removed_statuses: Vec::new(),
984 current_merge_conflicts,
985 branch_summary,
986 head_commit_details,
987 scan_id: db_repository_entry.scan_id as u64,
988 is_last_update: true,
989 });
990 }
991 }
992 }
993
994 // Populate worktree diagnostic summaries.
995 {
996 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
997 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
998 .stream(tx)
999 .await?;
1000 while let Some(db_summary) = db_summaries.next().await {
1001 let db_summary = db_summary?;
1002 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1003 worktree
1004 .diagnostic_summaries
1005 .push(proto::DiagnosticSummary {
1006 path: db_summary.path,
1007 language_server_id: db_summary.language_server_id as u64,
1008 error_count: db_summary.error_count as u32,
1009 warning_count: db_summary.warning_count as u32,
1010 });
1011 }
1012 }
1013 }
1014
1015 // Populate worktree settings files
1016 {
1017 let mut db_settings_files = worktree_settings_file::Entity::find()
1018 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1019 .stream(tx)
1020 .await?;
1021 while let Some(db_settings_file) = db_settings_files.next().await {
1022 let db_settings_file = db_settings_file?;
1023 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1024 worktree.settings_files.push(WorktreeSettingsFile {
1025 path: db_settings_file.path,
1026 content: db_settings_file.content,
1027 kind: db_settings_file.kind,
1028 });
1029 }
1030 }
1031 }
1032
1033 // Populate language servers.
1034 let language_servers = project
1035 .find_related(language_server::Entity)
1036 .all(tx)
1037 .await?;
1038
1039 let project = Project {
1040 id: project.id,
1041 role,
1042 collaborators: collaborators
1043 .into_iter()
1044 .map(|collaborator| ProjectCollaborator {
1045 connection_id: collaborator.connection(),
1046 user_id: collaborator.user_id,
1047 replica_id: collaborator.replica_id,
1048 is_host: collaborator.is_host,
1049 committer_name: collaborator.committer_name,
1050 committer_email: collaborator.committer_email,
1051 })
1052 .collect(),
1053 worktrees,
1054 repositories,
1055 language_servers: language_servers
1056 .into_iter()
1057 .map(|language_server| proto::LanguageServer {
1058 id: language_server.id as u64,
1059 name: language_server.name,
1060 worktree_id: None,
1061 })
1062 .collect(),
1063 };
1064 Ok((project, replica_id as ReplicaId))
1065 }
1066
1067 /// Removes the given connection from the specified project.
1068 pub async fn leave_project(
1069 &self,
1070 project_id: ProjectId,
1071 connection: ConnectionId,
1072 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1073 self.project_transaction(project_id, |tx| async move {
1074 let result = project_collaborator::Entity::delete_many()
1075 .filter(
1076 Condition::all()
1077 .add(project_collaborator::Column::ProjectId.eq(project_id))
1078 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1079 .add(
1080 project_collaborator::Column::ConnectionServerId
1081 .eq(connection.owner_id as i32),
1082 ),
1083 )
1084 .exec(&*tx)
1085 .await?;
1086 if result.rows_affected == 0 {
1087 Err(anyhow!("not a collaborator on this project"))?;
1088 }
1089
1090 let project = project::Entity::find_by_id(project_id)
1091 .one(&*tx)
1092 .await?
1093 .context("no such project")?;
1094 let collaborators = project
1095 .find_related(project_collaborator::Entity)
1096 .all(&*tx)
1097 .await?;
1098 let connection_ids: Vec<ConnectionId> = collaborators
1099 .into_iter()
1100 .map(|collaborator| collaborator.connection())
1101 .collect();
1102
1103 follower::Entity::delete_many()
1104 .filter(
1105 Condition::any()
1106 .add(
1107 Condition::all()
1108 .add(follower::Column::ProjectId.eq(Some(project_id)))
1109 .add(
1110 follower::Column::LeaderConnectionServerId
1111 .eq(connection.owner_id),
1112 )
1113 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1114 )
1115 .add(
1116 Condition::all()
1117 .add(follower::Column::ProjectId.eq(Some(project_id)))
1118 .add(
1119 follower::Column::FollowerConnectionServerId
1120 .eq(connection.owner_id),
1121 )
1122 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1123 ),
1124 )
1125 .exec(&*tx)
1126 .await?;
1127
1128 let room = if let Some(room_id) = project.room_id {
1129 Some(self.get_room(room_id, &tx).await?)
1130 } else {
1131 None
1132 };
1133
1134 let left_project = LeftProject {
1135 id: project_id,
1136 should_unshare: connection == project.host_connection()?,
1137 connection_ids,
1138 };
1139 Ok((room, left_project))
1140 })
1141 .await
1142 }
1143
1144 pub async fn check_user_is_project_host(
1145 &self,
1146 project_id: ProjectId,
1147 connection_id: ConnectionId,
1148 ) -> Result<()> {
1149 self.project_transaction(project_id, |tx| async move {
1150 project::Entity::find()
1151 .filter(
1152 Condition::all()
1153 .add(project::Column::Id.eq(project_id))
1154 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1155 .add(
1156 project::Column::HostConnectionServerId
1157 .eq(Some(connection_id.owner_id as i32)),
1158 ),
1159 )
1160 .one(&*tx)
1161 .await?
1162 .context("failed to read project host")?;
1163
1164 Ok(())
1165 })
1166 .await
1167 .map(|guard| guard.into_inner())
1168 }
1169
1170 /// Returns the current project if the given user is authorized to access it with the specified capability.
1171 pub async fn access_project(
1172 &self,
1173 project_id: ProjectId,
1174 connection_id: ConnectionId,
1175 capability: Capability,
1176 tx: &DatabaseTransaction,
1177 ) -> Result<(project::Model, ChannelRole)> {
1178 let project = project::Entity::find_by_id(project_id)
1179 .one(tx)
1180 .await?
1181 .context("no such project")?;
1182
1183 let role_from_room = if let Some(room_id) = project.room_id {
1184 room_participant::Entity::find()
1185 .filter(room_participant::Column::RoomId.eq(room_id))
1186 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1187 .one(tx)
1188 .await?
1189 .and_then(|participant| participant.role)
1190 } else {
1191 None
1192 };
1193
1194 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1195
1196 match capability {
1197 Capability::ReadWrite => {
1198 if !role.can_edit_projects() {
1199 return Err(anyhow!("not authorized to edit projects"))?;
1200 }
1201 }
1202 Capability::ReadOnly => {
1203 if !role.can_read_projects() {
1204 return Err(anyhow!("not authorized to read projects"))?;
1205 }
1206 }
1207 }
1208
1209 Ok((project, role))
1210 }
1211
1212 /// Returns the host connection for a read-only request to join a shared project.
1213 pub async fn host_for_read_only_project_request(
1214 &self,
1215 project_id: ProjectId,
1216 connection_id: ConnectionId,
1217 ) -> Result<ConnectionId> {
1218 self.project_transaction(project_id, |tx| async move {
1219 let (project, _) = self
1220 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1221 .await?;
1222 project.host_connection()
1223 })
1224 .await
1225 .map(|guard| guard.into_inner())
1226 }
1227
1228 /// Returns the host connection for a request to join a shared project.
1229 pub async fn host_for_mutating_project_request(
1230 &self,
1231 project_id: ProjectId,
1232 connection_id: ConnectionId,
1233 ) -> Result<ConnectionId> {
1234 self.project_transaction(project_id, |tx| async move {
1235 let (project, _) = self
1236 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1237 .await?;
1238 project.host_connection()
1239 })
1240 .await
1241 .map(|guard| guard.into_inner())
1242 }
1243
1244 pub async fn connections_for_buffer_update(
1245 &self,
1246 project_id: ProjectId,
1247 connection_id: ConnectionId,
1248 capability: Capability,
1249 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1250 self.project_transaction(project_id, |tx| async move {
1251 // Authorize
1252 let (project, _) = self
1253 .access_project(project_id, connection_id, capability, &tx)
1254 .await?;
1255
1256 let host_connection_id = project.host_connection()?;
1257
1258 let collaborators = project_collaborator::Entity::find()
1259 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1260 .all(&*tx)
1261 .await?;
1262
1263 let guest_connection_ids = collaborators
1264 .into_iter()
1265 .filter_map(|collaborator| {
1266 if collaborator.is_host {
1267 None
1268 } else {
1269 Some(collaborator.connection())
1270 }
1271 })
1272 .collect();
1273
1274 Ok((host_connection_id, guest_connection_ids))
1275 })
1276 .await
1277 }
1278
1279 /// Returns the connection IDs in the given project.
1280 ///
1281 /// The provided `connection_id` must also be a collaborator in the project,
1282 /// otherwise an error will be returned.
1283 pub async fn project_connection_ids(
1284 &self,
1285 project_id: ProjectId,
1286 connection_id: ConnectionId,
1287 exclude_dev_server: bool,
1288 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1289 self.project_transaction(project_id, |tx| async move {
1290 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1291 .await
1292 })
1293 .await
1294 }
1295
1296 async fn internal_project_connection_ids(
1297 &self,
1298 project_id: ProjectId,
1299 connection_id: ConnectionId,
1300 exclude_dev_server: bool,
1301 tx: &DatabaseTransaction,
1302 ) -> Result<HashSet<ConnectionId>> {
1303 let project = project::Entity::find_by_id(project_id)
1304 .one(tx)
1305 .await?
1306 .context("no such project")?;
1307
1308 let mut collaborators = project_collaborator::Entity::find()
1309 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1310 .stream(tx)
1311 .await?;
1312
1313 let mut connection_ids = HashSet::default();
1314 if let Some(host_connection) = project.host_connection().log_err() {
1315 if !exclude_dev_server {
1316 connection_ids.insert(host_connection);
1317 }
1318 }
1319
1320 while let Some(collaborator) = collaborators.next().await {
1321 let collaborator = collaborator?;
1322 connection_ids.insert(collaborator.connection());
1323 }
1324
1325 if connection_ids.contains(&connection_id)
1326 || Some(connection_id) == project.host_connection().ok()
1327 {
1328 Ok(connection_ids)
1329 } else {
1330 Err(anyhow!(
1331 "can only send project updates to a project you're in"
1332 ))?
1333 }
1334 }
1335
1336 async fn project_guest_connection_ids(
1337 &self,
1338 project_id: ProjectId,
1339 tx: &DatabaseTransaction,
1340 ) -> Result<Vec<ConnectionId>> {
1341 let mut collaborators = project_collaborator::Entity::find()
1342 .filter(
1343 project_collaborator::Column::ProjectId
1344 .eq(project_id)
1345 .and(project_collaborator::Column::IsHost.eq(false)),
1346 )
1347 .stream(tx)
1348 .await?;
1349
1350 let mut guest_connection_ids = Vec::new();
1351 while let Some(collaborator) = collaborators.next().await {
1352 let collaborator = collaborator?;
1353 guest_connection_ids.push(collaborator.connection());
1354 }
1355 Ok(guest_connection_ids)
1356 }
1357
1358 /// Returns the [`RoomId`] for the given project.
1359 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1360 self.transaction(|tx| async move {
1361 Ok(project::Entity::find_by_id(project_id)
1362 .one(&*tx)
1363 .await?
1364 .and_then(|project| project.room_id))
1365 })
1366 .await
1367 }
1368
1369 pub async fn check_room_participants(
1370 &self,
1371 room_id: RoomId,
1372 leader_id: ConnectionId,
1373 follower_id: ConnectionId,
1374 ) -> Result<()> {
1375 self.transaction(|tx| async move {
1376 use room_participant::Column;
1377
1378 let count = room_participant::Entity::find()
1379 .filter(
1380 Condition::all().add(Column::RoomId.eq(room_id)).add(
1381 Condition::any()
1382 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1383 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1384 ))
1385 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1386 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1387 )),
1388 ),
1389 )
1390 .count(&*tx)
1391 .await?;
1392
1393 if count < 2 {
1394 Err(anyhow!("not room participants"))?;
1395 }
1396
1397 Ok(())
1398 })
1399 .await
1400 }
1401
1402 /// Adds the given follower connection as a follower of the given leader connection.
1403 pub async fn follow(
1404 &self,
1405 room_id: RoomId,
1406 project_id: ProjectId,
1407 leader_connection: ConnectionId,
1408 follower_connection: ConnectionId,
1409 ) -> Result<TransactionGuard<proto::Room>> {
1410 self.room_transaction(room_id, |tx| async move {
1411 follower::ActiveModel {
1412 room_id: ActiveValue::set(room_id),
1413 project_id: ActiveValue::set(project_id),
1414 leader_connection_server_id: ActiveValue::set(ServerId(
1415 leader_connection.owner_id as i32,
1416 )),
1417 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1418 follower_connection_server_id: ActiveValue::set(ServerId(
1419 follower_connection.owner_id as i32,
1420 )),
1421 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1422 ..Default::default()
1423 }
1424 .insert(&*tx)
1425 .await?;
1426
1427 let room = self.get_room(room_id, &tx).await?;
1428 Ok(room)
1429 })
1430 .await
1431 }
1432
1433 /// Removes the given follower connection as a follower of the given leader connection.
1434 pub async fn unfollow(
1435 &self,
1436 room_id: RoomId,
1437 project_id: ProjectId,
1438 leader_connection: ConnectionId,
1439 follower_connection: ConnectionId,
1440 ) -> Result<TransactionGuard<proto::Room>> {
1441 self.room_transaction(room_id, |tx| async move {
1442 follower::Entity::delete_many()
1443 .filter(
1444 Condition::all()
1445 .add(follower::Column::RoomId.eq(room_id))
1446 .add(follower::Column::ProjectId.eq(project_id))
1447 .add(
1448 follower::Column::LeaderConnectionServerId
1449 .eq(leader_connection.owner_id),
1450 )
1451 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1452 .add(
1453 follower::Column::FollowerConnectionServerId
1454 .eq(follower_connection.owner_id),
1455 )
1456 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1457 )
1458 .exec(&*tx)
1459 .await?;
1460
1461 let room = self.get_room(room_id, &tx).await?;
1462 Ok(room)
1463 })
1464 .await
1465 }
1466}