1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .context("could not find participant")?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 let project = project::ActiveModel {
65 room_id: ActiveValue::set(Some(participant.room_id)),
66 host_user_id: ActiveValue::set(Some(participant.user_id)),
67 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
68 host_connection_server_id: ActiveValue::set(Some(ServerId(
69 connection.owner_id as i32,
70 ))),
71 id: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if !worktrees.is_empty() {
77 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
78 worktree::ActiveModel {
79 id: ActiveValue::set(worktree.id as i64),
80 project_id: ActiveValue::set(project.id),
81 abs_path: ActiveValue::set(worktree.abs_path.clone()),
82 root_name: ActiveValue::set(worktree.root_name.clone()),
83 visible: ActiveValue::set(worktree.visible),
84 scan_id: ActiveValue::set(0),
85 completed_scan_id: ActiveValue::set(0),
86 }
87 }))
88 .exec(&*tx)
89 .await?;
90 }
91
92 let replica_id = if is_ssh_project { 1 } else { 0 };
93
94 project_collaborator::ActiveModel {
95 project_id: ActiveValue::set(project.id),
96 connection_id: ActiveValue::set(connection.id as i32),
97 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
98 user_id: ActiveValue::set(participant.user_id),
99 replica_id: ActiveValue::set(ReplicaId(replica_id)),
100 is_host: ActiveValue::set(true),
101 id: ActiveValue::NotSet,
102 committer_name: ActiveValue::Set(None),
103 committer_email: ActiveValue::Set(None),
104 }
105 .insert(&*tx)
106 .await?;
107
108 let room = self.get_room(room_id, &tx).await?;
109 Ok((project.id, room))
110 })
111 .await
112 }
113
114 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
115 self.transaction(|tx| async move {
116 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
117 Ok(())
118 })
119 .await
120 }
121
122 /// Unshares the given project.
123 pub async fn unshare_project(
124 &self,
125 project_id: ProjectId,
126 connection: ConnectionId,
127 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
128 self.project_transaction(project_id, |tx| async move {
129 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
130 let project = project::Entity::find_by_id(project_id)
131 .one(&*tx)
132 .await?
133 .context("project not found")?;
134 let room = if let Some(room_id) = project.room_id {
135 Some(self.get_room(room_id, &tx).await?)
136 } else {
137 None
138 };
139 if project.host_connection()? == connection {
140 return Ok((true, room, guest_connection_ids));
141 }
142 Err(anyhow!("cannot unshare a project hosted by another user"))?
143 })
144 .await
145 }
146
147 /// Updates the worktrees associated with the given project.
148 pub async fn update_project(
149 &self,
150 project_id: ProjectId,
151 connection: ConnectionId,
152 worktrees: &[proto::WorktreeMetadata],
153 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
154 self.project_transaction(project_id, |tx| async move {
155 let project = project::Entity::find_by_id(project_id)
156 .filter(
157 Condition::all()
158 .add(project::Column::HostConnectionId.eq(connection.id as i32))
159 .add(
160 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
161 ),
162 )
163 .one(&*tx)
164 .await?
165 .context("no such project")?;
166
167 self.update_project_worktrees(project.id, worktrees, &tx)
168 .await?;
169
170 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
171
172 let room = if let Some(room_id) = project.room_id {
173 Some(self.get_room(room_id, &tx).await?)
174 } else {
175 None
176 };
177
178 Ok((room, guest_connection_ids))
179 })
180 .await
181 }
182
183 pub(in crate::db) async fn update_project_worktrees(
184 &self,
185 project_id: ProjectId,
186 worktrees: &[proto::WorktreeMetadata],
187 tx: &DatabaseTransaction,
188 ) -> Result<()> {
189 if !worktrees.is_empty() {
190 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
191 id: ActiveValue::set(worktree.id as i64),
192 project_id: ActiveValue::set(project_id),
193 abs_path: ActiveValue::set(worktree.abs_path.clone()),
194 root_name: ActiveValue::set(worktree.root_name.clone()),
195 visible: ActiveValue::set(worktree.visible),
196 scan_id: ActiveValue::set(0),
197 completed_scan_id: ActiveValue::set(0),
198 }))
199 .on_conflict(
200 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
201 .update_column(worktree::Column::RootName)
202 .to_owned(),
203 )
204 .exec(tx)
205 .await?;
206 }
207
208 worktree::Entity::delete_many()
209 .filter(worktree::Column::ProjectId.eq(project_id).and(
210 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
211 ))
212 .exec(tx)
213 .await?;
214
215 Ok(())
216 }
217
218 pub async fn update_worktree(
219 &self,
220 update: &proto::UpdateWorktree,
221 connection: ConnectionId,
222 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
223 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
224 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
225 {
226 return Err(anyhow!(
227 "invalid worktree update. removed entries: {}, updated entries: {}",
228 update.removed_entries.len(),
229 update.updated_entries.len()
230 ))?;
231 }
232
233 let project_id = ProjectId::from_proto(update.project_id);
234 let worktree_id = update.worktree_id as i64;
235 self.project_transaction(project_id, |tx| async move {
236 // Ensure the update comes from the host.
237 let _project = project::Entity::find_by_id(project_id)
238 .filter(
239 Condition::all()
240 .add(project::Column::HostConnectionId.eq(connection.id as i32))
241 .add(
242 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
243 ),
244 )
245 .one(&*tx)
246 .await?
247 .with_context(|| format!("no such project: {project_id}"))?;
248
249 // Update metadata.
250 worktree::Entity::update(worktree::ActiveModel {
251 id: ActiveValue::set(worktree_id),
252 project_id: ActiveValue::set(project_id),
253 root_name: ActiveValue::set(update.root_name.clone()),
254 scan_id: ActiveValue::set(update.scan_id as i64),
255 completed_scan_id: if update.is_last_update {
256 ActiveValue::set(update.scan_id as i64)
257 } else {
258 ActiveValue::default()
259 },
260 abs_path: ActiveValue::set(update.abs_path.clone()),
261 ..Default::default()
262 })
263 .exec(&*tx)
264 .await?;
265
266 if !update.updated_entries.is_empty() {
267 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
268 let mtime = entry.mtime.clone().unwrap_or_default();
269 worktree_entry::ActiveModel {
270 project_id: ActiveValue::set(project_id),
271 worktree_id: ActiveValue::set(worktree_id),
272 id: ActiveValue::set(entry.id as i64),
273 is_dir: ActiveValue::set(entry.is_dir),
274 path: ActiveValue::set(entry.path.clone()),
275 inode: ActiveValue::set(entry.inode as i64),
276 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
277 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
278 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
279 is_ignored: ActiveValue::set(entry.is_ignored),
280 git_status: ActiveValue::set(None),
281 is_external: ActiveValue::set(entry.is_external),
282 is_deleted: ActiveValue::set(false),
283 scan_id: ActiveValue::set(update.scan_id as i64),
284 is_fifo: ActiveValue::set(entry.is_fifo),
285 }
286 }))
287 .on_conflict(
288 OnConflict::columns([
289 worktree_entry::Column::ProjectId,
290 worktree_entry::Column::WorktreeId,
291 worktree_entry::Column::Id,
292 ])
293 .update_columns([
294 worktree_entry::Column::IsDir,
295 worktree_entry::Column::Path,
296 worktree_entry::Column::Inode,
297 worktree_entry::Column::MtimeSeconds,
298 worktree_entry::Column::MtimeNanos,
299 worktree_entry::Column::CanonicalPath,
300 worktree_entry::Column::IsIgnored,
301 worktree_entry::Column::ScanId,
302 ])
303 .to_owned(),
304 )
305 .exec(&*tx)
306 .await?;
307 }
308
309 if !update.removed_entries.is_empty() {
310 worktree_entry::Entity::update_many()
311 .filter(
312 worktree_entry::Column::ProjectId
313 .eq(project_id)
314 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
315 .and(
316 worktree_entry::Column::Id
317 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
318 ),
319 )
320 .set(worktree_entry::ActiveModel {
321 is_deleted: ActiveValue::Set(true),
322 scan_id: ActiveValue::Set(update.scan_id as i64),
323 ..Default::default()
324 })
325 .exec(&*tx)
326 .await?;
327 }
328
329 // Backward-compatibility for old Zed clients.
330 //
331 // Remove this block when Zed 1.80 stable has been out for a week.
332 {
333 if !update.updated_repositories.is_empty() {
334 project_repository::Entity::insert_many(
335 update.updated_repositories.iter().map(|repository| {
336 project_repository::ActiveModel {
337 project_id: ActiveValue::set(project_id),
338 legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
339 id: ActiveValue::set(repository.repository_id as i64),
340 scan_id: ActiveValue::set(update.scan_id as i64),
341 is_deleted: ActiveValue::set(false),
342 branch_summary: ActiveValue::Set(
343 repository
344 .branch_summary
345 .as_ref()
346 .map(|summary| serde_json::to_string(summary).unwrap()),
347 ),
348 current_merge_conflicts: ActiveValue::Set(Some(
349 serde_json::to_string(&repository.current_merge_conflicts)
350 .unwrap(),
351 )),
352
353 // Old clients do not use abs path, entry ids or head_commit_details.
354 abs_path: ActiveValue::set(String::new()),
355 entry_ids: ActiveValue::set("[]".into()),
356 head_commit_details: ActiveValue::set(None),
357 }
358 }),
359 )
360 .on_conflict(
361 OnConflict::columns([
362 project_repository::Column::ProjectId,
363 project_repository::Column::Id,
364 ])
365 .update_columns([
366 project_repository::Column::ScanId,
367 project_repository::Column::BranchSummary,
368 project_repository::Column::CurrentMergeConflicts,
369 ])
370 .to_owned(),
371 )
372 .exec(&*tx)
373 .await?;
374
375 let has_any_statuses = update
376 .updated_repositories
377 .iter()
378 .any(|repository| !repository.updated_statuses.is_empty());
379
380 if has_any_statuses {
381 project_repository_statuses::Entity::insert_many(
382 update.updated_repositories.iter().flat_map(
383 |repository: &proto::RepositoryEntry| {
384 repository.updated_statuses.iter().map(|status_entry| {
385 let (repo_path, status_kind, first_status, second_status) =
386 proto_status_to_db(status_entry.clone());
387 project_repository_statuses::ActiveModel {
388 project_id: ActiveValue::set(project_id),
389 repository_id: ActiveValue::set(
390 repository.repository_id as i64,
391 ),
392 scan_id: ActiveValue::set(update.scan_id as i64),
393 is_deleted: ActiveValue::set(false),
394 repo_path: ActiveValue::set(repo_path),
395 status: ActiveValue::set(0),
396 status_kind: ActiveValue::set(status_kind),
397 first_status: ActiveValue::set(first_status),
398 second_status: ActiveValue::set(second_status),
399 }
400 })
401 },
402 ),
403 )
404 .on_conflict(
405 OnConflict::columns([
406 project_repository_statuses::Column::ProjectId,
407 project_repository_statuses::Column::RepositoryId,
408 project_repository_statuses::Column::RepoPath,
409 ])
410 .update_columns([
411 project_repository_statuses::Column::ScanId,
412 project_repository_statuses::Column::StatusKind,
413 project_repository_statuses::Column::FirstStatus,
414 project_repository_statuses::Column::SecondStatus,
415 ])
416 .to_owned(),
417 )
418 .exec(&*tx)
419 .await?;
420 }
421
422 for repo in &update.updated_repositories {
423 if !repo.removed_statuses.is_empty() {
424 project_repository_statuses::Entity::update_many()
425 .filter(
426 project_repository_statuses::Column::ProjectId
427 .eq(project_id)
428 .and(
429 project_repository_statuses::Column::RepositoryId
430 .eq(repo.repository_id),
431 )
432 .and(
433 project_repository_statuses::Column::RepoPath
434 .is_in(repo.removed_statuses.iter()),
435 ),
436 )
437 .set(project_repository_statuses::ActiveModel {
438 is_deleted: ActiveValue::Set(true),
439 scan_id: ActiveValue::Set(update.scan_id as i64),
440 ..Default::default()
441 })
442 .exec(&*tx)
443 .await?;
444 }
445 }
446 }
447
448 if !update.removed_repositories.is_empty() {
449 project_repository::Entity::update_many()
450 .filter(
451 project_repository::Column::ProjectId
452 .eq(project_id)
453 .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
454 .and(project_repository::Column::Id.is_in(
455 update.removed_repositories.iter().map(|id| *id as i64),
456 )),
457 )
458 .set(project_repository::ActiveModel {
459 is_deleted: ActiveValue::Set(true),
460 scan_id: ActiveValue::Set(update.scan_id as i64),
461 ..Default::default()
462 })
463 .exec(&*tx)
464 .await?;
465 }
466 }
467
468 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
469 Ok(connection_ids)
470 })
471 .await
472 }
473
474 pub async fn update_repository(
475 &self,
476 update: &proto::UpdateRepository,
477 _connection: ConnectionId,
478 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
479 let project_id = ProjectId::from_proto(update.project_id);
480 let repository_id = update.id as i64;
481 self.project_transaction(project_id, |tx| async move {
482 project_repository::Entity::insert(project_repository::ActiveModel {
483 project_id: ActiveValue::set(project_id),
484 id: ActiveValue::set(repository_id),
485 legacy_worktree_id: ActiveValue::set(None),
486 abs_path: ActiveValue::set(update.abs_path.clone()),
487 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
488 scan_id: ActiveValue::set(update.scan_id as i64),
489 is_deleted: ActiveValue::set(false),
490 branch_summary: ActiveValue::Set(
491 update
492 .branch_summary
493 .as_ref()
494 .map(|summary| serde_json::to_string(summary).unwrap()),
495 ),
496 head_commit_details: ActiveValue::Set(
497 update
498 .head_commit_details
499 .as_ref()
500 .map(|details| serde_json::to_string(details).unwrap()),
501 ),
502 current_merge_conflicts: ActiveValue::Set(Some(
503 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
504 )),
505 })
506 .on_conflict(
507 OnConflict::columns([
508 project_repository::Column::ProjectId,
509 project_repository::Column::Id,
510 ])
511 .update_columns([
512 project_repository::Column::ScanId,
513 project_repository::Column::BranchSummary,
514 project_repository::Column::EntryIds,
515 project_repository::Column::AbsPath,
516 project_repository::Column::CurrentMergeConflicts,
517 project_repository::Column::HeadCommitDetails,
518 ])
519 .to_owned(),
520 )
521 .exec(&*tx)
522 .await?;
523
524 let has_any_statuses = !update.updated_statuses.is_empty();
525
526 if has_any_statuses {
527 project_repository_statuses::Entity::insert_many(
528 update.updated_statuses.iter().map(|status_entry| {
529 let (repo_path, status_kind, first_status, second_status) =
530 proto_status_to_db(status_entry.clone());
531 project_repository_statuses::ActiveModel {
532 project_id: ActiveValue::set(project_id),
533 repository_id: ActiveValue::set(repository_id),
534 scan_id: ActiveValue::set(update.scan_id as i64),
535 is_deleted: ActiveValue::set(false),
536 repo_path: ActiveValue::set(repo_path),
537 status: ActiveValue::set(0),
538 status_kind: ActiveValue::set(status_kind),
539 first_status: ActiveValue::set(first_status),
540 second_status: ActiveValue::set(second_status),
541 }
542 }),
543 )
544 .on_conflict(
545 OnConflict::columns([
546 project_repository_statuses::Column::ProjectId,
547 project_repository_statuses::Column::RepositoryId,
548 project_repository_statuses::Column::RepoPath,
549 ])
550 .update_columns([
551 project_repository_statuses::Column::ScanId,
552 project_repository_statuses::Column::StatusKind,
553 project_repository_statuses::Column::FirstStatus,
554 project_repository_statuses::Column::SecondStatus,
555 ])
556 .to_owned(),
557 )
558 .exec(&*tx)
559 .await?;
560 }
561
562 let has_any_removed_statuses = !update.removed_statuses.is_empty();
563
564 if has_any_removed_statuses {
565 project_repository_statuses::Entity::update_many()
566 .filter(
567 project_repository_statuses::Column::ProjectId
568 .eq(project_id)
569 .and(
570 project_repository_statuses::Column::RepositoryId.eq(repository_id),
571 )
572 .and(
573 project_repository_statuses::Column::RepoPath
574 .is_in(update.removed_statuses.iter()),
575 ),
576 )
577 .set(project_repository_statuses::ActiveModel {
578 is_deleted: ActiveValue::Set(true),
579 scan_id: ActiveValue::Set(update.scan_id as i64),
580 ..Default::default()
581 })
582 .exec(&*tx)
583 .await?;
584 }
585
586 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
587 Ok(connection_ids)
588 })
589 .await
590 }
591
592 pub async fn remove_repository(
593 &self,
594 remove: &proto::RemoveRepository,
595 _connection: ConnectionId,
596 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
597 let project_id = ProjectId::from_proto(remove.project_id);
598 let repository_id = remove.id as i64;
599 self.project_transaction(project_id, |tx| async move {
600 project_repository::Entity::update_many()
601 .filter(
602 project_repository::Column::ProjectId
603 .eq(project_id)
604 .and(project_repository::Column::Id.eq(repository_id)),
605 )
606 .set(project_repository::ActiveModel {
607 is_deleted: ActiveValue::Set(true),
608 // scan_id: ActiveValue::Set(update.scan_id as i64),
609 ..Default::default()
610 })
611 .exec(&*tx)
612 .await?;
613
614 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
615 Ok(connection_ids)
616 })
617 .await
618 }
619
620 /// Updates the diagnostic summary for the given connection.
621 pub async fn update_diagnostic_summary(
622 &self,
623 update: &proto::UpdateDiagnosticSummary,
624 connection: ConnectionId,
625 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
626 let project_id = ProjectId::from_proto(update.project_id);
627 let worktree_id = update.worktree_id as i64;
628 self.project_transaction(project_id, |tx| async move {
629 let summary = update.summary.as_ref().context("invalid summary")?;
630
631 // Ensure the update comes from the host.
632 let project = project::Entity::find_by_id(project_id)
633 .one(&*tx)
634 .await?
635 .context("no such project")?;
636 if project.host_connection()? != connection {
637 return Err(anyhow!("can't update a project hosted by someone else"))?;
638 }
639
640 // Update summary.
641 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
642 project_id: ActiveValue::set(project_id),
643 worktree_id: ActiveValue::set(worktree_id),
644 path: ActiveValue::set(summary.path.clone()),
645 language_server_id: ActiveValue::set(summary.language_server_id as i64),
646 error_count: ActiveValue::set(summary.error_count as i32),
647 warning_count: ActiveValue::set(summary.warning_count as i32),
648 })
649 .on_conflict(
650 OnConflict::columns([
651 worktree_diagnostic_summary::Column::ProjectId,
652 worktree_diagnostic_summary::Column::WorktreeId,
653 worktree_diagnostic_summary::Column::Path,
654 ])
655 .update_columns([
656 worktree_diagnostic_summary::Column::LanguageServerId,
657 worktree_diagnostic_summary::Column::ErrorCount,
658 worktree_diagnostic_summary::Column::WarningCount,
659 ])
660 .to_owned(),
661 )
662 .exec(&*tx)
663 .await?;
664
665 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
666 Ok(connection_ids)
667 })
668 .await
669 }
670
671 /// Starts the language server for the given connection.
672 pub async fn start_language_server(
673 &self,
674 update: &proto::StartLanguageServer,
675 connection: ConnectionId,
676 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
677 let project_id = ProjectId::from_proto(update.project_id);
678 self.project_transaction(project_id, |tx| async move {
679 let server = update.server.as_ref().context("invalid language server")?;
680
681 // Ensure the update comes from the host.
682 let project = project::Entity::find_by_id(project_id)
683 .one(&*tx)
684 .await?
685 .context("no such project")?;
686 if project.host_connection()? != connection {
687 return Err(anyhow!("can't update a project hosted by someone else"))?;
688 }
689
690 // Add the newly-started language server.
691 language_server::Entity::insert(language_server::ActiveModel {
692 project_id: ActiveValue::set(project_id),
693 id: ActiveValue::set(server.id as i64),
694 name: ActiveValue::set(server.name.clone()),
695 capabilities: ActiveValue::set(update.capabilities.clone()),
696 })
697 .on_conflict(
698 OnConflict::columns([
699 language_server::Column::ProjectId,
700 language_server::Column::Id,
701 ])
702 .update_column(language_server::Column::Name)
703 .to_owned(),
704 )
705 .exec(&*tx)
706 .await?;
707
708 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
709 Ok(connection_ids)
710 })
711 .await
712 }
713
714 /// Updates the worktree settings for the given connection.
715 pub async fn update_worktree_settings(
716 &self,
717 update: &proto::UpdateWorktreeSettings,
718 connection: ConnectionId,
719 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
720 let project_id = ProjectId::from_proto(update.project_id);
721 let kind = match update.kind {
722 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
723 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
724 None => proto::LocalSettingsKind::Settings,
725 };
726 let kind = LocalSettingsKind::from_proto(kind);
727 self.project_transaction(project_id, |tx| async move {
728 // Ensure the update comes from the host.
729 let project = project::Entity::find_by_id(project_id)
730 .one(&*tx)
731 .await?
732 .context("no such project")?;
733 if project.host_connection()? != connection {
734 return Err(anyhow!("can't update a project hosted by someone else"))?;
735 }
736
737 if let Some(content) = &update.content {
738 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
739 project_id: ActiveValue::Set(project_id),
740 worktree_id: ActiveValue::Set(update.worktree_id as i64),
741 path: ActiveValue::Set(update.path.clone()),
742 content: ActiveValue::Set(content.clone()),
743 kind: ActiveValue::Set(kind),
744 })
745 .on_conflict(
746 OnConflict::columns([
747 worktree_settings_file::Column::ProjectId,
748 worktree_settings_file::Column::WorktreeId,
749 worktree_settings_file::Column::Path,
750 ])
751 .update_column(worktree_settings_file::Column::Content)
752 .to_owned(),
753 )
754 .exec(&*tx)
755 .await?;
756 } else {
757 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
758 project_id: ActiveValue::Set(project_id),
759 worktree_id: ActiveValue::Set(update.worktree_id as i64),
760 path: ActiveValue::Set(update.path.clone()),
761 ..Default::default()
762 })
763 .exec(&*tx)
764 .await?;
765 }
766
767 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
768 Ok(connection_ids)
769 })
770 .await
771 }
772
773 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
774 self.transaction(|tx| async move {
775 Ok(project::Entity::find_by_id(id)
776 .one(&*tx)
777 .await?
778 .context("no such project")?)
779 })
780 .await
781 }
782
783 /// Adds the given connection to the specified project
784 /// in the current room.
785 pub async fn join_project(
786 &self,
787 project_id: ProjectId,
788 connection: ConnectionId,
789 user_id: UserId,
790 committer_name: Option<String>,
791 committer_email: Option<String>,
792 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
793 self.project_transaction(project_id, move |tx| {
794 let committer_name = committer_name.clone();
795 let committer_email = committer_email.clone();
796 async move {
797 let (project, role) = self
798 .access_project(project_id, connection, Capability::ReadOnly, &tx)
799 .await?;
800 self.join_project_internal(
801 project,
802 user_id,
803 committer_name,
804 committer_email,
805 connection,
806 role,
807 &tx,
808 )
809 .await
810 }
811 })
812 .await
813 }
814
815 async fn join_project_internal(
816 &self,
817 project: project::Model,
818 user_id: UserId,
819 committer_name: Option<String>,
820 committer_email: Option<String>,
821 connection: ConnectionId,
822 role: ChannelRole,
823 tx: &DatabaseTransaction,
824 ) -> Result<(Project, ReplicaId)> {
825 let mut collaborators = project
826 .find_related(project_collaborator::Entity)
827 .all(tx)
828 .await?;
829 let replica_ids = collaborators
830 .iter()
831 .map(|c| c.replica_id)
832 .collect::<HashSet<_>>();
833 let mut replica_id = ReplicaId(1);
834 while replica_ids.contains(&replica_id) {
835 replica_id.0 += 1;
836 }
837 let new_collaborator = project_collaborator::ActiveModel {
838 project_id: ActiveValue::set(project.id),
839 connection_id: ActiveValue::set(connection.id as i32),
840 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
841 user_id: ActiveValue::set(user_id),
842 replica_id: ActiveValue::set(replica_id),
843 is_host: ActiveValue::set(false),
844 id: ActiveValue::NotSet,
845 committer_name: ActiveValue::set(committer_name),
846 committer_email: ActiveValue::set(committer_email),
847 }
848 .insert(tx)
849 .await?;
850 collaborators.push(new_collaborator);
851
852 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
853 let mut worktrees = db_worktrees
854 .into_iter()
855 .map(|db_worktree| {
856 (
857 db_worktree.id as u64,
858 Worktree {
859 id: db_worktree.id as u64,
860 abs_path: db_worktree.abs_path,
861 root_name: db_worktree.root_name,
862 visible: db_worktree.visible,
863 entries: Default::default(),
864 diagnostic_summaries: Default::default(),
865 settings_files: Default::default(),
866 scan_id: db_worktree.scan_id as u64,
867 completed_scan_id: db_worktree.completed_scan_id as u64,
868 legacy_repository_entries: Default::default(),
869 },
870 )
871 })
872 .collect::<BTreeMap<_, _>>();
873
874 // Populate worktree entries.
875 {
876 let mut db_entries = worktree_entry::Entity::find()
877 .filter(
878 Condition::all()
879 .add(worktree_entry::Column::ProjectId.eq(project.id))
880 .add(worktree_entry::Column::IsDeleted.eq(false)),
881 )
882 .stream(tx)
883 .await?;
884 while let Some(db_entry) = db_entries.next().await {
885 let db_entry = db_entry?;
886 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
887 worktree.entries.push(proto::Entry {
888 id: db_entry.id as u64,
889 is_dir: db_entry.is_dir,
890 path: db_entry.path,
891 inode: db_entry.inode as u64,
892 mtime: Some(proto::Timestamp {
893 seconds: db_entry.mtime_seconds as u64,
894 nanos: db_entry.mtime_nanos as u32,
895 }),
896 canonical_path: db_entry.canonical_path,
897 is_ignored: db_entry.is_ignored,
898 is_external: db_entry.is_external,
899 // This is only used in the summarization backlog, so if it's None,
900 // that just means we won't be able to detect when to resummarize
901 // based on total number of backlogged bytes - instead, we'd go
902 // on number of files only. That shouldn't be a huge deal in practice.
903 size: None,
904 is_fifo: db_entry.is_fifo,
905 });
906 }
907 }
908 }
909
910 // Populate repository entries.
911 let mut repositories = Vec::new();
912 {
913 let db_repository_entries = project_repository::Entity::find()
914 .filter(
915 Condition::all()
916 .add(project_repository::Column::ProjectId.eq(project.id))
917 .add(project_repository::Column::IsDeleted.eq(false)),
918 )
919 .all(tx)
920 .await?;
921 for db_repository_entry in db_repository_entries {
922 let mut repository_statuses = project_repository_statuses::Entity::find()
923 .filter(
924 Condition::all()
925 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
926 .add(
927 project_repository_statuses::Column::RepositoryId
928 .eq(db_repository_entry.id),
929 )
930 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
931 )
932 .stream(tx)
933 .await?;
934 let mut updated_statuses = Vec::new();
935 while let Some(status_entry) = repository_statuses.next().await {
936 let status_entry = status_entry?;
937 updated_statuses.push(db_status_to_proto(status_entry)?);
938 }
939
940 let current_merge_conflicts = db_repository_entry
941 .current_merge_conflicts
942 .as_ref()
943 .map(|conflicts| serde_json::from_str(&conflicts))
944 .transpose()?
945 .unwrap_or_default();
946
947 let branch_summary = db_repository_entry
948 .branch_summary
949 .as_ref()
950 .map(|branch_summary| serde_json::from_str(&branch_summary))
951 .transpose()?
952 .unwrap_or_default();
953
954 let head_commit_details = db_repository_entry
955 .head_commit_details
956 .as_ref()
957 .map(|head_commit_details| serde_json::from_str(&head_commit_details))
958 .transpose()?
959 .unwrap_or_default();
960
961 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
962 .context("failed to deserialize repository's entry ids")?;
963
964 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
965 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
966 worktree.legacy_repository_entries.insert(
967 db_repository_entry.id as u64,
968 proto::RepositoryEntry {
969 repository_id: db_repository_entry.id as u64,
970 updated_statuses,
971 removed_statuses: Vec::new(),
972 current_merge_conflicts,
973 branch_summary,
974 },
975 );
976 }
977 } else {
978 repositories.push(proto::UpdateRepository {
979 project_id: db_repository_entry.project_id.0 as u64,
980 id: db_repository_entry.id as u64,
981 abs_path: db_repository_entry.abs_path,
982 entry_ids,
983 updated_statuses,
984 removed_statuses: Vec::new(),
985 current_merge_conflicts,
986 branch_summary,
987 head_commit_details,
988 scan_id: db_repository_entry.scan_id as u64,
989 is_last_update: true,
990 });
991 }
992 }
993 }
994
995 // Populate worktree diagnostic summaries.
996 {
997 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
998 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
999 .stream(tx)
1000 .await?;
1001 while let Some(db_summary) = db_summaries.next().await {
1002 let db_summary = db_summary?;
1003 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1004 worktree
1005 .diagnostic_summaries
1006 .push(proto::DiagnosticSummary {
1007 path: db_summary.path,
1008 language_server_id: db_summary.language_server_id as u64,
1009 error_count: db_summary.error_count as u32,
1010 warning_count: db_summary.warning_count as u32,
1011 });
1012 }
1013 }
1014 }
1015
1016 // Populate worktree settings files
1017 {
1018 let mut db_settings_files = worktree_settings_file::Entity::find()
1019 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1020 .stream(tx)
1021 .await?;
1022 while let Some(db_settings_file) = db_settings_files.next().await {
1023 let db_settings_file = db_settings_file?;
1024 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1025 worktree.settings_files.push(WorktreeSettingsFile {
1026 path: db_settings_file.path,
1027 content: db_settings_file.content,
1028 kind: db_settings_file.kind,
1029 });
1030 }
1031 }
1032 }
1033
1034 // Populate language servers.
1035 let language_servers = project
1036 .find_related(language_server::Entity)
1037 .all(tx)
1038 .await?;
1039
1040 let project = Project {
1041 id: project.id,
1042 role,
1043 collaborators: collaborators
1044 .into_iter()
1045 .map(|collaborator| ProjectCollaborator {
1046 connection_id: collaborator.connection(),
1047 user_id: collaborator.user_id,
1048 replica_id: collaborator.replica_id,
1049 is_host: collaborator.is_host,
1050 committer_name: collaborator.committer_name,
1051 committer_email: collaborator.committer_email,
1052 })
1053 .collect(),
1054 worktrees,
1055 repositories,
1056 language_servers: language_servers
1057 .into_iter()
1058 .map(|language_server| LanguageServer {
1059 server: proto::LanguageServer {
1060 id: language_server.id as u64,
1061 name: language_server.name,
1062 worktree_id: None,
1063 },
1064 capabilities: language_server.capabilities,
1065 })
1066 .collect(),
1067 };
1068 Ok((project, replica_id as ReplicaId))
1069 }
1070
1071 /// Removes the given connection from the specified project.
1072 pub async fn leave_project(
1073 &self,
1074 project_id: ProjectId,
1075 connection: ConnectionId,
1076 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1077 self.project_transaction(project_id, |tx| async move {
1078 let result = project_collaborator::Entity::delete_many()
1079 .filter(
1080 Condition::all()
1081 .add(project_collaborator::Column::ProjectId.eq(project_id))
1082 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1083 .add(
1084 project_collaborator::Column::ConnectionServerId
1085 .eq(connection.owner_id as i32),
1086 ),
1087 )
1088 .exec(&*tx)
1089 .await?;
1090 if result.rows_affected == 0 {
1091 Err(anyhow!("not a collaborator on this project"))?;
1092 }
1093
1094 let project = project::Entity::find_by_id(project_id)
1095 .one(&*tx)
1096 .await?
1097 .context("no such project")?;
1098 let collaborators = project
1099 .find_related(project_collaborator::Entity)
1100 .all(&*tx)
1101 .await?;
1102 let connection_ids: Vec<ConnectionId> = collaborators
1103 .into_iter()
1104 .map(|collaborator| collaborator.connection())
1105 .collect();
1106
1107 follower::Entity::delete_many()
1108 .filter(
1109 Condition::any()
1110 .add(
1111 Condition::all()
1112 .add(follower::Column::ProjectId.eq(Some(project_id)))
1113 .add(
1114 follower::Column::LeaderConnectionServerId
1115 .eq(connection.owner_id),
1116 )
1117 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1118 )
1119 .add(
1120 Condition::all()
1121 .add(follower::Column::ProjectId.eq(Some(project_id)))
1122 .add(
1123 follower::Column::FollowerConnectionServerId
1124 .eq(connection.owner_id),
1125 )
1126 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1127 ),
1128 )
1129 .exec(&*tx)
1130 .await?;
1131
1132 let room = if let Some(room_id) = project.room_id {
1133 Some(self.get_room(room_id, &tx).await?)
1134 } else {
1135 None
1136 };
1137
1138 let left_project = LeftProject {
1139 id: project_id,
1140 should_unshare: connection == project.host_connection()?,
1141 connection_ids,
1142 };
1143 Ok((room, left_project))
1144 })
1145 .await
1146 }
1147
1148 pub async fn check_user_is_project_host(
1149 &self,
1150 project_id: ProjectId,
1151 connection_id: ConnectionId,
1152 ) -> Result<()> {
1153 self.project_transaction(project_id, |tx| async move {
1154 project::Entity::find()
1155 .filter(
1156 Condition::all()
1157 .add(project::Column::Id.eq(project_id))
1158 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1159 .add(
1160 project::Column::HostConnectionServerId
1161 .eq(Some(connection_id.owner_id as i32)),
1162 ),
1163 )
1164 .one(&*tx)
1165 .await?
1166 .context("failed to read project host")?;
1167
1168 Ok(())
1169 })
1170 .await
1171 .map(|guard| guard.into_inner())
1172 }
1173
1174 /// Returns the current project if the given user is authorized to access it with the specified capability.
1175 pub async fn access_project(
1176 &self,
1177 project_id: ProjectId,
1178 connection_id: ConnectionId,
1179 capability: Capability,
1180 tx: &DatabaseTransaction,
1181 ) -> Result<(project::Model, ChannelRole)> {
1182 let project = project::Entity::find_by_id(project_id)
1183 .one(tx)
1184 .await?
1185 .context("no such project")?;
1186
1187 let role_from_room = if let Some(room_id) = project.room_id {
1188 room_participant::Entity::find()
1189 .filter(room_participant::Column::RoomId.eq(room_id))
1190 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1191 .one(tx)
1192 .await?
1193 .and_then(|participant| participant.role)
1194 } else {
1195 None
1196 };
1197
1198 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1199
1200 match capability {
1201 Capability::ReadWrite => {
1202 if !role.can_edit_projects() {
1203 return Err(anyhow!("not authorized to edit projects"))?;
1204 }
1205 }
1206 Capability::ReadOnly => {
1207 if !role.can_read_projects() {
1208 return Err(anyhow!("not authorized to read projects"))?;
1209 }
1210 }
1211 }
1212
1213 Ok((project, role))
1214 }
1215
1216 /// Returns the host connection for a read-only request to join a shared project.
1217 pub async fn host_for_read_only_project_request(
1218 &self,
1219 project_id: ProjectId,
1220 connection_id: ConnectionId,
1221 ) -> Result<ConnectionId> {
1222 self.project_transaction(project_id, |tx| async move {
1223 let (project, _) = self
1224 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1225 .await?;
1226 project.host_connection()
1227 })
1228 .await
1229 .map(|guard| guard.into_inner())
1230 }
1231
1232 /// Returns the host connection for a request to join a shared project.
1233 pub async fn host_for_mutating_project_request(
1234 &self,
1235 project_id: ProjectId,
1236 connection_id: ConnectionId,
1237 ) -> Result<ConnectionId> {
1238 self.project_transaction(project_id, |tx| async move {
1239 let (project, _) = self
1240 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1241 .await?;
1242 project.host_connection()
1243 })
1244 .await
1245 .map(|guard| guard.into_inner())
1246 }
1247
1248 pub async fn connections_for_buffer_update(
1249 &self,
1250 project_id: ProjectId,
1251 connection_id: ConnectionId,
1252 capability: Capability,
1253 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1254 self.project_transaction(project_id, |tx| async move {
1255 // Authorize
1256 let (project, _) = self
1257 .access_project(project_id, connection_id, capability, &tx)
1258 .await?;
1259
1260 let host_connection_id = project.host_connection()?;
1261
1262 let collaborators = project_collaborator::Entity::find()
1263 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1264 .all(&*tx)
1265 .await?;
1266
1267 let guest_connection_ids = collaborators
1268 .into_iter()
1269 .filter_map(|collaborator| {
1270 if collaborator.is_host {
1271 None
1272 } else {
1273 Some(collaborator.connection())
1274 }
1275 })
1276 .collect();
1277
1278 Ok((host_connection_id, guest_connection_ids))
1279 })
1280 .await
1281 }
1282
1283 /// Returns the connection IDs in the given project.
1284 ///
1285 /// The provided `connection_id` must also be a collaborator in the project,
1286 /// otherwise an error will be returned.
1287 pub async fn project_connection_ids(
1288 &self,
1289 project_id: ProjectId,
1290 connection_id: ConnectionId,
1291 exclude_dev_server: bool,
1292 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1293 self.project_transaction(project_id, |tx| async move {
1294 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1295 .await
1296 })
1297 .await
1298 }
1299
1300 async fn internal_project_connection_ids(
1301 &self,
1302 project_id: ProjectId,
1303 connection_id: ConnectionId,
1304 exclude_dev_server: bool,
1305 tx: &DatabaseTransaction,
1306 ) -> Result<HashSet<ConnectionId>> {
1307 let project = project::Entity::find_by_id(project_id)
1308 .one(tx)
1309 .await?
1310 .context("no such project")?;
1311
1312 let mut collaborators = project_collaborator::Entity::find()
1313 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1314 .stream(tx)
1315 .await?;
1316
1317 let mut connection_ids = HashSet::default();
1318 if let Some(host_connection) = project.host_connection().log_err() {
1319 if !exclude_dev_server {
1320 connection_ids.insert(host_connection);
1321 }
1322 }
1323
1324 while let Some(collaborator) = collaborators.next().await {
1325 let collaborator = collaborator?;
1326 connection_ids.insert(collaborator.connection());
1327 }
1328
1329 if connection_ids.contains(&connection_id)
1330 || Some(connection_id) == project.host_connection().ok()
1331 {
1332 Ok(connection_ids)
1333 } else {
1334 Err(anyhow!(
1335 "can only send project updates to a project you're in"
1336 ))?
1337 }
1338 }
1339
1340 async fn project_guest_connection_ids(
1341 &self,
1342 project_id: ProjectId,
1343 tx: &DatabaseTransaction,
1344 ) -> Result<Vec<ConnectionId>> {
1345 let mut collaborators = project_collaborator::Entity::find()
1346 .filter(
1347 project_collaborator::Column::ProjectId
1348 .eq(project_id)
1349 .and(project_collaborator::Column::IsHost.eq(false)),
1350 )
1351 .stream(tx)
1352 .await?;
1353
1354 let mut guest_connection_ids = Vec::new();
1355 while let Some(collaborator) = collaborators.next().await {
1356 let collaborator = collaborator?;
1357 guest_connection_ids.push(collaborator.connection());
1358 }
1359 Ok(guest_connection_ids)
1360 }
1361
1362 /// Returns the [`RoomId`] for the given project.
1363 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1364 self.transaction(|tx| async move {
1365 Ok(project::Entity::find_by_id(project_id)
1366 .one(&*tx)
1367 .await?
1368 .and_then(|project| project.room_id))
1369 })
1370 .await
1371 }
1372
1373 pub async fn check_room_participants(
1374 &self,
1375 room_id: RoomId,
1376 leader_id: ConnectionId,
1377 follower_id: ConnectionId,
1378 ) -> Result<()> {
1379 self.transaction(|tx| async move {
1380 use room_participant::Column;
1381
1382 let count = room_participant::Entity::find()
1383 .filter(
1384 Condition::all().add(Column::RoomId.eq(room_id)).add(
1385 Condition::any()
1386 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1387 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1388 ))
1389 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1390 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1391 )),
1392 ),
1393 )
1394 .count(&*tx)
1395 .await?;
1396
1397 if count < 2 {
1398 Err(anyhow!("not room participants"))?;
1399 }
1400
1401 Ok(())
1402 })
1403 .await
1404 }
1405
1406 /// Adds the given follower connection as a follower of the given leader connection.
1407 pub async fn follow(
1408 &self,
1409 room_id: RoomId,
1410 project_id: ProjectId,
1411 leader_connection: ConnectionId,
1412 follower_connection: ConnectionId,
1413 ) -> Result<TransactionGuard<proto::Room>> {
1414 self.room_transaction(room_id, |tx| async move {
1415 follower::ActiveModel {
1416 room_id: ActiveValue::set(room_id),
1417 project_id: ActiveValue::set(project_id),
1418 leader_connection_server_id: ActiveValue::set(ServerId(
1419 leader_connection.owner_id as i32,
1420 )),
1421 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1422 follower_connection_server_id: ActiveValue::set(ServerId(
1423 follower_connection.owner_id as i32,
1424 )),
1425 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1426 ..Default::default()
1427 }
1428 .insert(&*tx)
1429 .await?;
1430
1431 let room = self.get_room(room_id, &tx).await?;
1432 Ok(room)
1433 })
1434 .await
1435 }
1436
1437 /// Removes the given follower connection as a follower of the given leader connection.
1438 pub async fn unfollow(
1439 &self,
1440 room_id: RoomId,
1441 project_id: ProjectId,
1442 leader_connection: ConnectionId,
1443 follower_connection: ConnectionId,
1444 ) -> Result<TransactionGuard<proto::Room>> {
1445 self.room_transaction(room_id, |tx| async move {
1446 follower::Entity::delete_many()
1447 .filter(
1448 Condition::all()
1449 .add(follower::Column::RoomId.eq(room_id))
1450 .add(follower::Column::ProjectId.eq(project_id))
1451 .add(
1452 follower::Column::LeaderConnectionServerId
1453 .eq(leader_connection.owner_id),
1454 )
1455 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1456 .add(
1457 follower::Column::FollowerConnectionServerId
1458 .eq(follower_connection.owner_id),
1459 )
1460 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1461 )
1462 .exec(&*tx)
1463 .await?;
1464
1465 let room = self.get_room(room_id, &tx).await?;
1466 Ok(room)
1467 })
1468 .await
1469 }
1470}