1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 windows_paths: bool,
37 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
38 self.room_transaction(room_id, |tx| async move {
39 let participant = room_participant::Entity::find()
40 .filter(
41 Condition::all()
42 .add(
43 room_participant::Column::AnsweringConnectionId
44 .eq(connection.id as i32),
45 )
46 .add(
47 room_participant::Column::AnsweringConnectionServerId
48 .eq(connection.owner_id as i32),
49 ),
50 )
51 .one(&*tx)
52 .await?
53 .context("could not find participant")?;
54 if participant.room_id != room_id {
55 return Err(anyhow!("shared project on unexpected room"))?;
56 }
57 if !participant
58 .role
59 .unwrap_or(ChannelRole::Member)
60 .can_edit_projects()
61 {
62 return Err(anyhow!("guests cannot share projects"))?;
63 }
64
65 let project = project::ActiveModel {
66 room_id: ActiveValue::set(Some(participant.room_id)),
67 host_user_id: ActiveValue::set(Some(participant.user_id)),
68 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
69 host_connection_server_id: ActiveValue::set(Some(ServerId(
70 connection.owner_id as i32,
71 ))),
72 id: ActiveValue::NotSet,
73 windows_paths: ActiveValue::set(windows_paths),
74 }
75 .insert(&*tx)
76 .await?;
77
78 if !worktrees.is_empty() {
79 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
80 worktree::ActiveModel {
81 id: ActiveValue::set(worktree.id as i64),
82 project_id: ActiveValue::set(project.id),
83 abs_path: ActiveValue::set(worktree.abs_path.clone()),
84 root_name: ActiveValue::set(worktree.root_name.clone()),
85 visible: ActiveValue::set(worktree.visible),
86 scan_id: ActiveValue::set(0),
87 completed_scan_id: ActiveValue::set(0),
88 }
89 }))
90 .exec(&*tx)
91 .await?;
92 }
93
94 let replica_id = if is_ssh_project {
95 clock::ReplicaId::REMOTE_SERVER
96 } else {
97 clock::ReplicaId::LOCAL
98 };
99
100 project_collaborator::ActiveModel {
101 project_id: ActiveValue::set(project.id),
102 connection_id: ActiveValue::set(connection.id as i32),
103 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
104 user_id: ActiveValue::set(participant.user_id),
105 replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
106 is_host: ActiveValue::set(true),
107 id: ActiveValue::NotSet,
108 committer_name: ActiveValue::Set(None),
109 committer_email: ActiveValue::Set(None),
110 }
111 .insert(&*tx)
112 .await?;
113
114 let room = self.get_room(room_id, &tx).await?;
115 Ok((project.id, room))
116 })
117 .await
118 }
119
120 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
121 self.transaction(|tx| async move {
122 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
123 Ok(())
124 })
125 .await
126 }
127
128 /// Unshares the given project.
129 pub async fn unshare_project(
130 &self,
131 project_id: ProjectId,
132 connection: ConnectionId,
133 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
134 self.project_transaction(project_id, |tx| async move {
135 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
136 let project = project::Entity::find_by_id(project_id)
137 .one(&*tx)
138 .await?
139 .context("project not found")?;
140 let room = if let Some(room_id) = project.room_id {
141 Some(self.get_room(room_id, &tx).await?)
142 } else {
143 None
144 };
145 if project.host_connection()? == connection {
146 return Ok((true, room, guest_connection_ids));
147 }
148 Err(anyhow!("cannot unshare a project hosted by another user"))?
149 })
150 .await
151 }
152
153 /// Updates the worktrees associated with the given project.
154 pub async fn update_project(
155 &self,
156 project_id: ProjectId,
157 connection: ConnectionId,
158 worktrees: &[proto::WorktreeMetadata],
159 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
160 self.project_transaction(project_id, |tx| async move {
161 let project = project::Entity::find_by_id(project_id)
162 .filter(
163 Condition::all()
164 .add(project::Column::HostConnectionId.eq(connection.id as i32))
165 .add(
166 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
167 ),
168 )
169 .one(&*tx)
170 .await?
171 .context("no such project")?;
172
173 self.update_project_worktrees(project.id, worktrees, &tx)
174 .await?;
175
176 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
177
178 let room = if let Some(room_id) = project.room_id {
179 Some(self.get_room(room_id, &tx).await?)
180 } else {
181 None
182 };
183
184 Ok((room, guest_connection_ids))
185 })
186 .await
187 }
188
189 pub(in crate::db) async fn update_project_worktrees(
190 &self,
191 project_id: ProjectId,
192 worktrees: &[proto::WorktreeMetadata],
193 tx: &DatabaseTransaction,
194 ) -> Result<()> {
195 if !worktrees.is_empty() {
196 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
197 id: ActiveValue::set(worktree.id as i64),
198 project_id: ActiveValue::set(project_id),
199 abs_path: ActiveValue::set(worktree.abs_path.clone()),
200 root_name: ActiveValue::set(worktree.root_name.clone()),
201 visible: ActiveValue::set(worktree.visible),
202 scan_id: ActiveValue::set(0),
203 completed_scan_id: ActiveValue::set(0),
204 }))
205 .on_conflict(
206 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
207 .update_column(worktree::Column::RootName)
208 .to_owned(),
209 )
210 .exec(tx)
211 .await?;
212 }
213
214 worktree::Entity::delete_many()
215 .filter(worktree::Column::ProjectId.eq(project_id).and(
216 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
217 ))
218 .exec(tx)
219 .await?;
220
221 Ok(())
222 }
223
224 pub async fn update_worktree(
225 &self,
226 update: &proto::UpdateWorktree,
227 connection: ConnectionId,
228 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
229 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
230 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
231 {
232 return Err(anyhow!(
233 "invalid worktree update. removed entries: {}, updated entries: {}",
234 update.removed_entries.len(),
235 update.updated_entries.len()
236 ))?;
237 }
238
239 let project_id = ProjectId::from_proto(update.project_id);
240 let worktree_id = update.worktree_id as i64;
241 self.project_transaction(project_id, |tx| async move {
242 // Ensure the update comes from the host.
243 let _project = project::Entity::find_by_id(project_id)
244 .filter(
245 Condition::all()
246 .add(project::Column::HostConnectionId.eq(connection.id as i32))
247 .add(
248 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
249 ),
250 )
251 .one(&*tx)
252 .await?
253 .with_context(|| format!("no such project: {project_id}"))?;
254
255 // Update metadata.
256 worktree::Entity::update(worktree::ActiveModel {
257 id: ActiveValue::set(worktree_id),
258 project_id: ActiveValue::set(project_id),
259 root_name: ActiveValue::set(update.root_name.clone()),
260 scan_id: ActiveValue::set(update.scan_id as i64),
261 completed_scan_id: if update.is_last_update {
262 ActiveValue::set(update.scan_id as i64)
263 } else {
264 ActiveValue::default()
265 },
266 abs_path: ActiveValue::set(update.abs_path.clone()),
267 ..Default::default()
268 })
269 .exec(&*tx)
270 .await?;
271
272 if !update.updated_entries.is_empty() {
273 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
274 let mtime = entry.mtime.clone().unwrap_or_default();
275 worktree_entry::ActiveModel {
276 project_id: ActiveValue::set(project_id),
277 worktree_id: ActiveValue::set(worktree_id),
278 id: ActiveValue::set(entry.id as i64),
279 is_dir: ActiveValue::set(entry.is_dir),
280 path: ActiveValue::set(entry.path.clone()),
281 inode: ActiveValue::set(entry.inode as i64),
282 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
283 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
284 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
285 is_ignored: ActiveValue::set(entry.is_ignored),
286 git_status: ActiveValue::set(None),
287 is_external: ActiveValue::set(entry.is_external),
288 is_deleted: ActiveValue::set(false),
289 is_hidden: ActiveValue::set(entry.is_hidden),
290 scan_id: ActiveValue::set(update.scan_id as i64),
291 is_fifo: ActiveValue::set(entry.is_fifo),
292 }
293 }))
294 .on_conflict(
295 OnConflict::columns([
296 worktree_entry::Column::ProjectId,
297 worktree_entry::Column::WorktreeId,
298 worktree_entry::Column::Id,
299 ])
300 .update_columns([
301 worktree_entry::Column::IsDir,
302 worktree_entry::Column::Path,
303 worktree_entry::Column::Inode,
304 worktree_entry::Column::MtimeSeconds,
305 worktree_entry::Column::MtimeNanos,
306 worktree_entry::Column::CanonicalPath,
307 worktree_entry::Column::IsIgnored,
308 worktree_entry::Column::IsHidden,
309 worktree_entry::Column::ScanId,
310 ])
311 .to_owned(),
312 )
313 .exec(&*tx)
314 .await?;
315 }
316
317 if !update.removed_entries.is_empty() {
318 worktree_entry::Entity::update_many()
319 .filter(
320 worktree_entry::Column::ProjectId
321 .eq(project_id)
322 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
323 .and(
324 worktree_entry::Column::Id
325 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
326 ),
327 )
328 .set(worktree_entry::ActiveModel {
329 is_deleted: ActiveValue::Set(true),
330 scan_id: ActiveValue::Set(update.scan_id as i64),
331 ..Default::default()
332 })
333 .exec(&*tx)
334 .await?;
335 }
336
337 // Backward-compatibility for old Zed clients.
338 //
339 // Remove this block when Zed 1.80 stable has been out for a week.
340 {
341 if !update.updated_repositories.is_empty() {
342 project_repository::Entity::insert_many(
343 update.updated_repositories.iter().map(|repository| {
344 project_repository::ActiveModel {
345 project_id: ActiveValue::set(project_id),
346 legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
347 id: ActiveValue::set(repository.repository_id as i64),
348 scan_id: ActiveValue::set(update.scan_id as i64),
349 is_deleted: ActiveValue::set(false),
350 branch_summary: ActiveValue::Set(
351 repository
352 .branch_summary
353 .as_ref()
354 .map(|summary| serde_json::to_string(summary).unwrap()),
355 ),
356 current_merge_conflicts: ActiveValue::Set(Some(
357 serde_json::to_string(&repository.current_merge_conflicts)
358 .unwrap(),
359 )),
360 // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
361 abs_path: ActiveValue::set(String::new()),
362 entry_ids: ActiveValue::set("[]".into()),
363 head_commit_details: ActiveValue::set(None),
364 merge_message: ActiveValue::set(None),
365 }
366 }),
367 )
368 .on_conflict(
369 OnConflict::columns([
370 project_repository::Column::ProjectId,
371 project_repository::Column::Id,
372 ])
373 .update_columns([
374 project_repository::Column::ScanId,
375 project_repository::Column::BranchSummary,
376 project_repository::Column::CurrentMergeConflicts,
377 ])
378 .to_owned(),
379 )
380 .exec(&*tx)
381 .await?;
382
383 let has_any_statuses = update
384 .updated_repositories
385 .iter()
386 .any(|repository| !repository.updated_statuses.is_empty());
387
388 if has_any_statuses {
389 project_repository_statuses::Entity::insert_many(
390 update.updated_repositories.iter().flat_map(
391 |repository: &proto::RepositoryEntry| {
392 repository.updated_statuses.iter().map(|status_entry| {
393 let (repo_path, status_kind, first_status, second_status) =
394 proto_status_to_db(status_entry.clone());
395 project_repository_statuses::ActiveModel {
396 project_id: ActiveValue::set(project_id),
397 repository_id: ActiveValue::set(
398 repository.repository_id as i64,
399 ),
400 scan_id: ActiveValue::set(update.scan_id as i64),
401 is_deleted: ActiveValue::set(false),
402 repo_path: ActiveValue::set(repo_path),
403 status: ActiveValue::set(0),
404 status_kind: ActiveValue::set(status_kind),
405 first_status: ActiveValue::set(first_status),
406 second_status: ActiveValue::set(second_status),
407 }
408 })
409 },
410 ),
411 )
412 .on_conflict(
413 OnConflict::columns([
414 project_repository_statuses::Column::ProjectId,
415 project_repository_statuses::Column::RepositoryId,
416 project_repository_statuses::Column::RepoPath,
417 ])
418 .update_columns([
419 project_repository_statuses::Column::ScanId,
420 project_repository_statuses::Column::StatusKind,
421 project_repository_statuses::Column::FirstStatus,
422 project_repository_statuses::Column::SecondStatus,
423 ])
424 .to_owned(),
425 )
426 .exec(&*tx)
427 .await?;
428 }
429
430 for repo in &update.updated_repositories {
431 if !repo.removed_statuses.is_empty() {
432 project_repository_statuses::Entity::update_many()
433 .filter(
434 project_repository_statuses::Column::ProjectId
435 .eq(project_id)
436 .and(
437 project_repository_statuses::Column::RepositoryId
438 .eq(repo.repository_id),
439 )
440 .and(
441 project_repository_statuses::Column::RepoPath
442 .is_in(repo.removed_statuses.iter()),
443 ),
444 )
445 .set(project_repository_statuses::ActiveModel {
446 is_deleted: ActiveValue::Set(true),
447 scan_id: ActiveValue::Set(update.scan_id as i64),
448 ..Default::default()
449 })
450 .exec(&*tx)
451 .await?;
452 }
453 }
454 }
455
456 if !update.removed_repositories.is_empty() {
457 project_repository::Entity::update_many()
458 .filter(
459 project_repository::Column::ProjectId
460 .eq(project_id)
461 .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
462 .and(project_repository::Column::Id.is_in(
463 update.removed_repositories.iter().map(|id| *id as i64),
464 )),
465 )
466 .set(project_repository::ActiveModel {
467 is_deleted: ActiveValue::Set(true),
468 scan_id: ActiveValue::Set(update.scan_id as i64),
469 ..Default::default()
470 })
471 .exec(&*tx)
472 .await?;
473 }
474 }
475
476 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
477 Ok(connection_ids)
478 })
479 .await
480 }
481
482 pub async fn update_repository(
483 &self,
484 update: &proto::UpdateRepository,
485 _connection: ConnectionId,
486 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
487 let project_id = ProjectId::from_proto(update.project_id);
488 let repository_id = update.id as i64;
489 self.project_transaction(project_id, |tx| async move {
490 project_repository::Entity::insert(project_repository::ActiveModel {
491 project_id: ActiveValue::set(project_id),
492 id: ActiveValue::set(repository_id),
493 legacy_worktree_id: ActiveValue::set(None),
494 abs_path: ActiveValue::set(update.abs_path.clone()),
495 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
496 scan_id: ActiveValue::set(update.scan_id as i64),
497 is_deleted: ActiveValue::set(false),
498 branch_summary: ActiveValue::Set(
499 update
500 .branch_summary
501 .as_ref()
502 .map(|summary| serde_json::to_string(summary).unwrap()),
503 ),
504 head_commit_details: ActiveValue::Set(
505 update
506 .head_commit_details
507 .as_ref()
508 .map(|details| serde_json::to_string(details).unwrap()),
509 ),
510 current_merge_conflicts: ActiveValue::Set(Some(
511 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
512 )),
513 merge_message: ActiveValue::set(update.merge_message.clone()),
514 })
515 .on_conflict(
516 OnConflict::columns([
517 project_repository::Column::ProjectId,
518 project_repository::Column::Id,
519 ])
520 .update_columns([
521 project_repository::Column::ScanId,
522 project_repository::Column::BranchSummary,
523 project_repository::Column::EntryIds,
524 project_repository::Column::AbsPath,
525 project_repository::Column::CurrentMergeConflicts,
526 project_repository::Column::HeadCommitDetails,
527 project_repository::Column::MergeMessage,
528 ])
529 .to_owned(),
530 )
531 .exec(&*tx)
532 .await?;
533
534 let has_any_statuses = !update.updated_statuses.is_empty();
535
536 if has_any_statuses {
537 project_repository_statuses::Entity::insert_many(
538 update.updated_statuses.iter().map(|status_entry| {
539 let (repo_path, status_kind, first_status, second_status) =
540 proto_status_to_db(status_entry.clone());
541 project_repository_statuses::ActiveModel {
542 project_id: ActiveValue::set(project_id),
543 repository_id: ActiveValue::set(repository_id),
544 scan_id: ActiveValue::set(update.scan_id as i64),
545 is_deleted: ActiveValue::set(false),
546 repo_path: ActiveValue::set(repo_path),
547 status: ActiveValue::set(0),
548 status_kind: ActiveValue::set(status_kind),
549 first_status: ActiveValue::set(first_status),
550 second_status: ActiveValue::set(second_status),
551 }
552 }),
553 )
554 .on_conflict(
555 OnConflict::columns([
556 project_repository_statuses::Column::ProjectId,
557 project_repository_statuses::Column::RepositoryId,
558 project_repository_statuses::Column::RepoPath,
559 ])
560 .update_columns([
561 project_repository_statuses::Column::ScanId,
562 project_repository_statuses::Column::StatusKind,
563 project_repository_statuses::Column::FirstStatus,
564 project_repository_statuses::Column::SecondStatus,
565 ])
566 .to_owned(),
567 )
568 .exec(&*tx)
569 .await?;
570 }
571
572 let has_any_removed_statuses = !update.removed_statuses.is_empty();
573
574 if has_any_removed_statuses {
575 project_repository_statuses::Entity::update_many()
576 .filter(
577 project_repository_statuses::Column::ProjectId
578 .eq(project_id)
579 .and(
580 project_repository_statuses::Column::RepositoryId.eq(repository_id),
581 )
582 .and(
583 project_repository_statuses::Column::RepoPath
584 .is_in(update.removed_statuses.iter()),
585 ),
586 )
587 .set(project_repository_statuses::ActiveModel {
588 is_deleted: ActiveValue::Set(true),
589 scan_id: ActiveValue::Set(update.scan_id as i64),
590 ..Default::default()
591 })
592 .exec(&*tx)
593 .await?;
594 }
595
596 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
597 Ok(connection_ids)
598 })
599 .await
600 }
601
602 pub async fn remove_repository(
603 &self,
604 remove: &proto::RemoveRepository,
605 _connection: ConnectionId,
606 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
607 let project_id = ProjectId::from_proto(remove.project_id);
608 let repository_id = remove.id as i64;
609 self.project_transaction(project_id, |tx| async move {
610 project_repository::Entity::update_many()
611 .filter(
612 project_repository::Column::ProjectId
613 .eq(project_id)
614 .and(project_repository::Column::Id.eq(repository_id)),
615 )
616 .set(project_repository::ActiveModel {
617 is_deleted: ActiveValue::Set(true),
618 // scan_id: ActiveValue::Set(update.scan_id as i64),
619 ..Default::default()
620 })
621 .exec(&*tx)
622 .await?;
623
624 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
625 Ok(connection_ids)
626 })
627 .await
628 }
629
630 /// Updates the diagnostic summary for the given connection.
631 pub async fn update_diagnostic_summary(
632 &self,
633 update: &proto::UpdateDiagnosticSummary,
634 connection: ConnectionId,
635 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
636 let project_id = ProjectId::from_proto(update.project_id);
637 let worktree_id = update.worktree_id as i64;
638 self.project_transaction(project_id, |tx| async move {
639 let summary = update.summary.as_ref().context("invalid summary")?;
640
641 // Ensure the update comes from the host.
642 let project = project::Entity::find_by_id(project_id)
643 .one(&*tx)
644 .await?
645 .context("no such project")?;
646 if project.host_connection()? != connection {
647 return Err(anyhow!("can't update a project hosted by someone else"))?;
648 }
649
650 // Update summary.
651 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
652 project_id: ActiveValue::set(project_id),
653 worktree_id: ActiveValue::set(worktree_id),
654 path: ActiveValue::set(summary.path.clone()),
655 language_server_id: ActiveValue::set(summary.language_server_id as i64),
656 error_count: ActiveValue::set(summary.error_count as i32),
657 warning_count: ActiveValue::set(summary.warning_count as i32),
658 })
659 .on_conflict(
660 OnConflict::columns([
661 worktree_diagnostic_summary::Column::ProjectId,
662 worktree_diagnostic_summary::Column::WorktreeId,
663 worktree_diagnostic_summary::Column::Path,
664 ])
665 .update_columns([
666 worktree_diagnostic_summary::Column::LanguageServerId,
667 worktree_diagnostic_summary::Column::ErrorCount,
668 worktree_diagnostic_summary::Column::WarningCount,
669 ])
670 .to_owned(),
671 )
672 .exec(&*tx)
673 .await?;
674
675 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
676 Ok(connection_ids)
677 })
678 .await
679 }
680
681 /// Starts the language server for the given connection.
682 pub async fn start_language_server(
683 &self,
684 update: &proto::StartLanguageServer,
685 connection: ConnectionId,
686 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
687 let project_id = ProjectId::from_proto(update.project_id);
688 self.project_transaction(project_id, |tx| async move {
689 let server = update.server.as_ref().context("invalid language server")?;
690
691 // Ensure the update comes from the host.
692 let project = project::Entity::find_by_id(project_id)
693 .one(&*tx)
694 .await?
695 .context("no such project")?;
696 if project.host_connection()? != connection {
697 return Err(anyhow!("can't update a project hosted by someone else"))?;
698 }
699
700 // Add the newly-started language server.
701 language_server::Entity::insert(language_server::ActiveModel {
702 project_id: ActiveValue::set(project_id),
703 id: ActiveValue::set(server.id as i64),
704 name: ActiveValue::set(server.name.clone()),
705 worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
706 capabilities: ActiveValue::set(update.capabilities.clone()),
707 })
708 .on_conflict(
709 OnConflict::columns([
710 language_server::Column::ProjectId,
711 language_server::Column::Id,
712 ])
713 .update_columns([
714 language_server::Column::Name,
715 language_server::Column::Capabilities,
716 language_server::Column::WorktreeId,
717 ])
718 .to_owned(),
719 )
720 .exec(&*tx)
721 .await?;
722
723 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
724 Ok(connection_ids)
725 })
726 .await
727 }
728
729 /// Updates the worktree settings for the given connection.
730 pub async fn update_worktree_settings(
731 &self,
732 update: &proto::UpdateWorktreeSettings,
733 connection: ConnectionId,
734 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
735 let project_id = ProjectId::from_proto(update.project_id);
736 let kind = match update.kind {
737 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
738 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
739 None => proto::LocalSettingsKind::Settings,
740 };
741 let kind = LocalSettingsKind::from_proto(kind);
742 self.project_transaction(project_id, |tx| async move {
743 // Ensure the update comes from the host.
744 let project = project::Entity::find_by_id(project_id)
745 .one(&*tx)
746 .await?
747 .context("no such project")?;
748 if project.host_connection()? != connection {
749 return Err(anyhow!("can't update a project hosted by someone else"))?;
750 }
751
752 if let Some(content) = &update.content {
753 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
754 project_id: ActiveValue::Set(project_id),
755 worktree_id: ActiveValue::Set(update.worktree_id as i64),
756 path: ActiveValue::Set(update.path.clone()),
757 content: ActiveValue::Set(content.clone()),
758 kind: ActiveValue::Set(kind),
759 })
760 .on_conflict(
761 OnConflict::columns([
762 worktree_settings_file::Column::ProjectId,
763 worktree_settings_file::Column::WorktreeId,
764 worktree_settings_file::Column::Path,
765 ])
766 .update_column(worktree_settings_file::Column::Content)
767 .to_owned(),
768 )
769 .exec(&*tx)
770 .await?;
771 } else {
772 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
773 project_id: ActiveValue::Set(project_id),
774 worktree_id: ActiveValue::Set(update.worktree_id as i64),
775 path: ActiveValue::Set(update.path.clone()),
776 ..Default::default()
777 })
778 .exec(&*tx)
779 .await?;
780 }
781
782 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
783 Ok(connection_ids)
784 })
785 .await
786 }
787
788 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
789 self.transaction(|tx| async move {
790 Ok(project::Entity::find_by_id(id)
791 .one(&*tx)
792 .await?
793 .context("no such project")?)
794 })
795 .await
796 }
797
798 /// Adds the given connection to the specified project
799 /// in the current room.
800 pub async fn join_project(
801 &self,
802 project_id: ProjectId,
803 connection: ConnectionId,
804 user_id: UserId,
805 committer_name: Option<String>,
806 committer_email: Option<String>,
807 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
808 self.project_transaction(project_id, move |tx| {
809 let committer_name = committer_name.clone();
810 let committer_email = committer_email.clone();
811 async move {
812 let (project, role) = self
813 .access_project(project_id, connection, Capability::ReadOnly, &tx)
814 .await?;
815 self.join_project_internal(
816 project,
817 user_id,
818 committer_name,
819 committer_email,
820 connection,
821 role,
822 &tx,
823 )
824 .await
825 }
826 })
827 .await
828 }
829
830 async fn join_project_internal(
831 &self,
832 project: project::Model,
833 user_id: UserId,
834 committer_name: Option<String>,
835 committer_email: Option<String>,
836 connection: ConnectionId,
837 role: ChannelRole,
838 tx: &DatabaseTransaction,
839 ) -> Result<(Project, ReplicaId)> {
840 let mut collaborators = project
841 .find_related(project_collaborator::Entity)
842 .all(tx)
843 .await?;
844 let replica_ids = collaborators
845 .iter()
846 .map(|c| c.replica_id)
847 .collect::<HashSet<_>>();
848 let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
849 while replica_ids.contains(&replica_id) {
850 replica_id.0 += 1;
851 }
852 let new_collaborator = project_collaborator::ActiveModel {
853 project_id: ActiveValue::set(project.id),
854 connection_id: ActiveValue::set(connection.id as i32),
855 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
856 user_id: ActiveValue::set(user_id),
857 replica_id: ActiveValue::set(replica_id),
858 is_host: ActiveValue::set(false),
859 id: ActiveValue::NotSet,
860 committer_name: ActiveValue::set(committer_name),
861 committer_email: ActiveValue::set(committer_email),
862 }
863 .insert(tx)
864 .await?;
865 collaborators.push(new_collaborator);
866
867 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
868 let mut worktrees = db_worktrees
869 .into_iter()
870 .map(|db_worktree| {
871 (
872 db_worktree.id as u64,
873 Worktree {
874 id: db_worktree.id as u64,
875 abs_path: db_worktree.abs_path,
876 root_name: db_worktree.root_name,
877 visible: db_worktree.visible,
878 entries: Default::default(),
879 diagnostic_summaries: Default::default(),
880 settings_files: Default::default(),
881 scan_id: db_worktree.scan_id as u64,
882 completed_scan_id: db_worktree.completed_scan_id as u64,
883 legacy_repository_entries: Default::default(),
884 },
885 )
886 })
887 .collect::<BTreeMap<_, _>>();
888
889 // Populate worktree entries.
890 {
891 let mut db_entries = worktree_entry::Entity::find()
892 .filter(
893 Condition::all()
894 .add(worktree_entry::Column::ProjectId.eq(project.id))
895 .add(worktree_entry::Column::IsDeleted.eq(false)),
896 )
897 .stream(tx)
898 .await?;
899 while let Some(db_entry) = db_entries.next().await {
900 let db_entry = db_entry?;
901 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
902 worktree.entries.push(proto::Entry {
903 id: db_entry.id as u64,
904 is_dir: db_entry.is_dir,
905 path: db_entry.path,
906 inode: db_entry.inode as u64,
907 mtime: Some(proto::Timestamp {
908 seconds: db_entry.mtime_seconds as u64,
909 nanos: db_entry.mtime_nanos as u32,
910 }),
911 canonical_path: db_entry.canonical_path,
912 is_ignored: db_entry.is_ignored,
913 is_external: db_entry.is_external,
914 is_hidden: db_entry.is_hidden,
915 // This is only used in the summarization backlog, so if it's None,
916 // that just means we won't be able to detect when to resummarize
917 // based on total number of backlogged bytes - instead, we'd go
918 // on number of files only. That shouldn't be a huge deal in practice.
919 size: None,
920 is_fifo: db_entry.is_fifo,
921 });
922 }
923 }
924 }
925
926 // Populate repository entries.
927 let mut repositories = Vec::new();
928 {
929 let db_repository_entries = project_repository::Entity::find()
930 .filter(
931 Condition::all()
932 .add(project_repository::Column::ProjectId.eq(project.id))
933 .add(project_repository::Column::IsDeleted.eq(false)),
934 )
935 .all(tx)
936 .await?;
937 for db_repository_entry in db_repository_entries {
938 let mut repository_statuses = project_repository_statuses::Entity::find()
939 .filter(
940 Condition::all()
941 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
942 .add(
943 project_repository_statuses::Column::RepositoryId
944 .eq(db_repository_entry.id),
945 )
946 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
947 )
948 .stream(tx)
949 .await?;
950 let mut updated_statuses = Vec::new();
951 while let Some(status_entry) = repository_statuses.next().await {
952 let status_entry = status_entry?;
953 updated_statuses.push(db_status_to_proto(status_entry)?);
954 }
955
956 let current_merge_conflicts = db_repository_entry
957 .current_merge_conflicts
958 .as_ref()
959 .map(|conflicts| serde_json::from_str(conflicts))
960 .transpose()?
961 .unwrap_or_default();
962
963 let branch_summary = db_repository_entry
964 .branch_summary
965 .as_ref()
966 .map(|branch_summary| serde_json::from_str(branch_summary))
967 .transpose()?
968 .unwrap_or_default();
969
970 let head_commit_details = db_repository_entry
971 .head_commit_details
972 .as_ref()
973 .map(|head_commit_details| serde_json::from_str(head_commit_details))
974 .transpose()?
975 .unwrap_or_default();
976
977 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
978 .context("failed to deserialize repository's entry ids")?;
979
980 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
981 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
982 worktree.legacy_repository_entries.insert(
983 db_repository_entry.id as u64,
984 proto::RepositoryEntry {
985 repository_id: db_repository_entry.id as u64,
986 updated_statuses,
987 removed_statuses: Vec::new(),
988 current_merge_conflicts,
989 branch_summary,
990 },
991 );
992 }
993 } else {
994 repositories.push(proto::UpdateRepository {
995 project_id: db_repository_entry.project_id.0 as u64,
996 id: db_repository_entry.id as u64,
997 abs_path: db_repository_entry.abs_path,
998 entry_ids,
999 updated_statuses,
1000 removed_statuses: Vec::new(),
1001 current_merge_conflicts,
1002 branch_summary,
1003 head_commit_details,
1004 scan_id: db_repository_entry.scan_id as u64,
1005 is_last_update: true,
1006 merge_message: db_repository_entry.merge_message,
1007 stash_entries: Vec::new(),
1008 renamed_paths: Default::default(),
1009 });
1010 }
1011 }
1012 }
1013
1014 // Populate worktree diagnostic summaries.
1015 {
1016 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1017 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
1018 .stream(tx)
1019 .await?;
1020 while let Some(db_summary) = db_summaries.next().await {
1021 let db_summary = db_summary?;
1022 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1023 worktree
1024 .diagnostic_summaries
1025 .push(proto::DiagnosticSummary {
1026 path: db_summary.path,
1027 language_server_id: db_summary.language_server_id as u64,
1028 error_count: db_summary.error_count as u32,
1029 warning_count: db_summary.warning_count as u32,
1030 });
1031 }
1032 }
1033 }
1034
1035 // Populate worktree settings files
1036 {
1037 let mut db_settings_files = worktree_settings_file::Entity::find()
1038 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1039 .stream(tx)
1040 .await?;
1041 while let Some(db_settings_file) = db_settings_files.next().await {
1042 let db_settings_file = db_settings_file?;
1043 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1044 worktree.settings_files.push(WorktreeSettingsFile {
1045 path: db_settings_file.path,
1046 content: db_settings_file.content,
1047 kind: db_settings_file.kind,
1048 });
1049 }
1050 }
1051 }
1052
1053 // Populate language servers.
1054 let language_servers = project
1055 .find_related(language_server::Entity)
1056 .all(tx)
1057 .await?;
1058
1059 let path_style = if project.windows_paths {
1060 PathStyle::Windows
1061 } else {
1062 PathStyle::Posix
1063 };
1064
1065 let project = Project {
1066 id: project.id,
1067 role,
1068 collaborators: collaborators
1069 .into_iter()
1070 .map(|collaborator| ProjectCollaborator {
1071 connection_id: collaborator.connection(),
1072 user_id: collaborator.user_id,
1073 replica_id: collaborator.replica_id,
1074 is_host: collaborator.is_host,
1075 committer_name: collaborator.committer_name,
1076 committer_email: collaborator.committer_email,
1077 })
1078 .collect(),
1079 worktrees,
1080 repositories,
1081 language_servers: language_servers
1082 .into_iter()
1083 .map(|language_server| LanguageServer {
1084 server: proto::LanguageServer {
1085 id: language_server.id as u64,
1086 name: language_server.name,
1087 worktree_id: language_server.worktree_id.map(|id| id as u64),
1088 },
1089 capabilities: language_server.capabilities,
1090 })
1091 .collect(),
1092 path_style,
1093 };
1094 Ok((project, replica_id as ReplicaId))
1095 }
1096
1097 /// Removes the given connection from the specified project.
1098 pub async fn leave_project(
1099 &self,
1100 project_id: ProjectId,
1101 connection: ConnectionId,
1102 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1103 self.project_transaction(project_id, |tx| async move {
1104 let result = project_collaborator::Entity::delete_many()
1105 .filter(
1106 Condition::all()
1107 .add(project_collaborator::Column::ProjectId.eq(project_id))
1108 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1109 .add(
1110 project_collaborator::Column::ConnectionServerId
1111 .eq(connection.owner_id as i32),
1112 ),
1113 )
1114 .exec(&*tx)
1115 .await?;
1116 if result.rows_affected == 0 {
1117 Err(anyhow!("not a collaborator on this project"))?;
1118 }
1119
1120 let project = project::Entity::find_by_id(project_id)
1121 .one(&*tx)
1122 .await?
1123 .context("no such project")?;
1124 let collaborators = project
1125 .find_related(project_collaborator::Entity)
1126 .all(&*tx)
1127 .await?;
1128 let connection_ids: Vec<ConnectionId> = collaborators
1129 .into_iter()
1130 .map(|collaborator| collaborator.connection())
1131 .collect();
1132
1133 follower::Entity::delete_many()
1134 .filter(
1135 Condition::any()
1136 .add(
1137 Condition::all()
1138 .add(follower::Column::ProjectId.eq(Some(project_id)))
1139 .add(
1140 follower::Column::LeaderConnectionServerId
1141 .eq(connection.owner_id),
1142 )
1143 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1144 )
1145 .add(
1146 Condition::all()
1147 .add(follower::Column::ProjectId.eq(Some(project_id)))
1148 .add(
1149 follower::Column::FollowerConnectionServerId
1150 .eq(connection.owner_id),
1151 )
1152 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1153 ),
1154 )
1155 .exec(&*tx)
1156 .await?;
1157
1158 let room = if let Some(room_id) = project.room_id {
1159 Some(self.get_room(room_id, &tx).await?)
1160 } else {
1161 None
1162 };
1163
1164 let left_project = LeftProject {
1165 id: project_id,
1166 should_unshare: connection == project.host_connection()?,
1167 connection_ids,
1168 };
1169 Ok((room, left_project))
1170 })
1171 .await
1172 }
1173
1174 pub async fn check_user_is_project_host(
1175 &self,
1176 project_id: ProjectId,
1177 connection_id: ConnectionId,
1178 ) -> Result<()> {
1179 self.project_transaction(project_id, |tx| async move {
1180 project::Entity::find()
1181 .filter(
1182 Condition::all()
1183 .add(project::Column::Id.eq(project_id))
1184 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1185 .add(
1186 project::Column::HostConnectionServerId
1187 .eq(Some(connection_id.owner_id as i32)),
1188 ),
1189 )
1190 .one(&*tx)
1191 .await?
1192 .context("failed to read project host")?;
1193
1194 Ok(())
1195 })
1196 .await
1197 .map(|guard| guard.into_inner())
1198 }
1199
1200 /// Returns the current project if the given user is authorized to access it with the specified capability.
1201 pub async fn access_project(
1202 &self,
1203 project_id: ProjectId,
1204 connection_id: ConnectionId,
1205 capability: Capability,
1206 tx: &DatabaseTransaction,
1207 ) -> Result<(project::Model, ChannelRole)> {
1208 let project = project::Entity::find_by_id(project_id)
1209 .one(tx)
1210 .await?
1211 .context("no such project")?;
1212
1213 let role_from_room = if let Some(room_id) = project.room_id {
1214 room_participant::Entity::find()
1215 .filter(room_participant::Column::RoomId.eq(room_id))
1216 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1217 .one(tx)
1218 .await?
1219 .and_then(|participant| participant.role)
1220 } else {
1221 None
1222 };
1223
1224 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1225
1226 match capability {
1227 Capability::ReadWrite => {
1228 if !role.can_edit_projects() {
1229 return Err(anyhow!("not authorized to edit projects"))?;
1230 }
1231 }
1232 Capability::ReadOnly => {
1233 if !role.can_read_projects() {
1234 return Err(anyhow!("not authorized to read projects"))?;
1235 }
1236 }
1237 }
1238
1239 Ok((project, role))
1240 }
1241
1242 /// Returns the host connection for a read-only request to join a shared project.
1243 pub async fn host_for_read_only_project_request(
1244 &self,
1245 project_id: ProjectId,
1246 connection_id: ConnectionId,
1247 ) -> Result<ConnectionId> {
1248 self.project_transaction(project_id, |tx| async move {
1249 let (project, _) = self
1250 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1251 .await?;
1252 project.host_connection()
1253 })
1254 .await
1255 .map(|guard| guard.into_inner())
1256 }
1257
1258 /// Returns the host connection for a request to join a shared project.
1259 pub async fn host_for_mutating_project_request(
1260 &self,
1261 project_id: ProjectId,
1262 connection_id: ConnectionId,
1263 ) -> Result<ConnectionId> {
1264 self.project_transaction(project_id, |tx| async move {
1265 let (project, _) = self
1266 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1267 .await?;
1268 project.host_connection()
1269 })
1270 .await
1271 .map(|guard| guard.into_inner())
1272 }
1273
1274 pub async fn connections_for_buffer_update(
1275 &self,
1276 project_id: ProjectId,
1277 connection_id: ConnectionId,
1278 capability: Capability,
1279 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1280 self.project_transaction(project_id, |tx| async move {
1281 // Authorize
1282 let (project, _) = self
1283 .access_project(project_id, connection_id, capability, &tx)
1284 .await?;
1285
1286 let host_connection_id = project.host_connection()?;
1287
1288 let collaborators = project_collaborator::Entity::find()
1289 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1290 .all(&*tx)
1291 .await?;
1292
1293 let guest_connection_ids = collaborators
1294 .into_iter()
1295 .filter_map(|collaborator| {
1296 if collaborator.is_host {
1297 None
1298 } else {
1299 Some(collaborator.connection())
1300 }
1301 })
1302 .collect();
1303
1304 Ok((host_connection_id, guest_connection_ids))
1305 })
1306 .await
1307 }
1308
1309 /// Returns the connection IDs in the given project.
1310 ///
1311 /// The provided `connection_id` must also be a collaborator in the project,
1312 /// otherwise an error will be returned.
1313 pub async fn project_connection_ids(
1314 &self,
1315 project_id: ProjectId,
1316 connection_id: ConnectionId,
1317 exclude_dev_server: bool,
1318 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1319 self.project_transaction(project_id, |tx| async move {
1320 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1321 .await
1322 })
1323 .await
1324 }
1325
1326 async fn internal_project_connection_ids(
1327 &self,
1328 project_id: ProjectId,
1329 connection_id: ConnectionId,
1330 exclude_dev_server: bool,
1331 tx: &DatabaseTransaction,
1332 ) -> Result<HashSet<ConnectionId>> {
1333 let project = project::Entity::find_by_id(project_id)
1334 .one(tx)
1335 .await?
1336 .context("no such project")?;
1337
1338 let mut collaborators = project_collaborator::Entity::find()
1339 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1340 .stream(tx)
1341 .await?;
1342
1343 let mut connection_ids = HashSet::default();
1344 if let Some(host_connection) = project.host_connection().log_err()
1345 && !exclude_dev_server
1346 {
1347 connection_ids.insert(host_connection);
1348 }
1349
1350 while let Some(collaborator) = collaborators.next().await {
1351 let collaborator = collaborator?;
1352 connection_ids.insert(collaborator.connection());
1353 }
1354
1355 if connection_ids.contains(&connection_id)
1356 || Some(connection_id) == project.host_connection().ok()
1357 {
1358 Ok(connection_ids)
1359 } else {
1360 Err(anyhow!(
1361 "can only send project updates to a project you're in"
1362 ))?
1363 }
1364 }
1365
1366 async fn project_guest_connection_ids(
1367 &self,
1368 project_id: ProjectId,
1369 tx: &DatabaseTransaction,
1370 ) -> Result<Vec<ConnectionId>> {
1371 let mut collaborators = project_collaborator::Entity::find()
1372 .filter(
1373 project_collaborator::Column::ProjectId
1374 .eq(project_id)
1375 .and(project_collaborator::Column::IsHost.eq(false)),
1376 )
1377 .stream(tx)
1378 .await?;
1379
1380 let mut guest_connection_ids = Vec::new();
1381 while let Some(collaborator) = collaborators.next().await {
1382 let collaborator = collaborator?;
1383 guest_connection_ids.push(collaborator.connection());
1384 }
1385 Ok(guest_connection_ids)
1386 }
1387
1388 /// Returns the [`RoomId`] for the given project.
1389 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1390 self.transaction(|tx| async move {
1391 Ok(project::Entity::find_by_id(project_id)
1392 .one(&*tx)
1393 .await?
1394 .and_then(|project| project.room_id))
1395 })
1396 .await
1397 }
1398
1399 pub async fn check_room_participants(
1400 &self,
1401 room_id: RoomId,
1402 leader_id: ConnectionId,
1403 follower_id: ConnectionId,
1404 ) -> Result<()> {
1405 self.transaction(|tx| async move {
1406 use room_participant::Column;
1407
1408 let count = room_participant::Entity::find()
1409 .filter(
1410 Condition::all().add(Column::RoomId.eq(room_id)).add(
1411 Condition::any()
1412 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1413 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1414 ))
1415 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1416 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1417 )),
1418 ),
1419 )
1420 .count(&*tx)
1421 .await?;
1422
1423 if count < 2 {
1424 Err(anyhow!("not room participants"))?;
1425 }
1426
1427 Ok(())
1428 })
1429 .await
1430 }
1431
1432 /// Adds the given follower connection as a follower of the given leader connection.
1433 pub async fn follow(
1434 &self,
1435 room_id: RoomId,
1436 project_id: ProjectId,
1437 leader_connection: ConnectionId,
1438 follower_connection: ConnectionId,
1439 ) -> Result<TransactionGuard<proto::Room>> {
1440 self.room_transaction(room_id, |tx| async move {
1441 follower::ActiveModel {
1442 room_id: ActiveValue::set(room_id),
1443 project_id: ActiveValue::set(project_id),
1444 leader_connection_server_id: ActiveValue::set(ServerId(
1445 leader_connection.owner_id as i32,
1446 )),
1447 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1448 follower_connection_server_id: ActiveValue::set(ServerId(
1449 follower_connection.owner_id as i32,
1450 )),
1451 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1452 ..Default::default()
1453 }
1454 .insert(&*tx)
1455 .await?;
1456
1457 let room = self.get_room(room_id, &tx).await?;
1458 Ok(room)
1459 })
1460 .await
1461 }
1462
1463 /// Removes the given follower connection as a follower of the given leader connection.
1464 pub async fn unfollow(
1465 &self,
1466 room_id: RoomId,
1467 project_id: ProjectId,
1468 leader_connection: ConnectionId,
1469 follower_connection: ConnectionId,
1470 ) -> Result<TransactionGuard<proto::Room>> {
1471 self.room_transaction(room_id, |tx| async move {
1472 follower::Entity::delete_many()
1473 .filter(
1474 Condition::all()
1475 .add(follower::Column::RoomId.eq(room_id))
1476 .add(follower::Column::ProjectId.eq(project_id))
1477 .add(
1478 follower::Column::LeaderConnectionServerId
1479 .eq(leader_connection.owner_id),
1480 )
1481 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1482 .add(
1483 follower::Column::FollowerConnectionServerId
1484 .eq(follower_connection.owner_id),
1485 )
1486 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1487 )
1488 .exec(&*tx)
1489 .await?;
1490
1491 let room = self.get_room(room_id, &tx).await?;
1492 Ok(room)
1493 })
1494 .await
1495 }
1496}