1use util::ResultExt;
2
3use super::*;
4
5impl Database {
6 /// Returns the count of all projects, excluding ones marked as admin.
7 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
8 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
9 enum QueryAs {
10 Count,
11 }
12
13 self.transaction(|tx| async move {
14 Ok(project::Entity::find()
15 .select_only()
16 .column_as(project::Column::Id.count(), QueryAs::Count)
17 .inner_join(user::Entity)
18 .filter(user::Column::Admin.eq(false))
19 .into_values::<_, QueryAs>()
20 .one(&*tx)
21 .await?
22 .unwrap_or(0i64) as usize)
23 })
24 .await
25 }
26
27 /// Shares a project with the given room.
28 pub async fn share_project(
29 &self,
30 room_id: RoomId,
31 connection: ConnectionId,
32 worktrees: &[proto::WorktreeMetadata],
33 dev_server_project_id: Option<DevServerProjectId>,
34 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
35 self.room_transaction(room_id, |tx| async move {
36 let participant = room_participant::Entity::find()
37 .filter(
38 Condition::all()
39 .add(
40 room_participant::Column::AnsweringConnectionId
41 .eq(connection.id as i32),
42 )
43 .add(
44 room_participant::Column::AnsweringConnectionServerId
45 .eq(connection.owner_id as i32),
46 ),
47 )
48 .one(&*tx)
49 .await?
50 .ok_or_else(|| anyhow!("could not find participant"))?;
51 if participant.room_id != room_id {
52 return Err(anyhow!("shared project on unexpected room"))?;
53 }
54 if !participant
55 .role
56 .unwrap_or(ChannelRole::Member)
57 .can_edit_projects()
58 {
59 return Err(anyhow!("guests cannot share projects"))?;
60 }
61
62 if let Some(dev_server_project_id) = dev_server_project_id {
63 let project = project::Entity::find()
64 .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
65 .one(&*tx)
66 .await?
67 .ok_or_else(|| anyhow!("no remote project"))?;
68
69 let (_, dev_server) = dev_server_project::Entity::find_by_id(dev_server_project_id)
70 .find_also_related(dev_server::Entity)
71 .one(&*tx)
72 .await?
73 .ok_or_else(|| anyhow!("no dev_server_project"))?;
74
75 if !dev_server.is_some_and(|dev_server| dev_server.user_id == participant.user_id) {
76 return Err(anyhow!("not your dev server"))?;
77 }
78
79 if project.room_id.is_some() {
80 return Err(anyhow!("project already shared"))?;
81 };
82
83 let project = project::Entity::update(project::ActiveModel {
84 room_id: ActiveValue::Set(Some(room_id)),
85 ..project.into_active_model()
86 })
87 .exec(&*tx)
88 .await?;
89
90 let room = self.get_room(room_id, &tx).await?;
91 return Ok((project.id, room));
92 }
93
94 let project = project::ActiveModel {
95 room_id: ActiveValue::set(Some(participant.room_id)),
96 host_user_id: ActiveValue::set(Some(participant.user_id)),
97 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
98 host_connection_server_id: ActiveValue::set(Some(ServerId(
99 connection.owner_id as i32,
100 ))),
101 id: ActiveValue::NotSet,
102 hosted_project_id: ActiveValue::Set(None),
103 dev_server_project_id: ActiveValue::Set(None),
104 }
105 .insert(&*tx)
106 .await?;
107
108 if !worktrees.is_empty() {
109 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
110 worktree::ActiveModel {
111 id: ActiveValue::set(worktree.id as i64),
112 project_id: ActiveValue::set(project.id),
113 abs_path: ActiveValue::set(worktree.abs_path.clone()),
114 root_name: ActiveValue::set(worktree.root_name.clone()),
115 visible: ActiveValue::set(worktree.visible),
116 scan_id: ActiveValue::set(0),
117 completed_scan_id: ActiveValue::set(0),
118 }
119 }))
120 .exec(&*tx)
121 .await?;
122 }
123
124 project_collaborator::ActiveModel {
125 project_id: ActiveValue::set(project.id),
126 connection_id: ActiveValue::set(connection.id as i32),
127 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
128 user_id: ActiveValue::set(participant.user_id),
129 replica_id: ActiveValue::set(ReplicaId(0)),
130 is_host: ActiveValue::set(true),
131 ..Default::default()
132 }
133 .insert(&*tx)
134 .await?;
135
136 let room = self.get_room(room_id, &tx).await?;
137 Ok((project.id, room))
138 })
139 .await
140 }
141
142 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
143 self.weak_transaction(|tx| async move {
144 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
145 Ok(())
146 })
147 .await
148 }
149
150 /// Unshares the given project.
151 pub async fn unshare_project(
152 &self,
153 project_id: ProjectId,
154 connection: ConnectionId,
155 user_id: Option<UserId>,
156 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
157 self.project_transaction(project_id, |tx| async move {
158 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
159 let project = project::Entity::find_by_id(project_id)
160 .one(&*tx)
161 .await?
162 .ok_or_else(|| anyhow!("project not found"))?;
163 let room = if let Some(room_id) = project.room_id {
164 Some(self.get_room(room_id, &tx).await?)
165 } else {
166 None
167 };
168 if project.host_connection()? == connection {
169 return Ok((true, room, guest_connection_ids));
170 }
171 if let Some(dev_server_project_id) = project.dev_server_project_id {
172 if let Some(user_id) = user_id {
173 if user_id
174 != self
175 .owner_for_dev_server_project(dev_server_project_id, &tx)
176 .await?
177 {
178 Err(anyhow!("cannot unshare a project hosted by another user"))?
179 }
180 project::Entity::update(project::ActiveModel {
181 room_id: ActiveValue::Set(None),
182 ..project.into_active_model()
183 })
184 .exec(&*tx)
185 .await?;
186 return Ok((false, room, guest_connection_ids));
187 }
188 }
189
190 Err(anyhow!("cannot unshare a project hosted by another user"))?
191 })
192 .await
193 }
194
195 /// Updates the worktrees associated with the given project.
196 pub async fn update_project(
197 &self,
198 project_id: ProjectId,
199 connection: ConnectionId,
200 worktrees: &[proto::WorktreeMetadata],
201 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
202 self.project_transaction(project_id, |tx| async move {
203 let project = project::Entity::find_by_id(project_id)
204 .filter(
205 Condition::all()
206 .add(project::Column::HostConnectionId.eq(connection.id as i32))
207 .add(
208 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
209 ),
210 )
211 .one(&*tx)
212 .await?
213 .ok_or_else(|| anyhow!("no such project"))?;
214
215 self.update_project_worktrees(project.id, worktrees, &tx)
216 .await?;
217
218 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
219
220 let room = if let Some(room_id) = project.room_id {
221 Some(self.get_room(room_id, &tx).await?)
222 } else {
223 None
224 };
225
226 Ok((room, guest_connection_ids))
227 })
228 .await
229 }
230
231 pub(in crate::db) async fn update_project_worktrees(
232 &self,
233 project_id: ProjectId,
234 worktrees: &[proto::WorktreeMetadata],
235 tx: &DatabaseTransaction,
236 ) -> Result<()> {
237 if !worktrees.is_empty() {
238 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
239 id: ActiveValue::set(worktree.id as i64),
240 project_id: ActiveValue::set(project_id),
241 abs_path: ActiveValue::set(worktree.abs_path.clone()),
242 root_name: ActiveValue::set(worktree.root_name.clone()),
243 visible: ActiveValue::set(worktree.visible),
244 scan_id: ActiveValue::set(0),
245 completed_scan_id: ActiveValue::set(0),
246 }))
247 .on_conflict(
248 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
249 .update_column(worktree::Column::RootName)
250 .to_owned(),
251 )
252 .exec(tx)
253 .await?;
254 }
255
256 worktree::Entity::delete_many()
257 .filter(worktree::Column::ProjectId.eq(project_id).and(
258 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
259 ))
260 .exec(tx)
261 .await?;
262
263 Ok(())
264 }
265
266 pub async fn update_worktree(
267 &self,
268 update: &proto::UpdateWorktree,
269 connection: ConnectionId,
270 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
271 let project_id = ProjectId::from_proto(update.project_id);
272 let worktree_id = update.worktree_id as i64;
273 self.project_transaction(project_id, |tx| async move {
274 // Ensure the update comes from the host.
275 let _project = project::Entity::find_by_id(project_id)
276 .filter(
277 Condition::all()
278 .add(project::Column::HostConnectionId.eq(connection.id as i32))
279 .add(
280 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
281 ),
282 )
283 .one(&*tx)
284 .await?
285 .ok_or_else(|| anyhow!("no such project"))?;
286
287 // Update metadata.
288 worktree::Entity::update(worktree::ActiveModel {
289 id: ActiveValue::set(worktree_id),
290 project_id: ActiveValue::set(project_id),
291 root_name: ActiveValue::set(update.root_name.clone()),
292 scan_id: ActiveValue::set(update.scan_id as i64),
293 completed_scan_id: if update.is_last_update {
294 ActiveValue::set(update.scan_id as i64)
295 } else {
296 ActiveValue::default()
297 },
298 abs_path: ActiveValue::set(update.abs_path.clone()),
299 ..Default::default()
300 })
301 .exec(&*tx)
302 .await?;
303
304 if !update.updated_entries.is_empty() {
305 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
306 let mtime = entry.mtime.clone().unwrap_or_default();
307 worktree_entry::ActiveModel {
308 project_id: ActiveValue::set(project_id),
309 worktree_id: ActiveValue::set(worktree_id),
310 id: ActiveValue::set(entry.id as i64),
311 is_dir: ActiveValue::set(entry.is_dir),
312 path: ActiveValue::set(entry.path.clone()),
313 inode: ActiveValue::set(entry.inode as i64),
314 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
315 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
316 is_symlink: ActiveValue::set(entry.is_symlink),
317 is_ignored: ActiveValue::set(entry.is_ignored),
318 is_external: ActiveValue::set(entry.is_external),
319 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
320 is_deleted: ActiveValue::set(false),
321 scan_id: ActiveValue::set(update.scan_id as i64),
322 is_fifo: ActiveValue::set(entry.is_fifo),
323 }
324 }))
325 .on_conflict(
326 OnConflict::columns([
327 worktree_entry::Column::ProjectId,
328 worktree_entry::Column::WorktreeId,
329 worktree_entry::Column::Id,
330 ])
331 .update_columns([
332 worktree_entry::Column::IsDir,
333 worktree_entry::Column::Path,
334 worktree_entry::Column::Inode,
335 worktree_entry::Column::MtimeSeconds,
336 worktree_entry::Column::MtimeNanos,
337 worktree_entry::Column::IsSymlink,
338 worktree_entry::Column::IsIgnored,
339 worktree_entry::Column::GitStatus,
340 worktree_entry::Column::ScanId,
341 ])
342 .to_owned(),
343 )
344 .exec(&*tx)
345 .await?;
346 }
347
348 if !update.removed_entries.is_empty() {
349 worktree_entry::Entity::update_many()
350 .filter(
351 worktree_entry::Column::ProjectId
352 .eq(project_id)
353 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
354 .and(
355 worktree_entry::Column::Id
356 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
357 ),
358 )
359 .set(worktree_entry::ActiveModel {
360 is_deleted: ActiveValue::Set(true),
361 scan_id: ActiveValue::Set(update.scan_id as i64),
362 ..Default::default()
363 })
364 .exec(&*tx)
365 .await?;
366 }
367
368 if !update.updated_repositories.is_empty() {
369 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
370 |repository| worktree_repository::ActiveModel {
371 project_id: ActiveValue::set(project_id),
372 worktree_id: ActiveValue::set(worktree_id),
373 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
374 scan_id: ActiveValue::set(update.scan_id as i64),
375 branch: ActiveValue::set(repository.branch.clone()),
376 is_deleted: ActiveValue::set(false),
377 },
378 ))
379 .on_conflict(
380 OnConflict::columns([
381 worktree_repository::Column::ProjectId,
382 worktree_repository::Column::WorktreeId,
383 worktree_repository::Column::WorkDirectoryId,
384 ])
385 .update_columns([
386 worktree_repository::Column::ScanId,
387 worktree_repository::Column::Branch,
388 ])
389 .to_owned(),
390 )
391 .exec(&*tx)
392 .await?;
393 }
394
395 if !update.removed_repositories.is_empty() {
396 worktree_repository::Entity::update_many()
397 .filter(
398 worktree_repository::Column::ProjectId
399 .eq(project_id)
400 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
401 .and(
402 worktree_repository::Column::WorkDirectoryId
403 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
404 ),
405 )
406 .set(worktree_repository::ActiveModel {
407 is_deleted: ActiveValue::Set(true),
408 scan_id: ActiveValue::Set(update.scan_id as i64),
409 ..Default::default()
410 })
411 .exec(&*tx)
412 .await?;
413 }
414
415 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
416 Ok(connection_ids)
417 })
418 .await
419 }
420
421 /// Updates the diagnostic summary for the given connection.
422 pub async fn update_diagnostic_summary(
423 &self,
424 update: &proto::UpdateDiagnosticSummary,
425 connection: ConnectionId,
426 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
427 let project_id = ProjectId::from_proto(update.project_id);
428 let worktree_id = update.worktree_id as i64;
429 self.project_transaction(project_id, |tx| async move {
430 let summary = update
431 .summary
432 .as_ref()
433 .ok_or_else(|| anyhow!("invalid summary"))?;
434
435 // Ensure the update comes from the host.
436 let project = project::Entity::find_by_id(project_id)
437 .one(&*tx)
438 .await?
439 .ok_or_else(|| anyhow!("no such project"))?;
440 if project.host_connection()? != connection {
441 return Err(anyhow!("can't update a project hosted by someone else"))?;
442 }
443
444 // Update summary.
445 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
446 project_id: ActiveValue::set(project_id),
447 worktree_id: ActiveValue::set(worktree_id),
448 path: ActiveValue::set(summary.path.clone()),
449 language_server_id: ActiveValue::set(summary.language_server_id as i64),
450 error_count: ActiveValue::set(summary.error_count as i32),
451 warning_count: ActiveValue::set(summary.warning_count as i32),
452 })
453 .on_conflict(
454 OnConflict::columns([
455 worktree_diagnostic_summary::Column::ProjectId,
456 worktree_diagnostic_summary::Column::WorktreeId,
457 worktree_diagnostic_summary::Column::Path,
458 ])
459 .update_columns([
460 worktree_diagnostic_summary::Column::LanguageServerId,
461 worktree_diagnostic_summary::Column::ErrorCount,
462 worktree_diagnostic_summary::Column::WarningCount,
463 ])
464 .to_owned(),
465 )
466 .exec(&*tx)
467 .await?;
468
469 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
470 Ok(connection_ids)
471 })
472 .await
473 }
474
475 /// Starts the language server for the given connection.
476 pub async fn start_language_server(
477 &self,
478 update: &proto::StartLanguageServer,
479 connection: ConnectionId,
480 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
481 let project_id = ProjectId::from_proto(update.project_id);
482 self.project_transaction(project_id, |tx| async move {
483 let server = update
484 .server
485 .as_ref()
486 .ok_or_else(|| anyhow!("invalid language server"))?;
487
488 // Ensure the update comes from the host.
489 let project = project::Entity::find_by_id(project_id)
490 .one(&*tx)
491 .await?
492 .ok_or_else(|| anyhow!("no such project"))?;
493 if project.host_connection()? != connection {
494 return Err(anyhow!("can't update a project hosted by someone else"))?;
495 }
496
497 // Add the newly-started language server.
498 language_server::Entity::insert(language_server::ActiveModel {
499 project_id: ActiveValue::set(project_id),
500 id: ActiveValue::set(server.id as i64),
501 name: ActiveValue::set(server.name.clone()),
502 })
503 .on_conflict(
504 OnConflict::columns([
505 language_server::Column::ProjectId,
506 language_server::Column::Id,
507 ])
508 .update_column(language_server::Column::Name)
509 .to_owned(),
510 )
511 .exec(&*tx)
512 .await?;
513
514 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
515 Ok(connection_ids)
516 })
517 .await
518 }
519
520 /// Updates the worktree settings for the given connection.
521 pub async fn update_worktree_settings(
522 &self,
523 update: &proto::UpdateWorktreeSettings,
524 connection: ConnectionId,
525 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
526 let project_id = ProjectId::from_proto(update.project_id);
527 self.project_transaction(project_id, |tx| async move {
528 // Ensure the update comes from the host.
529 let project = project::Entity::find_by_id(project_id)
530 .one(&*tx)
531 .await?
532 .ok_or_else(|| anyhow!("no such project"))?;
533 if project.host_connection()? != connection {
534 return Err(anyhow!("can't update a project hosted by someone else"))?;
535 }
536
537 if let Some(content) = &update.content {
538 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
539 project_id: ActiveValue::Set(project_id),
540 worktree_id: ActiveValue::Set(update.worktree_id as i64),
541 path: ActiveValue::Set(update.path.clone()),
542 content: ActiveValue::Set(content.clone()),
543 })
544 .on_conflict(
545 OnConflict::columns([
546 worktree_settings_file::Column::ProjectId,
547 worktree_settings_file::Column::WorktreeId,
548 worktree_settings_file::Column::Path,
549 ])
550 .update_column(worktree_settings_file::Column::Content)
551 .to_owned(),
552 )
553 .exec(&*tx)
554 .await?;
555 } else {
556 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
557 project_id: ActiveValue::Set(project_id),
558 worktree_id: ActiveValue::Set(update.worktree_id as i64),
559 path: ActiveValue::Set(update.path.clone()),
560 ..Default::default()
561 })
562 .exec(&*tx)
563 .await?;
564 }
565
566 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
567 Ok(connection_ids)
568 })
569 .await
570 }
571
572 /// Adds the given connection to the specified hosted project
573 pub async fn join_hosted_project(
574 &self,
575 id: ProjectId,
576 user_id: UserId,
577 connection: ConnectionId,
578 ) -> Result<(Project, ReplicaId)> {
579 self.transaction(|tx| async move {
580 let (project, hosted_project) = project::Entity::find_by_id(id)
581 .find_also_related(hosted_project::Entity)
582 .one(&*tx)
583 .await?
584 .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
585
586 let Some(hosted_project) = hosted_project else {
587 return Err(anyhow!("project is not hosted"))?;
588 };
589
590 let channel = channel::Entity::find_by_id(hosted_project.channel_id)
591 .one(&*tx)
592 .await?
593 .ok_or_else(|| anyhow!("no such channel"))?;
594
595 let role = self
596 .check_user_is_channel_participant(&channel, user_id, &tx)
597 .await?;
598
599 self.join_project_internal(project, user_id, connection, role, &tx)
600 .await
601 })
602 .await
603 }
604
605 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
606 self.transaction(|tx| async move {
607 Ok(project::Entity::find_by_id(id)
608 .one(&*tx)
609 .await?
610 .ok_or_else(|| anyhow!("no such project"))?)
611 })
612 .await
613 }
614
615 pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
616 self.transaction(|tx| async move {
617 Ok(project::Entity::find()
618 .filter(project::Column::DevServerProjectId.eq(id))
619 .one(&*tx)
620 .await?
621 .ok_or_else(|| anyhow!("no such project"))?)
622 })
623 .await
624 }
625
626 /// Adds the given connection to the specified project
627 /// in the current room.
628 pub async fn join_project(
629 &self,
630 project_id: ProjectId,
631 connection: ConnectionId,
632 user_id: UserId,
633 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
634 self.project_transaction(project_id, |tx| async move {
635 let (project, role) = self
636 .access_project(
637 project_id,
638 connection,
639 PrincipalId::UserId(user_id),
640 Capability::ReadOnly,
641 &tx,
642 )
643 .await?;
644 self.join_project_internal(project, user_id, connection, role, &tx)
645 .await
646 })
647 .await
648 }
649
650 async fn join_project_internal(
651 &self,
652 project: project::Model,
653 user_id: UserId,
654 connection: ConnectionId,
655 role: ChannelRole,
656 tx: &DatabaseTransaction,
657 ) -> Result<(Project, ReplicaId)> {
658 let mut collaborators = project
659 .find_related(project_collaborator::Entity)
660 .all(tx)
661 .await?;
662 let replica_ids = collaborators
663 .iter()
664 .map(|c| c.replica_id)
665 .collect::<HashSet<_>>();
666 let mut replica_id = ReplicaId(1);
667 while replica_ids.contains(&replica_id) {
668 replica_id.0 += 1;
669 }
670 let new_collaborator = project_collaborator::ActiveModel {
671 project_id: ActiveValue::set(project.id),
672 connection_id: ActiveValue::set(connection.id as i32),
673 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
674 user_id: ActiveValue::set(user_id),
675 replica_id: ActiveValue::set(replica_id),
676 is_host: ActiveValue::set(false),
677 ..Default::default()
678 }
679 .insert(tx)
680 .await?;
681 collaborators.push(new_collaborator);
682
683 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
684 let mut worktrees = db_worktrees
685 .into_iter()
686 .map(|db_worktree| {
687 (
688 db_worktree.id as u64,
689 Worktree {
690 id: db_worktree.id as u64,
691 abs_path: db_worktree.abs_path,
692 root_name: db_worktree.root_name,
693 visible: db_worktree.visible,
694 entries: Default::default(),
695 repository_entries: Default::default(),
696 diagnostic_summaries: Default::default(),
697 settings_files: Default::default(),
698 scan_id: db_worktree.scan_id as u64,
699 completed_scan_id: db_worktree.completed_scan_id as u64,
700 },
701 )
702 })
703 .collect::<BTreeMap<_, _>>();
704
705 // Populate worktree entries.
706 {
707 let mut db_entries = worktree_entry::Entity::find()
708 .filter(
709 Condition::all()
710 .add(worktree_entry::Column::ProjectId.eq(project.id))
711 .add(worktree_entry::Column::IsDeleted.eq(false)),
712 )
713 .stream(tx)
714 .await?;
715 while let Some(db_entry) = db_entries.next().await {
716 let db_entry = db_entry?;
717 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
718 worktree.entries.push(proto::Entry {
719 id: db_entry.id as u64,
720 is_dir: db_entry.is_dir,
721 path: db_entry.path,
722 inode: db_entry.inode as u64,
723 mtime: Some(proto::Timestamp {
724 seconds: db_entry.mtime_seconds as u64,
725 nanos: db_entry.mtime_nanos as u32,
726 }),
727 is_symlink: db_entry.is_symlink,
728 is_ignored: db_entry.is_ignored,
729 is_external: db_entry.is_external,
730 git_status: db_entry.git_status.map(|status| status as i32),
731 is_fifo: db_entry.is_fifo,
732 });
733 }
734 }
735 }
736
737 // Populate repository entries.
738 {
739 let mut db_repository_entries = worktree_repository::Entity::find()
740 .filter(
741 Condition::all()
742 .add(worktree_repository::Column::ProjectId.eq(project.id))
743 .add(worktree_repository::Column::IsDeleted.eq(false)),
744 )
745 .stream(tx)
746 .await?;
747 while let Some(db_repository_entry) = db_repository_entries.next().await {
748 let db_repository_entry = db_repository_entry?;
749 if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
750 {
751 worktree.repository_entries.insert(
752 db_repository_entry.work_directory_id as u64,
753 proto::RepositoryEntry {
754 work_directory_id: db_repository_entry.work_directory_id as u64,
755 branch: db_repository_entry.branch,
756 },
757 );
758 }
759 }
760 }
761
762 // Populate worktree diagnostic summaries.
763 {
764 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
765 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
766 .stream(tx)
767 .await?;
768 while let Some(db_summary) = db_summaries.next().await {
769 let db_summary = db_summary?;
770 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
771 worktree
772 .diagnostic_summaries
773 .push(proto::DiagnosticSummary {
774 path: db_summary.path,
775 language_server_id: db_summary.language_server_id as u64,
776 error_count: db_summary.error_count as u32,
777 warning_count: db_summary.warning_count as u32,
778 });
779 }
780 }
781 }
782
783 // Populate worktree settings files
784 {
785 let mut db_settings_files = worktree_settings_file::Entity::find()
786 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
787 .stream(tx)
788 .await?;
789 while let Some(db_settings_file) = db_settings_files.next().await {
790 let db_settings_file = db_settings_file?;
791 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
792 worktree.settings_files.push(WorktreeSettingsFile {
793 path: db_settings_file.path,
794 content: db_settings_file.content,
795 });
796 }
797 }
798 }
799
800 // Populate language servers.
801 let language_servers = project
802 .find_related(language_server::Entity)
803 .all(tx)
804 .await?;
805
806 let project = Project {
807 id: project.id,
808 role,
809 collaborators: collaborators
810 .into_iter()
811 .map(|collaborator| ProjectCollaborator {
812 connection_id: collaborator.connection(),
813 user_id: collaborator.user_id,
814 replica_id: collaborator.replica_id,
815 is_host: collaborator.is_host,
816 })
817 .collect(),
818 worktrees,
819 language_servers: language_servers
820 .into_iter()
821 .map(|language_server| proto::LanguageServer {
822 id: language_server.id as u64,
823 name: language_server.name,
824 })
825 .collect(),
826 dev_server_project_id: project.dev_server_project_id,
827 };
828 Ok((project, replica_id as ReplicaId))
829 }
830
831 pub async fn leave_hosted_project(
832 &self,
833 project_id: ProjectId,
834 connection: ConnectionId,
835 ) -> Result<LeftProject> {
836 self.transaction(|tx| async move {
837 let result = project_collaborator::Entity::delete_many()
838 .filter(
839 Condition::all()
840 .add(project_collaborator::Column::ProjectId.eq(project_id))
841 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
842 .add(
843 project_collaborator::Column::ConnectionServerId
844 .eq(connection.owner_id as i32),
845 ),
846 )
847 .exec(&*tx)
848 .await?;
849 if result.rows_affected == 0 {
850 return Err(anyhow!("not in the project"))?;
851 }
852
853 let project = project::Entity::find_by_id(project_id)
854 .one(&*tx)
855 .await?
856 .ok_or_else(|| anyhow!("no such project"))?;
857 let collaborators = project
858 .find_related(project_collaborator::Entity)
859 .all(&*tx)
860 .await?;
861 let connection_ids = collaborators
862 .into_iter()
863 .map(|collaborator| collaborator.connection())
864 .collect();
865 Ok(LeftProject {
866 id: project.id,
867 connection_ids,
868 should_unshare: false,
869 })
870 })
871 .await
872 }
873
874 /// Removes the given connection from the specified project.
875 pub async fn leave_project(
876 &self,
877 project_id: ProjectId,
878 connection: ConnectionId,
879 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
880 self.project_transaction(project_id, |tx| async move {
881 let result = project_collaborator::Entity::delete_many()
882 .filter(
883 Condition::all()
884 .add(project_collaborator::Column::ProjectId.eq(project_id))
885 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
886 .add(
887 project_collaborator::Column::ConnectionServerId
888 .eq(connection.owner_id as i32),
889 ),
890 )
891 .exec(&*tx)
892 .await?;
893 if result.rows_affected == 0 {
894 Err(anyhow!("not a collaborator on this project"))?;
895 }
896
897 let project = project::Entity::find_by_id(project_id)
898 .one(&*tx)
899 .await?
900 .ok_or_else(|| anyhow!("no such project"))?;
901 let collaborators = project
902 .find_related(project_collaborator::Entity)
903 .all(&*tx)
904 .await?;
905 let connection_ids: Vec<ConnectionId> = collaborators
906 .into_iter()
907 .map(|collaborator| collaborator.connection())
908 .collect();
909
910 follower::Entity::delete_many()
911 .filter(
912 Condition::any()
913 .add(
914 Condition::all()
915 .add(follower::Column::ProjectId.eq(Some(project_id)))
916 .add(
917 follower::Column::LeaderConnectionServerId
918 .eq(connection.owner_id),
919 )
920 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
921 )
922 .add(
923 Condition::all()
924 .add(follower::Column::ProjectId.eq(Some(project_id)))
925 .add(
926 follower::Column::FollowerConnectionServerId
927 .eq(connection.owner_id),
928 )
929 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
930 ),
931 )
932 .exec(&*tx)
933 .await?;
934
935 let room = if let Some(room_id) = project.room_id {
936 Some(self.get_room(room_id, &tx).await?)
937 } else {
938 None
939 };
940
941 let left_project = LeftProject {
942 id: project_id,
943 should_unshare: connection == project.host_connection()?,
944 connection_ids,
945 };
946 Ok((room, left_project))
947 })
948 .await
949 }
950
951 pub async fn check_user_is_project_host(
952 &self,
953 project_id: ProjectId,
954 connection_id: ConnectionId,
955 ) -> Result<()> {
956 self.project_transaction(project_id, |tx| async move {
957 project::Entity::find()
958 .filter(
959 Condition::all()
960 .add(project::Column::Id.eq(project_id))
961 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
962 .add(
963 project::Column::HostConnectionServerId
964 .eq(Some(connection_id.owner_id as i32)),
965 ),
966 )
967 .one(&*tx)
968 .await?
969 .ok_or_else(|| anyhow!("failed to read project host"))?;
970
971 Ok(())
972 })
973 .await
974 .map(|guard| guard.into_inner())
975 }
976
977 /// Returns the current project if the given user is authorized to access it with the specified capability.
978 pub async fn access_project(
979 &self,
980 project_id: ProjectId,
981 connection_id: ConnectionId,
982 principal_id: PrincipalId,
983 capability: Capability,
984 tx: &DatabaseTransaction,
985 ) -> Result<(project::Model, ChannelRole)> {
986 let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
987 .find_also_related(dev_server_project::Entity)
988 .one(tx)
989 .await?
990 .ok_or_else(|| anyhow!("no such project"))?;
991
992 let user_id = match principal_id {
993 PrincipalId::DevServerId(_) => {
994 if project
995 .host_connection()
996 .is_ok_and(|connection| connection == connection_id)
997 {
998 return Ok((project, ChannelRole::Admin));
999 }
1000 return Err(anyhow!("not the project host"))?;
1001 }
1002 PrincipalId::UserId(user_id) => user_id,
1003 };
1004
1005 let role_from_room = if let Some(room_id) = project.room_id {
1006 room_participant::Entity::find()
1007 .filter(room_participant::Column::RoomId.eq(room_id))
1008 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1009 .one(tx)
1010 .await?
1011 .and_then(|participant| participant.role)
1012 } else {
1013 None
1014 };
1015 let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1016 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1017 .one(tx)
1018 .await?
1019 .ok_or_else(|| anyhow!("no such channel"))?;
1020 if user_id == dev_server.user_id {
1021 // If the user left the room "uncleanly" they may rejoin the
1022 // remote project before leave_room runs. IN that case kick
1023 // the project out of the room pre-emptively.
1024 if role_from_room.is_none() {
1025 project = project::Entity::update(project::ActiveModel {
1026 room_id: ActiveValue::Set(None),
1027 ..project.into_active_model()
1028 })
1029 .exec(tx)
1030 .await?;
1031 }
1032 Some(ChannelRole::Admin)
1033 } else {
1034 None
1035 }
1036 } else {
1037 None
1038 };
1039
1040 let role = role_from_dev_server
1041 .or(role_from_room)
1042 .unwrap_or(ChannelRole::Banned);
1043
1044 match capability {
1045 Capability::ReadWrite => {
1046 if !role.can_edit_projects() {
1047 return Err(anyhow!("not authorized to edit projects"))?;
1048 }
1049 }
1050 Capability::ReadOnly => {
1051 if !role.can_read_projects() {
1052 return Err(anyhow!("not authorized to read projects"))?;
1053 }
1054 }
1055 }
1056
1057 Ok((project, role))
1058 }
1059
1060 /// Returns the host connection for a read-only request to join a shared project.
1061 pub async fn host_for_read_only_project_request(
1062 &self,
1063 project_id: ProjectId,
1064 connection_id: ConnectionId,
1065 user_id: UserId,
1066 ) -> Result<ConnectionId> {
1067 self.project_transaction(project_id, |tx| async move {
1068 let (project, _) = self
1069 .access_project(
1070 project_id,
1071 connection_id,
1072 PrincipalId::UserId(user_id),
1073 Capability::ReadOnly,
1074 &tx,
1075 )
1076 .await?;
1077 project.host_connection()
1078 })
1079 .await
1080 .map(|guard| guard.into_inner())
1081 }
1082
1083 /// Returns the host connection for a request to join a shared project.
1084 pub async fn host_for_mutating_project_request(
1085 &self,
1086 project_id: ProjectId,
1087 connection_id: ConnectionId,
1088 user_id: UserId,
1089 ) -> Result<ConnectionId> {
1090 self.project_transaction(project_id, |tx| async move {
1091 let (project, _) = self
1092 .access_project(
1093 project_id,
1094 connection_id,
1095 PrincipalId::UserId(user_id),
1096 Capability::ReadWrite,
1097 &tx,
1098 )
1099 .await?;
1100 project.host_connection()
1101 })
1102 .await
1103 .map(|guard| guard.into_inner())
1104 }
1105
1106 /// Returns the host connection for a request to join a shared project.
1107 pub async fn host_for_owner_project_request(
1108 &self,
1109 project_id: ProjectId,
1110 _connection_id: ConnectionId,
1111 user_id: UserId,
1112 ) -> Result<ConnectionId> {
1113 self.project_transaction(project_id, |tx| async move {
1114 let (project, dev_server_project) = project::Entity::find_by_id(project_id)
1115 .find_also_related(dev_server_project::Entity)
1116 .one(&*tx)
1117 .await?
1118 .ok_or_else(|| anyhow!("no such project"))?;
1119
1120 let Some(dev_server_project) = dev_server_project else {
1121 return Err(anyhow!("not a dev server project"))?;
1122 };
1123 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1124 .one(&*tx)
1125 .await?
1126 .ok_or_else(|| anyhow!("no such dev server"))?;
1127 if dev_server.user_id != user_id {
1128 return Err(anyhow!("not your project"))?;
1129 }
1130 project.host_connection()
1131 })
1132 .await
1133 .map(|guard| guard.into_inner())
1134 }
1135
1136 pub async fn connections_for_buffer_update(
1137 &self,
1138 project_id: ProjectId,
1139 principal_id: PrincipalId,
1140 connection_id: ConnectionId,
1141 capability: Capability,
1142 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1143 self.project_transaction(project_id, |tx| async move {
1144 // Authorize
1145 let (project, _) = self
1146 .access_project(project_id, connection_id, principal_id, capability, &tx)
1147 .await?;
1148
1149 let host_connection_id = project.host_connection()?;
1150
1151 let collaborators = project_collaborator::Entity::find()
1152 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1153 .all(&*tx)
1154 .await?;
1155
1156 let guest_connection_ids = collaborators
1157 .into_iter()
1158 .filter_map(|collaborator| {
1159 if collaborator.is_host {
1160 None
1161 } else {
1162 Some(collaborator.connection())
1163 }
1164 })
1165 .collect();
1166
1167 Ok((host_connection_id, guest_connection_ids))
1168 })
1169 .await
1170 }
1171
1172 /// Returns the connection IDs in the given project.
1173 ///
1174 /// The provided `connection_id` must also be a collaborator in the project,
1175 /// otherwise an error will be returned.
1176 pub async fn project_connection_ids(
1177 &self,
1178 project_id: ProjectId,
1179 connection_id: ConnectionId,
1180 exclude_dev_server: bool,
1181 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1182 self.project_transaction(project_id, |tx| async move {
1183 let project = project::Entity::find_by_id(project_id)
1184 .one(&*tx)
1185 .await?
1186 .ok_or_else(|| anyhow!("no such project"))?;
1187
1188 let mut collaborators = project_collaborator::Entity::find()
1189 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1190 .stream(&*tx)
1191 .await?;
1192
1193 let mut connection_ids = HashSet::default();
1194 if let Some(host_connection) = project.host_connection().log_err() {
1195 if !exclude_dev_server {
1196 connection_ids.insert(host_connection);
1197 }
1198 }
1199
1200 while let Some(collaborator) = collaborators.next().await {
1201 let collaborator = collaborator?;
1202 connection_ids.insert(collaborator.connection());
1203 }
1204
1205 if connection_ids.contains(&connection_id)
1206 || Some(connection_id) == project.host_connection().ok()
1207 {
1208 Ok(connection_ids)
1209 } else {
1210 Err(anyhow!(
1211 "can only send project updates to a project you're in"
1212 ))?
1213 }
1214 })
1215 .await
1216 }
1217
1218 async fn project_guest_connection_ids(
1219 &self,
1220 project_id: ProjectId,
1221 tx: &DatabaseTransaction,
1222 ) -> Result<Vec<ConnectionId>> {
1223 let mut collaborators = project_collaborator::Entity::find()
1224 .filter(
1225 project_collaborator::Column::ProjectId
1226 .eq(project_id)
1227 .and(project_collaborator::Column::IsHost.eq(false)),
1228 )
1229 .stream(tx)
1230 .await?;
1231
1232 let mut guest_connection_ids = Vec::new();
1233 while let Some(collaborator) = collaborators.next().await {
1234 let collaborator = collaborator?;
1235 guest_connection_ids.push(collaborator.connection());
1236 }
1237 Ok(guest_connection_ids)
1238 }
1239
1240 /// Returns the [`RoomId`] for the given project.
1241 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1242 self.transaction(|tx| async move {
1243 Ok(project::Entity::find_by_id(project_id)
1244 .one(&*tx)
1245 .await?
1246 .and_then(|project| project.room_id))
1247 })
1248 .await
1249 }
1250
1251 pub async fn check_room_participants(
1252 &self,
1253 room_id: RoomId,
1254 leader_id: ConnectionId,
1255 follower_id: ConnectionId,
1256 ) -> Result<()> {
1257 self.transaction(|tx| async move {
1258 use room_participant::Column;
1259
1260 let count = room_participant::Entity::find()
1261 .filter(
1262 Condition::all().add(Column::RoomId.eq(room_id)).add(
1263 Condition::any()
1264 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1265 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1266 ))
1267 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1268 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1269 )),
1270 ),
1271 )
1272 .count(&*tx)
1273 .await?;
1274
1275 if count < 2 {
1276 Err(anyhow!("not room participants"))?;
1277 }
1278
1279 Ok(())
1280 })
1281 .await
1282 }
1283
1284 /// Adds the given follower connection as a follower of the given leader connection.
1285 pub async fn follow(
1286 &self,
1287 room_id: RoomId,
1288 project_id: ProjectId,
1289 leader_connection: ConnectionId,
1290 follower_connection: ConnectionId,
1291 ) -> Result<TransactionGuard<proto::Room>> {
1292 self.room_transaction(room_id, |tx| async move {
1293 follower::ActiveModel {
1294 room_id: ActiveValue::set(room_id),
1295 project_id: ActiveValue::set(project_id),
1296 leader_connection_server_id: ActiveValue::set(ServerId(
1297 leader_connection.owner_id as i32,
1298 )),
1299 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1300 follower_connection_server_id: ActiveValue::set(ServerId(
1301 follower_connection.owner_id as i32,
1302 )),
1303 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1304 ..Default::default()
1305 }
1306 .insert(&*tx)
1307 .await?;
1308
1309 let room = self.get_room(room_id, &tx).await?;
1310 Ok(room)
1311 })
1312 .await
1313 }
1314
1315 /// Removes the given follower connection as a follower of the given leader connection.
1316 pub async fn unfollow(
1317 &self,
1318 room_id: RoomId,
1319 project_id: ProjectId,
1320 leader_connection: ConnectionId,
1321 follower_connection: ConnectionId,
1322 ) -> Result<TransactionGuard<proto::Room>> {
1323 self.room_transaction(room_id, |tx| async move {
1324 follower::Entity::delete_many()
1325 .filter(
1326 Condition::all()
1327 .add(follower::Column::RoomId.eq(room_id))
1328 .add(follower::Column::ProjectId.eq(project_id))
1329 .add(
1330 follower::Column::LeaderConnectionServerId
1331 .eq(leader_connection.owner_id),
1332 )
1333 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1334 .add(
1335 follower::Column::FollowerConnectionServerId
1336 .eq(follower_connection.owner_id),
1337 )
1338 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1339 )
1340 .exec(&*tx)
1341 .await?;
1342
1343 let room = self.get_room(room_id, &tx).await?;
1344 Ok(room)
1345 })
1346 .await
1347 }
1348}