1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .context("could not find participant")?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 let project = project::ActiveModel {
65 room_id: ActiveValue::set(Some(participant.room_id)),
66 host_user_id: ActiveValue::set(Some(participant.user_id)),
67 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
68 host_connection_server_id: ActiveValue::set(Some(ServerId(
69 connection.owner_id as i32,
70 ))),
71 id: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if !worktrees.is_empty() {
77 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
78 worktree::ActiveModel {
79 id: ActiveValue::set(worktree.id as i64),
80 project_id: ActiveValue::set(project.id),
81 abs_path: ActiveValue::set(worktree.abs_path.clone()),
82 root_name: ActiveValue::set(worktree.root_name.clone()),
83 visible: ActiveValue::set(worktree.visible),
84 scan_id: ActiveValue::set(0),
85 completed_scan_id: ActiveValue::set(0),
86 }
87 }))
88 .exec(&*tx)
89 .await?;
90 }
91
92 let replica_id = if is_ssh_project { 1 } else { 0 };
93
94 project_collaborator::ActiveModel {
95 project_id: ActiveValue::set(project.id),
96 connection_id: ActiveValue::set(connection.id as i32),
97 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
98 user_id: ActiveValue::set(participant.user_id),
99 replica_id: ActiveValue::set(ReplicaId(replica_id)),
100 is_host: ActiveValue::set(true),
101 id: ActiveValue::NotSet,
102 committer_name: ActiveValue::Set(None),
103 committer_email: ActiveValue::Set(None),
104 }
105 .insert(&*tx)
106 .await?;
107
108 let room = self.get_room(room_id, &tx).await?;
109 Ok((project.id, room))
110 })
111 .await
112 }
113
114 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
115 self.transaction(|tx| async move {
116 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
117 Ok(())
118 })
119 .await
120 }
121
122 /// Unshares the given project.
123 pub async fn unshare_project(
124 &self,
125 project_id: ProjectId,
126 connection: ConnectionId,
127 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
128 self.project_transaction(project_id, |tx| async move {
129 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
130 let project = project::Entity::find_by_id(project_id)
131 .one(&*tx)
132 .await?
133 .context("project not found")?;
134 let room = if let Some(room_id) = project.room_id {
135 Some(self.get_room(room_id, &tx).await?)
136 } else {
137 None
138 };
139 if project.host_connection()? == connection {
140 return Ok((true, room, guest_connection_ids));
141 }
142 Err(anyhow!("cannot unshare a project hosted by another user"))?
143 })
144 .await
145 }
146
147 /// Updates the worktrees associated with the given project.
148 pub async fn update_project(
149 &self,
150 project_id: ProjectId,
151 connection: ConnectionId,
152 worktrees: &[proto::WorktreeMetadata],
153 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
154 self.project_transaction(project_id, |tx| async move {
155 let project = project::Entity::find_by_id(project_id)
156 .filter(
157 Condition::all()
158 .add(project::Column::HostConnectionId.eq(connection.id as i32))
159 .add(
160 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
161 ),
162 )
163 .one(&*tx)
164 .await?
165 .context("no such project")?;
166
167 self.update_project_worktrees(project.id, worktrees, &tx)
168 .await?;
169
170 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
171
172 let room = if let Some(room_id) = project.room_id {
173 Some(self.get_room(room_id, &tx).await?)
174 } else {
175 None
176 };
177
178 Ok((room, guest_connection_ids))
179 })
180 .await
181 }
182
183 pub(in crate::db) async fn update_project_worktrees(
184 &self,
185 project_id: ProjectId,
186 worktrees: &[proto::WorktreeMetadata],
187 tx: &DatabaseTransaction,
188 ) -> Result<()> {
189 if !worktrees.is_empty() {
190 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
191 id: ActiveValue::set(worktree.id as i64),
192 project_id: ActiveValue::set(project_id),
193 abs_path: ActiveValue::set(worktree.abs_path.clone()),
194 root_name: ActiveValue::set(worktree.root_name.clone()),
195 visible: ActiveValue::set(worktree.visible),
196 scan_id: ActiveValue::set(0),
197 completed_scan_id: ActiveValue::set(0),
198 }))
199 .on_conflict(
200 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
201 .update_column(worktree::Column::RootName)
202 .to_owned(),
203 )
204 .exec(tx)
205 .await?;
206 }
207
208 worktree::Entity::delete_many()
209 .filter(worktree::Column::ProjectId.eq(project_id).and(
210 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
211 ))
212 .exec(tx)
213 .await?;
214
215 Ok(())
216 }
217
218 pub async fn update_worktree(
219 &self,
220 update: &proto::UpdateWorktree,
221 connection: ConnectionId,
222 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
223 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
224 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
225 {
226 return Err(anyhow!(
227 "invalid worktree update. removed entries: {}, updated entries: {}",
228 update.removed_entries.len(),
229 update.updated_entries.len()
230 ))?;
231 }
232
233 let project_id = ProjectId::from_proto(update.project_id);
234 let worktree_id = update.worktree_id as i64;
235 self.project_transaction(project_id, |tx| async move {
236 // Ensure the update comes from the host.
237 let _project = project::Entity::find_by_id(project_id)
238 .filter(
239 Condition::all()
240 .add(project::Column::HostConnectionId.eq(connection.id as i32))
241 .add(
242 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
243 ),
244 )
245 .one(&*tx)
246 .await?
247 .with_context(|| format!("no such project: {project_id}"))?;
248
249 // Update metadata.
250 worktree::Entity::update(worktree::ActiveModel {
251 id: ActiveValue::set(worktree_id),
252 project_id: ActiveValue::set(project_id),
253 root_name: ActiveValue::set(update.root_name.clone()),
254 scan_id: ActiveValue::set(update.scan_id as i64),
255 completed_scan_id: if update.is_last_update {
256 ActiveValue::set(update.scan_id as i64)
257 } else {
258 ActiveValue::default()
259 },
260 abs_path: ActiveValue::set(update.abs_path.clone()),
261 ..Default::default()
262 })
263 .exec(&*tx)
264 .await?;
265
266 if !update.updated_entries.is_empty() {
267 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
268 let mtime = entry.mtime.clone().unwrap_or_default();
269 worktree_entry::ActiveModel {
270 project_id: ActiveValue::set(project_id),
271 worktree_id: ActiveValue::set(worktree_id),
272 id: ActiveValue::set(entry.id as i64),
273 is_dir: ActiveValue::set(entry.is_dir),
274 path: ActiveValue::set(entry.path.clone()),
275 inode: ActiveValue::set(entry.inode as i64),
276 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
277 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
278 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
279 is_ignored: ActiveValue::set(entry.is_ignored),
280 git_status: ActiveValue::set(None),
281 is_external: ActiveValue::set(entry.is_external),
282 is_deleted: ActiveValue::set(false),
283 scan_id: ActiveValue::set(update.scan_id as i64),
284 is_fifo: ActiveValue::set(entry.is_fifo),
285 }
286 }))
287 .on_conflict(
288 OnConflict::columns([
289 worktree_entry::Column::ProjectId,
290 worktree_entry::Column::WorktreeId,
291 worktree_entry::Column::Id,
292 ])
293 .update_columns([
294 worktree_entry::Column::IsDir,
295 worktree_entry::Column::Path,
296 worktree_entry::Column::Inode,
297 worktree_entry::Column::MtimeSeconds,
298 worktree_entry::Column::MtimeNanos,
299 worktree_entry::Column::CanonicalPath,
300 worktree_entry::Column::IsIgnored,
301 worktree_entry::Column::ScanId,
302 ])
303 .to_owned(),
304 )
305 .exec(&*tx)
306 .await?;
307 }
308
309 if !update.removed_entries.is_empty() {
310 worktree_entry::Entity::update_many()
311 .filter(
312 worktree_entry::Column::ProjectId
313 .eq(project_id)
314 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
315 .and(
316 worktree_entry::Column::Id
317 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
318 ),
319 )
320 .set(worktree_entry::ActiveModel {
321 is_deleted: ActiveValue::Set(true),
322 scan_id: ActiveValue::Set(update.scan_id as i64),
323 ..Default::default()
324 })
325 .exec(&*tx)
326 .await?;
327 }
328
329 // Backward-compatibility for old Zed clients.
330 //
331 // Remove this block when Zed 1.80 stable has been out for a week.
332 {
333 if !update.updated_repositories.is_empty() {
334 project_repository::Entity::insert_many(
335 update.updated_repositories.iter().map(|repository| {
336 project_repository::ActiveModel {
337 project_id: ActiveValue::set(project_id),
338 legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
339 id: ActiveValue::set(repository.repository_id as i64),
340 scan_id: ActiveValue::set(update.scan_id as i64),
341 is_deleted: ActiveValue::set(false),
342 branch_summary: ActiveValue::Set(
343 repository
344 .branch_summary
345 .as_ref()
346 .map(|summary| serde_json::to_string(summary).unwrap()),
347 ),
348 current_merge_conflicts: ActiveValue::Set(Some(
349 serde_json::to_string(&repository.current_merge_conflicts)
350 .unwrap(),
351 )),
352 // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
353 abs_path: ActiveValue::set(String::new()),
354 entry_ids: ActiveValue::set("[]".into()),
355 head_commit_details: ActiveValue::set(None),
356 merge_message: ActiveValue::set(None),
357 }
358 }),
359 )
360 .on_conflict(
361 OnConflict::columns([
362 project_repository::Column::ProjectId,
363 project_repository::Column::Id,
364 ])
365 .update_columns([
366 project_repository::Column::ScanId,
367 project_repository::Column::BranchSummary,
368 project_repository::Column::CurrentMergeConflicts,
369 ])
370 .to_owned(),
371 )
372 .exec(&*tx)
373 .await?;
374
375 let has_any_statuses = update
376 .updated_repositories
377 .iter()
378 .any(|repository| !repository.updated_statuses.is_empty());
379
380 if has_any_statuses {
381 project_repository_statuses::Entity::insert_many(
382 update.updated_repositories.iter().flat_map(
383 |repository: &proto::RepositoryEntry| {
384 repository.updated_statuses.iter().map(|status_entry| {
385 let (repo_path, status_kind, first_status, second_status) =
386 proto_status_to_db(status_entry.clone());
387 project_repository_statuses::ActiveModel {
388 project_id: ActiveValue::set(project_id),
389 repository_id: ActiveValue::set(
390 repository.repository_id as i64,
391 ),
392 scan_id: ActiveValue::set(update.scan_id as i64),
393 is_deleted: ActiveValue::set(false),
394 repo_path: ActiveValue::set(repo_path),
395 status: ActiveValue::set(0),
396 status_kind: ActiveValue::set(status_kind),
397 first_status: ActiveValue::set(first_status),
398 second_status: ActiveValue::set(second_status),
399 }
400 })
401 },
402 ),
403 )
404 .on_conflict(
405 OnConflict::columns([
406 project_repository_statuses::Column::ProjectId,
407 project_repository_statuses::Column::RepositoryId,
408 project_repository_statuses::Column::RepoPath,
409 ])
410 .update_columns([
411 project_repository_statuses::Column::ScanId,
412 project_repository_statuses::Column::StatusKind,
413 project_repository_statuses::Column::FirstStatus,
414 project_repository_statuses::Column::SecondStatus,
415 ])
416 .to_owned(),
417 )
418 .exec(&*tx)
419 .await?;
420 }
421
422 for repo in &update.updated_repositories {
423 if !repo.removed_statuses.is_empty() {
424 project_repository_statuses::Entity::update_many()
425 .filter(
426 project_repository_statuses::Column::ProjectId
427 .eq(project_id)
428 .and(
429 project_repository_statuses::Column::RepositoryId
430 .eq(repo.repository_id),
431 )
432 .and(
433 project_repository_statuses::Column::RepoPath
434 .is_in(repo.removed_statuses.iter()),
435 ),
436 )
437 .set(project_repository_statuses::ActiveModel {
438 is_deleted: ActiveValue::Set(true),
439 scan_id: ActiveValue::Set(update.scan_id as i64),
440 ..Default::default()
441 })
442 .exec(&*tx)
443 .await?;
444 }
445 }
446 }
447
448 if !update.removed_repositories.is_empty() {
449 project_repository::Entity::update_many()
450 .filter(
451 project_repository::Column::ProjectId
452 .eq(project_id)
453 .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
454 .and(project_repository::Column::Id.is_in(
455 update.removed_repositories.iter().map(|id| *id as i64),
456 )),
457 )
458 .set(project_repository::ActiveModel {
459 is_deleted: ActiveValue::Set(true),
460 scan_id: ActiveValue::Set(update.scan_id as i64),
461 ..Default::default()
462 })
463 .exec(&*tx)
464 .await?;
465 }
466 }
467
468 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
469 Ok(connection_ids)
470 })
471 .await
472 }
473
474 pub async fn update_repository(
475 &self,
476 update: &proto::UpdateRepository,
477 _connection: ConnectionId,
478 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
479 let project_id = ProjectId::from_proto(update.project_id);
480 let repository_id = update.id as i64;
481 self.project_transaction(project_id, |tx| async move {
482 project_repository::Entity::insert(project_repository::ActiveModel {
483 project_id: ActiveValue::set(project_id),
484 id: ActiveValue::set(repository_id),
485 legacy_worktree_id: ActiveValue::set(None),
486 abs_path: ActiveValue::set(update.abs_path.clone()),
487 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
488 scan_id: ActiveValue::set(update.scan_id as i64),
489 is_deleted: ActiveValue::set(false),
490 branch_summary: ActiveValue::Set(
491 update
492 .branch_summary
493 .as_ref()
494 .map(|summary| serde_json::to_string(summary).unwrap()),
495 ),
496 head_commit_details: ActiveValue::Set(
497 update
498 .head_commit_details
499 .as_ref()
500 .map(|details| serde_json::to_string(details).unwrap()),
501 ),
502 current_merge_conflicts: ActiveValue::Set(Some(
503 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
504 )),
505 merge_message: ActiveValue::set(update.merge_message.clone()),
506 })
507 .on_conflict(
508 OnConflict::columns([
509 project_repository::Column::ProjectId,
510 project_repository::Column::Id,
511 ])
512 .update_columns([
513 project_repository::Column::ScanId,
514 project_repository::Column::BranchSummary,
515 project_repository::Column::EntryIds,
516 project_repository::Column::AbsPath,
517 project_repository::Column::CurrentMergeConflicts,
518 project_repository::Column::HeadCommitDetails,
519 project_repository::Column::MergeMessage,
520 ])
521 .to_owned(),
522 )
523 .exec(&*tx)
524 .await?;
525
526 let has_any_statuses = !update.updated_statuses.is_empty();
527
528 if has_any_statuses {
529 project_repository_statuses::Entity::insert_many(
530 update.updated_statuses.iter().map(|status_entry| {
531 let (repo_path, status_kind, first_status, second_status) =
532 proto_status_to_db(status_entry.clone());
533 project_repository_statuses::ActiveModel {
534 project_id: ActiveValue::set(project_id),
535 repository_id: ActiveValue::set(repository_id),
536 scan_id: ActiveValue::set(update.scan_id as i64),
537 is_deleted: ActiveValue::set(false),
538 repo_path: ActiveValue::set(repo_path),
539 status: ActiveValue::set(0),
540 status_kind: ActiveValue::set(status_kind),
541 first_status: ActiveValue::set(first_status),
542 second_status: ActiveValue::set(second_status),
543 }
544 }),
545 )
546 .on_conflict(
547 OnConflict::columns([
548 project_repository_statuses::Column::ProjectId,
549 project_repository_statuses::Column::RepositoryId,
550 project_repository_statuses::Column::RepoPath,
551 ])
552 .update_columns([
553 project_repository_statuses::Column::ScanId,
554 project_repository_statuses::Column::StatusKind,
555 project_repository_statuses::Column::FirstStatus,
556 project_repository_statuses::Column::SecondStatus,
557 ])
558 .to_owned(),
559 )
560 .exec(&*tx)
561 .await?;
562 }
563
564 let has_any_removed_statuses = !update.removed_statuses.is_empty();
565
566 if has_any_removed_statuses {
567 project_repository_statuses::Entity::update_many()
568 .filter(
569 project_repository_statuses::Column::ProjectId
570 .eq(project_id)
571 .and(
572 project_repository_statuses::Column::RepositoryId.eq(repository_id),
573 )
574 .and(
575 project_repository_statuses::Column::RepoPath
576 .is_in(update.removed_statuses.iter()),
577 ),
578 )
579 .set(project_repository_statuses::ActiveModel {
580 is_deleted: ActiveValue::Set(true),
581 scan_id: ActiveValue::Set(update.scan_id as i64),
582 ..Default::default()
583 })
584 .exec(&*tx)
585 .await?;
586 }
587
588 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
589 Ok(connection_ids)
590 })
591 .await
592 }
593
594 pub async fn remove_repository(
595 &self,
596 remove: &proto::RemoveRepository,
597 _connection: ConnectionId,
598 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
599 let project_id = ProjectId::from_proto(remove.project_id);
600 let repository_id = remove.id as i64;
601 self.project_transaction(project_id, |tx| async move {
602 project_repository::Entity::update_many()
603 .filter(
604 project_repository::Column::ProjectId
605 .eq(project_id)
606 .and(project_repository::Column::Id.eq(repository_id)),
607 )
608 .set(project_repository::ActiveModel {
609 is_deleted: ActiveValue::Set(true),
610 // scan_id: ActiveValue::Set(update.scan_id as i64),
611 ..Default::default()
612 })
613 .exec(&*tx)
614 .await?;
615
616 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
617 Ok(connection_ids)
618 })
619 .await
620 }
621
622 /// Updates the diagnostic summary for the given connection.
623 pub async fn update_diagnostic_summary(
624 &self,
625 update: &proto::UpdateDiagnosticSummary,
626 connection: ConnectionId,
627 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
628 let project_id = ProjectId::from_proto(update.project_id);
629 let worktree_id = update.worktree_id as i64;
630 self.project_transaction(project_id, |tx| async move {
631 let summary = update.summary.as_ref().context("invalid summary")?;
632
633 // Ensure the update comes from the host.
634 let project = project::Entity::find_by_id(project_id)
635 .one(&*tx)
636 .await?
637 .context("no such project")?;
638 if project.host_connection()? != connection {
639 return Err(anyhow!("can't update a project hosted by someone else"))?;
640 }
641
642 // Update summary.
643 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
644 project_id: ActiveValue::set(project_id),
645 worktree_id: ActiveValue::set(worktree_id),
646 path: ActiveValue::set(summary.path.clone()),
647 language_server_id: ActiveValue::set(summary.language_server_id as i64),
648 error_count: ActiveValue::set(summary.error_count as i32),
649 warning_count: ActiveValue::set(summary.warning_count as i32),
650 })
651 .on_conflict(
652 OnConflict::columns([
653 worktree_diagnostic_summary::Column::ProjectId,
654 worktree_diagnostic_summary::Column::WorktreeId,
655 worktree_diagnostic_summary::Column::Path,
656 ])
657 .update_columns([
658 worktree_diagnostic_summary::Column::LanguageServerId,
659 worktree_diagnostic_summary::Column::ErrorCount,
660 worktree_diagnostic_summary::Column::WarningCount,
661 ])
662 .to_owned(),
663 )
664 .exec(&*tx)
665 .await?;
666
667 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
668 Ok(connection_ids)
669 })
670 .await
671 }
672
673 /// Starts the language server for the given connection.
674 pub async fn start_language_server(
675 &self,
676 update: &proto::StartLanguageServer,
677 connection: ConnectionId,
678 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
679 let project_id = ProjectId::from_proto(update.project_id);
680 self.project_transaction(project_id, |tx| async move {
681 let server = update.server.as_ref().context("invalid language server")?;
682
683 // Ensure the update comes from the host.
684 let project = project::Entity::find_by_id(project_id)
685 .one(&*tx)
686 .await?
687 .context("no such project")?;
688 if project.host_connection()? != connection {
689 return Err(anyhow!("can't update a project hosted by someone else"))?;
690 }
691
692 // Add the newly-started language server.
693 language_server::Entity::insert(language_server::ActiveModel {
694 project_id: ActiveValue::set(project_id),
695 id: ActiveValue::set(server.id as i64),
696 name: ActiveValue::set(server.name.clone()),
697 worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
698 capabilities: ActiveValue::set(update.capabilities.clone()),
699 })
700 .on_conflict(
701 OnConflict::columns([
702 language_server::Column::ProjectId,
703 language_server::Column::Id,
704 ])
705 .update_columns([
706 language_server::Column::Name,
707 language_server::Column::Capabilities,
708 language_server::Column::WorktreeId,
709 ])
710 .to_owned(),
711 )
712 .exec(&*tx)
713 .await?;
714
715 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
716 Ok(connection_ids)
717 })
718 .await
719 }
720
721 /// Updates the worktree settings for the given connection.
722 pub async fn update_worktree_settings(
723 &self,
724 update: &proto::UpdateWorktreeSettings,
725 connection: ConnectionId,
726 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
727 let project_id = ProjectId::from_proto(update.project_id);
728 let kind = match update.kind {
729 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
730 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
731 None => proto::LocalSettingsKind::Settings,
732 };
733 let kind = LocalSettingsKind::from_proto(kind);
734 self.project_transaction(project_id, |tx| async move {
735 // Ensure the update comes from the host.
736 let project = project::Entity::find_by_id(project_id)
737 .one(&*tx)
738 .await?
739 .context("no such project")?;
740 if project.host_connection()? != connection {
741 return Err(anyhow!("can't update a project hosted by someone else"))?;
742 }
743
744 if let Some(content) = &update.content {
745 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
746 project_id: ActiveValue::Set(project_id),
747 worktree_id: ActiveValue::Set(update.worktree_id as i64),
748 path: ActiveValue::Set(update.path.clone()),
749 content: ActiveValue::Set(content.clone()),
750 kind: ActiveValue::Set(kind),
751 })
752 .on_conflict(
753 OnConflict::columns([
754 worktree_settings_file::Column::ProjectId,
755 worktree_settings_file::Column::WorktreeId,
756 worktree_settings_file::Column::Path,
757 ])
758 .update_column(worktree_settings_file::Column::Content)
759 .to_owned(),
760 )
761 .exec(&*tx)
762 .await?;
763 } else {
764 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
765 project_id: ActiveValue::Set(project_id),
766 worktree_id: ActiveValue::Set(update.worktree_id as i64),
767 path: ActiveValue::Set(update.path.clone()),
768 ..Default::default()
769 })
770 .exec(&*tx)
771 .await?;
772 }
773
774 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
775 Ok(connection_ids)
776 })
777 .await
778 }
779
780 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
781 self.transaction(|tx| async move {
782 Ok(project::Entity::find_by_id(id)
783 .one(&*tx)
784 .await?
785 .context("no such project")?)
786 })
787 .await
788 }
789
790 /// Adds the given connection to the specified project
791 /// in the current room.
792 pub async fn join_project(
793 &self,
794 project_id: ProjectId,
795 connection: ConnectionId,
796 user_id: UserId,
797 committer_name: Option<String>,
798 committer_email: Option<String>,
799 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
800 self.project_transaction(project_id, move |tx| {
801 let committer_name = committer_name.clone();
802 let committer_email = committer_email.clone();
803 async move {
804 let (project, role) = self
805 .access_project(project_id, connection, Capability::ReadOnly, &tx)
806 .await?;
807 self.join_project_internal(
808 project,
809 user_id,
810 committer_name,
811 committer_email,
812 connection,
813 role,
814 &tx,
815 )
816 .await
817 }
818 })
819 .await
820 }
821
822 async fn join_project_internal(
823 &self,
824 project: project::Model,
825 user_id: UserId,
826 committer_name: Option<String>,
827 committer_email: Option<String>,
828 connection: ConnectionId,
829 role: ChannelRole,
830 tx: &DatabaseTransaction,
831 ) -> Result<(Project, ReplicaId)> {
832 let mut collaborators = project
833 .find_related(project_collaborator::Entity)
834 .all(tx)
835 .await?;
836 let replica_ids = collaborators
837 .iter()
838 .map(|c| c.replica_id)
839 .collect::<HashSet<_>>();
840 let mut replica_id = ReplicaId(1);
841 while replica_ids.contains(&replica_id) {
842 replica_id.0 += 1;
843 }
844 let new_collaborator = project_collaborator::ActiveModel {
845 project_id: ActiveValue::set(project.id),
846 connection_id: ActiveValue::set(connection.id as i32),
847 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
848 user_id: ActiveValue::set(user_id),
849 replica_id: ActiveValue::set(replica_id),
850 is_host: ActiveValue::set(false),
851 id: ActiveValue::NotSet,
852 committer_name: ActiveValue::set(committer_name),
853 committer_email: ActiveValue::set(committer_email),
854 }
855 .insert(tx)
856 .await?;
857 collaborators.push(new_collaborator);
858
859 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
860 let mut worktrees = db_worktrees
861 .into_iter()
862 .map(|db_worktree| {
863 (
864 db_worktree.id as u64,
865 Worktree {
866 id: db_worktree.id as u64,
867 abs_path: db_worktree.abs_path,
868 root_name: db_worktree.root_name,
869 visible: db_worktree.visible,
870 entries: Default::default(),
871 diagnostic_summaries: Default::default(),
872 settings_files: Default::default(),
873 scan_id: db_worktree.scan_id as u64,
874 completed_scan_id: db_worktree.completed_scan_id as u64,
875 legacy_repository_entries: Default::default(),
876 },
877 )
878 })
879 .collect::<BTreeMap<_, _>>();
880
881 // Populate worktree entries.
882 {
883 let mut db_entries = worktree_entry::Entity::find()
884 .filter(
885 Condition::all()
886 .add(worktree_entry::Column::ProjectId.eq(project.id))
887 .add(worktree_entry::Column::IsDeleted.eq(false)),
888 )
889 .stream(tx)
890 .await?;
891 while let Some(db_entry) = db_entries.next().await {
892 let db_entry = db_entry?;
893 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
894 worktree.entries.push(proto::Entry {
895 id: db_entry.id as u64,
896 is_dir: db_entry.is_dir,
897 path: db_entry.path,
898 inode: db_entry.inode as u64,
899 mtime: Some(proto::Timestamp {
900 seconds: db_entry.mtime_seconds as u64,
901 nanos: db_entry.mtime_nanos as u32,
902 }),
903 canonical_path: db_entry.canonical_path,
904 is_ignored: db_entry.is_ignored,
905 is_external: db_entry.is_external,
906 // This is only used in the summarization backlog, so if it's None,
907 // that just means we won't be able to detect when to resummarize
908 // based on total number of backlogged bytes - instead, we'd go
909 // on number of files only. That shouldn't be a huge deal in practice.
910 size: None,
911 is_fifo: db_entry.is_fifo,
912 });
913 }
914 }
915 }
916
917 // Populate repository entries.
918 let mut repositories = Vec::new();
919 {
920 let db_repository_entries = project_repository::Entity::find()
921 .filter(
922 Condition::all()
923 .add(project_repository::Column::ProjectId.eq(project.id))
924 .add(project_repository::Column::IsDeleted.eq(false)),
925 )
926 .all(tx)
927 .await?;
928 for db_repository_entry in db_repository_entries {
929 let mut repository_statuses = project_repository_statuses::Entity::find()
930 .filter(
931 Condition::all()
932 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
933 .add(
934 project_repository_statuses::Column::RepositoryId
935 .eq(db_repository_entry.id),
936 )
937 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
938 )
939 .stream(tx)
940 .await?;
941 let mut updated_statuses = Vec::new();
942 while let Some(status_entry) = repository_statuses.next().await {
943 let status_entry = status_entry?;
944 updated_statuses.push(db_status_to_proto(status_entry)?);
945 }
946
947 let current_merge_conflicts = db_repository_entry
948 .current_merge_conflicts
949 .as_ref()
950 .map(|conflicts| serde_json::from_str(conflicts))
951 .transpose()?
952 .unwrap_or_default();
953
954 let branch_summary = db_repository_entry
955 .branch_summary
956 .as_ref()
957 .map(|branch_summary| serde_json::from_str(branch_summary))
958 .transpose()?
959 .unwrap_or_default();
960
961 let head_commit_details = db_repository_entry
962 .head_commit_details
963 .as_ref()
964 .map(|head_commit_details| serde_json::from_str(head_commit_details))
965 .transpose()?
966 .unwrap_or_default();
967
968 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
969 .context("failed to deserialize repository's entry ids")?;
970
971 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
972 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
973 worktree.legacy_repository_entries.insert(
974 db_repository_entry.id as u64,
975 proto::RepositoryEntry {
976 repository_id: db_repository_entry.id as u64,
977 updated_statuses,
978 removed_statuses: Vec::new(),
979 current_merge_conflicts,
980 branch_summary,
981 },
982 );
983 }
984 } else {
985 repositories.push(proto::UpdateRepository {
986 project_id: db_repository_entry.project_id.0 as u64,
987 id: db_repository_entry.id as u64,
988 abs_path: db_repository_entry.abs_path,
989 entry_ids,
990 updated_statuses,
991 removed_statuses: Vec::new(),
992 current_merge_conflicts,
993 branch_summary,
994 head_commit_details,
995 scan_id: db_repository_entry.scan_id as u64,
996 is_last_update: true,
997 merge_message: db_repository_entry.merge_message,
998 stash_entries: Vec::new(),
999 });
1000 }
1001 }
1002 }
1003
1004 // Populate worktree diagnostic summaries.
1005 {
1006 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1007 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
1008 .stream(tx)
1009 .await?;
1010 while let Some(db_summary) = db_summaries.next().await {
1011 let db_summary = db_summary?;
1012 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1013 worktree
1014 .diagnostic_summaries
1015 .push(proto::DiagnosticSummary {
1016 path: db_summary.path,
1017 language_server_id: db_summary.language_server_id as u64,
1018 error_count: db_summary.error_count as u32,
1019 warning_count: db_summary.warning_count as u32,
1020 });
1021 }
1022 }
1023 }
1024
1025 // Populate worktree settings files
1026 {
1027 let mut db_settings_files = worktree_settings_file::Entity::find()
1028 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1029 .stream(tx)
1030 .await?;
1031 while let Some(db_settings_file) = db_settings_files.next().await {
1032 let db_settings_file = db_settings_file?;
1033 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1034 worktree.settings_files.push(WorktreeSettingsFile {
1035 path: db_settings_file.path,
1036 content: db_settings_file.content,
1037 kind: db_settings_file.kind,
1038 });
1039 }
1040 }
1041 }
1042
1043 // Populate language servers.
1044 let language_servers = project
1045 .find_related(language_server::Entity)
1046 .all(tx)
1047 .await?;
1048
1049 let project = Project {
1050 id: project.id,
1051 role,
1052 collaborators: collaborators
1053 .into_iter()
1054 .map(|collaborator| ProjectCollaborator {
1055 connection_id: collaborator.connection(),
1056 user_id: collaborator.user_id,
1057 replica_id: collaborator.replica_id,
1058 is_host: collaborator.is_host,
1059 committer_name: collaborator.committer_name,
1060 committer_email: collaborator.committer_email,
1061 })
1062 .collect(),
1063 worktrees,
1064 repositories,
1065 language_servers: language_servers
1066 .into_iter()
1067 .map(|language_server| LanguageServer {
1068 server: proto::LanguageServer {
1069 id: language_server.id as u64,
1070 name: language_server.name,
1071 worktree_id: language_server.worktree_id.map(|id| id as u64),
1072 },
1073 capabilities: language_server.capabilities,
1074 })
1075 .collect(),
1076 };
1077 Ok((project, replica_id as ReplicaId))
1078 }
1079
1080 /// Removes the given connection from the specified project.
1081 pub async fn leave_project(
1082 &self,
1083 project_id: ProjectId,
1084 connection: ConnectionId,
1085 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1086 self.project_transaction(project_id, |tx| async move {
1087 let result = project_collaborator::Entity::delete_many()
1088 .filter(
1089 Condition::all()
1090 .add(project_collaborator::Column::ProjectId.eq(project_id))
1091 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1092 .add(
1093 project_collaborator::Column::ConnectionServerId
1094 .eq(connection.owner_id as i32),
1095 ),
1096 )
1097 .exec(&*tx)
1098 .await?;
1099 if result.rows_affected == 0 {
1100 Err(anyhow!("not a collaborator on this project"))?;
1101 }
1102
1103 let project = project::Entity::find_by_id(project_id)
1104 .one(&*tx)
1105 .await?
1106 .context("no such project")?;
1107 let collaborators = project
1108 .find_related(project_collaborator::Entity)
1109 .all(&*tx)
1110 .await?;
1111 let connection_ids: Vec<ConnectionId> = collaborators
1112 .into_iter()
1113 .map(|collaborator| collaborator.connection())
1114 .collect();
1115
1116 follower::Entity::delete_many()
1117 .filter(
1118 Condition::any()
1119 .add(
1120 Condition::all()
1121 .add(follower::Column::ProjectId.eq(Some(project_id)))
1122 .add(
1123 follower::Column::LeaderConnectionServerId
1124 .eq(connection.owner_id),
1125 )
1126 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1127 )
1128 .add(
1129 Condition::all()
1130 .add(follower::Column::ProjectId.eq(Some(project_id)))
1131 .add(
1132 follower::Column::FollowerConnectionServerId
1133 .eq(connection.owner_id),
1134 )
1135 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1136 ),
1137 )
1138 .exec(&*tx)
1139 .await?;
1140
1141 let room = if let Some(room_id) = project.room_id {
1142 Some(self.get_room(room_id, &tx).await?)
1143 } else {
1144 None
1145 };
1146
1147 let left_project = LeftProject {
1148 id: project_id,
1149 should_unshare: connection == project.host_connection()?,
1150 connection_ids,
1151 };
1152 Ok((room, left_project))
1153 })
1154 .await
1155 }
1156
1157 pub async fn check_user_is_project_host(
1158 &self,
1159 project_id: ProjectId,
1160 connection_id: ConnectionId,
1161 ) -> Result<()> {
1162 self.project_transaction(project_id, |tx| async move {
1163 project::Entity::find()
1164 .filter(
1165 Condition::all()
1166 .add(project::Column::Id.eq(project_id))
1167 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1168 .add(
1169 project::Column::HostConnectionServerId
1170 .eq(Some(connection_id.owner_id as i32)),
1171 ),
1172 )
1173 .one(&*tx)
1174 .await?
1175 .context("failed to read project host")?;
1176
1177 Ok(())
1178 })
1179 .await
1180 .map(|guard| guard.into_inner())
1181 }
1182
1183 /// Returns the current project if the given user is authorized to access it with the specified capability.
1184 pub async fn access_project(
1185 &self,
1186 project_id: ProjectId,
1187 connection_id: ConnectionId,
1188 capability: Capability,
1189 tx: &DatabaseTransaction,
1190 ) -> Result<(project::Model, ChannelRole)> {
1191 let project = project::Entity::find_by_id(project_id)
1192 .one(tx)
1193 .await?
1194 .context("no such project")?;
1195
1196 let role_from_room = if let Some(room_id) = project.room_id {
1197 room_participant::Entity::find()
1198 .filter(room_participant::Column::RoomId.eq(room_id))
1199 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1200 .one(tx)
1201 .await?
1202 .and_then(|participant| participant.role)
1203 } else {
1204 None
1205 };
1206
1207 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1208
1209 match capability {
1210 Capability::ReadWrite => {
1211 if !role.can_edit_projects() {
1212 return Err(anyhow!("not authorized to edit projects"))?;
1213 }
1214 }
1215 Capability::ReadOnly => {
1216 if !role.can_read_projects() {
1217 return Err(anyhow!("not authorized to read projects"))?;
1218 }
1219 }
1220 }
1221
1222 Ok((project, role))
1223 }
1224
1225 /// Returns the host connection for a read-only request to join a shared project.
1226 pub async fn host_for_read_only_project_request(
1227 &self,
1228 project_id: ProjectId,
1229 connection_id: ConnectionId,
1230 ) -> Result<ConnectionId> {
1231 self.project_transaction(project_id, |tx| async move {
1232 let (project, _) = self
1233 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1234 .await?;
1235 project.host_connection()
1236 })
1237 .await
1238 .map(|guard| guard.into_inner())
1239 }
1240
1241 /// Returns the host connection for a request to join a shared project.
1242 pub async fn host_for_mutating_project_request(
1243 &self,
1244 project_id: ProjectId,
1245 connection_id: ConnectionId,
1246 ) -> Result<ConnectionId> {
1247 self.project_transaction(project_id, |tx| async move {
1248 let (project, _) = self
1249 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1250 .await?;
1251 project.host_connection()
1252 })
1253 .await
1254 .map(|guard| guard.into_inner())
1255 }
1256
1257 pub async fn connections_for_buffer_update(
1258 &self,
1259 project_id: ProjectId,
1260 connection_id: ConnectionId,
1261 capability: Capability,
1262 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1263 self.project_transaction(project_id, |tx| async move {
1264 // Authorize
1265 let (project, _) = self
1266 .access_project(project_id, connection_id, capability, &tx)
1267 .await?;
1268
1269 let host_connection_id = project.host_connection()?;
1270
1271 let collaborators = project_collaborator::Entity::find()
1272 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1273 .all(&*tx)
1274 .await?;
1275
1276 let guest_connection_ids = collaborators
1277 .into_iter()
1278 .filter_map(|collaborator| {
1279 if collaborator.is_host {
1280 None
1281 } else {
1282 Some(collaborator.connection())
1283 }
1284 })
1285 .collect();
1286
1287 Ok((host_connection_id, guest_connection_ids))
1288 })
1289 .await
1290 }
1291
1292 /// Returns the connection IDs in the given project.
1293 ///
1294 /// The provided `connection_id` must also be a collaborator in the project,
1295 /// otherwise an error will be returned.
1296 pub async fn project_connection_ids(
1297 &self,
1298 project_id: ProjectId,
1299 connection_id: ConnectionId,
1300 exclude_dev_server: bool,
1301 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1302 self.project_transaction(project_id, |tx| async move {
1303 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1304 .await
1305 })
1306 .await
1307 }
1308
1309 async fn internal_project_connection_ids(
1310 &self,
1311 project_id: ProjectId,
1312 connection_id: ConnectionId,
1313 exclude_dev_server: bool,
1314 tx: &DatabaseTransaction,
1315 ) -> Result<HashSet<ConnectionId>> {
1316 let project = project::Entity::find_by_id(project_id)
1317 .one(tx)
1318 .await?
1319 .context("no such project")?;
1320
1321 let mut collaborators = project_collaborator::Entity::find()
1322 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1323 .stream(tx)
1324 .await?;
1325
1326 let mut connection_ids = HashSet::default();
1327 if let Some(host_connection) = project.host_connection().log_err()
1328 && !exclude_dev_server
1329 {
1330 connection_ids.insert(host_connection);
1331 }
1332
1333 while let Some(collaborator) = collaborators.next().await {
1334 let collaborator = collaborator?;
1335 connection_ids.insert(collaborator.connection());
1336 }
1337
1338 if connection_ids.contains(&connection_id)
1339 || Some(connection_id) == project.host_connection().ok()
1340 {
1341 Ok(connection_ids)
1342 } else {
1343 Err(anyhow!(
1344 "can only send project updates to a project you're in"
1345 ))?
1346 }
1347 }
1348
1349 async fn project_guest_connection_ids(
1350 &self,
1351 project_id: ProjectId,
1352 tx: &DatabaseTransaction,
1353 ) -> Result<Vec<ConnectionId>> {
1354 let mut collaborators = project_collaborator::Entity::find()
1355 .filter(
1356 project_collaborator::Column::ProjectId
1357 .eq(project_id)
1358 .and(project_collaborator::Column::IsHost.eq(false)),
1359 )
1360 .stream(tx)
1361 .await?;
1362
1363 let mut guest_connection_ids = Vec::new();
1364 while let Some(collaborator) = collaborators.next().await {
1365 let collaborator = collaborator?;
1366 guest_connection_ids.push(collaborator.connection());
1367 }
1368 Ok(guest_connection_ids)
1369 }
1370
1371 /// Returns the [`RoomId`] for the given project.
1372 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1373 self.transaction(|tx| async move {
1374 Ok(project::Entity::find_by_id(project_id)
1375 .one(&*tx)
1376 .await?
1377 .and_then(|project| project.room_id))
1378 })
1379 .await
1380 }
1381
1382 pub async fn check_room_participants(
1383 &self,
1384 room_id: RoomId,
1385 leader_id: ConnectionId,
1386 follower_id: ConnectionId,
1387 ) -> Result<()> {
1388 self.transaction(|tx| async move {
1389 use room_participant::Column;
1390
1391 let count = room_participant::Entity::find()
1392 .filter(
1393 Condition::all().add(Column::RoomId.eq(room_id)).add(
1394 Condition::any()
1395 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1396 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1397 ))
1398 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1399 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1400 )),
1401 ),
1402 )
1403 .count(&*tx)
1404 .await?;
1405
1406 if count < 2 {
1407 Err(anyhow!("not room participants"))?;
1408 }
1409
1410 Ok(())
1411 })
1412 .await
1413 }
1414
1415 /// Adds the given follower connection as a follower of the given leader connection.
1416 pub async fn follow(
1417 &self,
1418 room_id: RoomId,
1419 project_id: ProjectId,
1420 leader_connection: ConnectionId,
1421 follower_connection: ConnectionId,
1422 ) -> Result<TransactionGuard<proto::Room>> {
1423 self.room_transaction(room_id, |tx| async move {
1424 follower::ActiveModel {
1425 room_id: ActiveValue::set(room_id),
1426 project_id: ActiveValue::set(project_id),
1427 leader_connection_server_id: ActiveValue::set(ServerId(
1428 leader_connection.owner_id as i32,
1429 )),
1430 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1431 follower_connection_server_id: ActiveValue::set(ServerId(
1432 follower_connection.owner_id as i32,
1433 )),
1434 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1435 ..Default::default()
1436 }
1437 .insert(&*tx)
1438 .await?;
1439
1440 let room = self.get_room(room_id, &tx).await?;
1441 Ok(room)
1442 })
1443 .await
1444 }
1445
1446 /// Removes the given follower connection as a follower of the given leader connection.
1447 pub async fn unfollow(
1448 &self,
1449 room_id: RoomId,
1450 project_id: ProjectId,
1451 leader_connection: ConnectionId,
1452 follower_connection: ConnectionId,
1453 ) -> Result<TransactionGuard<proto::Room>> {
1454 self.room_transaction(room_id, |tx| async move {
1455 follower::Entity::delete_many()
1456 .filter(
1457 Condition::all()
1458 .add(follower::Column::RoomId.eq(room_id))
1459 .add(follower::Column::ProjectId.eq(project_id))
1460 .add(
1461 follower::Column::LeaderConnectionServerId
1462 .eq(leader_connection.owner_id),
1463 )
1464 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1465 .add(
1466 follower::Column::FollowerConnectionServerId
1467 .eq(follower_connection.owner_id),
1468 )
1469 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1470 )
1471 .exec(&*tx)
1472 .await?;
1473
1474 let room = self.get_room(room_id, &tx).await?;
1475 Ok(room)
1476 })
1477 .await
1478 }
1479}