1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .ok_or_else(|| anyhow!("could not find participant"))?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 let project = project::ActiveModel {
65 room_id: ActiveValue::set(Some(participant.room_id)),
66 host_user_id: ActiveValue::set(Some(participant.user_id)),
67 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
68 host_connection_server_id: ActiveValue::set(Some(ServerId(
69 connection.owner_id as i32,
70 ))),
71 id: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if !worktrees.is_empty() {
77 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
78 worktree::ActiveModel {
79 id: ActiveValue::set(worktree.id as i64),
80 project_id: ActiveValue::set(project.id),
81 abs_path: ActiveValue::set(worktree.abs_path.clone()),
82 root_name: ActiveValue::set(worktree.root_name.clone()),
83 visible: ActiveValue::set(worktree.visible),
84 scan_id: ActiveValue::set(0),
85 completed_scan_id: ActiveValue::set(0),
86 }
87 }))
88 .exec(&*tx)
89 .await?;
90 }
91
92 let replica_id = if is_ssh_project { 1 } else { 0 };
93
94 project_collaborator::ActiveModel {
95 project_id: ActiveValue::set(project.id),
96 connection_id: ActiveValue::set(connection.id as i32),
97 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
98 user_id: ActiveValue::set(participant.user_id),
99 replica_id: ActiveValue::set(ReplicaId(replica_id)),
100 is_host: ActiveValue::set(true),
101 ..Default::default()
102 }
103 .insert(&*tx)
104 .await?;
105
106 let room = self.get_room(room_id, &tx).await?;
107 Ok((project.id, room))
108 })
109 .await
110 }
111
112 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
113 self.weak_transaction(|tx| async move {
114 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
115 Ok(())
116 })
117 .await
118 }
119
120 /// Unshares the given project.
121 pub async fn unshare_project(
122 &self,
123 project_id: ProjectId,
124 connection: ConnectionId,
125 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
126 self.project_transaction(project_id, |tx| async move {
127 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
128 let project = project::Entity::find_by_id(project_id)
129 .one(&*tx)
130 .await?
131 .ok_or_else(|| anyhow!("project not found"))?;
132 let room = if let Some(room_id) = project.room_id {
133 Some(self.get_room(room_id, &tx).await?)
134 } else {
135 None
136 };
137 if project.host_connection()? == connection {
138 return Ok((true, room, guest_connection_ids));
139 }
140 Err(anyhow!("cannot unshare a project hosted by another user"))?
141 })
142 .await
143 }
144
145 /// Updates the worktrees associated with the given project.
146 pub async fn update_project(
147 &self,
148 project_id: ProjectId,
149 connection: ConnectionId,
150 worktrees: &[proto::WorktreeMetadata],
151 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
152 self.project_transaction(project_id, |tx| async move {
153 let project = project::Entity::find_by_id(project_id)
154 .filter(
155 Condition::all()
156 .add(project::Column::HostConnectionId.eq(connection.id as i32))
157 .add(
158 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
159 ),
160 )
161 .one(&*tx)
162 .await?
163 .ok_or_else(|| anyhow!("no such project"))?;
164
165 self.update_project_worktrees(project.id, worktrees, &tx)
166 .await?;
167
168 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
169
170 let room = if let Some(room_id) = project.room_id {
171 Some(self.get_room(room_id, &tx).await?)
172 } else {
173 None
174 };
175
176 Ok((room, guest_connection_ids))
177 })
178 .await
179 }
180
181 pub(in crate::db) async fn update_project_worktrees(
182 &self,
183 project_id: ProjectId,
184 worktrees: &[proto::WorktreeMetadata],
185 tx: &DatabaseTransaction,
186 ) -> Result<()> {
187 if !worktrees.is_empty() {
188 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
189 id: ActiveValue::set(worktree.id as i64),
190 project_id: ActiveValue::set(project_id),
191 abs_path: ActiveValue::set(worktree.abs_path.clone()),
192 root_name: ActiveValue::set(worktree.root_name.clone()),
193 visible: ActiveValue::set(worktree.visible),
194 scan_id: ActiveValue::set(0),
195 completed_scan_id: ActiveValue::set(0),
196 }))
197 .on_conflict(
198 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
199 .update_column(worktree::Column::RootName)
200 .to_owned(),
201 )
202 .exec(tx)
203 .await?;
204 }
205
206 worktree::Entity::delete_many()
207 .filter(worktree::Column::ProjectId.eq(project_id).and(
208 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
209 ))
210 .exec(tx)
211 .await?;
212
213 Ok(())
214 }
215
216 pub async fn update_worktree(
217 &self,
218 update: &proto::UpdateWorktree,
219 connection: ConnectionId,
220 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
221 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
222 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
223 {
224 return Err(anyhow!(
225 "invalid worktree update. removed entries: {}, updated entries: {}",
226 update.removed_entries.len(),
227 update.updated_entries.len()
228 ))?;
229 }
230
231 let project_id = ProjectId::from_proto(update.project_id);
232 let worktree_id = update.worktree_id as i64;
233 self.project_transaction(project_id, |tx| async move {
234 // Ensure the update comes from the host.
235 let _project = project::Entity::find_by_id(project_id)
236 .filter(
237 Condition::all()
238 .add(project::Column::HostConnectionId.eq(connection.id as i32))
239 .add(
240 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
241 ),
242 )
243 .one(&*tx)
244 .await?
245 .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
246
247 // Update metadata.
248 worktree::Entity::update(worktree::ActiveModel {
249 id: ActiveValue::set(worktree_id),
250 project_id: ActiveValue::set(project_id),
251 root_name: ActiveValue::set(update.root_name.clone()),
252 scan_id: ActiveValue::set(update.scan_id as i64),
253 completed_scan_id: if update.is_last_update {
254 ActiveValue::set(update.scan_id as i64)
255 } else {
256 ActiveValue::default()
257 },
258 abs_path: ActiveValue::set(update.abs_path.clone()),
259 ..Default::default()
260 })
261 .exec(&*tx)
262 .await?;
263
264 if !update.updated_entries.is_empty() {
265 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
266 let mtime = entry.mtime.clone().unwrap_or_default();
267 worktree_entry::ActiveModel {
268 project_id: ActiveValue::set(project_id),
269 worktree_id: ActiveValue::set(worktree_id),
270 id: ActiveValue::set(entry.id as i64),
271 is_dir: ActiveValue::set(entry.is_dir),
272 path: ActiveValue::set(entry.path.clone()),
273 inode: ActiveValue::set(entry.inode as i64),
274 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
275 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
276 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
277 is_ignored: ActiveValue::set(entry.is_ignored),
278 git_status: ActiveValue::set(None),
279 is_external: ActiveValue::set(entry.is_external),
280 is_deleted: ActiveValue::set(false),
281 scan_id: ActiveValue::set(update.scan_id as i64),
282 is_fifo: ActiveValue::set(entry.is_fifo),
283 }
284 }))
285 .on_conflict(
286 OnConflict::columns([
287 worktree_entry::Column::ProjectId,
288 worktree_entry::Column::WorktreeId,
289 worktree_entry::Column::Id,
290 ])
291 .update_columns([
292 worktree_entry::Column::IsDir,
293 worktree_entry::Column::Path,
294 worktree_entry::Column::Inode,
295 worktree_entry::Column::MtimeSeconds,
296 worktree_entry::Column::MtimeNanos,
297 worktree_entry::Column::CanonicalPath,
298 worktree_entry::Column::IsIgnored,
299 worktree_entry::Column::ScanId,
300 ])
301 .to_owned(),
302 )
303 .exec(&*tx)
304 .await?;
305 }
306
307 if !update.removed_entries.is_empty() {
308 worktree_entry::Entity::update_many()
309 .filter(
310 worktree_entry::Column::ProjectId
311 .eq(project_id)
312 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
313 .and(
314 worktree_entry::Column::Id
315 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
316 ),
317 )
318 .set(worktree_entry::ActiveModel {
319 is_deleted: ActiveValue::Set(true),
320 scan_id: ActiveValue::Set(update.scan_id as i64),
321 ..Default::default()
322 })
323 .exec(&*tx)
324 .await?;
325 }
326
327 if !update.updated_repositories.is_empty() {
328 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
329 |repository| {
330 worktree_repository::ActiveModel {
331 project_id: ActiveValue::set(project_id),
332 worktree_id: ActiveValue::set(worktree_id),
333 work_directory_id: ActiveValue::set(
334 repository.work_directory_id as i64,
335 ),
336 scan_id: ActiveValue::set(update.scan_id as i64),
337 branch: ActiveValue::set(repository.branch.clone()),
338 is_deleted: ActiveValue::set(false),
339 branch_summary: ActiveValue::Set(
340 repository
341 .branch_summary
342 .as_ref()
343 .map(|summary| serde_json::to_string(summary).unwrap()),
344 ),
345 current_merge_conflicts: ActiveValue::Set(Some(
346 serde_json::to_string(&repository.current_merge_conflicts).unwrap(),
347 )),
348 }
349 },
350 ))
351 .on_conflict(
352 OnConflict::columns([
353 worktree_repository::Column::ProjectId,
354 worktree_repository::Column::WorktreeId,
355 worktree_repository::Column::WorkDirectoryId,
356 ])
357 .update_columns([
358 worktree_repository::Column::ScanId,
359 worktree_repository::Column::Branch,
360 worktree_repository::Column::BranchSummary,
361 worktree_repository::Column::CurrentMergeConflicts,
362 ])
363 .to_owned(),
364 )
365 .exec(&*tx)
366 .await?;
367
368 let has_any_statuses = update
369 .updated_repositories
370 .iter()
371 .any(|repository| !repository.updated_statuses.is_empty());
372
373 if has_any_statuses {
374 worktree_repository_statuses::Entity::insert_many(
375 update.updated_repositories.iter().flat_map(
376 |repository: &proto::RepositoryEntry| {
377 repository.updated_statuses.iter().map(|status_entry| {
378 let (repo_path, status_kind, first_status, second_status) =
379 proto_status_to_db(status_entry.clone());
380 worktree_repository_statuses::ActiveModel {
381 project_id: ActiveValue::set(project_id),
382 worktree_id: ActiveValue::set(worktree_id),
383 work_directory_id: ActiveValue::set(
384 repository.work_directory_id as i64,
385 ),
386 scan_id: ActiveValue::set(update.scan_id as i64),
387 is_deleted: ActiveValue::set(false),
388 repo_path: ActiveValue::set(repo_path),
389 status: ActiveValue::set(0),
390 status_kind: ActiveValue::set(status_kind),
391 first_status: ActiveValue::set(first_status),
392 second_status: ActiveValue::set(second_status),
393 }
394 })
395 },
396 ),
397 )
398 .on_conflict(
399 OnConflict::columns([
400 worktree_repository_statuses::Column::ProjectId,
401 worktree_repository_statuses::Column::WorktreeId,
402 worktree_repository_statuses::Column::WorkDirectoryId,
403 worktree_repository_statuses::Column::RepoPath,
404 ])
405 .update_columns([
406 worktree_repository_statuses::Column::ScanId,
407 worktree_repository_statuses::Column::StatusKind,
408 worktree_repository_statuses::Column::FirstStatus,
409 worktree_repository_statuses::Column::SecondStatus,
410 ])
411 .to_owned(),
412 )
413 .exec(&*tx)
414 .await?;
415 }
416
417 let has_any_removed_statuses = update
418 .updated_repositories
419 .iter()
420 .any(|repository| !repository.removed_statuses.is_empty());
421
422 if has_any_removed_statuses {
423 worktree_repository_statuses::Entity::update_many()
424 .filter(
425 worktree_repository_statuses::Column::ProjectId
426 .eq(project_id)
427 .and(
428 worktree_repository_statuses::Column::WorktreeId
429 .eq(worktree_id),
430 )
431 .and(
432 worktree_repository_statuses::Column::RepoPath.is_in(
433 update.updated_repositories.iter().flat_map(|repository| {
434 repository.removed_statuses.iter()
435 }),
436 ),
437 ),
438 )
439 .set(worktree_repository_statuses::ActiveModel {
440 is_deleted: ActiveValue::Set(true),
441 scan_id: ActiveValue::Set(update.scan_id as i64),
442 ..Default::default()
443 })
444 .exec(&*tx)
445 .await?;
446 }
447 }
448
449 if !update.removed_repositories.is_empty() {
450 worktree_repository::Entity::update_many()
451 .filter(
452 worktree_repository::Column::ProjectId
453 .eq(project_id)
454 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
455 .and(
456 worktree_repository::Column::WorkDirectoryId
457 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
458 ),
459 )
460 .set(worktree_repository::ActiveModel {
461 is_deleted: ActiveValue::Set(true),
462 scan_id: ActiveValue::Set(update.scan_id as i64),
463 ..Default::default()
464 })
465 .exec(&*tx)
466 .await?;
467 }
468
469 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
470 Ok(connection_ids)
471 })
472 .await
473 }
474
475 /// Updates the diagnostic summary for the given connection.
476 pub async fn update_diagnostic_summary(
477 &self,
478 update: &proto::UpdateDiagnosticSummary,
479 connection: ConnectionId,
480 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
481 let project_id = ProjectId::from_proto(update.project_id);
482 let worktree_id = update.worktree_id as i64;
483 self.project_transaction(project_id, |tx| async move {
484 let summary = update
485 .summary
486 .as_ref()
487 .ok_or_else(|| anyhow!("invalid summary"))?;
488
489 // Ensure the update comes from the host.
490 let project = project::Entity::find_by_id(project_id)
491 .one(&*tx)
492 .await?
493 .ok_or_else(|| anyhow!("no such project"))?;
494 if project.host_connection()? != connection {
495 return Err(anyhow!("can't update a project hosted by someone else"))?;
496 }
497
498 // Update summary.
499 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
500 project_id: ActiveValue::set(project_id),
501 worktree_id: ActiveValue::set(worktree_id),
502 path: ActiveValue::set(summary.path.clone()),
503 language_server_id: ActiveValue::set(summary.language_server_id as i64),
504 error_count: ActiveValue::set(summary.error_count as i32),
505 warning_count: ActiveValue::set(summary.warning_count as i32),
506 })
507 .on_conflict(
508 OnConflict::columns([
509 worktree_diagnostic_summary::Column::ProjectId,
510 worktree_diagnostic_summary::Column::WorktreeId,
511 worktree_diagnostic_summary::Column::Path,
512 ])
513 .update_columns([
514 worktree_diagnostic_summary::Column::LanguageServerId,
515 worktree_diagnostic_summary::Column::ErrorCount,
516 worktree_diagnostic_summary::Column::WarningCount,
517 ])
518 .to_owned(),
519 )
520 .exec(&*tx)
521 .await?;
522
523 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
524 Ok(connection_ids)
525 })
526 .await
527 }
528
529 /// Starts the language server for the given connection.
530 pub async fn start_language_server(
531 &self,
532 update: &proto::StartLanguageServer,
533 connection: ConnectionId,
534 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
535 let project_id = ProjectId::from_proto(update.project_id);
536 self.project_transaction(project_id, |tx| async move {
537 let server = update
538 .server
539 .as_ref()
540 .ok_or_else(|| anyhow!("invalid language server"))?;
541
542 // Ensure the update comes from the host.
543 let project = project::Entity::find_by_id(project_id)
544 .one(&*tx)
545 .await?
546 .ok_or_else(|| anyhow!("no such project"))?;
547 if project.host_connection()? != connection {
548 return Err(anyhow!("can't update a project hosted by someone else"))?;
549 }
550
551 // Add the newly-started language server.
552 language_server::Entity::insert(language_server::ActiveModel {
553 project_id: ActiveValue::set(project_id),
554 id: ActiveValue::set(server.id as i64),
555 name: ActiveValue::set(server.name.clone()),
556 })
557 .on_conflict(
558 OnConflict::columns([
559 language_server::Column::ProjectId,
560 language_server::Column::Id,
561 ])
562 .update_column(language_server::Column::Name)
563 .to_owned(),
564 )
565 .exec(&*tx)
566 .await?;
567
568 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
569 Ok(connection_ids)
570 })
571 .await
572 }
573
574 /// Updates the worktree settings for the given connection.
575 pub async fn update_worktree_settings(
576 &self,
577 update: &proto::UpdateWorktreeSettings,
578 connection: ConnectionId,
579 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
580 let project_id = ProjectId::from_proto(update.project_id);
581 let kind = match update.kind {
582 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
583 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
584 None => proto::LocalSettingsKind::Settings,
585 };
586 let kind = LocalSettingsKind::from_proto(kind);
587 self.project_transaction(project_id, |tx| async move {
588 // Ensure the update comes from the host.
589 let project = project::Entity::find_by_id(project_id)
590 .one(&*tx)
591 .await?
592 .ok_or_else(|| anyhow!("no such project"))?;
593 if project.host_connection()? != connection {
594 return Err(anyhow!("can't update a project hosted by someone else"))?;
595 }
596
597 if let Some(content) = &update.content {
598 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
599 project_id: ActiveValue::Set(project_id),
600 worktree_id: ActiveValue::Set(update.worktree_id as i64),
601 path: ActiveValue::Set(update.path.clone()),
602 content: ActiveValue::Set(content.clone()),
603 kind: ActiveValue::Set(kind),
604 })
605 .on_conflict(
606 OnConflict::columns([
607 worktree_settings_file::Column::ProjectId,
608 worktree_settings_file::Column::WorktreeId,
609 worktree_settings_file::Column::Path,
610 ])
611 .update_column(worktree_settings_file::Column::Content)
612 .to_owned(),
613 )
614 .exec(&*tx)
615 .await?;
616 } else {
617 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
618 project_id: ActiveValue::Set(project_id),
619 worktree_id: ActiveValue::Set(update.worktree_id as i64),
620 path: ActiveValue::Set(update.path.clone()),
621 ..Default::default()
622 })
623 .exec(&*tx)
624 .await?;
625 }
626
627 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
628 Ok(connection_ids)
629 })
630 .await
631 }
632
633 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
634 self.transaction(|tx| async move {
635 Ok(project::Entity::find_by_id(id)
636 .one(&*tx)
637 .await?
638 .ok_or_else(|| anyhow!("no such project"))?)
639 })
640 .await
641 }
642
643 /// Adds the given connection to the specified project
644 /// in the current room.
645 pub async fn join_project(
646 &self,
647 project_id: ProjectId,
648 connection: ConnectionId,
649 user_id: UserId,
650 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
651 self.project_transaction(project_id, |tx| async move {
652 let (project, role) = self
653 .access_project(project_id, connection, Capability::ReadOnly, &tx)
654 .await?;
655 self.join_project_internal(project, user_id, connection, role, &tx)
656 .await
657 })
658 .await
659 }
660
661 async fn join_project_internal(
662 &self,
663 project: project::Model,
664 user_id: UserId,
665 connection: ConnectionId,
666 role: ChannelRole,
667 tx: &DatabaseTransaction,
668 ) -> Result<(Project, ReplicaId)> {
669 let mut collaborators = project
670 .find_related(project_collaborator::Entity)
671 .all(tx)
672 .await?;
673 let replica_ids = collaborators
674 .iter()
675 .map(|c| c.replica_id)
676 .collect::<HashSet<_>>();
677 let mut replica_id = ReplicaId(1);
678 while replica_ids.contains(&replica_id) {
679 replica_id.0 += 1;
680 }
681 let new_collaborator = project_collaborator::ActiveModel {
682 project_id: ActiveValue::set(project.id),
683 connection_id: ActiveValue::set(connection.id as i32),
684 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
685 user_id: ActiveValue::set(user_id),
686 replica_id: ActiveValue::set(replica_id),
687 is_host: ActiveValue::set(false),
688 ..Default::default()
689 }
690 .insert(tx)
691 .await?;
692 collaborators.push(new_collaborator);
693
694 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
695 let mut worktrees = db_worktrees
696 .into_iter()
697 .map(|db_worktree| {
698 (
699 db_worktree.id as u64,
700 Worktree {
701 id: db_worktree.id as u64,
702 abs_path: db_worktree.abs_path,
703 root_name: db_worktree.root_name,
704 visible: db_worktree.visible,
705 entries: Default::default(),
706 repository_entries: Default::default(),
707 diagnostic_summaries: Default::default(),
708 settings_files: Default::default(),
709 scan_id: db_worktree.scan_id as u64,
710 completed_scan_id: db_worktree.completed_scan_id as u64,
711 },
712 )
713 })
714 .collect::<BTreeMap<_, _>>();
715
716 // Populate worktree entries.
717 {
718 let mut db_entries = worktree_entry::Entity::find()
719 .filter(
720 Condition::all()
721 .add(worktree_entry::Column::ProjectId.eq(project.id))
722 .add(worktree_entry::Column::IsDeleted.eq(false)),
723 )
724 .stream(tx)
725 .await?;
726 while let Some(db_entry) = db_entries.next().await {
727 let db_entry = db_entry?;
728 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
729 worktree.entries.push(proto::Entry {
730 id: db_entry.id as u64,
731 is_dir: db_entry.is_dir,
732 path: db_entry.path,
733 inode: db_entry.inode as u64,
734 mtime: Some(proto::Timestamp {
735 seconds: db_entry.mtime_seconds as u64,
736 nanos: db_entry.mtime_nanos as u32,
737 }),
738 canonical_path: db_entry.canonical_path,
739 is_ignored: db_entry.is_ignored,
740 is_external: db_entry.is_external,
741 // This is only used in the summarization backlog, so if it's None,
742 // that just means we won't be able to detect when to resummarize
743 // based on total number of backlogged bytes - instead, we'd go
744 // on number of files only. That shouldn't be a huge deal in practice.
745 size: None,
746 is_fifo: db_entry.is_fifo,
747 });
748 }
749 }
750 }
751
752 // Populate repository entries.
753 {
754 let db_repository_entries = worktree_repository::Entity::find()
755 .filter(
756 Condition::all()
757 .add(worktree_repository::Column::ProjectId.eq(project.id))
758 .add(worktree_repository::Column::IsDeleted.eq(false)),
759 )
760 .all(tx)
761 .await?;
762 for db_repository_entry in db_repository_entries {
763 if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
764 {
765 let mut repository_statuses = worktree_repository_statuses::Entity::find()
766 .filter(
767 Condition::all()
768 .add(worktree_repository_statuses::Column::ProjectId.eq(project.id))
769 .add(
770 worktree_repository_statuses::Column::WorktreeId
771 .eq(worktree.id),
772 )
773 .add(
774 worktree_repository_statuses::Column::WorkDirectoryId
775 .eq(db_repository_entry.work_directory_id),
776 )
777 .add(worktree_repository_statuses::Column::IsDeleted.eq(false)),
778 )
779 .stream(tx)
780 .await?;
781 let mut updated_statuses = Vec::new();
782 while let Some(status_entry) = repository_statuses.next().await {
783 let status_entry: worktree_repository_statuses::Model = status_entry?;
784 updated_statuses.push(db_status_to_proto(status_entry)?);
785 }
786
787 let current_merge_conflicts = db_repository_entry
788 .current_merge_conflicts
789 .as_ref()
790 .map(|conflicts| serde_json::from_str(&conflicts))
791 .transpose()?
792 .unwrap_or_default();
793
794 let branch_summary = db_repository_entry
795 .branch_summary
796 .as_ref()
797 .map(|branch_summary| serde_json::from_str(&branch_summary))
798 .transpose()?
799 .unwrap_or_default();
800
801 worktree.repository_entries.insert(
802 db_repository_entry.work_directory_id as u64,
803 proto::RepositoryEntry {
804 work_directory_id: db_repository_entry.work_directory_id as u64,
805 branch: db_repository_entry.branch,
806 updated_statuses,
807 removed_statuses: Vec::new(),
808 current_merge_conflicts,
809 branch_summary,
810 },
811 );
812 }
813 }
814 }
815
816 // Populate worktree diagnostic summaries.
817 {
818 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
819 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
820 .stream(tx)
821 .await?;
822 while let Some(db_summary) = db_summaries.next().await {
823 let db_summary = db_summary?;
824 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
825 worktree
826 .diagnostic_summaries
827 .push(proto::DiagnosticSummary {
828 path: db_summary.path,
829 language_server_id: db_summary.language_server_id as u64,
830 error_count: db_summary.error_count as u32,
831 warning_count: db_summary.warning_count as u32,
832 });
833 }
834 }
835 }
836
837 // Populate worktree settings files
838 {
839 let mut db_settings_files = worktree_settings_file::Entity::find()
840 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
841 .stream(tx)
842 .await?;
843 while let Some(db_settings_file) = db_settings_files.next().await {
844 let db_settings_file = db_settings_file?;
845 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
846 worktree.settings_files.push(WorktreeSettingsFile {
847 path: db_settings_file.path,
848 content: db_settings_file.content,
849 kind: db_settings_file.kind,
850 });
851 }
852 }
853 }
854
855 // Populate language servers.
856 let language_servers = project
857 .find_related(language_server::Entity)
858 .all(tx)
859 .await?;
860
861 let project = Project {
862 id: project.id,
863 role,
864 collaborators: collaborators
865 .into_iter()
866 .map(|collaborator| ProjectCollaborator {
867 connection_id: collaborator.connection(),
868 user_id: collaborator.user_id,
869 replica_id: collaborator.replica_id,
870 is_host: collaborator.is_host,
871 })
872 .collect(),
873 worktrees,
874 language_servers: language_servers
875 .into_iter()
876 .map(|language_server| proto::LanguageServer {
877 id: language_server.id as u64,
878 name: language_server.name,
879 worktree_id: None,
880 })
881 .collect(),
882 };
883 Ok((project, replica_id as ReplicaId))
884 }
885
886 /// Removes the given connection from the specified project.
887 pub async fn leave_project(
888 &self,
889 project_id: ProjectId,
890 connection: ConnectionId,
891 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
892 self.project_transaction(project_id, |tx| async move {
893 let result = project_collaborator::Entity::delete_many()
894 .filter(
895 Condition::all()
896 .add(project_collaborator::Column::ProjectId.eq(project_id))
897 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
898 .add(
899 project_collaborator::Column::ConnectionServerId
900 .eq(connection.owner_id as i32),
901 ),
902 )
903 .exec(&*tx)
904 .await?;
905 if result.rows_affected == 0 {
906 Err(anyhow!("not a collaborator on this project"))?;
907 }
908
909 let project = project::Entity::find_by_id(project_id)
910 .one(&*tx)
911 .await?
912 .ok_or_else(|| anyhow!("no such project"))?;
913 let collaborators = project
914 .find_related(project_collaborator::Entity)
915 .all(&*tx)
916 .await?;
917 let connection_ids: Vec<ConnectionId> = collaborators
918 .into_iter()
919 .map(|collaborator| collaborator.connection())
920 .collect();
921
922 follower::Entity::delete_many()
923 .filter(
924 Condition::any()
925 .add(
926 Condition::all()
927 .add(follower::Column::ProjectId.eq(Some(project_id)))
928 .add(
929 follower::Column::LeaderConnectionServerId
930 .eq(connection.owner_id),
931 )
932 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
933 )
934 .add(
935 Condition::all()
936 .add(follower::Column::ProjectId.eq(Some(project_id)))
937 .add(
938 follower::Column::FollowerConnectionServerId
939 .eq(connection.owner_id),
940 )
941 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
942 ),
943 )
944 .exec(&*tx)
945 .await?;
946
947 let room = if let Some(room_id) = project.room_id {
948 Some(self.get_room(room_id, &tx).await?)
949 } else {
950 None
951 };
952
953 let left_project = LeftProject {
954 id: project_id,
955 should_unshare: connection == project.host_connection()?,
956 connection_ids,
957 };
958 Ok((room, left_project))
959 })
960 .await
961 }
962
963 pub async fn check_user_is_project_host(
964 &self,
965 project_id: ProjectId,
966 connection_id: ConnectionId,
967 ) -> Result<()> {
968 self.project_transaction(project_id, |tx| async move {
969 project::Entity::find()
970 .filter(
971 Condition::all()
972 .add(project::Column::Id.eq(project_id))
973 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
974 .add(
975 project::Column::HostConnectionServerId
976 .eq(Some(connection_id.owner_id as i32)),
977 ),
978 )
979 .one(&*tx)
980 .await?
981 .ok_or_else(|| anyhow!("failed to read project host"))?;
982
983 Ok(())
984 })
985 .await
986 .map(|guard| guard.into_inner())
987 }
988
989 /// Returns the current project if the given user is authorized to access it with the specified capability.
990 pub async fn access_project(
991 &self,
992 project_id: ProjectId,
993 connection_id: ConnectionId,
994 capability: Capability,
995 tx: &DatabaseTransaction,
996 ) -> Result<(project::Model, ChannelRole)> {
997 let project = project::Entity::find_by_id(project_id)
998 .one(tx)
999 .await?
1000 .ok_or_else(|| anyhow!("no such project"))?;
1001
1002 let role_from_room = if let Some(room_id) = project.room_id {
1003 room_participant::Entity::find()
1004 .filter(room_participant::Column::RoomId.eq(room_id))
1005 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1006 .one(tx)
1007 .await?
1008 .and_then(|participant| participant.role)
1009 } else {
1010 None
1011 };
1012
1013 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1014
1015 match capability {
1016 Capability::ReadWrite => {
1017 if !role.can_edit_projects() {
1018 return Err(anyhow!("not authorized to edit projects"))?;
1019 }
1020 }
1021 Capability::ReadOnly => {
1022 if !role.can_read_projects() {
1023 return Err(anyhow!("not authorized to read projects"))?;
1024 }
1025 }
1026 }
1027
1028 Ok((project, role))
1029 }
1030
1031 /// Returns the host connection for a read-only request to join a shared project.
1032 pub async fn host_for_read_only_project_request(
1033 &self,
1034 project_id: ProjectId,
1035 connection_id: ConnectionId,
1036 ) -> Result<ConnectionId> {
1037 self.project_transaction(project_id, |tx| async move {
1038 let (project, _) = self
1039 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1040 .await?;
1041 project.host_connection()
1042 })
1043 .await
1044 .map(|guard| guard.into_inner())
1045 }
1046
1047 /// Returns the host connection for a request to join a shared project.
1048 pub async fn host_for_mutating_project_request(
1049 &self,
1050 project_id: ProjectId,
1051 connection_id: ConnectionId,
1052 ) -> Result<ConnectionId> {
1053 self.project_transaction(project_id, |tx| async move {
1054 let (project, _) = self
1055 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1056 .await?;
1057 project.host_connection()
1058 })
1059 .await
1060 .map(|guard| guard.into_inner())
1061 }
1062
1063 pub async fn connections_for_buffer_update(
1064 &self,
1065 project_id: ProjectId,
1066 connection_id: ConnectionId,
1067 capability: Capability,
1068 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1069 self.project_transaction(project_id, |tx| async move {
1070 // Authorize
1071 let (project, _) = self
1072 .access_project(project_id, connection_id, capability, &tx)
1073 .await?;
1074
1075 let host_connection_id = project.host_connection()?;
1076
1077 let collaborators = project_collaborator::Entity::find()
1078 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1079 .all(&*tx)
1080 .await?;
1081
1082 let guest_connection_ids = collaborators
1083 .into_iter()
1084 .filter_map(|collaborator| {
1085 if collaborator.is_host {
1086 None
1087 } else {
1088 Some(collaborator.connection())
1089 }
1090 })
1091 .collect();
1092
1093 Ok((host_connection_id, guest_connection_ids))
1094 })
1095 .await
1096 }
1097
1098 /// Returns the connection IDs in the given project.
1099 ///
1100 /// The provided `connection_id` must also be a collaborator in the project,
1101 /// otherwise an error will be returned.
1102 pub async fn project_connection_ids(
1103 &self,
1104 project_id: ProjectId,
1105 connection_id: ConnectionId,
1106 exclude_dev_server: bool,
1107 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1108 self.project_transaction(project_id, |tx| async move {
1109 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1110 .await
1111 })
1112 .await
1113 }
1114
1115 async fn internal_project_connection_ids(
1116 &self,
1117 project_id: ProjectId,
1118 connection_id: ConnectionId,
1119 exclude_dev_server: bool,
1120 tx: &DatabaseTransaction,
1121 ) -> Result<HashSet<ConnectionId>> {
1122 let project = project::Entity::find_by_id(project_id)
1123 .one(tx)
1124 .await?
1125 .ok_or_else(|| anyhow!("no such project"))?;
1126
1127 let mut collaborators = project_collaborator::Entity::find()
1128 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1129 .stream(tx)
1130 .await?;
1131
1132 let mut connection_ids = HashSet::default();
1133 if let Some(host_connection) = project.host_connection().log_err() {
1134 if !exclude_dev_server {
1135 connection_ids.insert(host_connection);
1136 }
1137 }
1138
1139 while let Some(collaborator) = collaborators.next().await {
1140 let collaborator = collaborator?;
1141 connection_ids.insert(collaborator.connection());
1142 }
1143
1144 if connection_ids.contains(&connection_id)
1145 || Some(connection_id) == project.host_connection().ok()
1146 {
1147 Ok(connection_ids)
1148 } else {
1149 Err(anyhow!(
1150 "can only send project updates to a project you're in"
1151 ))?
1152 }
1153 }
1154
1155 async fn project_guest_connection_ids(
1156 &self,
1157 project_id: ProjectId,
1158 tx: &DatabaseTransaction,
1159 ) -> Result<Vec<ConnectionId>> {
1160 let mut collaborators = project_collaborator::Entity::find()
1161 .filter(
1162 project_collaborator::Column::ProjectId
1163 .eq(project_id)
1164 .and(project_collaborator::Column::IsHost.eq(false)),
1165 )
1166 .stream(tx)
1167 .await?;
1168
1169 let mut guest_connection_ids = Vec::new();
1170 while let Some(collaborator) = collaborators.next().await {
1171 let collaborator = collaborator?;
1172 guest_connection_ids.push(collaborator.connection());
1173 }
1174 Ok(guest_connection_ids)
1175 }
1176
1177 /// Returns the [`RoomId`] for the given project.
1178 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1179 self.transaction(|tx| async move {
1180 Ok(project::Entity::find_by_id(project_id)
1181 .one(&*tx)
1182 .await?
1183 .and_then(|project| project.room_id))
1184 })
1185 .await
1186 }
1187
1188 pub async fn check_room_participants(
1189 &self,
1190 room_id: RoomId,
1191 leader_id: ConnectionId,
1192 follower_id: ConnectionId,
1193 ) -> Result<()> {
1194 self.transaction(|tx| async move {
1195 use room_participant::Column;
1196
1197 let count = room_participant::Entity::find()
1198 .filter(
1199 Condition::all().add(Column::RoomId.eq(room_id)).add(
1200 Condition::any()
1201 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1202 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1203 ))
1204 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1205 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1206 )),
1207 ),
1208 )
1209 .count(&*tx)
1210 .await?;
1211
1212 if count < 2 {
1213 Err(anyhow!("not room participants"))?;
1214 }
1215
1216 Ok(())
1217 })
1218 .await
1219 }
1220
1221 /// Adds the given follower connection as a follower of the given leader connection.
1222 pub async fn follow(
1223 &self,
1224 room_id: RoomId,
1225 project_id: ProjectId,
1226 leader_connection: ConnectionId,
1227 follower_connection: ConnectionId,
1228 ) -> Result<TransactionGuard<proto::Room>> {
1229 self.room_transaction(room_id, |tx| async move {
1230 follower::ActiveModel {
1231 room_id: ActiveValue::set(room_id),
1232 project_id: ActiveValue::set(project_id),
1233 leader_connection_server_id: ActiveValue::set(ServerId(
1234 leader_connection.owner_id as i32,
1235 )),
1236 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1237 follower_connection_server_id: ActiveValue::set(ServerId(
1238 follower_connection.owner_id as i32,
1239 )),
1240 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1241 ..Default::default()
1242 }
1243 .insert(&*tx)
1244 .await?;
1245
1246 let room = self.get_room(room_id, &tx).await?;
1247 Ok(room)
1248 })
1249 .await
1250 }
1251
1252 /// Removes the given follower connection as a follower of the given leader connection.
1253 pub async fn unfollow(
1254 &self,
1255 room_id: RoomId,
1256 project_id: ProjectId,
1257 leader_connection: ConnectionId,
1258 follower_connection: ConnectionId,
1259 ) -> Result<TransactionGuard<proto::Room>> {
1260 self.room_transaction(room_id, |tx| async move {
1261 follower::Entity::delete_many()
1262 .filter(
1263 Condition::all()
1264 .add(follower::Column::RoomId.eq(room_id))
1265 .add(follower::Column::ProjectId.eq(project_id))
1266 .add(
1267 follower::Column::LeaderConnectionServerId
1268 .eq(leader_connection.owner_id),
1269 )
1270 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1271 .add(
1272 follower::Column::FollowerConnectionServerId
1273 .eq(follower_connection.owner_id),
1274 )
1275 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1276 )
1277 .exec(&*tx)
1278 .await?;
1279
1280 let room = self.get_room(room_id, &tx).await?;
1281 Ok(room)
1282 })
1283 .await
1284 }
1285}