1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 windows_paths: bool,
37 features: &[String],
38 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
39 self.room_transaction(room_id, |tx| async move {
40 let participant = room_participant::Entity::find()
41 .filter(
42 Condition::all()
43 .add(
44 room_participant::Column::AnsweringConnectionId
45 .eq(connection.id as i32),
46 )
47 .add(
48 room_participant::Column::AnsweringConnectionServerId
49 .eq(connection.owner_id as i32),
50 ),
51 )
52 .one(&*tx)
53 .await?
54 .context("could not find participant")?;
55 if participant.room_id != room_id {
56 return Err(anyhow!("shared project on unexpected room"))?;
57 }
58 if !participant
59 .role
60 .unwrap_or(ChannelRole::Member)
61 .can_edit_projects()
62 {
63 return Err(anyhow!("guests cannot share projects"))?;
64 }
65
66 let project = project::ActiveModel {
67 room_id: ActiveValue::set(Some(participant.room_id)),
68 host_user_id: ActiveValue::set(Some(participant.user_id)),
69 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
70 host_connection_server_id: ActiveValue::set(Some(ServerId(
71 connection.owner_id as i32,
72 ))),
73 id: ActiveValue::NotSet,
74 windows_paths: ActiveValue::set(windows_paths),
75 features: ActiveValue::set(serde_json::to_string(features).unwrap()),
76 }
77 .insert(&*tx)
78 .await?;
79
80 if !worktrees.is_empty() {
81 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
82 worktree::ActiveModel {
83 id: ActiveValue::set(worktree.id as i64),
84 project_id: ActiveValue::set(project.id),
85 abs_path: ActiveValue::set(worktree.abs_path.clone()),
86 root_name: ActiveValue::set(worktree.root_name.clone()),
87 visible: ActiveValue::set(worktree.visible),
88 scan_id: ActiveValue::set(0),
89 completed_scan_id: ActiveValue::set(0),
90 root_repo_common_dir: ActiveValue::set(None),
91 }
92 }))
93 .exec(&*tx)
94 .await?;
95 }
96
97 let replica_id = if is_ssh_project {
98 clock::ReplicaId::REMOTE_SERVER
99 } else {
100 clock::ReplicaId::LOCAL
101 };
102
103 project_collaborator::ActiveModel {
104 project_id: ActiveValue::set(project.id),
105 connection_id: ActiveValue::set(connection.id as i32),
106 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
107 user_id: ActiveValue::set(participant.user_id),
108 replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
109 is_host: ActiveValue::set(true),
110 id: ActiveValue::NotSet,
111 committer_name: ActiveValue::Set(None),
112 committer_email: ActiveValue::Set(None),
113 }
114 .insert(&*tx)
115 .await?;
116
117 let room = self.get_room(room_id, &tx).await?;
118 Ok((project.id, room))
119 })
120 .await
121 }
122
123 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
124 self.transaction(|tx| async move {
125 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
126 Ok(())
127 })
128 .await
129 }
130
131 /// Unshares the given project.
132 pub async fn unshare_project(
133 &self,
134 project_id: ProjectId,
135 connection: ConnectionId,
136 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
137 self.project_transaction(project_id, |tx| async move {
138 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
139 let project = project::Entity::find_by_id(project_id)
140 .one(&*tx)
141 .await?
142 .context("project not found")?;
143 let room = if let Some(room_id) = project.room_id {
144 Some(self.get_room(room_id, &tx).await?)
145 } else {
146 None
147 };
148 if project.host_connection()? == connection {
149 return Ok((true, room, guest_connection_ids));
150 }
151 Err(anyhow!("cannot unshare a project hosted by another user"))?
152 })
153 .await
154 }
155
156 /// Updates the worktrees associated with the given project.
157 pub async fn update_project(
158 &self,
159 project_id: ProjectId,
160 connection: ConnectionId,
161 worktrees: &[proto::WorktreeMetadata],
162 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
163 self.project_transaction(project_id, |tx| async move {
164 let project = project::Entity::find_by_id(project_id)
165 .filter(
166 Condition::all()
167 .add(project::Column::HostConnectionId.eq(connection.id as i32))
168 .add(
169 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
170 ),
171 )
172 .one(&*tx)
173 .await?
174 .context("no such project")?;
175
176 self.update_project_worktrees(project.id, worktrees, &tx)
177 .await?;
178
179 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
180
181 let room = if let Some(room_id) = project.room_id {
182 Some(self.get_room(room_id, &tx).await?)
183 } else {
184 None
185 };
186
187 Ok((room, guest_connection_ids))
188 })
189 .await
190 }
191
192 pub(in crate::db) async fn update_project_worktrees(
193 &self,
194 project_id: ProjectId,
195 worktrees: &[proto::WorktreeMetadata],
196 tx: &DatabaseTransaction,
197 ) -> Result<()> {
198 if !worktrees.is_empty() {
199 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
200 id: ActiveValue::set(worktree.id as i64),
201 project_id: ActiveValue::set(project_id),
202 abs_path: ActiveValue::set(worktree.abs_path.clone()),
203 root_name: ActiveValue::set(worktree.root_name.clone()),
204 visible: ActiveValue::set(worktree.visible),
205 scan_id: ActiveValue::set(0),
206 completed_scan_id: ActiveValue::set(0),
207 root_repo_common_dir: ActiveValue::set(None),
208 }))
209 .on_conflict(
210 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
211 .update_column(worktree::Column::RootName)
212 .to_owned(),
213 )
214 .exec(tx)
215 .await?;
216 }
217
218 worktree::Entity::delete_many()
219 .filter(worktree::Column::ProjectId.eq(project_id).and(
220 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
221 ))
222 .exec(tx)
223 .await?;
224
225 Ok(())
226 }
227
228 pub async fn update_worktree(
229 &self,
230 update: &proto::UpdateWorktree,
231 connection: ConnectionId,
232 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
233 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
234 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
235 {
236 return Err(anyhow!(
237 "invalid worktree update. removed entries: {}, updated entries: {}",
238 update.removed_entries.len(),
239 update.updated_entries.len()
240 ))?;
241 }
242
243 let project_id = ProjectId::from_proto(update.project_id);
244 let worktree_id = update.worktree_id as i64;
245 self.project_transaction(project_id, |tx| async move {
246 // Ensure the update comes from the host.
247 let _project = project::Entity::find_by_id(project_id)
248 .filter(
249 Condition::all()
250 .add(project::Column::HostConnectionId.eq(connection.id as i32))
251 .add(
252 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
253 ),
254 )
255 .one(&*tx)
256 .await?
257 .with_context(|| format!("no such project: {project_id}"))?;
258
259 // Update metadata.
260 worktree::Entity::update(worktree::ActiveModel {
261 id: ActiveValue::set(worktree_id),
262 project_id: ActiveValue::set(project_id),
263 root_name: ActiveValue::set(update.root_name.clone()),
264 scan_id: ActiveValue::set(update.scan_id as i64),
265 completed_scan_id: if update.is_last_update {
266 ActiveValue::set(update.scan_id as i64)
267 } else {
268 ActiveValue::default()
269 },
270 abs_path: ActiveValue::set(update.abs_path.clone()),
271 root_repo_common_dir: ActiveValue::set(update.root_repo_common_dir.clone()),
272 ..Default::default()
273 })
274 .exec(&*tx)
275 .await?;
276
277 if !update.updated_entries.is_empty() {
278 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
279 let mtime = entry.mtime.clone().unwrap_or_default();
280 worktree_entry::ActiveModel {
281 project_id: ActiveValue::set(project_id),
282 worktree_id: ActiveValue::set(worktree_id),
283 id: ActiveValue::set(entry.id as i64),
284 is_dir: ActiveValue::set(entry.is_dir),
285 path: ActiveValue::set(entry.path.clone()),
286 inode: ActiveValue::set(entry.inode as i64),
287 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
288 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
289 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
290 is_ignored: ActiveValue::set(entry.is_ignored),
291 git_status: ActiveValue::set(None),
292 is_external: ActiveValue::set(entry.is_external),
293 is_deleted: ActiveValue::set(false),
294 is_hidden: ActiveValue::set(entry.is_hidden),
295 scan_id: ActiveValue::set(update.scan_id as i64),
296 is_fifo: ActiveValue::set(entry.is_fifo),
297 }
298 }))
299 .on_conflict(
300 OnConflict::columns([
301 worktree_entry::Column::ProjectId,
302 worktree_entry::Column::WorktreeId,
303 worktree_entry::Column::Id,
304 ])
305 .update_columns([
306 worktree_entry::Column::IsDir,
307 worktree_entry::Column::Path,
308 worktree_entry::Column::Inode,
309 worktree_entry::Column::MtimeSeconds,
310 worktree_entry::Column::MtimeNanos,
311 worktree_entry::Column::CanonicalPath,
312 worktree_entry::Column::IsIgnored,
313 worktree_entry::Column::IsHidden,
314 worktree_entry::Column::ScanId,
315 ])
316 .to_owned(),
317 )
318 .exec(&*tx)
319 .await?;
320 }
321
322 if !update.removed_entries.is_empty() {
323 worktree_entry::Entity::update_many()
324 .filter(
325 worktree_entry::Column::ProjectId
326 .eq(project_id)
327 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
328 .and(
329 worktree_entry::Column::Id
330 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
331 ),
332 )
333 .set(worktree_entry::ActiveModel {
334 is_deleted: ActiveValue::Set(true),
335 scan_id: ActiveValue::Set(update.scan_id as i64),
336 ..Default::default()
337 })
338 .exec(&*tx)
339 .await?;
340 }
341
342 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
343 Ok(connection_ids)
344 })
345 .await
346 }
347
348 pub async fn update_repository(
349 &self,
350 update: &proto::UpdateRepository,
351 _connection: ConnectionId,
352 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
353 let project_id = ProjectId::from_proto(update.project_id);
354 let repository_id = update.id as i64;
355 self.project_transaction(project_id, |tx| async move {
356 project_repository::Entity::insert(project_repository::ActiveModel {
357 project_id: ActiveValue::set(project_id),
358 id: ActiveValue::set(repository_id),
359 legacy_worktree_id: ActiveValue::set(None),
360 abs_path: ActiveValue::set(update.abs_path.clone()),
361 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
362 scan_id: ActiveValue::set(update.scan_id as i64),
363 is_deleted: ActiveValue::set(false),
364 branch_summary: ActiveValue::Set(
365 update
366 .branch_summary
367 .as_ref()
368 .map(|summary| serde_json::to_string(summary).unwrap()),
369 ),
370 head_commit_details: ActiveValue::Set(
371 update
372 .head_commit_details
373 .as_ref()
374 .map(|details| serde_json::to_string(details).unwrap()),
375 ),
376 current_merge_conflicts: ActiveValue::Set(Some(
377 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
378 )),
379 merge_message: ActiveValue::set(update.merge_message.clone()),
380 remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
381 remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
382 linked_worktrees: ActiveValue::Set(Some(
383 serde_json::to_string(&update.linked_worktrees).unwrap(),
384 )),
385 })
386 .on_conflict(
387 OnConflict::columns([
388 project_repository::Column::ProjectId,
389 project_repository::Column::Id,
390 ])
391 .update_columns([
392 project_repository::Column::ScanId,
393 project_repository::Column::BranchSummary,
394 project_repository::Column::EntryIds,
395 project_repository::Column::AbsPath,
396 project_repository::Column::CurrentMergeConflicts,
397 project_repository::Column::HeadCommitDetails,
398 project_repository::Column::MergeMessage,
399 project_repository::Column::LinkedWorktrees,
400 ])
401 .to_owned(),
402 )
403 .exec(&*tx)
404 .await?;
405
406 let has_any_statuses = !update.updated_statuses.is_empty();
407
408 if has_any_statuses {
409 project_repository_statuses::Entity::insert_many(
410 update.updated_statuses.iter().map(|status_entry| {
411 let (repo_path, status_kind, first_status, second_status) =
412 proto_status_to_db(status_entry.clone());
413 project_repository_statuses::ActiveModel {
414 project_id: ActiveValue::set(project_id),
415 repository_id: ActiveValue::set(repository_id),
416 scan_id: ActiveValue::set(update.scan_id as i64),
417 is_deleted: ActiveValue::set(false),
418 repo_path: ActiveValue::set(repo_path),
419 status: ActiveValue::set(0),
420 status_kind: ActiveValue::set(status_kind),
421 first_status: ActiveValue::set(first_status),
422 second_status: ActiveValue::set(second_status),
423 lines_added: ActiveValue::set(
424 status_entry.diff_stat_added.map(|v| v as i32),
425 ),
426 lines_deleted: ActiveValue::set(
427 status_entry.diff_stat_deleted.map(|v| v as i32),
428 ),
429 }
430 }),
431 )
432 .on_conflict(
433 OnConflict::columns([
434 project_repository_statuses::Column::ProjectId,
435 project_repository_statuses::Column::RepositoryId,
436 project_repository_statuses::Column::RepoPath,
437 ])
438 .update_columns([
439 project_repository_statuses::Column::ScanId,
440 project_repository_statuses::Column::StatusKind,
441 project_repository_statuses::Column::FirstStatus,
442 project_repository_statuses::Column::SecondStatus,
443 project_repository_statuses::Column::LinesAdded,
444 project_repository_statuses::Column::LinesDeleted,
445 ])
446 .to_owned(),
447 )
448 .exec(&*tx)
449 .await?;
450 }
451
452 let has_any_removed_statuses = !update.removed_statuses.is_empty();
453
454 if has_any_removed_statuses {
455 project_repository_statuses::Entity::update_many()
456 .filter(
457 project_repository_statuses::Column::ProjectId
458 .eq(project_id)
459 .and(
460 project_repository_statuses::Column::RepositoryId.eq(repository_id),
461 )
462 .and(
463 project_repository_statuses::Column::RepoPath
464 .is_in(update.removed_statuses.iter()),
465 ),
466 )
467 .set(project_repository_statuses::ActiveModel {
468 is_deleted: ActiveValue::Set(true),
469 scan_id: ActiveValue::Set(update.scan_id as i64),
470 ..Default::default()
471 })
472 .exec(&*tx)
473 .await?;
474 }
475
476 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
477 Ok(connection_ids)
478 })
479 .await
480 }
481
482 pub async fn remove_repository(
483 &self,
484 remove: &proto::RemoveRepository,
485 _connection: ConnectionId,
486 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
487 let project_id = ProjectId::from_proto(remove.project_id);
488 let repository_id = remove.id as i64;
489 self.project_transaction(project_id, |tx| async move {
490 project_repository::Entity::update_many()
491 .filter(
492 project_repository::Column::ProjectId
493 .eq(project_id)
494 .and(project_repository::Column::Id.eq(repository_id)),
495 )
496 .set(project_repository::ActiveModel {
497 is_deleted: ActiveValue::Set(true),
498 // scan_id: ActiveValue::Set(update.scan_id as i64),
499 ..Default::default()
500 })
501 .exec(&*tx)
502 .await?;
503
504 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
505 Ok(connection_ids)
506 })
507 .await
508 }
509
510 /// Updates the diagnostic summary for the given connection.
511 pub async fn update_diagnostic_summary(
512 &self,
513 update: &proto::UpdateDiagnosticSummary,
514 connection: ConnectionId,
515 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
516 let project_id = ProjectId::from_proto(update.project_id);
517 let worktree_id = update.worktree_id as i64;
518 self.project_transaction(project_id, |tx| async move {
519 let summary = update.summary.as_ref().context("invalid summary")?;
520
521 // Ensure the update comes from the host.
522 let project = project::Entity::find_by_id(project_id)
523 .one(&*tx)
524 .await?
525 .context("no such project")?;
526 if project.host_connection()? != connection {
527 return Err(anyhow!("can't update a project hosted by someone else"))?;
528 }
529
530 // Update summary.
531 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
532 project_id: ActiveValue::set(project_id),
533 worktree_id: ActiveValue::set(worktree_id),
534 path: ActiveValue::set(summary.path.clone()),
535 language_server_id: ActiveValue::set(summary.language_server_id as i64),
536 error_count: ActiveValue::set(summary.error_count as i32),
537 warning_count: ActiveValue::set(summary.warning_count as i32),
538 })
539 .on_conflict(
540 OnConflict::columns([
541 worktree_diagnostic_summary::Column::ProjectId,
542 worktree_diagnostic_summary::Column::WorktreeId,
543 worktree_diagnostic_summary::Column::Path,
544 ])
545 .update_columns([
546 worktree_diagnostic_summary::Column::LanguageServerId,
547 worktree_diagnostic_summary::Column::ErrorCount,
548 worktree_diagnostic_summary::Column::WarningCount,
549 ])
550 .to_owned(),
551 )
552 .exec(&*tx)
553 .await?;
554
555 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
556 Ok(connection_ids)
557 })
558 .await
559 }
560
561 /// Starts the language server for the given connection.
562 pub async fn start_language_server(
563 &self,
564 update: &proto::StartLanguageServer,
565 connection: ConnectionId,
566 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
567 let project_id = ProjectId::from_proto(update.project_id);
568 self.project_transaction(project_id, |tx| async move {
569 let server = update.server.as_ref().context("invalid language server")?;
570
571 // Ensure the update comes from the host.
572 let project = project::Entity::find_by_id(project_id)
573 .one(&*tx)
574 .await?
575 .context("no such project")?;
576 if project.host_connection()? != connection {
577 return Err(anyhow!("can't update a project hosted by someone else"))?;
578 }
579
580 // Add the newly-started language server.
581 language_server::Entity::insert(language_server::ActiveModel {
582 project_id: ActiveValue::set(project_id),
583 id: ActiveValue::set(server.id as i64),
584 name: ActiveValue::set(server.name.clone()),
585 worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
586 capabilities: ActiveValue::set(update.capabilities.clone()),
587 })
588 .on_conflict(
589 OnConflict::columns([
590 language_server::Column::ProjectId,
591 language_server::Column::Id,
592 ])
593 .update_columns([
594 language_server::Column::Name,
595 language_server::Column::Capabilities,
596 language_server::Column::WorktreeId,
597 ])
598 .to_owned(),
599 )
600 .exec(&*tx)
601 .await?;
602
603 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
604 Ok(connection_ids)
605 })
606 .await
607 }
608
609 /// Updates the worktree settings for the given connection.
610 pub async fn update_worktree_settings(
611 &self,
612 update: &proto::UpdateWorktreeSettings,
613 connection: ConnectionId,
614 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
615 let project_id = ProjectId::from_proto(update.project_id);
616 let kind = match update.kind {
617 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
618 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
619 None => proto::LocalSettingsKind::Settings,
620 };
621 let kind = LocalSettingsKind::from_proto(kind);
622 self.project_transaction(project_id, |tx| async move {
623 // Ensure the update comes from the host.
624 let project = project::Entity::find_by_id(project_id)
625 .one(&*tx)
626 .await?
627 .context("no such project")?;
628 if project.host_connection()? != connection {
629 return Err(anyhow!("can't update a project hosted by someone else"))?;
630 }
631
632 if let Some(content) = &update.content {
633 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
634 project_id: ActiveValue::Set(project_id),
635 worktree_id: ActiveValue::Set(update.worktree_id as i64),
636 path: ActiveValue::Set(update.path.clone()),
637 content: ActiveValue::Set(content.clone()),
638 kind: ActiveValue::Set(kind),
639 outside_worktree: ActiveValue::Set(update.outside_worktree.unwrap_or(false)),
640 })
641 .on_conflict(
642 OnConflict::columns([
643 worktree_settings_file::Column::ProjectId,
644 worktree_settings_file::Column::WorktreeId,
645 worktree_settings_file::Column::Path,
646 ])
647 .update_columns([
648 worktree_settings_file::Column::Content,
649 worktree_settings_file::Column::OutsideWorktree,
650 ])
651 .to_owned(),
652 )
653 .exec(&*tx)
654 .await?;
655 } else {
656 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
657 project_id: ActiveValue::Set(project_id),
658 worktree_id: ActiveValue::Set(update.worktree_id as i64),
659 path: ActiveValue::Set(update.path.clone()),
660 ..Default::default()
661 })
662 .exec(&*tx)
663 .await?;
664 }
665
666 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
667 Ok(connection_ids)
668 })
669 .await
670 }
671
672 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
673 self.transaction(|tx| async move {
674 Ok(project::Entity::find_by_id(id)
675 .one(&*tx)
676 .await?
677 .context("no such project")?)
678 })
679 .await
680 }
681
682 /// Adds the given connection to the specified project
683 /// in the current room.
684 pub async fn join_project(
685 &self,
686 project_id: ProjectId,
687 connection: ConnectionId,
688 user_id: UserId,
689 committer_name: Option<String>,
690 committer_email: Option<String>,
691 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
692 self.project_transaction(project_id, move |tx| {
693 let committer_name = committer_name.clone();
694 let committer_email = committer_email.clone();
695 async move {
696 let (project, role) = self
697 .access_project(project_id, connection, Capability::ReadOnly, &tx)
698 .await?;
699 self.join_project_internal(
700 project,
701 user_id,
702 committer_name,
703 committer_email,
704 connection,
705 role,
706 &tx,
707 )
708 .await
709 }
710 })
711 .await
712 }
713
714 async fn join_project_internal(
715 &self,
716 project: project::Model,
717 user_id: UserId,
718 committer_name: Option<String>,
719 committer_email: Option<String>,
720 connection: ConnectionId,
721 role: ChannelRole,
722 tx: &DatabaseTransaction,
723 ) -> Result<(Project, ReplicaId)> {
724 let mut collaborators = project
725 .find_related(project_collaborator::Entity)
726 .all(tx)
727 .await?;
728 let replica_ids = collaborators
729 .iter()
730 .map(|c| c.replica_id)
731 .collect::<HashSet<_>>();
732 let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
733 while replica_ids.contains(&replica_id) {
734 replica_id.0 += 1;
735 }
736 let new_collaborator = project_collaborator::ActiveModel {
737 project_id: ActiveValue::set(project.id),
738 connection_id: ActiveValue::set(connection.id as i32),
739 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
740 user_id: ActiveValue::set(user_id),
741 replica_id: ActiveValue::set(replica_id),
742 is_host: ActiveValue::set(false),
743 id: ActiveValue::NotSet,
744 committer_name: ActiveValue::set(committer_name),
745 committer_email: ActiveValue::set(committer_email),
746 }
747 .insert(tx)
748 .await?;
749 collaborators.push(new_collaborator);
750
751 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
752 let mut worktrees = db_worktrees
753 .into_iter()
754 .map(|db_worktree| {
755 (
756 db_worktree.id as u64,
757 Worktree {
758 id: db_worktree.id as u64,
759 abs_path: db_worktree.abs_path,
760 root_name: db_worktree.root_name,
761 visible: db_worktree.visible,
762 entries: Default::default(),
763 diagnostic_summaries: Default::default(),
764 settings_files: Default::default(),
765 scan_id: db_worktree.scan_id as u64,
766 completed_scan_id: db_worktree.completed_scan_id as u64,
767 root_repo_common_dir: db_worktree.root_repo_common_dir,
768 legacy_repository_entries: Default::default(),
769 },
770 )
771 })
772 .collect::<BTreeMap<_, _>>();
773
774 // Populate worktree entries.
775 {
776 let mut db_entries = worktree_entry::Entity::find()
777 .filter(
778 Condition::all()
779 .add(worktree_entry::Column::ProjectId.eq(project.id))
780 .add(worktree_entry::Column::IsDeleted.eq(false)),
781 )
782 .stream(tx)
783 .await?;
784 while let Some(db_entry) = db_entries.next().await {
785 let db_entry = db_entry?;
786 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
787 worktree.entries.push(proto::Entry {
788 id: db_entry.id as u64,
789 is_dir: db_entry.is_dir,
790 path: db_entry.path,
791 inode: db_entry.inode as u64,
792 mtime: Some(proto::Timestamp {
793 seconds: db_entry.mtime_seconds as u64,
794 nanos: db_entry.mtime_nanos as u32,
795 }),
796 canonical_path: db_entry.canonical_path,
797 is_ignored: db_entry.is_ignored,
798 is_external: db_entry.is_external,
799 is_hidden: db_entry.is_hidden,
800 // This is only used in the summarization backlog, so if it's None,
801 // that just means we won't be able to detect when to resummarize
802 // based on total number of backlogged bytes - instead, we'd go
803 // on number of files only. That shouldn't be a huge deal in practice.
804 size: None,
805 is_fifo: db_entry.is_fifo,
806 });
807 }
808 }
809 }
810
811 // Populate repository entries.
812 let mut repositories = Vec::new();
813 {
814 let db_repository_entries = project_repository::Entity::find()
815 .filter(
816 Condition::all()
817 .add(project_repository::Column::ProjectId.eq(project.id))
818 .add(project_repository::Column::IsDeleted.eq(false)),
819 )
820 .all(tx)
821 .await?;
822 for db_repository_entry in db_repository_entries {
823 let mut repository_statuses = project_repository_statuses::Entity::find()
824 .filter(
825 Condition::all()
826 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
827 .add(
828 project_repository_statuses::Column::RepositoryId
829 .eq(db_repository_entry.id),
830 )
831 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
832 )
833 .stream(tx)
834 .await?;
835 let mut updated_statuses = Vec::new();
836 while let Some(status_entry) = repository_statuses.next().await {
837 let status_entry = status_entry?;
838 updated_statuses.push(db_status_to_proto(status_entry)?);
839 }
840
841 let current_merge_conflicts = db_repository_entry
842 .current_merge_conflicts
843 .as_ref()
844 .map(|conflicts| serde_json::from_str(conflicts))
845 .transpose()?
846 .unwrap_or_default();
847
848 let branch_summary = db_repository_entry
849 .branch_summary
850 .as_ref()
851 .map(|branch_summary| serde_json::from_str(branch_summary))
852 .transpose()?
853 .unwrap_or_default();
854
855 let head_commit_details = db_repository_entry
856 .head_commit_details
857 .as_ref()
858 .map(|head_commit_details| serde_json::from_str(head_commit_details))
859 .transpose()?
860 .unwrap_or_default();
861
862 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
863 .context("failed to deserialize repository's entry ids")?;
864
865 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
866 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
867 worktree.legacy_repository_entries.insert(
868 db_repository_entry.id as u64,
869 proto::RepositoryEntry {
870 repository_id: db_repository_entry.id as u64,
871 updated_statuses,
872 removed_statuses: Vec::new(),
873 current_merge_conflicts,
874 branch_summary,
875 },
876 );
877 }
878 } else {
879 repositories.push(proto::UpdateRepository {
880 project_id: db_repository_entry.project_id.0 as u64,
881 id: db_repository_entry.id as u64,
882 abs_path: db_repository_entry.abs_path.clone(),
883 entry_ids,
884 updated_statuses,
885 removed_statuses: Vec::new(),
886 current_merge_conflicts,
887 branch_summary,
888 head_commit_details,
889 scan_id: db_repository_entry.scan_id as u64,
890 is_last_update: true,
891 merge_message: db_repository_entry.merge_message,
892 stash_entries: Vec::new(),
893 remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
894 remote_origin_url: db_repository_entry.remote_origin_url.clone(),
895 original_repo_abs_path: Some(db_repository_entry.abs_path),
896 linked_worktrees: db_repository_entry
897 .linked_worktrees
898 .as_deref()
899 .and_then(|s| serde_json::from_str(s).ok())
900 .unwrap_or_default(),
901 });
902 }
903 }
904 }
905
906 // Populate worktree diagnostic summaries.
907 {
908 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
909 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
910 .stream(tx)
911 .await?;
912 while let Some(db_summary) = db_summaries.next().await {
913 let db_summary = db_summary?;
914 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
915 worktree
916 .diagnostic_summaries
917 .push(proto::DiagnosticSummary {
918 path: db_summary.path,
919 language_server_id: db_summary.language_server_id as u64,
920 error_count: db_summary.error_count as u32,
921 warning_count: db_summary.warning_count as u32,
922 });
923 }
924 }
925 }
926
927 // Populate worktree settings files
928 {
929 let mut db_settings_files = worktree_settings_file::Entity::find()
930 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
931 .stream(tx)
932 .await?;
933 while let Some(db_settings_file) = db_settings_files.next().await {
934 let db_settings_file = db_settings_file?;
935 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
936 worktree.settings_files.push(WorktreeSettingsFile {
937 path: db_settings_file.path,
938 content: db_settings_file.content,
939 kind: db_settings_file.kind,
940 outside_worktree: db_settings_file.outside_worktree,
941 });
942 }
943 }
944 }
945
946 // Populate language servers.
947 let language_servers = project
948 .find_related(language_server::Entity)
949 .all(tx)
950 .await?;
951
952 let path_style = if project.windows_paths {
953 PathStyle::Windows
954 } else {
955 PathStyle::Posix
956 };
957 let features: Vec<String> = serde_json::from_str(&project.features).unwrap_or_default();
958
959 let project = Project {
960 id: project.id,
961 role,
962 collaborators: collaborators
963 .into_iter()
964 .map(|collaborator| ProjectCollaborator {
965 connection_id: collaborator.connection(),
966 user_id: collaborator.user_id,
967 replica_id: collaborator.replica_id,
968 is_host: collaborator.is_host,
969 committer_name: collaborator.committer_name,
970 committer_email: collaborator.committer_email,
971 })
972 .collect(),
973 worktrees,
974 repositories,
975 language_servers: language_servers
976 .into_iter()
977 .map(|language_server| LanguageServer {
978 server: proto::LanguageServer {
979 id: language_server.id as u64,
980 name: language_server.name,
981 worktree_id: language_server.worktree_id.map(|id| id as u64),
982 },
983 capabilities: language_server.capabilities,
984 })
985 .collect(),
986 path_style,
987 features,
988 };
989 Ok((project, replica_id as ReplicaId))
990 }
991
992 /// Removes the given connection from the specified project.
993 pub async fn leave_project(
994 &self,
995 project_id: ProjectId,
996 connection: ConnectionId,
997 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
998 self.project_transaction(project_id, |tx| async move {
999 let result = project_collaborator::Entity::delete_many()
1000 .filter(
1001 Condition::all()
1002 .add(project_collaborator::Column::ProjectId.eq(project_id))
1003 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1004 .add(
1005 project_collaborator::Column::ConnectionServerId
1006 .eq(connection.owner_id as i32),
1007 ),
1008 )
1009 .exec(&*tx)
1010 .await?;
1011 if result.rows_affected == 0 {
1012 Err(anyhow!("not a collaborator on this project"))?;
1013 }
1014
1015 let project = project::Entity::find_by_id(project_id)
1016 .one(&*tx)
1017 .await?
1018 .context("no such project")?;
1019 let collaborators = project
1020 .find_related(project_collaborator::Entity)
1021 .all(&*tx)
1022 .await?;
1023 let connection_ids: Vec<ConnectionId> = collaborators
1024 .into_iter()
1025 .map(|collaborator| collaborator.connection())
1026 .collect();
1027
1028 follower::Entity::delete_many()
1029 .filter(
1030 Condition::any()
1031 .add(
1032 Condition::all()
1033 .add(follower::Column::ProjectId.eq(Some(project_id)))
1034 .add(
1035 follower::Column::LeaderConnectionServerId
1036 .eq(connection.owner_id),
1037 )
1038 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1039 )
1040 .add(
1041 Condition::all()
1042 .add(follower::Column::ProjectId.eq(Some(project_id)))
1043 .add(
1044 follower::Column::FollowerConnectionServerId
1045 .eq(connection.owner_id),
1046 )
1047 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1048 ),
1049 )
1050 .exec(&*tx)
1051 .await?;
1052
1053 let room = if let Some(room_id) = project.room_id {
1054 Some(self.get_room(room_id, &tx).await?)
1055 } else {
1056 None
1057 };
1058
1059 let left_project = LeftProject {
1060 id: project_id,
1061 should_unshare: connection == project.host_connection()?,
1062 connection_ids,
1063 };
1064 Ok((room, left_project))
1065 })
1066 .await
1067 }
1068
1069 pub async fn check_user_is_project_host(
1070 &self,
1071 project_id: ProjectId,
1072 connection_id: ConnectionId,
1073 ) -> Result<()> {
1074 self.project_transaction(project_id, |tx| async move {
1075 project::Entity::find()
1076 .filter(
1077 Condition::all()
1078 .add(project::Column::Id.eq(project_id))
1079 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1080 .add(
1081 project::Column::HostConnectionServerId
1082 .eq(Some(connection_id.owner_id as i32)),
1083 ),
1084 )
1085 .one(&*tx)
1086 .await?
1087 .context("failed to read project host")?;
1088
1089 Ok(())
1090 })
1091 .await
1092 .map(|guard| guard.into_inner())
1093 }
1094
1095 /// Returns the current project if the given user is authorized to access it with the specified capability.
1096 pub async fn access_project(
1097 &self,
1098 project_id: ProjectId,
1099 connection_id: ConnectionId,
1100 capability: Capability,
1101 tx: &DatabaseTransaction,
1102 ) -> Result<(project::Model, ChannelRole)> {
1103 let project = project::Entity::find_by_id(project_id)
1104 .one(tx)
1105 .await?
1106 .context("no such project")?;
1107
1108 let role_from_room = if let Some(room_id) = project.room_id {
1109 room_participant::Entity::find()
1110 .filter(room_participant::Column::RoomId.eq(room_id))
1111 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1112 .one(tx)
1113 .await?
1114 .and_then(|participant| participant.role)
1115 } else {
1116 None
1117 };
1118
1119 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1120
1121 match capability {
1122 Capability::ReadWrite => {
1123 if !role.can_edit_projects() {
1124 return Err(anyhow!("not authorized to edit projects"))?;
1125 }
1126 }
1127 Capability::ReadOnly => {
1128 if !role.can_read_projects() {
1129 return Err(anyhow!("not authorized to read projects"))?;
1130 }
1131 }
1132 }
1133
1134 Ok((project, role))
1135 }
1136
1137 /// Returns the host connection for a read-only request to join a shared project.
1138 pub async fn host_for_read_only_project_request(
1139 &self,
1140 project_id: ProjectId,
1141 connection_id: ConnectionId,
1142 ) -> Result<ConnectionId> {
1143 self.project_transaction(project_id, |tx| async move {
1144 let (project, _) = self
1145 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1146 .await?;
1147 project.host_connection()
1148 })
1149 .await
1150 .map(|guard| guard.into_inner())
1151 }
1152
1153 /// Returns the host connection for a request to join a shared project.
1154 pub async fn host_for_mutating_project_request(
1155 &self,
1156 project_id: ProjectId,
1157 connection_id: ConnectionId,
1158 ) -> Result<ConnectionId> {
1159 self.project_transaction(project_id, |tx| async move {
1160 let (project, _) = self
1161 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1162 .await?;
1163 project.host_connection()
1164 })
1165 .await
1166 .map(|guard| guard.into_inner())
1167 }
1168
1169 pub async fn connections_for_buffer_update(
1170 &self,
1171 project_id: ProjectId,
1172 connection_id: ConnectionId,
1173 capability: Capability,
1174 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1175 self.project_transaction(project_id, |tx| async move {
1176 // Authorize
1177 let (project, _) = self
1178 .access_project(project_id, connection_id, capability, &tx)
1179 .await?;
1180
1181 let host_connection_id = project.host_connection()?;
1182
1183 let collaborators = project_collaborator::Entity::find()
1184 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1185 .all(&*tx)
1186 .await?;
1187
1188 let guest_connection_ids = collaborators
1189 .into_iter()
1190 .filter_map(|collaborator| {
1191 if collaborator.is_host {
1192 None
1193 } else {
1194 Some(collaborator.connection())
1195 }
1196 })
1197 .collect();
1198
1199 Ok((host_connection_id, guest_connection_ids))
1200 })
1201 .await
1202 }
1203
1204 /// Returns the connection IDs in the given project.
1205 ///
1206 /// The provided `connection_id` must also be a collaborator in the project,
1207 /// otherwise an error will be returned.
1208 pub async fn project_connection_ids(
1209 &self,
1210 project_id: ProjectId,
1211 connection_id: ConnectionId,
1212 exclude_dev_server: bool,
1213 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1214 self.project_transaction(project_id, |tx| async move {
1215 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1216 .await
1217 })
1218 .await
1219 }
1220
1221 async fn internal_project_connection_ids(
1222 &self,
1223 project_id: ProjectId,
1224 connection_id: ConnectionId,
1225 exclude_dev_server: bool,
1226 tx: &DatabaseTransaction,
1227 ) -> Result<HashSet<ConnectionId>> {
1228 let project = project::Entity::find_by_id(project_id)
1229 .one(tx)
1230 .await?
1231 .context("no such project")?;
1232
1233 let mut collaborators = project_collaborator::Entity::find()
1234 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1235 .stream(tx)
1236 .await?;
1237
1238 let mut connection_ids = HashSet::default();
1239 if let Some(host_connection) = project.host_connection().log_err()
1240 && !exclude_dev_server
1241 {
1242 connection_ids.insert(host_connection);
1243 }
1244
1245 while let Some(collaborator) = collaborators.next().await {
1246 let collaborator = collaborator?;
1247 connection_ids.insert(collaborator.connection());
1248 }
1249
1250 if connection_ids.contains(&connection_id)
1251 || Some(connection_id) == project.host_connection().ok()
1252 {
1253 Ok(connection_ids)
1254 } else {
1255 Err(anyhow!(
1256 "can only send project updates to a project you're in"
1257 ))?
1258 }
1259 }
1260
1261 async fn project_guest_connection_ids(
1262 &self,
1263 project_id: ProjectId,
1264 tx: &DatabaseTransaction,
1265 ) -> Result<Vec<ConnectionId>> {
1266 let mut collaborators = project_collaborator::Entity::find()
1267 .filter(
1268 project_collaborator::Column::ProjectId
1269 .eq(project_id)
1270 .and(project_collaborator::Column::IsHost.eq(false)),
1271 )
1272 .stream(tx)
1273 .await?;
1274
1275 let mut guest_connection_ids = Vec::new();
1276 while let Some(collaborator) = collaborators.next().await {
1277 let collaborator = collaborator?;
1278 guest_connection_ids.push(collaborator.connection());
1279 }
1280 Ok(guest_connection_ids)
1281 }
1282
1283 /// Returns the [`RoomId`] for the given project.
1284 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1285 self.transaction(|tx| async move {
1286 Ok(project::Entity::find_by_id(project_id)
1287 .one(&*tx)
1288 .await?
1289 .and_then(|project| project.room_id))
1290 })
1291 .await
1292 }
1293
1294 pub async fn check_room_participants(
1295 &self,
1296 room_id: RoomId,
1297 leader_id: ConnectionId,
1298 follower_id: ConnectionId,
1299 ) -> Result<()> {
1300 self.transaction(|tx| async move {
1301 use room_participant::Column;
1302
1303 let count = room_participant::Entity::find()
1304 .filter(
1305 Condition::all().add(Column::RoomId.eq(room_id)).add(
1306 Condition::any()
1307 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1308 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1309 ))
1310 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1311 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1312 )),
1313 ),
1314 )
1315 .count(&*tx)
1316 .await?;
1317
1318 if count < 2 {
1319 Err(anyhow!("not room participants"))?;
1320 }
1321
1322 Ok(())
1323 })
1324 .await
1325 }
1326
1327 /// Adds the given follower connection as a follower of the given leader connection.
1328 pub async fn follow(
1329 &self,
1330 room_id: RoomId,
1331 project_id: ProjectId,
1332 leader_connection: ConnectionId,
1333 follower_connection: ConnectionId,
1334 ) -> Result<TransactionGuard<proto::Room>> {
1335 self.room_transaction(room_id, |tx| async move {
1336 follower::ActiveModel {
1337 room_id: ActiveValue::set(room_id),
1338 project_id: ActiveValue::set(project_id),
1339 leader_connection_server_id: ActiveValue::set(ServerId(
1340 leader_connection.owner_id as i32,
1341 )),
1342 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1343 follower_connection_server_id: ActiveValue::set(ServerId(
1344 follower_connection.owner_id as i32,
1345 )),
1346 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1347 ..Default::default()
1348 }
1349 .insert(&*tx)
1350 .await?;
1351
1352 let room = self.get_room(room_id, &tx).await?;
1353 Ok(room)
1354 })
1355 .await
1356 }
1357
1358 /// Removes the given follower connection as a follower of the given leader connection.
1359 pub async fn unfollow(
1360 &self,
1361 room_id: RoomId,
1362 project_id: ProjectId,
1363 leader_connection: ConnectionId,
1364 follower_connection: ConnectionId,
1365 ) -> Result<TransactionGuard<proto::Room>> {
1366 self.room_transaction(room_id, |tx| async move {
1367 follower::Entity::delete_many()
1368 .filter(
1369 Condition::all()
1370 .add(follower::Column::RoomId.eq(room_id))
1371 .add(follower::Column::ProjectId.eq(project_id))
1372 .add(
1373 follower::Column::LeaderConnectionServerId
1374 .eq(leader_connection.owner_id),
1375 )
1376 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1377 .add(
1378 follower::Column::FollowerConnectionServerId
1379 .eq(follower_connection.owner_id),
1380 )
1381 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1382 )
1383 .exec(&*tx)
1384 .await?;
1385
1386 let room = self.get_room(room_id, &tx).await?;
1387 Ok(room)
1388 })
1389 .await
1390 }
1391}