1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 windows_paths: bool,
37 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
38 self.room_transaction(room_id, |tx| async move {
39 let participant = room_participant::Entity::find()
40 .filter(
41 Condition::all()
42 .add(
43 room_participant::Column::AnsweringConnectionId
44 .eq(connection.id as i32),
45 )
46 .add(
47 room_participant::Column::AnsweringConnectionServerId
48 .eq(connection.owner_id as i32),
49 ),
50 )
51 .one(&*tx)
52 .await?
53 .context("could not find participant")?;
54 if participant.room_id != room_id {
55 return Err(anyhow!("shared project on unexpected room"))?;
56 }
57 if !participant
58 .role
59 .unwrap_or(ChannelRole::Member)
60 .can_edit_projects()
61 {
62 return Err(anyhow!("guests cannot share projects"))?;
63 }
64
65 let project = project::ActiveModel {
66 room_id: ActiveValue::set(Some(participant.room_id)),
67 host_user_id: ActiveValue::set(Some(participant.user_id)),
68 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
69 host_connection_server_id: ActiveValue::set(Some(ServerId(
70 connection.owner_id as i32,
71 ))),
72 id: ActiveValue::NotSet,
73 windows_paths: ActiveValue::set(windows_paths),
74 }
75 .insert(&*tx)
76 .await?;
77
78 if !worktrees.is_empty() {
79 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
80 worktree::ActiveModel {
81 id: ActiveValue::set(worktree.id as i64),
82 project_id: ActiveValue::set(project.id),
83 abs_path: ActiveValue::set(worktree.abs_path.clone()),
84 root_name: ActiveValue::set(worktree.root_name.clone()),
85 visible: ActiveValue::set(worktree.visible),
86 scan_id: ActiveValue::set(0),
87 completed_scan_id: ActiveValue::set(0),
88 }
89 }))
90 .exec(&*tx)
91 .await?;
92 }
93
94 let replica_id = if is_ssh_project { 1 } else { 0 };
95
96 project_collaborator::ActiveModel {
97 project_id: ActiveValue::set(project.id),
98 connection_id: ActiveValue::set(connection.id as i32),
99 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
100 user_id: ActiveValue::set(participant.user_id),
101 replica_id: ActiveValue::set(ReplicaId(replica_id)),
102 is_host: ActiveValue::set(true),
103 id: ActiveValue::NotSet,
104 committer_name: ActiveValue::Set(None),
105 committer_email: ActiveValue::Set(None),
106 }
107 .insert(&*tx)
108 .await?;
109
110 let room = self.get_room(room_id, &tx).await?;
111 Ok((project.id, room))
112 })
113 .await
114 }
115
116 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
117 self.transaction(|tx| async move {
118 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
119 Ok(())
120 })
121 .await
122 }
123
124 /// Unshares the given project.
125 pub async fn unshare_project(
126 &self,
127 project_id: ProjectId,
128 connection: ConnectionId,
129 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
130 self.project_transaction(project_id, |tx| async move {
131 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
132 let project = project::Entity::find_by_id(project_id)
133 .one(&*tx)
134 .await?
135 .context("project not found")?;
136 let room = if let Some(room_id) = project.room_id {
137 Some(self.get_room(room_id, &tx).await?)
138 } else {
139 None
140 };
141 if project.host_connection()? == connection {
142 return Ok((true, room, guest_connection_ids));
143 }
144 Err(anyhow!("cannot unshare a project hosted by another user"))?
145 })
146 .await
147 }
148
149 /// Updates the worktrees associated with the given project.
150 pub async fn update_project(
151 &self,
152 project_id: ProjectId,
153 connection: ConnectionId,
154 worktrees: &[proto::WorktreeMetadata],
155 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
156 self.project_transaction(project_id, |tx| async move {
157 let project = project::Entity::find_by_id(project_id)
158 .filter(
159 Condition::all()
160 .add(project::Column::HostConnectionId.eq(connection.id as i32))
161 .add(
162 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
163 ),
164 )
165 .one(&*tx)
166 .await?
167 .context("no such project")?;
168
169 self.update_project_worktrees(project.id, worktrees, &tx)
170 .await?;
171
172 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
173
174 let room = if let Some(room_id) = project.room_id {
175 Some(self.get_room(room_id, &tx).await?)
176 } else {
177 None
178 };
179
180 Ok((room, guest_connection_ids))
181 })
182 .await
183 }
184
185 pub(in crate::db) async fn update_project_worktrees(
186 &self,
187 project_id: ProjectId,
188 worktrees: &[proto::WorktreeMetadata],
189 tx: &DatabaseTransaction,
190 ) -> Result<()> {
191 if !worktrees.is_empty() {
192 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
193 id: ActiveValue::set(worktree.id as i64),
194 project_id: ActiveValue::set(project_id),
195 abs_path: ActiveValue::set(worktree.abs_path.clone()),
196 root_name: ActiveValue::set(worktree.root_name.clone()),
197 visible: ActiveValue::set(worktree.visible),
198 scan_id: ActiveValue::set(0),
199 completed_scan_id: ActiveValue::set(0),
200 }))
201 .on_conflict(
202 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
203 .update_column(worktree::Column::RootName)
204 .to_owned(),
205 )
206 .exec(tx)
207 .await?;
208 }
209
210 worktree::Entity::delete_many()
211 .filter(worktree::Column::ProjectId.eq(project_id).and(
212 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
213 ))
214 .exec(tx)
215 .await?;
216
217 Ok(())
218 }
219
220 pub async fn update_worktree(
221 &self,
222 update: &proto::UpdateWorktree,
223 connection: ConnectionId,
224 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
225 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
226 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
227 {
228 return Err(anyhow!(
229 "invalid worktree update. removed entries: {}, updated entries: {}",
230 update.removed_entries.len(),
231 update.updated_entries.len()
232 ))?;
233 }
234
235 let project_id = ProjectId::from_proto(update.project_id);
236 let worktree_id = update.worktree_id as i64;
237 self.project_transaction(project_id, |tx| async move {
238 // Ensure the update comes from the host.
239 let _project = project::Entity::find_by_id(project_id)
240 .filter(
241 Condition::all()
242 .add(project::Column::HostConnectionId.eq(connection.id as i32))
243 .add(
244 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
245 ),
246 )
247 .one(&*tx)
248 .await?
249 .with_context(|| format!("no such project: {project_id}"))?;
250
251 // Update metadata.
252 worktree::Entity::update(worktree::ActiveModel {
253 id: ActiveValue::set(worktree_id),
254 project_id: ActiveValue::set(project_id),
255 root_name: ActiveValue::set(update.root_name.clone()),
256 scan_id: ActiveValue::set(update.scan_id as i64),
257 completed_scan_id: if update.is_last_update {
258 ActiveValue::set(update.scan_id as i64)
259 } else {
260 ActiveValue::default()
261 },
262 abs_path: ActiveValue::set(update.abs_path.clone()),
263 ..Default::default()
264 })
265 .exec(&*tx)
266 .await?;
267
268 if !update.updated_entries.is_empty() {
269 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
270 let mtime = entry.mtime.clone().unwrap_or_default();
271 worktree_entry::ActiveModel {
272 project_id: ActiveValue::set(project_id),
273 worktree_id: ActiveValue::set(worktree_id),
274 id: ActiveValue::set(entry.id as i64),
275 is_dir: ActiveValue::set(entry.is_dir),
276 path: ActiveValue::set(entry.path.clone()),
277 inode: ActiveValue::set(entry.inode as i64),
278 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
279 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
280 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
281 is_ignored: ActiveValue::set(entry.is_ignored),
282 git_status: ActiveValue::set(None),
283 is_external: ActiveValue::set(entry.is_external),
284 is_deleted: ActiveValue::set(false),
285 is_hidden: ActiveValue::set(entry.is_hidden),
286 scan_id: ActiveValue::set(update.scan_id as i64),
287 is_fifo: ActiveValue::set(entry.is_fifo),
288 }
289 }))
290 .on_conflict(
291 OnConflict::columns([
292 worktree_entry::Column::ProjectId,
293 worktree_entry::Column::WorktreeId,
294 worktree_entry::Column::Id,
295 ])
296 .update_columns([
297 worktree_entry::Column::IsDir,
298 worktree_entry::Column::Path,
299 worktree_entry::Column::Inode,
300 worktree_entry::Column::MtimeSeconds,
301 worktree_entry::Column::MtimeNanos,
302 worktree_entry::Column::CanonicalPath,
303 worktree_entry::Column::IsIgnored,
304 worktree_entry::Column::IsHidden,
305 worktree_entry::Column::ScanId,
306 ])
307 .to_owned(),
308 )
309 .exec(&*tx)
310 .await?;
311 }
312
313 if !update.removed_entries.is_empty() {
314 worktree_entry::Entity::update_many()
315 .filter(
316 worktree_entry::Column::ProjectId
317 .eq(project_id)
318 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
319 .and(
320 worktree_entry::Column::Id
321 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
322 ),
323 )
324 .set(worktree_entry::ActiveModel {
325 is_deleted: ActiveValue::Set(true),
326 scan_id: ActiveValue::Set(update.scan_id as i64),
327 ..Default::default()
328 })
329 .exec(&*tx)
330 .await?;
331 }
332
333 // Backward-compatibility for old Zed clients.
334 //
335 // Remove this block when Zed 1.80 stable has been out for a week.
336 {
337 if !update.updated_repositories.is_empty() {
338 project_repository::Entity::insert_many(
339 update.updated_repositories.iter().map(|repository| {
340 project_repository::ActiveModel {
341 project_id: ActiveValue::set(project_id),
342 legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
343 id: ActiveValue::set(repository.repository_id as i64),
344 scan_id: ActiveValue::set(update.scan_id as i64),
345 is_deleted: ActiveValue::set(false),
346 branch_summary: ActiveValue::Set(
347 repository
348 .branch_summary
349 .as_ref()
350 .map(|summary| serde_json::to_string(summary).unwrap()),
351 ),
352 current_merge_conflicts: ActiveValue::Set(Some(
353 serde_json::to_string(&repository.current_merge_conflicts)
354 .unwrap(),
355 )),
356 // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
357 abs_path: ActiveValue::set(String::new()),
358 entry_ids: ActiveValue::set("[]".into()),
359 head_commit_details: ActiveValue::set(None),
360 merge_message: ActiveValue::set(None),
361 }
362 }),
363 )
364 .on_conflict(
365 OnConflict::columns([
366 project_repository::Column::ProjectId,
367 project_repository::Column::Id,
368 ])
369 .update_columns([
370 project_repository::Column::ScanId,
371 project_repository::Column::BranchSummary,
372 project_repository::Column::CurrentMergeConflicts,
373 ])
374 .to_owned(),
375 )
376 .exec(&*tx)
377 .await?;
378
379 let has_any_statuses = update
380 .updated_repositories
381 .iter()
382 .any(|repository| !repository.updated_statuses.is_empty());
383
384 if has_any_statuses {
385 project_repository_statuses::Entity::insert_many(
386 update.updated_repositories.iter().flat_map(
387 |repository: &proto::RepositoryEntry| {
388 repository.updated_statuses.iter().map(|status_entry| {
389 let (repo_path, status_kind, first_status, second_status) =
390 proto_status_to_db(status_entry.clone());
391 project_repository_statuses::ActiveModel {
392 project_id: ActiveValue::set(project_id),
393 repository_id: ActiveValue::set(
394 repository.repository_id as i64,
395 ),
396 scan_id: ActiveValue::set(update.scan_id as i64),
397 is_deleted: ActiveValue::set(false),
398 repo_path: ActiveValue::set(repo_path),
399 status: ActiveValue::set(0),
400 status_kind: ActiveValue::set(status_kind),
401 first_status: ActiveValue::set(first_status),
402 second_status: ActiveValue::set(second_status),
403 }
404 })
405 },
406 ),
407 )
408 .on_conflict(
409 OnConflict::columns([
410 project_repository_statuses::Column::ProjectId,
411 project_repository_statuses::Column::RepositoryId,
412 project_repository_statuses::Column::RepoPath,
413 ])
414 .update_columns([
415 project_repository_statuses::Column::ScanId,
416 project_repository_statuses::Column::StatusKind,
417 project_repository_statuses::Column::FirstStatus,
418 project_repository_statuses::Column::SecondStatus,
419 ])
420 .to_owned(),
421 )
422 .exec(&*tx)
423 .await?;
424 }
425
426 for repo in &update.updated_repositories {
427 if !repo.removed_statuses.is_empty() {
428 project_repository_statuses::Entity::update_many()
429 .filter(
430 project_repository_statuses::Column::ProjectId
431 .eq(project_id)
432 .and(
433 project_repository_statuses::Column::RepositoryId
434 .eq(repo.repository_id),
435 )
436 .and(
437 project_repository_statuses::Column::RepoPath
438 .is_in(repo.removed_statuses.iter()),
439 ),
440 )
441 .set(project_repository_statuses::ActiveModel {
442 is_deleted: ActiveValue::Set(true),
443 scan_id: ActiveValue::Set(update.scan_id as i64),
444 ..Default::default()
445 })
446 .exec(&*tx)
447 .await?;
448 }
449 }
450 }
451
452 if !update.removed_repositories.is_empty() {
453 project_repository::Entity::update_many()
454 .filter(
455 project_repository::Column::ProjectId
456 .eq(project_id)
457 .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
458 .and(project_repository::Column::Id.is_in(
459 update.removed_repositories.iter().map(|id| *id as i64),
460 )),
461 )
462 .set(project_repository::ActiveModel {
463 is_deleted: ActiveValue::Set(true),
464 scan_id: ActiveValue::Set(update.scan_id as i64),
465 ..Default::default()
466 })
467 .exec(&*tx)
468 .await?;
469 }
470 }
471
472 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
473 Ok(connection_ids)
474 })
475 .await
476 }
477
478 pub async fn update_repository(
479 &self,
480 update: &proto::UpdateRepository,
481 _connection: ConnectionId,
482 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
483 let project_id = ProjectId::from_proto(update.project_id);
484 let repository_id = update.id as i64;
485 self.project_transaction(project_id, |tx| async move {
486 project_repository::Entity::insert(project_repository::ActiveModel {
487 project_id: ActiveValue::set(project_id),
488 id: ActiveValue::set(repository_id),
489 legacy_worktree_id: ActiveValue::set(None),
490 abs_path: ActiveValue::set(update.abs_path.clone()),
491 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
492 scan_id: ActiveValue::set(update.scan_id as i64),
493 is_deleted: ActiveValue::set(false),
494 branch_summary: ActiveValue::Set(
495 update
496 .branch_summary
497 .as_ref()
498 .map(|summary| serde_json::to_string(summary).unwrap()),
499 ),
500 head_commit_details: ActiveValue::Set(
501 update
502 .head_commit_details
503 .as_ref()
504 .map(|details| serde_json::to_string(details).unwrap()),
505 ),
506 current_merge_conflicts: ActiveValue::Set(Some(
507 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
508 )),
509 merge_message: ActiveValue::set(update.merge_message.clone()),
510 })
511 .on_conflict(
512 OnConflict::columns([
513 project_repository::Column::ProjectId,
514 project_repository::Column::Id,
515 ])
516 .update_columns([
517 project_repository::Column::ScanId,
518 project_repository::Column::BranchSummary,
519 project_repository::Column::EntryIds,
520 project_repository::Column::AbsPath,
521 project_repository::Column::CurrentMergeConflicts,
522 project_repository::Column::HeadCommitDetails,
523 project_repository::Column::MergeMessage,
524 ])
525 .to_owned(),
526 )
527 .exec(&*tx)
528 .await?;
529
530 let has_any_statuses = !update.updated_statuses.is_empty();
531
532 if has_any_statuses {
533 project_repository_statuses::Entity::insert_many(
534 update.updated_statuses.iter().map(|status_entry| {
535 let (repo_path, status_kind, first_status, second_status) =
536 proto_status_to_db(status_entry.clone());
537 project_repository_statuses::ActiveModel {
538 project_id: ActiveValue::set(project_id),
539 repository_id: ActiveValue::set(repository_id),
540 scan_id: ActiveValue::set(update.scan_id as i64),
541 is_deleted: ActiveValue::set(false),
542 repo_path: ActiveValue::set(repo_path),
543 status: ActiveValue::set(0),
544 status_kind: ActiveValue::set(status_kind),
545 first_status: ActiveValue::set(first_status),
546 second_status: ActiveValue::set(second_status),
547 }
548 }),
549 )
550 .on_conflict(
551 OnConflict::columns([
552 project_repository_statuses::Column::ProjectId,
553 project_repository_statuses::Column::RepositoryId,
554 project_repository_statuses::Column::RepoPath,
555 ])
556 .update_columns([
557 project_repository_statuses::Column::ScanId,
558 project_repository_statuses::Column::StatusKind,
559 project_repository_statuses::Column::FirstStatus,
560 project_repository_statuses::Column::SecondStatus,
561 ])
562 .to_owned(),
563 )
564 .exec(&*tx)
565 .await?;
566 }
567
568 let has_any_removed_statuses = !update.removed_statuses.is_empty();
569
570 if has_any_removed_statuses {
571 project_repository_statuses::Entity::update_many()
572 .filter(
573 project_repository_statuses::Column::ProjectId
574 .eq(project_id)
575 .and(
576 project_repository_statuses::Column::RepositoryId.eq(repository_id),
577 )
578 .and(
579 project_repository_statuses::Column::RepoPath
580 .is_in(update.removed_statuses.iter()),
581 ),
582 )
583 .set(project_repository_statuses::ActiveModel {
584 is_deleted: ActiveValue::Set(true),
585 scan_id: ActiveValue::Set(update.scan_id as i64),
586 ..Default::default()
587 })
588 .exec(&*tx)
589 .await?;
590 }
591
592 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
593 Ok(connection_ids)
594 })
595 .await
596 }
597
598 pub async fn remove_repository(
599 &self,
600 remove: &proto::RemoveRepository,
601 _connection: ConnectionId,
602 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
603 let project_id = ProjectId::from_proto(remove.project_id);
604 let repository_id = remove.id as i64;
605 self.project_transaction(project_id, |tx| async move {
606 project_repository::Entity::update_many()
607 .filter(
608 project_repository::Column::ProjectId
609 .eq(project_id)
610 .and(project_repository::Column::Id.eq(repository_id)),
611 )
612 .set(project_repository::ActiveModel {
613 is_deleted: ActiveValue::Set(true),
614 // scan_id: ActiveValue::Set(update.scan_id as i64),
615 ..Default::default()
616 })
617 .exec(&*tx)
618 .await?;
619
620 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
621 Ok(connection_ids)
622 })
623 .await
624 }
625
626 /// Updates the diagnostic summary for the given connection.
627 pub async fn update_diagnostic_summary(
628 &self,
629 update: &proto::UpdateDiagnosticSummary,
630 connection: ConnectionId,
631 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
632 let project_id = ProjectId::from_proto(update.project_id);
633 let worktree_id = update.worktree_id as i64;
634 self.project_transaction(project_id, |tx| async move {
635 let summary = update.summary.as_ref().context("invalid summary")?;
636
637 // Ensure the update comes from the host.
638 let project = project::Entity::find_by_id(project_id)
639 .one(&*tx)
640 .await?
641 .context("no such project")?;
642 if project.host_connection()? != connection {
643 return Err(anyhow!("can't update a project hosted by someone else"))?;
644 }
645
646 // Update summary.
647 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
648 project_id: ActiveValue::set(project_id),
649 worktree_id: ActiveValue::set(worktree_id),
650 path: ActiveValue::set(summary.path.clone()),
651 language_server_id: ActiveValue::set(summary.language_server_id as i64),
652 error_count: ActiveValue::set(summary.error_count as i32),
653 warning_count: ActiveValue::set(summary.warning_count as i32),
654 })
655 .on_conflict(
656 OnConflict::columns([
657 worktree_diagnostic_summary::Column::ProjectId,
658 worktree_diagnostic_summary::Column::WorktreeId,
659 worktree_diagnostic_summary::Column::Path,
660 ])
661 .update_columns([
662 worktree_diagnostic_summary::Column::LanguageServerId,
663 worktree_diagnostic_summary::Column::ErrorCount,
664 worktree_diagnostic_summary::Column::WarningCount,
665 ])
666 .to_owned(),
667 )
668 .exec(&*tx)
669 .await?;
670
671 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
672 Ok(connection_ids)
673 })
674 .await
675 }
676
677 /// Starts the language server for the given connection.
678 pub async fn start_language_server(
679 &self,
680 update: &proto::StartLanguageServer,
681 connection: ConnectionId,
682 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
683 let project_id = ProjectId::from_proto(update.project_id);
684 self.project_transaction(project_id, |tx| async move {
685 let server = update.server.as_ref().context("invalid language server")?;
686
687 // Ensure the update comes from the host.
688 let project = project::Entity::find_by_id(project_id)
689 .one(&*tx)
690 .await?
691 .context("no such project")?;
692 if project.host_connection()? != connection {
693 return Err(anyhow!("can't update a project hosted by someone else"))?;
694 }
695
696 // Add the newly-started language server.
697 language_server::Entity::insert(language_server::ActiveModel {
698 project_id: ActiveValue::set(project_id),
699 id: ActiveValue::set(server.id as i64),
700 name: ActiveValue::set(server.name.clone()),
701 worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
702 capabilities: ActiveValue::set(update.capabilities.clone()),
703 })
704 .on_conflict(
705 OnConflict::columns([
706 language_server::Column::ProjectId,
707 language_server::Column::Id,
708 ])
709 .update_columns([
710 language_server::Column::Name,
711 language_server::Column::Capabilities,
712 language_server::Column::WorktreeId,
713 ])
714 .to_owned(),
715 )
716 .exec(&*tx)
717 .await?;
718
719 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
720 Ok(connection_ids)
721 })
722 .await
723 }
724
725 /// Updates the worktree settings for the given connection.
726 pub async fn update_worktree_settings(
727 &self,
728 update: &proto::UpdateWorktreeSettings,
729 connection: ConnectionId,
730 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
731 let project_id = ProjectId::from_proto(update.project_id);
732 let kind = match update.kind {
733 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
734 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
735 None => proto::LocalSettingsKind::Settings,
736 };
737 let kind = LocalSettingsKind::from_proto(kind);
738 self.project_transaction(project_id, |tx| async move {
739 // Ensure the update comes from the host.
740 let project = project::Entity::find_by_id(project_id)
741 .one(&*tx)
742 .await?
743 .context("no such project")?;
744 if project.host_connection()? != connection {
745 return Err(anyhow!("can't update a project hosted by someone else"))?;
746 }
747
748 if let Some(content) = &update.content {
749 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
750 project_id: ActiveValue::Set(project_id),
751 worktree_id: ActiveValue::Set(update.worktree_id as i64),
752 path: ActiveValue::Set(update.path.clone()),
753 content: ActiveValue::Set(content.clone()),
754 kind: ActiveValue::Set(kind),
755 })
756 .on_conflict(
757 OnConflict::columns([
758 worktree_settings_file::Column::ProjectId,
759 worktree_settings_file::Column::WorktreeId,
760 worktree_settings_file::Column::Path,
761 ])
762 .update_column(worktree_settings_file::Column::Content)
763 .to_owned(),
764 )
765 .exec(&*tx)
766 .await?;
767 } else {
768 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
769 project_id: ActiveValue::Set(project_id),
770 worktree_id: ActiveValue::Set(update.worktree_id as i64),
771 path: ActiveValue::Set(update.path.clone()),
772 ..Default::default()
773 })
774 .exec(&*tx)
775 .await?;
776 }
777
778 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
779 Ok(connection_ids)
780 })
781 .await
782 }
783
784 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
785 self.transaction(|tx| async move {
786 Ok(project::Entity::find_by_id(id)
787 .one(&*tx)
788 .await?
789 .context("no such project")?)
790 })
791 .await
792 }
793
794 /// Adds the given connection to the specified project
795 /// in the current room.
796 pub async fn join_project(
797 &self,
798 project_id: ProjectId,
799 connection: ConnectionId,
800 user_id: UserId,
801 committer_name: Option<String>,
802 committer_email: Option<String>,
803 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
804 self.project_transaction(project_id, move |tx| {
805 let committer_name = committer_name.clone();
806 let committer_email = committer_email.clone();
807 async move {
808 let (project, role) = self
809 .access_project(project_id, connection, Capability::ReadOnly, &tx)
810 .await?;
811 self.join_project_internal(
812 project,
813 user_id,
814 committer_name,
815 committer_email,
816 connection,
817 role,
818 &tx,
819 )
820 .await
821 }
822 })
823 .await
824 }
825
826 async fn join_project_internal(
827 &self,
828 project: project::Model,
829 user_id: UserId,
830 committer_name: Option<String>,
831 committer_email: Option<String>,
832 connection: ConnectionId,
833 role: ChannelRole,
834 tx: &DatabaseTransaction,
835 ) -> Result<(Project, ReplicaId)> {
836 let mut collaborators = project
837 .find_related(project_collaborator::Entity)
838 .all(tx)
839 .await?;
840 let replica_ids = collaborators
841 .iter()
842 .map(|c| c.replica_id)
843 .collect::<HashSet<_>>();
844 let mut replica_id = ReplicaId(1);
845 while replica_ids.contains(&replica_id) {
846 replica_id.0 += 1;
847 }
848 let new_collaborator = project_collaborator::ActiveModel {
849 project_id: ActiveValue::set(project.id),
850 connection_id: ActiveValue::set(connection.id as i32),
851 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
852 user_id: ActiveValue::set(user_id),
853 replica_id: ActiveValue::set(replica_id),
854 is_host: ActiveValue::set(false),
855 id: ActiveValue::NotSet,
856 committer_name: ActiveValue::set(committer_name),
857 committer_email: ActiveValue::set(committer_email),
858 }
859 .insert(tx)
860 .await?;
861 collaborators.push(new_collaborator);
862
863 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
864 let mut worktrees = db_worktrees
865 .into_iter()
866 .map(|db_worktree| {
867 (
868 db_worktree.id as u64,
869 Worktree {
870 id: db_worktree.id as u64,
871 abs_path: db_worktree.abs_path,
872 root_name: db_worktree.root_name,
873 visible: db_worktree.visible,
874 entries: Default::default(),
875 diagnostic_summaries: Default::default(),
876 settings_files: Default::default(),
877 scan_id: db_worktree.scan_id as u64,
878 completed_scan_id: db_worktree.completed_scan_id as u64,
879 legacy_repository_entries: Default::default(),
880 },
881 )
882 })
883 .collect::<BTreeMap<_, _>>();
884
885 // Populate worktree entries.
886 {
887 let mut db_entries = worktree_entry::Entity::find()
888 .filter(
889 Condition::all()
890 .add(worktree_entry::Column::ProjectId.eq(project.id))
891 .add(worktree_entry::Column::IsDeleted.eq(false)),
892 )
893 .stream(tx)
894 .await?;
895 while let Some(db_entry) = db_entries.next().await {
896 let db_entry = db_entry?;
897 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
898 worktree.entries.push(proto::Entry {
899 id: db_entry.id as u64,
900 is_dir: db_entry.is_dir,
901 path: db_entry.path,
902 inode: db_entry.inode as u64,
903 mtime: Some(proto::Timestamp {
904 seconds: db_entry.mtime_seconds as u64,
905 nanos: db_entry.mtime_nanos as u32,
906 }),
907 canonical_path: db_entry.canonical_path,
908 is_ignored: db_entry.is_ignored,
909 is_external: db_entry.is_external,
910 is_hidden: db_entry.is_hidden,
911 // This is only used in the summarization backlog, so if it's None,
912 // that just means we won't be able to detect when to resummarize
913 // based on total number of backlogged bytes - instead, we'd go
914 // on number of files only. That shouldn't be a huge deal in practice.
915 size: None,
916 is_fifo: db_entry.is_fifo,
917 });
918 }
919 }
920 }
921
922 // Populate repository entries.
923 let mut repositories = Vec::new();
924 {
925 let db_repository_entries = project_repository::Entity::find()
926 .filter(
927 Condition::all()
928 .add(project_repository::Column::ProjectId.eq(project.id))
929 .add(project_repository::Column::IsDeleted.eq(false)),
930 )
931 .all(tx)
932 .await?;
933 for db_repository_entry in db_repository_entries {
934 let mut repository_statuses = project_repository_statuses::Entity::find()
935 .filter(
936 Condition::all()
937 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
938 .add(
939 project_repository_statuses::Column::RepositoryId
940 .eq(db_repository_entry.id),
941 )
942 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
943 )
944 .stream(tx)
945 .await?;
946 let mut updated_statuses = Vec::new();
947 while let Some(status_entry) = repository_statuses.next().await {
948 let status_entry = status_entry?;
949 updated_statuses.push(db_status_to_proto(status_entry)?);
950 }
951
952 let current_merge_conflicts = db_repository_entry
953 .current_merge_conflicts
954 .as_ref()
955 .map(|conflicts| serde_json::from_str(conflicts))
956 .transpose()?
957 .unwrap_or_default();
958
959 let branch_summary = db_repository_entry
960 .branch_summary
961 .as_ref()
962 .map(|branch_summary| serde_json::from_str(branch_summary))
963 .transpose()?
964 .unwrap_or_default();
965
966 let head_commit_details = db_repository_entry
967 .head_commit_details
968 .as_ref()
969 .map(|head_commit_details| serde_json::from_str(head_commit_details))
970 .transpose()?
971 .unwrap_or_default();
972
973 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
974 .context("failed to deserialize repository's entry ids")?;
975
976 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
977 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
978 worktree.legacy_repository_entries.insert(
979 db_repository_entry.id as u64,
980 proto::RepositoryEntry {
981 repository_id: db_repository_entry.id as u64,
982 updated_statuses,
983 removed_statuses: Vec::new(),
984 current_merge_conflicts,
985 branch_summary,
986 },
987 );
988 }
989 } else {
990 repositories.push(proto::UpdateRepository {
991 project_id: db_repository_entry.project_id.0 as u64,
992 id: db_repository_entry.id as u64,
993 abs_path: db_repository_entry.abs_path,
994 entry_ids,
995 updated_statuses,
996 removed_statuses: Vec::new(),
997 current_merge_conflicts,
998 branch_summary,
999 head_commit_details,
1000 scan_id: db_repository_entry.scan_id as u64,
1001 is_last_update: true,
1002 merge_message: db_repository_entry.merge_message,
1003 stash_entries: Vec::new(),
1004 });
1005 }
1006 }
1007 }
1008
1009 // Populate worktree diagnostic summaries.
1010 {
1011 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1012 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
1013 .stream(tx)
1014 .await?;
1015 while let Some(db_summary) = db_summaries.next().await {
1016 let db_summary = db_summary?;
1017 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1018 worktree
1019 .diagnostic_summaries
1020 .push(proto::DiagnosticSummary {
1021 path: db_summary.path,
1022 language_server_id: db_summary.language_server_id as u64,
1023 error_count: db_summary.error_count as u32,
1024 warning_count: db_summary.warning_count as u32,
1025 });
1026 }
1027 }
1028 }
1029
1030 // Populate worktree settings files
1031 {
1032 let mut db_settings_files = worktree_settings_file::Entity::find()
1033 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1034 .stream(tx)
1035 .await?;
1036 while let Some(db_settings_file) = db_settings_files.next().await {
1037 let db_settings_file = db_settings_file?;
1038 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1039 worktree.settings_files.push(WorktreeSettingsFile {
1040 path: db_settings_file.path,
1041 content: db_settings_file.content,
1042 kind: db_settings_file.kind,
1043 });
1044 }
1045 }
1046 }
1047
1048 // Populate language servers.
1049 let language_servers = project
1050 .find_related(language_server::Entity)
1051 .all(tx)
1052 .await?;
1053
1054 let path_style = if project.windows_paths {
1055 PathStyle::Windows
1056 } else {
1057 PathStyle::Posix
1058 };
1059
1060 let project = Project {
1061 id: project.id,
1062 role,
1063 collaborators: collaborators
1064 .into_iter()
1065 .map(|collaborator| ProjectCollaborator {
1066 connection_id: collaborator.connection(),
1067 user_id: collaborator.user_id,
1068 replica_id: collaborator.replica_id,
1069 is_host: collaborator.is_host,
1070 committer_name: collaborator.committer_name,
1071 committer_email: collaborator.committer_email,
1072 })
1073 .collect(),
1074 worktrees,
1075 repositories,
1076 language_servers: language_servers
1077 .into_iter()
1078 .map(|language_server| LanguageServer {
1079 server: proto::LanguageServer {
1080 id: language_server.id as u64,
1081 name: language_server.name,
1082 worktree_id: language_server.worktree_id.map(|id| id as u64),
1083 },
1084 capabilities: language_server.capabilities,
1085 })
1086 .collect(),
1087 path_style,
1088 };
1089 Ok((project, replica_id as ReplicaId))
1090 }
1091
1092 /// Removes the given connection from the specified project.
1093 pub async fn leave_project(
1094 &self,
1095 project_id: ProjectId,
1096 connection: ConnectionId,
1097 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1098 self.project_transaction(project_id, |tx| async move {
1099 let result = project_collaborator::Entity::delete_many()
1100 .filter(
1101 Condition::all()
1102 .add(project_collaborator::Column::ProjectId.eq(project_id))
1103 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1104 .add(
1105 project_collaborator::Column::ConnectionServerId
1106 .eq(connection.owner_id as i32),
1107 ),
1108 )
1109 .exec(&*tx)
1110 .await?;
1111 if result.rows_affected == 0 {
1112 Err(anyhow!("not a collaborator on this project"))?;
1113 }
1114
1115 let project = project::Entity::find_by_id(project_id)
1116 .one(&*tx)
1117 .await?
1118 .context("no such project")?;
1119 let collaborators = project
1120 .find_related(project_collaborator::Entity)
1121 .all(&*tx)
1122 .await?;
1123 let connection_ids: Vec<ConnectionId> = collaborators
1124 .into_iter()
1125 .map(|collaborator| collaborator.connection())
1126 .collect();
1127
1128 follower::Entity::delete_many()
1129 .filter(
1130 Condition::any()
1131 .add(
1132 Condition::all()
1133 .add(follower::Column::ProjectId.eq(Some(project_id)))
1134 .add(
1135 follower::Column::LeaderConnectionServerId
1136 .eq(connection.owner_id),
1137 )
1138 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1139 )
1140 .add(
1141 Condition::all()
1142 .add(follower::Column::ProjectId.eq(Some(project_id)))
1143 .add(
1144 follower::Column::FollowerConnectionServerId
1145 .eq(connection.owner_id),
1146 )
1147 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1148 ),
1149 )
1150 .exec(&*tx)
1151 .await?;
1152
1153 let room = if let Some(room_id) = project.room_id {
1154 Some(self.get_room(room_id, &tx).await?)
1155 } else {
1156 None
1157 };
1158
1159 let left_project = LeftProject {
1160 id: project_id,
1161 should_unshare: connection == project.host_connection()?,
1162 connection_ids,
1163 };
1164 Ok((room, left_project))
1165 })
1166 .await
1167 }
1168
1169 pub async fn check_user_is_project_host(
1170 &self,
1171 project_id: ProjectId,
1172 connection_id: ConnectionId,
1173 ) -> Result<()> {
1174 self.project_transaction(project_id, |tx| async move {
1175 project::Entity::find()
1176 .filter(
1177 Condition::all()
1178 .add(project::Column::Id.eq(project_id))
1179 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1180 .add(
1181 project::Column::HostConnectionServerId
1182 .eq(Some(connection_id.owner_id as i32)),
1183 ),
1184 )
1185 .one(&*tx)
1186 .await?
1187 .context("failed to read project host")?;
1188
1189 Ok(())
1190 })
1191 .await
1192 .map(|guard| guard.into_inner())
1193 }
1194
1195 /// Returns the current project if the given user is authorized to access it with the specified capability.
1196 pub async fn access_project(
1197 &self,
1198 project_id: ProjectId,
1199 connection_id: ConnectionId,
1200 capability: Capability,
1201 tx: &DatabaseTransaction,
1202 ) -> Result<(project::Model, ChannelRole)> {
1203 let project = project::Entity::find_by_id(project_id)
1204 .one(tx)
1205 .await?
1206 .context("no such project")?;
1207
1208 let role_from_room = if let Some(room_id) = project.room_id {
1209 room_participant::Entity::find()
1210 .filter(room_participant::Column::RoomId.eq(room_id))
1211 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1212 .one(tx)
1213 .await?
1214 .and_then(|participant| participant.role)
1215 } else {
1216 None
1217 };
1218
1219 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1220
1221 match capability {
1222 Capability::ReadWrite => {
1223 if !role.can_edit_projects() {
1224 return Err(anyhow!("not authorized to edit projects"))?;
1225 }
1226 }
1227 Capability::ReadOnly => {
1228 if !role.can_read_projects() {
1229 return Err(anyhow!("not authorized to read projects"))?;
1230 }
1231 }
1232 }
1233
1234 Ok((project, role))
1235 }
1236
1237 /// Returns the host connection for a read-only request to join a shared project.
1238 pub async fn host_for_read_only_project_request(
1239 &self,
1240 project_id: ProjectId,
1241 connection_id: ConnectionId,
1242 ) -> Result<ConnectionId> {
1243 self.project_transaction(project_id, |tx| async move {
1244 let (project, _) = self
1245 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1246 .await?;
1247 project.host_connection()
1248 })
1249 .await
1250 .map(|guard| guard.into_inner())
1251 }
1252
1253 /// Returns the host connection for a request to join a shared project.
1254 pub async fn host_for_mutating_project_request(
1255 &self,
1256 project_id: ProjectId,
1257 connection_id: ConnectionId,
1258 ) -> Result<ConnectionId> {
1259 self.project_transaction(project_id, |tx| async move {
1260 let (project, _) = self
1261 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1262 .await?;
1263 project.host_connection()
1264 })
1265 .await
1266 .map(|guard| guard.into_inner())
1267 }
1268
1269 pub async fn connections_for_buffer_update(
1270 &self,
1271 project_id: ProjectId,
1272 connection_id: ConnectionId,
1273 capability: Capability,
1274 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1275 self.project_transaction(project_id, |tx| async move {
1276 // Authorize
1277 let (project, _) = self
1278 .access_project(project_id, connection_id, capability, &tx)
1279 .await?;
1280
1281 let host_connection_id = project.host_connection()?;
1282
1283 let collaborators = project_collaborator::Entity::find()
1284 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1285 .all(&*tx)
1286 .await?;
1287
1288 let guest_connection_ids = collaborators
1289 .into_iter()
1290 .filter_map(|collaborator| {
1291 if collaborator.is_host {
1292 None
1293 } else {
1294 Some(collaborator.connection())
1295 }
1296 })
1297 .collect();
1298
1299 Ok((host_connection_id, guest_connection_ids))
1300 })
1301 .await
1302 }
1303
1304 /// Returns the connection IDs in the given project.
1305 ///
1306 /// The provided `connection_id` must also be a collaborator in the project,
1307 /// otherwise an error will be returned.
1308 pub async fn project_connection_ids(
1309 &self,
1310 project_id: ProjectId,
1311 connection_id: ConnectionId,
1312 exclude_dev_server: bool,
1313 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1314 self.project_transaction(project_id, |tx| async move {
1315 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1316 .await
1317 })
1318 .await
1319 }
1320
1321 async fn internal_project_connection_ids(
1322 &self,
1323 project_id: ProjectId,
1324 connection_id: ConnectionId,
1325 exclude_dev_server: bool,
1326 tx: &DatabaseTransaction,
1327 ) -> Result<HashSet<ConnectionId>> {
1328 let project = project::Entity::find_by_id(project_id)
1329 .one(tx)
1330 .await?
1331 .context("no such project")?;
1332
1333 let mut collaborators = project_collaborator::Entity::find()
1334 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1335 .stream(tx)
1336 .await?;
1337
1338 let mut connection_ids = HashSet::default();
1339 if let Some(host_connection) = project.host_connection().log_err()
1340 && !exclude_dev_server
1341 {
1342 connection_ids.insert(host_connection);
1343 }
1344
1345 while let Some(collaborator) = collaborators.next().await {
1346 let collaborator = collaborator?;
1347 connection_ids.insert(collaborator.connection());
1348 }
1349
1350 if connection_ids.contains(&connection_id)
1351 || Some(connection_id) == project.host_connection().ok()
1352 {
1353 Ok(connection_ids)
1354 } else {
1355 Err(anyhow!(
1356 "can only send project updates to a project you're in"
1357 ))?
1358 }
1359 }
1360
1361 async fn project_guest_connection_ids(
1362 &self,
1363 project_id: ProjectId,
1364 tx: &DatabaseTransaction,
1365 ) -> Result<Vec<ConnectionId>> {
1366 let mut collaborators = project_collaborator::Entity::find()
1367 .filter(
1368 project_collaborator::Column::ProjectId
1369 .eq(project_id)
1370 .and(project_collaborator::Column::IsHost.eq(false)),
1371 )
1372 .stream(tx)
1373 .await?;
1374
1375 let mut guest_connection_ids = Vec::new();
1376 while let Some(collaborator) = collaborators.next().await {
1377 let collaborator = collaborator?;
1378 guest_connection_ids.push(collaborator.connection());
1379 }
1380 Ok(guest_connection_ids)
1381 }
1382
1383 /// Returns the [`RoomId`] for the given project.
1384 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1385 self.transaction(|tx| async move {
1386 Ok(project::Entity::find_by_id(project_id)
1387 .one(&*tx)
1388 .await?
1389 .and_then(|project| project.room_id))
1390 })
1391 .await
1392 }
1393
1394 pub async fn check_room_participants(
1395 &self,
1396 room_id: RoomId,
1397 leader_id: ConnectionId,
1398 follower_id: ConnectionId,
1399 ) -> Result<()> {
1400 self.transaction(|tx| async move {
1401 use room_participant::Column;
1402
1403 let count = room_participant::Entity::find()
1404 .filter(
1405 Condition::all().add(Column::RoomId.eq(room_id)).add(
1406 Condition::any()
1407 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1408 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1409 ))
1410 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1411 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1412 )),
1413 ),
1414 )
1415 .count(&*tx)
1416 .await?;
1417
1418 if count < 2 {
1419 Err(anyhow!("not room participants"))?;
1420 }
1421
1422 Ok(())
1423 })
1424 .await
1425 }
1426
1427 /// Adds the given follower connection as a follower of the given leader connection.
1428 pub async fn follow(
1429 &self,
1430 room_id: RoomId,
1431 project_id: ProjectId,
1432 leader_connection: ConnectionId,
1433 follower_connection: ConnectionId,
1434 ) -> Result<TransactionGuard<proto::Room>> {
1435 self.room_transaction(room_id, |tx| async move {
1436 follower::ActiveModel {
1437 room_id: ActiveValue::set(room_id),
1438 project_id: ActiveValue::set(project_id),
1439 leader_connection_server_id: ActiveValue::set(ServerId(
1440 leader_connection.owner_id as i32,
1441 )),
1442 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1443 follower_connection_server_id: ActiveValue::set(ServerId(
1444 follower_connection.owner_id as i32,
1445 )),
1446 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1447 ..Default::default()
1448 }
1449 .insert(&*tx)
1450 .await?;
1451
1452 let room = self.get_room(room_id, &tx).await?;
1453 Ok(room)
1454 })
1455 .await
1456 }
1457
1458 /// Removes the given follower connection as a follower of the given leader connection.
1459 pub async fn unfollow(
1460 &self,
1461 room_id: RoomId,
1462 project_id: ProjectId,
1463 leader_connection: ConnectionId,
1464 follower_connection: ConnectionId,
1465 ) -> Result<TransactionGuard<proto::Room>> {
1466 self.room_transaction(room_id, |tx| async move {
1467 follower::Entity::delete_many()
1468 .filter(
1469 Condition::all()
1470 .add(follower::Column::RoomId.eq(room_id))
1471 .add(follower::Column::ProjectId.eq(project_id))
1472 .add(
1473 follower::Column::LeaderConnectionServerId
1474 .eq(leader_connection.owner_id),
1475 )
1476 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1477 .add(
1478 follower::Column::FollowerConnectionServerId
1479 .eq(follower_connection.owner_id),
1480 )
1481 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1482 )
1483 .exec(&*tx)
1484 .await?;
1485
1486 let room = self.get_room(room_id, &tx).await?;
1487 Ok(room)
1488 })
1489 .await
1490 }
1491}