1use anyhow::Context as _;
2use util::ResultExt;
3
4use super::*;
5
6impl Database {
7 /// Returns the count of all projects, excluding ones marked as admin.
8 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
9 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
10 enum QueryAs {
11 Count,
12 }
13
14 self.transaction(|tx| async move {
15 Ok(project::Entity::find()
16 .select_only()
17 .column_as(project::Column::Id.count(), QueryAs::Count)
18 .inner_join(user::Entity)
19 .filter(user::Column::Admin.eq(false))
20 .into_values::<_, QueryAs>()
21 .one(&*tx)
22 .await?
23 .unwrap_or(0i64) as usize)
24 })
25 .await
26 }
27
28 /// Shares a project with the given room.
29 pub async fn share_project(
30 &self,
31 room_id: RoomId,
32 connection: ConnectionId,
33 worktrees: &[proto::WorktreeMetadata],
34 is_ssh_project: bool,
35 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
36 self.room_transaction(room_id, |tx| async move {
37 let participant = room_participant::Entity::find()
38 .filter(
39 Condition::all()
40 .add(
41 room_participant::Column::AnsweringConnectionId
42 .eq(connection.id as i32),
43 )
44 .add(
45 room_participant::Column::AnsweringConnectionServerId
46 .eq(connection.owner_id as i32),
47 ),
48 )
49 .one(&*tx)
50 .await?
51 .ok_or_else(|| anyhow!("could not find participant"))?;
52 if participant.room_id != room_id {
53 return Err(anyhow!("shared project on unexpected room"))?;
54 }
55 if !participant
56 .role
57 .unwrap_or(ChannelRole::Member)
58 .can_edit_projects()
59 {
60 return Err(anyhow!("guests cannot share projects"))?;
61 }
62
63 let project = project::ActiveModel {
64 room_id: ActiveValue::set(Some(participant.room_id)),
65 host_user_id: ActiveValue::set(Some(participant.user_id)),
66 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
67 host_connection_server_id: ActiveValue::set(Some(ServerId(
68 connection.owner_id as i32,
69 ))),
70 id: ActiveValue::NotSet,
71 }
72 .insert(&*tx)
73 .await?;
74
75 if !worktrees.is_empty() {
76 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
77 worktree::ActiveModel {
78 id: ActiveValue::set(worktree.id as i64),
79 project_id: ActiveValue::set(project.id),
80 abs_path: ActiveValue::set(worktree.abs_path.clone()),
81 root_name: ActiveValue::set(worktree.root_name.clone()),
82 visible: ActiveValue::set(worktree.visible),
83 scan_id: ActiveValue::set(0),
84 completed_scan_id: ActiveValue::set(0),
85 }
86 }))
87 .exec(&*tx)
88 .await?;
89 }
90
91 let replica_id = if is_ssh_project { 1 } else { 0 };
92
93 project_collaborator::ActiveModel {
94 project_id: ActiveValue::set(project.id),
95 connection_id: ActiveValue::set(connection.id as i32),
96 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
97 user_id: ActiveValue::set(participant.user_id),
98 replica_id: ActiveValue::set(ReplicaId(replica_id)),
99 is_host: ActiveValue::set(true),
100 ..Default::default()
101 }
102 .insert(&*tx)
103 .await?;
104
105 let room = self.get_room(room_id, &tx).await?;
106 Ok((project.id, room))
107 })
108 .await
109 }
110
111 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
112 self.weak_transaction(|tx| async move {
113 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
114 Ok(())
115 })
116 .await
117 }
118
119 /// Unshares the given project.
120 pub async fn unshare_project(
121 &self,
122 project_id: ProjectId,
123 connection: ConnectionId,
124 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
125 self.project_transaction(project_id, |tx| async move {
126 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
127 let project = project::Entity::find_by_id(project_id)
128 .one(&*tx)
129 .await?
130 .ok_or_else(|| anyhow!("project not found"))?;
131 let room = if let Some(room_id) = project.room_id {
132 Some(self.get_room(room_id, &tx).await?)
133 } else {
134 None
135 };
136 if project.host_connection()? == connection {
137 return Ok((true, room, guest_connection_ids));
138 }
139 Err(anyhow!("cannot unshare a project hosted by another user"))?
140 })
141 .await
142 }
143
144 /// Updates the worktrees associated with the given project.
145 pub async fn update_project(
146 &self,
147 project_id: ProjectId,
148 connection: ConnectionId,
149 worktrees: &[proto::WorktreeMetadata],
150 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
151 self.project_transaction(project_id, |tx| async move {
152 let project = project::Entity::find_by_id(project_id)
153 .filter(
154 Condition::all()
155 .add(project::Column::HostConnectionId.eq(connection.id as i32))
156 .add(
157 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
158 ),
159 )
160 .one(&*tx)
161 .await?
162 .ok_or_else(|| anyhow!("no such project"))?;
163
164 self.update_project_worktrees(project.id, worktrees, &tx)
165 .await?;
166
167 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
168
169 let room = if let Some(room_id) = project.room_id {
170 Some(self.get_room(room_id, &tx).await?)
171 } else {
172 None
173 };
174
175 Ok((room, guest_connection_ids))
176 })
177 .await
178 }
179
180 pub(in crate::db) async fn update_project_worktrees(
181 &self,
182 project_id: ProjectId,
183 worktrees: &[proto::WorktreeMetadata],
184 tx: &DatabaseTransaction,
185 ) -> Result<()> {
186 if !worktrees.is_empty() {
187 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
188 id: ActiveValue::set(worktree.id as i64),
189 project_id: ActiveValue::set(project_id),
190 abs_path: ActiveValue::set(worktree.abs_path.clone()),
191 root_name: ActiveValue::set(worktree.root_name.clone()),
192 visible: ActiveValue::set(worktree.visible),
193 scan_id: ActiveValue::set(0),
194 completed_scan_id: ActiveValue::set(0),
195 }))
196 .on_conflict(
197 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
198 .update_column(worktree::Column::RootName)
199 .to_owned(),
200 )
201 .exec(tx)
202 .await?;
203 }
204
205 worktree::Entity::delete_many()
206 .filter(worktree::Column::ProjectId.eq(project_id).and(
207 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
208 ))
209 .exec(tx)
210 .await?;
211
212 Ok(())
213 }
214
215 pub async fn update_worktree(
216 &self,
217 update: &proto::UpdateWorktree,
218 connection: ConnectionId,
219 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
220 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
221 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
222 {
223 return Err(anyhow!(
224 "invalid worktree update. removed entries: {}, updated entries: {}",
225 update.removed_entries.len(),
226 update.updated_entries.len()
227 ))?;
228 }
229
230 let project_id = ProjectId::from_proto(update.project_id);
231 let worktree_id = update.worktree_id as i64;
232 self.project_transaction(project_id, |tx| async move {
233 // Ensure the update comes from the host.
234 let _project = project::Entity::find_by_id(project_id)
235 .filter(
236 Condition::all()
237 .add(project::Column::HostConnectionId.eq(connection.id as i32))
238 .add(
239 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
240 ),
241 )
242 .one(&*tx)
243 .await?
244 .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
245
246 // Update metadata.
247 worktree::Entity::update(worktree::ActiveModel {
248 id: ActiveValue::set(worktree_id),
249 project_id: ActiveValue::set(project_id),
250 root_name: ActiveValue::set(update.root_name.clone()),
251 scan_id: ActiveValue::set(update.scan_id as i64),
252 completed_scan_id: if update.is_last_update {
253 ActiveValue::set(update.scan_id as i64)
254 } else {
255 ActiveValue::default()
256 },
257 abs_path: ActiveValue::set(update.abs_path.clone()),
258 ..Default::default()
259 })
260 .exec(&*tx)
261 .await?;
262
263 if !update.updated_entries.is_empty() {
264 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
265 let mtime = entry.mtime.clone().unwrap_or_default();
266 worktree_entry::ActiveModel {
267 project_id: ActiveValue::set(project_id),
268 worktree_id: ActiveValue::set(worktree_id),
269 id: ActiveValue::set(entry.id as i64),
270 is_dir: ActiveValue::set(entry.is_dir),
271 path: ActiveValue::set(entry.path.clone()),
272 inode: ActiveValue::set(entry.inode as i64),
273 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
274 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
275 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
276 is_ignored: ActiveValue::set(entry.is_ignored),
277 is_external: ActiveValue::set(entry.is_external),
278 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
279 is_deleted: ActiveValue::set(false),
280 scan_id: ActiveValue::set(update.scan_id as i64),
281 is_fifo: ActiveValue::set(entry.is_fifo),
282 }
283 }))
284 .on_conflict(
285 OnConflict::columns([
286 worktree_entry::Column::ProjectId,
287 worktree_entry::Column::WorktreeId,
288 worktree_entry::Column::Id,
289 ])
290 .update_columns([
291 worktree_entry::Column::IsDir,
292 worktree_entry::Column::Path,
293 worktree_entry::Column::Inode,
294 worktree_entry::Column::MtimeSeconds,
295 worktree_entry::Column::MtimeNanos,
296 worktree_entry::Column::CanonicalPath,
297 worktree_entry::Column::IsIgnored,
298 worktree_entry::Column::GitStatus,
299 worktree_entry::Column::ScanId,
300 ])
301 .to_owned(),
302 )
303 .exec(&*tx)
304 .await?;
305 }
306
307 if !update.removed_entries.is_empty() {
308 worktree_entry::Entity::update_many()
309 .filter(
310 worktree_entry::Column::ProjectId
311 .eq(project_id)
312 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
313 .and(
314 worktree_entry::Column::Id
315 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
316 ),
317 )
318 .set(worktree_entry::ActiveModel {
319 is_deleted: ActiveValue::Set(true),
320 scan_id: ActiveValue::Set(update.scan_id as i64),
321 ..Default::default()
322 })
323 .exec(&*tx)
324 .await?;
325 }
326
327 if !update.updated_repositories.is_empty() {
328 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
329 |repository| worktree_repository::ActiveModel {
330 project_id: ActiveValue::set(project_id),
331 worktree_id: ActiveValue::set(worktree_id),
332 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
333 scan_id: ActiveValue::set(update.scan_id as i64),
334 branch: ActiveValue::set(repository.branch.clone()),
335 is_deleted: ActiveValue::set(false),
336 },
337 ))
338 .on_conflict(
339 OnConflict::columns([
340 worktree_repository::Column::ProjectId,
341 worktree_repository::Column::WorktreeId,
342 worktree_repository::Column::WorkDirectoryId,
343 ])
344 .update_columns([
345 worktree_repository::Column::ScanId,
346 worktree_repository::Column::Branch,
347 ])
348 .to_owned(),
349 )
350 .exec(&*tx)
351 .await?;
352 }
353
354 if !update.removed_repositories.is_empty() {
355 worktree_repository::Entity::update_many()
356 .filter(
357 worktree_repository::Column::ProjectId
358 .eq(project_id)
359 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
360 .and(
361 worktree_repository::Column::WorkDirectoryId
362 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
363 ),
364 )
365 .set(worktree_repository::ActiveModel {
366 is_deleted: ActiveValue::Set(true),
367 scan_id: ActiveValue::Set(update.scan_id as i64),
368 ..Default::default()
369 })
370 .exec(&*tx)
371 .await?;
372 }
373
374 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
375 Ok(connection_ids)
376 })
377 .await
378 }
379
380 /// Updates the diagnostic summary for the given connection.
381 pub async fn update_diagnostic_summary(
382 &self,
383 update: &proto::UpdateDiagnosticSummary,
384 connection: ConnectionId,
385 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
386 let project_id = ProjectId::from_proto(update.project_id);
387 let worktree_id = update.worktree_id as i64;
388 self.project_transaction(project_id, |tx| async move {
389 let summary = update
390 .summary
391 .as_ref()
392 .ok_or_else(|| anyhow!("invalid summary"))?;
393
394 // Ensure the update comes from the host.
395 let project = project::Entity::find_by_id(project_id)
396 .one(&*tx)
397 .await?
398 .ok_or_else(|| anyhow!("no such project"))?;
399 if project.host_connection()? != connection {
400 return Err(anyhow!("can't update a project hosted by someone else"))?;
401 }
402
403 // Update summary.
404 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
405 project_id: ActiveValue::set(project_id),
406 worktree_id: ActiveValue::set(worktree_id),
407 path: ActiveValue::set(summary.path.clone()),
408 language_server_id: ActiveValue::set(summary.language_server_id as i64),
409 error_count: ActiveValue::set(summary.error_count as i32),
410 warning_count: ActiveValue::set(summary.warning_count as i32),
411 })
412 .on_conflict(
413 OnConflict::columns([
414 worktree_diagnostic_summary::Column::ProjectId,
415 worktree_diagnostic_summary::Column::WorktreeId,
416 worktree_diagnostic_summary::Column::Path,
417 ])
418 .update_columns([
419 worktree_diagnostic_summary::Column::LanguageServerId,
420 worktree_diagnostic_summary::Column::ErrorCount,
421 worktree_diagnostic_summary::Column::WarningCount,
422 ])
423 .to_owned(),
424 )
425 .exec(&*tx)
426 .await?;
427
428 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
429 Ok(connection_ids)
430 })
431 .await
432 }
433
434 /// Starts the language server for the given connection.
435 pub async fn start_language_server(
436 &self,
437 update: &proto::StartLanguageServer,
438 connection: ConnectionId,
439 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
440 let project_id = ProjectId::from_proto(update.project_id);
441 self.project_transaction(project_id, |tx| async move {
442 let server = update
443 .server
444 .as_ref()
445 .ok_or_else(|| anyhow!("invalid language server"))?;
446
447 // Ensure the update comes from the host.
448 let project = project::Entity::find_by_id(project_id)
449 .one(&*tx)
450 .await?
451 .ok_or_else(|| anyhow!("no such project"))?;
452 if project.host_connection()? != connection {
453 return Err(anyhow!("can't update a project hosted by someone else"))?;
454 }
455
456 // Add the newly-started language server.
457 language_server::Entity::insert(language_server::ActiveModel {
458 project_id: ActiveValue::set(project_id),
459 id: ActiveValue::set(server.id as i64),
460 name: ActiveValue::set(server.name.clone()),
461 })
462 .on_conflict(
463 OnConflict::columns([
464 language_server::Column::ProjectId,
465 language_server::Column::Id,
466 ])
467 .update_column(language_server::Column::Name)
468 .to_owned(),
469 )
470 .exec(&*tx)
471 .await?;
472
473 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
474 Ok(connection_ids)
475 })
476 .await
477 }
478
479 /// Updates the worktree settings for the given connection.
480 pub async fn update_worktree_settings(
481 &self,
482 update: &proto::UpdateWorktreeSettings,
483 connection: ConnectionId,
484 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
485 let project_id = ProjectId::from_proto(update.project_id);
486 let kind = match update.kind {
487 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
488 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
489 None => proto::LocalSettingsKind::Settings,
490 };
491 let kind = LocalSettingsKind::from_proto(kind);
492 self.project_transaction(project_id, |tx| async move {
493 // Ensure the update comes from the host.
494 let project = project::Entity::find_by_id(project_id)
495 .one(&*tx)
496 .await?
497 .ok_or_else(|| anyhow!("no such project"))?;
498 if project.host_connection()? != connection {
499 return Err(anyhow!("can't update a project hosted by someone else"))?;
500 }
501
502 if let Some(content) = &update.content {
503 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
504 project_id: ActiveValue::Set(project_id),
505 worktree_id: ActiveValue::Set(update.worktree_id as i64),
506 path: ActiveValue::Set(update.path.clone()),
507 content: ActiveValue::Set(content.clone()),
508 kind: ActiveValue::Set(kind),
509 })
510 .on_conflict(
511 OnConflict::columns([
512 worktree_settings_file::Column::ProjectId,
513 worktree_settings_file::Column::WorktreeId,
514 worktree_settings_file::Column::Path,
515 ])
516 .update_column(worktree_settings_file::Column::Content)
517 .to_owned(),
518 )
519 .exec(&*tx)
520 .await?;
521 } else {
522 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
523 project_id: ActiveValue::Set(project_id),
524 worktree_id: ActiveValue::Set(update.worktree_id as i64),
525 path: ActiveValue::Set(update.path.clone()),
526 ..Default::default()
527 })
528 .exec(&*tx)
529 .await?;
530 }
531
532 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
533 Ok(connection_ids)
534 })
535 .await
536 }
537
538 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
539 self.transaction(|tx| async move {
540 Ok(project::Entity::find_by_id(id)
541 .one(&*tx)
542 .await?
543 .ok_or_else(|| anyhow!("no such project"))?)
544 })
545 .await
546 }
547
548 /// Adds the given connection to the specified project
549 /// in the current room.
550 pub async fn join_project(
551 &self,
552 project_id: ProjectId,
553 connection: ConnectionId,
554 user_id: UserId,
555 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
556 self.project_transaction(project_id, |tx| async move {
557 let (project, role) = self
558 .access_project(project_id, connection, Capability::ReadOnly, &tx)
559 .await?;
560 self.join_project_internal(project, user_id, connection, role, &tx)
561 .await
562 })
563 .await
564 }
565
566 async fn join_project_internal(
567 &self,
568 project: project::Model,
569 user_id: UserId,
570 connection: ConnectionId,
571 role: ChannelRole,
572 tx: &DatabaseTransaction,
573 ) -> Result<(Project, ReplicaId)> {
574 let mut collaborators = project
575 .find_related(project_collaborator::Entity)
576 .all(tx)
577 .await?;
578 let replica_ids = collaborators
579 .iter()
580 .map(|c| c.replica_id)
581 .collect::<HashSet<_>>();
582 let mut replica_id = ReplicaId(1);
583 while replica_ids.contains(&replica_id) {
584 replica_id.0 += 1;
585 }
586 let new_collaborator = project_collaborator::ActiveModel {
587 project_id: ActiveValue::set(project.id),
588 connection_id: ActiveValue::set(connection.id as i32),
589 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
590 user_id: ActiveValue::set(user_id),
591 replica_id: ActiveValue::set(replica_id),
592 is_host: ActiveValue::set(false),
593 ..Default::default()
594 }
595 .insert(tx)
596 .await?;
597 collaborators.push(new_collaborator);
598
599 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
600 let mut worktrees = db_worktrees
601 .into_iter()
602 .map(|db_worktree| {
603 (
604 db_worktree.id as u64,
605 Worktree {
606 id: db_worktree.id as u64,
607 abs_path: db_worktree.abs_path,
608 root_name: db_worktree.root_name,
609 visible: db_worktree.visible,
610 entries: Default::default(),
611 repository_entries: Default::default(),
612 diagnostic_summaries: Default::default(),
613 settings_files: Default::default(),
614 scan_id: db_worktree.scan_id as u64,
615 completed_scan_id: db_worktree.completed_scan_id as u64,
616 },
617 )
618 })
619 .collect::<BTreeMap<_, _>>();
620
621 // Populate worktree entries.
622 {
623 let mut db_entries = worktree_entry::Entity::find()
624 .filter(
625 Condition::all()
626 .add(worktree_entry::Column::ProjectId.eq(project.id))
627 .add(worktree_entry::Column::IsDeleted.eq(false)),
628 )
629 .stream(tx)
630 .await?;
631 while let Some(db_entry) = db_entries.next().await {
632 let db_entry = db_entry?;
633 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
634 worktree.entries.push(proto::Entry {
635 id: db_entry.id as u64,
636 is_dir: db_entry.is_dir,
637 path: db_entry.path,
638 inode: db_entry.inode as u64,
639 mtime: Some(proto::Timestamp {
640 seconds: db_entry.mtime_seconds as u64,
641 nanos: db_entry.mtime_nanos as u32,
642 }),
643 canonical_path: db_entry.canonical_path,
644 is_ignored: db_entry.is_ignored,
645 is_external: db_entry.is_external,
646 git_status: db_entry.git_status.map(|status| status as i32),
647 // This is only used in the summarization backlog, so if it's None,
648 // that just means we won't be able to detect when to resummarize
649 // based on total number of backlogged bytes - instead, we'd go
650 // on number of files only. That shouldn't be a huge deal in practice.
651 size: None,
652 is_fifo: db_entry.is_fifo,
653 });
654 }
655 }
656 }
657
658 // Populate repository entries.
659 {
660 let mut db_repository_entries = worktree_repository::Entity::find()
661 .filter(
662 Condition::all()
663 .add(worktree_repository::Column::ProjectId.eq(project.id))
664 .add(worktree_repository::Column::IsDeleted.eq(false)),
665 )
666 .stream(tx)
667 .await?;
668 while let Some(db_repository_entry) = db_repository_entries.next().await {
669 let db_repository_entry = db_repository_entry?;
670 if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
671 {
672 worktree.repository_entries.insert(
673 db_repository_entry.work_directory_id as u64,
674 proto::RepositoryEntry {
675 work_directory_id: db_repository_entry.work_directory_id as u64,
676 branch: db_repository_entry.branch,
677 },
678 );
679 }
680 }
681 }
682
683 // Populate worktree diagnostic summaries.
684 {
685 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
686 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
687 .stream(tx)
688 .await?;
689 while let Some(db_summary) = db_summaries.next().await {
690 let db_summary = db_summary?;
691 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
692 worktree
693 .diagnostic_summaries
694 .push(proto::DiagnosticSummary {
695 path: db_summary.path,
696 language_server_id: db_summary.language_server_id as u64,
697 error_count: db_summary.error_count as u32,
698 warning_count: db_summary.warning_count as u32,
699 });
700 }
701 }
702 }
703
704 // Populate worktree settings files
705 {
706 let mut db_settings_files = worktree_settings_file::Entity::find()
707 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
708 .stream(tx)
709 .await?;
710 while let Some(db_settings_file) = db_settings_files.next().await {
711 let db_settings_file = db_settings_file?;
712 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
713 worktree.settings_files.push(WorktreeSettingsFile {
714 path: db_settings_file.path,
715 content: db_settings_file.content,
716 kind: db_settings_file.kind,
717 });
718 }
719 }
720 }
721
722 // Populate language servers.
723 let language_servers = project
724 .find_related(language_server::Entity)
725 .all(tx)
726 .await?;
727
728 let project = Project {
729 id: project.id,
730 role,
731 collaborators: collaborators
732 .into_iter()
733 .map(|collaborator| ProjectCollaborator {
734 connection_id: collaborator.connection(),
735 user_id: collaborator.user_id,
736 replica_id: collaborator.replica_id,
737 is_host: collaborator.is_host,
738 })
739 .collect(),
740 worktrees,
741 language_servers: language_servers
742 .into_iter()
743 .map(|language_server| proto::LanguageServer {
744 id: language_server.id as u64,
745 name: language_server.name,
746 worktree_id: None,
747 })
748 .collect(),
749 };
750 Ok((project, replica_id as ReplicaId))
751 }
752
753 /// Removes the given connection from the specified project.
754 pub async fn leave_project(
755 &self,
756 project_id: ProjectId,
757 connection: ConnectionId,
758 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
759 self.project_transaction(project_id, |tx| async move {
760 let result = project_collaborator::Entity::delete_many()
761 .filter(
762 Condition::all()
763 .add(project_collaborator::Column::ProjectId.eq(project_id))
764 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
765 .add(
766 project_collaborator::Column::ConnectionServerId
767 .eq(connection.owner_id as i32),
768 ),
769 )
770 .exec(&*tx)
771 .await?;
772 if result.rows_affected == 0 {
773 Err(anyhow!("not a collaborator on this project"))?;
774 }
775
776 let project = project::Entity::find_by_id(project_id)
777 .one(&*tx)
778 .await?
779 .ok_or_else(|| anyhow!("no such project"))?;
780 let collaborators = project
781 .find_related(project_collaborator::Entity)
782 .all(&*tx)
783 .await?;
784 let connection_ids: Vec<ConnectionId> = collaborators
785 .into_iter()
786 .map(|collaborator| collaborator.connection())
787 .collect();
788
789 follower::Entity::delete_many()
790 .filter(
791 Condition::any()
792 .add(
793 Condition::all()
794 .add(follower::Column::ProjectId.eq(Some(project_id)))
795 .add(
796 follower::Column::LeaderConnectionServerId
797 .eq(connection.owner_id),
798 )
799 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
800 )
801 .add(
802 Condition::all()
803 .add(follower::Column::ProjectId.eq(Some(project_id)))
804 .add(
805 follower::Column::FollowerConnectionServerId
806 .eq(connection.owner_id),
807 )
808 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
809 ),
810 )
811 .exec(&*tx)
812 .await?;
813
814 let room = if let Some(room_id) = project.room_id {
815 Some(self.get_room(room_id, &tx).await?)
816 } else {
817 None
818 };
819
820 let left_project = LeftProject {
821 id: project_id,
822 should_unshare: connection == project.host_connection()?,
823 connection_ids,
824 };
825 Ok((room, left_project))
826 })
827 .await
828 }
829
830 pub async fn check_user_is_project_host(
831 &self,
832 project_id: ProjectId,
833 connection_id: ConnectionId,
834 ) -> Result<()> {
835 self.project_transaction(project_id, |tx| async move {
836 project::Entity::find()
837 .filter(
838 Condition::all()
839 .add(project::Column::Id.eq(project_id))
840 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
841 .add(
842 project::Column::HostConnectionServerId
843 .eq(Some(connection_id.owner_id as i32)),
844 ),
845 )
846 .one(&*tx)
847 .await?
848 .ok_or_else(|| anyhow!("failed to read project host"))?;
849
850 Ok(())
851 })
852 .await
853 .map(|guard| guard.into_inner())
854 }
855
856 /// Returns the current project if the given user is authorized to access it with the specified capability.
857 pub async fn access_project(
858 &self,
859 project_id: ProjectId,
860 connection_id: ConnectionId,
861 capability: Capability,
862 tx: &DatabaseTransaction,
863 ) -> Result<(project::Model, ChannelRole)> {
864 let project = project::Entity::find_by_id(project_id)
865 .one(tx)
866 .await?
867 .ok_or_else(|| anyhow!("no such project"))?;
868
869 let role_from_room = if let Some(room_id) = project.room_id {
870 room_participant::Entity::find()
871 .filter(room_participant::Column::RoomId.eq(room_id))
872 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
873 .one(tx)
874 .await?
875 .and_then(|participant| participant.role)
876 } else {
877 None
878 };
879
880 let role = role_from_room.unwrap_or(ChannelRole::Banned);
881
882 match capability {
883 Capability::ReadWrite => {
884 if !role.can_edit_projects() {
885 return Err(anyhow!("not authorized to edit projects"))?;
886 }
887 }
888 Capability::ReadOnly => {
889 if !role.can_read_projects() {
890 return Err(anyhow!("not authorized to read projects"))?;
891 }
892 }
893 }
894
895 Ok((project, role))
896 }
897
898 /// Returns the host connection for a read-only request to join a shared project.
899 pub async fn host_for_read_only_project_request(
900 &self,
901 project_id: ProjectId,
902 connection_id: ConnectionId,
903 ) -> Result<ConnectionId> {
904 self.project_transaction(project_id, |tx| async move {
905 let (project, _) = self
906 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
907 .await?;
908 project.host_connection()
909 })
910 .await
911 .map(|guard| guard.into_inner())
912 }
913
914 /// Returns the host connection for a request to join a shared project.
915 pub async fn host_for_mutating_project_request(
916 &self,
917 project_id: ProjectId,
918 connection_id: ConnectionId,
919 ) -> Result<ConnectionId> {
920 self.project_transaction(project_id, |tx| async move {
921 let (project, _) = self
922 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
923 .await?;
924 project.host_connection()
925 })
926 .await
927 .map(|guard| guard.into_inner())
928 }
929
930 pub async fn connections_for_buffer_update(
931 &self,
932 project_id: ProjectId,
933 connection_id: ConnectionId,
934 capability: Capability,
935 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
936 self.project_transaction(project_id, |tx| async move {
937 // Authorize
938 let (project, _) = self
939 .access_project(project_id, connection_id, capability, &tx)
940 .await?;
941
942 let host_connection_id = project.host_connection()?;
943
944 let collaborators = project_collaborator::Entity::find()
945 .filter(project_collaborator::Column::ProjectId.eq(project_id))
946 .all(&*tx)
947 .await?;
948
949 let guest_connection_ids = collaborators
950 .into_iter()
951 .filter_map(|collaborator| {
952 if collaborator.is_host {
953 None
954 } else {
955 Some(collaborator.connection())
956 }
957 })
958 .collect();
959
960 Ok((host_connection_id, guest_connection_ids))
961 })
962 .await
963 }
964
965 /// Returns the connection IDs in the given project.
966 ///
967 /// The provided `connection_id` must also be a collaborator in the project,
968 /// otherwise an error will be returned.
969 pub async fn project_connection_ids(
970 &self,
971 project_id: ProjectId,
972 connection_id: ConnectionId,
973 exclude_dev_server: bool,
974 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
975 self.project_transaction(project_id, |tx| async move {
976 let project = project::Entity::find_by_id(project_id)
977 .one(&*tx)
978 .await?
979 .ok_or_else(|| anyhow!("no such project"))?;
980
981 let mut collaborators = project_collaborator::Entity::find()
982 .filter(project_collaborator::Column::ProjectId.eq(project_id))
983 .stream(&*tx)
984 .await?;
985
986 let mut connection_ids = HashSet::default();
987 if let Some(host_connection) = project.host_connection().log_err() {
988 if !exclude_dev_server {
989 connection_ids.insert(host_connection);
990 }
991 }
992
993 while let Some(collaborator) = collaborators.next().await {
994 let collaborator = collaborator?;
995 connection_ids.insert(collaborator.connection());
996 }
997
998 if connection_ids.contains(&connection_id)
999 || Some(connection_id) == project.host_connection().ok()
1000 {
1001 Ok(connection_ids)
1002 } else {
1003 Err(anyhow!(
1004 "can only send project updates to a project you're in"
1005 ))?
1006 }
1007 })
1008 .await
1009 }
1010
1011 async fn project_guest_connection_ids(
1012 &self,
1013 project_id: ProjectId,
1014 tx: &DatabaseTransaction,
1015 ) -> Result<Vec<ConnectionId>> {
1016 let mut collaborators = project_collaborator::Entity::find()
1017 .filter(
1018 project_collaborator::Column::ProjectId
1019 .eq(project_id)
1020 .and(project_collaborator::Column::IsHost.eq(false)),
1021 )
1022 .stream(tx)
1023 .await?;
1024
1025 let mut guest_connection_ids = Vec::new();
1026 while let Some(collaborator) = collaborators.next().await {
1027 let collaborator = collaborator?;
1028 guest_connection_ids.push(collaborator.connection());
1029 }
1030 Ok(guest_connection_ids)
1031 }
1032
1033 /// Returns the [`RoomId`] for the given project.
1034 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1035 self.transaction(|tx| async move {
1036 Ok(project::Entity::find_by_id(project_id)
1037 .one(&*tx)
1038 .await?
1039 .and_then(|project| project.room_id))
1040 })
1041 .await
1042 }
1043
1044 pub async fn check_room_participants(
1045 &self,
1046 room_id: RoomId,
1047 leader_id: ConnectionId,
1048 follower_id: ConnectionId,
1049 ) -> Result<()> {
1050 self.transaction(|tx| async move {
1051 use room_participant::Column;
1052
1053 let count = room_participant::Entity::find()
1054 .filter(
1055 Condition::all().add(Column::RoomId.eq(room_id)).add(
1056 Condition::any()
1057 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1058 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1059 ))
1060 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1061 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1062 )),
1063 ),
1064 )
1065 .count(&*tx)
1066 .await?;
1067
1068 if count < 2 {
1069 Err(anyhow!("not room participants"))?;
1070 }
1071
1072 Ok(())
1073 })
1074 .await
1075 }
1076
1077 /// Adds the given follower connection as a follower of the given leader connection.
1078 pub async fn follow(
1079 &self,
1080 room_id: RoomId,
1081 project_id: ProjectId,
1082 leader_connection: ConnectionId,
1083 follower_connection: ConnectionId,
1084 ) -> Result<TransactionGuard<proto::Room>> {
1085 self.room_transaction(room_id, |tx| async move {
1086 follower::ActiveModel {
1087 room_id: ActiveValue::set(room_id),
1088 project_id: ActiveValue::set(project_id),
1089 leader_connection_server_id: ActiveValue::set(ServerId(
1090 leader_connection.owner_id as i32,
1091 )),
1092 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1093 follower_connection_server_id: ActiveValue::set(ServerId(
1094 follower_connection.owner_id as i32,
1095 )),
1096 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1097 ..Default::default()
1098 }
1099 .insert(&*tx)
1100 .await?;
1101
1102 let room = self.get_room(room_id, &tx).await?;
1103 Ok(room)
1104 })
1105 .await
1106 }
1107
1108 /// Removes the given follower connection as a follower of the given leader connection.
1109 pub async fn unfollow(
1110 &self,
1111 room_id: RoomId,
1112 project_id: ProjectId,
1113 leader_connection: ConnectionId,
1114 follower_connection: ConnectionId,
1115 ) -> Result<TransactionGuard<proto::Room>> {
1116 self.room_transaction(room_id, |tx| async move {
1117 follower::Entity::delete_many()
1118 .filter(
1119 Condition::all()
1120 .add(follower::Column::RoomId.eq(room_id))
1121 .add(follower::Column::ProjectId.eq(project_id))
1122 .add(
1123 follower::Column::LeaderConnectionServerId
1124 .eq(leader_connection.owner_id),
1125 )
1126 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1127 .add(
1128 follower::Column::FollowerConnectionServerId
1129 .eq(follower_connection.owner_id),
1130 )
1131 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1132 )
1133 .exec(&*tx)
1134 .await?;
1135
1136 let room = self.get_room(room_id, &tx).await?;
1137 Ok(room)
1138 })
1139 .await
1140 }
1141}