1use anyhow::Context as _;
2use util::ResultExt;
3
4use super::*;
5
6impl Database {
7 /// Returns the count of all projects, excluding ones marked as admin.
8 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
9 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
10 enum QueryAs {
11 Count,
12 }
13
14 self.transaction(|tx| async move {
15 Ok(project::Entity::find()
16 .select_only()
17 .column_as(project::Column::Id.count(), QueryAs::Count)
18 .inner_join(user::Entity)
19 .filter(user::Column::Admin.eq(false))
20 .into_values::<_, QueryAs>()
21 .one(&*tx)
22 .await?
23 .unwrap_or(0i64) as usize)
24 })
25 .await
26 }
27
28 /// Shares a project with the given room.
29 pub async fn share_project(
30 &self,
31 room_id: RoomId,
32 connection: ConnectionId,
33 worktrees: &[proto::WorktreeMetadata],
34 is_ssh_project: bool,
35 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
36 self.room_transaction(room_id, |tx| async move {
37 let participant = room_participant::Entity::find()
38 .filter(
39 Condition::all()
40 .add(
41 room_participant::Column::AnsweringConnectionId
42 .eq(connection.id as i32),
43 )
44 .add(
45 room_participant::Column::AnsweringConnectionServerId
46 .eq(connection.owner_id as i32),
47 ),
48 )
49 .one(&*tx)
50 .await?
51 .ok_or_else(|| anyhow!("could not find participant"))?;
52 if participant.room_id != room_id {
53 return Err(anyhow!("shared project on unexpected room"))?;
54 }
55 if !participant
56 .role
57 .unwrap_or(ChannelRole::Member)
58 .can_edit_projects()
59 {
60 return Err(anyhow!("guests cannot share projects"))?;
61 }
62
63 let project = project::ActiveModel {
64 room_id: ActiveValue::set(Some(participant.room_id)),
65 host_user_id: ActiveValue::set(Some(participant.user_id)),
66 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
67 host_connection_server_id: ActiveValue::set(Some(ServerId(
68 connection.owner_id as i32,
69 ))),
70 id: ActiveValue::NotSet,
71 hosted_project_id: ActiveValue::Set(None),
72 }
73 .insert(&*tx)
74 .await?;
75
76 if !worktrees.is_empty() {
77 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
78 worktree::ActiveModel {
79 id: ActiveValue::set(worktree.id as i64),
80 project_id: ActiveValue::set(project.id),
81 abs_path: ActiveValue::set(worktree.abs_path.clone()),
82 root_name: ActiveValue::set(worktree.root_name.clone()),
83 visible: ActiveValue::set(worktree.visible),
84 scan_id: ActiveValue::set(0),
85 completed_scan_id: ActiveValue::set(0),
86 }
87 }))
88 .exec(&*tx)
89 .await?;
90 }
91
92 let replica_id = if is_ssh_project { 1 } else { 0 };
93
94 project_collaborator::ActiveModel {
95 project_id: ActiveValue::set(project.id),
96 connection_id: ActiveValue::set(connection.id as i32),
97 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
98 user_id: ActiveValue::set(participant.user_id),
99 replica_id: ActiveValue::set(ReplicaId(replica_id)),
100 is_host: ActiveValue::set(true),
101 ..Default::default()
102 }
103 .insert(&*tx)
104 .await?;
105
106 let room = self.get_room(room_id, &tx).await?;
107 Ok((project.id, room))
108 })
109 .await
110 }
111
112 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
113 self.weak_transaction(|tx| async move {
114 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
115 Ok(())
116 })
117 .await
118 }
119
120 /// Unshares the given project.
121 pub async fn unshare_project(
122 &self,
123 project_id: ProjectId,
124 connection: ConnectionId,
125 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
126 self.project_transaction(project_id, |tx| async move {
127 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
128 let project = project::Entity::find_by_id(project_id)
129 .one(&*tx)
130 .await?
131 .ok_or_else(|| anyhow!("project not found"))?;
132 let room = if let Some(room_id) = project.room_id {
133 Some(self.get_room(room_id, &tx).await?)
134 } else {
135 None
136 };
137 if project.host_connection()? == connection {
138 return Ok((true, room, guest_connection_ids));
139 }
140 Err(anyhow!("cannot unshare a project hosted by another user"))?
141 })
142 .await
143 }
144
145 /// Updates the worktrees associated with the given project.
146 pub async fn update_project(
147 &self,
148 project_id: ProjectId,
149 connection: ConnectionId,
150 worktrees: &[proto::WorktreeMetadata],
151 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
152 self.project_transaction(project_id, |tx| async move {
153 let project = project::Entity::find_by_id(project_id)
154 .filter(
155 Condition::all()
156 .add(project::Column::HostConnectionId.eq(connection.id as i32))
157 .add(
158 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
159 ),
160 )
161 .one(&*tx)
162 .await?
163 .ok_or_else(|| anyhow!("no such project"))?;
164
165 self.update_project_worktrees(project.id, worktrees, &tx)
166 .await?;
167
168 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
169
170 let room = if let Some(room_id) = project.room_id {
171 Some(self.get_room(room_id, &tx).await?)
172 } else {
173 None
174 };
175
176 Ok((room, guest_connection_ids))
177 })
178 .await
179 }
180
181 pub(in crate::db) async fn update_project_worktrees(
182 &self,
183 project_id: ProjectId,
184 worktrees: &[proto::WorktreeMetadata],
185 tx: &DatabaseTransaction,
186 ) -> Result<()> {
187 if !worktrees.is_empty() {
188 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
189 id: ActiveValue::set(worktree.id as i64),
190 project_id: ActiveValue::set(project_id),
191 abs_path: ActiveValue::set(worktree.abs_path.clone()),
192 root_name: ActiveValue::set(worktree.root_name.clone()),
193 visible: ActiveValue::set(worktree.visible),
194 scan_id: ActiveValue::set(0),
195 completed_scan_id: ActiveValue::set(0),
196 }))
197 .on_conflict(
198 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
199 .update_column(worktree::Column::RootName)
200 .to_owned(),
201 )
202 .exec(tx)
203 .await?;
204 }
205
206 worktree::Entity::delete_many()
207 .filter(worktree::Column::ProjectId.eq(project_id).and(
208 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
209 ))
210 .exec(tx)
211 .await?;
212
213 Ok(())
214 }
215
216 pub async fn update_worktree(
217 &self,
218 update: &proto::UpdateWorktree,
219 connection: ConnectionId,
220 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
221 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
222 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
223 {
224 return Err(anyhow!(
225 "invalid worktree update. removed entries: {}, updated entries: {}",
226 update.removed_entries.len(),
227 update.updated_entries.len()
228 ))?;
229 }
230
231 let project_id = ProjectId::from_proto(update.project_id);
232 let worktree_id = update.worktree_id as i64;
233 self.project_transaction(project_id, |tx| async move {
234 // Ensure the update comes from the host.
235 let _project = project::Entity::find_by_id(project_id)
236 .filter(
237 Condition::all()
238 .add(project::Column::HostConnectionId.eq(connection.id as i32))
239 .add(
240 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
241 ),
242 )
243 .one(&*tx)
244 .await?
245 .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
246
247 // Update metadata.
248 worktree::Entity::update(worktree::ActiveModel {
249 id: ActiveValue::set(worktree_id),
250 project_id: ActiveValue::set(project_id),
251 root_name: ActiveValue::set(update.root_name.clone()),
252 scan_id: ActiveValue::set(update.scan_id as i64),
253 completed_scan_id: if update.is_last_update {
254 ActiveValue::set(update.scan_id as i64)
255 } else {
256 ActiveValue::default()
257 },
258 abs_path: ActiveValue::set(update.abs_path.clone()),
259 ..Default::default()
260 })
261 .exec(&*tx)
262 .await?;
263
264 if !update.updated_entries.is_empty() {
265 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
266 let mtime = entry.mtime.clone().unwrap_or_default();
267 worktree_entry::ActiveModel {
268 project_id: ActiveValue::set(project_id),
269 worktree_id: ActiveValue::set(worktree_id),
270 id: ActiveValue::set(entry.id as i64),
271 is_dir: ActiveValue::set(entry.is_dir),
272 path: ActiveValue::set(entry.path.clone()),
273 inode: ActiveValue::set(entry.inode as i64),
274 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
275 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
276 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
277 is_ignored: ActiveValue::set(entry.is_ignored),
278 is_external: ActiveValue::set(entry.is_external),
279 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
280 is_deleted: ActiveValue::set(false),
281 scan_id: ActiveValue::set(update.scan_id as i64),
282 is_fifo: ActiveValue::set(entry.is_fifo),
283 }
284 }))
285 .on_conflict(
286 OnConflict::columns([
287 worktree_entry::Column::ProjectId,
288 worktree_entry::Column::WorktreeId,
289 worktree_entry::Column::Id,
290 ])
291 .update_columns([
292 worktree_entry::Column::IsDir,
293 worktree_entry::Column::Path,
294 worktree_entry::Column::Inode,
295 worktree_entry::Column::MtimeSeconds,
296 worktree_entry::Column::MtimeNanos,
297 worktree_entry::Column::CanonicalPath,
298 worktree_entry::Column::IsIgnored,
299 worktree_entry::Column::GitStatus,
300 worktree_entry::Column::ScanId,
301 ])
302 .to_owned(),
303 )
304 .exec(&*tx)
305 .await?;
306 }
307
308 if !update.removed_entries.is_empty() {
309 worktree_entry::Entity::update_many()
310 .filter(
311 worktree_entry::Column::ProjectId
312 .eq(project_id)
313 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
314 .and(
315 worktree_entry::Column::Id
316 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
317 ),
318 )
319 .set(worktree_entry::ActiveModel {
320 is_deleted: ActiveValue::Set(true),
321 scan_id: ActiveValue::Set(update.scan_id as i64),
322 ..Default::default()
323 })
324 .exec(&*tx)
325 .await?;
326 }
327
328 if !update.updated_repositories.is_empty() {
329 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
330 |repository| worktree_repository::ActiveModel {
331 project_id: ActiveValue::set(project_id),
332 worktree_id: ActiveValue::set(worktree_id),
333 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
334 scan_id: ActiveValue::set(update.scan_id as i64),
335 branch: ActiveValue::set(repository.branch.clone()),
336 is_deleted: ActiveValue::set(false),
337 },
338 ))
339 .on_conflict(
340 OnConflict::columns([
341 worktree_repository::Column::ProjectId,
342 worktree_repository::Column::WorktreeId,
343 worktree_repository::Column::WorkDirectoryId,
344 ])
345 .update_columns([
346 worktree_repository::Column::ScanId,
347 worktree_repository::Column::Branch,
348 ])
349 .to_owned(),
350 )
351 .exec(&*tx)
352 .await?;
353 }
354
355 if !update.removed_repositories.is_empty() {
356 worktree_repository::Entity::update_many()
357 .filter(
358 worktree_repository::Column::ProjectId
359 .eq(project_id)
360 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
361 .and(
362 worktree_repository::Column::WorkDirectoryId
363 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
364 ),
365 )
366 .set(worktree_repository::ActiveModel {
367 is_deleted: ActiveValue::Set(true),
368 scan_id: ActiveValue::Set(update.scan_id as i64),
369 ..Default::default()
370 })
371 .exec(&*tx)
372 .await?;
373 }
374
375 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
376 Ok(connection_ids)
377 })
378 .await
379 }
380
381 /// Updates the diagnostic summary for the given connection.
382 pub async fn update_diagnostic_summary(
383 &self,
384 update: &proto::UpdateDiagnosticSummary,
385 connection: ConnectionId,
386 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
387 let project_id = ProjectId::from_proto(update.project_id);
388 let worktree_id = update.worktree_id as i64;
389 self.project_transaction(project_id, |tx| async move {
390 let summary = update
391 .summary
392 .as_ref()
393 .ok_or_else(|| anyhow!("invalid summary"))?;
394
395 // Ensure the update comes from the host.
396 let project = project::Entity::find_by_id(project_id)
397 .one(&*tx)
398 .await?
399 .ok_or_else(|| anyhow!("no such project"))?;
400 if project.host_connection()? != connection {
401 return Err(anyhow!("can't update a project hosted by someone else"))?;
402 }
403
404 // Update summary.
405 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
406 project_id: ActiveValue::set(project_id),
407 worktree_id: ActiveValue::set(worktree_id),
408 path: ActiveValue::set(summary.path.clone()),
409 language_server_id: ActiveValue::set(summary.language_server_id as i64),
410 error_count: ActiveValue::set(summary.error_count as i32),
411 warning_count: ActiveValue::set(summary.warning_count as i32),
412 })
413 .on_conflict(
414 OnConflict::columns([
415 worktree_diagnostic_summary::Column::ProjectId,
416 worktree_diagnostic_summary::Column::WorktreeId,
417 worktree_diagnostic_summary::Column::Path,
418 ])
419 .update_columns([
420 worktree_diagnostic_summary::Column::LanguageServerId,
421 worktree_diagnostic_summary::Column::ErrorCount,
422 worktree_diagnostic_summary::Column::WarningCount,
423 ])
424 .to_owned(),
425 )
426 .exec(&*tx)
427 .await?;
428
429 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
430 Ok(connection_ids)
431 })
432 .await
433 }
434
435 /// Starts the language server for the given connection.
436 pub async fn start_language_server(
437 &self,
438 update: &proto::StartLanguageServer,
439 connection: ConnectionId,
440 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
441 let project_id = ProjectId::from_proto(update.project_id);
442 self.project_transaction(project_id, |tx| async move {
443 let server = update
444 .server
445 .as_ref()
446 .ok_or_else(|| anyhow!("invalid language server"))?;
447
448 // Ensure the update comes from the host.
449 let project = project::Entity::find_by_id(project_id)
450 .one(&*tx)
451 .await?
452 .ok_or_else(|| anyhow!("no such project"))?;
453 if project.host_connection()? != connection {
454 return Err(anyhow!("can't update a project hosted by someone else"))?;
455 }
456
457 // Add the newly-started language server.
458 language_server::Entity::insert(language_server::ActiveModel {
459 project_id: ActiveValue::set(project_id),
460 id: ActiveValue::set(server.id as i64),
461 name: ActiveValue::set(server.name.clone()),
462 })
463 .on_conflict(
464 OnConflict::columns([
465 language_server::Column::ProjectId,
466 language_server::Column::Id,
467 ])
468 .update_column(language_server::Column::Name)
469 .to_owned(),
470 )
471 .exec(&*tx)
472 .await?;
473
474 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
475 Ok(connection_ids)
476 })
477 .await
478 }
479
480 /// Updates the worktree settings for the given connection.
481 pub async fn update_worktree_settings(
482 &self,
483 update: &proto::UpdateWorktreeSettings,
484 connection: ConnectionId,
485 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
486 let project_id = ProjectId::from_proto(update.project_id);
487 let kind = match update.kind {
488 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
489 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
490 None => proto::LocalSettingsKind::Settings,
491 };
492 let kind = LocalSettingsKind::from_proto(kind);
493 self.project_transaction(project_id, |tx| async move {
494 // Ensure the update comes from the host.
495 let project = project::Entity::find_by_id(project_id)
496 .one(&*tx)
497 .await?
498 .ok_or_else(|| anyhow!("no such project"))?;
499 if project.host_connection()? != connection {
500 return Err(anyhow!("can't update a project hosted by someone else"))?;
501 }
502
503 if let Some(content) = &update.content {
504 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
505 project_id: ActiveValue::Set(project_id),
506 worktree_id: ActiveValue::Set(update.worktree_id as i64),
507 path: ActiveValue::Set(update.path.clone()),
508 content: ActiveValue::Set(content.clone()),
509 kind: ActiveValue::Set(kind),
510 })
511 .on_conflict(
512 OnConflict::columns([
513 worktree_settings_file::Column::ProjectId,
514 worktree_settings_file::Column::WorktreeId,
515 worktree_settings_file::Column::Path,
516 ])
517 .update_column(worktree_settings_file::Column::Content)
518 .to_owned(),
519 )
520 .exec(&*tx)
521 .await?;
522 } else {
523 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
524 project_id: ActiveValue::Set(project_id),
525 worktree_id: ActiveValue::Set(update.worktree_id as i64),
526 path: ActiveValue::Set(update.path.clone()),
527 ..Default::default()
528 })
529 .exec(&*tx)
530 .await?;
531 }
532
533 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
534 Ok(connection_ids)
535 })
536 .await
537 }
538
539 /// Adds the given connection to the specified hosted project
540 pub async fn join_hosted_project(
541 &self,
542 id: ProjectId,
543 user_id: UserId,
544 connection: ConnectionId,
545 ) -> Result<(Project, ReplicaId)> {
546 self.transaction(|tx| async move {
547 let (project, hosted_project) = project::Entity::find_by_id(id)
548 .find_also_related(hosted_project::Entity)
549 .one(&*tx)
550 .await?
551 .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
552
553 let Some(hosted_project) = hosted_project else {
554 return Err(anyhow!("project is not hosted"))?;
555 };
556
557 let channel = channel::Entity::find_by_id(hosted_project.channel_id)
558 .one(&*tx)
559 .await?
560 .ok_or_else(|| anyhow!("no such channel"))?;
561
562 let role = self
563 .check_user_is_channel_participant(&channel, user_id, &tx)
564 .await?;
565
566 self.join_project_internal(project, user_id, connection, role, &tx)
567 .await
568 })
569 .await
570 }
571
572 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
573 self.transaction(|tx| async move {
574 Ok(project::Entity::find_by_id(id)
575 .one(&*tx)
576 .await?
577 .ok_or_else(|| anyhow!("no such project"))?)
578 })
579 .await
580 }
581
582 /// Adds the given connection to the specified project
583 /// in the current room.
584 pub async fn join_project(
585 &self,
586 project_id: ProjectId,
587 connection: ConnectionId,
588 user_id: UserId,
589 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
590 self.project_transaction(project_id, |tx| async move {
591 let (project, role) = self
592 .access_project(project_id, connection, Capability::ReadOnly, &tx)
593 .await?;
594 self.join_project_internal(project, user_id, connection, role, &tx)
595 .await
596 })
597 .await
598 }
599
600 async fn join_project_internal(
601 &self,
602 project: project::Model,
603 user_id: UserId,
604 connection: ConnectionId,
605 role: ChannelRole,
606 tx: &DatabaseTransaction,
607 ) -> Result<(Project, ReplicaId)> {
608 let mut collaborators = project
609 .find_related(project_collaborator::Entity)
610 .all(tx)
611 .await?;
612 let replica_ids = collaborators
613 .iter()
614 .map(|c| c.replica_id)
615 .collect::<HashSet<_>>();
616 let mut replica_id = ReplicaId(1);
617 while replica_ids.contains(&replica_id) {
618 replica_id.0 += 1;
619 }
620 let new_collaborator = project_collaborator::ActiveModel {
621 project_id: ActiveValue::set(project.id),
622 connection_id: ActiveValue::set(connection.id as i32),
623 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
624 user_id: ActiveValue::set(user_id),
625 replica_id: ActiveValue::set(replica_id),
626 is_host: ActiveValue::set(false),
627 ..Default::default()
628 }
629 .insert(tx)
630 .await?;
631 collaborators.push(new_collaborator);
632
633 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
634 let mut worktrees = db_worktrees
635 .into_iter()
636 .map(|db_worktree| {
637 (
638 db_worktree.id as u64,
639 Worktree {
640 id: db_worktree.id as u64,
641 abs_path: db_worktree.abs_path,
642 root_name: db_worktree.root_name,
643 visible: db_worktree.visible,
644 entries: Default::default(),
645 repository_entries: Default::default(),
646 diagnostic_summaries: Default::default(),
647 settings_files: Default::default(),
648 scan_id: db_worktree.scan_id as u64,
649 completed_scan_id: db_worktree.completed_scan_id as u64,
650 },
651 )
652 })
653 .collect::<BTreeMap<_, _>>();
654
655 // Populate worktree entries.
656 {
657 let mut db_entries = worktree_entry::Entity::find()
658 .filter(
659 Condition::all()
660 .add(worktree_entry::Column::ProjectId.eq(project.id))
661 .add(worktree_entry::Column::IsDeleted.eq(false)),
662 )
663 .stream(tx)
664 .await?;
665 while let Some(db_entry) = db_entries.next().await {
666 let db_entry = db_entry?;
667 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
668 worktree.entries.push(proto::Entry {
669 id: db_entry.id as u64,
670 is_dir: db_entry.is_dir,
671 path: db_entry.path,
672 inode: db_entry.inode as u64,
673 mtime: Some(proto::Timestamp {
674 seconds: db_entry.mtime_seconds as u64,
675 nanos: db_entry.mtime_nanos as u32,
676 }),
677 canonical_path: db_entry.canonical_path,
678 is_ignored: db_entry.is_ignored,
679 is_external: db_entry.is_external,
680 git_status: db_entry.git_status.map(|status| status as i32),
681 // This is only used in the summarization backlog, so if it's None,
682 // that just means we won't be able to detect when to resummarize
683 // based on total number of backlogged bytes - instead, we'd go
684 // on number of files only. That shouldn't be a huge deal in practice.
685 size: None,
686 is_fifo: db_entry.is_fifo,
687 });
688 }
689 }
690 }
691
692 // Populate repository entries.
693 {
694 let mut db_repository_entries = worktree_repository::Entity::find()
695 .filter(
696 Condition::all()
697 .add(worktree_repository::Column::ProjectId.eq(project.id))
698 .add(worktree_repository::Column::IsDeleted.eq(false)),
699 )
700 .stream(tx)
701 .await?;
702 while let Some(db_repository_entry) = db_repository_entries.next().await {
703 let db_repository_entry = db_repository_entry?;
704 if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
705 {
706 worktree.repository_entries.insert(
707 db_repository_entry.work_directory_id as u64,
708 proto::RepositoryEntry {
709 work_directory_id: db_repository_entry.work_directory_id as u64,
710 branch: db_repository_entry.branch,
711 },
712 );
713 }
714 }
715 }
716
717 // Populate worktree diagnostic summaries.
718 {
719 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
720 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
721 .stream(tx)
722 .await?;
723 while let Some(db_summary) = db_summaries.next().await {
724 let db_summary = db_summary?;
725 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
726 worktree
727 .diagnostic_summaries
728 .push(proto::DiagnosticSummary {
729 path: db_summary.path,
730 language_server_id: db_summary.language_server_id as u64,
731 error_count: db_summary.error_count as u32,
732 warning_count: db_summary.warning_count as u32,
733 });
734 }
735 }
736 }
737
738 // Populate worktree settings files
739 {
740 let mut db_settings_files = worktree_settings_file::Entity::find()
741 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
742 .stream(tx)
743 .await?;
744 while let Some(db_settings_file) = db_settings_files.next().await {
745 let db_settings_file = db_settings_file?;
746 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
747 worktree.settings_files.push(WorktreeSettingsFile {
748 path: db_settings_file.path,
749 content: db_settings_file.content,
750 kind: db_settings_file.kind,
751 });
752 }
753 }
754 }
755
756 // Populate language servers.
757 let language_servers = project
758 .find_related(language_server::Entity)
759 .all(tx)
760 .await?;
761
762 let project = Project {
763 id: project.id,
764 role,
765 collaborators: collaborators
766 .into_iter()
767 .map(|collaborator| ProjectCollaborator {
768 connection_id: collaborator.connection(),
769 user_id: collaborator.user_id,
770 replica_id: collaborator.replica_id,
771 is_host: collaborator.is_host,
772 })
773 .collect(),
774 worktrees,
775 language_servers: language_servers
776 .into_iter()
777 .map(|language_server| proto::LanguageServer {
778 id: language_server.id as u64,
779 name: language_server.name,
780 worktree_id: None,
781 })
782 .collect(),
783 };
784 Ok((project, replica_id as ReplicaId))
785 }
786
787 pub async fn leave_hosted_project(
788 &self,
789 project_id: ProjectId,
790 connection: ConnectionId,
791 ) -> Result<LeftProject> {
792 self.transaction(|tx| async move {
793 let result = project_collaborator::Entity::delete_many()
794 .filter(
795 Condition::all()
796 .add(project_collaborator::Column::ProjectId.eq(project_id))
797 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
798 .add(
799 project_collaborator::Column::ConnectionServerId
800 .eq(connection.owner_id as i32),
801 ),
802 )
803 .exec(&*tx)
804 .await?;
805 if result.rows_affected == 0 {
806 return Err(anyhow!("not in the project"))?;
807 }
808
809 let project = project::Entity::find_by_id(project_id)
810 .one(&*tx)
811 .await?
812 .ok_or_else(|| anyhow!("no such project"))?;
813 let collaborators = project
814 .find_related(project_collaborator::Entity)
815 .all(&*tx)
816 .await?;
817 let connection_ids = collaborators
818 .into_iter()
819 .map(|collaborator| collaborator.connection())
820 .collect();
821 Ok(LeftProject {
822 id: project.id,
823 connection_ids,
824 should_unshare: false,
825 })
826 })
827 .await
828 }
829
830 /// Removes the given connection from the specified project.
831 pub async fn leave_project(
832 &self,
833 project_id: ProjectId,
834 connection: ConnectionId,
835 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
836 self.project_transaction(project_id, |tx| async move {
837 let result = project_collaborator::Entity::delete_many()
838 .filter(
839 Condition::all()
840 .add(project_collaborator::Column::ProjectId.eq(project_id))
841 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
842 .add(
843 project_collaborator::Column::ConnectionServerId
844 .eq(connection.owner_id as i32),
845 ),
846 )
847 .exec(&*tx)
848 .await?;
849 if result.rows_affected == 0 {
850 Err(anyhow!("not a collaborator on this project"))?;
851 }
852
853 let project = project::Entity::find_by_id(project_id)
854 .one(&*tx)
855 .await?
856 .ok_or_else(|| anyhow!("no such project"))?;
857 let collaborators = project
858 .find_related(project_collaborator::Entity)
859 .all(&*tx)
860 .await?;
861 let connection_ids: Vec<ConnectionId> = collaborators
862 .into_iter()
863 .map(|collaborator| collaborator.connection())
864 .collect();
865
866 follower::Entity::delete_many()
867 .filter(
868 Condition::any()
869 .add(
870 Condition::all()
871 .add(follower::Column::ProjectId.eq(Some(project_id)))
872 .add(
873 follower::Column::LeaderConnectionServerId
874 .eq(connection.owner_id),
875 )
876 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
877 )
878 .add(
879 Condition::all()
880 .add(follower::Column::ProjectId.eq(Some(project_id)))
881 .add(
882 follower::Column::FollowerConnectionServerId
883 .eq(connection.owner_id),
884 )
885 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
886 ),
887 )
888 .exec(&*tx)
889 .await?;
890
891 let room = if let Some(room_id) = project.room_id {
892 Some(self.get_room(room_id, &tx).await?)
893 } else {
894 None
895 };
896
897 let left_project = LeftProject {
898 id: project_id,
899 should_unshare: connection == project.host_connection()?,
900 connection_ids,
901 };
902 Ok((room, left_project))
903 })
904 .await
905 }
906
907 pub async fn check_user_is_project_host(
908 &self,
909 project_id: ProjectId,
910 connection_id: ConnectionId,
911 ) -> Result<()> {
912 self.project_transaction(project_id, |tx| async move {
913 project::Entity::find()
914 .filter(
915 Condition::all()
916 .add(project::Column::Id.eq(project_id))
917 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
918 .add(
919 project::Column::HostConnectionServerId
920 .eq(Some(connection_id.owner_id as i32)),
921 ),
922 )
923 .one(&*tx)
924 .await?
925 .ok_or_else(|| anyhow!("failed to read project host"))?;
926
927 Ok(())
928 })
929 .await
930 .map(|guard| guard.into_inner())
931 }
932
933 /// Returns the current project if the given user is authorized to access it with the specified capability.
934 pub async fn access_project(
935 &self,
936 project_id: ProjectId,
937 connection_id: ConnectionId,
938 capability: Capability,
939 tx: &DatabaseTransaction,
940 ) -> Result<(project::Model, ChannelRole)> {
941 let project = project::Entity::find_by_id(project_id)
942 .one(tx)
943 .await?
944 .ok_or_else(|| anyhow!("no such project"))?;
945
946 let role_from_room = if let Some(room_id) = project.room_id {
947 room_participant::Entity::find()
948 .filter(room_participant::Column::RoomId.eq(room_id))
949 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
950 .one(tx)
951 .await?
952 .and_then(|participant| participant.role)
953 } else {
954 None
955 };
956
957 let role = role_from_room.unwrap_or(ChannelRole::Banned);
958
959 match capability {
960 Capability::ReadWrite => {
961 if !role.can_edit_projects() {
962 return Err(anyhow!("not authorized to edit projects"))?;
963 }
964 }
965 Capability::ReadOnly => {
966 if !role.can_read_projects() {
967 return Err(anyhow!("not authorized to read projects"))?;
968 }
969 }
970 }
971
972 Ok((project, role))
973 }
974
975 /// Returns the host connection for a read-only request to join a shared project.
976 pub async fn host_for_read_only_project_request(
977 &self,
978 project_id: ProjectId,
979 connection_id: ConnectionId,
980 ) -> Result<ConnectionId> {
981 self.project_transaction(project_id, |tx| async move {
982 let (project, _) = self
983 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
984 .await?;
985 project.host_connection()
986 })
987 .await
988 .map(|guard| guard.into_inner())
989 }
990
991 /// Returns the host connection for a request to join a shared project.
992 pub async fn host_for_mutating_project_request(
993 &self,
994 project_id: ProjectId,
995 connection_id: ConnectionId,
996 ) -> Result<ConnectionId> {
997 self.project_transaction(project_id, |tx| async move {
998 let (project, _) = self
999 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
1000 .await?;
1001 project.host_connection()
1002 })
1003 .await
1004 .map(|guard| guard.into_inner())
1005 }
1006
1007 pub async fn connections_for_buffer_update(
1008 &self,
1009 project_id: ProjectId,
1010 connection_id: ConnectionId,
1011 capability: Capability,
1012 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1013 self.project_transaction(project_id, |tx| async move {
1014 // Authorize
1015 let (project, _) = self
1016 .access_project(project_id, connection_id, capability, &tx)
1017 .await?;
1018
1019 let host_connection_id = project.host_connection()?;
1020
1021 let collaborators = project_collaborator::Entity::find()
1022 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1023 .all(&*tx)
1024 .await?;
1025
1026 let guest_connection_ids = collaborators
1027 .into_iter()
1028 .filter_map(|collaborator| {
1029 if collaborator.is_host {
1030 None
1031 } else {
1032 Some(collaborator.connection())
1033 }
1034 })
1035 .collect();
1036
1037 Ok((host_connection_id, guest_connection_ids))
1038 })
1039 .await
1040 }
1041
1042 /// Returns the connection IDs in the given project.
1043 ///
1044 /// The provided `connection_id` must also be a collaborator in the project,
1045 /// otherwise an error will be returned.
1046 pub async fn project_connection_ids(
1047 &self,
1048 project_id: ProjectId,
1049 connection_id: ConnectionId,
1050 exclude_dev_server: bool,
1051 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1052 self.project_transaction(project_id, |tx| async move {
1053 let project = project::Entity::find_by_id(project_id)
1054 .one(&*tx)
1055 .await?
1056 .ok_or_else(|| anyhow!("no such project"))?;
1057
1058 let mut collaborators = project_collaborator::Entity::find()
1059 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1060 .stream(&*tx)
1061 .await?;
1062
1063 let mut connection_ids = HashSet::default();
1064 if let Some(host_connection) = project.host_connection().log_err() {
1065 if !exclude_dev_server {
1066 connection_ids.insert(host_connection);
1067 }
1068 }
1069
1070 while let Some(collaborator) = collaborators.next().await {
1071 let collaborator = collaborator?;
1072 connection_ids.insert(collaborator.connection());
1073 }
1074
1075 if connection_ids.contains(&connection_id)
1076 || Some(connection_id) == project.host_connection().ok()
1077 {
1078 Ok(connection_ids)
1079 } else {
1080 Err(anyhow!(
1081 "can only send project updates to a project you're in"
1082 ))?
1083 }
1084 })
1085 .await
1086 }
1087
1088 async fn project_guest_connection_ids(
1089 &self,
1090 project_id: ProjectId,
1091 tx: &DatabaseTransaction,
1092 ) -> Result<Vec<ConnectionId>> {
1093 let mut collaborators = project_collaborator::Entity::find()
1094 .filter(
1095 project_collaborator::Column::ProjectId
1096 .eq(project_id)
1097 .and(project_collaborator::Column::IsHost.eq(false)),
1098 )
1099 .stream(tx)
1100 .await?;
1101
1102 let mut guest_connection_ids = Vec::new();
1103 while let Some(collaborator) = collaborators.next().await {
1104 let collaborator = collaborator?;
1105 guest_connection_ids.push(collaborator.connection());
1106 }
1107 Ok(guest_connection_ids)
1108 }
1109
1110 /// Returns the [`RoomId`] for the given project.
1111 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1112 self.transaction(|tx| async move {
1113 Ok(project::Entity::find_by_id(project_id)
1114 .one(&*tx)
1115 .await?
1116 .and_then(|project| project.room_id))
1117 })
1118 .await
1119 }
1120
1121 pub async fn check_room_participants(
1122 &self,
1123 room_id: RoomId,
1124 leader_id: ConnectionId,
1125 follower_id: ConnectionId,
1126 ) -> Result<()> {
1127 self.transaction(|tx| async move {
1128 use room_participant::Column;
1129
1130 let count = room_participant::Entity::find()
1131 .filter(
1132 Condition::all().add(Column::RoomId.eq(room_id)).add(
1133 Condition::any()
1134 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1135 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1136 ))
1137 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1138 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1139 )),
1140 ),
1141 )
1142 .count(&*tx)
1143 .await?;
1144
1145 if count < 2 {
1146 Err(anyhow!("not room participants"))?;
1147 }
1148
1149 Ok(())
1150 })
1151 .await
1152 }
1153
1154 /// Adds the given follower connection as a follower of the given leader connection.
1155 pub async fn follow(
1156 &self,
1157 room_id: RoomId,
1158 project_id: ProjectId,
1159 leader_connection: ConnectionId,
1160 follower_connection: ConnectionId,
1161 ) -> Result<TransactionGuard<proto::Room>> {
1162 self.room_transaction(room_id, |tx| async move {
1163 follower::ActiveModel {
1164 room_id: ActiveValue::set(room_id),
1165 project_id: ActiveValue::set(project_id),
1166 leader_connection_server_id: ActiveValue::set(ServerId(
1167 leader_connection.owner_id as i32,
1168 )),
1169 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1170 follower_connection_server_id: ActiveValue::set(ServerId(
1171 follower_connection.owner_id as i32,
1172 )),
1173 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1174 ..Default::default()
1175 }
1176 .insert(&*tx)
1177 .await?;
1178
1179 let room = self.get_room(room_id, &tx).await?;
1180 Ok(room)
1181 })
1182 .await
1183 }
1184
1185 /// Removes the given follower connection as a follower of the given leader connection.
1186 pub async fn unfollow(
1187 &self,
1188 room_id: RoomId,
1189 project_id: ProjectId,
1190 leader_connection: ConnectionId,
1191 follower_connection: ConnectionId,
1192 ) -> Result<TransactionGuard<proto::Room>> {
1193 self.room_transaction(room_id, |tx| async move {
1194 follower::Entity::delete_many()
1195 .filter(
1196 Condition::all()
1197 .add(follower::Column::RoomId.eq(room_id))
1198 .add(follower::Column::ProjectId.eq(project_id))
1199 .add(
1200 follower::Column::LeaderConnectionServerId
1201 .eq(leader_connection.owner_id),
1202 )
1203 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1204 .add(
1205 follower::Column::FollowerConnectionServerId
1206 .eq(follower_connection.owner_id),
1207 )
1208 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1209 )
1210 .exec(&*tx)
1211 .await?;
1212
1213 let room = self.get_room(room_id, &tx).await?;
1214 Ok(room)
1215 })
1216 .await
1217 }
1218}