1use anyhow::Context as _;
2use collections::HashSet;
3use util::ResultExt;
4
5use super::*;
6
7impl Database {
8 /// Returns the count of all projects, excluding ones marked as admin.
9 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
10 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
11 enum QueryAs {
12 Count,
13 }
14
15 self.transaction(|tx| async move {
16 Ok(project::Entity::find()
17 .select_only()
18 .column_as(project::Column::Id.count(), QueryAs::Count)
19 .inner_join(user::Entity)
20 .filter(user::Column::Admin.eq(false))
21 .into_values::<_, QueryAs>()
22 .one(&*tx)
23 .await?
24 .unwrap_or(0i64) as usize)
25 })
26 .await
27 }
28
29 /// Shares a project with the given room.
30 pub async fn share_project(
31 &self,
32 room_id: RoomId,
33 connection: ConnectionId,
34 worktrees: &[proto::WorktreeMetadata],
35 is_ssh_project: bool,
36 windows_paths: bool,
37 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
38 self.room_transaction(room_id, |tx| async move {
39 let participant = room_participant::Entity::find()
40 .filter(
41 Condition::all()
42 .add(
43 room_participant::Column::AnsweringConnectionId
44 .eq(connection.id as i32),
45 )
46 .add(
47 room_participant::Column::AnsweringConnectionServerId
48 .eq(connection.owner_id as i32),
49 ),
50 )
51 .one(&*tx)
52 .await?
53 .context("could not find participant")?;
54 if participant.room_id != room_id {
55 return Err(anyhow!("shared project on unexpected room"))?;
56 }
57 if !participant
58 .role
59 .unwrap_or(ChannelRole::Member)
60 .can_edit_projects()
61 {
62 return Err(anyhow!("guests cannot share projects"))?;
63 }
64
65 let project = project::ActiveModel {
66 room_id: ActiveValue::set(Some(participant.room_id)),
67 host_user_id: ActiveValue::set(Some(participant.user_id)),
68 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
69 host_connection_server_id: ActiveValue::set(Some(ServerId(
70 connection.owner_id as i32,
71 ))),
72 id: ActiveValue::NotSet,
73 windows_paths: ActiveValue::set(windows_paths),
74 }
75 .insert(&*tx)
76 .await?;
77
78 if !worktrees.is_empty() {
79 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
80 worktree::ActiveModel {
81 id: ActiveValue::set(worktree.id as i64),
82 project_id: ActiveValue::set(project.id),
83 abs_path: ActiveValue::set(worktree.abs_path.clone()),
84 root_name: ActiveValue::set(worktree.root_name.clone()),
85 visible: ActiveValue::set(worktree.visible),
86 scan_id: ActiveValue::set(0),
87 completed_scan_id: ActiveValue::set(0),
88 }
89 }))
90 .exec(&*tx)
91 .await?;
92 }
93
94 let replica_id = if is_ssh_project {
95 clock::ReplicaId::REMOTE_SERVER
96 } else {
97 clock::ReplicaId::LOCAL
98 };
99
100 project_collaborator::ActiveModel {
101 project_id: ActiveValue::set(project.id),
102 connection_id: ActiveValue::set(connection.id as i32),
103 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
104 user_id: ActiveValue::set(participant.user_id),
105 replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
106 is_host: ActiveValue::set(true),
107 id: ActiveValue::NotSet,
108 committer_name: ActiveValue::Set(None),
109 committer_email: ActiveValue::Set(None),
110 }
111 .insert(&*tx)
112 .await?;
113
114 let room = self.get_room(room_id, &tx).await?;
115 Ok((project.id, room))
116 })
117 .await
118 }
119
120 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
121 self.transaction(|tx| async move {
122 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
123 Ok(())
124 })
125 .await
126 }
127
128 /// Unshares the given project.
129 pub async fn unshare_project(
130 &self,
131 project_id: ProjectId,
132 connection: ConnectionId,
133 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
134 self.project_transaction(project_id, |tx| async move {
135 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
136 let project = project::Entity::find_by_id(project_id)
137 .one(&*tx)
138 .await?
139 .context("project not found")?;
140 let room = if let Some(room_id) = project.room_id {
141 Some(self.get_room(room_id, &tx).await?)
142 } else {
143 None
144 };
145 if project.host_connection()? == connection {
146 return Ok((true, room, guest_connection_ids));
147 }
148 Err(anyhow!("cannot unshare a project hosted by another user"))?
149 })
150 .await
151 }
152
153 /// Updates the worktrees associated with the given project.
154 pub async fn update_project(
155 &self,
156 project_id: ProjectId,
157 connection: ConnectionId,
158 worktrees: &[proto::WorktreeMetadata],
159 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
160 self.project_transaction(project_id, |tx| async move {
161 let project = project::Entity::find_by_id(project_id)
162 .filter(
163 Condition::all()
164 .add(project::Column::HostConnectionId.eq(connection.id as i32))
165 .add(
166 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
167 ),
168 )
169 .one(&*tx)
170 .await?
171 .context("no such project")?;
172
173 self.update_project_worktrees(project.id, worktrees, &tx)
174 .await?;
175
176 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
177
178 let room = if let Some(room_id) = project.room_id {
179 Some(self.get_room(room_id, &tx).await?)
180 } else {
181 None
182 };
183
184 Ok((room, guest_connection_ids))
185 })
186 .await
187 }
188
189 pub(in crate::db) async fn update_project_worktrees(
190 &self,
191 project_id: ProjectId,
192 worktrees: &[proto::WorktreeMetadata],
193 tx: &DatabaseTransaction,
194 ) -> Result<()> {
195 if !worktrees.is_empty() {
196 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
197 id: ActiveValue::set(worktree.id as i64),
198 project_id: ActiveValue::set(project_id),
199 abs_path: ActiveValue::set(worktree.abs_path.clone()),
200 root_name: ActiveValue::set(worktree.root_name.clone()),
201 visible: ActiveValue::set(worktree.visible),
202 scan_id: ActiveValue::set(0),
203 completed_scan_id: ActiveValue::set(0),
204 }))
205 .on_conflict(
206 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
207 .update_column(worktree::Column::RootName)
208 .to_owned(),
209 )
210 .exec(tx)
211 .await?;
212 }
213
214 worktree::Entity::delete_many()
215 .filter(worktree::Column::ProjectId.eq(project_id).and(
216 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
217 ))
218 .exec(tx)
219 .await?;
220
221 Ok(())
222 }
223
224 pub async fn update_worktree(
225 &self,
226 update: &proto::UpdateWorktree,
227 connection: ConnectionId,
228 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
229 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
230 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
231 {
232 return Err(anyhow!(
233 "invalid worktree update. removed entries: {}, updated entries: {}",
234 update.removed_entries.len(),
235 update.updated_entries.len()
236 ))?;
237 }
238
239 let project_id = ProjectId::from_proto(update.project_id);
240 let worktree_id = update.worktree_id as i64;
241 self.project_transaction(project_id, |tx| async move {
242 // Ensure the update comes from the host.
243 let _project = project::Entity::find_by_id(project_id)
244 .filter(
245 Condition::all()
246 .add(project::Column::HostConnectionId.eq(connection.id as i32))
247 .add(
248 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
249 ),
250 )
251 .one(&*tx)
252 .await?
253 .with_context(|| format!("no such project: {project_id}"))?;
254
255 // Update metadata.
256 worktree::Entity::update(worktree::ActiveModel {
257 id: ActiveValue::set(worktree_id),
258 project_id: ActiveValue::set(project_id),
259 root_name: ActiveValue::set(update.root_name.clone()),
260 scan_id: ActiveValue::set(update.scan_id as i64),
261 completed_scan_id: if update.is_last_update {
262 ActiveValue::set(update.scan_id as i64)
263 } else {
264 ActiveValue::default()
265 },
266 abs_path: ActiveValue::set(update.abs_path.clone()),
267 ..Default::default()
268 })
269 .exec(&*tx)
270 .await?;
271
272 if !update.updated_entries.is_empty() {
273 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
274 let mtime = entry.mtime.clone().unwrap_or_default();
275 worktree_entry::ActiveModel {
276 project_id: ActiveValue::set(project_id),
277 worktree_id: ActiveValue::set(worktree_id),
278 id: ActiveValue::set(entry.id as i64),
279 is_dir: ActiveValue::set(entry.is_dir),
280 path: ActiveValue::set(entry.path.clone()),
281 inode: ActiveValue::set(entry.inode as i64),
282 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
283 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
284 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
285 is_ignored: ActiveValue::set(entry.is_ignored),
286 git_status: ActiveValue::set(None),
287 is_external: ActiveValue::set(entry.is_external),
288 is_deleted: ActiveValue::set(false),
289 is_hidden: ActiveValue::set(entry.is_hidden),
290 scan_id: ActiveValue::set(update.scan_id as i64),
291 is_fifo: ActiveValue::set(entry.is_fifo),
292 }
293 }))
294 .on_conflict(
295 OnConflict::columns([
296 worktree_entry::Column::ProjectId,
297 worktree_entry::Column::WorktreeId,
298 worktree_entry::Column::Id,
299 ])
300 .update_columns([
301 worktree_entry::Column::IsDir,
302 worktree_entry::Column::Path,
303 worktree_entry::Column::Inode,
304 worktree_entry::Column::MtimeSeconds,
305 worktree_entry::Column::MtimeNanos,
306 worktree_entry::Column::CanonicalPath,
307 worktree_entry::Column::IsIgnored,
308 worktree_entry::Column::IsHidden,
309 worktree_entry::Column::ScanId,
310 ])
311 .to_owned(),
312 )
313 .exec(&*tx)
314 .await?;
315 }
316
317 if !update.removed_entries.is_empty() {
318 worktree_entry::Entity::update_many()
319 .filter(
320 worktree_entry::Column::ProjectId
321 .eq(project_id)
322 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
323 .and(
324 worktree_entry::Column::Id
325 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
326 ),
327 )
328 .set(worktree_entry::ActiveModel {
329 is_deleted: ActiveValue::Set(true),
330 scan_id: ActiveValue::Set(update.scan_id as i64),
331 ..Default::default()
332 })
333 .exec(&*tx)
334 .await?;
335 }
336
337 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
338 Ok(connection_ids)
339 })
340 .await
341 }
342
343 pub async fn update_repository(
344 &self,
345 update: &proto::UpdateRepository,
346 _connection: ConnectionId,
347 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
348 let project_id = ProjectId::from_proto(update.project_id);
349 let repository_id = update.id as i64;
350 self.project_transaction(project_id, |tx| async move {
351 project_repository::Entity::insert(project_repository::ActiveModel {
352 project_id: ActiveValue::set(project_id),
353 id: ActiveValue::set(repository_id),
354 legacy_worktree_id: ActiveValue::set(None),
355 abs_path: ActiveValue::set(update.abs_path.clone()),
356 entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()),
357 scan_id: ActiveValue::set(update.scan_id as i64),
358 is_deleted: ActiveValue::set(false),
359 branch_summary: ActiveValue::Set(
360 update
361 .branch_summary
362 .as_ref()
363 .map(|summary| serde_json::to_string(summary).unwrap()),
364 ),
365 head_commit_details: ActiveValue::Set(
366 update
367 .head_commit_details
368 .as_ref()
369 .map(|details| serde_json::to_string(details).unwrap()),
370 ),
371 current_merge_conflicts: ActiveValue::Set(Some(
372 serde_json::to_string(&update.current_merge_conflicts).unwrap(),
373 )),
374 merge_message: ActiveValue::set(update.merge_message.clone()),
375 remote_upstream_url: ActiveValue::set(update.remote_upstream_url.clone()),
376 remote_origin_url: ActiveValue::set(update.remote_origin_url.clone()),
377 })
378 .on_conflict(
379 OnConflict::columns([
380 project_repository::Column::ProjectId,
381 project_repository::Column::Id,
382 ])
383 .update_columns([
384 project_repository::Column::ScanId,
385 project_repository::Column::BranchSummary,
386 project_repository::Column::EntryIds,
387 project_repository::Column::AbsPath,
388 project_repository::Column::CurrentMergeConflicts,
389 project_repository::Column::HeadCommitDetails,
390 project_repository::Column::MergeMessage,
391 ])
392 .to_owned(),
393 )
394 .exec(&*tx)
395 .await?;
396
397 let has_any_statuses = !update.updated_statuses.is_empty();
398
399 if has_any_statuses {
400 project_repository_statuses::Entity::insert_many(
401 update.updated_statuses.iter().map(|status_entry| {
402 let (repo_path, status_kind, first_status, second_status) =
403 proto_status_to_db(status_entry.clone());
404 project_repository_statuses::ActiveModel {
405 project_id: ActiveValue::set(project_id),
406 repository_id: ActiveValue::set(repository_id),
407 scan_id: ActiveValue::set(update.scan_id as i64),
408 is_deleted: ActiveValue::set(false),
409 repo_path: ActiveValue::set(repo_path),
410 status: ActiveValue::set(0),
411 status_kind: ActiveValue::set(status_kind),
412 first_status: ActiveValue::set(first_status),
413 second_status: ActiveValue::set(second_status),
414 lines_added: ActiveValue::set(
415 status_entry.diff_stat_added.map(|v| v as i32),
416 ),
417 lines_deleted: ActiveValue::set(
418 status_entry.diff_stat_deleted.map(|v| v as i32),
419 ),
420 }
421 }),
422 )
423 .on_conflict(
424 OnConflict::columns([
425 project_repository_statuses::Column::ProjectId,
426 project_repository_statuses::Column::RepositoryId,
427 project_repository_statuses::Column::RepoPath,
428 ])
429 .update_columns([
430 project_repository_statuses::Column::ScanId,
431 project_repository_statuses::Column::StatusKind,
432 project_repository_statuses::Column::FirstStatus,
433 project_repository_statuses::Column::SecondStatus,
434 project_repository_statuses::Column::LinesAdded,
435 project_repository_statuses::Column::LinesDeleted,
436 ])
437 .to_owned(),
438 )
439 .exec(&*tx)
440 .await?;
441 }
442
443 let has_any_removed_statuses = !update.removed_statuses.is_empty();
444
445 if has_any_removed_statuses {
446 project_repository_statuses::Entity::update_many()
447 .filter(
448 project_repository_statuses::Column::ProjectId
449 .eq(project_id)
450 .and(
451 project_repository_statuses::Column::RepositoryId.eq(repository_id),
452 )
453 .and(
454 project_repository_statuses::Column::RepoPath
455 .is_in(update.removed_statuses.iter()),
456 ),
457 )
458 .set(project_repository_statuses::ActiveModel {
459 is_deleted: ActiveValue::Set(true),
460 scan_id: ActiveValue::Set(update.scan_id as i64),
461 ..Default::default()
462 })
463 .exec(&*tx)
464 .await?;
465 }
466
467 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
468 Ok(connection_ids)
469 })
470 .await
471 }
472
473 pub async fn remove_repository(
474 &self,
475 remove: &proto::RemoveRepository,
476 _connection: ConnectionId,
477 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
478 let project_id = ProjectId::from_proto(remove.project_id);
479 let repository_id = remove.id as i64;
480 self.project_transaction(project_id, |tx| async move {
481 project_repository::Entity::update_many()
482 .filter(
483 project_repository::Column::ProjectId
484 .eq(project_id)
485 .and(project_repository::Column::Id.eq(repository_id)),
486 )
487 .set(project_repository::ActiveModel {
488 is_deleted: ActiveValue::Set(true),
489 // scan_id: ActiveValue::Set(update.scan_id as i64),
490 ..Default::default()
491 })
492 .exec(&*tx)
493 .await?;
494
495 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
496 Ok(connection_ids)
497 })
498 .await
499 }
500
501 /// Updates the diagnostic summary for the given connection.
502 pub async fn update_diagnostic_summary(
503 &self,
504 update: &proto::UpdateDiagnosticSummary,
505 connection: ConnectionId,
506 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
507 let project_id = ProjectId::from_proto(update.project_id);
508 let worktree_id = update.worktree_id as i64;
509 self.project_transaction(project_id, |tx| async move {
510 let summary = update.summary.as_ref().context("invalid summary")?;
511
512 // Ensure the update comes from the host.
513 let project = project::Entity::find_by_id(project_id)
514 .one(&*tx)
515 .await?
516 .context("no such project")?;
517 if project.host_connection()? != connection {
518 return Err(anyhow!("can't update a project hosted by someone else"))?;
519 }
520
521 // Update summary.
522 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
523 project_id: ActiveValue::set(project_id),
524 worktree_id: ActiveValue::set(worktree_id),
525 path: ActiveValue::set(summary.path.clone()),
526 language_server_id: ActiveValue::set(summary.language_server_id as i64),
527 error_count: ActiveValue::set(summary.error_count as i32),
528 warning_count: ActiveValue::set(summary.warning_count as i32),
529 })
530 .on_conflict(
531 OnConflict::columns([
532 worktree_diagnostic_summary::Column::ProjectId,
533 worktree_diagnostic_summary::Column::WorktreeId,
534 worktree_diagnostic_summary::Column::Path,
535 ])
536 .update_columns([
537 worktree_diagnostic_summary::Column::LanguageServerId,
538 worktree_diagnostic_summary::Column::ErrorCount,
539 worktree_diagnostic_summary::Column::WarningCount,
540 ])
541 .to_owned(),
542 )
543 .exec(&*tx)
544 .await?;
545
546 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
547 Ok(connection_ids)
548 })
549 .await
550 }
551
552 /// Starts the language server for the given connection.
553 pub async fn start_language_server(
554 &self,
555 update: &proto::StartLanguageServer,
556 connection: ConnectionId,
557 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
558 let project_id = ProjectId::from_proto(update.project_id);
559 self.project_transaction(project_id, |tx| async move {
560 let server = update.server.as_ref().context("invalid language server")?;
561
562 // Ensure the update comes from the host.
563 let project = project::Entity::find_by_id(project_id)
564 .one(&*tx)
565 .await?
566 .context("no such project")?;
567 if project.host_connection()? != connection {
568 return Err(anyhow!("can't update a project hosted by someone else"))?;
569 }
570
571 // Add the newly-started language server.
572 language_server::Entity::insert(language_server::ActiveModel {
573 project_id: ActiveValue::set(project_id),
574 id: ActiveValue::set(server.id as i64),
575 name: ActiveValue::set(server.name.clone()),
576 worktree_id: ActiveValue::set(server.worktree_id.map(|id| id as i64)),
577 capabilities: ActiveValue::set(update.capabilities.clone()),
578 })
579 .on_conflict(
580 OnConflict::columns([
581 language_server::Column::ProjectId,
582 language_server::Column::Id,
583 ])
584 .update_columns([
585 language_server::Column::Name,
586 language_server::Column::Capabilities,
587 language_server::Column::WorktreeId,
588 ])
589 .to_owned(),
590 )
591 .exec(&*tx)
592 .await?;
593
594 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
595 Ok(connection_ids)
596 })
597 .await
598 }
599
600 /// Updates the worktree settings for the given connection.
601 pub async fn update_worktree_settings(
602 &self,
603 update: &proto::UpdateWorktreeSettings,
604 connection: ConnectionId,
605 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
606 let project_id = ProjectId::from_proto(update.project_id);
607 let kind = match update.kind {
608 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
609 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
610 None => proto::LocalSettingsKind::Settings,
611 };
612 let kind = LocalSettingsKind::from_proto(kind);
613 self.project_transaction(project_id, |tx| async move {
614 // Ensure the update comes from the host.
615 let project = project::Entity::find_by_id(project_id)
616 .one(&*tx)
617 .await?
618 .context("no such project")?;
619 if project.host_connection()? != connection {
620 return Err(anyhow!("can't update a project hosted by someone else"))?;
621 }
622
623 if let Some(content) = &update.content {
624 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
625 project_id: ActiveValue::Set(project_id),
626 worktree_id: ActiveValue::Set(update.worktree_id as i64),
627 path: ActiveValue::Set(update.path.clone()),
628 content: ActiveValue::Set(content.clone()),
629 kind: ActiveValue::Set(kind),
630 outside_worktree: ActiveValue::Set(update.outside_worktree.unwrap_or(false)),
631 })
632 .on_conflict(
633 OnConflict::columns([
634 worktree_settings_file::Column::ProjectId,
635 worktree_settings_file::Column::WorktreeId,
636 worktree_settings_file::Column::Path,
637 ])
638 .update_columns([
639 worktree_settings_file::Column::Content,
640 worktree_settings_file::Column::OutsideWorktree,
641 ])
642 .to_owned(),
643 )
644 .exec(&*tx)
645 .await?;
646 } else {
647 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
648 project_id: ActiveValue::Set(project_id),
649 worktree_id: ActiveValue::Set(update.worktree_id as i64),
650 path: ActiveValue::Set(update.path.clone()),
651 ..Default::default()
652 })
653 .exec(&*tx)
654 .await?;
655 }
656
657 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
658 Ok(connection_ids)
659 })
660 .await
661 }
662
663 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
664 self.transaction(|tx| async move {
665 Ok(project::Entity::find_by_id(id)
666 .one(&*tx)
667 .await?
668 .context("no such project")?)
669 })
670 .await
671 }
672
673 /// Adds the given connection to the specified project
674 /// in the current room.
675 pub async fn join_project(
676 &self,
677 project_id: ProjectId,
678 connection: ConnectionId,
679 user_id: UserId,
680 committer_name: Option<String>,
681 committer_email: Option<String>,
682 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
683 self.project_transaction(project_id, move |tx| {
684 let committer_name = committer_name.clone();
685 let committer_email = committer_email.clone();
686 async move {
687 let (project, role) = self
688 .access_project(project_id, connection, Capability::ReadOnly, &tx)
689 .await?;
690 self.join_project_internal(
691 project,
692 user_id,
693 committer_name,
694 committer_email,
695 connection,
696 role,
697 &tx,
698 )
699 .await
700 }
701 })
702 .await
703 }
704
705 async fn join_project_internal(
706 &self,
707 project: project::Model,
708 user_id: UserId,
709 committer_name: Option<String>,
710 committer_email: Option<String>,
711 connection: ConnectionId,
712 role: ChannelRole,
713 tx: &DatabaseTransaction,
714 ) -> Result<(Project, ReplicaId)> {
715 let mut collaborators = project
716 .find_related(project_collaborator::Entity)
717 .all(tx)
718 .await?;
719 let replica_ids = collaborators
720 .iter()
721 .map(|c| c.replica_id)
722 .collect::<HashSet<_>>();
723 let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
724 while replica_ids.contains(&replica_id) {
725 replica_id.0 += 1;
726 }
727 let new_collaborator = project_collaborator::ActiveModel {
728 project_id: ActiveValue::set(project.id),
729 connection_id: ActiveValue::set(connection.id as i32),
730 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
731 user_id: ActiveValue::set(user_id),
732 replica_id: ActiveValue::set(replica_id),
733 is_host: ActiveValue::set(false),
734 id: ActiveValue::NotSet,
735 committer_name: ActiveValue::set(committer_name),
736 committer_email: ActiveValue::set(committer_email),
737 }
738 .insert(tx)
739 .await?;
740 collaborators.push(new_collaborator);
741
742 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
743 let mut worktrees = db_worktrees
744 .into_iter()
745 .map(|db_worktree| {
746 (
747 db_worktree.id as u64,
748 Worktree {
749 id: db_worktree.id as u64,
750 abs_path: db_worktree.abs_path,
751 root_name: db_worktree.root_name,
752 visible: db_worktree.visible,
753 entries: Default::default(),
754 diagnostic_summaries: Default::default(),
755 settings_files: Default::default(),
756 scan_id: db_worktree.scan_id as u64,
757 completed_scan_id: db_worktree.completed_scan_id as u64,
758 legacy_repository_entries: Default::default(),
759 },
760 )
761 })
762 .collect::<BTreeMap<_, _>>();
763
764 // Populate worktree entries.
765 {
766 let mut db_entries = worktree_entry::Entity::find()
767 .filter(
768 Condition::all()
769 .add(worktree_entry::Column::ProjectId.eq(project.id))
770 .add(worktree_entry::Column::IsDeleted.eq(false)),
771 )
772 .stream(tx)
773 .await?;
774 while let Some(db_entry) = db_entries.next().await {
775 let db_entry = db_entry?;
776 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
777 worktree.entries.push(proto::Entry {
778 id: db_entry.id as u64,
779 is_dir: db_entry.is_dir,
780 path: db_entry.path,
781 inode: db_entry.inode as u64,
782 mtime: Some(proto::Timestamp {
783 seconds: db_entry.mtime_seconds as u64,
784 nanos: db_entry.mtime_nanos as u32,
785 }),
786 canonical_path: db_entry.canonical_path,
787 is_ignored: db_entry.is_ignored,
788 is_external: db_entry.is_external,
789 is_hidden: db_entry.is_hidden,
790 // This is only used in the summarization backlog, so if it's None,
791 // that just means we won't be able to detect when to resummarize
792 // based on total number of backlogged bytes - instead, we'd go
793 // on number of files only. That shouldn't be a huge deal in practice.
794 size: None,
795 is_fifo: db_entry.is_fifo,
796 });
797 }
798 }
799 }
800
801 // Populate repository entries.
802 let mut repositories = Vec::new();
803 {
804 let db_repository_entries = project_repository::Entity::find()
805 .filter(
806 Condition::all()
807 .add(project_repository::Column::ProjectId.eq(project.id))
808 .add(project_repository::Column::IsDeleted.eq(false)),
809 )
810 .all(tx)
811 .await?;
812 for db_repository_entry in db_repository_entries {
813 let mut repository_statuses = project_repository_statuses::Entity::find()
814 .filter(
815 Condition::all()
816 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
817 .add(
818 project_repository_statuses::Column::RepositoryId
819 .eq(db_repository_entry.id),
820 )
821 .add(project_repository_statuses::Column::IsDeleted.eq(false)),
822 )
823 .stream(tx)
824 .await?;
825 let mut updated_statuses = Vec::new();
826 while let Some(status_entry) = repository_statuses.next().await {
827 let status_entry = status_entry?;
828 updated_statuses.push(db_status_to_proto(status_entry)?);
829 }
830
831 let current_merge_conflicts = db_repository_entry
832 .current_merge_conflicts
833 .as_ref()
834 .map(|conflicts| serde_json::from_str(conflicts))
835 .transpose()?
836 .unwrap_or_default();
837
838 let branch_summary = db_repository_entry
839 .branch_summary
840 .as_ref()
841 .map(|branch_summary| serde_json::from_str(branch_summary))
842 .transpose()?
843 .unwrap_or_default();
844
845 let head_commit_details = db_repository_entry
846 .head_commit_details
847 .as_ref()
848 .map(|head_commit_details| serde_json::from_str(head_commit_details))
849 .transpose()?
850 .unwrap_or_default();
851
852 let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids)
853 .context("failed to deserialize repository's entry ids")?;
854
855 if let Some(worktree_id) = db_repository_entry.legacy_worktree_id {
856 if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) {
857 worktree.legacy_repository_entries.insert(
858 db_repository_entry.id as u64,
859 proto::RepositoryEntry {
860 repository_id: db_repository_entry.id as u64,
861 updated_statuses,
862 removed_statuses: Vec::new(),
863 current_merge_conflicts,
864 branch_summary,
865 },
866 );
867 }
868 } else {
869 repositories.push(proto::UpdateRepository {
870 project_id: db_repository_entry.project_id.0 as u64,
871 id: db_repository_entry.id as u64,
872 abs_path: db_repository_entry.abs_path.clone(),
873 entry_ids,
874 updated_statuses,
875 removed_statuses: Vec::new(),
876 current_merge_conflicts,
877 branch_summary,
878 head_commit_details,
879 scan_id: db_repository_entry.scan_id as u64,
880 is_last_update: true,
881 merge_message: db_repository_entry.merge_message,
882 stash_entries: Vec::new(),
883 remote_upstream_url: db_repository_entry.remote_upstream_url.clone(),
884 remote_origin_url: db_repository_entry.remote_origin_url.clone(),
885 original_repo_abs_path: Some(db_repository_entry.abs_path),
886 });
887 }
888 }
889 }
890
891 // Populate worktree diagnostic summaries.
892 {
893 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
894 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
895 .stream(tx)
896 .await?;
897 while let Some(db_summary) = db_summaries.next().await {
898 let db_summary = db_summary?;
899 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
900 worktree
901 .diagnostic_summaries
902 .push(proto::DiagnosticSummary {
903 path: db_summary.path,
904 language_server_id: db_summary.language_server_id as u64,
905 error_count: db_summary.error_count as u32,
906 warning_count: db_summary.warning_count as u32,
907 });
908 }
909 }
910 }
911
912 // Populate worktree settings files
913 {
914 let mut db_settings_files = worktree_settings_file::Entity::find()
915 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
916 .stream(tx)
917 .await?;
918 while let Some(db_settings_file) = db_settings_files.next().await {
919 let db_settings_file = db_settings_file?;
920 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
921 worktree.settings_files.push(WorktreeSettingsFile {
922 path: db_settings_file.path,
923 content: db_settings_file.content,
924 kind: db_settings_file.kind,
925 outside_worktree: db_settings_file.outside_worktree,
926 });
927 }
928 }
929 }
930
931 // Populate language servers.
932 let language_servers = project
933 .find_related(language_server::Entity)
934 .all(tx)
935 .await?;
936
937 let path_style = if project.windows_paths {
938 PathStyle::Windows
939 } else {
940 PathStyle::Posix
941 };
942
943 let project = Project {
944 id: project.id,
945 role,
946 collaborators: collaborators
947 .into_iter()
948 .map(|collaborator| ProjectCollaborator {
949 connection_id: collaborator.connection(),
950 user_id: collaborator.user_id,
951 replica_id: collaborator.replica_id,
952 is_host: collaborator.is_host,
953 committer_name: collaborator.committer_name,
954 committer_email: collaborator.committer_email,
955 })
956 .collect(),
957 worktrees,
958 repositories,
959 language_servers: language_servers
960 .into_iter()
961 .map(|language_server| LanguageServer {
962 server: proto::LanguageServer {
963 id: language_server.id as u64,
964 name: language_server.name,
965 worktree_id: language_server.worktree_id.map(|id| id as u64),
966 },
967 capabilities: language_server.capabilities,
968 })
969 .collect(),
970 path_style,
971 };
972 Ok((project, replica_id as ReplicaId))
973 }
974
975 /// Removes the given connection from the specified project.
976 pub async fn leave_project(
977 &self,
978 project_id: ProjectId,
979 connection: ConnectionId,
980 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
981 self.project_transaction(project_id, |tx| async move {
982 let result = project_collaborator::Entity::delete_many()
983 .filter(
984 Condition::all()
985 .add(project_collaborator::Column::ProjectId.eq(project_id))
986 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
987 .add(
988 project_collaborator::Column::ConnectionServerId
989 .eq(connection.owner_id as i32),
990 ),
991 )
992 .exec(&*tx)
993 .await?;
994 if result.rows_affected == 0 {
995 Err(anyhow!("not a collaborator on this project"))?;
996 }
997
998 let project = project::Entity::find_by_id(project_id)
999 .one(&*tx)
1000 .await?
1001 .context("no such project")?;
1002 let collaborators = project
1003 .find_related(project_collaborator::Entity)
1004 .all(&*tx)
1005 .await?;
1006 let connection_ids: Vec<ConnectionId> = collaborators
1007 .into_iter()
1008 .map(|collaborator| collaborator.connection())
1009 .collect();
1010
1011 follower::Entity::delete_many()
1012 .filter(
1013 Condition::any()
1014 .add(
1015 Condition::all()
1016 .add(follower::Column::ProjectId.eq(Some(project_id)))
1017 .add(
1018 follower::Column::LeaderConnectionServerId
1019 .eq(connection.owner_id),
1020 )
1021 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
1022 )
1023 .add(
1024 Condition::all()
1025 .add(follower::Column::ProjectId.eq(Some(project_id)))
1026 .add(
1027 follower::Column::FollowerConnectionServerId
1028 .eq(connection.owner_id),
1029 )
1030 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
1031 ),
1032 )
1033 .exec(&*tx)
1034 .await?;
1035
1036 let room = if let Some(room_id) = project.room_id {
1037 Some(self.get_room(room_id, &tx).await?)
1038 } else {
1039 None
1040 };
1041
1042 let left_project = LeftProject {
1043 id: project_id,
1044 should_unshare: connection == project.host_connection()?,
1045 connection_ids,
1046 };
1047 Ok((room, left_project))
1048 })
1049 .await
1050 }
1051
1052 pub async fn check_user_is_project_host(
1053 &self,
1054 project_id: ProjectId,
1055 connection_id: ConnectionId,
1056 ) -> Result<()> {
1057 self.project_transaction(project_id, |tx| async move {
1058 project::Entity::find()
1059 .filter(
1060 Condition::all()
1061 .add(project::Column::Id.eq(project_id))
1062 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
1063 .add(
1064 project::Column::HostConnectionServerId
1065 .eq(Some(connection_id.owner_id as i32)),
1066 ),
1067 )
1068 .one(&*tx)
1069 .await?
1070 .context("failed to read project host")?;
1071
1072 Ok(())
1073 })
1074 .await
1075 .map(|guard| guard.into_inner())
1076 }
1077
1078 /// Returns the current project if the given user is authorized to access it with the specified capability.
1079 pub async fn access_project(
1080 &self,
1081 project_id: ProjectId,
1082 connection_id: ConnectionId,
1083 capability: Capability,
1084 tx: &DatabaseTransaction,
1085 ) -> Result<(project::Model, ChannelRole)> {
1086 let project = project::Entity::find_by_id(project_id)
1087 .one(tx)
1088 .await?
1089 .context("no such project")?;
1090
1091 let role_from_room = if let Some(room_id) = project.room_id {
1092 room_participant::Entity::find()
1093 .filter(room_participant::Column::RoomId.eq(room_id))
1094 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1095 .one(tx)
1096 .await?
1097 .and_then(|participant| participant.role)
1098 } else {
1099 None
1100 };
1101
1102 let role = role_from_room.unwrap_or(ChannelRole::Banned);
1103
1104 match capability {
1105 Capability::ReadWrite => {
1106 if !role.can_edit_projects() {
1107 return Err(anyhow!("not authorized to edit projects"))?;
1108 }
1109 }
1110 Capability::ReadOnly => {
1111 if !role.can_read_projects() {
1112 return Err(anyhow!("not authorized to read projects"))?;
1113 }
1114 }
1115 }
1116
1117 Ok((project, role))
1118 }
1119
1120 /// Returns the host connection for a read-only request to join a shared project.
1121 pub async fn host_for_read_only_project_request(
1122 &self,
1123 project_id: ProjectId,
1124 connection_id: ConnectionId,
1125 ) -> Result<ConnectionId> {
1126 self.project_transaction(project_id, |tx| async move {
1127 let (project, _) = self
1128 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
1129 .await?;
1130 project.host_connection()
1131 })
1132 .await
1133 .map(|guard| guard.into_inner())
1134 }
1135
1136 /// Returns the host connection for a request to join a shared project.
1137 pub async fn host_for_mutating_project_request(
1138 &self,
1139 project_id: ProjectId,
1140 connection_id: ConnectionId,
1141 ) -> Result<ConnectionId> {
1142 self.project_transaction(project_id, |tx| async move {
1143 let (project, _) = self
1144 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1145 .await?;
1146 project.host_connection()
1147 })
1148 .await
1149 .map(|guard| guard.into_inner())
1150 }
1151
1152 pub async fn connections_for_buffer_update(
1153 &self,
1154 project_id: ProjectId,
1155 connection_id: ConnectionId,
1156 capability: Capability,
1157 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1158 self.project_transaction(project_id, |tx| async move {
1159 // Authorize
1160 let (project, _) = self
1161 .access_project(project_id, connection_id, capability, &tx)
1162 .await?;
1163
1164 let host_connection_id = project.host_connection()?;
1165
1166 let collaborators = project_collaborator::Entity::find()
1167 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1168 .all(&*tx)
1169 .await?;
1170
1171 let guest_connection_ids = collaborators
1172 .into_iter()
1173 .filter_map(|collaborator| {
1174 if collaborator.is_host {
1175 None
1176 } else {
1177 Some(collaborator.connection())
1178 }
1179 })
1180 .collect();
1181
1182 Ok((host_connection_id, guest_connection_ids))
1183 })
1184 .await
1185 }
1186
1187 /// Returns the connection IDs in the given project.
1188 ///
1189 /// The provided `connection_id` must also be a collaborator in the project,
1190 /// otherwise an error will be returned.
1191 pub async fn project_connection_ids(
1192 &self,
1193 project_id: ProjectId,
1194 connection_id: ConnectionId,
1195 exclude_dev_server: bool,
1196 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1197 self.project_transaction(project_id, |tx| async move {
1198 self.internal_project_connection_ids(project_id, connection_id, exclude_dev_server, &tx)
1199 .await
1200 })
1201 .await
1202 }
1203
1204 async fn internal_project_connection_ids(
1205 &self,
1206 project_id: ProjectId,
1207 connection_id: ConnectionId,
1208 exclude_dev_server: bool,
1209 tx: &DatabaseTransaction,
1210 ) -> Result<HashSet<ConnectionId>> {
1211 let project = project::Entity::find_by_id(project_id)
1212 .one(tx)
1213 .await?
1214 .context("no such project")?;
1215
1216 let mut collaborators = project_collaborator::Entity::find()
1217 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1218 .stream(tx)
1219 .await?;
1220
1221 let mut connection_ids = HashSet::default();
1222 if let Some(host_connection) = project.host_connection().log_err()
1223 && !exclude_dev_server
1224 {
1225 connection_ids.insert(host_connection);
1226 }
1227
1228 while let Some(collaborator) = collaborators.next().await {
1229 let collaborator = collaborator?;
1230 connection_ids.insert(collaborator.connection());
1231 }
1232
1233 if connection_ids.contains(&connection_id)
1234 || Some(connection_id) == project.host_connection().ok()
1235 {
1236 Ok(connection_ids)
1237 } else {
1238 Err(anyhow!(
1239 "can only send project updates to a project you're in"
1240 ))?
1241 }
1242 }
1243
1244 async fn project_guest_connection_ids(
1245 &self,
1246 project_id: ProjectId,
1247 tx: &DatabaseTransaction,
1248 ) -> Result<Vec<ConnectionId>> {
1249 let mut collaborators = project_collaborator::Entity::find()
1250 .filter(
1251 project_collaborator::Column::ProjectId
1252 .eq(project_id)
1253 .and(project_collaborator::Column::IsHost.eq(false)),
1254 )
1255 .stream(tx)
1256 .await?;
1257
1258 let mut guest_connection_ids = Vec::new();
1259 while let Some(collaborator) = collaborators.next().await {
1260 let collaborator = collaborator?;
1261 guest_connection_ids.push(collaborator.connection());
1262 }
1263 Ok(guest_connection_ids)
1264 }
1265
1266 /// Returns the [`RoomId`] for the given project.
1267 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1268 self.transaction(|tx| async move {
1269 Ok(project::Entity::find_by_id(project_id)
1270 .one(&*tx)
1271 .await?
1272 .and_then(|project| project.room_id))
1273 })
1274 .await
1275 }
1276
1277 pub async fn check_room_participants(
1278 &self,
1279 room_id: RoomId,
1280 leader_id: ConnectionId,
1281 follower_id: ConnectionId,
1282 ) -> Result<()> {
1283 self.transaction(|tx| async move {
1284 use room_participant::Column;
1285
1286 let count = room_participant::Entity::find()
1287 .filter(
1288 Condition::all().add(Column::RoomId.eq(room_id)).add(
1289 Condition::any()
1290 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1291 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1292 ))
1293 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1294 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1295 )),
1296 ),
1297 )
1298 .count(&*tx)
1299 .await?;
1300
1301 if count < 2 {
1302 Err(anyhow!("not room participants"))?;
1303 }
1304
1305 Ok(())
1306 })
1307 .await
1308 }
1309
1310 /// Adds the given follower connection as a follower of the given leader connection.
1311 pub async fn follow(
1312 &self,
1313 room_id: RoomId,
1314 project_id: ProjectId,
1315 leader_connection: ConnectionId,
1316 follower_connection: ConnectionId,
1317 ) -> Result<TransactionGuard<proto::Room>> {
1318 self.room_transaction(room_id, |tx| async move {
1319 follower::ActiveModel {
1320 room_id: ActiveValue::set(room_id),
1321 project_id: ActiveValue::set(project_id),
1322 leader_connection_server_id: ActiveValue::set(ServerId(
1323 leader_connection.owner_id as i32,
1324 )),
1325 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1326 follower_connection_server_id: ActiveValue::set(ServerId(
1327 follower_connection.owner_id as i32,
1328 )),
1329 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1330 ..Default::default()
1331 }
1332 .insert(&*tx)
1333 .await?;
1334
1335 let room = self.get_room(room_id, &tx).await?;
1336 Ok(room)
1337 })
1338 .await
1339 }
1340
1341 /// Removes the given follower connection as a follower of the given leader connection.
1342 pub async fn unfollow(
1343 &self,
1344 room_id: RoomId,
1345 project_id: ProjectId,
1346 leader_connection: ConnectionId,
1347 follower_connection: ConnectionId,
1348 ) -> Result<TransactionGuard<proto::Room>> {
1349 self.room_transaction(room_id, |tx| async move {
1350 follower::Entity::delete_many()
1351 .filter(
1352 Condition::all()
1353 .add(follower::Column::RoomId.eq(room_id))
1354 .add(follower::Column::ProjectId.eq(project_id))
1355 .add(
1356 follower::Column::LeaderConnectionServerId
1357 .eq(leader_connection.owner_id),
1358 )
1359 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1360 .add(
1361 follower::Column::FollowerConnectionServerId
1362 .eq(follower_connection.owner_id),
1363 )
1364 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1365 )
1366 .exec(&*tx)
1367 .await?;
1368
1369 let room = self.get_room(room_id, &tx).await?;
1370 Ok(room)
1371 })
1372 .await
1373 }
1374}