1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 windows_paths: bool,
37 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
38 self.room_transaction(room_id, |tx| async move {
39 let participant = room_participant::Entity::find()
40 .filter(
41 Condition::all()
42 .add(
43 room_participant::Column::AnsweringConnectionId
44 .eq(connection.id as i32),
45 )
46 .add(
47 room_participant::Column::AnsweringConnectionServerId
48 .eq(connection.owner_id as i32),
49 ),
50 )
51 .one(&*tx)
52 .await?
53 .context("could not find participant")?;
54 if participant.room_id != room_id {
55 return Err(anyhow!("shared project on unexpected room"))?;
56 }
57 if !participant
58 .role
59 .unwrap_or(ChannelRole::Member)
60 .can_edit_projects()
61 {
62 return Err(anyhow!("guests cannot share projects"))?;
63 }
64
65 let project = project::ActiveModel {
66 room_id: ActiveValue::set(Some(participant.room_id)),
67 host_user_id: ActiveValue::set(Some(participant.user_id)),
68 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
69 host_connection_server_id: ActiveValue::set(Some(ServerId(
70 connection.owner_id as i32,
71 ))),
72 id: ActiveValue::NotSet,
73 windows_paths: ActiveValue::set(windows_paths),
74 }
75 .insert(&*tx)
76 .await?;
77
78 if !worktrees.is_empty() {
79 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
80 worktree::ActiveModel {
81 id: ActiveValue::set(worktree.id as i64),
82 project_id: ActiveValue::set(project.id),
83 abs_path: ActiveValue::set(worktree.abs_path.clone()),
84 root_name: ActiveValue::set(worktree.root_name.clone()),
85 visible: ActiveValue::set(worktree.visible),
86 scan_id: ActiveValue::set(0),
87 completed_scan_id: ActiveValue::set(0),
88 }
89 }))
90 .exec(&*tx)
91 .await?;
92 }
93
94 let replica_id = if is_ssh_project {
95 clock::ReplicaId::REMOTE_SERVER
96 } else {
97 clock::ReplicaId::LOCAL
98 };
99
100 project_collaborator::ActiveModel {
101 project_id: ActiveValue::set(project.id),
102 connection_id: ActiveValue::set(connection.id as i32),
103 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
104 user_id: ActiveValue::set(participant.user_id),
105 replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
106 is_host: ActiveValue::set(true),
107 id: ActiveValue::NotSet,
108 committer_name: ActiveValue::Set(None),
109 committer_email: ActiveValue::Set(None),
110 }
111 .insert(&*tx)
112 .await?;
113
114 let room = self.get_room(room_id, &tx).await?;
115 Ok((project.id, room))
116 })
117 .await
118 }
119
120 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
121 self.transaction(|tx| async move {
122 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
123 Ok(())
124 })
125 .await
126 }
127
128 /// Unshares the given project.
129 pub async fn unshare_project(
130 &self,
131 project_id: ProjectId,
132 connection: ConnectionId,
133 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
134 self.project_transaction(project_id, |tx| async move {
135 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
136 let project = project::Entity::find_by_id(project_id)
137 .one(&*tx)
138 .await?
139 .context("project not found")?;
140 let room = if let Some(room_id) = project.room_id {
141 Some(self.get_room(room_id, &tx).await?)
142 } else {
143 None
144 };
145 if project.host_connection()? == connection {
146 return Ok((true, room, guest_connection_ids));
147 }
148 Err(anyhow!("cannot unshare a project hosted by another user"))?
149 })
150 .await
151 }
152
153 /// Updates the worktrees associated with the given project.
154 pub async fn update_project(
155 &self,
156 project_id: ProjectId,
157 connection: ConnectionId,
158 worktrees: &[proto::WorktreeMetadata],
159 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
160 self.project_transaction(project_id, |tx| async move {
161 let project = project::Entity::find_by_id(project_id)
162 .filter(
163 Condition::all()
164 .add(project::Column::HostConnectionId.eq(connection.id as i32))
165 .add(
166 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
167 ),
168 )
169 .one(&*tx)
170 .await?
171 .context("no such project")?;
172
173 self.update_project_worktrees(project.id, worktrees, &tx)
174 .await?;
175
176 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
177
178 let room = if let Some(room_id) = project.room_id {
179 Some(self.get_room(room_id, &tx).await?)
180 } else {
181 None
182 };
183
184 Ok((room, guest_connection_ids))
185 })
186 .await
187 }
188
189 pub(in crate::db) async fn update_project_worktrees(
190 &self,
191 project_id: ProjectId,
192 worktrees: &[proto::WorktreeMetadata],
193 tx: &DatabaseTransaction,
194 ) -> Result<()> {
195 if !worktrees.is_empty() {
196 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
197 id: ActiveValue::set(worktree.id as i64),
198 project_id: ActiveValue::set(project_id),
199 abs_path: ActiveValue::set(worktree.abs_path.clone()),
200 root_name: ActiveValue::set(worktree.root_name.clone()),
201 visible: ActiveValue::set(worktree.visible),
202 scan_id: ActiveValue::set(0),
203 completed_scan_id: ActiveValue::set(0),
204 }))
205 .on_conflict(
206 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
207 .update_column(worktree::Column::RootName)
208 .to_owned(),
209 )
210 .exec(tx)
211 .await?;
212 }
213
214 worktree::Entity::delete_many()
215 .filter(worktree::Column::ProjectId.eq(project_id).and(
216 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
217 ))
218 .exec(tx)
219 .await?;
220
221 Ok(())
222 }
223
224 pub async fn update_worktree(
225 &self,
226 update: &proto::UpdateWorktree,
227 connection: ConnectionId,
228 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
229 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
230 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
231 {
232 return Err(anyhow!(
233 "invalid worktree update. removed entries: {}, updated entries: {}",
234 update.removed_entries.len(),
235 update.updated_entries.len()
236 ))?;
237 }
238
239 let project_id = ProjectId::from_proto(update.project_id);
240 let worktree_id = update.worktree_id as i64;
241 self.project_transaction(project_id, |tx| async move {
242 // Ensure the update comes from the host.
243 let _project = project::Entity::find_by_id(project_id)
244 .filter(
245 Condition::all()
246 .add(project::Column::HostConnectionId.eq(connection.id as i32))
247 .add(
248 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
249 ),
250 )
251 .one(&*tx)
252 .await?
253 .with_context(|| format!("no such project: {project_id}"))?;
254
255 // Update metadata.
256 worktree::Entity::update(worktree::ActiveModel {
257 id: ActiveValue::set(worktree_id),
258 project_id: ActiveValue::set(project_id),
259 root_name: ActiveValue::set(update.root_name.clone()),
260 scan_id: ActiveValue::set(update.scan_id as i64),
261 completed_scan_id: if update.is_last_update {
262 ActiveValue::set(update.scan_id as i64)
263 } else {
264 ActiveValue::default()
265 },
266 abs_path: ActiveValue::set(update.abs_path.clone()),
267 ..Default::default()
268 })
269 .exec(&*tx)
270 .await?;
271
272 if !update.updated_entries.is_empty() {
273 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
274 let mtime = entry.mtime.clone().unwrap_or_default();
275 worktree_entry::ActiveModel {
276 project_id: ActiveValue::set(project_id),
277 worktree_id: ActiveValue::set(worktree_id),
278 id: ActiveValue::set(entry.id as i64),
279 is_dir: ActiveValue::set(entry.is_dir),
280 path: ActiveValue::set(entry.path.clone()),
281 inode: ActiveValue::set(entry.inode as i64),
282 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
283 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
284 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
285 is_ignored: ActiveValue::set(entry.is_ignored),
286 git_status: ActiveValue::set(None),
287 is_external: ActiveValue::set(entry.is_external),
288 is_deleted: ActiveValue::set(false),
289 is_hidden: ActiveValue::set(entry.is_hidden),
290 scan_id: ActiveValue::set(update.scan_id as i64),
291 is_fifo: ActiveValue::set(entry.is_fifo),
292 }
293 }))
294 .on_conflict(
295 OnConflict::columns([
296 worktree_entry::Column::ProjectId,
297 worktree_entry::Column::WorktreeId,
298 worktree_entry::Column::Id,
299 ])
300 .update_columns([
301 worktree_entry::Column::IsDir,
302 worktree_entry::Column::Path,
303 worktree_entry::Column::Inode,
304 worktree_entry::Column::MtimeSeconds,
305 worktree_entry::Column::MtimeNanos,
306 worktree_entry::Column::CanonicalPath,
307 worktree_entry::Column::IsIgnored,
308 worktree_entry::Column::IsHidden,
309 worktree_entry::Column::ScanId,
310 ])
311 .to_owned(),
312 )
313 .exec(&*tx)
314 .await?;
315 }
316
317 if !update.removed_entries.is_empty() {
318 worktree_entry::Entity::update_many()
319 .filter(
320 worktree_entry::Column::ProjectId
321 .eq(project_id)
322 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
323 .and(
324 worktree_entry::Column::Id
325 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
326 ),
327 )
328 .set(worktree_entry::ActiveModel {
329 is_deleted: ActiveValue::Set(true),
330 scan_id: ActiveValue::Set(update.scan_id as i64),
331 ..Default::default()
332 })
333 .exec(&*tx)
334 .await?;
335 }
336
337 // Backward-compatibility for old Zed clients.
338 //
339 // Remove this block when Zed 1.80 stable has been out for a week.
340 {
341 if !update.updated_repositories.is_empty() {
342 project_repository::Entity::insert_many(
343 update.updated_repositories.iter().map(|repository| {
344 project_repository::ActiveModel {
345 project_id: ActiveValue::set(project_id),
346 legacy_worktree_id: ActiveValue::set(Some(worktree_id)),
347 id: ActiveValue::set(repository.repository_id as i64),
348 scan_id: ActiveValue::set(update.scan_id as i64),
349 is_deleted: ActiveValue::set(false),
350 branch_summary: ActiveValue::Set(
351 repository
352 .branch_summary
353 .as_ref()
354 .map(|summary| serde_json::to_string(summary).unwrap()),
355 ),
356 current_merge_conflicts: ActiveValue::Set(Some(
357 serde_json::to_string(&repository.current_merge_conflicts)
358 .unwrap(),
359 )),
360 // Old clients do not use abs path, entry ids, head_commit_details, or merge_message.
361 abs_path: ActiveValue::set(String::new()),
362 entry_ids: ActiveValue::set("[]".into()),
363 head_commit_details: ActiveValue::set(None),
364 merge_message: ActiveValue::set(None),
365 remote_upstream_url: ActiveValue::set(None),
366 remote_origin_url: ActiveValue::set(None),
367 }
368 }),
369 )
370 .on_conflict(
371 OnConflict::columns([
372 project_repository::Column::ProjectId,
373 project_repository::Column::Id,
374 ])
375 .update_columns([
376 project_repository::Column::ScanId,
377 project_repository::Column::BranchSummary,
378 project_repository::Column::CurrentMergeConflicts,
379 ])
380 .to_owned(),
381 )
382 .exec(&*tx)
383 .await?;
384
385 let has_any_statuses = update
386 .updated_repositories
387 .iter()
388 .any(|repository| !repository.updated_statuses.is_empty());
389
390 if has_any_statuses {
391 project_repository_statuses::Entity::insert_many(
392 update.updated_repositories.iter().flat_map(
393 |repository: &proto::RepositoryEntry| {
394 repository.updated_statuses.iter().map(|status_entry| {
395 let (repo_path, status_kind, first_status, second_status) =
396 proto_status_to_db(status_entry.clone());
397 project_repository_statuses::ActiveModel {
398 project_id: ActiveValue::set(project_id),
399 repository_id: ActiveValue::set(
400 repository.repository_id as i64,
401 ),
402 scan_id: ActiveValue::set(update.scan_id as i64),
403 is_deleted: ActiveValue::set(false),
404 repo_path: ActiveValue::set(repo_path),
405 status: ActiveValue::set(0),
406 status_kind: ActiveValue::set(status_kind),
407 first_status: ActiveValue::set(first_status),
408 second_status: ActiveValue::set(second_status),
409 }
410 })
411 },
412 ),
413 )
414 .on_conflict(
415 OnConflict::columns([
416 project_repository_statuses::Column::ProjectId,
417 project_repository_statuses::Column::RepositoryId,
418 project_repository_statuses::Column::RepoPath,
419 ])
420 .update_columns([
421 project_repository_statuses::Column::ScanId,
422 project_repository_statuses::Column::StatusKind,
423 project_repository_statuses::Column::FirstStatus,
424 project_repository_statuses::Column::SecondStatus,
425 ])
426 .to_owned(),
427 )
428 .exec(&*tx)
429 .await?;
430 }
431
432 for repo in &update.updated_repositories {
433 if !repo.removed_statuses.is_empty() {
434 project_repository_statuses::Entity::update_many()
435 .filter(
436 project_repository_statuses::Column::ProjectId
437 .eq(project_id)
438 .and(
439 project_repository_statuses::Column::RepositoryId
440 .eq(repo.repository_id),
441 )
442 .and(
443 project_repository_statuses::Column::RepoPath
444 .is_in(repo.removed_statuses.iter()),
445 ),
446 )
447 .set(project_repository_statuses::ActiveModel {
448 is_deleted: ActiveValue::Set(true),
449 scan_id: ActiveValue::Set(update.scan_id as i64),
450 ..Default::default()
451 })
452 .exec(&*tx)
453 .await?;
454 }
455 }
456 }
457
458 if !update.removed_repositories.is_empty() {
459 project_repository::Entity::update_many()
460 .filter(
461 project_repository::Column::ProjectId
462 .eq(project_id)
463 .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id))
464 .and(project_repository::Column::Id.is_in(
465 update.removed_repositories.iter().map(|id| *id as i64),
466 )),
467 )
468 .set(project_repository::ActiveModel {
469 is_deleted: ActiveValue::Set(true),
470 scan_id: ActiveValue::Set(update.scan_id as i64),
471 ..Default::default()
472 })
473 .exec(&*tx)
474 .await?;
475 }
476 }
477
478 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
479 Ok(connection_ids)
480 })
481 .await
482 }
483
484 pub async fn update_repository(
485 &self,
486 update: &proto::UpdateRepository,
487 _connection: ConnectionId,
488 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
489 let project_id = ProjectId::from_proto(update.project_id);
490 let repository_id = update.id as i64;
491 self.project_transaction(project_id, |tx| async move {
492 project_repository::Entity::insert(project_repository::ActiveModel {
493 project_id: ActiveValue::set(project_id),
494 id: ActiveValue::set(repository_id),
495 legacy_worktree_id: ActiveValue::set(None),
496 abs_path: ActiveValue::set(update.abs_path.clone()),
497 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
498 scan_id: ActiveValue::set(update.scan_id as i64),
499 is_deleted: ActiveValue::set(false),
500 branch_summary: ActiveValue::Set(
501 update
502 .branch_summary
503 .as_ref()
504 .map(|summary| serde_json::to_string(summary).unwrap()),
505 ),
506 head_commit_details: ActiveValue::Set(
507 update
508 .head_commit_details
509 .as_ref()
510 .map(|details| serde_json::to_string(details).unwrap()),
511 ),
512 current_merge_conflicts: ActiveValue::Set(Some(
513 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
514 )),
515 merge_message: ActiveValue::set(update.merge_message.clone()),
516 remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
517 remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
518 })
519 .on_conflict(
520 OnConflict::columns([
521 project_repository::Column::ProjectId,
522 project_repository::Column::Id,
523 ])
524 .update_columns([
525 project_repository::Column::ScanId,
526 project_repository::Column::BranchSummary,
527 project_repository::Column::EntryIds,
528 project_repository::Column::AbsPath,
529 project_repository::Column::CurrentMergeConflicts,
530 project_repository::Column::HeadCommitDetails,
531 project_repository::Column::MergeMessage,
532 ])
533 .to_owned(),
534 )
535 .exec(&*tx)
536 .await?;
537
538 let has_any_statuses = !update.updated_statuses.is_empty();
539
540 if has_any_statuses {
541 project_repository_statuses::Entity::insert_many(
542 update.updated_statuses.iter().map(|status_entry| {
543 let (repo_path, status_kind, first_status, second_status) =
544 proto_status_to_db(status_entry.clone());
545 project_repository_statuses::ActiveModel {
546 project_id: ActiveValue::set(project_id),
547 repository_id: ActiveValue::set(repository_id),
548 scan_id: ActiveValue::set(update.scan_id as i64),
549 is_deleted: ActiveValue::set(false),
550 repo_path: ActiveValue::set(repo_path),
551 status: ActiveValue::set(0),
552 status_kind: ActiveValue::set(status_kind),
553 first_status: ActiveValue::set(first_status),
554 second_status: ActiveValue::set(second_status),
555 }
556 }),
557 )
558 .on_conflict(
559 OnConflict::columns([
560 project_repository_statuses::Column::ProjectId,
561 project_repository_statuses::Column::RepositoryId,
562 project_repository_statuses::Column::RepoPath,
563 ])
564 .update_columns([
565 project_repository_statuses::Column::ScanId,
566 project_repository_statuses::Column::StatusKind,
567 project_repository_statuses::Column::FirstStatus,
568 project_repository_statuses::Column::SecondStatus,
569 ])
570 .to_owned(),
571 )
572 .exec(&*tx)
573 .await?;
574 }
575
576 let has_any_removed_statuses = !update.removed_statuses.is_empty();
577
578 if has_any_removed_statuses {
579 project_repository_statuses::Entity::update_many()
580 .filter(
581 project_repository_statuses::Column::ProjectId
582 .eq(project_id)
583 .and(
584 project_repository_statuses::Column::RepositoryId.eq(repository_id),
585 )
586 .and(
587 project_repository_statuses::Column::RepoPath
588 .is_in(update.removed_statuses.iter()),
589 ),
590 )
591 .set(project_repository_statuses::ActiveModel {
592 is_deleted: ActiveValue::Set(true),
593 scan_id: ActiveValue::Set(update.scan_id as i64),
594 ..Default::default()
595 })
596 .exec(&*tx)
597 .await?;
598 }
599
600 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
601 Ok(connection_ids)
602 })
603 .await
604 }
605
606 pub async fn remove_repository(
607 &self,
608 remove: &proto::RemoveRepository,
609 _connection: ConnectionId,
610 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
611 let project_id = ProjectId::from_proto(remove.project_id);
612 let repository_id = remove.id as i64;
613 self.project_transaction(project_id, |tx| async move {
614 project_repository::Entity::update_many()
615 .filter(
616 project_repository::Column::ProjectId
617 .eq(project_id)
618 .and(project_repository::Column::Id.eq(repository_id)),
619 )
620 .set(project_repository::ActiveModel {
621 is_deleted: ActiveValue::Set(true),
622 // scan_id: ActiveValue::Set(update.scan_id as i64),
623 ..Default::default()
624 })
625 .exec(&*tx)
626 .await?;
627
628 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
629 Ok(connection_ids)
630 })
631 .await
632 }
633
634 /// Updates the diagnostic summary for the given connection.
635 pub async fn update_diagnostic_summary(
636 &self,
637 update: &proto::UpdateDiagnosticSummary,
638 connection: ConnectionId,
639 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
640 let project_id = ProjectId::from_proto(update.project_id);
641 let worktree_id = update.worktree_id as i64;
642 self.project_transaction(project_id, |tx| async move {
643 let summary = update.summary.as_ref().context("invalid summary")?;
644
645 // Ensure the update comes from the host.
646 let project = project::Entity::find_by_id(project_id)
647 .one(&*tx)
648 .await?
649 .context("no such project")?;
650 if project.host_connection()? != connection {
651 return Err(anyhow!("can't update a project hosted by someone else"))?;
652 }
653
654 // Update summary.
655 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
656 project_id: ActiveValue::set(project_id),
657 worktree_id: ActiveValue::set(worktree_id),
658 path: ActiveValue::set(summary.path.clone()),
659 language_server_id: ActiveValue::set(summary.language_server_id as i64),
660 error_count: ActiveValue::set(summary.error_count as i32),
661 warning_count: ActiveValue::set(summary.warning_count as i32),
662 })
663 .on_conflict(
664 OnConflict::columns([
665 worktree_diagnostic_summary::Column::ProjectId,
666 worktree_diagnostic_summary::Column::WorktreeId,
667 worktree_diagnostic_summary::Column::Path,
668 ])
669 .update_columns([
670 worktree_diagnostic_summary::Column::LanguageServerId,
671 worktree_diagnostic_summary::Column::ErrorCount,
672 worktree_diagnostic_summary::Column::WarningCount,
673 ])
674 .to_owned(),
675 )
676 .exec(&*tx)
677 .await?;
678
679 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
680 Ok(connection_ids)
681 })
682 .await
683 }
684
685 /// Starts the language server for the given connection.
686 pub async fn start_language_server(
687 &self,
688 update: &proto::StartLanguageServer,
689 connection: ConnectionId,
690 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
691 let project_id = ProjectId::from_proto(update.project_id);
692 self.project_transaction(project_id, |tx| async move {
693 let server = update.server.as_ref().context("invalid language server")?;
694
695 // Ensure the update comes from the host.
696 let project = project::Entity::find_by_id(project_id)
697 .one(&*tx)
698 .await?
699 .context("no such project")?;
700 if project.host_connection()? != connection {
701 return Err(anyhow!("can't update a project hosted by someone else"))?;
702 }
703
704 // Add the newly-started language server.
705 language_server::Entity::insert(language_server::ActiveModel {
706 project_id: ActiveValue::set(project_id),
707 id: ActiveValue::set(server.id as i64),
708 name: ActiveValue::set(server.name.clone()),
709 worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
710 capabilities: ActiveValue::set(update.capabilities.clone()),
711 })
712 .on_conflict(
713 OnConflict::columns([
714 language_server::Column::ProjectId,
715 language_server::Column::Id,
716 ])
717 .update_columns([
718 language_server::Column::Name,
719 language_server::Column::Capabilities,
720 language_server::Column::WorktreeId,
721 ])
722 .to_owned(),
723 )
724 .exec(&*tx)
725 .await?;
726
727 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
728 Ok(connection_ids)
729 })
730 .await
731 }
732
733 /// Updates the worktree settings for the given connection.
734 pub async fn update_worktree_settings(
735 &self,
736 update: &proto::UpdateWorktreeSettings,
737 connection: ConnectionId,
738 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
739 let project_id = ProjectId::from_proto(update.project_id);
740 let kind = match update.kind {
741 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
742 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
743 None => proto::LocalSettingsKind::Settings,
744 };
745 let kind = LocalSettingsKind::from_proto(kind);
746 self.project_transaction(project_id, |tx| async move {
747 // Ensure the update comes from the host.
748 let project = project::Entity::find_by_id(project_id)
749 .one(&*tx)
750 .await?
751 .context("no such project")?;
752 if project.host_connection()? != connection {
753 return Err(anyhow!("can't update a project hosted by someone else"))?;
754 }
755
756 if let Some(content) = &update.content {
757 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
758 project_id: ActiveValue::Set(project_id),
759 worktree_id: ActiveValue::Set(update.worktree_id as i64),
760 path: ActiveValue::Set(update.path.clone()),
761 content: ActiveValue::Set(content.clone()),
762 kind: ActiveValue::Set(kind),
763 outside_worktree: ActiveValue::Set(update.outside_worktree.unwrap_or(false)),
764 })
765 .on_conflict(
766 OnConflict::columns([
767 worktree_settings_file::Column::ProjectId,
768 worktree_settings_file::Column::WorktreeId,
769 worktree_settings_file::Column::Path,
770 ])
771 .update_columns([
772 worktree_settings_file::Column::Content,
773 worktree_settings_file::Column::OutsideWorktree,
774 ])
775 .to_owned(),
776 )
777 .exec(&*tx)
778 .await?;
779 } else {
780 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
781 project_id: ActiveValue::Set(project_id),
782 worktree_id: ActiveValue::Set(update.worktree_id as i64),
783 path: ActiveValue::Set(update.path.clone()),
784 ..Default::default()
785 })
786 .exec(&*tx)
787 .await?;
788 }
789
790 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
791 Ok(connection_ids)
792 })
793 .await
794 }
795
796 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
797 self.transaction(|tx| async move {
798 Ok(project::Entity::find_by_id(id)
799 .one(&*tx)
800 .await?
801 .context("no such project")?)
802 })
803 .await
804 }
805
806 /// Adds the given connection to the specified project
807 /// in the current room.
808 pub async fn join_project(
809 &self,
810 project_id: ProjectId,
811 connection: ConnectionId,
812 user_id: UserId,
813 committer_name: Option<String>,
814 committer_email: Option<String>,
815 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
816 self.project_transaction(project_id, move |tx| {
817 let committer_name = committer_name.clone();
818 let committer_email = committer_email.clone();
819 async move {
820 let (project, role) = self
821 .access_project(project_id, connection, Capability::ReadOnly, &tx)
822 .await?;
823 self.join_project_internal(
824 project,
825 user_id,
826 committer_name,
827 committer_email,
828 connection,
829 role,
830 &tx,
831 )
832 .await
833 }
834 })
835 .await
836 }
837
838 async fn join_project_internal(
839 &self,
840 project: project::Model,
841 user_id: UserId,
842 committer_name: Option<String>,
843 committer_email: Option<String>,
844 connection: ConnectionId,
845 role: ChannelRole,
846 tx: &DatabaseTransaction,
847 ) -> Result<(Project, ReplicaId)> {
848 let mut collaborators = project
849 .find_related(project_collaborator::Entity)
850 .all(tx)
851 .await?;
852 let replica_ids = collaborators
853 .iter()
854 .map(|c| c.replica_id)
855 .collect::<HashSet<_>>();
856 let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
857 while replica_ids.contains(&replica_id) {
858 replica_id.0 += 1;
859 }
860 let new_collaborator = project_collaborator::ActiveModel {
861 project_id: ActiveValue::set(project.id),
862 connection_id: ActiveValue::set(connection.id as i32),
863 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
864 user_id: ActiveValue::set(user_id),
865 replica_id: ActiveValue::set(replica_id),
866 is_host: ActiveValue::set(false),
867 id: ActiveValue::NotSet,
868 committer_name: ActiveValue::set(committer_name),
869 committer_email: ActiveValue::set(committer_email),
870 }
871 .insert(tx)
872 .await?;
873 collaborators.push(new_collaborator);
874
875 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
876 let mut worktrees = db_worktrees
877 .into_iter()
878 .map(|db_worktree| {
879 (
880 db_worktree.id as u64,
881 Worktree {
882 id: db_worktree.id as u64,
883 abs_path: db_worktree.abs_path,
884 root_name: db_worktree.root_name,
885 visible: db_worktree.visible,
886 entries: Default::default(),
887 diagnostic_summaries: Default::default(),
888 settings_files: Default::default(),
889 scan_id: db_worktree.scan_id as u64,
890 completed_scan_id: db_worktree.completed_scan_id as u64,
891 legacy_repository_entries: Default::default(),
892 },
893 )
894 })
895 .collect::<BTreeMap<_, _>>();
896
897 // Populate worktree entries.
898 {
899 let mut db_entries = worktree_entry::Entity::find()
900 .filter(
901 Condition::all()
902 .add(worktree_entry::Column::ProjectId.eq(project.id))
903 .add(worktree_entry::Column::IsDeleted.eq(false)),
904 )
905 .stream(tx)
906 .await?;
907 while let Some(db_entry) = db_entries.next().await {
908 let db_entry = db_entry?;
909 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
910 worktree.entries.push(proto::Entry {
911 id: db_entry.id as u64,
912 is_dir: db_entry.is_dir,
913 path: db_entry.path,
914 inode: db_entry.inode as u64,
915 mtime: Some(proto::Timestamp {
916 seconds: db_entry.mtime_seconds as u64,
917 nanos: db_entry.mtime_nanos as u32,
918 }),
919 canonical_path: db_entry.canonical_path,
920 is_ignored: db_entry.is_ignored,
921 is_external: db_entry.is_external,
922 is_hidden: db_entry.is_hidden,
923 // This is only used in the summarization backlog, so if it's None,
924 // that just means we won't be able to detect when to resummarize
925 // based on total number of backlogged bytes - instead, we'd go
926 // on number of files only. That shouldn't be a huge deal in practice.
927 size: None,
928 is_fifo: db_entry.is_fifo,
929 });
930 }
931 }
932 }
933
934 // Populate repository entries.
935 let mut repositories = Vec::new();
936 {
937 let db_repository_entries = project_repository::Entity::find()
938 .filter(
939 Condition::all()
940 .add(project_repository::Column::ProjectId.eq(project.id))
941 .add(project_repository::Column::IsDeleted.eq(false)),
942 )
943 .all(tx)
944 .await?;
945 for db_repository_entry in db_repository_entries {
946 let mut repository_statuses = project_repository_statuses::Entity::find()
947 .filter(
948 Condition::all()
949 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
950 .add(
951 project_repository_statuses::Column::RepositoryId
952 .eq(db_repository_entry.id),
953 )
954 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
955 )
956 .stream(tx)
957 .await?;
958 let mut updated_statuses = Vec::new();
959 while let Some(status_entry) = repository_statuses.next().await {
960 let status_entry = status_entry?;
961 updated_statuses.push(db_status_to_proto(status_entry)?);
962 }
963
964 let current_merge_conflicts = db_repository_entry
965 .current_merge_conflicts
966 .as_ref()
967 .map(|conflicts| serde_json::from_str(conflicts))
968 .transpose()?
969 .unwrap_or_default();
970
971 let branch_summary = db_repository_entry
972 .branch_summary
973 .as_ref()
974 .map(|branch_summary| serde_json::from_str(branch_summary))
975 .transpose()?
976 .unwrap_or_default();
977
978 let head_commit_details = db_repository_entry
979 .head_commit_details
980 .as_ref()
981 .map(|head_commit_details| serde_json::from_str(head_commit_details))
982 .transpose()?
983 .unwrap_or_default();
984
985 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
986 .context("failed to deserialize repository's entry ids")?;
987
988 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
989 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
990 worktree.legacy_repository_entries.insert(
991 db_repository_entry.id as u64,
992 proto::RepositoryEntry {
993 repository_id: db_repository_entry.id as u64,
994 updated_statuses,
995 removed_statuses: Vec::new(),
996 current_merge_conflicts,
997 branch_summary,
998 },
999 );
1000 }
1001 } else {
1002 repositories.push(proto::UpdateRepository {
1003 project_id: db_repository_entry.project_id.0 as u64,
1004 id: db_repository_entry.id as u64,
1005 abs_path: db_repository_entry.abs_path.clone(),
1006 entry_ids,
1007 updated_statuses,
1008 removed_statuses: Vec::new(),
1009 current_merge_conflicts,
1010 branch_summary,
1011 head_commit_details,
1012 scan_id: db_repository_entry.scan_id as u64,
1013 is_last_update: true,
1014 merge_message: db_repository_entry.merge_message,
1015 stash_entries: Vec::new(),
1016 remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
1017 remote_origin_url: db_repository_entry.remote_origin_url.clone(),
1018 original_repo_abs_path: Some(db_repository_entry.abs_path),
1019 });
1020 }
1021 }
1022 }
1023
1024 // Populate worktree diagnostic summaries.
1025 {
1026 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
1027 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
1028 .stream(tx)
1029 .await?;
1030 while let Some(db_summary) = db_summaries.next().await {
1031 let db_summary = db_summary?;
1032 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
1033 worktree
1034 .diagnostic_summaries
1035 .push(proto::DiagnosticSummary {
1036 path: db_summary.path,
1037 language_server_id: db_summary.language_server_id as u64,
1038 error_count: db_summary.error_count as u32,
1039 warning_count: db_summary.warning_count as u32,
1040 });
1041 }
1042 }
1043 }
1044
1045 // Populate worktree settings files
1046 {
1047 let mut db_settings_files = worktree_settings_file::Entity::find()
1048 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
1049 .stream(tx)
1050 .await?;
1051 while let Some(db_settings_file) = db_settings_files.next().await {
1052 let db_settings_file = db_settings_file?;
1053 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
1054 worktree.settings_files.push(WorktreeSettingsFile {
1055 path: db_settings_file.path,
1056 content: db_settings_file.content,
1057 kind: db_settings_file.kind,
1058 outside_worktree: db_settings_file.outside_worktree,
1059 });
1060 }
1061 }
1062 }
1063
1064 // Populate language servers.
1065 let language_servers = project
1066 .find_related(language_server::Entity)
1067 .all(tx)
1068 .await?;
1069
1070 let path_style = if project.windows_paths {
1071 PathStyle::Windows
1072 } else {
1073 PathStyle::Posix
1074 };
1075
1076 let project = Project {
1077 id: project.id,
1078 role,
1079 collaborators: collaborators
1080 .into_iter()
1081 .map(|collaborator| ProjectCollaborator {
1082 connection_id: collaborator.connection(),
1083 user_id: collaborator.user_id,
1084 replica_id: collaborator.replica_id,
1085 is_host: collaborator.is_host,
1086 committer_name: collaborator.committer_name,
1087 committer_email: collaborator.committer_email,
1088 })
1089 .collect(),
1090 worktrees,
1091 repositories,
1092 language_servers: language_servers
1093 .into_iter()
1094 .map(|language_server| LanguageServer {
1095 server: proto::LanguageServer {
1096 id: language_server.id as u64,
1097 name: language_server.name,
1098 worktree_id: language_server.worktree_id.map(|id| id as u64),
1099 },
1100 capabilities: language_server.capabilities,
1101 })
1102 .collect(),
1103 path_style,
1104 };
1105 Ok((project, replica_id as ReplicaId))
1106 }
1107
1108 /// Removes the given connection from the specified project.
1109 pub async fn leave_project(
1110 &self,
1111 project_id: ProjectId,
1112 connection: ConnectionId,
1113 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
1114 self.project_transaction(project_id, |tx| async move {
1115 let result = project_collaborator::Entity::delete_many()
1116 .filter(
1117 Condition::all()
1118 .add(project_collaborator::Column::ProjectId.eq(project_id))
1119 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1120 .add(
1121 project_collaborator::Column::ConnectionServerId
1122 .eq(connection.owner_id as i32),
1123 ),
1124 )
1125 .exec(&*tx)
1126 .await?;
1127 if result.rows_affected == 0 {
1128 Err(anyhow!("not a collaborator on this project"))?;
1129 }
1130
1131 let project = project::Entity::find_by_id(project_id)
1132 .one(&*tx)
1133 .await?
1134 .context("no such project")?;
1135 let collaborators = project
1136 .find_related(project_collaborator::Entity)
1137 .all(&*tx)
1138 .await?;
1139 let connection_ids: Vec<ConnectionId> = collaborators
1140 .into_iter()
1141 .map(|collaborator| collaborator.connection())
1142 .collect();
1143
1144 follower::Entity::delete_many()
1145 .filter(
1146 Condition::any()
1147 .add(
1148 Condition::all()
1149 .add(follower::Column::ProjectId.eq(Some(project_id)))
1150 .add(
1151 follower::Column::LeaderConnectionServerId
1152 .eq(connection.owner_id),
1153 )
1154 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1155 )
1156 .add(
1157 Condition::all()
1158 .add(follower::Column::ProjectId.eq(Some(project_id)))
1159 .add(
1160 follower::Column::FollowerConnectionServerId
1161 .eq(connection.owner_id),
1162 )
1163 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1164 ),
1165 )
1166 .exec(&*tx)
1167 .await?;
1168
1169 let room = if let Some(room_id) = project.room_id {
1170 Some(self.get_room(room_id, &tx).await?)
1171 } else {
1172 None
1173 };
1174
1175 let left_project = LeftProject {
1176 id: project_id,
1177 should_unshare: connection == project.host_connection()?,
1178 connection_ids,
1179 };
1180 Ok((room, left_project))
1181 })
1182 .await
1183 }
1184
1185 pub async fn check_user_is_project_host(
1186 &self,
1187 project_id: ProjectId,
1188 connection_id: ConnectionId,
1189 ) -> Result<()> {
1190 self.project_transaction(project_id, |tx| async move {
1191 project::Entity::find()
1192 .filter(
1193 Condition::all()
1194 .add(project::Column::Id.eq(project_id))
1195 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1196 .add(
1197 project::Column::HostConnectionServerId
1198 .eq(Some(connection_id.owner_id as i32)),
1199 ),
1200 )
1201 .one(&*tx)
1202 .await?
1203 .context("failed to read project host")?;
1204
1205 Ok(())
1206 })
1207 .await
1208 .map(|guard| guard.into_inner())
1209 }
1210
1211 /// Returns the current project if the given user is authorized to access it with the specified capability.
1212 pub async fn access_project(
1213 &self,
1214 project_id: ProjectId,
1215 connection_id: ConnectionId,
1216 capability: Capability,
1217 tx: &DatabaseTransaction,
1218 ) -> Result<(project::Model, ChannelRole)> {
1219 let project = project::Entity::find_by_id(project_id)
1220 .one(tx)
1221 .await?
1222 .context("no such project")?;
1223
1224 let role_from_room = if let Some(room_id) = project.room_id {
1225 room_participant::Entity::find()
1226 .filter(room_participant::Column::RoomId.eq(room_id))
1227 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1228 .one(tx)
1229 .await?
1230 .and_then(|participant| participant.role)
1231 } else {
1232 None
1233 };
1234
1235 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1236
1237 match capability {
1238 Capability::ReadWrite => {
1239 if !role.can_edit_projects() {
1240 return Err(anyhow!("not authorized to edit projects"))?;
1241 }
1242 }
1243 Capability::ReadOnly => {
1244 if !role.can_read_projects() {
1245 return Err(anyhow!("not authorized to read projects"))?;
1246 }
1247 }
1248 }
1249
1250 Ok((project, role))
1251 }
1252
1253 /// Returns the host connection for a read-only request to join a shared project.
1254 pub async fn host_for_read_only_project_request(
1255 &self,
1256 project_id: ProjectId,
1257 connection_id: ConnectionId,
1258 ) -> Result<ConnectionId> {
1259 self.project_transaction(project_id, |tx| async move {
1260 let (project, _) = self
1261 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1262 .await?;
1263 project.host_connection()
1264 })
1265 .await
1266 .map(|guard| guard.into_inner())
1267 }
1268
1269 /// Returns the host connection for a request to join a shared project.
1270 pub async fn host_for_mutating_project_request(
1271 &self,
1272 project_id: ProjectId,
1273 connection_id: ConnectionId,
1274 ) -> Result<ConnectionId> {
1275 self.project_transaction(project_id, |tx| async move {
1276 let (project, _) = self
1277 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1278 .await?;
1279 project.host_connection()
1280 })
1281 .await
1282 .map(|guard| guard.into_inner())
1283 }
1284
1285 pub async fn connections_for_buffer_update(
1286 &self,
1287 project_id: ProjectId,
1288 connection_id: ConnectionId,
1289 capability: Capability,
1290 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1291 self.project_transaction(project_id, |tx| async move {
1292 // Authorize
1293 let (project, _) = self
1294 .access_project(project_id, connection_id, capability, &tx)
1295 .await?;
1296
1297 let host_connection_id = project.host_connection()?;
1298
1299 let collaborators = project_collaborator::Entity::find()
1300 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1301 .all(&*tx)
1302 .await?;
1303
1304 let guest_connection_ids = collaborators
1305 .into_iter()
1306 .filter_map(|collaborator| {
1307 if collaborator.is_host {
1308 None
1309 } else {
1310 Some(collaborator.connection())
1311 }
1312 })
1313 .collect();
1314
1315 Ok((host_connection_id, guest_connection_ids))
1316 })
1317 .await
1318 }
1319
1320 /// Returns the connection IDs in the given project.
1321 ///
1322 /// The provided `connection_id` must also be a collaborator in the project,
1323 /// otherwise an error will be returned.
1324 pub async fn project_connection_ids(
1325 &self,
1326 project_id: ProjectId,
1327 connection_id: ConnectionId,
1328 exclude_dev_server: bool,
1329 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1330 self.project_transaction(project_id, |tx| async move {
1331 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1332 .await
1333 })
1334 .await
1335 }
1336
1337 async fn internal_project_connection_ids(
1338 &self,
1339 project_id: ProjectId,
1340 connection_id: ConnectionId,
1341 exclude_dev_server: bool,
1342 tx: &DatabaseTransaction,
1343 ) -> Result<HashSet<ConnectionId>> {
1344 let project = project::Entity::find_by_id(project_id)
1345 .one(tx)
1346 .await?
1347 .context("no such project")?;
1348
1349 let mut collaborators = project_collaborator::Entity::find()
1350 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1351 .stream(tx)
1352 .await?;
1353
1354 let mut connection_ids = HashSet::default();
1355 if let Some(host_connection) = project.host_connection().log_err()
1356 && !exclude_dev_server
1357 {
1358 connection_ids.insert(host_connection);
1359 }
1360
1361 while let Some(collaborator) = collaborators.next().await {
1362 let collaborator = collaborator?;
1363 connection_ids.insert(collaborator.connection());
1364 }
1365
1366 if connection_ids.contains(&connection_id)
1367 || Some(connection_id) == project.host_connection().ok()
1368 {
1369 Ok(connection_ids)
1370 } else {
1371 Err(anyhow!(
1372 "can only send project updates to a project you're in"
1373 ))?
1374 }
1375 }
1376
1377 async fn project_guest_connection_ids(
1378 &self,
1379 project_id: ProjectId,
1380 tx: &DatabaseTransaction,
1381 ) -> Result<Vec<ConnectionId>> {
1382 let mut collaborators = project_collaborator::Entity::find()
1383 .filter(
1384 project_collaborator::Column::ProjectId
1385 .eq(project_id)
1386 .and(project_collaborator::Column::IsHost.eq(false)),
1387 )
1388 .stream(tx)
1389 .await?;
1390
1391 let mut guest_connection_ids = Vec::new();
1392 while let Some(collaborator) = collaborators.next().await {
1393 let collaborator = collaborator?;
1394 guest_connection_ids.push(collaborator.connection());
1395 }
1396 Ok(guest_connection_ids)
1397 }
1398
1399 /// Returns the [`RoomId`] for the given project.
1400 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1401 self.transaction(|tx| async move {
1402 Ok(project::Entity::find_by_id(project_id)
1403 .one(&*tx)
1404 .await?
1405 .and_then(|project| project.room_id))
1406 })
1407 .await
1408 }
1409
1410 pub async fn check_room_participants(
1411 &self,
1412 room_id: RoomId,
1413 leader_id: ConnectionId,
1414 follower_id: ConnectionId,
1415 ) -> Result<()> {
1416 self.transaction(|tx| async move {
1417 use room_participant::Column;
1418
1419 let count = room_participant::Entity::find()
1420 .filter(
1421 Condition::all().add(Column::RoomId.eq(room_id)).add(
1422 Condition::any()
1423 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1424 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1425 ))
1426 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1427 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1428 )),
1429 ),
1430 )
1431 .count(&*tx)
1432 .await?;
1433
1434 if count < 2 {
1435 Err(anyhow!("not room participants"))?;
1436 }
1437
1438 Ok(())
1439 })
1440 .await
1441 }
1442
1443 /// Adds the given follower connection as a follower of the given leader connection.
1444 pub async fn follow(
1445 &self,
1446 room_id: RoomId,
1447 project_id: ProjectId,
1448 leader_connection: ConnectionId,
1449 follower_connection: ConnectionId,
1450 ) -> Result<TransactionGuard<proto::Room>> {
1451 self.room_transaction(room_id, |tx| async move {
1452 follower::ActiveModel {
1453 room_id: ActiveValue::set(room_id),
1454 project_id: ActiveValue::set(project_id),
1455 leader_connection_server_id: ActiveValue::set(ServerId(
1456 leader_connection.owner_id as i32,
1457 )),
1458 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1459 follower_connection_server_id: ActiveValue::set(ServerId(
1460 follower_connection.owner_id as i32,
1461 )),
1462 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1463 ..Default::default()
1464 }
1465 .insert(&*tx)
1466 .await?;
1467
1468 let room = self.get_room(room_id, &tx).await?;
1469 Ok(room)
1470 })
1471 .await
1472 }
1473
1474 /// Removes the given follower connection as a follower of the given leader connection.
1475 pub async fn unfollow(
1476 &self,
1477 room_id: RoomId,
1478 project_id: ProjectId,
1479 leader_connection: ConnectionId,
1480 follower_connection: ConnectionId,
1481 ) -> Result<TransactionGuard<proto::Room>> {
1482 self.room_transaction(room_id, |tx| async move {
1483 follower::Entity::delete_many()
1484 .filter(
1485 Condition::all()
1486 .add(follower::Column::RoomId.eq(room_id))
1487 .add(follower::Column::ProjectId.eq(project_id))
1488 .add(
1489 follower::Column::LeaderConnectionServerId
1490 .eq(leader_connection.owner_id),
1491 )
1492 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1493 .add(
1494 follower::Column::FollowerConnectionServerId
1495 .eq(follower_connection.owner_id),
1496 )
1497 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1498 )
1499 .exec(&*tx)
1500 .await?;
1501
1502 let room = self.get_room(room_id, &tx).await?;
1503 Ok(room)
1504 })
1505 .await
1506 }
1507}