1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 windows_paths: bool,
37 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
38 self.room_transaction(room_id, |tx| async move {
39 let participant = room_participant::Entity::find()
40 .filter(
41 Condition::all()
42 .add(
43 room_participant::Column::AnsweringConnectionId
44 .eq(connection.id as i32),
45 )
46 .add(
47 room_participant::Column::AnsweringConnectionServerId
48 .eq(connection.owner_id as i32),
49 ),
50 )
51 .one(&*tx)
52 .await?
53 .context("could not find participant")?;
54 if participant.room_id != room_id {
55 return Err(anyhow!("shared project on unexpected room"))?;
56 }
57 if !participant
58 .role
59 .unwrap_or(ChannelRole::Member)
60 .can_edit_projects()
61 {
62 return Err(anyhow!("guests cannot share projects"))?;
63 }
64
65 let project = project::ActiveModel {
66 room_id: ActiveValue::set(Some(participant.room_id)),
67 host_user_id: ActiveValue::set(Some(participant.user_id)),
68 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
69 host_connection_server_id: ActiveValue::set(Some(ServerId(
70 connection.owner_id as i32,
71 ))),
72 id: ActiveValue::NotSet,
73 windows_paths: ActiveValue::set(windows_paths),
74 }
75 .insert(&*tx)
76 .await?;
77
78 if !worktrees.is_empty() {
79 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
80 worktree::ActiveModel {
81 id: ActiveValue::set(worktree.id as i64),
82 project_id: ActiveValue::set(project.id),
83 abs_path: ActiveValue::set(worktree.abs_path.clone()),
84 root_name: ActiveValue::set(worktree.root_name.clone()),
85 visible: ActiveValue::set(worktree.visible),
86 scan_id: ActiveValue::set(0),
87 completed_scan_id: ActiveValue::set(0),
88 }
89 }))
90 .exec(&*tx)
91 .await?;
92 }
93
94 let replica_id = if is_ssh_project {
95 clock::ReplicaId::REMOTE_SERVER
96 } else {
97 clock::ReplicaId::LOCAL
98 };
99
100 project_collaborator::ActiveModel {
101 project_id: ActiveValue::set(project.id),
102 connection_id: ActiveValue::set(connection.id as i32),
103 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
104 user_id: ActiveValue::set(participant.user_id),
105 replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
106 is_host: ActiveValue::set(true),
107 id: ActiveValue::NotSet,
108 committer_name: ActiveValue::Set(None),
109 committer_email: ActiveValue::Set(None),
110 }
111 .insert(&*tx)
112 .await?;
113
114 let room = self.get_room(room_id, &tx).await?;
115 Ok((project.id, room))
116 })
117 .await
118 }
119
120 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
121 self.transaction(|tx| async move {
122 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
123 Ok(())
124 })
125 .await
126 }
127
128 /// Unshares the given project.
129 pub async fn unshare_project(
130 &self,
131 project_id: ProjectId,
132 connection: ConnectionId,
133 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
134 self.project_transaction(project_id, |tx| async move {
135 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
136 let project = project::Entity::find_by_id(project_id)
137 .one(&*tx)
138 .await?
139 .context("project not found")?;
140 let room = if let Some(room_id) = project.room_id {
141 Some(self.get_room(room_id, &tx).await?)
142 } else {
143 None
144 };
145 if project.host_connection()? == connection {
146 return Ok((true, room, guest_connection_ids));
147 }
148 Err(anyhow!("cannot unshare a project hosted by another user"))?
149 })
150 .await
151 }
152
153 /// Updates the worktrees associated with the given project.
154 pub async fn update_project(
155 &self,
156 project_id: ProjectId,
157 connection: ConnectionId,
158 worktrees: &[proto::WorktreeMetadata],
159 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
160 self.project_transaction(project_id, |tx| async move {
161 let project = project::Entity::find_by_id(project_id)
162 .filter(
163 Condition::all()
164 .add(project::Column::HostConnectionId.eq(connection.id as i32))
165 .add(
166 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
167 ),
168 )
169 .one(&*tx)
170 .await?
171 .context("no such project")?;
172
173 self.update_project_worktrees(project.id, worktrees, &tx)
174 .await?;
175
176 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
177
178 let room = if let Some(room_id) = project.room_id {
179 Some(self.get_room(room_id, &tx).await?)
180 } else {
181 None
182 };
183
184 Ok((room, guest_connection_ids))
185 })
186 .await
187 }
188
189 pub(in crate::db) async fn update_project_worktrees(
190 &self,
191 project_id: ProjectId,
192 worktrees: &[proto::WorktreeMetadata],
193 tx: &DatabaseTransaction,
194 ) -> Result<()> {
195 if !worktrees.is_empty() {
196 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
197 id: ActiveValue::set(worktree.id as i64),
198 project_id: ActiveValue::set(project_id),
199 abs_path: ActiveValue::set(worktree.abs_path.clone()),
200 root_name: ActiveValue::set(worktree.root_name.clone()),
201 visible: ActiveValue::set(worktree.visible),
202 scan_id: ActiveValue::set(0),
203 completed_scan_id: ActiveValue::set(0),
204 }))
205 .on_conflict(
206 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
207 .update_column(worktree::Column::RootName)
208 .to_owned(),
209 )
210 .exec(tx)
211 .await?;
212 }
213
214 worktree::Entity::delete_many()
215 .filter(worktree::Column::ProjectId.eq(project_id).and(
216 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
217 ))
218 .exec(tx)
219 .await?;
220
221 Ok(())
222 }
223
224 pub async fn update_worktree(
225 &self,
226 update: &proto::UpdateWorktree,
227 connection: ConnectionId,
228 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
229 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
230 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
231 {
232 return Err(anyhow!(
233 "invalid worktree update. removed entries: {}, updated entries: {}",
234 update.removed_entries.len(),
235 update.updated_entries.len()
236 ))?;
237 }
238
239 let project_id = ProjectId::from_proto(update.project_id);
240 let worktree_id = update.worktree_id as i64;
241 self.project_transaction(project_id, |tx| async move {
242 // Ensure the update comes from the host.
243 let _project = project::Entity::find_by_id(project_id)
244 .filter(
245 Condition::all()
246 .add(project::Column::HostConnectionId.eq(connection.id as i32))
247 .add(
248 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
249 ),
250 )
251 .one(&*tx)
252 .await?
253 .with_context(|| format!("no such project: {project_id}"))?;
254
255 // Update metadata.
256 worktree::Entity::update(worktree::ActiveModel {
257 id: ActiveValue::set(worktree_id),
258 project_id: ActiveValue::set(project_id),
259 root_name: ActiveValue::set(update.root_name.clone()),
260 scan_id: ActiveValue::set(update.scan_id as i64),
261 completed_scan_id: if update.is_last_update {
262 ActiveValue::set(update.scan_id as i64)
263 } else {
264 ActiveValue::default()
265 },
266 abs_path: ActiveValue::set(update.abs_path.clone()),
267 ..Default::default()
268 })
269 .exec(&*tx)
270 .await?;
271
272 if !update.updated_entries.is_empty() {
273 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
274 let mtime = entry.mtime.clone().unwrap_or_default();
275 worktree_entry::ActiveModel {
276 project_id: ActiveValue::set(project_id),
277 worktree_id: ActiveValue::set(worktree_id),
278 id: ActiveValue::set(entry.id as i64),
279 is_dir: ActiveValue::set(entry.is_dir),
280 path: ActiveValue::set(entry.path.clone()),
281 inode: ActiveValue::set(entry.inode as i64),
282 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
283 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
284 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
285 is_ignored: ActiveValue::set(entry.is_ignored),
286 git_status: ActiveValue::set(None),
287 is_external: ActiveValue::set(entry.is_external),
288 is_deleted: ActiveValue::set(false),
289 is_hidden: ActiveValue::set(entry.is_hidden),
290 scan_id: ActiveValue::set(update.scan_id as i64),
291 is_fifo: ActiveValue::set(entry.is_fifo),
292 }
293 }))
294 .on_conflict(
295 OnConflict::columns([
296 worktree_entry::Column::ProjectId,
297 worktree_entry::Column::WorktreeId,
298 worktree_entry::Column::Id,
299 ])
300 .update_columns([
301 worktree_entry::Column::IsDir,
302 worktree_entry::Column::Path,
303 worktree_entry::Column::Inode,
304 worktree_entry::Column::MtimeSeconds,
305 worktree_entry::Column::MtimeNanos,
306 worktree_entry::Column::CanonicalPath,
307 worktree_entry::Column::IsIgnored,
308 worktree_entry::Column::IsHidden,
309 worktree_entry::Column::ScanId,
310 ])
311 .to_owned(),
312 )
313 .exec(&*tx)
314 .await?;
315 }
316
317 if !update.removed_entries.is_empty() {
318 worktree_entry::Entity::update_many()
319 .filter(
320 worktree_entry::Column::ProjectId
321 .eq(project_id)
322 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
323 .and(
324 worktree_entry::Column::Id
325 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
326 ),
327 )
328 .set(worktree_entry::ActiveModel {
329 is_deleted: ActiveValue::Set(true),
330 scan_id: ActiveValue::Set(update.scan_id as i64),
331 ..Default::default()
332 })
333 .exec(&*tx)
334 .await?;
335 }
336
337 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
338 Ok(connection_ids)
339 })
340 .await
341 }
342
343 pub async fn update_repository(
344 &self,
345 update: &proto::UpdateRepository,
346 _connection: ConnectionId,
347 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
348 let project_id = ProjectId::from_proto(update.project_id);
349 let repository_id = update.id as i64;
350 self.project_transaction(project_id, |tx| async move {
351 project_repository::Entity::insert(project_repository::ActiveModel {
352 project_id: ActiveValue::set(project_id),
353 id: ActiveValue::set(repository_id),
354 legacy_worktree_id: ActiveValue::set(None),
355 abs_path: ActiveValue::set(update.abs_path.clone()),
356 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
357 scan_id: ActiveValue::set(update.scan_id as i64),
358 is_deleted: ActiveValue::set(false),
359 branch_summary: ActiveValue::Set(
360 update
361 .branch_summary
362 .as_ref()
363 .map(|summary| serde_json::to_string(summary).unwrap()),
364 ),
365 head_commit_details: ActiveValue::Set(
366 update
367 .head_commit_details
368 .as_ref()
369 .map(|details| serde_json::to_string(details).unwrap()),
370 ),
371 current_merge_conflicts: ActiveValue::Set(Some(
372 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
373 )),
374 merge_message: ActiveValue::set(update.merge_message.clone()),
375 remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
376 remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
377 linked_worktrees: ActiveValue::Set(Some(
378 serde_json::to_string(&update.linked_worktrees).unwrap(),
379 )),
380 })
381 .on_conflict(
382 OnConflict::columns([
383 project_repository::Column::ProjectId,
384 project_repository::Column::Id,
385 ])
386 .update_columns([
387 project_repository::Column::ScanId,
388 project_repository::Column::BranchSummary,
389 project_repository::Column::EntryIds,
390 project_repository::Column::AbsPath,
391 project_repository::Column::CurrentMergeConflicts,
392 project_repository::Column::HeadCommitDetails,
393 project_repository::Column::MergeMessage,
394 project_repository::Column::LinkedWorktrees,
395 ])
396 .to_owned(),
397 )
398 .exec(&*tx)
399 .await?;
400
401 let has_any_statuses = !update.updated_statuses.is_empty();
402
403 if has_any_statuses {
404 project_repository_statuses::Entity::insert_many(
405 update.updated_statuses.iter().map(|status_entry| {
406 let (repo_path, status_kind, first_status, second_status) =
407 proto_status_to_db(status_entry.clone());
408 project_repository_statuses::ActiveModel {
409 project_id: ActiveValue::set(project_id),
410 repository_id: ActiveValue::set(repository_id),
411 scan_id: ActiveValue::set(update.scan_id as i64),
412 is_deleted: ActiveValue::set(false),
413 repo_path: ActiveValue::set(repo_path),
414 status: ActiveValue::set(0),
415 status_kind: ActiveValue::set(status_kind),
416 first_status: ActiveValue::set(first_status),
417 second_status: ActiveValue::set(second_status),
418 lines_added: ActiveValue::set(
419 status_entry.diff_stat_added.map(|v| v as i32),
420 ),
421 lines_deleted: ActiveValue::set(
422 status_entry.diff_stat_deleted.map(|v| v as i32),
423 ),
424 }
425 }),
426 )
427 .on_conflict(
428 OnConflict::columns([
429 project_repository_statuses::Column::ProjectId,
430 project_repository_statuses::Column::RepositoryId,
431 project_repository_statuses::Column::RepoPath,
432 ])
433 .update_columns([
434 project_repository_statuses::Column::ScanId,
435 project_repository_statuses::Column::StatusKind,
436 project_repository_statuses::Column::FirstStatus,
437 project_repository_statuses::Column::SecondStatus,
438 project_repository_statuses::Column::LinesAdded,
439 project_repository_statuses::Column::LinesDeleted,
440 ])
441 .to_owned(),
442 )
443 .exec(&*tx)
444 .await?;
445 }
446
447 let has_any_removed_statuses = !update.removed_statuses.is_empty();
448
449 if has_any_removed_statuses {
450 project_repository_statuses::Entity::update_many()
451 .filter(
452 project_repository_statuses::Column::ProjectId
453 .eq(project_id)
454 .and(
455 project_repository_statuses::Column::RepositoryId.eq(repository_id),
456 )
457 .and(
458 project_repository_statuses::Column::RepoPath
459 .is_in(update.removed_statuses.iter()),
460 ),
461 )
462 .set(project_repository_statuses::ActiveModel {
463 is_deleted: ActiveValue::Set(true),
464 scan_id: ActiveValue::Set(update.scan_id as i64),
465 ..Default::default()
466 })
467 .exec(&*tx)
468 .await?;
469 }
470
471 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
472 Ok(connection_ids)
473 })
474 .await
475 }
476
477 pub async fn remove_repository(
478 &self,
479 remove: &proto::RemoveRepository,
480 _connection: ConnectionId,
481 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
482 let project_id = ProjectId::from_proto(remove.project_id);
483 let repository_id = remove.id as i64;
484 self.project_transaction(project_id, |tx| async move {
485 project_repository::Entity::update_many()
486 .filter(
487 project_repository::Column::ProjectId
488 .eq(project_id)
489 .and(project_repository::Column::Id.eq(repository_id)),
490 )
491 .set(project_repository::ActiveModel {
492 is_deleted: ActiveValue::Set(true),
493 // scan_id: ActiveValue::Set(update.scan_id as i64),
494 ..Default::default()
495 })
496 .exec(&*tx)
497 .await?;
498
499 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
500 Ok(connection_ids)
501 })
502 .await
503 }
504
505 /// Updates the diagnostic summary for the given connection.
506 pub async fn update_diagnostic_summary(
507 &self,
508 update: &proto::UpdateDiagnosticSummary,
509 connection: ConnectionId,
510 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
511 let project_id = ProjectId::from_proto(update.project_id);
512 let worktree_id = update.worktree_id as i64;
513 self.project_transaction(project_id, |tx| async move {
514 let summary = update.summary.as_ref().context("invalid summary")?;
515
516 // Ensure the update comes from the host.
517 let project = project::Entity::find_by_id(project_id)
518 .one(&*tx)
519 .await?
520 .context("no such project")?;
521 if project.host_connection()? != connection {
522 return Err(anyhow!("can't update a project hosted by someone else"))?;
523 }
524
525 // Update summary.
526 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
527 project_id: ActiveValue::set(project_id),
528 worktree_id: ActiveValue::set(worktree_id),
529 path: ActiveValue::set(summary.path.clone()),
530 language_server_id: ActiveValue::set(summary.language_server_id as i64),
531 error_count: ActiveValue::set(summary.error_count as i32),
532 warning_count: ActiveValue::set(summary.warning_count as i32),
533 })
534 .on_conflict(
535 OnConflict::columns([
536 worktree_diagnostic_summary::Column::ProjectId,
537 worktree_diagnostic_summary::Column::WorktreeId,
538 worktree_diagnostic_summary::Column::Path,
539 ])
540 .update_columns([
541 worktree_diagnostic_summary::Column::LanguageServerId,
542 worktree_diagnostic_summary::Column::ErrorCount,
543 worktree_diagnostic_summary::Column::WarningCount,
544 ])
545 .to_owned(),
546 )
547 .exec(&*tx)
548 .await?;
549
550 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
551 Ok(connection_ids)
552 })
553 .await
554 }
555
556 /// Starts the language server for the given connection.
557 pub async fn start_language_server(
558 &self,
559 update: &proto::StartLanguageServer,
560 connection: ConnectionId,
561 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
562 let project_id = ProjectId::from_proto(update.project_id);
563 self.project_transaction(project_id, |tx| async move {
564 let server = update.server.as_ref().context("invalid language server")?;
565
566 // Ensure the update comes from the host.
567 let project = project::Entity::find_by_id(project_id)
568 .one(&*tx)
569 .await?
570 .context("no such project")?;
571 if project.host_connection()? != connection {
572 return Err(anyhow!("can't update a project hosted by someone else"))?;
573 }
574
575 // Add the newly-started language server.
576 language_server::Entity::insert(language_server::ActiveModel {
577 project_id: ActiveValue::set(project_id),
578 id: ActiveValue::set(server.id as i64),
579 name: ActiveValue::set(server.name.clone()),
580 worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
581 capabilities: ActiveValue::set(update.capabilities.clone()),
582 })
583 .on_conflict(
584 OnConflict::columns([
585 language_server::Column::ProjectId,
586 language_server::Column::Id,
587 ])
588 .update_columns([
589 language_server::Column::Name,
590 language_server::Column::Capabilities,
591 language_server::Column::WorktreeId,
592 ])
593 .to_owned(),
594 )
595 .exec(&*tx)
596 .await?;
597
598 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
599 Ok(connection_ids)
600 })
601 .await
602 }
603
604 /// Updates the worktree settings for the given connection.
605 pub async fn update_worktree_settings(
606 &self,
607 update: &proto::UpdateWorktreeSettings,
608 connection: ConnectionId,
609 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
610 let project_id = ProjectId::from_proto(update.project_id);
611 let kind = match update.kind {
612 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
613 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
614 None => proto::LocalSettingsKind::Settings,
615 };
616 let kind = LocalSettingsKind::from_proto(kind);
617 self.project_transaction(project_id, |tx| async move {
618 // Ensure the update comes from the host.
619 let project = project::Entity::find_by_id(project_id)
620 .one(&*tx)
621 .await?
622 .context("no such project")?;
623 if project.host_connection()? != connection {
624 return Err(anyhow!("can't update a project hosted by someone else"))?;
625 }
626
627 if let Some(content) = &update.content {
628 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
629 project_id: ActiveValue::Set(project_id),
630 worktree_id: ActiveValue::Set(update.worktree_id as i64),
631 path: ActiveValue::Set(update.path.clone()),
632 content: ActiveValue::Set(content.clone()),
633 kind: ActiveValue::Set(kind),
634 outside_worktree: ActiveValue::Set(update.outside_worktree.unwrap_or(false)),
635 })
636 .on_conflict(
637 OnConflict::columns([
638 worktree_settings_file::Column::ProjectId,
639 worktree_settings_file::Column::WorktreeId,
640 worktree_settings_file::Column::Path,
641 ])
642 .update_columns([
643 worktree_settings_file::Column::Content,
644 worktree_settings_file::Column::OutsideWorktree,
645 ])
646 .to_owned(),
647 )
648 .exec(&*tx)
649 .await?;
650 } else {
651 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
652 project_id: ActiveValue::Set(project_id),
653 worktree_id: ActiveValue::Set(update.worktree_id as i64),
654 path: ActiveValue::Set(update.path.clone()),
655 ..Default::default()
656 })
657 .exec(&*tx)
658 .await?;
659 }
660
661 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
662 Ok(connection_ids)
663 })
664 .await
665 }
666
667 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
668 self.transaction(|tx| async move {
669 Ok(project::Entity::find_by_id(id)
670 .one(&*tx)
671 .await?
672 .context("no such project")?)
673 })
674 .await
675 }
676
677 /// Adds the given connection to the specified project
678 /// in the current room.
679 pub async fn join_project(
680 &self,
681 project_id: ProjectId,
682 connection: ConnectionId,
683 user_id: UserId,
684 committer_name: Option<String>,
685 committer_email: Option<String>,
686 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
687 self.project_transaction(project_id, move |tx| {
688 let committer_name = committer_name.clone();
689 let committer_email = committer_email.clone();
690 async move {
691 let (project, role) = self
692 .access_project(project_id, connection, Capability::ReadOnly, &tx)
693 .await?;
694 self.join_project_internal(
695 project,
696 user_id,
697 committer_name,
698 committer_email,
699 connection,
700 role,
701 &tx,
702 )
703 .await
704 }
705 })
706 .await
707 }
708
709 async fn join_project_internal(
710 &self,
711 project: project::Model,
712 user_id: UserId,
713 committer_name: Option<String>,
714 committer_email: Option<String>,
715 connection: ConnectionId,
716 role: ChannelRole,
717 tx: &DatabaseTransaction,
718 ) -> Result<(Project, ReplicaId)> {
719 let mut collaborators = project
720 .find_related(project_collaborator::Entity)
721 .all(tx)
722 .await?;
723 let replica_ids = collaborators
724 .iter()
725 .map(|c| c.replica_id)
726 .collect::<HashSet<_>>();
727 let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
728 while replica_ids.contains(&replica_id) {
729 replica_id.0 += 1;
730 }
731 let new_collaborator = project_collaborator::ActiveModel {
732 project_id: ActiveValue::set(project.id),
733 connection_id: ActiveValue::set(connection.id as i32),
734 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
735 user_id: ActiveValue::set(user_id),
736 replica_id: ActiveValue::set(replica_id),
737 is_host: ActiveValue::set(false),
738 id: ActiveValue::NotSet,
739 committer_name: ActiveValue::set(committer_name),
740 committer_email: ActiveValue::set(committer_email),
741 }
742 .insert(tx)
743 .await?;
744 collaborators.push(new_collaborator);
745
746 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
747 let mut worktrees = db_worktrees
748 .into_iter()
749 .map(|db_worktree| {
750 (
751 db_worktree.id as u64,
752 Worktree {
753 id: db_worktree.id as u64,
754 abs_path: db_worktree.abs_path,
755 root_name: db_worktree.root_name,
756 visible: db_worktree.visible,
757 entries: Default::default(),
758 diagnostic_summaries: Default::default(),
759 settings_files: Default::default(),
760 scan_id: db_worktree.scan_id as u64,
761 completed_scan_id: db_worktree.completed_scan_id as u64,
762 legacy_repository_entries: Default::default(),
763 },
764 )
765 })
766 .collect::<BTreeMap<_, _>>();
767
768 // Populate worktree entries.
769 {
770 let mut db_entries = worktree_entry::Entity::find()
771 .filter(
772 Condition::all()
773 .add(worktree_entry::Column::ProjectId.eq(project.id))
774 .add(worktree_entry::Column::IsDeleted.eq(false)),
775 )
776 .stream(tx)
777 .await?;
778 while let Some(db_entry) = db_entries.next().await {
779 let db_entry = db_entry?;
780 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
781 worktree.entries.push(proto::Entry {
782 id: db_entry.id as u64,
783 is_dir: db_entry.is_dir,
784 path: db_entry.path,
785 inode: db_entry.inode as u64,
786 mtime: Some(proto::Timestamp {
787 seconds: db_entry.mtime_seconds as u64,
788 nanos: db_entry.mtime_nanos as u32,
789 }),
790 canonical_path: db_entry.canonical_path,
791 is_ignored: db_entry.is_ignored,
792 is_external: db_entry.is_external,
793 is_hidden: db_entry.is_hidden,
794 // This is only used in the summarization backlog, so if it's None,
795 // that just means we won't be able to detect when to resummarize
796 // based on total number of backlogged bytes - instead, we'd go
797 // on number of files only. That shouldn't be a huge deal in practice.
798 size: None,
799 is_fifo: db_entry.is_fifo,
800 });
801 }
802 }
803 }
804
805 // Populate repository entries.
806 let mut repositories = Vec::new();
807 {
808 let db_repository_entries = project_repository::Entity::find()
809 .filter(
810 Condition::all()
811 .add(project_repository::Column::ProjectId.eq(project.id))
812 .add(project_repository::Column::IsDeleted.eq(false)),
813 )
814 .all(tx)
815 .await?;
816 for db_repository_entry in db_repository_entries {
817 let mut repository_statuses = project_repository_statuses::Entity::find()
818 .filter(
819 Condition::all()
820 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
821 .add(
822 project_repository_statuses::Column::RepositoryId
823 .eq(db_repository_entry.id),
824 )
825 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
826 )
827 .stream(tx)
828 .await?;
829 let mut updated_statuses = Vec::new();
830 while let Some(status_entry) = repository_statuses.next().await {
831 let status_entry = status_entry?;
832 updated_statuses.push(db_status_to_proto(status_entry)?);
833 }
834
835 let current_merge_conflicts = db_repository_entry
836 .current_merge_conflicts
837 .as_ref()
838 .map(|conflicts| serde_json::from_str(conflicts))
839 .transpose()?
840 .unwrap_or_default();
841
842 let branch_summary = db_repository_entry
843 .branch_summary
844 .as_ref()
845 .map(|branch_summary| serde_json::from_str(branch_summary))
846 .transpose()?
847 .unwrap_or_default();
848
849 let head_commit_details = db_repository_entry
850 .head_commit_details
851 .as_ref()
852 .map(|head_commit_details| serde_json::from_str(head_commit_details))
853 .transpose()?
854 .unwrap_or_default();
855
856 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
857 .context("failed to deserialize repository's entry ids")?;
858
859 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
860 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
861 worktree.legacy_repository_entries.insert(
862 db_repository_entry.id as u64,
863 proto::RepositoryEntry {
864 repository_id: db_repository_entry.id as u64,
865 updated_statuses,
866 removed_statuses: Vec::new(),
867 current_merge_conflicts,
868 branch_summary,
869 },
870 );
871 }
872 } else {
873 repositories.push(proto::UpdateRepository {
874 project_id: db_repository_entry.project_id.0 as u64,
875 id: db_repository_entry.id as u64,
876 abs_path: db_repository_entry.abs_path.clone(),
877 entry_ids,
878 updated_statuses,
879 removed_statuses: Vec::new(),
880 current_merge_conflicts,
881 branch_summary,
882 head_commit_details,
883 scan_id: db_repository_entry.scan_id as u64,
884 is_last_update: true,
885 merge_message: db_repository_entry.merge_message,
886 stash_entries: Vec::new(),
887 remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
888 remote_origin_url: db_repository_entry.remote_origin_url.clone(),
889 original_repo_abs_path: Some(db_repository_entry.abs_path),
890 linked_worktrees: db_repository_entry
891 .linked_worktrees
892 .as_deref()
893 .and_then(|s| serde_json::from_str(s).ok())
894 .unwrap_or_default(),
895 });
896 }
897 }
898 }
899
900 // Populate worktree diagnostic summaries.
901 {
902 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
903 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
904 .stream(tx)
905 .await?;
906 while let Some(db_summary) = db_summaries.next().await {
907 let db_summary = db_summary?;
908 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
909 worktree
910 .diagnostic_summaries
911 .push(proto::DiagnosticSummary {
912 path: db_summary.path,
913 language_server_id: db_summary.language_server_id as u64,
914 error_count: db_summary.error_count as u32,
915 warning_count: db_summary.warning_count as u32,
916 });
917 }
918 }
919 }
920
921 // Populate worktree settings files
922 {
923 let mut db_settings_files = worktree_settings_file::Entity::find()
924 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
925 .stream(tx)
926 .await?;
927 while let Some(db_settings_file) = db_settings_files.next().await {
928 let db_settings_file = db_settings_file?;
929 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
930 worktree.settings_files.push(WorktreeSettingsFile {
931 path: db_settings_file.path,
932 content: db_settings_file.content,
933 kind: db_settings_file.kind,
934 outside_worktree: db_settings_file.outside_worktree,
935 });
936 }
937 }
938 }
939
940 // Populate language servers.
941 let language_servers = project
942 .find_related(language_server::Entity)
943 .all(tx)
944 .await?;
945
946 let path_style = if project.windows_paths {
947 PathStyle::Windows
948 } else {
949 PathStyle::Posix
950 };
951
952 let project = Project {
953 id: project.id,
954 role,
955 collaborators: collaborators
956 .into_iter()
957 .map(|collaborator| ProjectCollaborator {
958 connection_id: collaborator.connection(),
959 user_id: collaborator.user_id,
960 replica_id: collaborator.replica_id,
961 is_host: collaborator.is_host,
962 committer_name: collaborator.committer_name,
963 committer_email: collaborator.committer_email,
964 })
965 .collect(),
966 worktrees,
967 repositories,
968 language_servers: language_servers
969 .into_iter()
970 .map(|language_server| LanguageServer {
971 server: proto::LanguageServer {
972 id: language_server.id as u64,
973 name: language_server.name,
974 worktree_id: language_server.worktree_id.map(|id| id as u64),
975 },
976 capabilities: language_server.capabilities,
977 })
978 .collect(),
979 path_style,
980 };
981 Ok((project, replica_id as ReplicaId))
982 }
983
984 /// Removes the given connection from the specified project.
985 pub async fn leave_project(
986 &self,
987 project_id: ProjectId,
988 connection: ConnectionId,
989 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
990 self.project_transaction(project_id, |tx| async move {
991 let result = project_collaborator::Entity::delete_many()
992 .filter(
993 Condition::all()
994 .add(project_collaborator::Column::ProjectId.eq(project_id))
995 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
996 .add(
997 project_collaborator::Column::ConnectionServerId
998 .eq(connection.owner_id as i32),
999 ),
1000 )
1001 .exec(&*tx)
1002 .await?;
1003 if result.rows_affected == 0 {
1004 Err(anyhow!("not a collaborator on this project"))?;
1005 }
1006
1007 let project = project::Entity::find_by_id(project_id)
1008 .one(&*tx)
1009 .await?
1010 .context("no such project")?;
1011 let collaborators = project
1012 .find_related(project_collaborator::Entity)
1013 .all(&*tx)
1014 .await?;
1015 let connection_ids: Vec<ConnectionId> = collaborators
1016 .into_iter()
1017 .map(|collaborator| collaborator.connection())
1018 .collect();
1019
1020 follower::Entity::delete_many()
1021 .filter(
1022 Condition::any()
1023 .add(
1024 Condition::all()
1025 .add(follower::Column::ProjectId.eq(Some(project_id)))
1026 .add(
1027 follower::Column::LeaderConnectionServerId
1028 .eq(connection.owner_id),
1029 )
1030 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1031 )
1032 .add(
1033 Condition::all()
1034 .add(follower::Column::ProjectId.eq(Some(project_id)))
1035 .add(
1036 follower::Column::FollowerConnectionServerId
1037 .eq(connection.owner_id),
1038 )
1039 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1040 ),
1041 )
1042 .exec(&*tx)
1043 .await?;
1044
1045 let room = if let Some(room_id) = project.room_id {
1046 Some(self.get_room(room_id, &tx).await?)
1047 } else {
1048 None
1049 };
1050
1051 let left_project = LeftProject {
1052 id: project_id,
1053 should_unshare: connection == project.host_connection()?,
1054 connection_ids,
1055 };
1056 Ok((room, left_project))
1057 })
1058 .await
1059 }
1060
1061 pub async fn check_user_is_project_host(
1062 &self,
1063 project_id: ProjectId,
1064 connection_id: ConnectionId,
1065 ) -> Result<()> {
1066 self.project_transaction(project_id, |tx| async move {
1067 project::Entity::find()
1068 .filter(
1069 Condition::all()
1070 .add(project::Column::Id.eq(project_id))
1071 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1072 .add(
1073 project::Column::HostConnectionServerId
1074 .eq(Some(connection_id.owner_id as i32)),
1075 ),
1076 )
1077 .one(&*tx)
1078 .await?
1079 .context("failed to read project host")?;
1080
1081 Ok(())
1082 })
1083 .await
1084 .map(|guard| guard.into_inner())
1085 }
1086
1087 /// Returns the current project if the given user is authorized to access it with the specified capability.
1088 pub async fn access_project(
1089 &self,
1090 project_id: ProjectId,
1091 connection_id: ConnectionId,
1092 capability: Capability,
1093 tx: &DatabaseTransaction,
1094 ) -> Result<(project::Model, ChannelRole)> {
1095 let project = project::Entity::find_by_id(project_id)
1096 .one(tx)
1097 .await?
1098 .context("no such project")?;
1099
1100 let role_from_room = if let Some(room_id) = project.room_id {
1101 room_participant::Entity::find()
1102 .filter(room_participant::Column::RoomId.eq(room_id))
1103 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1104 .one(tx)
1105 .await?
1106 .and_then(|participant| participant.role)
1107 } else {
1108 None
1109 };
1110
1111 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1112
1113 match capability {
1114 Capability::ReadWrite => {
1115 if !role.can_edit_projects() {
1116 return Err(anyhow!("not authorized to edit projects"))?;
1117 }
1118 }
1119 Capability::ReadOnly => {
1120 if !role.can_read_projects() {
1121 return Err(anyhow!("not authorized to read projects"))?;
1122 }
1123 }
1124 }
1125
1126 Ok((project, role))
1127 }
1128
1129 /// Returns the host connection for a read-only request to join a shared project.
1130 pub async fn host_for_read_only_project_request(
1131 &self,
1132 project_id: ProjectId,
1133 connection_id: ConnectionId,
1134 ) -> Result<ConnectionId> {
1135 self.project_transaction(project_id, |tx| async move {
1136 let (project, _) = self
1137 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1138 .await?;
1139 project.host_connection()
1140 })
1141 .await
1142 .map(|guard| guard.into_inner())
1143 }
1144
1145 /// Returns the host connection for a request to join a shared project.
1146 pub async fn host_for_mutating_project_request(
1147 &self,
1148 project_id: ProjectId,
1149 connection_id: ConnectionId,
1150 ) -> Result<ConnectionId> {
1151 self.project_transaction(project_id, |tx| async move {
1152 let (project, _) = self
1153 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1154 .await?;
1155 project.host_connection()
1156 })
1157 .await
1158 .map(|guard| guard.into_inner())
1159 }
1160
1161 pub async fn connections_for_buffer_update(
1162 &self,
1163 project_id: ProjectId,
1164 connection_id: ConnectionId,
1165 capability: Capability,
1166 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1167 self.project_transaction(project_id, |tx| async move {
1168 // Authorize
1169 let (project, _) = self
1170 .access_project(project_id, connection_id, capability, &tx)
1171 .await?;
1172
1173 let host_connection_id = project.host_connection()?;
1174
1175 let collaborators = project_collaborator::Entity::find()
1176 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1177 .all(&*tx)
1178 .await?;
1179
1180 let guest_connection_ids = collaborators
1181 .into_iter()
1182 .filter_map(|collaborator| {
1183 if collaborator.is_host {
1184 None
1185 } else {
1186 Some(collaborator.connection())
1187 }
1188 })
1189 .collect();
1190
1191 Ok((host_connection_id, guest_connection_ids))
1192 })
1193 .await
1194 }
1195
1196 /// Returns the connection IDs in the given project.
1197 ///
1198 /// The provided `connection_id` must also be a collaborator in the project,
1199 /// otherwise an error will be returned.
1200 pub async fn project_connection_ids(
1201 &self,
1202 project_id: ProjectId,
1203 connection_id: ConnectionId,
1204 exclude_dev_server: bool,
1205 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1206 self.project_transaction(project_id, |tx| async move {
1207 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1208 .await
1209 })
1210 .await
1211 }
1212
1213 async fn internal_project_connection_ids(
1214 &self,
1215 project_id: ProjectId,
1216 connection_id: ConnectionId,
1217 exclude_dev_server: bool,
1218 tx: &DatabaseTransaction,
1219 ) -> Result<HashSet<ConnectionId>> {
1220 let project = project::Entity::find_by_id(project_id)
1221 .one(tx)
1222 .await?
1223 .context("no such project")?;
1224
1225 let mut collaborators = project_collaborator::Entity::find()
1226 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1227 .stream(tx)
1228 .await?;
1229
1230 let mut connection_ids = HashSet::default();
1231 if let Some(host_connection) = project.host_connection().log_err()
1232 && !exclude_dev_server
1233 {
1234 connection_ids.insert(host_connection);
1235 }
1236
1237 while let Some(collaborator) = collaborators.next().await {
1238 let collaborator = collaborator?;
1239 connection_ids.insert(collaborator.connection());
1240 }
1241
1242 if connection_ids.contains(&connection_id)
1243 || Some(connection_id) == project.host_connection().ok()
1244 {
1245 Ok(connection_ids)
1246 } else {
1247 Err(anyhow!(
1248 "can only send project updates to a project you're in"
1249 ))?
1250 }
1251 }
1252
1253 async fn project_guest_connection_ids(
1254 &self,
1255 project_id: ProjectId,
1256 tx: &DatabaseTransaction,
1257 ) -> Result<Vec<ConnectionId>> {
1258 let mut collaborators = project_collaborator::Entity::find()
1259 .filter(
1260 project_collaborator::Column::ProjectId
1261 .eq(project_id)
1262 .and(project_collaborator::Column::IsHost.eq(false)),
1263 )
1264 .stream(tx)
1265 .await?;
1266
1267 let mut guest_connection_ids = Vec::new();
1268 while let Some(collaborator) = collaborators.next().await {
1269 let collaborator = collaborator?;
1270 guest_connection_ids.push(collaborator.connection());
1271 }
1272 Ok(guest_connection_ids)
1273 }
1274
1275 /// Returns the [`RoomId`] for the given project.
1276 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1277 self.transaction(|tx| async move {
1278 Ok(project::Entity::find_by_id(project_id)
1279 .one(&*tx)
1280 .await?
1281 .and_then(|project| project.room_id))
1282 })
1283 .await
1284 }
1285
1286 pub async fn check_room_participants(
1287 &self,
1288 room_id: RoomId,
1289 leader_id: ConnectionId,
1290 follower_id: ConnectionId,
1291 ) -> Result<()> {
1292 self.transaction(|tx| async move {
1293 use room_participant::Column;
1294
1295 let count = room_participant::Entity::find()
1296 .filter(
1297 Condition::all().add(Column::RoomId.eq(room_id)).add(
1298 Condition::any()
1299 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1300 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1301 ))
1302 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1303 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1304 )),
1305 ),
1306 )
1307 .count(&*tx)
1308 .await?;
1309
1310 if count < 2 {
1311 Err(anyhow!("not room participants"))?;
1312 }
1313
1314 Ok(())
1315 })
1316 .await
1317 }
1318
1319 /// Adds the given follower connection as a follower of the given leader connection.
1320 pub async fn follow(
1321 &self,
1322 room_id: RoomId,
1323 project_id: ProjectId,
1324 leader_connection: ConnectionId,
1325 follower_connection: ConnectionId,
1326 ) -> Result<TransactionGuard<proto::Room>> {
1327 self.room_transaction(room_id, |tx| async move {
1328 follower::ActiveModel {
1329 room_id: ActiveValue::set(room_id),
1330 project_id: ActiveValue::set(project_id),
1331 leader_connection_server_id: ActiveValue::set(ServerId(
1332 leader_connection.owner_id as i32,
1333 )),
1334 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1335 follower_connection_server_id: ActiveValue::set(ServerId(
1336 follower_connection.owner_id as i32,
1337 )),
1338 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1339 ..Default::default()
1340 }
1341 .insert(&*tx)
1342 .await?;
1343
1344 let room = self.get_room(room_id, &tx).await?;
1345 Ok(room)
1346 })
1347 .await
1348 }
1349
1350 /// Removes the given follower connection as a follower of the given leader connection.
1351 pub async fn unfollow(
1352 &self,
1353 room_id: RoomId,
1354 project_id: ProjectId,
1355 leader_connection: ConnectionId,
1356 follower_connection: ConnectionId,
1357 ) -> Result<TransactionGuard<proto::Room>> {
1358 self.room_transaction(room_id, |tx| async move {
1359 follower::Entity::delete_many()
1360 .filter(
1361 Condition::all()
1362 .add(follower::Column::RoomId.eq(room_id))
1363 .add(follower::Column::ProjectId.eq(project_id))
1364 .add(
1365 follower::Column::LeaderConnectionServerId
1366 .eq(leader_connection.owner_id),
1367 )
1368 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1369 .add(
1370 follower::Column::FollowerConnectionServerId
1371 .eq(follower_connection.owner_id),
1372 )
1373 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1374 )
1375 .exec(&*tx)
1376 .await?;
1377
1378 let room = self.get_room(room_id, &tx).await?;
1379 Ok(room)
1380 })
1381 .await
1382 }
1383}