1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 windows_paths: bool,
37 features: &[String],
38 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
39 self.room_transaction(room_id, |tx| async move {
40 let participant = room_participant::Entity::find()
41 .filter(
42 Condition::all()
43 .add(
44 room_participant::Column::AnsweringConnectionId
45 .eq(connection.id as i32),
46 )
47 .add(
48 room_participant::Column::AnsweringConnectionServerId
49 .eq(connection.owner_id as i32),
50 ),
51 )
52 .one(&*tx)
53 .await?
54 .context("could not find participant")?;
55 if participant.room_id != room_id {
56 return Err(anyhow!("shared project on unexpected room"))?;
57 }
58 if !participant
59 .role
60 .unwrap_or(ChannelRole::Member)
61 .can_edit_projects()
62 {
63 return Err(anyhow!("guests cannot share projects"))?;
64 }
65
66 let project = project::ActiveModel {
67 room_id: ActiveValue::set(Some(participant.room_id)),
68 host_user_id: ActiveValue::set(Some(participant.user_id)),
69 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
70 host_connection_server_id: ActiveValue::set(Some(ServerId(
71 connection.owner_id as i32,
72 ))),
73 id: ActiveValue::NotSet,
74 windows_paths: ActiveValue::set(windows_paths),
75 features: ActiveValue::set(serde_json::to_string(features).unwrap()),
76 }
77 .insert(&*tx)
78 .await?;
79
80 if !worktrees.is_empty() {
81 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
82 worktree::ActiveModel {
83 id: ActiveValue::set(worktree.id as i64),
84 project_id: ActiveValue::set(project.id),
85 abs_path: ActiveValue::set(worktree.abs_path.clone()),
86 root_name: ActiveValue::set(worktree.root_name.clone()),
87 visible: ActiveValue::set(worktree.visible),
88 scan_id: ActiveValue::set(0),
89 completed_scan_id: ActiveValue::set(0),
90 }
91 }))
92 .exec(&*tx)
93 .await?;
94 }
95
96 let replica_id = if is_ssh_project {
97 clock::ReplicaId::REMOTE_SERVER
98 } else {
99 clock::ReplicaId::LOCAL
100 };
101
102 project_collaborator::ActiveModel {
103 project_id: ActiveValue::set(project.id),
104 connection_id: ActiveValue::set(connection.id as i32),
105 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
106 user_id: ActiveValue::set(participant.user_id),
107 replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
108 is_host: ActiveValue::set(true),
109 id: ActiveValue::NotSet,
110 committer_name: ActiveValue::Set(None),
111 committer_email: ActiveValue::Set(None),
112 }
113 .insert(&*tx)
114 .await?;
115
116 let room = self.get_room(room_id, &tx).await?;
117 Ok((project.id, room))
118 })
119 .await
120 }
121
122 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
123 self.transaction(|tx| async move {
124 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
125 Ok(())
126 })
127 .await
128 }
129
130 /// Unshares the given project.
131 pub async fn unshare_project(
132 &self,
133 project_id: ProjectId,
134 connection: ConnectionId,
135 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
136 self.project_transaction(project_id, |tx| async move {
137 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
138 let project = project::Entity::find_by_id(project_id)
139 .one(&*tx)
140 .await?
141 .context("project not found")?;
142 let room = if let Some(room_id) = project.room_id {
143 Some(self.get_room(room_id, &tx).await?)
144 } else {
145 None
146 };
147 if project.host_connection()? == connection {
148 return Ok((true, room, guest_connection_ids));
149 }
150 Err(anyhow!("cannot unshare a project hosted by another user"))?
151 })
152 .await
153 }
154
155 /// Updates the worktrees associated with the given project.
156 pub async fn update_project(
157 &self,
158 project_id: ProjectId,
159 connection: ConnectionId,
160 worktrees: &[proto::WorktreeMetadata],
161 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
162 self.project_transaction(project_id, |tx| async move {
163 let project = project::Entity::find_by_id(project_id)
164 .filter(
165 Condition::all()
166 .add(project::Column::HostConnectionId.eq(connection.id as i32))
167 .add(
168 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
169 ),
170 )
171 .one(&*tx)
172 .await?
173 .context("no such project")?;
174
175 self.update_project_worktrees(project.id, worktrees, &tx)
176 .await?;
177
178 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
179
180 let room = if let Some(room_id) = project.room_id {
181 Some(self.get_room(room_id, &tx).await?)
182 } else {
183 None
184 };
185
186 Ok((room, guest_connection_ids))
187 })
188 .await
189 }
190
191 pub(in crate::db) async fn update_project_worktrees(
192 &self,
193 project_id: ProjectId,
194 worktrees: &[proto::WorktreeMetadata],
195 tx: &DatabaseTransaction,
196 ) -> Result<()> {
197 if !worktrees.is_empty() {
198 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
199 id: ActiveValue::set(worktree.id as i64),
200 project_id: ActiveValue::set(project_id),
201 abs_path: ActiveValue::set(worktree.abs_path.clone()),
202 root_name: ActiveValue::set(worktree.root_name.clone()),
203 visible: ActiveValue::set(worktree.visible),
204 scan_id: ActiveValue::set(0),
205 completed_scan_id: ActiveValue::set(0),
206 }))
207 .on_conflict(
208 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
209 .update_column(worktree::Column::RootName)
210 .to_owned(),
211 )
212 .exec(tx)
213 .await?;
214 }
215
216 worktree::Entity::delete_many()
217 .filter(worktree::Column::ProjectId.eq(project_id).and(
218 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
219 ))
220 .exec(tx)
221 .await?;
222
223 Ok(())
224 }
225
226 pub async fn update_worktree(
227 &self,
228 update: &proto::UpdateWorktree,
229 connection: ConnectionId,
230 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
231 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
232 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
233 {
234 return Err(anyhow!(
235 "invalid worktree update. removed entries: {}, updated entries: {}",
236 update.removed_entries.len(),
237 update.updated_entries.len()
238 ))?;
239 }
240
241 let project_id = ProjectId::from_proto(update.project_id);
242 let worktree_id = update.worktree_id as i64;
243 self.project_transaction(project_id, |tx| async move {
244 // Ensure the update comes from the host.
245 let _project = project::Entity::find_by_id(project_id)
246 .filter(
247 Condition::all()
248 .add(project::Column::HostConnectionId.eq(connection.id as i32))
249 .add(
250 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
251 ),
252 )
253 .one(&*tx)
254 .await?
255 .with_context(|| format!("no such project: {project_id}"))?;
256
257 // Update metadata.
258 worktree::Entity::update(worktree::ActiveModel {
259 id: ActiveValue::set(worktree_id),
260 project_id: ActiveValue::set(project_id),
261 root_name: ActiveValue::set(update.root_name.clone()),
262 scan_id: ActiveValue::set(update.scan_id as i64),
263 completed_scan_id: if update.is_last_update {
264 ActiveValue::set(update.scan_id as i64)
265 } else {
266 ActiveValue::default()
267 },
268 abs_path: ActiveValue::set(update.abs_path.clone()),
269 ..Default::default()
270 })
271 .exec(&*tx)
272 .await?;
273
274 if !update.updated_entries.is_empty() {
275 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
276 let mtime = entry.mtime.clone().unwrap_or_default();
277 worktree_entry::ActiveModel {
278 project_id: ActiveValue::set(project_id),
279 worktree_id: ActiveValue::set(worktree_id),
280 id: ActiveValue::set(entry.id as i64),
281 is_dir: ActiveValue::set(entry.is_dir),
282 path: ActiveValue::set(entry.path.clone()),
283 inode: ActiveValue::set(entry.inode as i64),
284 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
285 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
286 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
287 is_ignored: ActiveValue::set(entry.is_ignored),
288 git_status: ActiveValue::set(None),
289 is_external: ActiveValue::set(entry.is_external),
290 is_deleted: ActiveValue::set(false),
291 is_hidden: ActiveValue::set(entry.is_hidden),
292 scan_id: ActiveValue::set(update.scan_id as i64),
293 is_fifo: ActiveValue::set(entry.is_fifo),
294 }
295 }))
296 .on_conflict(
297 OnConflict::columns([
298 worktree_entry::Column::ProjectId,
299 worktree_entry::Column::WorktreeId,
300 worktree_entry::Column::Id,
301 ])
302 .update_columns([
303 worktree_entry::Column::IsDir,
304 worktree_entry::Column::Path,
305 worktree_entry::Column::Inode,
306 worktree_entry::Column::MtimeSeconds,
307 worktree_entry::Column::MtimeNanos,
308 worktree_entry::Column::CanonicalPath,
309 worktree_entry::Column::IsIgnored,
310 worktree_entry::Column::IsHidden,
311 worktree_entry::Column::ScanId,
312 ])
313 .to_owned(),
314 )
315 .exec(&*tx)
316 .await?;
317 }
318
319 if !update.removed_entries.is_empty() {
320 worktree_entry::Entity::update_many()
321 .filter(
322 worktree_entry::Column::ProjectId
323 .eq(project_id)
324 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
325 .and(
326 worktree_entry::Column::Id
327 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
328 ),
329 )
330 .set(worktree_entry::ActiveModel {
331 is_deleted: ActiveValue::Set(true),
332 scan_id: ActiveValue::Set(update.scan_id as i64),
333 ..Default::default()
334 })
335 .exec(&*tx)
336 .await?;
337 }
338
339 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
340 Ok(connection_ids)
341 })
342 .await
343 }
344
345 pub async fn update_repository(
346 &self,
347 update: &proto::UpdateRepository,
348 _connection: ConnectionId,
349 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
350 let project_id = ProjectId::from_proto(update.project_id);
351 let repository_id = update.id as i64;
352 self.project_transaction(project_id, |tx| async move {
353 project_repository::Entity::insert(project_repository::ActiveModel {
354 project_id: ActiveValue::set(project_id),
355 id: ActiveValue::set(repository_id),
356 legacy_worktree_id: ActiveValue::set(None),
357 abs_path: ActiveValue::set(update.abs_path.clone()),
358 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
359 scan_id: ActiveValue::set(update.scan_id as i64),
360 is_deleted: ActiveValue::set(false),
361 branch_summary: ActiveValue::Set(
362 update
363 .branch_summary
364 .as_ref()
365 .map(|summary| serde_json::to_string(summary).unwrap()),
366 ),
367 head_commit_details: ActiveValue::Set(
368 update
369 .head_commit_details
370 .as_ref()
371 .map(|details| serde_json::to_string(details).unwrap()),
372 ),
373 current_merge_conflicts: ActiveValue::Set(Some(
374 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
375 )),
376 merge_message: ActiveValue::set(update.merge_message.clone()),
377 remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
378 remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
379 linked_worktrees: ActiveValue::Set(Some(
380 serde_json::to_string(&update.linked_worktrees).unwrap(),
381 )),
382 })
383 .on_conflict(
384 OnConflict::columns([
385 project_repository::Column::ProjectId,
386 project_repository::Column::Id,
387 ])
388 .update_columns([
389 project_repository::Column::ScanId,
390 project_repository::Column::BranchSummary,
391 project_repository::Column::EntryIds,
392 project_repository::Column::AbsPath,
393 project_repository::Column::CurrentMergeConflicts,
394 project_repository::Column::HeadCommitDetails,
395 project_repository::Column::MergeMessage,
396 project_repository::Column::LinkedWorktrees,
397 ])
398 .to_owned(),
399 )
400 .exec(&*tx)
401 .await?;
402
403 let has_any_statuses = !update.updated_statuses.is_empty();
404
405 if has_any_statuses {
406 project_repository_statuses::Entity::insert_many(
407 update.updated_statuses.iter().map(|status_entry| {
408 let (repo_path, status_kind, first_status, second_status) =
409 proto_status_to_db(status_entry.clone());
410 project_repository_statuses::ActiveModel {
411 project_id: ActiveValue::set(project_id),
412 repository_id: ActiveValue::set(repository_id),
413 scan_id: ActiveValue::set(update.scan_id as i64),
414 is_deleted: ActiveValue::set(false),
415 repo_path: ActiveValue::set(repo_path),
416 status: ActiveValue::set(0),
417 status_kind: ActiveValue::set(status_kind),
418 first_status: ActiveValue::set(first_status),
419 second_status: ActiveValue::set(second_status),
420 lines_added: ActiveValue::set(
421 status_entry.diff_stat_added.map(|v| v as i32),
422 ),
423 lines_deleted: ActiveValue::set(
424 status_entry.diff_stat_deleted.map(|v| v as i32),
425 ),
426 }
427 }),
428 )
429 .on_conflict(
430 OnConflict::columns([
431 project_repository_statuses::Column::ProjectId,
432 project_repository_statuses::Column::RepositoryId,
433 project_repository_statuses::Column::RepoPath,
434 ])
435 .update_columns([
436 project_repository_statuses::Column::ScanId,
437 project_repository_statuses::Column::StatusKind,
438 project_repository_statuses::Column::FirstStatus,
439 project_repository_statuses::Column::SecondStatus,
440 project_repository_statuses::Column::LinesAdded,
441 project_repository_statuses::Column::LinesDeleted,
442 ])
443 .to_owned(),
444 )
445 .exec(&*tx)
446 .await?;
447 }
448
449 let has_any_removed_statuses = !update.removed_statuses.is_empty();
450
451 if has_any_removed_statuses {
452 project_repository_statuses::Entity::update_many()
453 .filter(
454 project_repository_statuses::Column::ProjectId
455 .eq(project_id)
456 .and(
457 project_repository_statuses::Column::RepositoryId.eq(repository_id),
458 )
459 .and(
460 project_repository_statuses::Column::RepoPath
461 .is_in(update.removed_statuses.iter()),
462 ),
463 )
464 .set(project_repository_statuses::ActiveModel {
465 is_deleted: ActiveValue::Set(true),
466 scan_id: ActiveValue::Set(update.scan_id as i64),
467 ..Default::default()
468 })
469 .exec(&*tx)
470 .await?;
471 }
472
473 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
474 Ok(connection_ids)
475 })
476 .await
477 }
478
479 pub async fn remove_repository(
480 &self,
481 remove: &proto::RemoveRepository,
482 _connection: ConnectionId,
483 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
484 let project_id = ProjectId::from_proto(remove.project_id);
485 let repository_id = remove.id as i64;
486 self.project_transaction(project_id, |tx| async move {
487 project_repository::Entity::update_many()
488 .filter(
489 project_repository::Column::ProjectId
490 .eq(project_id)
491 .and(project_repository::Column::Id.eq(repository_id)),
492 )
493 .set(project_repository::ActiveModel {
494 is_deleted: ActiveValue::Set(true),
495 // scan_id: ActiveValue::Set(update.scan_id as i64),
496 ..Default::default()
497 })
498 .exec(&*tx)
499 .await?;
500
501 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
502 Ok(connection_ids)
503 })
504 .await
505 }
506
507 /// Updates the diagnostic summary for the given connection.
508 pub async fn update_diagnostic_summary(
509 &self,
510 update: &proto::UpdateDiagnosticSummary,
511 connection: ConnectionId,
512 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
513 let project_id = ProjectId::from_proto(update.project_id);
514 let worktree_id = update.worktree_id as i64;
515 self.project_transaction(project_id, |tx| async move {
516 let summary = update.summary.as_ref().context("invalid summary")?;
517
518 // Ensure the update comes from the host.
519 let project = project::Entity::find_by_id(project_id)
520 .one(&*tx)
521 .await?
522 .context("no such project")?;
523 if project.host_connection()? != connection {
524 return Err(anyhow!("can't update a project hosted by someone else"))?;
525 }
526
527 // Update summary.
528 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
529 project_id: ActiveValue::set(project_id),
530 worktree_id: ActiveValue::set(worktree_id),
531 path: ActiveValue::set(summary.path.clone()),
532 language_server_id: ActiveValue::set(summary.language_server_id as i64),
533 error_count: ActiveValue::set(summary.error_count as i32),
534 warning_count: ActiveValue::set(summary.warning_count as i32),
535 })
536 .on_conflict(
537 OnConflict::columns([
538 worktree_diagnostic_summary::Column::ProjectId,
539 worktree_diagnostic_summary::Column::WorktreeId,
540 worktree_diagnostic_summary::Column::Path,
541 ])
542 .update_columns([
543 worktree_diagnostic_summary::Column::LanguageServerId,
544 worktree_diagnostic_summary::Column::ErrorCount,
545 worktree_diagnostic_summary::Column::WarningCount,
546 ])
547 .to_owned(),
548 )
549 .exec(&*tx)
550 .await?;
551
552 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
553 Ok(connection_ids)
554 })
555 .await
556 }
557
558 /// Starts the language server for the given connection.
559 pub async fn start_language_server(
560 &self,
561 update: &proto::StartLanguageServer,
562 connection: ConnectionId,
563 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
564 let project_id = ProjectId::from_proto(update.project_id);
565 self.project_transaction(project_id, |tx| async move {
566 let server = update.server.as_ref().context("invalid language server")?;
567
568 // Ensure the update comes from the host.
569 let project = project::Entity::find_by_id(project_id)
570 .one(&*tx)
571 .await?
572 .context("no such project")?;
573 if project.host_connection()? != connection {
574 return Err(anyhow!("can't update a project hosted by someone else"))?;
575 }
576
577 // Add the newly-started language server.
578 language_server::Entity::insert(language_server::ActiveModel {
579 project_id: ActiveValue::set(project_id),
580 id: ActiveValue::set(server.id as i64),
581 name: ActiveValue::set(server.name.clone()),
582 worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
583 capabilities: ActiveValue::set(update.capabilities.clone()),
584 })
585 .on_conflict(
586 OnConflict::columns([
587 language_server::Column::ProjectId,
588 language_server::Column::Id,
589 ])
590 .update_columns([
591 language_server::Column::Name,
592 language_server::Column::Capabilities,
593 language_server::Column::WorktreeId,
594 ])
595 .to_owned(),
596 )
597 .exec(&*tx)
598 .await?;
599
600 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
601 Ok(connection_ids)
602 })
603 .await
604 }
605
606 /// Updates the worktree settings for the given connection.
607 pub async fn update_worktree_settings(
608 &self,
609 update: &proto::UpdateWorktreeSettings,
610 connection: ConnectionId,
611 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
612 let project_id = ProjectId::from_proto(update.project_id);
613 let kind = match update.kind {
614 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
615 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
616 None => proto::LocalSettingsKind::Settings,
617 };
618 let kind = LocalSettingsKind::from_proto(kind);
619 self.project_transaction(project_id, |tx| async move {
620 // Ensure the update comes from the host.
621 let project = project::Entity::find_by_id(project_id)
622 .one(&*tx)
623 .await?
624 .context("no such project")?;
625 if project.host_connection()? != connection {
626 return Err(anyhow!("can't update a project hosted by someone else"))?;
627 }
628
629 if let Some(content) = &update.content {
630 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
631 project_id: ActiveValue::Set(project_id),
632 worktree_id: ActiveValue::Set(update.worktree_id as i64),
633 path: ActiveValue::Set(update.path.clone()),
634 content: ActiveValue::Set(content.clone()),
635 kind: ActiveValue::Set(kind),
636 outside_worktree: ActiveValue::Set(update.outside_worktree.unwrap_or(false)),
637 })
638 .on_conflict(
639 OnConflict::columns([
640 worktree_settings_file::Column::ProjectId,
641 worktree_settings_file::Column::WorktreeId,
642 worktree_settings_file::Column::Path,
643 ])
644 .update_columns([
645 worktree_settings_file::Column::Content,
646 worktree_settings_file::Column::OutsideWorktree,
647 ])
648 .to_owned(),
649 )
650 .exec(&*tx)
651 .await?;
652 } else {
653 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
654 project_id: ActiveValue::Set(project_id),
655 worktree_id: ActiveValue::Set(update.worktree_id as i64),
656 path: ActiveValue::Set(update.path.clone()),
657 ..Default::default()
658 })
659 .exec(&*tx)
660 .await?;
661 }
662
663 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
664 Ok(connection_ids)
665 })
666 .await
667 }
668
669 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
670 self.transaction(|tx| async move {
671 Ok(project::Entity::find_by_id(id)
672 .one(&*tx)
673 .await?
674 .context("no such project")?)
675 })
676 .await
677 }
678
679 /// Adds the given connection to the specified project
680 /// in the current room.
681 pub async fn join_project(
682 &self,
683 project_id: ProjectId,
684 connection: ConnectionId,
685 user_id: UserId,
686 committer_name: Option<String>,
687 committer_email: Option<String>,
688 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
689 self.project_transaction(project_id, move |tx| {
690 let committer_name = committer_name.clone();
691 let committer_email = committer_email.clone();
692 async move {
693 let (project, role) = self
694 .access_project(project_id, connection, Capability::ReadOnly, &tx)
695 .await?;
696 self.join_project_internal(
697 project,
698 user_id,
699 committer_name,
700 committer_email,
701 connection,
702 role,
703 &tx,
704 )
705 .await
706 }
707 })
708 .await
709 }
710
    /// Registers `connection` as a non-host collaborator of `project` and assembles the
    /// project state stored in the database.
    ///
    /// The returned `Project` carries the collaborators (including the new one), the
    /// worktrees with their non-deleted entries, diagnostic summaries and settings files,
    /// the non-deleted repositories with their non-deleted statuses, and the language
    /// servers. The second tuple element is the replica id assigned to the new collaborator.
    ///
    /// # Errors
    ///
    /// Fails when a database call fails or when one of the repository JSON columns that are
    /// parsed with `?` below cannot be deserialized.
    async fn join_project_internal(
        &self,
        project: project::Model,
        user_id: UserId,
        committer_name: Option<String>,
        committer_email: Option<String>,
        connection: ConnectionId,
        role: ChannelRole,
        tx: &DatabaseTransaction,
    ) -> Result<(Project, ReplicaId)> {
        let mut collaborators = project
            .find_related(project_collaborator::Entity)
            .all(tx)
            .await?;
        // Pick the lowest replica id, starting at `FIRST_COLLAB_ID`, that no current
        // collaborator of the project uses.
        let replica_ids = collaborators
            .iter()
            .map(|c| c.replica_id)
            .collect::<HashSet<_>>();
        let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
        while replica_ids.contains(&replica_id) {
            replica_id.0 += 1;
        }
        let new_collaborator = project_collaborator::ActiveModel {
            project_id: ActiveValue::set(project.id),
            connection_id: ActiveValue::set(connection.id as i32),
            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
            user_id: ActiveValue::set(user_id),
            replica_id: ActiveValue::set(replica_id),
            is_host: ActiveValue::set(false),
            id: ActiveValue::NotSet,
            committer_name: ActiveValue::set(committer_name),
            committer_email: ActiveValue::set(committer_email),
        }
        .insert(tx)
        .await?;
        // The returned collaborator list includes the connection that just joined.
        collaborators.push(new_collaborator);

        // Index the project's worktrees by id so the sections below can attach their data.
        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
        let mut worktrees = db_worktrees
            .into_iter()
            .map(|db_worktree| {
                (
                    db_worktree.id as u64,
                    Worktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        entries: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                        legacy_repository_entries: Default::default(),
                    },
                )
            })
            .collect::<BTreeMap<_, _>>();

        // Populate worktree entries.
        // NOTE(review): each stream below lives in its own block, presumably so it is
        // dropped before the next query runs on `tx` — confirm before restructuring.
        {
            let mut db_entries = worktree_entry::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_entry::Column::ProjectId.eq(project.id))
                        .add(worktree_entry::Column::IsDeleted.eq(false)),
                )
                .stream(tx)
                .await?;
            while let Some(db_entry) = db_entries.next().await {
                let db_entry = db_entry?;
                // Entries whose worktree is not in the map are skipped.
                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                    worktree.entries.push(proto::Entry {
                        id: db_entry.id as u64,
                        is_dir: db_entry.is_dir,
                        path: db_entry.path,
                        inode: db_entry.inode as u64,
                        mtime: Some(proto::Timestamp {
                            seconds: db_entry.mtime_seconds as u64,
                            nanos: db_entry.mtime_nanos as u32,
                        }),
                        canonical_path: db_entry.canonical_path,
                        is_ignored: db_entry.is_ignored,
                        is_external: db_entry.is_external,
                        is_hidden: db_entry.is_hidden,
                        // This is only used in the summarization backlog, so if it's None,
                        // that just means we won't be able to detect when to resummarize
                        // based on total number of backlogged bytes - instead, we'd go
                        // on number of files only. That shouldn't be a huge deal in practice.
                        size: None,
                        is_fifo: db_entry.is_fifo,
                    });
                }
            }
        }

        // Populate repository entries.
        let mut repositories = Vec::new();
        {
            let db_repository_entries = project_repository::Entity::find()
                .filter(
                    Condition::all()
                        .add(project_repository::Column::ProjectId.eq(project.id))
                        .add(project_repository::Column::IsDeleted.eq(false)),
                )
                .all(tx)
                .await?;
            for db_repository_entry in db_repository_entries {
                // One status query per repository, limited to statuses not flagged deleted.
                let mut repository_statuses = project_repository_statuses::Entity::find()
                    .filter(
                        Condition::all()
                            .add(project_repository_statuses::Column::ProjectId.eq(project.id))
                            .add(
                                project_repository_statuses::Column::RepositoryId
                                    .eq(db_repository_entry.id),
                            )
                            .add(project_repository_statuses::Column::IsDeleted.eq(false)),
                    )
                    .stream(tx)
                    .await?;
                let mut updated_statuses = Vec::new();
                while let Some(status_entry) = repository_statuses.next().await {
                    let status_entry = status_entry?;
                    updated_statuses.push(db_status_to_proto(status_entry)?);
                }

                // The optional JSON columns below fall back to their default value when
                // the column is NULL; a value that is present but malformed is an error.
                let current_merge_conflicts = db_repository_entry
                    .current_merge_conflicts
                    .as_ref()
                    .map(|conflicts| serde_json::from_str(conflicts))
                    .transpose()?
                    .unwrap_or_default();

                let branch_summary = db_repository_entry
                    .branch_summary
                    .as_ref()
                    .map(|branch_summary| serde_json::from_str(branch_summary))
                    .transpose()?
                    .unwrap_or_default();

                let head_commit_details = db_repository_entry
                    .head_commit_details
                    .as_ref()
                    .map(|head_commit_details| serde_json::from_str(head_commit_details))
                    .transpose()?
                    .unwrap_or_default();

                let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
                    .context("failed to deserialize repository's entry ids")?;

                // A repository with a legacy worktree id is attached to that worktree (and
                // dropped when the worktree is unknown); any other repository is reported
                // as a standalone `UpdateRepository`.
                if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
                    if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
                        worktree.legacy_repository_entries.insert(
                            db_repository_entry.id as u64,
                            proto::RepositoryEntry {
                                repository_id: db_repository_entry.id as u64,
                                updated_statuses,
                                removed_statuses: Vec::new(),
                                current_merge_conflicts,
                                branch_summary,
                            },
                        );
                    }
                } else {
                    repositories.push(proto::UpdateRepository {
                        project_id: db_repository_entry.project_id.0 as u64,
                        id: db_repository_entry.id as u64,
                        abs_path: db_repository_entry.abs_path.clone(),
                        entry_ids,
                        updated_statuses,
                        removed_statuses: Vec::new(),
                        current_merge_conflicts,
                        branch_summary,
                        head_commit_details,
                        scan_id: db_repository_entry.scan_id as u64,
                        is_last_update: true,
                        merge_message: db_repository_entry.merge_message,
                        stash_entries: Vec::new(),
                        remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
                        remote_origin_url: db_repository_entry.remote_origin_url.clone(),
                        original_repo_abs_path: Some(db_repository_entry.abs_path),
                        // Unlike the JSON columns above, a malformed value here is
                        // swallowed and replaced by the default.
                        linked_worktrees: db_repository_entry
                            .linked_worktrees
                            .as_deref()
                            .and_then(|s| serde_json::from_str(s).ok())
                            .unwrap_or_default(),
                    });
                }
            }
        }

        // Populate worktree diagnostic summaries.
        {
            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_summary) = db_summaries.next().await {
                let db_summary = db_summary?;
                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                    worktree
                        .diagnostic_summaries
                        .push(proto::DiagnosticSummary {
                            path: db_summary.path,
                            language_server_id: db_summary.language_server_id as u64,
                            error_count: db_summary.error_count as u32,
                            warning_count: db_summary.warning_count as u32,
                        });
                }
            }
        }

        // Populate worktree settings files
        {
            let mut db_settings_files = worktree_settings_file::Entity::find()
                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_settings_file) = db_settings_files.next().await {
                let db_settings_file = db_settings_file?;
                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
                    worktree.settings_files.push(WorktreeSettingsFile {
                        path: db_settings_file.path,
                        content: db_settings_file.content,
                        kind: db_settings_file.kind,
                        outside_worktree: db_settings_file.outside_worktree,
                    });
                }
            }
        }

        // Populate language servers.
        let language_servers = project
            .find_related(language_server::Entity)
            .all(tx)
            .await?;

        let path_style = if project.windows_paths {
            PathStyle::Windows
        } else {
            PathStyle::Posix
        };
        // A `features` column that does not parse as a JSON string list yields an empty list.
        let features: Vec<String> = serde_json::from_str(&project.features).unwrap_or_default();

        let project = Project {
            id: project.id,
            role,
            collaborators: collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                    committer_name: collaborator.committer_name,
                    committer_email: collaborator.committer_email,
                })
                .collect(),
            worktrees,
            repositories,
            language_servers: language_servers
                .into_iter()
                .map(|language_server| LanguageServer {
                    server: proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                        worktree_id: language_server.worktree_id.map(|id| id as u64),
                    },
                    capabilities: language_server.capabilities,
                })
                .collect(),
            path_style,
            features,
        };
        Ok((project, replica_id as ReplicaId))
    }
987
988 /// Removes the given connection from the specified project.
989 pub async fn leave_project(
990 &self,
991 project_id: ProjectId,
992 connection: ConnectionId,
993 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
994 self.project_transaction(project_id, |tx| async move {
995 let result = project_collaborator::Entity::delete_many()
996 .filter(
997 Condition::all()
998 .add(project_collaborator::Column::ProjectId.eq(project_id))
999 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
1000 .add(
1001 project_collaborator::Column::ConnectionServerId
1002 .eq(connection.owner_id as i32),
1003 ),
1004 )
1005 .exec(&*tx)
1006 .await?;
1007 if result.rows_affected == 0 {
1008 Err(anyhow!("not a collaborator on this project"))?;
1009 }
1010
1011 let project = project::Entity::find_by_id(project_id)
1012 .one(&*tx)
1013 .await?
1014 .context("no such project")?;
1015 let collaborators = project
1016 .find_related(project_collaborator::Entity)
1017 .all(&*tx)
1018 .await?;
1019 let connection_ids: Vec<ConnectionId> = collaborators
1020 .into_iter()
1021 .map(|collaborator| collaborator.connection())
1022 .collect();
1023
1024 follower::Entity::delete_many()
1025 .filter(
1026 Condition::any()
1027 .add(
1028 Condition::all()
1029 .add(follower::Column::ProjectId.eq(Some(project_id)))
1030 .add(
1031 follower::Column::LeaderConnectionServerId
1032 .eq(connection.owner_id),
1033 )
1034 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1035 )
1036 .add(
1037 Condition::all()
1038 .add(follower::Column::ProjectId.eq(Some(project_id)))
1039 .add(
1040 follower::Column::FollowerConnectionServerId
1041 .eq(connection.owner_id),
1042 )
1043 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1044 ),
1045 )
1046 .exec(&*tx)
1047 .await?;
1048
1049 let room = if let Some(room_id) = project.room_id {
1050 Some(self.get_room(room_id, &tx).await?)
1051 } else {
1052 None
1053 };
1054
1055 let left_project = LeftProject {
1056 id: project_id,
1057 should_unshare: connection == project.host_connection()?,
1058 connection_ids,
1059 };
1060 Ok((room, left_project))
1061 })
1062 .await
1063 }
1064
1065 pub async fn check_user_is_project_host(
1066 &self,
1067 project_id: ProjectId,
1068 connection_id: ConnectionId,
1069 ) -> Result<()> {
1070 self.project_transaction(project_id, |tx| async move {
1071 project::Entity::find()
1072 .filter(
1073 Condition::all()
1074 .add(project::Column::Id.eq(project_id))
1075 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1076 .add(
1077 project::Column::HostConnectionServerId
1078 .eq(Some(connection_id.owner_id as i32)),
1079 ),
1080 )
1081 .one(&*tx)
1082 .await?
1083 .context("failed to read project host")?;
1084
1085 Ok(())
1086 })
1087 .await
1088 .map(|guard| guard.into_inner())
1089 }
1090
1091 /// Returns the current project if the given user is authorized to access it with the specified capability.
1092 pub async fn access_project(
1093 &self,
1094 project_id: ProjectId,
1095 connection_id: ConnectionId,
1096 capability: Capability,
1097 tx: &DatabaseTransaction,
1098 ) -> Result<(project::Model, ChannelRole)> {
1099 let project = project::Entity::find_by_id(project_id)
1100 .one(tx)
1101 .await?
1102 .context("no such project")?;
1103
1104 let role_from_room = if let Some(room_id) = project.room_id {
1105 room_participant::Entity::find()
1106 .filter(room_participant::Column::RoomId.eq(room_id))
1107 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1108 .one(tx)
1109 .await?
1110 .and_then(|participant| participant.role)
1111 } else {
1112 None
1113 };
1114
1115 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1116
1117 match capability {
1118 Capability::ReadWrite => {
1119 if !role.can_edit_projects() {
1120 return Err(anyhow!("not authorized to edit projects"))?;
1121 }
1122 }
1123 Capability::ReadOnly => {
1124 if !role.can_read_projects() {
1125 return Err(anyhow!("not authorized to read projects"))?;
1126 }
1127 }
1128 }
1129
1130 Ok((project, role))
1131 }
1132
1133 /// Returns the host connection for a read-only request to join a shared project.
1134 pub async fn host_for_read_only_project_request(
1135 &self,
1136 project_id: ProjectId,
1137 connection_id: ConnectionId,
1138 ) -> Result<ConnectionId> {
1139 self.project_transaction(project_id, |tx| async move {
1140 let (project, _) = self
1141 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1142 .await?;
1143 project.host_connection()
1144 })
1145 .await
1146 .map(|guard| guard.into_inner())
1147 }
1148
1149 /// Returns the host connection for a request to join a shared project.
1150 pub async fn host_for_mutating_project_request(
1151 &self,
1152 project_id: ProjectId,
1153 connection_id: ConnectionId,
1154 ) -> Result<ConnectionId> {
1155 self.project_transaction(project_id, |tx| async move {
1156 let (project, _) = self
1157 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1158 .await?;
1159 project.host_connection()
1160 })
1161 .await
1162 .map(|guard| guard.into_inner())
1163 }
1164
1165 pub async fn connections_for_buffer_update(
1166 &self,
1167 project_id: ProjectId,
1168 connection_id: ConnectionId,
1169 capability: Capability,
1170 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1171 self.project_transaction(project_id, |tx| async move {
1172 // Authorize
1173 let (project, _) = self
1174 .access_project(project_id, connection_id, capability, &tx)
1175 .await?;
1176
1177 let host_connection_id = project.host_connection()?;
1178
1179 let collaborators = project_collaborator::Entity::find()
1180 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1181 .all(&*tx)
1182 .await?;
1183
1184 let guest_connection_ids = collaborators
1185 .into_iter()
1186 .filter_map(|collaborator| {
1187 if collaborator.is_host {
1188 None
1189 } else {
1190 Some(collaborator.connection())
1191 }
1192 })
1193 .collect();
1194
1195 Ok((host_connection_id, guest_connection_ids))
1196 })
1197 .await
1198 }
1199
1200 /// Returns the connection IDs in the given project.
1201 ///
1202 /// The provided `connection_id` must also be a collaborator in the project,
1203 /// otherwise an error will be returned.
1204 pub async fn project_connection_ids(
1205 &self,
1206 project_id: ProjectId,
1207 connection_id: ConnectionId,
1208 exclude_dev_server: bool,
1209 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1210 self.project_transaction(project_id, |tx| async move {
1211 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1212 .await
1213 })
1214 .await
1215 }
1216
1217 async fn internal_project_connection_ids(
1218 &self,
1219 project_id: ProjectId,
1220 connection_id: ConnectionId,
1221 exclude_dev_server: bool,
1222 tx: &DatabaseTransaction,
1223 ) -> Result<HashSet<ConnectionId>> {
1224 let project = project::Entity::find_by_id(project_id)
1225 .one(tx)
1226 .await?
1227 .context("no such project")?;
1228
1229 let mut collaborators = project_collaborator::Entity::find()
1230 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1231 .stream(tx)
1232 .await?;
1233
1234 let mut connection_ids = HashSet::default();
1235 if let Some(host_connection) = project.host_connection().log_err()
1236 && !exclude_dev_server
1237 {
1238 connection_ids.insert(host_connection);
1239 }
1240
1241 while let Some(collaborator) = collaborators.next().await {
1242 let collaborator = collaborator?;
1243 connection_ids.insert(collaborator.connection());
1244 }
1245
1246 if connection_ids.contains(&connection_id)
1247 || Some(connection_id) == project.host_connection().ok()
1248 {
1249 Ok(connection_ids)
1250 } else {
1251 Err(anyhow!(
1252 "can only send project updates to a project you're in"
1253 ))?
1254 }
1255 }
1256
1257 async fn project_guest_connection_ids(
1258 &self,
1259 project_id: ProjectId,
1260 tx: &DatabaseTransaction,
1261 ) -> Result<Vec<ConnectionId>> {
1262 let mut collaborators = project_collaborator::Entity::find()
1263 .filter(
1264 project_collaborator::Column::ProjectId
1265 .eq(project_id)
1266 .and(project_collaborator::Column::IsHost.eq(false)),
1267 )
1268 .stream(tx)
1269 .await?;
1270
1271 let mut guest_connection_ids = Vec::new();
1272 while let Some(collaborator) = collaborators.next().await {
1273 let collaborator = collaborator?;
1274 guest_connection_ids.push(collaborator.connection());
1275 }
1276 Ok(guest_connection_ids)
1277 }
1278
1279 /// Returns the [`RoomId`] for the given project.
1280 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1281 self.transaction(|tx| async move {
1282 Ok(project::Entity::find_by_id(project_id)
1283 .one(&*tx)
1284 .await?
1285 .and_then(|project| project.room_id))
1286 })
1287 .await
1288 }
1289
1290 pub async fn check_room_participants(
1291 &self,
1292 room_id: RoomId,
1293 leader_id: ConnectionId,
1294 follower_id: ConnectionId,
1295 ) -> Result<()> {
1296 self.transaction(|tx| async move {
1297 use room_participant::Column;
1298
1299 let count = room_participant::Entity::find()
1300 .filter(
1301 Condition::all().add(Column::RoomId.eq(room_id)).add(
1302 Condition::any()
1303 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1304 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1305 ))
1306 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1307 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1308 )),
1309 ),
1310 )
1311 .count(&*tx)
1312 .await?;
1313
1314 if count < 2 {
1315 Err(anyhow!("not room participants"))?;
1316 }
1317
1318 Ok(())
1319 })
1320 .await
1321 }
1322
1323 /// Adds the given follower connection as a follower of the given leader connection.
1324 pub async fn follow(
1325 &self,
1326 room_id: RoomId,
1327 project_id: ProjectId,
1328 leader_connection: ConnectionId,
1329 follower_connection: ConnectionId,
1330 ) -> Result<TransactionGuard<proto::Room>> {
1331 self.room_transaction(room_id, |tx| async move {
1332 follower::ActiveModel {
1333 room_id: ActiveValue::set(room_id),
1334 project_id: ActiveValue::set(project_id),
1335 leader_connection_server_id: ActiveValue::set(ServerId(
1336 leader_connection.owner_id as i32,
1337 )),
1338 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1339 follower_connection_server_id: ActiveValue::set(ServerId(
1340 follower_connection.owner_id as i32,
1341 )),
1342 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1343 ..Default::default()
1344 }
1345 .insert(&*tx)
1346 .await?;
1347
1348 let room = self.get_room(room_id, &tx).await?;
1349 Ok(room)
1350 })
1351 .await
1352 }
1353
1354 /// Removes the given follower connection as a follower of the given leader connection.
1355 pub async fn unfollow(
1356 &self,
1357 room_id: RoomId,
1358 project_id: ProjectId,
1359 leader_connection: ConnectionId,
1360 follower_connection: ConnectionId,
1361 ) -> Result<TransactionGuard<proto::Room>> {
1362 self.room_transaction(room_id, |tx| async move {
1363 follower::Entity::delete_many()
1364 .filter(
1365 Condition::all()
1366 .add(follower::Column::RoomId.eq(room_id))
1367 .add(follower::Column::ProjectId.eq(project_id))
1368 .add(
1369 follower::Column::LeaderConnectionServerId
1370 .eq(leader_connection.owner_id),
1371 )
1372 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1373 .add(
1374 follower::Column::FollowerConnectionServerId
1375 .eq(follower_connection.owner_id),
1376 )
1377 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1378 )
1379 .exec(&*tx)
1380 .await?;
1381
1382 let room = self.get_room(room_id, &tx).await?;
1383 Ok(room)
1384 })
1385 .await
1386 }
1387}