1use anyhow::Context as _;
2
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .ok_or_else(|| anyhow!("could not find participant"))?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 let project = project::ActiveModel {
65 room_id: ActiveValue::set(Some(participant.room_id)),
66 host_user_id: ActiveValue::set(Some(participant.user_id)),
67 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
68 host_connection_server_id: ActiveValue::set(Some(ServerId(
69 connection.owner_id as i32,
70 ))),
71 id: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if !worktrees.is_empty() {
77 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
78 worktree::ActiveModel {
79 id: ActiveValue::set(worktree.id as i64),
80 project_id: ActiveValue::set(project.id),
81 abs_path: ActiveValue::set(worktree.abs_path.clone()),
82 root_name: ActiveValue::set(worktree.root_name.clone()),
83 visible: ActiveValue::set(worktree.visible),
84 scan_id: ActiveValue::set(0),
85 completed_scan_id: ActiveValue::set(0),
86 }
87 }))
88 .exec(&*tx)
89 .await?;
90 }
91
92 let replica_id = if is_ssh_project { 1 } else { 0 };
93
94 project_collaborator::ActiveModel {
95 project_id: ActiveValue::set(project.id),
96 connection_id: ActiveValue::set(connection.id as i32),
97 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
98 user_id: ActiveValue::set(participant.user_id),
99 replica_id: ActiveValue::set(ReplicaId(replica_id)),
100 is_host: ActiveValue::set(true),
101 ..Default::default()
102 }
103 .insert(&*tx)
104 .await?;
105
106 let room = self.get_room(room_id, &tx).await?;
107 Ok((project.id, room))
108 })
109 .await
110 }
111
112 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
113 self.weak_transaction(|tx| async move {
114 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
115 Ok(())
116 })
117 .await
118 }
119
120 /// Unshares the given project.
121 pub async fn unshare_project(
122 &self,
123 project_id: ProjectId,
124 connection: ConnectionId,
125 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
126 self.project_transaction(project_id, |tx| async move {
127 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
128 let project = project::Entity::find_by_id(project_id)
129 .one(&*tx)
130 .await?
131 .ok_or_else(|| anyhow!("project not found"))?;
132 let room = if let Some(room_id) = project.room_id {
133 Some(self.get_room(room_id, &tx).await?)
134 } else {
135 None
136 };
137 if project.host_connection()? == connection {
138 return Ok((true, room, guest_connection_ids));
139 }
140 Err(anyhow!("cannot unshare a project hosted by another user"))?
141 })
142 .await
143 }
144
145 /// Updates the worktrees associated with the given project.
146 pub async fn update_project(
147 &self,
148 project_id: ProjectId,
149 connection: ConnectionId,
150 worktrees: &[proto::WorktreeMetadata],
151 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
152 self.project_transaction(project_id, |tx| async move {
153 let project = project::Entity::find_by_id(project_id)
154 .filter(
155 Condition::all()
156 .add(project::Column::HostConnectionId.eq(connection.id as i32))
157 .add(
158 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
159 ),
160 )
161 .one(&*tx)
162 .await?
163 .ok_or_else(|| anyhow!("no such project"))?;
164
165 self.update_project_worktrees(project.id, worktrees, &tx)
166 .await?;
167
168 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
169
170 let room = if let Some(room_id) = project.room_id {
171 Some(self.get_room(room_id, &tx).await?)
172 } else {
173 None
174 };
175
176 Ok((room, guest_connection_ids))
177 })
178 .await
179 }
180
181 pub(in crate::db) async fn update_project_worktrees(
182 &self,
183 project_id: ProjectId,
184 worktrees: &[proto::WorktreeMetadata],
185 tx: &DatabaseTransaction,
186 ) -> Result<()> {
187 if !worktrees.is_empty() {
188 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
189 id: ActiveValue::set(worktree.id as i64),
190 project_id: ActiveValue::set(project_id),
191 abs_path: ActiveValue::set(worktree.abs_path.clone()),
192 root_name: ActiveValue::set(worktree.root_name.clone()),
193 visible: ActiveValue::set(worktree.visible),
194 scan_id: ActiveValue::set(0),
195 completed_scan_id: ActiveValue::set(0),
196 }))
197 .on_conflict(
198 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
199 .update_column(worktree::Column::RootName)
200 .to_owned(),
201 )
202 .exec(tx)
203 .await?;
204 }
205
206 worktree::Entity::delete_many()
207 .filter(worktree::Column::ProjectId.eq(project_id).and(
208 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
209 ))
210 .exec(tx)
211 .await?;
212
213 Ok(())
214 }
215
216 pub async fn update_worktree(
217 &self,
218 update: &proto::UpdateWorktree,
219 connection: ConnectionId,
220 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
221 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
222 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
223 {
224 return Err(anyhow!(
225 "invalid worktree update. removed entries: {}, updated entries: {}",
226 update.removed_entries.len(),
227 update.updated_entries.len()
228 ))?;
229 }
230
231 let project_id = ProjectId::from_proto(update.project_id);
232 let worktree_id = update.worktree_id as i64;
233 self.project_transaction(project_id, |tx| async move {
234 // Ensure the update comes from the host.
235 let _project = project::Entity::find_by_id(project_id)
236 .filter(
237 Condition::all()
238 .add(project::Column::HostConnectionId.eq(connection.id as i32))
239 .add(
240 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
241 ),
242 )
243 .one(&*tx)
244 .await?
245 .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
246
247 // Update metadata.
248 worktree::Entity::update(worktree::ActiveModel {
249 id: ActiveValue::set(worktree_id),
250 project_id: ActiveValue::set(project_id),
251 root_name: ActiveValue::set(update.root_name.clone()),
252 scan_id: ActiveValue::set(update.scan_id as i64),
253 completed_scan_id: if update.is_last_update {
254 ActiveValue::set(update.scan_id as i64)
255 } else {
256 ActiveValue::default()
257 },
258 abs_path: ActiveValue::set(update.abs_path.clone()),
259 ..Default::default()
260 })
261 .exec(&*tx)
262 .await?;
263
264 if !update.updated_entries.is_empty() {
265 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
266 let mtime = entry.mtime.clone().unwrap_or_default();
267 worktree_entry::ActiveModel {
268 project_id: ActiveValue::set(project_id),
269 worktree_id: ActiveValue::set(worktree_id),
270 id: ActiveValue::set(entry.id as i64),
271 is_dir: ActiveValue::set(entry.is_dir),
272 path: ActiveValue::set(entry.path.clone()),
273 inode: ActiveValue::set(entry.inode as i64),
274 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
275 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
276 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
277 is_ignored: ActiveValue::set(entry.is_ignored),
278 git_status: ActiveValue::set(None),
279 is_external: ActiveValue::set(entry.is_external),
280 is_deleted: ActiveValue::set(false),
281 scan_id: ActiveValue::set(update.scan_id as i64),
282 is_fifo: ActiveValue::set(entry.is_fifo),
283 }
284 }))
285 .on_conflict(
286 OnConflict::columns([
287 worktree_entry::Column::ProjectId,
288 worktree_entry::Column::WorktreeId,
289 worktree_entry::Column::Id,
290 ])
291 .update_columns([
292 worktree_entry::Column::IsDir,
293 worktree_entry::Column::Path,
294 worktree_entry::Column::Inode,
295 worktree_entry::Column::MtimeSeconds,
296 worktree_entry::Column::MtimeNanos,
297 worktree_entry::Column::CanonicalPath,
298 worktree_entry::Column::IsIgnored,
299 worktree_entry::Column::ScanId,
300 ])
301 .to_owned(),
302 )
303 .exec(&*tx)
304 .await?;
305 }
306
307 if !update.removed_entries.is_empty() {
308 worktree_entry::Entity::update_many()
309 .filter(
310 worktree_entry::Column::ProjectId
311 .eq(project_id)
312 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
313 .and(
314 worktree_entry::Column::Id
315 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
316 ),
317 )
318 .set(worktree_entry::ActiveModel {
319 is_deleted: ActiveValue::Set(true),
320 scan_id: ActiveValue::Set(update.scan_id as i64),
321 ..Default::default()
322 })
323 .exec(&*tx)
324 .await?;
325 }
326
327 if !update.updated_repositories.is_empty() {
328 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
329 |repository| worktree_repository::ActiveModel {
330 project_id: ActiveValue::set(project_id),
331 worktree_id: ActiveValue::set(worktree_id),
332 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
333 scan_id: ActiveValue::set(update.scan_id as i64),
334 branch: ActiveValue::set(repository.branch.clone()),
335 is_deleted: ActiveValue::set(false),
336 current_merge_conflicts: ActiveValue::Set(Some(
337 serde_json::to_string(&repository.current_merge_conflicts).unwrap(),
338 )),
339 },
340 ))
341 .on_conflict(
342 OnConflict::columns([
343 worktree_repository::Column::ProjectId,
344 worktree_repository::Column::WorktreeId,
345 worktree_repository::Column::WorkDirectoryId,
346 ])
347 .update_columns([
348 worktree_repository::Column::ScanId,
349 worktree_repository::Column::Branch,
350 ])
351 .to_owned(),
352 )
353 .exec(&*tx)
354 .await?;
355
356 let has_any_statuses = update
357 .updated_repositories
358 .iter()
359 .any(|repository| !repository.updated_statuses.is_empty());
360
361 if has_any_statuses {
362 worktree_repository_statuses::Entity::insert_many(
363 update.updated_repositories.iter().flat_map(
364 |repository: &proto::RepositoryEntry| {
365 repository.updated_statuses.iter().map(|status_entry| {
366 let (repo_path, status_kind, first_status, second_status) =
367 proto_status_to_db(status_entry.clone());
368 worktree_repository_statuses::ActiveModel {
369 project_id: ActiveValue::set(project_id),
370 worktree_id: ActiveValue::set(worktree_id),
371 work_directory_id: ActiveValue::set(
372 repository.work_directory_id as i64,
373 ),
374 scan_id: ActiveValue::set(update.scan_id as i64),
375 is_deleted: ActiveValue::set(false),
376 repo_path: ActiveValue::set(repo_path),
377 status: ActiveValue::set(0),
378 status_kind: ActiveValue::set(status_kind),
379 first_status: ActiveValue::set(first_status),
380 second_status: ActiveValue::set(second_status),
381 }
382 })
383 },
384 ),
385 )
386 .on_conflict(
387 OnConflict::columns([
388 worktree_repository_statuses::Column::ProjectId,
389 worktree_repository_statuses::Column::WorktreeId,
390 worktree_repository_statuses::Column::WorkDirectoryId,
391 worktree_repository_statuses::Column::RepoPath,
392 ])
393 .update_columns([
394 worktree_repository_statuses::Column::ScanId,
395 worktree_repository_statuses::Column::StatusKind,
396 worktree_repository_statuses::Column::FirstStatus,
397 worktree_repository_statuses::Column::SecondStatus,
398 ])
399 .to_owned(),
400 )
401 .exec(&*tx)
402 .await?;
403 }
404
405 let has_any_removed_statuses = update
406 .updated_repositories
407 .iter()
408 .any(|repository| !repository.removed_statuses.is_empty());
409
410 if has_any_removed_statuses {
411 worktree_repository_statuses::Entity::update_many()
412 .filter(
413 worktree_repository_statuses::Column::ProjectId
414 .eq(project_id)
415 .and(
416 worktree_repository_statuses::Column::WorktreeId
417 .eq(worktree_id),
418 )
419 .and(
420 worktree_repository_statuses::Column::RepoPath.is_in(
421 update.updated_repositories.iter().flat_map(|repository| {
422 repository.removed_statuses.iter()
423 }),
424 ),
425 ),
426 )
427 .set(worktree_repository_statuses::ActiveModel {
428 is_deleted: ActiveValue::Set(true),
429 scan_id: ActiveValue::Set(update.scan_id as i64),
430 ..Default::default()
431 })
432 .exec(&*tx)
433 .await?;
434 }
435 }
436
437 if !update.removed_repositories.is_empty() {
438 worktree_repository::Entity::update_many()
439 .filter(
440 worktree_repository::Column::ProjectId
441 .eq(project_id)
442 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
443 .and(
444 worktree_repository::Column::WorkDirectoryId
445 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
446 ),
447 )
448 .set(worktree_repository::ActiveModel {
449 is_deleted: ActiveValue::Set(true),
450 scan_id: ActiveValue::Set(update.scan_id as i64),
451 ..Default::default()
452 })
453 .exec(&*tx)
454 .await?;
455 }
456
457 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
458 Ok(connection_ids)
459 })
460 .await
461 }
462
463 /// Updates the diagnostic summary for the given connection.
464 pub async fn update_diagnostic_summary(
465 &self,
466 update: &proto::UpdateDiagnosticSummary,
467 connection: ConnectionId,
468 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
469 let project_id = ProjectId::from_proto(update.project_id);
470 let worktree_id = update.worktree_id as i64;
471 self.project_transaction(project_id, |tx| async move {
472 let summary = update
473 .summary
474 .as_ref()
475 .ok_or_else(|| anyhow!("invalid summary"))?;
476
477 // Ensure the update comes from the host.
478 let project = project::Entity::find_by_id(project_id)
479 .one(&*tx)
480 .await?
481 .ok_or_else(|| anyhow!("no such project"))?;
482 if project.host_connection()? != connection {
483 return Err(anyhow!("can't update a project hosted by someone else"))?;
484 }
485
486 // Update summary.
487 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
488 project_id: ActiveValue::set(project_id),
489 worktree_id: ActiveValue::set(worktree_id),
490 path: ActiveValue::set(summary.path.clone()),
491 language_server_id: ActiveValue::set(summary.language_server_id as i64),
492 error_count: ActiveValue::set(summary.error_count as i32),
493 warning_count: ActiveValue::set(summary.warning_count as i32),
494 })
495 .on_conflict(
496 OnConflict::columns([
497 worktree_diagnostic_summary::Column::ProjectId,
498 worktree_diagnostic_summary::Column::WorktreeId,
499 worktree_diagnostic_summary::Column::Path,
500 ])
501 .update_columns([
502 worktree_diagnostic_summary::Column::LanguageServerId,
503 worktree_diagnostic_summary::Column::ErrorCount,
504 worktree_diagnostic_summary::Column::WarningCount,
505 ])
506 .to_owned(),
507 )
508 .exec(&*tx)
509 .await?;
510
511 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
512 Ok(connection_ids)
513 })
514 .await
515 }
516
517 /// Starts the language server for the given connection.
518 pub async fn start_language_server(
519 &self,
520 update: &proto::StartLanguageServer,
521 connection: ConnectionId,
522 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
523 let project_id = ProjectId::from_proto(update.project_id);
524 self.project_transaction(project_id, |tx| async move {
525 let server = update
526 .server
527 .as_ref()
528 .ok_or_else(|| anyhow!("invalid language server"))?;
529
530 // Ensure the update comes from the host.
531 let project = project::Entity::find_by_id(project_id)
532 .one(&*tx)
533 .await?
534 .ok_or_else(|| anyhow!("no such project"))?;
535 if project.host_connection()? != connection {
536 return Err(anyhow!("can't update a project hosted by someone else"))?;
537 }
538
539 // Add the newly-started language server.
540 language_server::Entity::insert(language_server::ActiveModel {
541 project_id: ActiveValue::set(project_id),
542 id: ActiveValue::set(server.id as i64),
543 name: ActiveValue::set(server.name.clone()),
544 })
545 .on_conflict(
546 OnConflict::columns([
547 language_server::Column::ProjectId,
548 language_server::Column::Id,
549 ])
550 .update_column(language_server::Column::Name)
551 .to_owned(),
552 )
553 .exec(&*tx)
554 .await?;
555
556 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
557 Ok(connection_ids)
558 })
559 .await
560 }
561
562 /// Updates the worktree settings for the given connection.
563 pub async fn update_worktree_settings(
564 &self,
565 update: &proto::UpdateWorktreeSettings,
566 connection: ConnectionId,
567 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
568 let project_id = ProjectId::from_proto(update.project_id);
569 let kind = match update.kind {
570 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
571 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
572 None => proto::LocalSettingsKind::Settings,
573 };
574 let kind = LocalSettingsKind::from_proto(kind);
575 self.project_transaction(project_id, |tx| async move {
576 // Ensure the update comes from the host.
577 let project = project::Entity::find_by_id(project_id)
578 .one(&*tx)
579 .await?
580 .ok_or_else(|| anyhow!("no such project"))?;
581 if project.host_connection()? != connection {
582 return Err(anyhow!("can't update a project hosted by someone else"))?;
583 }
584
585 if let Some(content) = &update.content {
586 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
587 project_id: ActiveValue::Set(project_id),
588 worktree_id: ActiveValue::Set(update.worktree_id as i64),
589 path: ActiveValue::Set(update.path.clone()),
590 content: ActiveValue::Set(content.clone()),
591 kind: ActiveValue::Set(kind),
592 })
593 .on_conflict(
594 OnConflict::columns([
595 worktree_settings_file::Column::ProjectId,
596 worktree_settings_file::Column::WorktreeId,
597 worktree_settings_file::Column::Path,
598 ])
599 .update_column(worktree_settings_file::Column::Content)
600 .to_owned(),
601 )
602 .exec(&*tx)
603 .await?;
604 } else {
605 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
606 project_id: ActiveValue::Set(project_id),
607 worktree_id: ActiveValue::Set(update.worktree_id as i64),
608 path: ActiveValue::Set(update.path.clone()),
609 ..Default::default()
610 })
611 .exec(&*tx)
612 .await?;
613 }
614
615 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
616 Ok(connection_ids)
617 })
618 .await
619 }
620
621 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
622 self.transaction(|tx| async move {
623 Ok(project::Entity::find_by_id(id)
624 .one(&*tx)
625 .await?
626 .ok_or_else(|| anyhow!("no such project"))?)
627 })
628 .await
629 }
630
631 /// Adds the given connection to the specified project
632 /// in the current room.
633 pub async fn join_project(
634 &self,
635 project_id: ProjectId,
636 connection: ConnectionId,
637 user_id: UserId,
638 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
639 self.project_transaction(project_id, |tx| async move {
640 let (project, role) = self
641 .access_project(project_id, connection, Capability::ReadOnly, &tx)
642 .await?;
643 self.join_project_internal(project, user_id, connection, role, &tx)
644 .await
645 })
646 .await
647 }
648
    /// Adds `connection` as a non-host collaborator of `project` and assembles
    /// the project's current state.
    ///
    /// The new collaborator receives the smallest replica id, starting at 1,
    /// that no existing collaborator of the project uses. The returned `Project`
    /// contains all collaborators (including the new one), the project's
    /// worktrees populated with their non-deleted entries, non-deleted
    /// repository entries and statuses, diagnostic summaries and settings files,
    /// and the project's language servers. `role` is copied into the result
    /// unchanged.
    ///
    /// Rows that refer to a worktree id which is not among the project's
    /// worktrees are skipped.
    ///
    /// Returns the assembled project together with the replica id that was
    /// assigned to the new collaborator.
    async fn join_project_internal(
        &self,
        project: project::Model,
        user_id: UserId,
        connection: ConnectionId,
        role: ChannelRole,
        tx: &DatabaseTransaction,
    ) -> Result<(Project, ReplicaId)> {
        let mut collaborators = project
            .find_related(project_collaborator::Entity)
            .all(tx)
            .await?;
        // Pick the lowest replica id >= 1 that is not taken yet.
        let replica_ids = collaborators
            .iter()
            .map(|c| c.replica_id)
            .collect::<HashSet<_>>();
        let mut replica_id = ReplicaId(1);
        while replica_ids.contains(&replica_id) {
            replica_id.0 += 1;
        }
        let new_collaborator = project_collaborator::ActiveModel {
            project_id: ActiveValue::set(project.id),
            connection_id: ActiveValue::set(connection.id as i32),
            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
            user_id: ActiveValue::set(user_id),
            replica_id: ActiveValue::set(replica_id),
            is_host: ActiveValue::set(false),
            ..Default::default()
        }
        .insert(tx)
        .await?;
        collaborators.push(new_collaborator);

        // Index the worktrees by id so the sections below can attach their rows.
        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
        let mut worktrees = db_worktrees
            .into_iter()
            .map(|db_worktree| {
                (
                    db_worktree.id as u64,
                    Worktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        entries: Default::default(),
                        repository_entries: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    },
                )
            })
            .collect::<BTreeMap<_, _>>();

        // Populate worktree entries.
        {
            let mut db_entries = worktree_entry::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_entry::Column::ProjectId.eq(project.id))
                        .add(worktree_entry::Column::IsDeleted.eq(false)),
                )
                .stream(tx)
                .await?;
            while let Some(db_entry) = db_entries.next().await {
                let db_entry = db_entry?;
                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                    worktree.entries.push(proto::Entry {
                        id: db_entry.id as u64,
                        is_dir: db_entry.is_dir,
                        path: db_entry.path,
                        inode: db_entry.inode as u64,
                        mtime: Some(proto::Timestamp {
                            seconds: db_entry.mtime_seconds as u64,
                            nanos: db_entry.mtime_nanos as u32,
                        }),
                        canonical_path: db_entry.canonical_path,
                        is_ignored: db_entry.is_ignored,
                        is_external: db_entry.is_external,
                        // This is only used in the summarization backlog, so if it's None,
                        // that just means we won't be able to detect when to resummarize
                        // based on total number of backlogged bytes - instead, we'd go
                        // on number of files only. That shouldn't be a huge deal in practice.
                        size: None,
                        is_fifo: db_entry.is_fifo,
                    });
                }
            }
        }

        // Populate repository entries.
        {
            let db_repository_entries = worktree_repository::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_repository::Column::ProjectId.eq(project.id))
                        .add(worktree_repository::Column::IsDeleted.eq(false)),
                )
                .all(tx)
                .await?;
            for db_repository_entry in db_repository_entries {
                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
                {
                    // One status query is issued per repository entry.
                    let mut repository_statuses = worktree_repository_statuses::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_repository_statuses::Column::ProjectId.eq(project.id))
                                .add(
                                    worktree_repository_statuses::Column::WorktreeId
                                        .eq(worktree.id),
                                )
                                .add(
                                    worktree_repository_statuses::Column::WorkDirectoryId
                                        .eq(db_repository_entry.work_directory_id),
                                )
                                .add(worktree_repository_statuses::Column::IsDeleted.eq(false)),
                        )
                        .stream(tx)
                        .await?;
                    let mut updated_statuses = Vec::new();
                    while let Some(status_entry) = repository_statuses.next().await {
                        let status_entry: worktree_repository_statuses::Model = status_entry?;
                        updated_statuses.push(db_status_to_proto(status_entry)?);
                    }

                    // The conflicts column holds JSON; a missing value falls back
                    // to the default, while malformed JSON is an error.
                    let current_merge_conflicts = db_repository_entry
                        .current_merge_conflicts
                        .as_ref()
                        .map(|conflicts| serde_json::from_str(&conflicts))
                        .transpose()?
                        .unwrap_or_default();

                    worktree.repository_entries.insert(
                        db_repository_entry.work_directory_id as u64,
                        proto::RepositoryEntry {
                            work_directory_id: db_repository_entry.work_directory_id as u64,
                            branch: db_repository_entry.branch,
                            updated_statuses,
                            removed_statuses: Vec::new(),
                            current_merge_conflicts,
                        },
                    );
                }
            }
        }

        // Populate worktree diagnostic summaries.
        {
            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_summary) = db_summaries.next().await {
                let db_summary = db_summary?;
                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                    worktree
                        .diagnostic_summaries
                        .push(proto::DiagnosticSummary {
                            path: db_summary.path,
                            language_server_id: db_summary.language_server_id as u64,
                            error_count: db_summary.error_count as u32,
                            warning_count: db_summary.warning_count as u32,
                        });
                }
            }
        }

        // Populate worktree settings files
        {
            let mut db_settings_files = worktree_settings_file::Entity::find()
                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_settings_file) = db_settings_files.next().await {
                let db_settings_file = db_settings_file?;
                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
                    worktree.settings_files.push(WorktreeSettingsFile {
                        path: db_settings_file.path,
                        content: db_settings_file.content,
                        kind: db_settings_file.kind,
                    });
                }
            }
        }

        // Populate language servers.
        let language_servers = project
            .find_related(language_server::Entity)
            .all(tx)
            .await?;

        let project = Project {
            id: project.id,
            role,
            collaborators: collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                })
                .collect(),
            worktrees,
            language_servers: language_servers
                .into_iter()
                .map(|language_server| proto::LanguageServer {
                    id: language_server.id as u64,
                    name: language_server.name,
                    worktree_id: None,
                })
                .collect(),
        };
        Ok((project, replica_id as ReplicaId))
    }
865
866 /// Removes the given connection from the specified project.
867 pub async fn leave_project(
868 &self,
869 project_id: ProjectId,
870 connection: ConnectionId,
871 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
872 self.project_transaction(project_id, |tx| async move {
873 let result = project_collaborator::Entity::delete_many()
874 .filter(
875 Condition::all()
876 .add(project_collaborator::Column::ProjectId.eq(project_id))
877 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
878 .add(
879 project_collaborator::Column::ConnectionServerId
880 .eq(connection.owner_id as i32),
881 ),
882 )
883 .exec(&*tx)
884 .await?;
885 if result.rows_affected == 0 {
886 Err(anyhow!("not a collaborator on this project"))?;
887 }
888
889 let project = project::Entity::find_by_id(project_id)
890 .one(&*tx)
891 .await?
892 .ok_or_else(|| anyhow!("no such project"))?;
893 let collaborators = project
894 .find_related(project_collaborator::Entity)
895 .all(&*tx)
896 .await?;
897 let connection_ids: Vec<ConnectionId> = collaborators
898 .into_iter()
899 .map(|collaborator| collaborator.connection())
900 .collect();
901
902 follower::Entity::delete_many()
903 .filter(
904 Condition::any()
905 .add(
906 Condition::all()
907 .add(follower::Column::ProjectId.eq(Some(project_id)))
908 .add(
909 follower::Column::LeaderConnectionServerId
910 .eq(connection.owner_id),
911 )
912 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
913 )
914 .add(
915 Condition::all()
916 .add(follower::Column::ProjectId.eq(Some(project_id)))
917 .add(
918 follower::Column::FollowerConnectionServerId
919 .eq(connection.owner_id),
920 )
921 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
922 ),
923 )
924 .exec(&*tx)
925 .await?;
926
927 let room = if let Some(room_id) = project.room_id {
928 Some(self.get_room(room_id, &tx).await?)
929 } else {
930 None
931 };
932
933 let left_project = LeftProject {
934 id: project_id,
935 should_unshare: connection == project.host_connection()?,
936 connection_ids,
937 };
938 Ok((room, left_project))
939 })
940 .await
941 }
942
943 pub async fn check_user_is_project_host(
944 &self,
945 project_id: ProjectId,
946 connection_id: ConnectionId,
947 ) -> Result<()> {
948 self.project_transaction(project_id, |tx| async move {
949 project::Entity::find()
950 .filter(
951 Condition::all()
952 .add(project::Column::Id.eq(project_id))
953 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
954 .add(
955 project::Column::HostConnectionServerId
956 .eq(Some(connection_id.owner_id as i32)),
957 ),
958 )
959 .one(&*tx)
960 .await?
961 .ok_or_else(|| anyhow!("failed to read project host"))?;
962
963 Ok(())
964 })
965 .await
966 .map(|guard| guard.into_inner())
967 }
968
969 /// Returns the current project if the given user is authorized to access it with the specified capability.
970 pub async fn access_project(
971 &self,
972 project_id: ProjectId,
973 connection_id: ConnectionId,
974 capability: Capability,
975 tx: &DatabaseTransaction,
976 ) -> Result<(project::Model, ChannelRole)> {
977 let project = project::Entity::find_by_id(project_id)
978 .one(tx)
979 .await?
980 .ok_or_else(|| anyhow!("no such project"))?;
981
982 let role_from_room = if let Some(room_id) = project.room_id {
983 room_participant::Entity::find()
984 .filter(room_participant::Column::RoomId.eq(room_id))
985 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
986 .one(tx)
987 .await?
988 .and_then(|participant| participant.role)
989 } else {
990 None
991 };
992
993 let role = role_from_room.unwrap_or(ChannelRole::Banned);
994
995 match capability {
996 Capability::ReadWrite => {
997 if !role.can_edit_projects() {
998 return Err(anyhow!("not authorized to edit projects"))?;
999 }
1000 }
1001 Capability::ReadOnly => {
1002 if !role.can_read_projects() {
1003 return Err(anyhow!("not authorized to read projects"))?;
1004 }
1005 }
1006 }
1007
1008 Ok((project, role))
1009 }
1010
1011 /// Returns the host connection for a read-only request to join a shared project.
1012 pub async fn host_for_read_only_project_request(
1013 &self,
1014 project_id: ProjectId,
1015 connection_id: ConnectionId,
1016 ) -> Result<ConnectionId> {
1017 self.project_transaction(project_id, |tx| async move {
1018 let (project, _) = self
1019 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1020 .await?;
1021 project.host_connection()
1022 })
1023 .await
1024 .map(|guard| guard.into_inner())
1025 }
1026
1027 /// Returns the host connection for a request to join a shared project.
1028 pub async fn host_for_mutating_project_request(
1029 &self,
1030 project_id: ProjectId,
1031 connection_id: ConnectionId,
1032 ) -> Result<ConnectionId> {
1033 self.project_transaction(project_id, |tx| async move {
1034 let (project, _) = self
1035 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1036 .await?;
1037 project.host_connection()
1038 })
1039 .await
1040 .map(|guard| guard.into_inner())
1041 }
1042
1043 pub async fn connections_for_buffer_update(
1044 &self,
1045 project_id: ProjectId,
1046 connection_id: ConnectionId,
1047 capability: Capability,
1048 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1049 self.project_transaction(project_id, |tx| async move {
1050 // Authorize
1051 let (project, _) = self
1052 .access_project(project_id, connection_id, capability, &tx)
1053 .await?;
1054
1055 let host_connection_id = project.host_connection()?;
1056
1057 let collaborators = project_collaborator::Entity::find()
1058 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1059 .all(&*tx)
1060 .await?;
1061
1062 let guest_connection_ids = collaborators
1063 .into_iter()
1064 .filter_map(|collaborator| {
1065 if collaborator.is_host {
1066 None
1067 } else {
1068 Some(collaborator.connection())
1069 }
1070 })
1071 .collect();
1072
1073 Ok((host_connection_id, guest_connection_ids))
1074 })
1075 .await
1076 }
1077
1078 /// Returns the connection IDs in the given project.
1079 ///
1080 /// The provided `connection_id` must also be a collaborator in the project,
1081 /// otherwise an error will be returned.
1082 pub async fn project_connection_ids(
1083 &self,
1084 project_id: ProjectId,
1085 connection_id: ConnectionId,
1086 exclude_dev_server: bool,
1087 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1088 self.project_transaction(project_id, |tx| async move {
1089 let project = project::Entity::find_by_id(project_id)
1090 .one(&*tx)
1091 .await?
1092 .ok_or_else(|| anyhow!("no such project"))?;
1093
1094 let mut collaborators = project_collaborator::Entity::find()
1095 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1096 .stream(&*tx)
1097 .await?;
1098
1099 let mut connection_ids = HashSet::default();
1100 if let Some(host_connection) = project.host_connection().log_err() {
1101 if !exclude_dev_server {
1102 connection_ids.insert(host_connection);
1103 }
1104 }
1105
1106 while let Some(collaborator) = collaborators.next().await {
1107 let collaborator = collaborator?;
1108 connection_ids.insert(collaborator.connection());
1109 }
1110
1111 if connection_ids.contains(&connection_id)
1112 || Some(connection_id) == project.host_connection().ok()
1113 {
1114 Ok(connection_ids)
1115 } else {
1116 Err(anyhow!(
1117 "can only send project updates to a project you're in"
1118 ))?
1119 }
1120 })
1121 .await
1122 }
1123
1124 async fn project_guest_connection_ids(
1125 &self,
1126 project_id: ProjectId,
1127 tx: &DatabaseTransaction,
1128 ) -> Result<Vec<ConnectionId>> {
1129 let mut collaborators = project_collaborator::Entity::find()
1130 .filter(
1131 project_collaborator::Column::ProjectId
1132 .eq(project_id)
1133 .and(project_collaborator::Column::IsHost.eq(false)),
1134 )
1135 .stream(tx)
1136 .await?;
1137
1138 let mut guest_connection_ids = Vec::new();
1139 while let Some(collaborator) = collaborators.next().await {
1140 let collaborator = collaborator?;
1141 guest_connection_ids.push(collaborator.connection());
1142 }
1143 Ok(guest_connection_ids)
1144 }
1145
1146 /// Returns the [`RoomId`] for the given project.
1147 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1148 self.transaction(|tx| async move {
1149 Ok(project::Entity::find_by_id(project_id)
1150 .one(&*tx)
1151 .await?
1152 .and_then(|project| project.room_id))
1153 })
1154 .await
1155 }
1156
1157 pub async fn check_room_participants(
1158 &self,
1159 room_id: RoomId,
1160 leader_id: ConnectionId,
1161 follower_id: ConnectionId,
1162 ) -> Result<()> {
1163 self.transaction(|tx| async move {
1164 use room_participant::Column;
1165
1166 let count = room_participant::Entity::find()
1167 .filter(
1168 Condition::all().add(Column::RoomId.eq(room_id)).add(
1169 Condition::any()
1170 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1171 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1172 ))
1173 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1174 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1175 )),
1176 ),
1177 )
1178 .count(&*tx)
1179 .await?;
1180
1181 if count < 2 {
1182 Err(anyhow!("not room participants"))?;
1183 }
1184
1185 Ok(())
1186 })
1187 .await
1188 }
1189
1190 /// Adds the given follower connection as a follower of the given leader connection.
1191 pub async fn follow(
1192 &self,
1193 room_id: RoomId,
1194 project_id: ProjectId,
1195 leader_connection: ConnectionId,
1196 follower_connection: ConnectionId,
1197 ) -> Result<TransactionGuard<proto::Room>> {
1198 self.room_transaction(room_id, |tx| async move {
1199 follower::ActiveModel {
1200 room_id: ActiveValue::set(room_id),
1201 project_id: ActiveValue::set(project_id),
1202 leader_connection_server_id: ActiveValue::set(ServerId(
1203 leader_connection.owner_id as i32,
1204 )),
1205 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1206 follower_connection_server_id: ActiveValue::set(ServerId(
1207 follower_connection.owner_id as i32,
1208 )),
1209 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1210 ..Default::default()
1211 }
1212 .insert(&*tx)
1213 .await?;
1214
1215 let room = self.get_room(room_id, &tx).await?;
1216 Ok(room)
1217 })
1218 .await
1219 }
1220
1221 /// Removes the given follower connection as a follower of the given leader connection.
1222 pub async fn unfollow(
1223 &self,
1224 room_id: RoomId,
1225 project_id: ProjectId,
1226 leader_connection: ConnectionId,
1227 follower_connection: ConnectionId,
1228 ) -> Result<TransactionGuard<proto::Room>> {
1229 self.room_transaction(room_id, |tx| async move {
1230 follower::Entity::delete_many()
1231 .filter(
1232 Condition::all()
1233 .add(follower::Column::RoomId.eq(room_id))
1234 .add(follower::Column::ProjectId.eq(project_id))
1235 .add(
1236 follower::Column::LeaderConnectionServerId
1237 .eq(leader_connection.owner_id),
1238 )
1239 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1240 .add(
1241 follower::Column::FollowerConnectionServerId
1242 .eq(follower_connection.owner_id),
1243 )
1244 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1245 )
1246 .exec(&*tx)
1247 .await?;
1248
1249 let room = self.get_room(room_id, &tx).await?;
1250 Ok(room)
1251 })
1252 .await
1253 }
1254}