1use anyhow::Context as _;
2use util::ResultExt;
3
4use super::*;
5
6impl Database {
7 /// Returns the count of all projects, excluding ones marked as admin.
8 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
9 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
10 enum QueryAs {
11 Count,
12 }
13
14 self.transaction(|tx| async move {
15 Ok(project::Entity::find()
16 .select_only()
17 .column_as(project::Column::Id.count(), QueryAs::Count)
18 .inner_join(user::Entity)
19 .filter(user::Column::Admin.eq(false))
20 .into_values::<_, QueryAs>()
21 .one(&*tx)
22 .await?
23 .unwrap_or(0i64) as usize)
24 })
25 .await
26 }
27
28 /// Shares a project with the given room.
29 pub async fn share_project(
30 &self,
31 room_id: RoomId,
32 connection: ConnectionId,
33 worktrees: &[proto::WorktreeMetadata],
34 is_ssh_project: bool,
35 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
36 self.room_transaction(room_id, |tx| async move {
37 let participant = room_participant::Entity::find()
38 .filter(
39 Condition::all()
40 .add(
41 room_participant::Column::AnsweringConnectionId
42 .eq(connection.id as i32),
43 )
44 .add(
45 room_participant::Column::AnsweringConnectionServerId
46 .eq(connection.owner_id as i32),
47 ),
48 )
49 .one(&*tx)
50 .await?
51 .ok_or_else(|| anyhow!("could not find participant"))?;
52 if participant.room_id != room_id {
53 return Err(anyhow!("shared project on unexpected room"))?;
54 }
55 if !participant
56 .role
57 .unwrap_or(ChannelRole::Member)
58 .can_edit_projects()
59 {
60 return Err(anyhow!("guests cannot share projects"))?;
61 }
62
63 let project = project::ActiveModel {
64 room_id: ActiveValue::set(Some(participant.room_id)),
65 host_user_id: ActiveValue::set(Some(participant.user_id)),
66 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
67 host_connection_server_id: ActiveValue::set(Some(ServerId(
68 connection.owner_id as i32,
69 ))),
70 id: ActiveValue::NotSet,
71 }
72 .insert(&*tx)
73 .await?;
74
75 if !worktrees.is_empty() {
76 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
77 worktree::ActiveModel {
78 id: ActiveValue::set(worktree.id as i64),
79 project_id: ActiveValue::set(project.id),
80 abs_path: ActiveValue::set(worktree.abs_path.clone()),
81 root_name: ActiveValue::set(worktree.root_name.clone()),
82 visible: ActiveValue::set(worktree.visible),
83 scan_id: ActiveValue::set(0),
84 completed_scan_id: ActiveValue::set(0),
85 }
86 }))
87 .exec(&*tx)
88 .await?;
89 }
90
91 let replica_id = if is_ssh_project { 1 } else { 0 };
92
93 project_collaborator::ActiveModel {
94 project_id: ActiveValue::set(project.id),
95 connection_id: ActiveValue::set(connection.id as i32),
96 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
97 user_id: ActiveValue::set(participant.user_id),
98 replica_id: ActiveValue::set(ReplicaId(replica_id)),
99 is_host: ActiveValue::set(true),
100 ..Default::default()
101 }
102 .insert(&*tx)
103 .await?;
104
105 let room = self.get_room(room_id, &tx).await?;
106 Ok((project.id, room))
107 })
108 .await
109 }
110
111 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
112 self.weak_transaction(|tx| async move {
113 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
114 Ok(())
115 })
116 .await
117 }
118
119 /// Unshares the given project.
120 pub async fn unshare_project(
121 &self,
122 project_id: ProjectId,
123 connection: ConnectionId,
124 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
125 self.project_transaction(project_id, |tx| async move {
126 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
127 let project = project::Entity::find_by_id(project_id)
128 .one(&*tx)
129 .await?
130 .ok_or_else(|| anyhow!("project not found"))?;
131 let room = if let Some(room_id) = project.room_id {
132 Some(self.get_room(room_id, &tx).await?)
133 } else {
134 None
135 };
136 if project.host_connection()? == connection {
137 return Ok((true, room, guest_connection_ids));
138 }
139 Err(anyhow!("cannot unshare a project hosted by another user"))?
140 })
141 .await
142 }
143
144 /// Updates the worktrees associated with the given project.
145 pub async fn update_project(
146 &self,
147 project_id: ProjectId,
148 connection: ConnectionId,
149 worktrees: &[proto::WorktreeMetadata],
150 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
151 self.project_transaction(project_id, |tx| async move {
152 let project = project::Entity::find_by_id(project_id)
153 .filter(
154 Condition::all()
155 .add(project::Column::HostConnectionId.eq(connection.id as i32))
156 .add(
157 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
158 ),
159 )
160 .one(&*tx)
161 .await?
162 .ok_or_else(|| anyhow!("no such project"))?;
163
164 self.update_project_worktrees(project.id, worktrees, &tx)
165 .await?;
166
167 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
168
169 let room = if let Some(room_id) = project.room_id {
170 Some(self.get_room(room_id, &tx).await?)
171 } else {
172 None
173 };
174
175 Ok((room, guest_connection_ids))
176 })
177 .await
178 }
179
180 pub(in crate::db) async fn update_project_worktrees(
181 &self,
182 project_id: ProjectId,
183 worktrees: &[proto::WorktreeMetadata],
184 tx: &DatabaseTransaction,
185 ) -> Result<()> {
186 if !worktrees.is_empty() {
187 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
188 id: ActiveValue::set(worktree.id as i64),
189 project_id: ActiveValue::set(project_id),
190 abs_path: ActiveValue::set(worktree.abs_path.clone()),
191 root_name: ActiveValue::set(worktree.root_name.clone()),
192 visible: ActiveValue::set(worktree.visible),
193 scan_id: ActiveValue::set(0),
194 completed_scan_id: ActiveValue::set(0),
195 }))
196 .on_conflict(
197 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
198 .update_column(worktree::Column::RootName)
199 .to_owned(),
200 )
201 .exec(tx)
202 .await?;
203 }
204
205 worktree::Entity::delete_many()
206 .filter(worktree::Column::ProjectId.eq(project_id).and(
207 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
208 ))
209 .exec(tx)
210 .await?;
211
212 Ok(())
213 }
214
/// Applies a worktree update sent by the project's host connection.
///
/// Inside one project transaction this:
/// 1. verifies that `connection` hosts the project named in `update`,
/// 2. updates the worktree's metadata row,
/// 3. upserts `update.updated_entries`,
/// 4. flags `update.removed_entries` as deleted,
/// 5. upserts `update.updated_repositories`,
/// 6. flags `update.removed_repositories` as deleted.
///
/// Returns the connection ids of the project's guests.
///
/// # Errors
///
/// Fails before opening the transaction when either `removed_entries` or
/// `updated_entries` holds more than
/// `proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE` items, and inside it when no
/// project with that id is hosted by `connection`.
pub async fn update_worktree(
    &self,
    update: &proto::UpdateWorktree,
    connection: ConnectionId,
) -> Result<TransactionGuard<Vec<ConnectionId>>> {
    // Reject oversized chunks up front, without touching the database.
    if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
        || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
    {
        return Err(anyhow!(
            "invalid worktree update. removed entries: {}, updated entries: {}",
            update.removed_entries.len(),
            update.updated_entries.len()
        ))?;
    }

    let project_id = ProjectId::from_proto(update.project_id);
    let worktree_id = update.worktree_id as i64;
    self.project_transaction(project_id, |tx| async move {
        // Ensure the update comes from the host.
        let _project = project::Entity::find_by_id(project_id)
            .filter(
                Condition::all()
                    .add(project::Column::HostConnectionId.eq(connection.id as i32))
                    .add(
                        project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
                    ),
            )
            .one(&*tx)
            .await?
            .ok_or_else(|| anyhow!("no such project: {project_id}"))?;

        // Update metadata.
        worktree::Entity::update(worktree::ActiveModel {
            id: ActiveValue::set(worktree_id),
            project_id: ActiveValue::set(project_id),
            root_name: ActiveValue::set(update.root_name.clone()),
            scan_id: ActiveValue::set(update.scan_id as i64),
            // The completed scan id only advances on the last update of a
            // scan; otherwise the column is left out of the UPDATE.
            completed_scan_id: if update.is_last_update {
                ActiveValue::set(update.scan_id as i64)
            } else {
                ActiveValue::default()
            },
            abs_path: ActiveValue::set(update.abs_path.clone()),
            ..Default::default()
        })
        .exec(&*tx)
        .await?;

        // Upsert the entries that were added or changed.
        if !update.updated_entries.is_empty() {
            worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
                // A missing mtime is stored as the default timestamp.
                let mtime = entry.mtime.clone().unwrap_or_default();
                worktree_entry::ActiveModel {
                    project_id: ActiveValue::set(project_id),
                    worktree_id: ActiveValue::set(worktree_id),
                    id: ActiveValue::set(entry.id as i64),
                    is_dir: ActiveValue::set(entry.is_dir),
                    path: ActiveValue::set(entry.path.clone()),
                    inode: ActiveValue::set(entry.inode as i64),
                    mtime_seconds: ActiveValue::set(mtime.seconds as i64),
                    mtime_nanos: ActiveValue::set(mtime.nanos as i32),
                    canonical_path: ActiveValue::set(entry.canonical_path.clone()),
                    is_ignored: ActiveValue::set(entry.is_ignored),
                    is_external: ActiveValue::set(entry.is_external),
                    git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
                    is_deleted: ActiveValue::set(false),
                    scan_id: ActiveValue::set(update.scan_id as i64),
                    is_fifo: ActiveValue::set(entry.is_fifo),
                }
            }))
            .on_conflict(
                // An existing (project, worktree, entry id) row is refreshed
                // in place; only the columns listed below are overwritten.
                // NOTE(review): `IsExternal`, `IsDeleted` and `IsFifo` are not
                // in this list — confirm that is intentional.
                OnConflict::columns([
                    worktree_entry::Column::ProjectId,
                    worktree_entry::Column::WorktreeId,
                    worktree_entry::Column::Id,
                ])
                .update_columns([
                    worktree_entry::Column::IsDir,
                    worktree_entry::Column::Path,
                    worktree_entry::Column::Inode,
                    worktree_entry::Column::MtimeSeconds,
                    worktree_entry::Column::MtimeNanos,
                    worktree_entry::Column::CanonicalPath,
                    worktree_entry::Column::IsIgnored,
                    worktree_entry::Column::GitStatus,
                    worktree_entry::Column::ScanId,
                ])
                .to_owned(),
            )
            .exec(&*tx)
            .await?;
        }

        // Removed entries are kept as rows but flagged deleted and stamped
        // with this update's scan id.
        if !update.removed_entries.is_empty() {
            worktree_entry::Entity::update_many()
                .filter(
                    worktree_entry::Column::ProjectId
                        .eq(project_id)
                        .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
                        .and(
                            worktree_entry::Column::Id
                                .is_in(update.removed_entries.iter().map(|id| *id as i64)),
                        ),
                )
                .set(worktree_entry::ActiveModel {
                    is_deleted: ActiveValue::Set(true),
                    scan_id: ActiveValue::Set(update.scan_id as i64),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
        }

        // Upsert repositories; on conflict only the scan id and branch change.
        if !update.updated_repositories.is_empty() {
            worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
                |repository| worktree_repository::ActiveModel {
                    project_id: ActiveValue::set(project_id),
                    worktree_id: ActiveValue::set(worktree_id),
                    work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
                    scan_id: ActiveValue::set(update.scan_id as i64),
                    branch: ActiveValue::set(repository.branch.clone()),
                    is_deleted: ActiveValue::set(false),
                },
            ))
            .on_conflict(
                OnConflict::columns([
                    worktree_repository::Column::ProjectId,
                    worktree_repository::Column::WorktreeId,
                    worktree_repository::Column::WorkDirectoryId,
                ])
                .update_columns([
                    worktree_repository::Column::ScanId,
                    worktree_repository::Column::Branch,
                ])
                .to_owned(),
            )
            .exec(&*tx)
            .await?;
        }

        // Removed repositories are flagged deleted the same way as entries.
        if !update.removed_repositories.is_empty() {
            worktree_repository::Entity::update_many()
                .filter(
                    worktree_repository::Column::ProjectId
                        .eq(project_id)
                        .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
                        .and(
                            worktree_repository::Column::WorkDirectoryId
                                .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
                        ),
                )
                .set(worktree_repository::ActiveModel {
                    is_deleted: ActiveValue::Set(true),
                    scan_id: ActiveValue::Set(update.scan_id as i64),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
        }

        // Return the non-host collaborators' connections to the caller.
        let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
        Ok(connection_ids)
    })
    .await
}
379
380 /// Updates the diagnostic summary for the given connection.
381 pub async fn update_diagnostic_summary(
382 &self,
383 update: &proto::UpdateDiagnosticSummary,
384 connection: ConnectionId,
385 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
386 let project_id = ProjectId::from_proto(update.project_id);
387 let worktree_id = update.worktree_id as i64;
388 self.project_transaction(project_id, |tx| async move {
389 let summary = update
390 .summary
391 .as_ref()
392 .ok_or_else(|| anyhow!("invalid summary"))?;
393
394 // Ensure the update comes from the host.
395 let project = project::Entity::find_by_id(project_id)
396 .one(&*tx)
397 .await?
398 .ok_or_else(|| anyhow!("no such project"))?;
399 if project.host_connection()? != connection {
400 return Err(anyhow!("can't update a project hosted by someone else"))?;
401 }
402
403 // Update summary.
404 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
405 project_id: ActiveValue::set(project_id),
406 worktree_id: ActiveValue::set(worktree_id),
407 path: ActiveValue::set(summary.path.clone()),
408 language_server_id: ActiveValue::set(summary.language_server_id as i64),
409 error_count: ActiveValue::set(summary.error_count as i32),
410 warning_count: ActiveValue::set(summary.warning_count as i32),
411 })
412 .on_conflict(
413 OnConflict::columns([
414 worktree_diagnostic_summary::Column::ProjectId,
415 worktree_diagnostic_summary::Column::WorktreeId,
416 worktree_diagnostic_summary::Column::Path,
417 ])
418 .update_columns([
419 worktree_diagnostic_summary::Column::LanguageServerId,
420 worktree_diagnostic_summary::Column::ErrorCount,
421 worktree_diagnostic_summary::Column::WarningCount,
422 ])
423 .to_owned(),
424 )
425 .exec(&*tx)
426 .await?;
427
428 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
429 Ok(connection_ids)
430 })
431 .await
432 }
433
434 /// Starts the language server for the given connection.
435 pub async fn start_language_server(
436 &self,
437 update: &proto::StartLanguageServer,
438 connection: ConnectionId,
439 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
440 let project_id = ProjectId::from_proto(update.project_id);
441 self.project_transaction(project_id, |tx| async move {
442 let server = update
443 .server
444 .as_ref()
445 .ok_or_else(|| anyhow!("invalid language server"))?;
446
447 // Ensure the update comes from the host.
448 let project = project::Entity::find_by_id(project_id)
449 .one(&*tx)
450 .await?
451 .ok_or_else(|| anyhow!("no such project"))?;
452 if project.host_connection()? != connection {
453 return Err(anyhow!("can't update a project hosted by someone else"))?;
454 }
455
456 // Add the newly-started language server.
457 language_server::Entity::insert(language_server::ActiveModel {
458 project_id: ActiveValue::set(project_id),
459 id: ActiveValue::set(server.id as i64),
460 name: ActiveValue::set(server.name.clone()),
461 })
462 .on_conflict(
463 OnConflict::columns([
464 language_server::Column::ProjectId,
465 language_server::Column::Id,
466 ])
467 .update_column(language_server::Column::Name)
468 .to_owned(),
469 )
470 .exec(&*tx)
471 .await?;
472
473 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
474 Ok(connection_ids)
475 })
476 .await
477 }
478
479 /// Updates the worktree settings for the given connection.
480 pub async fn update_worktree_settings(
481 &self,
482 update: &proto::UpdateWorktreeSettings,
483 connection: ConnectionId,
484 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
485 let project_id = ProjectId::from_proto(update.project_id);
486 let kind = match update.kind {
487 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
488 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
489 None => proto::LocalSettingsKind::Settings,
490 };
491 let kind = LocalSettingsKind::from_proto(kind);
492 self.project_transaction(project_id, |tx| async move {
493 // Ensure the update comes from the host.
494 let project = project::Entity::find_by_id(project_id)
495 .one(&*tx)
496 .await?
497 .ok_or_else(|| anyhow!("no such project"))?;
498 if project.host_connection()? != connection {
499 return Err(anyhow!("can't update a project hosted by someone else"))?;
500 }
501
502 if let Some(content) = &update.content {
503 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
504 project_id: ActiveValue::Set(project_id),
505 worktree_id: ActiveValue::Set(update.worktree_id as i64),
506 path: ActiveValue::Set(update.path.clone()),
507 content: ActiveValue::Set(content.clone()),
508 kind: ActiveValue::Set(kind),
509 })
510 .on_conflict(
511 OnConflict::columns([
512 worktree_settings_file::Column::ProjectId,
513 worktree_settings_file::Column::WorktreeId,
514 worktree_settings_file::Column::Path,
515 ])
516 .update_column(worktree_settings_file::Column::Content)
517 .to_owned(),
518 )
519 .exec(&*tx)
520 .await?;
521 } else {
522 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
523 project_id: ActiveValue::Set(project_id),
524 worktree_id: ActiveValue::Set(update.worktree_id as i64),
525 path: ActiveValue::Set(update.path.clone()),
526 ..Default::default()
527 })
528 .exec(&*tx)
529 .await?;
530 }
531
532 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
533 Ok(connection_ids)
534 })
535 .await
536 }
537
538 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
539 self.transaction(|tx| async move {
540 Ok(project::Entity::find_by_id(id)
541 .one(&*tx)
542 .await?
543 .ok_or_else(|| anyhow!("no such project"))?)
544 })
545 .await
546 }
547
548 /// Adds the given connection to the specified project
549 /// in the current room.
550 pub async fn join_project(
551 &self,
552 project_id: ProjectId,
553 connection: ConnectionId,
554 user_id: UserId,
555 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
556 self.project_transaction(project_id, |tx| async move {
557 let (project, role) = self
558 .access_project(project_id, connection, Capability::ReadOnly, &tx)
559 .await?;
560 self.join_project_internal(project, user_id, connection, role, &tx)
561 .await
562 })
563 .await
564 }
565
/// Adds `connection` as a non-host collaborator of `project` and builds the
/// full project snapshot handed back to the joining client.
///
/// The new collaborator receives the smallest replica id, starting from 1,
/// that no existing collaborator of the project uses. The snapshot contains
/// all collaborators (including the new one), every worktree with its
/// non-deleted entries, non-deleted repository entries, diagnostic summaries
/// and settings files, plus the project's language servers.
///
/// Returns the snapshot together with the replica id that was assigned.
async fn join_project_internal(
    &self,
    project: project::Model,
    user_id: UserId,
    connection: ConnectionId,
    role: ChannelRole,
    tx: &DatabaseTransaction,
) -> Result<(Project, ReplicaId)> {
    let mut collaborators = project
        .find_related(project_collaborator::Entity)
        .all(tx)
        .await?;
    // Pick the first replica id >= 1 that is not already taken.
    let replica_ids = collaborators
        .iter()
        .map(|c| c.replica_id)
        .collect::<HashSet<_>>();
    let mut replica_id = ReplicaId(1);
    while replica_ids.contains(&replica_id) {
        replica_id.0 += 1;
    }
    let new_collaborator = project_collaborator::ActiveModel {
        project_id: ActiveValue::set(project.id),
        connection_id: ActiveValue::set(connection.id as i32),
        connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
        user_id: ActiveValue::set(user_id),
        replica_id: ActiveValue::set(replica_id),
        is_host: ActiveValue::set(false),
        ..Default::default()
    }
    .insert(tx)
    .await?;
    // Include the joiner in the collaborator list that is returned.
    collaborators.push(new_collaborator);

    // Index the worktrees by id so the streamed rows below can be attached
    // to the worktree they belong to.
    let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
    let mut worktrees = db_worktrees
        .into_iter()
        .map(|db_worktree| {
            (
                db_worktree.id as u64,
                Worktree {
                    id: db_worktree.id as u64,
                    abs_path: db_worktree.abs_path,
                    root_name: db_worktree.root_name,
                    visible: db_worktree.visible,
                    entries: Default::default(),
                    repository_entries: Default::default(),
                    diagnostic_summaries: Default::default(),
                    settings_files: Default::default(),
                    scan_id: db_worktree.scan_id as u64,
                    completed_scan_id: db_worktree.completed_scan_id as u64,
                },
            )
        })
        .collect::<BTreeMap<_, _>>();

    // Populate worktree entries.
    // Rows whose worktree id is not in the map are skipped.
    {
        let mut db_entries = worktree_entry::Entity::find()
            .filter(
                Condition::all()
                    .add(worktree_entry::Column::ProjectId.eq(project.id))
                    .add(worktree_entry::Column::IsDeleted.eq(false)),
            )
            .stream(tx)
            .await?;
        while let Some(db_entry) = db_entries.next().await {
            let db_entry = db_entry?;
            if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                worktree.entries.push(proto::Entry {
                    id: db_entry.id as u64,
                    is_dir: db_entry.is_dir,
                    path: db_entry.path,
                    inode: db_entry.inode as u64,
                    mtime: Some(proto::Timestamp {
                        seconds: db_entry.mtime_seconds as u64,
                        nanos: db_entry.mtime_nanos as u32,
                    }),
                    canonical_path: db_entry.canonical_path,
                    is_ignored: db_entry.is_ignored,
                    is_external: db_entry.is_external,
                    git_status: db_entry.git_status.map(|status| status as i32),
                    // This is only used in the summarization backlog, so if it's None,
                    // that just means we won't be able to detect when to resummarize
                    // based on total number of backlogged bytes - instead, we'd go
                    // on number of files only. That shouldn't be a huge deal in practice.
                    size: None,
                    is_fifo: db_entry.is_fifo,
                });
            }
        }
    }

    // Populate repository entries.
    // Keyed by work directory id within each worktree; deleted rows are
    // filtered out by the query.
    {
        let mut db_repository_entries = worktree_repository::Entity::find()
            .filter(
                Condition::all()
                    .add(worktree_repository::Column::ProjectId.eq(project.id))
                    .add(worktree_repository::Column::IsDeleted.eq(false)),
            )
            .stream(tx)
            .await?;
        while let Some(db_repository_entry) = db_repository_entries.next().await {
            let db_repository_entry = db_repository_entry?;
            if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
            {
                worktree.repository_entries.insert(
                    db_repository_entry.work_directory_id as u64,
                    proto::RepositoryEntry {
                        work_directory_id: db_repository_entry.work_directory_id as u64,
                        branch: db_repository_entry.branch,
                    },
                );
            }
        }
    }

    // Populate worktree diagnostic summaries.
    {
        let mut db_summaries = worktree_diagnostic_summary::Entity::find()
            .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
            .stream(tx)
            .await?;
        while let Some(db_summary) = db_summaries.next().await {
            let db_summary = db_summary?;
            if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                worktree
                    .diagnostic_summaries
                    .push(proto::DiagnosticSummary {
                        path: db_summary.path,
                        language_server_id: db_summary.language_server_id as u64,
                        error_count: db_summary.error_count as u32,
                        warning_count: db_summary.warning_count as u32,
                    });
            }
        }
    }

    // Populate worktree settings files
    {
        let mut db_settings_files = worktree_settings_file::Entity::find()
            .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
            .stream(tx)
            .await?;
        while let Some(db_settings_file) = db_settings_files.next().await {
            let db_settings_file = db_settings_file?;
            if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
                worktree.settings_files.push(WorktreeSettingsFile {
                    path: db_settings_file.path,
                    content: db_settings_file.content,
                    kind: db_settings_file.kind,
                });
            }
        }
    }

    // Populate language servers.
    let language_servers = project
        .find_related(language_server::Entity)
        .all(tx)
        .await?;

    // Assemble the snapshot returned to the caller.
    let project = Project {
        id: project.id,
        role,
        collaborators: collaborators
            .into_iter()
            .map(|collaborator| ProjectCollaborator {
                connection_id: collaborator.connection(),
                user_id: collaborator.user_id,
                replica_id: collaborator.replica_id,
                is_host: collaborator.is_host,
            })
            .collect(),
        worktrees,
        language_servers: language_servers
            .into_iter()
            .map(|language_server| proto::LanguageServer {
                id: language_server.id as u64,
                name: language_server.name,
                // The worktree id is always reported as `None` here.
                worktree_id: None,
            })
            .collect(),
    };
    Ok((project, replica_id as ReplicaId))
}
752
753 pub async fn leave_hosted_project(
754 &self,
755 project_id: ProjectId,
756 connection: ConnectionId,
757 ) -> Result<LeftProject> {
758 self.transaction(|tx| async move {
759 let result = project_collaborator::Entity::delete_many()
760 .filter(
761 Condition::all()
762 .add(project_collaborator::Column::ProjectId.eq(project_id))
763 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
764 .add(
765 project_collaborator::Column::ConnectionServerId
766 .eq(connection.owner_id as i32),
767 ),
768 )
769 .exec(&*tx)
770 .await?;
771 if result.rows_affected == 0 {
772 return Err(anyhow!("not in the project"))?;
773 }
774
775 let project = project::Entity::find_by_id(project_id)
776 .one(&*tx)
777 .await?
778 .ok_or_else(|| anyhow!("no such project"))?;
779 let collaborators = project
780 .find_related(project_collaborator::Entity)
781 .all(&*tx)
782 .await?;
783 let connection_ids = collaborators
784 .into_iter()
785 .map(|collaborator| collaborator.connection())
786 .collect();
787 Ok(LeftProject {
788 id: project.id,
789 connection_ids,
790 should_unshare: false,
791 })
792 })
793 .await
794 }
795
796 /// Removes the given connection from the specified project.
797 pub async fn leave_project(
798 &self,
799 project_id: ProjectId,
800 connection: ConnectionId,
801 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
802 self.project_transaction(project_id, |tx| async move {
803 let result = project_collaborator::Entity::delete_many()
804 .filter(
805 Condition::all()
806 .add(project_collaborator::Column::ProjectId.eq(project_id))
807 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
808 .add(
809 project_collaborator::Column::ConnectionServerId
810 .eq(connection.owner_id as i32),
811 ),
812 )
813 .exec(&*tx)
814 .await?;
815 if result.rows_affected == 0 {
816 Err(anyhow!("not a collaborator on this project"))?;
817 }
818
819 let project = project::Entity::find_by_id(project_id)
820 .one(&*tx)
821 .await?
822 .ok_or_else(|| anyhow!("no such project"))?;
823 let collaborators = project
824 .find_related(project_collaborator::Entity)
825 .all(&*tx)
826 .await?;
827 let connection_ids: Vec<ConnectionId> = collaborators
828 .into_iter()
829 .map(|collaborator| collaborator.connection())
830 .collect();
831
832 follower::Entity::delete_many()
833 .filter(
834 Condition::any()
835 .add(
836 Condition::all()
837 .add(follower::Column::ProjectId.eq(Some(project_id)))
838 .add(
839 follower::Column::LeaderConnectionServerId
840 .eq(connection.owner_id),
841 )
842 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
843 )
844 .add(
845 Condition::all()
846 .add(follower::Column::ProjectId.eq(Some(project_id)))
847 .add(
848 follower::Column::FollowerConnectionServerId
849 .eq(connection.owner_id),
850 )
851 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
852 ),
853 )
854 .exec(&*tx)
855 .await?;
856
857 let room = if let Some(room_id) = project.room_id {
858 Some(self.get_room(room_id, &tx).await?)
859 } else {
860 None
861 };
862
863 let left_project = LeftProject {
864 id: project_id,
865 should_unshare: connection == project.host_connection()?,
866 connection_ids,
867 };
868 Ok((room, left_project))
869 })
870 .await
871 }
872
873 pub async fn check_user_is_project_host(
874 &self,
875 project_id: ProjectId,
876 connection_id: ConnectionId,
877 ) -> Result<()> {
878 self.project_transaction(project_id, |tx| async move {
879 project::Entity::find()
880 .filter(
881 Condition::all()
882 .add(project::Column::Id.eq(project_id))
883 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
884 .add(
885 project::Column::HostConnectionServerId
886 .eq(Some(connection_id.owner_id as i32)),
887 ),
888 )
889 .one(&*tx)
890 .await?
891 .ok_or_else(|| anyhow!("failed to read project host"))?;
892
893 Ok(())
894 })
895 .await
896 .map(|guard| guard.into_inner())
897 }
898
899 /// Returns the current project if the given user is authorized to access it with the specified capability.
900 pub async fn access_project(
901 &self,
902 project_id: ProjectId,
903 connection_id: ConnectionId,
904 capability: Capability,
905 tx: &DatabaseTransaction,
906 ) -> Result<(project::Model, ChannelRole)> {
907 let project = project::Entity::find_by_id(project_id)
908 .one(tx)
909 .await?
910 .ok_or_else(|| anyhow!("no such project"))?;
911
912 let role_from_room = if let Some(room_id) = project.room_id {
913 room_participant::Entity::find()
914 .filter(room_participant::Column::RoomId.eq(room_id))
915 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
916 .one(tx)
917 .await?
918 .and_then(|participant| participant.role)
919 } else {
920 None
921 };
922
923 let role = role_from_room.unwrap_or(ChannelRole::Banned);
924
925 match capability {
926 Capability::ReadWrite => {
927 if !role.can_edit_projects() {
928 return Err(anyhow!("not authorized to edit projects"))?;
929 }
930 }
931 Capability::ReadOnly => {
932 if !role.can_read_projects() {
933 return Err(anyhow!("not authorized to read projects"))?;
934 }
935 }
936 }
937
938 Ok((project, role))
939 }
940
941 /// Returns the host connection for a read-only request to join a shared project.
942 pub async fn host_for_read_only_project_request(
943 &self,
944 project_id: ProjectId,
945 connection_id: ConnectionId,
946 ) -> Result<ConnectionId> {
947 self.project_transaction(project_id, |tx| async move {
948 let (project, _) = self
949 .access_project(project_id, connection_id, Capability::ReadOnly, &tx)
950 .await?;
951 project.host_connection()
952 })
953 .await
954 .map(|guard| guard.into_inner())
955 }
956
957 /// Returns the host connection for a request to join a shared project.
958 pub async fn host_for_mutating_project_request(
959 &self,
960 project_id: ProjectId,
961 connection_id: ConnectionId,
962 ) -> Result<ConnectionId> {
963 self.project_transaction(project_id, |tx| async move {
964 let (project, _) = self
965 .access_project(project_id, connection_id, Capability::ReadWrite, &tx)
966 .await?;
967 project.host_connection()
968 })
969 .await
970 .map(|guard| guard.into_inner())
971 }
972
973 pub async fn connections_for_buffer_update(
974 &self,
975 project_id: ProjectId,
976 connection_id: ConnectionId,
977 capability: Capability,
978 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
979 self.project_transaction(project_id, |tx| async move {
980 // Authorize
981 let (project, _) = self
982 .access_project(project_id, connection_id, capability, &tx)
983 .await?;
984
985 let host_connection_id = project.host_connection()?;
986
987 let collaborators = project_collaborator::Entity::find()
988 .filter(project_collaborator::Column::ProjectId.eq(project_id))
989 .all(&*tx)
990 .await?;
991
992 let guest_connection_ids = collaborators
993 .into_iter()
994 .filter_map(|collaborator| {
995 if collaborator.is_host {
996 None
997 } else {
998 Some(collaborator.connection())
999 }
1000 })
1001 .collect();
1002
1003 Ok((host_connection_id, guest_connection_ids))
1004 })
1005 .await
1006 }
1007
1008 /// Returns the connection IDs in the given project.
1009 ///
1010 /// The provided `connection_id` must also be a collaborator in the project,
1011 /// otherwise an error will be returned.
1012 pub async fn project_connection_ids(
1013 &self,
1014 project_id: ProjectId,
1015 connection_id: ConnectionId,
1016 exclude_dev_server: bool,
1017 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1018 self.project_transaction(project_id, |tx| async move {
1019 let project = project::Entity::find_by_id(project_id)
1020 .one(&*tx)
1021 .await?
1022 .ok_or_else(|| anyhow!("no such project"))?;
1023
1024 let mut collaborators = project_collaborator::Entity::find()
1025 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1026 .stream(&*tx)
1027 .await?;
1028
1029 let mut connection_ids = HashSet::default();
1030 if let Some(host_connection) = project.host_connection().log_err() {
1031 if !exclude_dev_server {
1032 connection_ids.insert(host_connection);
1033 }
1034 }
1035
1036 while let Some(collaborator) = collaborators.next().await {
1037 let collaborator = collaborator?;
1038 connection_ids.insert(collaborator.connection());
1039 }
1040
1041 if connection_ids.contains(&connection_id)
1042 || Some(connection_id) == project.host_connection().ok()
1043 {
1044 Ok(connection_ids)
1045 } else {
1046 Err(anyhow!(
1047 "can only send project updates to a project you're in"
1048 ))?
1049 }
1050 })
1051 .await
1052 }
1053
1054 async fn project_guest_connection_ids(
1055 &self,
1056 project_id: ProjectId,
1057 tx: &DatabaseTransaction,
1058 ) -> Result<Vec<ConnectionId>> {
1059 let mut collaborators = project_collaborator::Entity::find()
1060 .filter(
1061 project_collaborator::Column::ProjectId
1062 .eq(project_id)
1063 .and(project_collaborator::Column::IsHost.eq(false)),
1064 )
1065 .stream(tx)
1066 .await?;
1067
1068 let mut guest_connection_ids = Vec::new();
1069 while let Some(collaborator) = collaborators.next().await {
1070 let collaborator = collaborator?;
1071 guest_connection_ids.push(collaborator.connection());
1072 }
1073 Ok(guest_connection_ids)
1074 }
1075
1076 /// Returns the [`RoomId`] for the given project.
1077 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1078 self.transaction(|tx| async move {
1079 Ok(project::Entity::find_by_id(project_id)
1080 .one(&*tx)
1081 .await?
1082 .and_then(|project| project.room_id))
1083 })
1084 .await
1085 }
1086
1087 pub async fn check_room_participants(
1088 &self,
1089 room_id: RoomId,
1090 leader_id: ConnectionId,
1091 follower_id: ConnectionId,
1092 ) -> Result<()> {
1093 self.transaction(|tx| async move {
1094 use room_participant::Column;
1095
1096 let count = room_participant::Entity::find()
1097 .filter(
1098 Condition::all().add(Column::RoomId.eq(room_id)).add(
1099 Condition::any()
1100 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1101 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1102 ))
1103 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1104 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1105 )),
1106 ),
1107 )
1108 .count(&*tx)
1109 .await?;
1110
1111 if count < 2 {
1112 Err(anyhow!("not room participants"))?;
1113 }
1114
1115 Ok(())
1116 })
1117 .await
1118 }
1119
1120 /// Adds the given follower connection as a follower of the given leader connection.
1121 pub async fn follow(
1122 &self,
1123 room_id: RoomId,
1124 project_id: ProjectId,
1125 leader_connection: ConnectionId,
1126 follower_connection: ConnectionId,
1127 ) -> Result<TransactionGuard<proto::Room>> {
1128 self.room_transaction(room_id, |tx| async move {
1129 follower::ActiveModel {
1130 room_id: ActiveValue::set(room_id),
1131 project_id: ActiveValue::set(project_id),
1132 leader_connection_server_id: ActiveValue::set(ServerId(
1133 leader_connection.owner_id as i32,
1134 )),
1135 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1136 follower_connection_server_id: ActiveValue::set(ServerId(
1137 follower_connection.owner_id as i32,
1138 )),
1139 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1140 ..Default::default()
1141 }
1142 .insert(&*tx)
1143 .await?;
1144
1145 let room = self.get_room(room_id, &tx).await?;
1146 Ok(room)
1147 })
1148 .await
1149 }
1150
1151 /// Removes the given follower connection as a follower of the given leader connection.
1152 pub async fn unfollow(
1153 &self,
1154 room_id: RoomId,
1155 project_id: ProjectId,
1156 leader_connection: ConnectionId,
1157 follower_connection: ConnectionId,
1158 ) -> Result<TransactionGuard<proto::Room>> {
1159 self.room_transaction(room_id, |tx| async move {
1160 follower::Entity::delete_many()
1161 .filter(
1162 Condition::all()
1163 .add(follower::Column::RoomId.eq(room_id))
1164 .add(follower::Column::ProjectId.eq(project_id))
1165 .add(
1166 follower::Column::LeaderConnectionServerId
1167 .eq(leader_connection.owner_id),
1168 )
1169 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1170 .add(
1171 follower::Column::FollowerConnectionServerId
1172 .eq(follower_connection.owner_id),
1173 )
1174 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1175 )
1176 .exec(&*tx)
1177 .await?;
1178
1179 let room = self.get_room(room_id, &tx).await?;
1180 Ok(room)
1181 })
1182 .await
1183 }
1184}