1use anyhow::Context as _;
2
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .ok_or_else(|| anyhow!("could not find participant"))?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 let project = project::ActiveModel {
65 room_id: ActiveValue::set(Some(participant.room_id)),
66 host_user_id: ActiveValue::set(Some(participant.user_id)),
67 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
68 host_connection_server_id: ActiveValue::set(Some(ServerId(
69 connection.owner_id as i32,
70 ))),
71 id: ActiveValue::NotSet,
72 }
73 .insert(&*tx)
74 .await?;
75
76 if !worktrees.is_empty() {
77 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
78 worktree::ActiveModel {
79 id: ActiveValue::set(worktree.id as i64),
80 project_id: ActiveValue::set(project.id),
81 abs_path: ActiveValue::set(worktree.abs_path.clone()),
82 root_name: ActiveValue::set(worktree.root_name.clone()),
83 visible: ActiveValue::set(worktree.visible),
84 scan_id: ActiveValue::set(0),
85 completed_scan_id: ActiveValue::set(0),
86 }
87 }))
88 .exec(&*tx)
89 .await?;
90 }
91
92 let replica_id = if is_ssh_project { 1 } else { 0 };
93
94 project_collaborator::ActiveModel {
95 project_id: ActiveValue::set(project.id),
96 connection_id: ActiveValue::set(connection.id as i32),
97 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
98 user_id: ActiveValue::set(participant.user_id),
99 replica_id: ActiveValue::set(ReplicaId(replica_id)),
100 is_host: ActiveValue::set(true),
101 ..Default::default()
102 }
103 .insert(&*tx)
104 .await?;
105
106 let room = self.get_room(room_id, &tx).await?;
107 Ok((project.id, room))
108 })
109 .await
110 }
111
112 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
113 self.weak_transaction(|tx| async move {
114 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
115 Ok(())
116 })
117 .await
118 }
119
120 /// Unshares the given project.
121 pub async fn unshare_project(
122 &self,
123 project_id: ProjectId,
124 connection: ConnectionId,
125 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
126 self.project_transaction(project_id, |tx| async move {
127 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
128 let project = project::Entity::find_by_id(project_id)
129 .one(&*tx)
130 .await?
131 .ok_or_else(|| anyhow!("project not found"))?;
132 let room = if let Some(room_id) = project.room_id {
133 Some(self.get_room(room_id, &tx).await?)
134 } else {
135 None
136 };
137 if project.host_connection()? == connection {
138 return Ok((true, room, guest_connection_ids));
139 }
140 Err(anyhow!("cannot unshare a project hosted by another user"))?
141 })
142 .await
143 }
144
145 /// Updates the worktrees associated with the given project.
146 pub async fn update_project(
147 &self,
148 project_id: ProjectId,
149 connection: ConnectionId,
150 worktrees: &[proto::WorktreeMetadata],
151 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
152 self.project_transaction(project_id, |tx| async move {
153 let project = project::Entity::find_by_id(project_id)
154 .filter(
155 Condition::all()
156 .add(project::Column::HostConnectionId.eq(connection.id as i32))
157 .add(
158 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
159 ),
160 )
161 .one(&*tx)
162 .await?
163 .ok_or_else(|| anyhow!("no such project"))?;
164
165 self.update_project_worktrees(project.id, worktrees, &tx)
166 .await?;
167
168 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
169
170 let room = if let Some(room_id) = project.room_id {
171 Some(self.get_room(room_id, &tx).await?)
172 } else {
173 None
174 };
175
176 Ok((room, guest_connection_ids))
177 })
178 .await
179 }
180
181 pub(in crate::db) async fn update_project_worktrees(
182 &self,
183 project_id: ProjectId,
184 worktrees: &[proto::WorktreeMetadata],
185 tx: &DatabaseTransaction,
186 ) -> Result<()> {
187 if !worktrees.is_empty() {
188 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
189 id: ActiveValue::set(worktree.id as i64),
190 project_id: ActiveValue::set(project_id),
191 abs_path: ActiveValue::set(worktree.abs_path.clone()),
192 root_name: ActiveValue::set(worktree.root_name.clone()),
193 visible: ActiveValue::set(worktree.visible),
194 scan_id: ActiveValue::set(0),
195 completed_scan_id: ActiveValue::set(0),
196 }))
197 .on_conflict(
198 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
199 .update_column(worktree::Column::RootName)
200 .to_owned(),
201 )
202 .exec(tx)
203 .await?;
204 }
205
206 worktree::Entity::delete_many()
207 .filter(worktree::Column::ProjectId.eq(project_id).and(
208 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
209 ))
210 .exec(tx)
211 .await?;
212
213 Ok(())
214 }
215
216 pub async fn update_worktree(
217 &self,
218 update: &proto::UpdateWorktree,
219 connection: ConnectionId,
220 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
221 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
222 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
223 {
224 return Err(anyhow!(
225 "invalid worktree update. removed entries: {}, updated entries: {}",
226 update.removed_entries.len(),
227 update.updated_entries.len()
228 ))?;
229 }
230
231 let project_id = ProjectId::from_proto(update.project_id);
232 let worktree_id = update.worktree_id as i64;
233 self.project_transaction(project_id, |tx| async move {
234 // Ensure the update comes from the host.
235 let _project = project::Entity::find_by_id(project_id)
236 .filter(
237 Condition::all()
238 .add(project::Column::HostConnectionId.eq(connection.id as i32))
239 .add(
240 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
241 ),
242 )
243 .one(&*tx)
244 .await?
245 .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
246
247 // Update metadata.
248 worktree::Entity::update(worktree::ActiveModel {
249 id: ActiveValue::set(worktree_id),
250 project_id: ActiveValue::set(project_id),
251 root_name: ActiveValue::set(update.root_name.clone()),
252 scan_id: ActiveValue::set(update.scan_id as i64),
253 completed_scan_id: if update.is_last_update {
254 ActiveValue::set(update.scan_id as i64)
255 } else {
256 ActiveValue::default()
257 },
258 abs_path: ActiveValue::set(update.abs_path.clone()),
259 ..Default::default()
260 })
261 .exec(&*tx)
262 .await?;
263
264 if !update.updated_entries.is_empty() {
265 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
266 let mtime = entry.mtime.clone().unwrap_or_default();
267 worktree_entry::ActiveModel {
268 project_id: ActiveValue::set(project_id),
269 worktree_id: ActiveValue::set(worktree_id),
270 id: ActiveValue::set(entry.id as i64),
271 is_dir: ActiveValue::set(entry.is_dir),
272 path: ActiveValue::set(entry.path.clone()),
273 inode: ActiveValue::set(entry.inode as i64),
274 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
275 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
276 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
277 is_ignored: ActiveValue::set(entry.is_ignored),
278 git_status: ActiveValue::set(None),
279 is_external: ActiveValue::set(entry.is_external),
280 is_deleted: ActiveValue::set(false),
281 scan_id: ActiveValue::set(update.scan_id as i64),
282 is_fifo: ActiveValue::set(entry.is_fifo),
283 }
284 }))
285 .on_conflict(
286 OnConflict::columns([
287 worktree_entry::Column::ProjectId,
288 worktree_entry::Column::WorktreeId,
289 worktree_entry::Column::Id,
290 ])
291 .update_columns([
292 worktree_entry::Column::IsDir,
293 worktree_entry::Column::Path,
294 worktree_entry::Column::Inode,
295 worktree_entry::Column::MtimeSeconds,
296 worktree_entry::Column::MtimeNanos,
297 worktree_entry::Column::CanonicalPath,
298 worktree_entry::Column::IsIgnored,
299 worktree_entry::Column::ScanId,
300 ])
301 .to_owned(),
302 )
303 .exec(&*tx)
304 .await?;
305 }
306
307 if !update.removed_entries.is_empty() {
308 worktree_entry::Entity::update_many()
309 .filter(
310 worktree_entry::Column::ProjectId
311 .eq(project_id)
312 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
313 .and(
314 worktree_entry::Column::Id
315 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
316 ),
317 )
318 .set(worktree_entry::ActiveModel {
319 is_deleted: ActiveValue::Set(true),
320 scan_id: ActiveValue::Set(update.scan_id as i64),
321 ..Default::default()
322 })
323 .exec(&*tx)
324 .await?;
325 }
326
327 if !update.updated_repositories.is_empty() {
328 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
329 |repository| worktree_repository::ActiveModel {
330 project_id: ActiveValue::set(project_id),
331 worktree_id: ActiveValue::set(worktree_id),
332 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
333 scan_id: ActiveValue::set(update.scan_id as i64),
334 branch: ActiveValue::set(repository.branch.clone()),
335 is_deleted: ActiveValue::set(false),
336 },
337 ))
338 .on_conflict(
339 OnConflict::columns([
340 worktree_repository::Column::ProjectId,
341 worktree_repository::Column::WorktreeId,
342 worktree_repository::Column::WorkDirectoryId,
343 ])
344 .update_columns([
345 worktree_repository::Column::ScanId,
346 worktree_repository::Column::Branch,
347 ])
348 .to_owned(),
349 )
350 .exec(&*tx)
351 .await?;
352
353 let has_any_statuses = update
354 .updated_repositories
355 .iter()
356 .any(|repository| !repository.updated_statuses.is_empty());
357
358 if has_any_statuses {
359 worktree_repository_statuses::Entity::insert_many(
360 update.updated_repositories.iter().flat_map(
361 |repository: &proto::RepositoryEntry| {
362 repository.updated_statuses.iter().map(|status_entry| {
363 worktree_repository_statuses::ActiveModel {
364 project_id: ActiveValue::set(project_id),
365 worktree_id: ActiveValue::set(worktree_id),
366 work_directory_id: ActiveValue::set(
367 repository.work_directory_id as i64,
368 ),
369 scan_id: ActiveValue::set(update.scan_id as i64),
370 is_deleted: ActiveValue::set(false),
371 repo_path: ActiveValue::set(status_entry.repo_path.clone()),
372 status: ActiveValue::set(status_entry.status as i64),
373 }
374 })
375 },
376 ),
377 )
378 .on_conflict(
379 OnConflict::columns([
380 worktree_repository_statuses::Column::ProjectId,
381 worktree_repository_statuses::Column::WorktreeId,
382 worktree_repository_statuses::Column::WorkDirectoryId,
383 worktree_repository_statuses::Column::RepoPath,
384 ])
385 .update_columns([
386 worktree_repository_statuses::Column::ScanId,
387 worktree_repository_statuses::Column::Status,
388 ])
389 .to_owned(),
390 )
391 .exec(&*tx)
392 .await?;
393 }
394
395 let has_any_removed_statuses = update
396 .updated_repositories
397 .iter()
398 .any(|repository| !repository.removed_statuses.is_empty());
399
400 if has_any_removed_statuses {
401 worktree_repository_statuses::Entity::update_many()
402 .filter(
403 worktree_repository_statuses::Column::ProjectId
404 .eq(project_id)
405 .and(
406 worktree_repository_statuses::Column::WorktreeId
407 .eq(worktree_id),
408 )
409 .and(
410 worktree_repository_statuses::Column::RepoPath.is_in(
411 update.updated_repositories.iter().flat_map(|repository| {
412 repository.removed_statuses.iter()
413 }),
414 ),
415 ),
416 )
417 .set(worktree_repository_statuses::ActiveModel {
418 is_deleted: ActiveValue::Set(true),
419 scan_id: ActiveValue::Set(update.scan_id as i64),
420 ..Default::default()
421 })
422 .exec(&*tx)
423 .await?;
424 }
425 }
426
427 if !update.removed_repositories.is_empty() {
428 worktree_repository::Entity::update_many()
429 .filter(
430 worktree_repository::Column::ProjectId
431 .eq(project_id)
432 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
433 .and(
434 worktree_repository::Column::WorkDirectoryId
435 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
436 ),
437 )
438 .set(worktree_repository::ActiveModel {
439 is_deleted: ActiveValue::Set(true),
440 scan_id: ActiveValue::Set(update.scan_id as i64),
441 ..Default::default()
442 })
443 .exec(&*tx)
444 .await?;
445 }
446
447 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
448 Ok(connection_ids)
449 })
450 .await
451 }
452
453 /// Updates the diagnostic summary for the given connection.
454 pub async fn update_diagnostic_summary(
455 &self,
456 update: &proto::UpdateDiagnosticSummary,
457 connection: ConnectionId,
458 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
459 let project_id = ProjectId::from_proto(update.project_id);
460 let worktree_id = update.worktree_id as i64;
461 self.project_transaction(project_id, |tx| async move {
462 let summary = update
463 .summary
464 .as_ref()
465 .ok_or_else(|| anyhow!("invalid summary"))?;
466
467 // Ensure the update comes from the host.
468 let project = project::Entity::find_by_id(project_id)
469 .one(&*tx)
470 .await?
471 .ok_or_else(|| anyhow!("no such project"))?;
472 if project.host_connection()? != connection {
473 return Err(anyhow!("can't update a project hosted by someone else"))?;
474 }
475
476 // Update summary.
477 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
478 project_id: ActiveValue::set(project_id),
479 worktree_id: ActiveValue::set(worktree_id),
480 path: ActiveValue::set(summary.path.clone()),
481 language_server_id: ActiveValue::set(summary.language_server_id as i64),
482 error_count: ActiveValue::set(summary.error_count as i32),
483 warning_count: ActiveValue::set(summary.warning_count as i32),
484 })
485 .on_conflict(
486 OnConflict::columns([
487 worktree_diagnostic_summary::Column::ProjectId,
488 worktree_diagnostic_summary::Column::WorktreeId,
489 worktree_diagnostic_summary::Column::Path,
490 ])
491 .update_columns([
492 worktree_diagnostic_summary::Column::LanguageServerId,
493 worktree_diagnostic_summary::Column::ErrorCount,
494 worktree_diagnostic_summary::Column::WarningCount,
495 ])
496 .to_owned(),
497 )
498 .exec(&*tx)
499 .await?;
500
501 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
502 Ok(connection_ids)
503 })
504 .await
505 }
506
507 /// Starts the language server for the given connection.
508 pub async fn start_language_server(
509 &self,
510 update: &proto::StartLanguageServer,
511 connection: ConnectionId,
512 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
513 let project_id = ProjectId::from_proto(update.project_id);
514 self.project_transaction(project_id, |tx| async move {
515 let server = update
516 .server
517 .as_ref()
518 .ok_or_else(|| anyhow!("invalid language server"))?;
519
520 // Ensure the update comes from the host.
521 let project = project::Entity::find_by_id(project_id)
522 .one(&*tx)
523 .await?
524 .ok_or_else(|| anyhow!("no such project"))?;
525 if project.host_connection()? != connection {
526 return Err(anyhow!("can't update a project hosted by someone else"))?;
527 }
528
529 // Add the newly-started language server.
530 language_server::Entity::insert(language_server::ActiveModel {
531 project_id: ActiveValue::set(project_id),
532 id: ActiveValue::set(server.id as i64),
533 name: ActiveValue::set(server.name.clone()),
534 })
535 .on_conflict(
536 OnConflict::columns([
537 language_server::Column::ProjectId,
538 language_server::Column::Id,
539 ])
540 .update_column(language_server::Column::Name)
541 .to_owned(),
542 )
543 .exec(&*tx)
544 .await?;
545
546 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
547 Ok(connection_ids)
548 })
549 .await
550 }
551
552 /// Updates the worktree settings for the given connection.
553 pub async fn update_worktree_settings(
554 &self,
555 update: &proto::UpdateWorktreeSettings,
556 connection: ConnectionId,
557 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
558 let project_id = ProjectId::from_proto(update.project_id);
559 let kind = match update.kind {
560 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
561 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
562 None => proto::LocalSettingsKind::Settings,
563 };
564 let kind = LocalSettingsKind::from_proto(kind);
565 self.project_transaction(project_id, |tx| async move {
566 // Ensure the update comes from the host.
567 let project = project::Entity::find_by_id(project_id)
568 .one(&*tx)
569 .await?
570 .ok_or_else(|| anyhow!("no such project"))?;
571 if project.host_connection()? != connection {
572 return Err(anyhow!("can't update a project hosted by someone else"))?;
573 }
574
575 if let Some(content) = &update.content {
576 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
577 project_id: ActiveValue::Set(project_id),
578 worktree_id: ActiveValue::Set(update.worktree_id as i64),
579 path: ActiveValue::Set(update.path.clone()),
580 content: ActiveValue::Set(content.clone()),
581 kind: ActiveValue::Set(kind),
582 })
583 .on_conflict(
584 OnConflict::columns([
585 worktree_settings_file::Column::ProjectId,
586 worktree_settings_file::Column::WorktreeId,
587 worktree_settings_file::Column::Path,
588 ])
589 .update_column(worktree_settings_file::Column::Content)
590 .to_owned(),
591 )
592 .exec(&*tx)
593 .await?;
594 } else {
595 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
596 project_id: ActiveValue::Set(project_id),
597 worktree_id: ActiveValue::Set(update.worktree_id as i64),
598 path: ActiveValue::Set(update.path.clone()),
599 ..Default::default()
600 })
601 .exec(&*tx)
602 .await?;
603 }
604
605 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
606 Ok(connection_ids)
607 })
608 .await
609 }
610
611 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
612 self.transaction(|tx| async move {
613 Ok(project::Entity::find_by_id(id)
614 .one(&*tx)
615 .await?
616 .ok_or_else(|| anyhow!("no such project"))?)
617 })
618 .await
619 }
620
621 /// Adds the given connection to the specified project
622 /// in the current room.
623 pub async fn join_project(
624 &self,
625 project_id: ProjectId,
626 connection: ConnectionId,
627 user_id: UserId,
628 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
629 self.project_transaction(project_id, |tx| async move {
630 let (project, role) = self
631 .access_project(project_id, connection, Capability::ReadOnly, &tx)
632 .await?;
633 self.join_project_internal(project, user_id, connection, role, &tx)
634 .await
635 })
636 .await
637 }
638
639 async fn join_project_internal(
640 &self,
641 project: project::Model,
642 user_id: UserId,
643 connection: ConnectionId,
644 role: ChannelRole,
645 tx: &DatabaseTransaction,
646 ) -> Result<(Project, ReplicaId)> {
647 let mut collaborators = project
648 .find_related(project_collaborator::Entity)
649 .all(tx)
650 .await?;
651 let replica_ids = collaborators
652 .iter()
653 .map(|c| c.replica_id)
654 .collect::<HashSet<_>>();
655 let mut replica_id = ReplicaId(1);
656 while replica_ids.contains(&replica_id) {
657 replica_id.0 += 1;
658 }
659 let new_collaborator = project_collaborator::ActiveModel {
660 project_id: ActiveValue::set(project.id),
661 connection_id: ActiveValue::set(connection.id as i32),
662 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
663 user_id: ActiveValue::set(user_id),
664 replica_id: ActiveValue::set(replica_id),
665 is_host: ActiveValue::set(false),
666 ..Default::default()
667 }
668 .insert(tx)
669 .await?;
670 collaborators.push(new_collaborator);
671
672 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
673 let mut worktrees = db_worktrees
674 .into_iter()
675 .map(|db_worktree| {
676 (
677 db_worktree.id as u64,
678 Worktree {
679 id: db_worktree.id as u64,
680 abs_path: db_worktree.abs_path,
681 root_name: db_worktree.root_name,
682 visible: db_worktree.visible,
683 entries: Default::default(),
684 repository_entries: Default::default(),
685 diagnostic_summaries: Default::default(),
686 settings_files: Default::default(),
687 scan_id: db_worktree.scan_id as u64,
688 completed_scan_id: db_worktree.completed_scan_id as u64,
689 },
690 )
691 })
692 .collect::<BTreeMap<_, _>>();
693
694 // Populate worktree entries.
695 {
696 let mut db_entries = worktree_entry::Entity::find()
697 .filter(
698 Condition::all()
699 .add(worktree_entry::Column::ProjectId.eq(project.id))
700 .add(worktree_entry::Column::IsDeleted.eq(false)),
701 )
702 .stream(tx)
703 .await?;
704 while let Some(db_entry) = db_entries.next().await {
705 let db_entry = db_entry?;
706 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
707 worktree.entries.push(proto::Entry {
708 id: db_entry.id as u64,
709 is_dir: db_entry.is_dir,
710 path: db_entry.path,
711 inode: db_entry.inode as u64,
712 mtime: Some(proto::Timestamp {
713 seconds: db_entry.mtime_seconds as u64,
714 nanos: db_entry.mtime_nanos as u32,
715 }),
716 canonical_path: db_entry.canonical_path,
717 is_ignored: db_entry.is_ignored,
718 is_external: db_entry.is_external,
719 // This is only used in the summarization backlog, so if it's None,
720 // that just means we won't be able to detect when to resummarize
721 // based on total number of backlogged bytes - instead, we'd go
722 // on number of files only. That shouldn't be a huge deal in practice.
723 size: None,
724 is_fifo: db_entry.is_fifo,
725 });
726 }
727 }
728 }
729
730 // Populate repository entries.
731 {
732 let db_repository_entries = worktree_repository::Entity::find()
733 .filter(
734 Condition::all()
735 .add(worktree_repository::Column::ProjectId.eq(project.id))
736 .add(worktree_repository::Column::IsDeleted.eq(false)),
737 )
738 .all(tx)
739 .await?;
740 for db_repository_entry in db_repository_entries {
741 if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
742 {
743 let mut repository_statuses = worktree_repository_statuses::Entity::find()
744 .filter(
745 Condition::all()
746 .add(worktree_repository_statuses::Column::ProjectId.eq(project.id))
747 .add(
748 worktree_repository_statuses::Column::WorktreeId
749 .eq(worktree.id),
750 )
751 .add(
752 worktree_repository_statuses::Column::WorkDirectoryId
753 .eq(db_repository_entry.work_directory_id),
754 )
755 .add(worktree_repository_statuses::Column::IsDeleted.eq(false)),
756 )
757 .stream(tx)
758 .await?;
759 let mut updated_statuses = Vec::new();
760 while let Some(status_entry) = repository_statuses.next().await {
761 let status_entry: worktree_repository_statuses::Model = status_entry?;
762 updated_statuses.push(proto::StatusEntry {
763 repo_path: status_entry.repo_path,
764 status: status_entry.status as i32,
765 });
766 }
767
768 worktree.repository_entries.insert(
769 db_repository_entry.work_directory_id as u64,
770 proto::RepositoryEntry {
771 work_directory_id: db_repository_entry.work_directory_id as u64,
772 branch: db_repository_entry.branch,
773 updated_statuses,
774 removed_statuses: Vec::new(),
775 },
776 );
777 }
778 }
779 }
780
781 // Populate worktree diagnostic summaries.
782 {
783 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
784 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
785 .stream(tx)
786 .await?;
787 while let Some(db_summary) = db_summaries.next().await {
788 let db_summary = db_summary?;
789 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
790 worktree
791 .diagnostic_summaries
792 .push(proto::DiagnosticSummary {
793 path: db_summary.path,
794 language_server_id: db_summary.language_server_id as u64,
795 error_count: db_summary.error_count as u32,
796 warning_count: db_summary.warning_count as u32,
797 });
798 }
799 }
800 }
801
802 // Populate worktree settings files
803 {
804 let mut db_settings_files = worktree_settings_file::Entity::find()
805 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
806 .stream(tx)
807 .await?;
808 while let Some(db_settings_file) = db_settings_files.next().await {
809 let db_settings_file = db_settings_file?;
810 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
811 worktree.settings_files.push(WorktreeSettingsFile {
812 path: db_settings_file.path,
813 content: db_settings_file.content,
814 kind: db_settings_file.kind,
815 });
816 }
817 }
818 }
819
820 // Populate language servers.
821 let language_servers = project
822 .find_related(language_server::Entity)
823 .all(tx)
824 .await?;
825
826 let project = Project {
827 id: project.id,
828 role,
829 collaborators: collaborators
830 .into_iter()
831 .map(|collaborator| ProjectCollaborator {
832 connection_id: collaborator.connection(),
833 user_id: collaborator.user_id,
834 replica_id: collaborator.replica_id,
835 is_host: collaborator.is_host,
836 })
837 .collect(),
838 worktrees,
839 language_servers: language_servers
840 .into_iter()
841 .map(|language_server| proto::LanguageServer {
842 id: language_server.id as u64,
843 name: language_server.name,
844 worktree_id: None,
845 })
846 .collect(),
847 };
848 Ok((project, replica_id as ReplicaId))
849 }
850
851 /// Removes the given connection from the specified project.
852 pub async fn leave_project(
853 &self,
854 project_id: ProjectId,
855 connection: ConnectionId,
856 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
857 self.project_transaction(project_id, |tx| async move {
858 let result = project_collaborator::Entity::delete_many()
859 .filter(
860 Condition::all()
861 .add(project_collaborator::Column::ProjectId.eq(project_id))
862 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
863 .add(
864 project_collaborator::Column::ConnectionServerId
865 .eq(connection.owner_id as i32),
866 ),
867 )
868 .exec(&*tx)
869 .await?;
870 if result.rows_affected == 0 {
871 Err(anyhow!("not a collaborator on this project"))?;
872 }
873
874 let project = project::Entity::find_by_id(project_id)
875 .one(&*tx)
876 .await?
877 .ok_or_else(|| anyhow!("no such project"))?;
878 let collaborators = project
879 .find_related(project_collaborator::Entity)
880 .all(&*tx)
881 .await?;
882 let connection_ids: Vec<ConnectionId> = collaborators
883 .into_iter()
884 .map(|collaborator| collaborator.connection())
885 .collect();
886
887 follower::Entity::delete_many()
888 .filter(
889 Condition::any()
890 .add(
891 Condition::all()
892 .add(follower::Column::ProjectId.eq(Some(project_id)))
893 .add(
894 follower::Column::LeaderConnectionServerId
895 .eq(connection.owner_id),
896 )
897 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
898 )
899 .add(
900 Condition::all()
901 .add(follower::Column::ProjectId.eq(Some(project_id)))
902 .add(
903 follower::Column::FollowerConnectionServerId
904 .eq(connection.owner_id),
905 )
906 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
907 ),
908 )
909 .exec(&*tx)
910 .await?;
911
912 let room = if let Some(room_id) = project.room_id {
913 Some(self.get_room(room_id, &tx).await?)
914 } else {
915 None
916 };
917
918 let left_project = LeftProject {
919 id: project_id,
920 should_unshare: connection == project.host_connection()?,
921 connection_ids,
922 };
923 Ok((room, left_project))
924 })
925 .await
926 }
927
928 pub async fn check_user_is_project_host(
929 &self,
930 project_id: ProjectId,
931 connection_id: ConnectionId,
932 ) -> Result<()> {
933 self.project_transaction(project_id, |tx| async move {
934 project::Entity::find()
935 .filter(
936 Condition::all()
937 .add(project::Column::Id.eq(project_id))
938 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
939 .add(
940 project::Column::HostConnectionServerId
941 .eq(Some(connection_id.owner_id as i32)),
942 ),
943 )
944 .one(&*tx)
945 .await?
946 .ok_or_else(|| anyhow!("failed to read project host"))?;
947
948 Ok(())
949 })
950 .await
951 .map(|guard| guard.into_inner())
952 }
953
954 /// Returns the current project if the given user is authorized to access it with the specified capability.
955 pub async fn access_project(
956 &self,
957 project_id: ProjectId,
958 connection_id: ConnectionId,
959 capability: Capability,
960 tx: &DatabaseTransaction,
961 ) -> Result<(project::Model, ChannelRole)> {
962 let project = project::Entity::find_by_id(project_id)
963 .one(tx)
964 .await?
965 .ok_or_else(|| anyhow!("no such project"))?;
966
967 let role_from_room = if let Some(room_id) = project.room_id {
968 room_participant::Entity::find()
969 .filter(room_participant::Column::RoomId.eq(room_id))
970 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
971 .one(tx)
972 .await?
973 .and_then(|participant| participant.role)
974 } else {
975 None
976 };
977
978 let role = role_from_room.unwrap_or(ChannelRole::Banned);
979
980 match capability {
981 Capability::ReadWrite => {
982 if !role.can_edit_projects() {
983 return Err(anyhow!("not authorized to edit projects"))?;
984 }
985 }
986 Capability::ReadOnly => {
987 if !role.can_read_projects() {
988 return Err(anyhow!("not authorized to read projects"))?;
989 }
990 }
991 }
992
993 Ok((project, role))
994 }
995
996 /// Returns the host connection for a read-only request to join a shared project.
997 pub async fn host_for_read_only_project_request(
998 &self,
999 project_id: ProjectId,
1000 connection_id: ConnectionId,
1001 ) -> Result<ConnectionId> {
1002 self.project_transaction(project_id, |tx| async move {
1003 let (project, _) = self
1004 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1005 .await?;
1006 project.host_connection()
1007 })
1008 .await
1009 .map(|guard| guard.into_inner())
1010 }
1011
1012 /// Returns the host connection for a request to join a shared project.
1013 pub async fn host_for_mutating_project_request(
1014 &self,
1015 project_id: ProjectId,
1016 connection_id: ConnectionId,
1017 ) -> Result<ConnectionId> {
1018 self.project_transaction(project_id, |tx| async move {
1019 let (project, _) = self
1020 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1021 .await?;
1022 project.host_connection()
1023 })
1024 .await
1025 .map(|guard| guard.into_inner())
1026 }
1027
1028 pub async fn connections_for_buffer_update(
1029 &self,
1030 project_id: ProjectId,
1031 connection_id: ConnectionId,
1032 capability: Capability,
1033 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1034 self.project_transaction(project_id, |tx| async move {
1035 // Authorize
1036 let (project, _) = self
1037 .access_project(project_id, connection_id, capability, &tx)
1038 .await?;
1039
1040 let host_connection_id = project.host_connection()?;
1041
1042 let collaborators = project_collaborator::Entity::find()
1043 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1044 .all(&*tx)
1045 .await?;
1046
1047 let guest_connection_ids = collaborators
1048 .into_iter()
1049 .filter_map(|collaborator| {
1050 if collaborator.is_host {
1051 None
1052 } else {
1053 Some(collaborator.connection())
1054 }
1055 })
1056 .collect();
1057
1058 Ok((host_connection_id, guest_connection_ids))
1059 })
1060 .await
1061 }
1062
1063 /// Returns the connection IDs in the given project.
1064 ///
1065 /// The provided `connection_id` must also be a collaborator in the project,
1066 /// otherwise an error will be returned.
1067 pub async fn project_connection_ids(
1068 &self,
1069 project_id: ProjectId,
1070 connection_id: ConnectionId,
1071 exclude_dev_server: bool,
1072 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1073 self.project_transaction(project_id, |tx| async move {
1074 let project = project::Entity::find_by_id(project_id)
1075 .one(&*tx)
1076 .await?
1077 .ok_or_else(|| anyhow!("no such project"))?;
1078
1079 let mut collaborators = project_collaborator::Entity::find()
1080 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1081 .stream(&*tx)
1082 .await?;
1083
1084 let mut connection_ids = HashSet::default();
1085 if let Some(host_connection) = project.host_connection().log_err() {
1086 if !exclude_dev_server {
1087 connection_ids.insert(host_connection);
1088 }
1089 }
1090
1091 while let Some(collaborator) = collaborators.next().await {
1092 let collaborator = collaborator?;
1093 connection_ids.insert(collaborator.connection());
1094 }
1095
1096 if connection_ids.contains(&connection_id)
1097 || Some(connection_id) == project.host_connection().ok()
1098 {
1099 Ok(connection_ids)
1100 } else {
1101 Err(anyhow!(
1102 "can only send project updates to a project you're in"
1103 ))?
1104 }
1105 })
1106 .await
1107 }
1108
1109 async fn project_guest_connection_ids(
1110 &self,
1111 project_id: ProjectId,
1112 tx: &DatabaseTransaction,
1113 ) -> Result<Vec<ConnectionId>> {
1114 let mut collaborators = project_collaborator::Entity::find()
1115 .filter(
1116 project_collaborator::Column::ProjectId
1117 .eq(project_id)
1118 .and(project_collaborator::Column::IsHost.eq(false)),
1119 )
1120 .stream(tx)
1121 .await?;
1122
1123 let mut guest_connection_ids = Vec::new();
1124 while let Some(collaborator) = collaborators.next().await {
1125 let collaborator = collaborator?;
1126 guest_connection_ids.push(collaborator.connection());
1127 }
1128 Ok(guest_connection_ids)
1129 }
1130
1131 /// Returns the [`RoomId`] for the given project.
1132 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1133 self.transaction(|tx| async move {
1134 Ok(project::Entity::find_by_id(project_id)
1135 .one(&*tx)
1136 .await?
1137 .and_then(|project| project.room_id))
1138 })
1139 .await
1140 }
1141
1142 pub async fn check_room_participants(
1143 &self,
1144 room_id: RoomId,
1145 leader_id: ConnectionId,
1146 follower_id: ConnectionId,
1147 ) -> Result<()> {
1148 self.transaction(|tx| async move {
1149 use room_participant::Column;
1150
1151 let count = room_participant::Entity::find()
1152 .filter(
1153 Condition::all().add(Column::RoomId.eq(room_id)).add(
1154 Condition::any()
1155 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1156 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1157 ))
1158 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1159 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1160 )),
1161 ),
1162 )
1163 .count(&*tx)
1164 .await?;
1165
1166 if count < 2 {
1167 Err(anyhow!("not room participants"))?;
1168 }
1169
1170 Ok(())
1171 })
1172 .await
1173 }
1174
1175 /// Adds the given follower connection as a follower of the given leader connection.
1176 pub async fn follow(
1177 &self,
1178 room_id: RoomId,
1179 project_id: ProjectId,
1180 leader_connection: ConnectionId,
1181 follower_connection: ConnectionId,
1182 ) -> Result<TransactionGuard<proto::Room>> {
1183 self.room_transaction(room_id, |tx| async move {
1184 follower::ActiveModel {
1185 room_id: ActiveValue::set(room_id),
1186 project_id: ActiveValue::set(project_id),
1187 leader_connection_server_id: ActiveValue::set(ServerId(
1188 leader_connection.owner_id as i32,
1189 )),
1190 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1191 follower_connection_server_id: ActiveValue::set(ServerId(
1192 follower_connection.owner_id as i32,
1193 )),
1194 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1195 ..Default::default()
1196 }
1197 .insert(&*tx)
1198 .await?;
1199
1200 let room = self.get_room(room_id, &tx).await?;
1201 Ok(room)
1202 })
1203 .await
1204 }
1205
1206 /// Removes the given follower connection as a follower of the given leader connection.
1207 pub async fn unfollow(
1208 &self,
1209 room_id: RoomId,
1210 project_id: ProjectId,
1211 leader_connection: ConnectionId,
1212 follower_connection: ConnectionId,
1213 ) -> Result<TransactionGuard<proto::Room>> {
1214 self.room_transaction(room_id, |tx| async move {
1215 follower::Entity::delete_many()
1216 .filter(
1217 Condition::all()
1218 .add(follower::Column::RoomId.eq(room_id))
1219 .add(follower::Column::ProjectId.eq(project_id))
1220 .add(
1221 follower::Column::LeaderConnectionServerId
1222 .eq(leader_connection.owner_id),
1223 )
1224 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1225 .add(
1226 follower::Column::FollowerConnectionServerId
1227 .eq(follower_connection.owner_id),
1228 )
1229 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1230 )
1231 .exec(&*tx)
1232 .await?;
1233
1234 let room = self.get_room(room_id, &tx).await?;
1235 Ok(room)
1236 })
1237 .await
1238 }
1239}