1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 windows_paths: bool,
37 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
38 self.room_transaction(room_id, |tx| async move {
39 let participant = room_participant::Entity::find()
40 .filter(
41 Condition::all()
42 .add(
43 room_participant::Column::AnsweringConnectionId
44 .eq(connection.id as i32),
45 )
46 .add(
47 room_participant::Column::AnsweringConnectionServerId
48 .eq(connection.owner_id as i32),
49 ),
50 )
51 .one(&*tx)
52 .await?
53 .context("could not find participant")?;
54 if participant.room_id != room_id {
55 return Err(anyhow!("shared project on unexpected room"))?;
56 }
57 if !participant
58 .role
59 .unwrap_or(ChannelRole::Member)
60 .can_edit_projects()
61 {
62 return Err(anyhow!("guests cannot share projects"))?;
63 }
64
65 let project = project::ActiveModel {
66 room_id: ActiveValue::set(Some(participant.room_id)),
67 host_user_id: ActiveValue::set(Some(participant.user_id)),
68 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
69 host_connection_server_id: ActiveValue::set(Some(ServerId(
70 connection.owner_id as i32,
71 ))),
72 id: ActiveValue::NotSet,
73 windows_paths: ActiveValue::set(windows_paths),
74 }
75 .insert(&*tx)
76 .await?;
77
78 if !worktrees.is_empty() {
79 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
80 worktree::ActiveModel {
81 id: ActiveValue::set(worktree.id as i64),
82 project_id: ActiveValue::set(project.id),
83 abs_path: ActiveValue::set(worktree.abs_path.clone()),
84 root_name: ActiveValue::set(worktree.root_name.clone()),
85 visible: ActiveValue::set(worktree.visible),
86 scan_id: ActiveValue::set(0),
87 completed_scan_id: ActiveValue::set(0),
88 }
89 }))
90 .exec(&*tx)
91 .await?;
92 }
93
94 let replica_id = if is_ssh_project { 1 } else { 0 };
95
96 project_collaborator::ActiveModel {
97 project_id: ActiveValue::set(project.id),
98 connection_id: ActiveValue::set(connection.id as i32),
99 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
100 user_id: ActiveValue::set(participant.user_id),
101 replica_id: ActiveValue::set(ReplicaId(replica_id)),
102 is_host: ActiveValue::set(true),
103 id: ActiveValue::NotSet,
104 committer_name: ActiveValue::Set(None),
105 committer_email: ActiveValue::Set(None),
106 }
107 .insert(&*tx)
108 .await?;
109
110 let room = self.get_room(room_id, &tx).await?;
111 Ok((project.id, room))
112 })
113 .await
114 }
115
116 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
117 self.transaction(|tx| async move {
118 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
119 Ok(())
120 })
121 .await
122 }
123
124 /// Unshares the given project.
125 pub async fn unshare_project(
126 &self,
127 project_id: ProjectId,
128 connection: ConnectionId,
129 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
130 self.project_transaction(project_id, |tx| async move {
131 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
132 let project = project::Entity::find_by_id(project_id)
133 .one(&*tx)
134 .await?
135 .context("project not found")?;
136 let room = if let Some(room_id) = project.room_id {
137 Some(self.get_room(room_id, &tx).await?)
138 } else {
139 None
140 };
141 if project.host_connection()? == connection {
142 return Ok((true, room, guest_connection_ids));
143 }
144 Err(anyhow!("cannot unshare a project hosted by another user"))?
145 })
146 .await
147 }
148
149 /// Updates the worktrees associated with the given project.
150 pub async fn update_project(
151 &self,
152 project_id: ProjectId,
153 connection: ConnectionId,
154 worktrees: &[proto::WorktreeMetadata],
155 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
156 self.project_transaction(project_id, |tx| async move {
157 let project = project::Entity::find_by_id(project_id)
158 .filter(
159 Condition::all()
160 .add(project::Column::HostConnectionId.eq(connection.id as i32))
161 .add(
162 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
163 ),
164 )
165 .one(&*tx)
166 .await?
167 .context("no such project")?;
168
169 self.update_project_worktrees(project.id, worktrees, &tx)
170 .await?;
171
172 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
173
174 let room = if let Some(room_id) = project.room_id {
175 Some(self.get_room(room_id, &tx).await?)
176 } else {
177 None
178 };
179
180 Ok((room, guest_connection_ids))
181 })
182 .await
183 }
184
185 pub(in crate::db) async fn update_project_worktrees(
186 &self,
187 project_id: ProjectId,
188 worktrees: &[proto::WorktreeMetadata],
189 tx: &DatabaseTransaction,
190 ) -> Result<()> {
191 if !worktrees.is_empty() {
192 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
193 id: ActiveValue::set(worktree.id as i64),
194 project_id: ActiveValue::set(project_id),
195 abs_path: ActiveValue::set(worktree.abs_path.clone()),
196 root_name: ActiveValue::set(worktree.root_name.clone()),
197 visible: ActiveValue::set(worktree.visible),
198 scan_id: ActiveValue::set(0),
199 completed_scan_id: ActiveValue::set(0),
200 }))
201 .on_conflict(
202 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
203 .update_column(worktree::Column::RootName)
204 .to_owned(),
205 )
206 .exec(tx)
207 .await?;
208 }
209
210 worktree::Entity::delete_many()
211 .filter(worktree::Column::ProjectId.eq(project_id).and(
212 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
213 ))
214 .exec(tx)
215 .await?;
216
217 Ok(())
218 }
219
220 pub async fn update_worktree(
221 &self,
222 update: &proto::UpdateWorktree,
223 connection: ConnectionId,
224 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
225 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
226 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
227 {
228 return Err(anyhow!(
229 "invalid worktree update. removed entries: {}, updated entries: {}",
230 update.removed_entries.len(),
231 update.updated_entries.len()
232 ))?;
233 }
234
235 let project_id = ProjectId::from_proto(update.project_id);
236 let worktree_id = update.worktree_id as i64;
237 self.project_transaction(project_id, |tx| async move {
238 // Ensure the update comes from the host.
239 let _project = project::Entity::find_by_id(project_id)
240 .filter(
241 Condition::all()
242 .add(project::Column::HostConnectionId.eq(connection.id as i32))
243 .add(
244 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
245 ),
246 )
247 .one(&*tx)
248 .await?
249 .with_context(|| format!("no such project: {project_id}"))?;
250
251 // Update metadata.
252 worktree::Entity::update(worktree::ActiveModel {
253 id: ActiveValue::set(worktree_id),
254 project_id: ActiveValue::set(project_id),
255 root_name: ActiveValue::set(update.root_name.clone()),
256 scan_id: ActiveValue::set(update.scan_id as i64),
257 completed_scan_id: if update.is_last_update {
258 ActiveValue::set(update.scan_id as i64)
259 } else {
260 ActiveValue::default()
261 },
262 abs_path: ActiveValue::set(update.abs_path.clone()),
263 ..Default::default()
264 })
265 .exec(&*tx)
266 .await?;
267
268 if !update.updated_entries.is_empty() {
269 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
270 let mtime = entry.mtime.clone().unwrap_or_default();
271 worktree_entry::ActiveModel {
272 project_id: ActiveValue::set(project_id),
273 worktree_id: ActiveValue::set(worktree_id),
274 id: ActiveValue::set(entry.id as i64),
275 is_dir: ActiveValue::set(entry.is_dir),
276 path: ActiveValue::set(entry.path.clone()),
277 inode: ActiveValue::set(entry.inode as i64),
278 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
279 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
280 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
281 is_ignored: ActiveValue::set(entry.is_ignored),
282 git_status: ActiveValue::set(None),
283 is_external: ActiveValue::set(entry.is_external),
284 is_deleted: ActiveValue::set(false),
285 scan_id: ActiveValue::set(update.scan_id as i64),
286 is_fifo: ActiveValue::set(entry.is_fifo),
287 }
288 }))
289 .on_conflict(
290 OnConflict::columns([
291 worktree_entry::Column::ProjectId,
292 worktree_entry::Column::WorktreeId,
293 worktree_entry::Column::Id,
294 ])
295 .update_columns([
296 worktree_entry::Column::IsDir,
297 worktree_entry::Column::Path,
298 worktree_entry::Column::Inode,
299 worktree_entry::Column::MtimeSeconds,
300 worktree_entry::Column::MtimeNanos,
301 worktree_entry::Column::CanonicalPath,
302 worktree_entry::Column::IsIgnored,
303 worktree_entry::Column::ScanId,
304 ])
305 .to_owned(),
306 )
307 .exec(&*tx)
308 .await?;
309 }
310
311 if !update.removed_entries.is_empty() {
312 worktree_entry::Entity::update_many()
313 .filter(
314 worktree_entry::Column::ProjectId
315 .eq(project_id)
316 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
317 .and(
318 worktree_entry::Column::Id
319 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
320 ),
321 )
322 .set(worktree_entry::ActiveModel {
323 is_deleted: ActiveValue::Set(true),
324 scan_id: ActiveValue::Set(update.scan_id as i64),
325 ..Default::default()
326 })
327 .exec(&*tx)
328 .await?;
329 }
330
331 // Backward-compatibility for old Zed clients.
332 //
333 // Remove this block when Zed 1.80 stable has been out for a week.
334 {
335 if !update.updated_repositories.is_empty() {
336 project_repository::Entity::insert_many(
337 update.updated_repositories.iter().map(|repository| {
338 project_repository::ActiveModel {
339 project_id: ActiveValue::set(project_id),
340 legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
341 id: ActiveValue::set(repository.repository_id as i64),
342 scan_id: ActiveValue::set(update.scan_id as i64),
343 is_deleted: ActiveValue::set(false),
344 branch_summary: ActiveValue::Set(
345 repository
346 .branch_summary
347 .as_ref()
348 .map(|summary| serde_json::to_string(summary).unwrap()),
349 ),
350 current_merge_conflicts: ActiveValue::Set(Some(
351 serde_json::to_string(&repository.current_merge_conflicts)
352 .unwrap(),
353 )),
354 // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
355 abs_path: ActiveValue::set(String::new()),
356 entry_ids: ActiveValue::set("[]".into()),
357 head_commit_details: ActiveValue::set(None),
358 merge_message: ActiveValue::set(None),
359 }
360 }),
361 )
362 .on_conflict(
363 OnConflict::columns([
364 project_repository::Column::ProjectId,
365 project_repository::Column::Id,
366 ])
367 .update_columns([
368 project_repository::Column::ScanId,
369 project_repository::Column::BranchSummary,
370 project_repository::Column::CurrentMergeConflicts,
371 ])
372 .to_owned(),
373 )
374 .exec(&*tx)
375 .await?;
376
377 let has_any_statuses = update
378 .updated_repositories
379 .iter()
380 .any(|repository| !repository.updated_statuses.is_empty());
381
382 if has_any_statuses {
383 project_repository_statuses::Entity::insert_many(
384 update.updated_repositories.iter().flat_map(
385 |repository: &proto::RepositoryEntry| {
386 repository.updated_statuses.iter().map(|status_entry| {
387 let (repo_path, status_kind, first_status, second_status) =
388 proto_status_to_db(status_entry.clone());
389 project_repository_statuses::ActiveModel {
390 project_id: ActiveValue::set(project_id),
391 repository_id: ActiveValue::set(
392 repository.repository_id as i64,
393 ),
394 scan_id: ActiveValue::set(update.scan_id as i64),
395 is_deleted: ActiveValue::set(false),
396 repo_path: ActiveValue::set(repo_path),
397 status: ActiveValue::set(0),
398 status_kind: ActiveValue::set(status_kind),
399 first_status: ActiveValue::set(first_status),
400 second_status: ActiveValue::set(second_status),
401 }
402 })
403 },
404 ),
405 )
406 .on_conflict(
407 OnConflict::columns([
408 project_repository_statuses::Column::ProjectId,
409 project_repository_statuses::Column::RepositoryId,
410 project_repository_statuses::Column::RepoPath,
411 ])
412 .update_columns([
413 project_repository_statuses::Column::ScanId,
414 project_repository_statuses::Column::StatusKind,
415 project_repository_statuses::Column::FirstStatus,
416 project_repository_statuses::Column::SecondStatus,
417 ])
418 .to_owned(),
419 )
420 .exec(&*tx)
421 .await?;
422 }
423
424 for repo in &update.updated_repositories {
425 if !repo.removed_statuses.is_empty() {
426 project_repository_statuses::Entity::update_many()
427 .filter(
428 project_repository_statuses::Column::ProjectId
429 .eq(project_id)
430 .and(
431 project_repository_statuses::Column::RepositoryId
432 .eq(repo.repository_id),
433 )
434 .and(
435 project_repository_statuses::Column::RepoPath
436 .is_in(repo.removed_statuses.iter()),
437 ),
438 )
439 .set(project_repository_statuses::ActiveModel {
440 is_deleted: ActiveValue::Set(true),
441 scan_id: ActiveValue::Set(update.scan_id as i64),
442 ..Default::default()
443 })
444 .exec(&*tx)
445 .await?;
446 }
447 }
448 }
449
450 if !update.removed_repositories.is_empty() {
451 project_repository::Entity::update_many()
452 .filter(
453 project_repository::Column::ProjectId
454 .eq(project_id)
455 .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
456 .and(project_repository::Column::Id.is_in(
457 update.removed_repositories.iter().map(|id| *id as i64),
458 )),
459 )
460 .set(project_repository::ActiveModel {
461 is_deleted: ActiveValue::Set(true),
462 scan_id: ActiveValue::Set(update.scan_id as i64),
463 ..Default::default()
464 })
465 .exec(&*tx)
466 .await?;
467 }
468 }
469
470 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
471 Ok(connection_ids)
472 })
473 .await
474 }
475
476 pub async fn update_repository(
477 &self,
478 update: &proto::UpdateRepository,
479 _connection: ConnectionId,
480 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
481 let project_id = ProjectId::from_proto(update.project_id);
482 let repository_id = update.id as i64;
483 self.project_transaction(project_id, |tx| async move {
484 project_repository::Entity::insert(project_repository::ActiveModel {
485 project_id: ActiveValue::set(project_id),
486 id: ActiveValue::set(repository_id),
487 legacy_worktree_id: ActiveValue::set(None),
488 abs_path: ActiveValue::set(update.abs_path.clone()),
489 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
490 scan_id: ActiveValue::set(update.scan_id as i64),
491 is_deleted: ActiveValue::set(false),
492 branch_summary: ActiveValue::Set(
493 update
494 .branch_summary
495 .as_ref()
496 .map(|summary| serde_json::to_string(summary).unwrap()),
497 ),
498 head_commit_details: ActiveValue::Set(
499 update
500 .head_commit_details
501 .as_ref()
502 .map(|details| serde_json::to_string(details).unwrap()),
503 ),
504 current_merge_conflicts: ActiveValue::Set(Some(
505 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
506 )),
507 merge_message: ActiveValue::set(update.merge_message.clone()),
508 })
509 .on_conflict(
510 OnConflict::columns([
511 project_repository::Column::ProjectId,
512 project_repository::Column::Id,
513 ])
514 .update_columns([
515 project_repository::Column::ScanId,
516 project_repository::Column::BranchSummary,
517 project_repository::Column::EntryIds,
518 project_repository::Column::AbsPath,
519 project_repository::Column::CurrentMergeConflicts,
520 project_repository::Column::HeadCommitDetails,
521 project_repository::Column::MergeMessage,
522 ])
523 .to_owned(),
524 )
525 .exec(&*tx)
526 .await?;
527
528 let has_any_statuses = !update.updated_statuses.is_empty();
529
530 if has_any_statuses {
531 project_repository_statuses::Entity::insert_many(
532 update.updated_statuses.iter().map(|status_entry| {
533 let (repo_path, status_kind, first_status, second_status) =
534 proto_status_to_db(status_entry.clone());
535 project_repository_statuses::ActiveModel {
536 project_id: ActiveValue::set(project_id),
537 repository_id: ActiveValue::set(repository_id),
538 scan_id: ActiveValue::set(update.scan_id as i64),
539 is_deleted: ActiveValue::set(false),
540 repo_path: ActiveValue::set(repo_path),
541 status: ActiveValue::set(0),
542 status_kind: ActiveValue::set(status_kind),
543 first_status: ActiveValue::set(first_status),
544 second_status: ActiveValue::set(second_status),
545 }
546 }),
547 )
548 .on_conflict(
549 OnConflict::columns([
550 project_repository_statuses::Column::ProjectId,
551 project_repository_statuses::Column::RepositoryId,
552 project_repository_statuses::Column::RepoPath,
553 ])
554 .update_columns([
555 project_repository_statuses::Column::ScanId,
556 project_repository_statuses::Column::StatusKind,
557 project_repository_statuses::Column::FirstStatus,
558 project_repository_statuses::Column::SecondStatus,
559 ])
560 .to_owned(),
561 )
562 .exec(&*tx)
563 .await?;
564 }
565
566 let has_any_removed_statuses = !update.removed_statuses.is_empty();
567
568 if has_any_removed_statuses {
569 project_repository_statuses::Entity::update_many()
570 .filter(
571 project_repository_statuses::Column::ProjectId
572 .eq(project_id)
573 .and(
574 project_repository_statuses::Column::RepositoryId.eq(repository_id),
575 )
576 .and(
577 project_repository_statuses::Column::RepoPath
578 .is_in(update.removed_statuses.iter()),
579 ),
580 )
581 .set(project_repository_statuses::ActiveModel {
582 is_deleted: ActiveValue::Set(true),
583 scan_id: ActiveValue::Set(update.scan_id as i64),
584 ..Default::default()
585 })
586 .exec(&*tx)
587 .await?;
588 }
589
590 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
591 Ok(connection_ids)
592 })
593 .await
594 }
595
596 pub async fn remove_repository(
597 &self,
598 remove: &proto::RemoveRepository,
599 _connection: ConnectionId,
600 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
601 let project_id = ProjectId::from_proto(remove.project_id);
602 let repository_id = remove.id as i64;
603 self.project_transaction(project_id, |tx| async move {
604 project_repository::Entity::update_many()
605 .filter(
606 project_repository::Column::ProjectId
607 .eq(project_id)
608 .and(project_repository::Column::Id.eq(repository_id)),
609 )
610 .set(project_repository::ActiveModel {
611 is_deleted: ActiveValue::Set(true),
612 // scan_id: ActiveValue::Set(update.scan_id as i64),
613 ..Default::default()
614 })
615 .exec(&*tx)
616 .await?;
617
618 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
619 Ok(connection_ids)
620 })
621 .await
622 }
623
624 /// Updates the diagnostic summary for the given connection.
625 pub async fn update_diagnostic_summary(
626 &self,
627 update: &proto::UpdateDiagnosticSummary,
628 connection: ConnectionId,
629 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
630 let project_id = ProjectId::from_proto(update.project_id);
631 let worktree_id = update.worktree_id as i64;
632 self.project_transaction(project_id, |tx| async move {
633 let summary = update.summary.as_ref().context("invalid summary")?;
634
635 // Ensure the update comes from the host.
636 let project = project::Entity::find_by_id(project_id)
637 .one(&*tx)
638 .await?
639 .context("no such project")?;
640 if project.host_connection()? != connection {
641 return Err(anyhow!("can't update a project hosted by someone else"))?;
642 }
643
644 // Update summary.
645 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
646 project_id: ActiveValue::set(project_id),
647 worktree_id: ActiveValue::set(worktree_id),
648 path: ActiveValue::set(summary.path.clone()),
649 language_server_id: ActiveValue::set(summary.language_server_id as i64),
650 error_count: ActiveValue::set(summary.error_count as i32),
651 warning_count: ActiveValue::set(summary.warning_count as i32),
652 })
653 .on_conflict(
654 OnConflict::columns([
655 worktree_diagnostic_summary::Column::ProjectId,
656 worktree_diagnostic_summary::Column::WorktreeId,
657 worktree_diagnostic_summary::Column::Path,
658 ])
659 .update_columns([
660 worktree_diagnostic_summary::Column::LanguageServerId,
661 worktree_diagnostic_summary::Column::ErrorCount,
662 worktree_diagnostic_summary::Column::WarningCount,
663 ])
664 .to_owned(),
665 )
666 .exec(&*tx)
667 .await?;
668
669 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
670 Ok(connection_ids)
671 })
672 .await
673 }
674
675 /// Starts the language server for the given connection.
676 pub async fn start_language_server(
677 &self,
678 update: &proto::StartLanguageServer,
679 connection: ConnectionId,
680 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
681 let project_id = ProjectId::from_proto(update.project_id);
682 self.project_transaction(project_id, |tx| async move {
683 let server = update.server.as_ref().context("invalid language server")?;
684
685 // Ensure the update comes from the host.
686 let project = project::Entity::find_by_id(project_id)
687 .one(&*tx)
688 .await?
689 .context("no such project")?;
690 if project.host_connection()? != connection {
691 return Err(anyhow!("can't update a project hosted by someone else"))?;
692 }
693
694 // Add the newly-started language server.
695 language_server::Entity::insert(language_server::ActiveModel {
696 project_id: ActiveValue::set(project_id),
697 id: ActiveValue::set(server.id as i64),
698 name: ActiveValue::set(server.name.clone()),
699 worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
700 capabilities: ActiveValue::set(update.capabilities.clone()),
701 })
702 .on_conflict(
703 OnConflict::columns([
704 language_server::Column::ProjectId,
705 language_server::Column::Id,
706 ])
707 .update_columns([
708 language_server::Column::Name,
709 language_server::Column::Capabilities,
710 language_server::Column::WorktreeId,
711 ])
712 .to_owned(),
713 )
714 .exec(&*tx)
715 .await?;
716
717 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
718 Ok(connection_ids)
719 })
720 .await
721 }
722
723 /// Updates the worktree settings for the given connection.
724 pub async fn update_worktree_settings(
725 &self,
726 update: &proto::UpdateWorktreeSettings,
727 connection: ConnectionId,
728 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
729 let project_id = ProjectId::from_proto(update.project_id);
730 let kind = match update.kind {
731 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
732 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
733 None => proto::LocalSettingsKind::Settings,
734 };
735 let kind = LocalSettingsKind::from_proto(kind);
736 self.project_transaction(project_id, |tx| async move {
737 // Ensure the update comes from the host.
738 let project = project::Entity::find_by_id(project_id)
739 .one(&*tx)
740 .await?
741 .context("no such project")?;
742 if project.host_connection()? != connection {
743 return Err(anyhow!("can't update a project hosted by someone else"))?;
744 }
745
746 if let Some(content) = &update.content {
747 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
748 project_id: ActiveValue::Set(project_id),
749 worktree_id: ActiveValue::Set(update.worktree_id as i64),
750 path: ActiveValue::Set(update.path.clone()),
751 content: ActiveValue::Set(content.clone()),
752 kind: ActiveValue::Set(kind),
753 })
754 .on_conflict(
755 OnConflict::columns([
756 worktree_settings_file::Column::ProjectId,
757 worktree_settings_file::Column::WorktreeId,
758 worktree_settings_file::Column::Path,
759 ])
760 .update_column(worktree_settings_file::Column::Content)
761 .to_owned(),
762 )
763 .exec(&*tx)
764 .await?;
765 } else {
766 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
767 project_id: ActiveValue::Set(project_id),
768 worktree_id: ActiveValue::Set(update.worktree_id as i64),
769 path: ActiveValue::Set(update.path.clone()),
770 ..Default::default()
771 })
772 .exec(&*tx)
773 .await?;
774 }
775
776 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
777 Ok(connection_ids)
778 })
779 .await
780 }
781
782 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
783 self.transaction(|tx| async move {
784 Ok(project::Entity::find_by_id(id)
785 .one(&*tx)
786 .await?
787 .context("no such project")?)
788 })
789 .await
790 }
791
792 /// Adds the given connection to the specified project
793 /// in the current room.
794 pub async fn join_project(
795 &self,
796 project_id: ProjectId,
797 connection: ConnectionId,
798 user_id: UserId,
799 committer_name: Option<String>,
800 committer_email: Option<String>,
801 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
802 self.project_transaction(project_id, move |tx| {
803 let committer_name = committer_name.clone();
804 let committer_email = committer_email.clone();
805 async move {
806 let (project, role) = self
807 .access_project(project_id, connection, Capability::ReadOnly, &tx)
808 .await?;
809 self.join_project_internal(
810 project,
811 user_id,
812 committer_name,
813 committer_email,
814 connection,
815 role,
816 &tx,
817 )
818 .await
819 }
820 })
821 .await
822 }
823
824 async fn join_project_internal(
825 &self,
826 project: project::Model,
827 user_id: UserId,
828 committer_name: Option<String>,
829 committer_email: Option<String>,
830 connection: ConnectionId,
831 role: ChannelRole,
832 tx: &DatabaseTransaction,
833 ) -> Result<(Project, ReplicaId)> {
834 let mut collaborators = project
835 .find_related(project_collaborator::Entity)
836 .all(tx)
837 .await?;
838 let replica_ids = collaborators
839 .iter()
840 .map(|c| c.replica_id)
841 .collect::<HashSet<_>>();
842 let mut replica_id = ReplicaId(1);
843 while replica_ids.contains(&replica_id) {
844 replica_id.0 += 1;
845 }
846 let new_collaborator = project_collaborator::ActiveModel {
847 project_id: ActiveValue::set(project.id),
848 connection_id: ActiveValue::set(connection.id as i32),
849 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
850 user_id: ActiveValue::set(user_id),
851 replica_id: ActiveValue::set(replica_id),
852 is_host: ActiveValue::set(false),
853 id: ActiveValue::NotSet,
854 committer_name: ActiveValue::set(committer_name),
855 committer_email: ActiveValue::set(committer_email),
856 }
857 .insert(tx)
858 .await?;
859 collaborators.push(new_collaborator);
860
861 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
862 let mut worktrees = db_worktrees
863 .into_iter()
864 .map(|db_worktree| {
865 (
866 db_worktree.id as u64,
867 Worktree {
868 id: db_worktree.id as u64,
869 abs_path: db_worktree.abs_path,
870 root_name: db_worktree.root_name,
871 visible: db_worktree.visible,
872 entries: Default::default(),
873 diagnostic_summaries: Default::default(),
874 settings_files: Default::default(),
875 scan_id: db_worktree.scan_id as u64,
876 completed_scan_id: db_worktree.completed_scan_id as u64,
877 legacy_repository_entries: Default::default(),
878 },
879 )
880 })
881 .collect::<BTreeMap<_, _>>();
882
883 // Populate worktree entries.
884 {
885 let mut db_entries = worktree_entry::Entity::find()
886 .filter(
887 Condition::all()
888 .add(worktree_entry::Column::ProjectId.eq(project.id))
889 .add(worktree_entry::Column::IsDeleted.eq(false)),
890 )
891 .stream(tx)
892 .await?;
893 while let Some(db_entry) = db_entries.next().await {
894 let db_entry = db_entry?;
895 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
896 worktree.entries.push(proto::Entry {
897 id: db_entry.id as u64,
898 is_dir: db_entry.is_dir,
899 path: db_entry.path,
900 inode: db_entry.inode as u64,
901 mtime: Some(proto::Timestamp {
902 seconds: db_entry.mtime_seconds as u64,
903 nanos: db_entry.mtime_nanos as u32,
904 }),
905 canonical_path: db_entry.canonical_path,
906 is_ignored: db_entry.is_ignored,
907 is_external: db_entry.is_external,
908 // This is only used in the summarization backlog, so if it's None,
909 // that just means we won't be able to detect when to resummarize
910 // based on total number of backlogged bytes - instead, we'd go
911 // on number of files only. That shouldn't be a huge deal in practice.
912 size: None,
913 is_fifo: db_entry.is_fifo,
914 });
915 }
916 }
917 }
918
919 // Populate repository entries.
920 let mut repositories = Vec::new();
921 {
922 let db_repository_entries = project_repository::Entity::find()
923 .filter(
924 Condition::all()
925 .add(project_repository::Column::ProjectId.eq(project.id))
926 .add(project_repository::Column::IsDeleted.eq(false)),
927 )
928 .all(tx)
929 .await?;
930 for db_repository_entry in db_repository_entries {
931 let mut repository_statuses = project_repository_statuses::Entity::find()
932 .filter(
933 Condition::all()
934 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
935 .add(
936 project_repository_statuses::Column::RepositoryId
937 .eq(db_repository_entry.id),
938 )
939 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
940 )
941 .stream(tx)
942 .await?;
943 let mut updated_statuses = Vec::new();
944 while let Some(status_entry) = repository_statuses.next().await {
945 let status_entry = status_entry?;
946 updated_statuses.push(db_status_to_proto(status_entry)?);
947 }
948
949 let current_merge_conflicts = db_repository_entry
950 .current_merge_conflicts
951 .as_ref()
952 .map(|conflicts| serde_json::from_str(conflicts))
953 .transpose()?
954 .unwrap_or_default();
955
956 let branch_summary = db_repository_entry
957 .branch_summary
958 .as_ref()
959 .map(|branch_summary| serde_json::from_str(branch_summary))
960 .transpose()?
961 .unwrap_or_default();
962
963 let head_commit_details = db_repository_entry
964 .head_commit_details
965 .as_ref()
966 .map(|head_commit_details| serde_json::from_str(head_commit_details))
967 .transpose()?
968 .unwrap_or_default();
969
970 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
971 .context("failed to deserialize repository's entry ids")?;
972
973 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
974 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
975 worktree.legacy_repository_entries.insert(
976 db_repository_entry.id as u64,
977 proto::RepositoryEntry {
978 repository_id: db_repository_entry.id as u64,
979 updated_statuses,
980 removed_statuses: Vec::new(),
981 current_merge_conflicts,
982 branch_summary,
983 },
984 );
985 }
986 } else {
987 repositories.push(proto::UpdateRepository {
988 project_id: db_repository_entry.project_id.0 as u64,
989 id: db_repository_entry.id as u64,
990 abs_path: db_repository_entry.abs_path,
991 entry_ids,
992 updated_statuses,
993 removed_statuses: Vec::new(),
994 current_merge_conflicts,
995 branch_summary,
996 head_commit_details,
997 scan_id: db_repository_entry.scan_id as u64,
998 is_last_update: true,
999 merge_message: db_repository_entry.merge_message,
1000 stash_entries: Vec::new(),
1001 });
1002 }
1003 }
1004 }
1005
1006 // Populate worktree diagnostic summaries.
1007 {
1008 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1009 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
1010 .stream(tx)
1011 .await?;
1012 while let Some(db_summary) = db_summaries.next().await {
1013 let db_summary = db_summary?;
1014 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1015 worktree
1016 .diagnostic_summaries
1017 .push(proto::DiagnosticSummary {
1018 path: db_summary.path,
1019 language_server_id: db_summary.language_server_id as u64,
1020 error_count: db_summary.error_count as u32,
1021 warning_count: db_summary.warning_count as u32,
1022 });
1023 }
1024 }
1025 }
1026
1027 // Populate worktree settings files
1028 {
1029 let mut db_settings_files = worktree_settings_file::Entity::find()
1030 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1031 .stream(tx)
1032 .await?;
1033 while let Some(db_settings_file) = db_settings_files.next().await {
1034 let db_settings_file = db_settings_file?;
1035 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1036 worktree.settings_files.push(WorktreeSettingsFile {
1037 path: db_settings_file.path,
1038 content: db_settings_file.content,
1039 kind: db_settings_file.kind,
1040 });
1041 }
1042 }
1043 }
1044
1045 // Populate language servers.
1046 let language_servers = project
1047 .find_related(language_server::Entity)
1048 .all(tx)
1049 .await?;
1050
1051 let path_style = if project.windows_paths {
1052 PathStyle::Windows
1053 } else {
1054 PathStyle::Posix
1055 };
1056
1057 let project = Project {
1058 id: project.id,
1059 role,
1060 collaborators: collaborators
1061 .into_iter()
1062 .map(|collaborator| ProjectCollaborator {
1063 connection_id: collaborator.connection(),
1064 user_id: collaborator.user_id,
1065 replica_id: collaborator.replica_id,
1066 is_host: collaborator.is_host,
1067 committer_name: collaborator.committer_name,
1068 committer_email: collaborator.committer_email,
1069 })
1070 .collect(),
1071 worktrees,
1072 repositories,
1073 language_servers: language_servers
1074 .into_iter()
1075 .map(|language_server| LanguageServer {
1076 server: proto::LanguageServer {
1077 id: language_server.id as u64,
1078 name: language_server.name,
1079 worktree_id: language_server.worktree_id.map(|id| id as u64),
1080 },
1081 capabilities: language_server.capabilities,
1082 })
1083 .collect(),
1084 path_style,
1085 };
1086 Ok((project, replica_id as ReplicaId))
1087 }
1088
1089 /// Removes the given connection from the specified project.
1090 pub async fn leave_project(
1091 &self,
1092 project_id: ProjectId,
1093 connection: ConnectionId,
1094 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1095 self.project_transaction(project_id, |tx| async move {
1096 let result = project_collaborator::Entity::delete_many()
1097 .filter(
1098 Condition::all()
1099 .add(project_collaborator::Column::ProjectId.eq(project_id))
1100 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1101 .add(
1102 project_collaborator::Column::ConnectionServerId
1103 .eq(connection.owner_id as i32),
1104 ),
1105 )
1106 .exec(&*tx)
1107 .await?;
1108 if result.rows_affected == 0 {
1109 Err(anyhow!("not a collaborator on this project"))?;
1110 }
1111
1112 let project = project::Entity::find_by_id(project_id)
1113 .one(&*tx)
1114 .await?
1115 .context("no such project")?;
1116 let collaborators = project
1117 .find_related(project_collaborator::Entity)
1118 .all(&*tx)
1119 .await?;
1120 let connection_ids: Vec<ConnectionId> = collaborators
1121 .into_iter()
1122 .map(|collaborator| collaborator.connection())
1123 .collect();
1124
1125 follower::Entity::delete_many()
1126 .filter(
1127 Condition::any()
1128 .add(
1129 Condition::all()
1130 .add(follower::Column::ProjectId.eq(Some(project_id)))
1131 .add(
1132 follower::Column::LeaderConnectionServerId
1133 .eq(connection.owner_id),
1134 )
1135 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1136 )
1137 .add(
1138 Condition::all()
1139 .add(follower::Column::ProjectId.eq(Some(project_id)))
1140 .add(
1141 follower::Column::FollowerConnectionServerId
1142 .eq(connection.owner_id),
1143 )
1144 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1145 ),
1146 )
1147 .exec(&*tx)
1148 .await?;
1149
1150 let room = if let Some(room_id) = project.room_id {
1151 Some(self.get_room(room_id, &tx).await?)
1152 } else {
1153 None
1154 };
1155
1156 let left_project = LeftProject {
1157 id: project_id,
1158 should_unshare: connection == project.host_connection()?,
1159 connection_ids,
1160 };
1161 Ok((room, left_project))
1162 })
1163 .await
1164 }
1165
1166 pub async fn check_user_is_project_host(
1167 &self,
1168 project_id: ProjectId,
1169 connection_id: ConnectionId,
1170 ) -> Result<()> {
1171 self.project_transaction(project_id, |tx| async move {
1172 project::Entity::find()
1173 .filter(
1174 Condition::all()
1175 .add(project::Column::Id.eq(project_id))
1176 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1177 .add(
1178 project::Column::HostConnectionServerId
1179 .eq(Some(connection_id.owner_id as i32)),
1180 ),
1181 )
1182 .one(&*tx)
1183 .await?
1184 .context("failed to read project host")?;
1185
1186 Ok(())
1187 })
1188 .await
1189 .map(|guard| guard.into_inner())
1190 }
1191
1192 /// Returns the current project if the given user is authorized to access it with the specified capability.
1193 pub async fn access_project(
1194 &self,
1195 project_id: ProjectId,
1196 connection_id: ConnectionId,
1197 capability: Capability,
1198 tx: &DatabaseTransaction,
1199 ) -> Result<(project::Model, ChannelRole)> {
1200 let project = project::Entity::find_by_id(project_id)
1201 .one(tx)
1202 .await?
1203 .context("no such project")?;
1204
1205 let role_from_room = if let Some(room_id) = project.room_id {
1206 room_participant::Entity::find()
1207 .filter(room_participant::Column::RoomId.eq(room_id))
1208 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1209 .one(tx)
1210 .await?
1211 .and_then(|participant| participant.role)
1212 } else {
1213 None
1214 };
1215
1216 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1217
1218 match capability {
1219 Capability::ReadWrite => {
1220 if !role.can_edit_projects() {
1221 return Err(anyhow!("not authorized to edit projects"))?;
1222 }
1223 }
1224 Capability::ReadOnly => {
1225 if !role.can_read_projects() {
1226 return Err(anyhow!("not authorized to read projects"))?;
1227 }
1228 }
1229 }
1230
1231 Ok((project, role))
1232 }
1233
1234 /// Returns the host connection for a read-only request to join a shared project.
1235 pub async fn host_for_read_only_project_request(
1236 &self,
1237 project_id: ProjectId,
1238 connection_id: ConnectionId,
1239 ) -> Result<ConnectionId> {
1240 self.project_transaction(project_id, |tx| async move {
1241 let (project, _) = self
1242 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1243 .await?;
1244 project.host_connection()
1245 })
1246 .await
1247 .map(|guard| guard.into_inner())
1248 }
1249
1250 /// Returns the host connection for a request to join a shared project.
1251 pub async fn host_for_mutating_project_request(
1252 &self,
1253 project_id: ProjectId,
1254 connection_id: ConnectionId,
1255 ) -> Result<ConnectionId> {
1256 self.project_transaction(project_id, |tx| async move {
1257 let (project, _) = self
1258 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1259 .await?;
1260 project.host_connection()
1261 })
1262 .await
1263 .map(|guard| guard.into_inner())
1264 }
1265
1266 pub async fn connections_for_buffer_update(
1267 &self,
1268 project_id: ProjectId,
1269 connection_id: ConnectionId,
1270 capability: Capability,
1271 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1272 self.project_transaction(project_id, |tx| async move {
1273 // Authorize
1274 let (project, _) = self
1275 .access_project(project_id, connection_id, capability, &tx)
1276 .await?;
1277
1278 let host_connection_id = project.host_connection()?;
1279
1280 let collaborators = project_collaborator::Entity::find()
1281 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1282 .all(&*tx)
1283 .await?;
1284
1285 let guest_connection_ids = collaborators
1286 .into_iter()
1287 .filter_map(|collaborator| {
1288 if collaborator.is_host {
1289 None
1290 } else {
1291 Some(collaborator.connection())
1292 }
1293 })
1294 .collect();
1295
1296 Ok((host_connection_id, guest_connection_ids))
1297 })
1298 .await
1299 }
1300
1301 /// Returns the connection IDs in the given project.
1302 ///
1303 /// The provided `connection_id` must also be a collaborator in the project,
1304 /// otherwise an error will be returned.
1305 pub async fn project_connection_ids(
1306 &self,
1307 project_id: ProjectId,
1308 connection_id: ConnectionId,
1309 exclude_dev_server: bool,
1310 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1311 self.project_transaction(project_id, |tx| async move {
1312 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1313 .await
1314 })
1315 .await
1316 }
1317
1318 async fn internal_project_connection_ids(
1319 &self,
1320 project_id: ProjectId,
1321 connection_id: ConnectionId,
1322 exclude_dev_server: bool,
1323 tx: &DatabaseTransaction,
1324 ) -> Result<HashSet<ConnectionId>> {
1325 let project = project::Entity::find_by_id(project_id)
1326 .one(tx)
1327 .await?
1328 .context("no such project")?;
1329
1330 let mut collaborators = project_collaborator::Entity::find()
1331 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1332 .stream(tx)
1333 .await?;
1334
1335 let mut connection_ids = HashSet::default();
1336 if let Some(host_connection) = project.host_connection().log_err()
1337 && !exclude_dev_server
1338 {
1339 connection_ids.insert(host_connection);
1340 }
1341
1342 while let Some(collaborator) = collaborators.next().await {
1343 let collaborator = collaborator?;
1344 connection_ids.insert(collaborator.connection());
1345 }
1346
1347 if connection_ids.contains(&connection_id)
1348 || Some(connection_id) == project.host_connection().ok()
1349 {
1350 Ok(connection_ids)
1351 } else {
1352 Err(anyhow!(
1353 "can only send project updates to a project you're in"
1354 ))?
1355 }
1356 }
1357
1358 async fn project_guest_connection_ids(
1359 &self,
1360 project_id: ProjectId,
1361 tx: &DatabaseTransaction,
1362 ) -> Result<Vec<ConnectionId>> {
1363 let mut collaborators = project_collaborator::Entity::find()
1364 .filter(
1365 project_collaborator::Column::ProjectId
1366 .eq(project_id)
1367 .and(project_collaborator::Column::IsHost.eq(false)),
1368 )
1369 .stream(tx)
1370 .await?;
1371
1372 let mut guest_connection_ids = Vec::new();
1373 while let Some(collaborator) = collaborators.next().await {
1374 let collaborator = collaborator?;
1375 guest_connection_ids.push(collaborator.connection());
1376 }
1377 Ok(guest_connection_ids)
1378 }
1379
1380 /// Returns the [`RoomId`] for the given project.
1381 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1382 self.transaction(|tx| async move {
1383 Ok(project::Entity::find_by_id(project_id)
1384 .one(&*tx)
1385 .await?
1386 .and_then(|project| project.room_id))
1387 })
1388 .await
1389 }
1390
1391 pub async fn check_room_participants(
1392 &self,
1393 room_id: RoomId,
1394 leader_id: ConnectionId,
1395 follower_id: ConnectionId,
1396 ) -> Result<()> {
1397 self.transaction(|tx| async move {
1398 use room_participant::Column;
1399
1400 let count = room_participant::Entity::find()
1401 .filter(
1402 Condition::all().add(Column::RoomId.eq(room_id)).add(
1403 Condition::any()
1404 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1405 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1406 ))
1407 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1408 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1409 )),
1410 ),
1411 )
1412 .count(&*tx)
1413 .await?;
1414
1415 if count < 2 {
1416 Err(anyhow!("not room participants"))?;
1417 }
1418
1419 Ok(())
1420 })
1421 .await
1422 }
1423
1424 /// Adds the given follower connection as a follower of the given leader connection.
1425 pub async fn follow(
1426 &self,
1427 room_id: RoomId,
1428 project_id: ProjectId,
1429 leader_connection: ConnectionId,
1430 follower_connection: ConnectionId,
1431 ) -> Result<TransactionGuard<proto::Room>> {
1432 self.room_transaction(room_id, |tx| async move {
1433 follower::ActiveModel {
1434 room_id: ActiveValue::set(room_id),
1435 project_id: ActiveValue::set(project_id),
1436 leader_connection_server_id: ActiveValue::set(ServerId(
1437 leader_connection.owner_id as i32,
1438 )),
1439 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1440 follower_connection_server_id: ActiveValue::set(ServerId(
1441 follower_connection.owner_id as i32,
1442 )),
1443 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1444 ..Default::default()
1445 }
1446 .insert(&*tx)
1447 .await?;
1448
1449 let room = self.get_room(room_id, &tx).await?;
1450 Ok(room)
1451 })
1452 .await
1453 }
1454
1455 /// Removes the given follower connection as a follower of the given leader connection.
1456 pub async fn unfollow(
1457 &self,
1458 room_id: RoomId,
1459 project_id: ProjectId,
1460 leader_connection: ConnectionId,
1461 follower_connection: ConnectionId,
1462 ) -> Result<TransactionGuard<proto::Room>> {
1463 self.room_transaction(room_id, |tx| async move {
1464 follower::Entity::delete_many()
1465 .filter(
1466 Condition::all()
1467 .add(follower::Column::RoomId.eq(room_id))
1468 .add(follower::Column::ProjectId.eq(project_id))
1469 .add(
1470 follower::Column::LeaderConnectionServerId
1471 .eq(leader_connection.owner_id),
1472 )
1473 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1474 .add(
1475 follower::Column::FollowerConnectionServerId
1476 .eq(follower_connection.owner_id),
1477 )
1478 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1479 )
1480 .exec(&*tx)
1481 .await?;
1482
1483 let room = self.get_room(room_id, &tx).await?;
1484 Ok(room)
1485 })
1486 .await
1487 }
1488}