1use anyhow::Context as _;
2
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .ok_or_else(|| anyhow!("could not find participant"))?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 let project = project::ActiveModel {
65 room_id: ActiveValue::set(Some(participant.room_id)),
66 host_user_id: ActiveValue::set(Some(participant.user_id)),
67 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
68 host_connection_server_id: ActiveValue::set(Some(ServerId(
69 connection.owner_id as i32,
70 ))),
71 id: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if !worktrees.is_empty() {
77 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
78 worktree::ActiveModel {
79 id: ActiveValue::set(worktree.id as i64),
80 project_id: ActiveValue::set(project.id),
81 abs_path: ActiveValue::set(worktree.abs_path.clone()),
82 root_name: ActiveValue::set(worktree.root_name.clone()),
83 visible: ActiveValue::set(worktree.visible),
84 scan_id: ActiveValue::set(0),
85 completed_scan_id: ActiveValue::set(0),
86 }
87 }))
88 .exec(&*tx)
89 .await?;
90 }
91
92 let replica_id = if is_ssh_project { 1 } else { 0 };
93
94 project_collaborator::ActiveModel {
95 project_id: ActiveValue::set(project.id),
96 connection_id: ActiveValue::set(connection.id as i32),
97 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
98 user_id: ActiveValue::set(participant.user_id),
99 replica_id: ActiveValue::set(ReplicaId(replica_id)),
100 is_host: ActiveValue::set(true),
101 ..Default::default()
102 }
103 .insert(&*tx)
104 .await?;
105
106 let room = self.get_room(room_id, &tx).await?;
107 Ok((project.id, room))
108 })
109 .await
110 }
111
112 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
113 self.weak_transaction(|tx| async move {
114 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
115 Ok(())
116 })
117 .await
118 }
119
120 /// Unshares the given project.
121 pub async fn unshare_project(
122 &self,
123 project_id: ProjectId,
124 connection: ConnectionId,
125 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
126 self.project_transaction(project_id, |tx| async move {
127 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
128 let project = project::Entity::find_by_id(project_id)
129 .one(&*tx)
130 .await?
131 .ok_or_else(|| anyhow!("project not found"))?;
132 let room = if let Some(room_id) = project.room_id {
133 Some(self.get_room(room_id, &tx).await?)
134 } else {
135 None
136 };
137 if project.host_connection()? == connection {
138 return Ok((true, room, guest_connection_ids));
139 }
140 Err(anyhow!("cannot unshare a project hosted by another user"))?
141 })
142 .await
143 }
144
145 /// Updates the worktrees associated with the given project.
146 pub async fn update_project(
147 &self,
148 project_id: ProjectId,
149 connection: ConnectionId,
150 worktrees: &[proto::WorktreeMetadata],
151 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
152 self.project_transaction(project_id, |tx| async move {
153 let project = project::Entity::find_by_id(project_id)
154 .filter(
155 Condition::all()
156 .add(project::Column::HostConnectionId.eq(connection.id as i32))
157 .add(
158 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
159 ),
160 )
161 .one(&*tx)
162 .await?
163 .ok_or_else(|| anyhow!("no such project"))?;
164
165 self.update_project_worktrees(project.id, worktrees, &tx)
166 .await?;
167
168 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
169
170 let room = if let Some(room_id) = project.room_id {
171 Some(self.get_room(room_id, &tx).await?)
172 } else {
173 None
174 };
175
176 Ok((room, guest_connection_ids))
177 })
178 .await
179 }
180
181 pub(in crate::db) async fn update_project_worktrees(
182 &self,
183 project_id: ProjectId,
184 worktrees: &[proto::WorktreeMetadata],
185 tx: &DatabaseTransaction,
186 ) -> Result<()> {
187 if !worktrees.is_empty() {
188 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
189 id: ActiveValue::set(worktree.id as i64),
190 project_id: ActiveValue::set(project_id),
191 abs_path: ActiveValue::set(worktree.abs_path.clone()),
192 root_name: ActiveValue::set(worktree.root_name.clone()),
193 visible: ActiveValue::set(worktree.visible),
194 scan_id: ActiveValue::set(0),
195 completed_scan_id: ActiveValue::set(0),
196 }))
197 .on_conflict(
198 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
199 .update_column(worktree::Column::RootName)
200 .to_owned(),
201 )
202 .exec(tx)
203 .await?;
204 }
205
206 worktree::Entity::delete_many()
207 .filter(worktree::Column::ProjectId.eq(project_id).and(
208 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
209 ))
210 .exec(tx)
211 .await?;
212
213 Ok(())
214 }
215
    /// Applies one chunk of a worktree update sent by the project's host.
    ///
    /// The update is rejected up front when `removed_entries` or
    /// `updated_entries` contains more than
    /// `proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE` elements. Inside the project
    /// transaction the method then, in order:
    ///
    /// 1. verifies that `connection` is the recorded host of the project,
    /// 2. updates the worktree's metadata (root name, abs path, scan ids),
    /// 3. upserts `updated_entries` and flags `removed_entries` as deleted,
    /// 4. upserts `updated_repositories` together with their updated statuses
    ///    and flags their removed statuses as deleted,
    /// 5. flags `removed_repositories` as deleted.
    ///
    /// Rows are soft-deleted: `is_deleted` is set and `scan_id` is stamped with
    /// the update's scan id rather than removing the row.
    ///
    /// Returns the connection ids reported by `project_guest_connection_ids`.
    pub async fn update_worktree(
        &self,
        update: &proto::UpdateWorktree,
        connection: ConnectionId,
    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
        // Refuse oversized chunks before opening a transaction.
        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
        {
            return Err(anyhow!(
                "invalid worktree update. removed entries: {}, updated entries: {}",
                update.removed_entries.len(),
                update.updated_entries.len()
            ))?;
        }

        let project_id = ProjectId::from_proto(update.project_id);
        let worktree_id = update.worktree_id as i64;
        self.project_transaction(project_id, |tx| async move {
            // Ensure the update comes from the host.
            let _project = project::Entity::find_by_id(project_id)
                .filter(
                    Condition::all()
                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
                        .add(
                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("no such project: {project_id}"))?;

            // Update metadata.
            worktree::Entity::update(worktree::ActiveModel {
                id: ActiveValue::set(worktree_id),
                project_id: ActiveValue::set(project_id),
                root_name: ActiveValue::set(update.root_name.clone()),
                scan_id: ActiveValue::set(update.scan_id as i64),
                // The completed scan id only advances on the final chunk of a scan;
                // otherwise the column is left out of the UPDATE.
                completed_scan_id: if update.is_last_update {
                    ActiveValue::set(update.scan_id as i64)
                } else {
                    ActiveValue::default()
                },
                abs_path: ActiveValue::set(update.abs_path.clone()),
                ..Default::default()
            })
            .exec(&*tx)
            .await?;

            // Upsert the entries that were added or changed.
            if !update.updated_entries.is_empty() {
                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
                    // A missing mtime is stored as the default timestamp.
                    let mtime = entry.mtime.clone().unwrap_or_default();
                    worktree_entry::ActiveModel {
                        project_id: ActiveValue::set(project_id),
                        worktree_id: ActiveValue::set(worktree_id),
                        id: ActiveValue::set(entry.id as i64),
                        is_dir: ActiveValue::set(entry.is_dir),
                        path: ActiveValue::set(entry.path.clone()),
                        inode: ActiveValue::set(entry.inode as i64),
                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
                        canonical_path: ActiveValue::set(entry.canonical_path.clone()),
                        is_ignored: ActiveValue::set(entry.is_ignored),
                        git_status: ActiveValue::set(None),
                        is_external: ActiveValue::set(entry.is_external),
                        is_deleted: ActiveValue::set(false),
                        scan_id: ActiveValue::set(update.scan_id as i64),
                        is_fifo: ActiveValue::set(entry.is_fifo),
                    }
                }))
                .on_conflict(
                    OnConflict::columns([
                        worktree_entry::Column::ProjectId,
                        worktree_entry::Column::WorktreeId,
                        worktree_entry::Column::Id,
                    ])
                    // NOTE(review): only the columns listed below are overwritten on
                    // conflict; `is_deleted`, `is_external` and `is_fifo` are not in the
                    // list — confirm that an existing row keeping those values is intended.
                    .update_columns([
                        worktree_entry::Column::IsDir,
                        worktree_entry::Column::Path,
                        worktree_entry::Column::Inode,
                        worktree_entry::Column::MtimeSeconds,
                        worktree_entry::Column::MtimeNanos,
                        worktree_entry::Column::CanonicalPath,
                        worktree_entry::Column::IsIgnored,
                        worktree_entry::Column::ScanId,
                    ])
                    .to_owned(),
                )
                .exec(&*tx)
                .await?;
            }

            // Soft-delete the entries that were removed.
            if !update.removed_entries.is_empty() {
                worktree_entry::Entity::update_many()
                    .filter(
                        worktree_entry::Column::ProjectId
                            .eq(project_id)
                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
                            .and(
                                worktree_entry::Column::Id
                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
                            ),
                    )
                    .set(worktree_entry::ActiveModel {
                        is_deleted: ActiveValue::Set(true),
                        scan_id: ActiveValue::Set(update.scan_id as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?;
            }

            // Upsert repositories, then reconcile their per-path statuses.
            if !update.updated_repositories.is_empty() {
                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
                    |repository| worktree_repository::ActiveModel {
                        project_id: ActiveValue::set(project_id),
                        worktree_id: ActiveValue::set(worktree_id),
                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
                        scan_id: ActiveValue::set(update.scan_id as i64),
                        branch: ActiveValue::set(repository.branch.clone()),
                        is_deleted: ActiveValue::set(false),
                    },
                ))
                .on_conflict(
                    OnConflict::columns([
                        worktree_repository::Column::ProjectId,
                        worktree_repository::Column::WorktreeId,
                        worktree_repository::Column::WorkDirectoryId,
                    ])
                    // NOTE(review): `is_deleted` is not overwritten on conflict — confirm
                    // that a previously soft-deleted repository staying deleted is intended.
                    .update_columns([
                        worktree_repository::Column::ScanId,
                        worktree_repository::Column::Branch,
                    ])
                    .to_owned(),
                )
                .exec(&*tx)
                .await?;

                // Skip the status insert entirely when no repository carries statuses.
                let has_any_statuses = update
                    .updated_repositories
                    .iter()
                    .any(|repository| !repository.updated_statuses.is_empty());

                if has_any_statuses {
                    worktree_repository_statuses::Entity::insert_many(
                        update.updated_repositories.iter().flat_map(
                            |repository: &proto::RepositoryEntry| {
                                repository.updated_statuses.iter().map(|status_entry| {
                                    let (repo_path, status_kind, first_status, second_status) =
                                        proto_status_to_db(status_entry.clone());
                                    worktree_repository_statuses::ActiveModel {
                                        project_id: ActiveValue::set(project_id),
                                        worktree_id: ActiveValue::set(worktree_id),
                                        work_directory_id: ActiveValue::set(
                                            repository.work_directory_id as i64,
                                        ),
                                        scan_id: ActiveValue::set(update.scan_id as i64),
                                        is_deleted: ActiveValue::set(false),
                                        repo_path: ActiveValue::set(repo_path),
                                        // Always written as 0 here; the status itself is
                                        // carried by the three columns below.
                                        status: ActiveValue::set(0),
                                        status_kind: ActiveValue::set(status_kind),
                                        first_status: ActiveValue::set(first_status),
                                        second_status: ActiveValue::set(second_status),
                                    }
                                })
                            },
                        ),
                    )
                    .on_conflict(
                        OnConflict::columns([
                            worktree_repository_statuses::Column::ProjectId,
                            worktree_repository_statuses::Column::WorktreeId,
                            worktree_repository_statuses::Column::WorkDirectoryId,
                            worktree_repository_statuses::Column::RepoPath,
                        ])
                        .update_columns([
                            worktree_repository_statuses::Column::ScanId,
                            worktree_repository_statuses::Column::StatusKind,
                            worktree_repository_statuses::Column::FirstStatus,
                            worktree_repository_statuses::Column::SecondStatus,
                        ])
                        .to_owned(),
                    )
                    .exec(&*tx)
                    .await?;
                }

                let has_any_removed_statuses = update
                    .updated_repositories
                    .iter()
                    .any(|repository| !repository.removed_statuses.is_empty());

                if has_any_removed_statuses {
                    // NOTE(review): the filter matches removed paths across the whole
                    // worktree and does not constrain `work_directory_id`, so a path
                    // removed from one repository also flags the same path in any other
                    // repository of this worktree — confirm this is intended.
                    worktree_repository_statuses::Entity::update_many()
                        .filter(
                            worktree_repository_statuses::Column::ProjectId
                                .eq(project_id)
                                .and(
                                    worktree_repository_statuses::Column::WorktreeId
                                        .eq(worktree_id),
                                )
                                .and(
                                    worktree_repository_statuses::Column::RepoPath.is_in(
                                        update.updated_repositories.iter().flat_map(|repository| {
                                            repository.removed_statuses.iter()
                                        }),
                                    ),
                                ),
                        )
                        .set(worktree_repository_statuses::ActiveModel {
                            is_deleted: ActiveValue::Set(true),
                            scan_id: ActiveValue::Set(update.scan_id as i64),
                            ..Default::default()
                        })
                        .exec(&*tx)
                        .await?;
                }
            }

            // Soft-delete the repositories that were removed.
            if !update.removed_repositories.is_empty() {
                worktree_repository::Entity::update_many()
                    .filter(
                        worktree_repository::Column::ProjectId
                            .eq(project_id)
                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
                            .and(
                                worktree_repository::Column::WorkDirectoryId
                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
                            ),
                    )
                    .set(worktree_repository::ActiveModel {
                        is_deleted: ActiveValue::Set(true),
                        scan_id: ActiveValue::Set(update.scan_id as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?;
            }

            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
            Ok(connection_ids)
        })
        .await
    }
459
460 /// Updates the diagnostic summary for the given connection.
461 pub async fn update_diagnostic_summary(
462 &self,
463 update: &proto::UpdateDiagnosticSummary,
464 connection: ConnectionId,
465 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
466 let project_id = ProjectId::from_proto(update.project_id);
467 let worktree_id = update.worktree_id as i64;
468 self.project_transaction(project_id, |tx| async move {
469 let summary = update
470 .summary
471 .as_ref()
472 .ok_or_else(|| anyhow!("invalid summary"))?;
473
474 // Ensure the update comes from the host.
475 let project = project::Entity::find_by_id(project_id)
476 .one(&*tx)
477 .await?
478 .ok_or_else(|| anyhow!("no such project"))?;
479 if project.host_connection()? != connection {
480 return Err(anyhow!("can't update a project hosted by someone else"))?;
481 }
482
483 // Update summary.
484 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
485 project_id: ActiveValue::set(project_id),
486 worktree_id: ActiveValue::set(worktree_id),
487 path: ActiveValue::set(summary.path.clone()),
488 language_server_id: ActiveValue::set(summary.language_server_id as i64),
489 error_count: ActiveValue::set(summary.error_count as i32),
490 warning_count: ActiveValue::set(summary.warning_count as i32),
491 })
492 .on_conflict(
493 OnConflict::columns([
494 worktree_diagnostic_summary::Column::ProjectId,
495 worktree_diagnostic_summary::Column::WorktreeId,
496 worktree_diagnostic_summary::Column::Path,
497 ])
498 .update_columns([
499 worktree_diagnostic_summary::Column::LanguageServerId,
500 worktree_diagnostic_summary::Column::ErrorCount,
501 worktree_diagnostic_summary::Column::WarningCount,
502 ])
503 .to_owned(),
504 )
505 .exec(&*tx)
506 .await?;
507
508 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
509 Ok(connection_ids)
510 })
511 .await
512 }
513
514 /// Starts the language server for the given connection.
515 pub async fn start_language_server(
516 &self,
517 update: &proto::StartLanguageServer,
518 connection: ConnectionId,
519 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
520 let project_id = ProjectId::from_proto(update.project_id);
521 self.project_transaction(project_id, |tx| async move {
522 let server = update
523 .server
524 .as_ref()
525 .ok_or_else(|| anyhow!("invalid language server"))?;
526
527 // Ensure the update comes from the host.
528 let project = project::Entity::find_by_id(project_id)
529 .one(&*tx)
530 .await?
531 .ok_or_else(|| anyhow!("no such project"))?;
532 if project.host_connection()? != connection {
533 return Err(anyhow!("can't update a project hosted by someone else"))?;
534 }
535
536 // Add the newly-started language server.
537 language_server::Entity::insert(language_server::ActiveModel {
538 project_id: ActiveValue::set(project_id),
539 id: ActiveValue::set(server.id as i64),
540 name: ActiveValue::set(server.name.clone()),
541 })
542 .on_conflict(
543 OnConflict::columns([
544 language_server::Column::ProjectId,
545 language_server::Column::Id,
546 ])
547 .update_column(language_server::Column::Name)
548 .to_owned(),
549 )
550 .exec(&*tx)
551 .await?;
552
553 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
554 Ok(connection_ids)
555 })
556 .await
557 }
558
559 /// Updates the worktree settings for the given connection.
560 pub async fn update_worktree_settings(
561 &self,
562 update: &proto::UpdateWorktreeSettings,
563 connection: ConnectionId,
564 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
565 let project_id = ProjectId::from_proto(update.project_id);
566 let kind = match update.kind {
567 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
568 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
569 None => proto::LocalSettingsKind::Settings,
570 };
571 let kind = LocalSettingsKind::from_proto(kind);
572 self.project_transaction(project_id, |tx| async move {
573 // Ensure the update comes from the host.
574 let project = project::Entity::find_by_id(project_id)
575 .one(&*tx)
576 .await?
577 .ok_or_else(|| anyhow!("no such project"))?;
578 if project.host_connection()? != connection {
579 return Err(anyhow!("can't update a project hosted by someone else"))?;
580 }
581
582 if let Some(content) = &update.content {
583 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
584 project_id: ActiveValue::Set(project_id),
585 worktree_id: ActiveValue::Set(update.worktree_id as i64),
586 path: ActiveValue::Set(update.path.clone()),
587 content: ActiveValue::Set(content.clone()),
588 kind: ActiveValue::Set(kind),
589 })
590 .on_conflict(
591 OnConflict::columns([
592 worktree_settings_file::Column::ProjectId,
593 worktree_settings_file::Column::WorktreeId,
594 worktree_settings_file::Column::Path,
595 ])
596 .update_column(worktree_settings_file::Column::Content)
597 .to_owned(),
598 )
599 .exec(&*tx)
600 .await?;
601 } else {
602 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
603 project_id: ActiveValue::Set(project_id),
604 worktree_id: ActiveValue::Set(update.worktree_id as i64),
605 path: ActiveValue::Set(update.path.clone()),
606 ..Default::default()
607 })
608 .exec(&*tx)
609 .await?;
610 }
611
612 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
613 Ok(connection_ids)
614 })
615 .await
616 }
617
618 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
619 self.transaction(|tx| async move {
620 Ok(project::Entity::find_by_id(id)
621 .one(&*tx)
622 .await?
623 .ok_or_else(|| anyhow!("no such project"))?)
624 })
625 .await
626 }
627
628 /// Adds the given connection to the specified project
629 /// in the current room.
630 pub async fn join_project(
631 &self,
632 project_id: ProjectId,
633 connection: ConnectionId,
634 user_id: UserId,
635 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
636 self.project_transaction(project_id, |tx| async move {
637 let (project, role) = self
638 .access_project(project_id, connection, Capability::ReadOnly, &tx)
639 .await?;
640 self.join_project_internal(project, user_id, connection, role, &tx)
641 .await
642 })
643 .await
644 }
645
    /// Adds `connection` as a non-host collaborator of `project` and builds a
    /// snapshot of the project's current state.
    ///
    /// The new collaborator is assigned the smallest replica id, starting at 1,
    /// that no existing collaborator of the project uses. The returned
    /// `Project` contains all collaborators (including the new one), the
    /// project's worktrees with their non-deleted entries, non-deleted
    /// repositories and statuses, diagnostic summaries and settings files, and
    /// the project's language servers. The second tuple element is the replica
    /// id assigned to the new collaborator.
    ///
    /// Authorization is not checked here; `role` is copied into the result as
    /// given.
    async fn join_project_internal(
        &self,
        project: project::Model,
        user_id: UserId,
        connection: ConnectionId,
        role: ChannelRole,
        tx: &DatabaseTransaction,
    ) -> Result<(Project, ReplicaId)> {
        let mut collaborators = project
            .find_related(project_collaborator::Entity)
            .all(tx)
            .await?;
        // Pick the first replica id >= 1 that is not already taken.
        let replica_ids = collaborators
            .iter()
            .map(|c| c.replica_id)
            .collect::<HashSet<_>>();
        let mut replica_id = ReplicaId(1);
        while replica_ids.contains(&replica_id) {
            replica_id.0 += 1;
        }
        let new_collaborator = project_collaborator::ActiveModel {
            project_id: ActiveValue::set(project.id),
            connection_id: ActiveValue::set(connection.id as i32),
            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
            user_id: ActiveValue::set(user_id),
            replica_id: ActiveValue::set(replica_id),
            is_host: ActiveValue::set(false),
            ..Default::default()
        }
        .insert(tx)
        .await?;
        collaborators.push(new_collaborator);

        // Index the project's worktrees by id; the sections below fill in
        // their initially empty collections.
        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
        let mut worktrees = db_worktrees
            .into_iter()
            .map(|db_worktree| {
                (
                    db_worktree.id as u64,
                    Worktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        entries: Default::default(),
                        repository_entries: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    },
                )
            })
            .collect::<BTreeMap<_, _>>();

        // Populate worktree entries.
        {
            let mut db_entries = worktree_entry::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_entry::Column::ProjectId.eq(project.id))
                        .add(worktree_entry::Column::IsDeleted.eq(false)),
                )
                .stream(tx)
                .await?;
            while let Some(db_entry) = db_entries.next().await {
                let db_entry = db_entry?;
                // Entries whose worktree is not in the map are skipped.
                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                    worktree.entries.push(proto::Entry {
                        id: db_entry.id as u64,
                        is_dir: db_entry.is_dir,
                        path: db_entry.path,
                        inode: db_entry.inode as u64,
                        mtime: Some(proto::Timestamp {
                            seconds: db_entry.mtime_seconds as u64,
                            nanos: db_entry.mtime_nanos as u32,
                        }),
                        canonical_path: db_entry.canonical_path,
                        is_ignored: db_entry.is_ignored,
                        is_external: db_entry.is_external,
                        // This is only used in the summarization backlog, so if it's None,
                        // that just means we won't be able to detect when to resummarize
                        // based on total number of backlogged bytes - instead, we'd go
                        // on number of files only. That shouldn't be a huge deal in practice.
                        size: None,
                        is_fifo: db_entry.is_fifo,
                    });
                }
            }
        }

        // Populate repository entries.
        {
            let db_repository_entries = worktree_repository::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_repository::Column::ProjectId.eq(project.id))
                        .add(worktree_repository::Column::IsDeleted.eq(false)),
                )
                .all(tx)
                .await?;
            for db_repository_entry in db_repository_entries {
                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
                {
                    // One status query is issued per repository.
                    let mut repository_statuses = worktree_repository_statuses::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_repository_statuses::Column::ProjectId.eq(project.id))
                                .add(
                                    worktree_repository_statuses::Column::WorktreeId
                                        .eq(worktree.id),
                                )
                                .add(
                                    worktree_repository_statuses::Column::WorkDirectoryId
                                        .eq(db_repository_entry.work_directory_id),
                                )
                                .add(worktree_repository_statuses::Column::IsDeleted.eq(false)),
                        )
                        .stream(tx)
                        .await?;
                    let mut updated_statuses = Vec::new();
                    while let Some(status_entry) = repository_statuses.next().await {
                        let status_entry: worktree_repository_statuses::Model = status_entry?;
                        updated_statuses.push(db_status_to_proto(status_entry)?);
                    }

                    // All current statuses are sent as "updated"; nothing is
                    // reported as removed in this snapshot.
                    worktree.repository_entries.insert(
                        db_repository_entry.work_directory_id as u64,
                        proto::RepositoryEntry {
                            work_directory_id: db_repository_entry.work_directory_id as u64,
                            branch: db_repository_entry.branch,
                            updated_statuses,
                            removed_statuses: Vec::new(),
                        },
                    );
                }
            }
        }

        // Populate worktree diagnostic summaries.
        {
            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_summary) = db_summaries.next().await {
                let db_summary = db_summary?;
                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                    worktree
                        .diagnostic_summaries
                        .push(proto::DiagnosticSummary {
                            path: db_summary.path,
                            language_server_id: db_summary.language_server_id as u64,
                            error_count: db_summary.error_count as u32,
                            warning_count: db_summary.warning_count as u32,
                        });
                }
            }
        }

        // Populate worktree settings files
        {
            let mut db_settings_files = worktree_settings_file::Entity::find()
                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_settings_file) = db_settings_files.next().await {
                let db_settings_file = db_settings_file?;
                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
                    worktree.settings_files.push(WorktreeSettingsFile {
                        path: db_settings_file.path,
                        content: db_settings_file.content,
                        kind: db_settings_file.kind,
                    });
                }
            }
        }

        // Populate language servers.
        let language_servers = project
            .find_related(language_server::Entity)
            .all(tx)
            .await?;

        // Assemble the snapshot handed back to the joining connection.
        let project = Project {
            id: project.id,
            role,
            collaborators: collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                })
                .collect(),
            worktrees,
            language_servers: language_servers
                .into_iter()
                .map(|language_server| proto::LanguageServer {
                    id: language_server.id as u64,
                    name: language_server.name,
                    worktree_id: None,
                })
                .collect(),
        };
        Ok((project, replica_id as ReplicaId))
    }
854
855 /// Removes the given connection from the specified project.
856 pub async fn leave_project(
857 &self,
858 project_id: ProjectId,
859 connection: ConnectionId,
860 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
861 self.project_transaction(project_id, |tx| async move {
862 let result = project_collaborator::Entity::delete_many()
863 .filter(
864 Condition::all()
865 .add(project_collaborator::Column::ProjectId.eq(project_id))
866 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
867 .add(
868 project_collaborator::Column::ConnectionServerId
869 .eq(connection.owner_id as i32),
870 ),
871 )
872 .exec(&*tx)
873 .await?;
874 if result.rows_affected == 0 {
875 Err(anyhow!("not a collaborator on this project"))?;
876 }
877
878 let project = project::Entity::find_by_id(project_id)
879 .one(&*tx)
880 .await?
881 .ok_or_else(|| anyhow!("no such project"))?;
882 let collaborators = project
883 .find_related(project_collaborator::Entity)
884 .all(&*tx)
885 .await?;
886 let connection_ids: Vec<ConnectionId> = collaborators
887 .into_iter()
888 .map(|collaborator| collaborator.connection())
889 .collect();
890
891 follower::Entity::delete_many()
892 .filter(
893 Condition::any()
894 .add(
895 Condition::all()
896 .add(follower::Column::ProjectId.eq(Some(project_id)))
897 .add(
898 follower::Column::LeaderConnectionServerId
899 .eq(connection.owner_id),
900 )
901 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
902 )
903 .add(
904 Condition::all()
905 .add(follower::Column::ProjectId.eq(Some(project_id)))
906 .add(
907 follower::Column::FollowerConnectionServerId
908 .eq(connection.owner_id),
909 )
910 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
911 ),
912 )
913 .exec(&*tx)
914 .await?;
915
916 let room = if let Some(room_id) = project.room_id {
917 Some(self.get_room(room_id, &tx).await?)
918 } else {
919 None
920 };
921
922 let left_project = LeftProject {
923 id: project_id,
924 should_unshare: connection == project.host_connection()?,
925 connection_ids,
926 };
927 Ok((room, left_project))
928 })
929 .await
930 }
931
932 pub async fn check_user_is_project_host(
933 &self,
934 project_id: ProjectId,
935 connection_id: ConnectionId,
936 ) -> Result<()> {
937 self.project_transaction(project_id, |tx| async move {
938 project::Entity::find()
939 .filter(
940 Condition::all()
941 .add(project::Column::Id.eq(project_id))
942 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
943 .add(
944 project::Column::HostConnectionServerId
945 .eq(Some(connection_id.owner_id as i32)),
946 ),
947 )
948 .one(&*tx)
949 .await?
950 .ok_or_else(|| anyhow!("failed to read project host"))?;
951
952 Ok(())
953 })
954 .await
955 .map(|guard| guard.into_inner())
956 }
957
958 /// Returns the current project if the given user is authorized to access it with the specified capability.
959 pub async fn access_project(
960 &self,
961 project_id: ProjectId,
962 connection_id: ConnectionId,
963 capability: Capability,
964 tx: &DatabaseTransaction,
965 ) -> Result<(project::Model, ChannelRole)> {
966 let project = project::Entity::find_by_id(project_id)
967 .one(tx)
968 .await?
969 .ok_or_else(|| anyhow!("no such project"))?;
970
971 let role_from_room = if let Some(room_id) = project.room_id {
972 room_participant::Entity::find()
973 .filter(room_participant::Column::RoomId.eq(room_id))
974 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
975 .one(tx)
976 .await?
977 .and_then(|participant| participant.role)
978 } else {
979 None
980 };
981
982 let role = role_from_room.unwrap_or(ChannelRole::Banned);
983
984 match capability {
985 Capability::ReadWrite => {
986 if !role.can_edit_projects() {
987 return Err(anyhow!("not authorized to edit projects"))?;
988 }
989 }
990 Capability::ReadOnly => {
991 if !role.can_read_projects() {
992 return Err(anyhow!("not authorized to read projects"))?;
993 }
994 }
995 }
996
997 Ok((project, role))
998 }
999
1000 /// Returns the host connection for a read-only request to join a shared project.
1001 pub async fn host_for_read_only_project_request(
1002 &self,
1003 project_id: ProjectId,
1004 connection_id: ConnectionId,
1005 ) -> Result<ConnectionId> {
1006 self.project_transaction(project_id, |tx| async move {
1007 let (project, _) = self
1008 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1009 .await?;
1010 project.host_connection()
1011 })
1012 .await
1013 .map(|guard| guard.into_inner())
1014 }
1015
1016 /// Returns the host connection for a request to join a shared project.
1017 pub async fn host_for_mutating_project_request(
1018 &self,
1019 project_id: ProjectId,
1020 connection_id: ConnectionId,
1021 ) -> Result<ConnectionId> {
1022 self.project_transaction(project_id, |tx| async move {
1023 let (project, _) = self
1024 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1025 .await?;
1026 project.host_connection()
1027 })
1028 .await
1029 .map(|guard| guard.into_inner())
1030 }
1031
1032 pub async fn connections_for_buffer_update(
1033 &self,
1034 project_id: ProjectId,
1035 connection_id: ConnectionId,
1036 capability: Capability,
1037 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1038 self.project_transaction(project_id, |tx| async move {
1039 // Authorize
1040 let (project, _) = self
1041 .access_project(project_id, connection_id, capability, &tx)
1042 .await?;
1043
1044 let host_connection_id = project.host_connection()?;
1045
1046 let collaborators = project_collaborator::Entity::find()
1047 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1048 .all(&*tx)
1049 .await?;
1050
1051 let guest_connection_ids = collaborators
1052 .into_iter()
1053 .filter_map(|collaborator| {
1054 if collaborator.is_host {
1055 None
1056 } else {
1057 Some(collaborator.connection())
1058 }
1059 })
1060 .collect();
1061
1062 Ok((host_connection_id, guest_connection_ids))
1063 })
1064 .await
1065 }
1066
1067 /// Returns the connection IDs in the given project.
1068 ///
1069 /// The provided `connection_id` must also be a collaborator in the project,
1070 /// otherwise an error will be returned.
1071 pub async fn project_connection_ids(
1072 &self,
1073 project_id: ProjectId,
1074 connection_id: ConnectionId,
1075 exclude_dev_server: bool,
1076 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1077 self.project_transaction(project_id, |tx| async move {
1078 let project = project::Entity::find_by_id(project_id)
1079 .one(&*tx)
1080 .await?
1081 .ok_or_else(|| anyhow!("no such project"))?;
1082
1083 let mut collaborators = project_collaborator::Entity::find()
1084 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1085 .stream(&*tx)
1086 .await?;
1087
1088 let mut connection_ids = HashSet::default();
1089 if let Some(host_connection) = project.host_connection().log_err() {
1090 if !exclude_dev_server {
1091 connection_ids.insert(host_connection);
1092 }
1093 }
1094
1095 while let Some(collaborator) = collaborators.next().await {
1096 let collaborator = collaborator?;
1097 connection_ids.insert(collaborator.connection());
1098 }
1099
1100 if connection_ids.contains(&connection_id)
1101 || Some(connection_id) == project.host_connection().ok()
1102 {
1103 Ok(connection_ids)
1104 } else {
1105 Err(anyhow!(
1106 "can only send project updates to a project you're in"
1107 ))?
1108 }
1109 })
1110 .await
1111 }
1112
1113 async fn project_guest_connection_ids(
1114 &self,
1115 project_id: ProjectId,
1116 tx: &DatabaseTransaction,
1117 ) -> Result<Vec<ConnectionId>> {
1118 let mut collaborators = project_collaborator::Entity::find()
1119 .filter(
1120 project_collaborator::Column::ProjectId
1121 .eq(project_id)
1122 .and(project_collaborator::Column::IsHost.eq(false)),
1123 )
1124 .stream(tx)
1125 .await?;
1126
1127 let mut guest_connection_ids = Vec::new();
1128 while let Some(collaborator) = collaborators.next().await {
1129 let collaborator = collaborator?;
1130 guest_connection_ids.push(collaborator.connection());
1131 }
1132 Ok(guest_connection_ids)
1133 }
1134
1135 /// Returns the [`RoomId`] for the given project.
1136 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1137 self.transaction(|tx| async move {
1138 Ok(project::Entity::find_by_id(project_id)
1139 .one(&*tx)
1140 .await?
1141 .and_then(|project| project.room_id))
1142 })
1143 .await
1144 }
1145
1146 pub async fn check_room_participants(
1147 &self,
1148 room_id: RoomId,
1149 leader_id: ConnectionId,
1150 follower_id: ConnectionId,
1151 ) -> Result<()> {
1152 self.transaction(|tx| async move {
1153 use room_participant::Column;
1154
1155 let count = room_participant::Entity::find()
1156 .filter(
1157 Condition::all().add(Column::RoomId.eq(room_id)).add(
1158 Condition::any()
1159 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1160 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1161 ))
1162 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1163 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1164 )),
1165 ),
1166 )
1167 .count(&*tx)
1168 .await?;
1169
1170 if count < 2 {
1171 Err(anyhow!("not room participants"))?;
1172 }
1173
1174 Ok(())
1175 })
1176 .await
1177 }
1178
1179 /// Adds the given follower connection as a follower of the given leader connection.
1180 pub async fn follow(
1181 &self,
1182 room_id: RoomId,
1183 project_id: ProjectId,
1184 leader_connection: ConnectionId,
1185 follower_connection: ConnectionId,
1186 ) -> Result<TransactionGuard<proto::Room>> {
1187 self.room_transaction(room_id, |tx| async move {
1188 follower::ActiveModel {
1189 room_id: ActiveValue::set(room_id),
1190 project_id: ActiveValue::set(project_id),
1191 leader_connection_server_id: ActiveValue::set(ServerId(
1192 leader_connection.owner_id as i32,
1193 )),
1194 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1195 follower_connection_server_id: ActiveValue::set(ServerId(
1196 follower_connection.owner_id as i32,
1197 )),
1198 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1199 ..Default::default()
1200 }
1201 .insert(&*tx)
1202 .await?;
1203
1204 let room = self.get_room(room_id, &tx).await?;
1205 Ok(room)
1206 })
1207 .await
1208 }
1209
1210 /// Removes the given follower connection as a follower of the given leader connection.
1211 pub async fn unfollow(
1212 &self,
1213 room_id: RoomId,
1214 project_id: ProjectId,
1215 leader_connection: ConnectionId,
1216 follower_connection: ConnectionId,
1217 ) -> Result<TransactionGuard<proto::Room>> {
1218 self.room_transaction(room_id, |tx| async move {
1219 follower::Entity::delete_many()
1220 .filter(
1221 Condition::all()
1222 .add(follower::Column::RoomId.eq(room_id))
1223 .add(follower::Column::ProjectId.eq(project_id))
1224 .add(
1225 follower::Column::LeaderConnectionServerId
1226 .eq(leader_connection.owner_id),
1227 )
1228 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1229 .add(
1230 follower::Column::FollowerConnectionServerId
1231 .eq(follower_connection.owner_id),
1232 )
1233 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1234 )
1235 .exec(&*tx)
1236 .await?;
1237
1238 let room = self.get_room(room_id, &tx).await?;
1239 Ok(room)
1240 })
1241 .await
1242 }
1243}